In [None]:
import os
import warnings
import datetime
import pandas as pd
import numpy as np
from numpy.random import seed
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM
from tensorflow.keras.layers import Dense, Dropout
from tensorflow.keras.optimizers import Adam
from sklearn.preprocessing import StandardScaler
import seaborn as sns
import matplotlib.pyplot as plt
# Give every figure a white background.
plt.rcParams['figure.facecolor'] = 'white'
# Silence all warnings from here on (note: this also hides deprecation notices).
warnings.simplefilter('ignore')


In [None]:
class DataFormatting():
    """Split a raw price table into model features and timestamps.

    ``dataset`` is a static helper, so it can be called either on the class
    (``DataFormatting.dataset(df)``) or on an instance.
    """

    def __init__(self):
        # Placeholders only: ``dataset`` returns its results rather than
        # storing them on the instance.
        self.df_data = None
        self.df_datetime = None

    @staticmethod
    def dataset(df):
        """Return the price/volume columns and the parsed time column of ``df``.

        Parameters
        ----------
        df : pandas.DataFrame
            Must contain the columns 'time', 'open', 'high', 'low', 'close'
            and 'tick_volume'. The 'time' column of ``df`` itself is
            converted to datetime (the caller's frame is modified).

        Returns
        -------
        tuple of (pandas.DataFrame, pandas.DataFrame)
            The five feature columns, and a one-column frame holding 'time'.

        Raises
        ------
        KeyError
            If any of the required columns is missing.
        """
        # converting the time column from object type to datetime format
        df['time'] = pd.to_datetime(df['time'])
        # splitting the dataframe into the feature columns and the timestamps
        df_data = df[['open','high','low','close','tick_volume']]
        df_datetime = df[['time']]

        return df_data, df_datetime


# Load the raw price table; the first CSV column becomes the index.
data = pd.read_csv('../data/gold_mt5.csv',index_col=[0]) 

# NOTE(review): this instance is not used in the cells shown here; dataset() is called on the class.
data_init = DataFormatting()
# Keep only the feature columns; the timestamp frame is discarded.
df_data, _ = DataFormatting.dataset(data)
# Preview the first rows of the feature table.
print(df_data.head())


In [None]:
def train_test_split(data, train_split=0.9):
    """Cut a DataFrame into a leading training part and a trailing testing part.

    Rows are taken in their existing order (no shuffling): the first
    ``train_split`` fraction of the rows, rounded down, forms the training
    set and all remaining rows form the testing set.

    Inputs: data: pandas DataFrame
            train_split: fraction of rows used for training, default 0.9.
    Returns: (training rows, testing rows) as two DataFrames.
    """
    # index of the first row that belongs to the testing set
    boundary = int(train_split*len(data))
    return data.iloc[:boundary, :], data.iloc[boundary:, :]

# Row-order split: the first 90% of the rows for training, the rest for testing.
X_train, X_test = train_test_split(df_data, train_split=0.9)


In [None]:
class Normalize():

    """Standardise data with scikit-learn's StandardScaler and undo it again.

    The scaler fitted in ``fit_transform`` is kept on the instance so that
    ``inverse_transform`` can map scaled values back to the original units.
    """
    def __init__(self):

        # Fitted StandardScaler; stays None until fit_transform has been called.
        self.scaler = None
        # Most recent results of the two operations below.
        self.data_fit_transformed = None
        self.data_inverse_transformed = None

    def fit_transform(self, data):
        """Fit a fresh scaler on ``data`` and return the standardised values.

        Parameters
        ----------
        data : array-like of shape (n_samples, n_features)

        Returns
        -------
        numpy.ndarray
            ``data`` scaled column-wise to zero mean and unit variance.
        """
        # Keep the fitted scaler: its per-column mean and scale are needed
        # later to invert the transformation.
        self.scaler = StandardScaler()
        self.data_fit_transformed = self.scaler.fit_transform(data)

        return self.data_fit_transformed

    def inverse_transform(self, data):
        """Map standardised values back to the original scale.

        Uses the scaler fitted by the last ``fit_transform`` call, so ``data``
        must have the same number of columns as the data it was fitted on.

        Raises
        ------
        sklearn.exceptions.NotFittedError
            If ``fit_transform`` has not been called on this instance yet.
        """
        if self.scaler is None:
            # Local import keeps the block self-contained; scikit-learn is
            # already a dependency of this notebook.
            from sklearn.exceptions import NotFittedError
            raise NotFittedError(
                "Normalize.inverse_transform called before fit_transform; "
                "there is no fitted scaler to invert.")

        self.data_inverse_transformed = self.scaler.inverse_transform(data)

        return self.data_inverse_transformed

# normalize
# Standardise the training rows; X_test is not passed to the scaler here.
scaler_init = Normalize()
scaled_data = scaler_init.fit_transform(X_train)
# Preview the first 11 scaled rows.
print(scaled_data[0:11])


In [None]:
def data_transformation(data, lags = 5, target_col = 3):

    """Turn a 2-D table of ordered rows into sliding-window samples.

    Sample ``k`` consists of rows ``k .. k+lags-1`` (all columns) as input
    and the value of column ``target_col`` in row ``k+lags`` as target.

    Arguments:
    1. data: 2-D numpy array or pandas DataFrame of shape (n_rows, n_features)
    2. lags: how many previous rows are used to predict the next value,
       default 5
    3. target_col: positional index of the column to predict; the default 3
       is the 'close' column of the open/high/low/close/tick_volume layout

    Returns:
        X_data: array of shape (n_rows - lags, lags, n_features)
        y_data: array of shape (n_rows - lags, 1)
        When ``data`` has no more than ``lags`` rows, both arrays are empty
        but keep these ranks, i.e. (0, lags, n_features) and (0, 1).

    Raises:
        ValueError: if ``data`` is not 2-dimensional or ``lags`` is < 1.
        IndexError: if ``target_col`` is not a valid column position.
    """

    # Accept DataFrames as well as arrays; positional slicing needs an ndarray.
    values = np.asarray(data)
    if values.ndim != 2:
        raise ValueError(
            f"data must be 2-dimensional, got an array with {values.ndim} dimension(s)")
    if lags < 1:
        raise ValueError(f"lags must be a positive integer, got {lags}")

    n_rows, n_features = values.shape
    # Number of complete (window, target) pairs; zero when the input is too short.
    n_samples = max(n_rows - lags, 0)

    # Pre-allocating keeps the output shape well defined even for zero samples.
    X_data = np.empty((n_samples, lags, n_features), dtype=values.dtype)
    y_data = np.empty((n_samples, 1), dtype=values.dtype)

    for k in range(n_samples):
        X_data[k] = values[k: k + lags]
        # the row right after the window supplies the value to be predicted
        y_data[k, 0] = values[k + lags, target_col]

    return X_data, y_data


# Build 5-row input windows and their next-row targets from the scaled training data.
X_data, y_data = data_transformation(scaled_data, lags = 5)


In [None]:
# Sanity check: number of samples, window length, features per row, targets per sample.
print(X_data.shape[0],X_data.shape[1],X_data.shape[2],y_data.shape[1])


In [None]:
class LSTM_model():
    """Stacked-LSTM regressor that is built, compiled and trained in one call.

    Parameters
    ----------
    n_hidden_layers : int
        Number of LSTM layers. Layer ``i`` (0-based) has ``units / 2**i`` units.
    units : int
        Width of the first LSTM layer.
    dropout : float
        Dropout rate applied after the last LSTM layer.
    train_data_X : numpy.ndarray
        Inputs; ``shape[1]`` and ``shape[2]`` define the model's input shape.
    train_data_y : numpy.ndarray
        Targets; ``shape[1]`` is the width of the output layer.
    epochs : int
        Maximum number of training epochs (early stopping may end sooner).

    After ``fit_lstm`` the trained network is available as ``self.model``.
    """
    # Seed NumPy and TensorFlow once, when this class body is executed.
    seed(42)
    tf.random.set_seed(42)

    def __init__(self,n_hidden_layers, units, dropout, train_data_X, train_data_y, epochs):

        self.n_hidden_layers = n_hidden_layers
        self.units = units
        self.dropout = dropout
        self.train_data_X = train_data_X
        self.train_data_y = train_data_y
        self.epochs = epochs
        # Set by fit_lstm so the trained network can be reused for prediction.
        self.model = None

    def build_model(self):
        """Assemble the (uncompiled) Sequential network and return it."""
        input_shape = (self.train_data_X.shape[1], self.train_data_X.shape[2])

        model = Sequential()
        # first lstm layer: it may only hand a full sequence onwards when
        # another LSTM layer follows; otherwise Dense would receive a
        # sequence and the output shape would not match the targets.
        model.add(LSTM(self.units, activation='relu', input_shape=input_shape,
                       return_sequences=self.n_hidden_layers > 1))
        # remaining lstm layers, each half as wide as the previous one
        for i in range(1, self.n_hidden_layers):
            is_last = i == self.n_hidden_layers - 1
            # never let the halving reach zero units
            layer_units = max(1, int(self.units/(2**i)))
            # only the last lstm layer collapses the sequence to one vector
            model.add(LSTM(layer_units, activation='relu', return_sequences=not is_last))
        # adding dropout layer
        model.add(Dropout(self.dropout))
        # final layer
        model.add(Dense(self.train_data_y.shape[1]))
        return model

    def fit_lstm(self):
        """Build, compile and train the network.

        Side effects: writes a model checkpoint and a per-epoch ``data.csv``
        log below ``../Model_Outputs/model_lstm`` (created if missing) and
        lowers TensorFlow's logger level to ERROR.

        Returns
        -------
        The History object returned by ``Model.fit``.
        """
        lstm_model = self.build_model()
        metrics = [tf.keras.metrics.RootMeanSquaredError(), tf.keras.metrics.MeanAbsoluteError(), tf.keras.metrics.MeanAbsolutePercentageError()]
        lstm_model.compile(optimizer=Adam(learning_rate = 0.0001), loss='mse', metrics = metrics)

        model_name = 'lstm_'+ str(self.units)
        path_model = "../Model_Outputs/model_lstm"
        path_metric = "../Model_Outputs/model_lstm"
        # The callbacks below write into these folders; make sure they exist.
        os.makedirs(path_model, exist_ok=True)
        os.makedirs(path_metric, exist_ok=True)
        # NOTE(review): some Keras releases require the checkpoint path to end
        # in '.keras' or '.h5' - confirm against the installed version.
        cb = [
            tf.keras.callbacks.ModelCheckpoint(path_model+'/'+model_name),
            tf.keras.callbacks.CSVLogger(path_metric+'/'+'data.csv'),
            tf.keras.callbacks.EarlyStopping(monitor='val_loss', patience=20, restore_best_weights=False)]
        # tf.logging is not part of the TF2 API; the logger is configured
        # through tf.get_logger() instead.
        tf.get_logger().setLevel('ERROR')
        # shuffle=False keeps the samples in their original order; the last
        # 20% of them are held out for validation.
        history = lstm_model.fit(self.train_data_X,self.train_data_y,
                            epochs = self.epochs,
                            batch_size = 8,
                            validation_split=0.2,
                            verbose = 1,
                            callbacks=cb,
                            shuffle= False)

        self.model = lstm_model
        return history


# Hyperparameters and training arrays handed to LSTM_model.
n_hidden_layers = 3
units = 128
dropout = 0.2
train_data_X = X_data 
train_data_y = y_data
# Upper bound only: fit_lstm also registers an EarlyStopping callback (patience 20 on val_loss).
epochs = 500


model = LSTM_model(n_hidden_layers, units, dropout, train_data_X, train_data_y, epochs)

# Build, compile and train the network; whatever fit_lstm() returns is printed below.
summary = model.fit_lstm()
print(summary)






In [None]:
def metricplot(df, xlab, ylab_1,ylab_2, path):

    """
    Draw two metric curves over a shared x-axis and save the figure.

    inputs: df : pandas dataframe holding the columns named below
            xlab: column used for the x-axis
            ylab_1 : column of the first curve; also used as the y-axis
                     label and as the name of the saved file
            ylab_2 : column of the second curve
            path: folder the figure is written to
    """
    text_size = 12

    plt.figure()
    sns.set_theme(style="darkgrid")

    # one line per metric, drawn in the order given
    for column in (ylab_1, ylab_2):
        sns.lineplot(x = df[xlab], y = df[column])

    plt.xlabel('Epochs', fontsize = text_size)
    plt.ylabel(ylab_1, fontsize = text_size)
    plt.xticks(fontsize = text_size)
    plt.yticks(fontsize = text_size)
    plt.legend([ylab_1, ylab_2], prop={"size": text_size})

    # file name is the first metric's name inside the target folder
    plt.savefig('/'.join((path, ylab_1)))


In [None]:
# Load the per-epoch log that the CSVLogger callback wrote during training.
data_met = pd.read_csv('../Model_Outputs/model_lstm/data.csv')
# Bare expression: shows the table as the cell's output.
data_met

In [None]:
# Folder that holds the training log and receives the generated figures.
path = '../Model_Outputs/model_lstm'
df = pd.read_csv('../Model_Outputs/model_lstm/data.csv')

# One figure per metric: the training curve against its validation counterpart.
metric_pairs = (
    ('loss', 'val_loss'),
    ('mean_absolute_error', 'val_mean_absolute_error'),
    ('mean_absolute_percentage_error', 'val_mean_absolute_percentage_error'),
    ('root_mean_squared_error', 'val_root_mean_squared_error'),
)
for train_metric, val_metric in metric_pairs:
    metricplot(df, 'epoch', train_metric, val_metric, path)
