In [1]:
import re
import numpy as np
import pandas_ta as pta
import pandas as pd
import yfinance as yf
import pytz
import joblib

import matplotlib.pyplot as plt
import seaborn as sns
sns.set_style('whitegrid')
plt.style.use("fivethirtyeight")
%matplotlib inline

# For time stamps
from datetime import datetime# Get the stock quote
from sklearn.model_selection import train_test_split
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.preprocessing import OneHotEncoder
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn import set_config; set_config(display='diagram')
from sklearn.compose import ColumnTransformer
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, LSTM

In [2]:
def get_sma(df, period=5, column='Close'):
    '''returns simple moving average of provide column and period'''
    return pta.sma(df[column],length=period)

def get_ema(df, period=10 , column='Close'):
    '''returns simple moving average of provide column and period'''
    return pta.ema(df[column], length=period)

def get_hma(df, period=10 ,column='Close'):
    '''returns simple moving average of provide column and period'''
    return pta.hma(df[column], length=period)

def get_rsi(df,period=14):
    '''returns relative strength index of provided period'''
    return pta.rsi(df['Close'], length = period)

def get_atr(df,period=14):
    '''returns average true range of provided period'''
    return pta.atr(df['High'],df['Low'],df['Close'],length=period)

def get_bband(df,period=20,std=2):
    '''returns Upper , Lower and Middle bolinger band of provided period and std'''
    return pta.bbands(df['Close'],length=period,std=std)

def get_macd(df,fast=12, slow=26, signal=9):
    '''returns Moving average convergence divergence (MACD)'''
    return pta.macd(df['Close'],fast=fast, slow=slow, signal=signal)

def get_adx(df,length=14):
    '''returns ADX of provided period'''
    return pta.adx(df['High'],df['Low'],df['Close'],length=length)

def get_vwap(df):
    '''returns Voumne weighted average'''
    return pta.vwap(df['High'],df['Low'],df['Close'], df['Volume'])


def get_donchian(df, lower_length=20, upper_length=20):
    '''returns Voumne weighted average'''
    return pta.donchian(df['High'],
                        df['Low'],
                        lower_length=20,
                        upper_length=20)

In [3]:
# create a TimeFeaturesEncoder

class TimeFeaturesEncoder(BaseEstimator, TransformerMixin):
    """
        Extracts the day of week (dow), the hour, the month and the year from a time column.
        Returns a copy of the DataFrame X with only four columns: 'dow', 'hour', 'month', 'year'.
    """

    def __init__(self, time_column, time_zone_name='UTC'):
        self.time_column = time_column
        self.time_zone_name = time_zone_name

    def fit(self, X, y=None):
        return self

    def transform(self, X, y=None):
        assert isinstance(X, pd.DataFrame)
        X_ = X.copy()
        X_.index = pd.to_datetime(X[self.time_column])
        X_.index = X_.index.tz_localize(pytz.timezone(self.time_zone_name))
        X_.index = X_.index.tz_convert(self.time_zone_name)
        X_["weekday"] = X_.index.weekday
        X_["hour"] = X_.index.hour
        X_["month"] = X_.index.month
        X_["year"] = X_.index.year
        return X_[['weekday', 'month', 'year']]

In [4]:
def get_technical(symbol="INFY.NS",period = '5y'):
  '''returns a DataFrame with stock technical data'''
  ticker = yf.Ticker(symbol)
    
  df = ticker.history(period = period)
  df.drop(columns=['Dividends','Stock Splits'],inplace=True)
  df['ema12'] = get_ema(df, column='Close', period=12)
  df['ema21'] = get_ema(df, column='Close', period=21)
  df['ema26'] = get_ema(df, column='Close', period=26)
  df['ema34'] = get_ema(df, column='Close', period=34)
  df['ema55'] = get_ema(df, column='Close', period=55)
  df['ema99'] = get_ema(df, column='Close', period=99)
  df['ema200'] = get_ema(df, column='Close', period=200)
  df['hma12'] = get_hma(df, column='Close', period=12)
  df['hma21'] = get_hma(df, column='Close', period=21)
  df['hma26'] = get_hma(df, column='Close', period=26)
  df['hma34'] = get_hma(df, column='Close', period=34)
  df['hma55'] = get_hma(df, column='Close', period=55)
  df['hma99'] = get_hma(df, column='Close', period=99)
  df['hma200'] = get_hma(df, column='Close', period=200)
  df['rsi'] = get_rsi(df, period=14)
  df['atr'] = get_atr(df, period=14)
  df['bb_upper'] = get_bband(df, period=20, std=2)['BBU_20_2.0']
  df['bb_lower'] = get_bband(df, period=20, std=2)['BBL_20_2.0']
  df['macd_signal'] = get_macd(df, fast=12, slow=26, signal=9)['MACD_12_26_9']
  df['macd_line'] = get_macd(df, fast=12, slow=26, signal=9)['MACDs_12_26_9']
  df['adx'] = get_adx(df, length=14)['ADX_14']
  df['vwap'] = get_vwap(df)

  return df

In [22]:
def clean_data(df, test=False):
    '''returns a DataFrame without outliers and missing values'''
    df = df.dropna(how='any')
    #df = df.reset_index()
    return df

def compute_mpe(y_pred, y_true):
    return abs(y_pred / y_true).mean()
    


def set_pipeline(cleaned_data):
    '''returns a pipelined model'''
    data_pipe = Pipeline([('stdscaler', StandardScaler())])
    preproc_pipe = ColumnTransformer([
        ('data', data_pipe, cleaned_data.columns)
    ], remainder="drop")


    pipe = Pipeline([
        ('preproc', preproc_pipe)])
    
    scaled_data = pipe.fit_transform(cleaned_data)
    return scaled_data,pipe


def split_timeseries(scaled_data,X):
  # Create the training data set 
  # Create the scaled training data set
  train_data = scaled_data[0:int(scaled_data.shape[0]*0.80), :]
  X = scaled_data[0:int(scaled_data.shape[0]*0.80), :]
  # Split the data into x_train and y_train data sets
  x_train = []
  y_train = []

  for i in range(60, len(train_data)):
      x_train.append(train_data[i-60:i, 0])
      y_train.append(X[i, 0])
      if i<= 61:
          print(x_train)
          print(y_train)
          print()
          
  # Convert the x_train and y_train to numpy arrays 
  x_train, y_train = np.array(x_train), np.array(y_train)

  # Reshape the data
  x_train = np.reshape(x_train, (x_train.shape[0], x_train.shape[1], 1))
  # x_train.shape

  test_data = scaled_data[int(scaled_data.shape[0]*0.80) - 60: , :]
  X = scaled_data[int(scaled_data.shape[0]*0.80) - 60: , :]
  # Create the data sets x_test and y_test
  x_test = []
  y_test = scaled_data[int(scaled_data.shape[0]*0.80):, :]
  for i in range(60, len(test_data)):
      x_test.append(test_data[i-60:i, 0])
      
  # Convert the data to a numpy array
  x_test = np.array(x_test)

  # Reshape the data
  x_test = np.reshape(x_test, (x_test.shape[0], x_test.shape[1], 1 ))

  return x_train,y_train,x_test,y_test


def split_predict(scaled_data,X):
  # Create the training data set 
  # Create the scaled training data set
  train_data = scaled_data
  # Split the data into x_train and y_train data sets
  x_train = []
  y_train = []

  for i in range(60, len(train_data)):
      x_train.append(train_data[i-60:i, 0])
      y_train.append(X[i, 0])
      if i<= 61:
          print(x_train)
          print(y_train)
          print()
          
  # Convert the x_train and y_train to numpy arrays 
  x_train, y_train = np.array(x_train), np.array(y_train)

  # Reshape the data
  x_train = np.reshape(x_train, (x_train.shape[0], x_train.shape[1], 1))
  # x_train.shape

  return x_train,y_train


# Function to create model, required for KerasClassifier
def create_model(x_train):
  model = Sequential()
  model.add(LSTM(128, return_sequences=True, input_shape= (x_train.shape[1], 1)))
  model.add(LSTM(64, return_sequences=False))
  model.add(Dense(25))
  model.add(Dense(1))
  # Compile the model
  model.compile(optimizer='adam', loss='mean_squared_error')
  return model

# implement train() function
def train(x_train, y_train, model):
    '''returns a trained pipelined model'''
    model.fit(x_train, y_train)
    return model

# implement evaluate() function
def evaluate(x_test, y_test, model):
    '''returns the value of the RMSE'''
    y_pred = model.predict(x_test)
    mpe = compute_mpe(y_pred, y_test)
    return mpe

def save_model(model):
  """Save the model into a .joblib format"""
  folder_name  = datetime.now().strftime('%m_%d_%Y_%H_%M_%S') 
  joblib.dump(model, f'model.joblib')
  print("model.joblib saved locally", "green")


def get_prediction(symbol="INFY.NS"):
  """Save the model into a .joblib format"""
  df = get_technical(symbol=symbol,period = '5y')
  cleaned_data = clean_data(df)
  scaled_data,pipe = set_pipeline(cleaned_data)
  X = cleaned_data.to_numpy()[-61:,:]
  scaled_data = scaled_data[-61:,:]
  X,y = split_predict(scaled_data,X)
  #Load model trainned model in previous stage to predict future price
  model = joblib.load('model.joblib')
  results = model.predict(X)
  pred = float(results[0])
  return pred

In [23]:
#Get the data
data = get_technical(symbol="INFY.NS",period = '5y')
#Clean the data
cleaned_data = clean_data(data)
#Scale the data
scaled_data,pipe = set_pipeline(cleaned_data)
#Split data in trainning and testing
x_train,y_train,x_test,y_test = split_timeseries(scaled_data,data)
#Create Model
model = create_model(x_train)
#Train Model
model = train(x_train, y_train, model)
#Evaluate Model
mpe = evaluate(x_test, y_test, model)
#Print Root Mean Square Error
print(mpe)
#Save Model
save_model(model)

[array([-1.14173231, -1.14978397, -1.15590315, -1.14559713, -1.12938636,
       -1.1220861 , -1.11301471, -1.08021764, -1.08644412, -1.08510217,
       -1.03829506, -1.0166629 , -1.01666309, -1.03045806, -1.0059274 ,
       -0.98467086, -0.9769413 , -0.99352773, -0.98026933, -1.00147215,
       -1.01966885, -1.02713016, -1.02203092, -1.0650266 , -1.04779608,
       -1.04049607, -1.04564895, -1.04908445, -1.0585316 , -1.06175237,
       -1.03813396, -1.03448405, -1.06094733, -1.01397916, -1.01607256,
       -0.9968021 , -1.01022157, -1.01671672, -1.01387205, -0.99025359,
       -0.99975437, -0.99782191, -1.00592699, -1.01236878, -0.99803672,
       -0.98338262, -0.97887383, -0.97908845, -0.97908822, -0.97694158,
       -0.99626526, -1.01279803, -0.9898239 , -0.99572856, -1.01451577,
       -0.9931519 , -1.00055952, -1.0104898 , -1.02095707, -1.02772074])]
[-1.0177366191489245]

[array([-1.14173231, -1.14978397, -1.15590315, -1.14559713, -1.12938636,
       -1.1220861 , -1.11301471, -1.0

NotImplementedError: Cannot convert a symbolic Tensor (lstm_5/strided_slice:0) to a numpy array. This error may indicate that you're trying to pass a Tensor to a NumPy call, which is not supported

In [24]:
#Get Prediction
get_prediction()

[array([1.84392252, 1.84392252, 1.97770569, 1.87969343, 1.81077474,
       1.88684762, 1.94885054, 1.95123527, 1.96804772, 1.88183975,
       1.91546435, 1.95755486, 2.09431894, 2.05067831, 2.04900911,
       2.10624258, 2.12114713, 2.22059016, 2.18970806, 2.23907181,
       2.24574916, 2.24813389, 2.26124989, 2.25588425, 2.28140073,
       2.28509717, 2.11339676, 2.08346831, 2.0823953 , 2.17778442,
       2.21093219, 2.29702081, 2.24217207, 2.37702855, 2.3377996 ,
       2.32325282, 2.15155241, 2.03470074, 1.96315891, 1.8715853 ,
       1.81292105, 1.76284177, 1.8510767 , 1.96578205, 2.02039238,
       1.9965451 , 1.88446289, 1.86109244, 1.83199888, 1.89161707,
       1.94086176, 1.90354071, 1.78430432, 1.80576687, 1.89936744,
       1.91307962, 1.83199888, 1.81053633, 1.80791318, 1.92738799])]
[1690.0999755859375]



FileNotFoundError: [Errno 2] No such file or directory: 'model.joblib'