# To experiment with some deep learning models on the time series data given

In [42]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

import torch
import torch.nn as nn
from torch.utils.data import Dataset
from torch.utils.data import DataLoader, TensorDataset

In [85]:
df = pd.read_csv("./data/Mastercard_stock_history.csv")

# df = df.set_index("Date")
# df.index = pd.to_datetime(df.index)
# df = df.asfreq("D")

# # forward fill na values
# df = df.ffill()
print(df.head())

df = df[["Close", "Volume", "High", "Low", "Open"]]
df.head()

         Date      Open      High       Low     Close     Volume  Dividends  \
0  2006-05-25  3.748967  4.283869  3.739664  4.279217  395343000        0.0   
1  2006-05-26  4.307126  4.348058  4.103398  4.179680  103044000        0.0   
2  2006-05-30  4.183400  4.184330  3.986184  4.093164   49898000        0.0   
3  2006-05-31  4.125723  4.219679  4.125723  4.180608   30002000        0.0   
4  2006-06-01  4.179678  4.474572  4.176887  4.419686   62344000        0.0   

   Stock Splits  
0           0.0  
1           0.0  
2           0.0  
3           0.0  
4           0.0  


Unnamed: 0,Close,Volume,High,Low,Open
0,4.279217,395343000,4.283869,3.739664,3.748967
1,4.17968,103044000,4.348058,4.103398,4.307126
2,4.093164,49898000,4.18433,3.986184,4.1834
3,4.180608,30002000,4.219679,4.125723,4.125723
4,4.419686,62344000,4.474572,4.176887,4.179678


## Create train and test set

In [86]:
train_df = df[:-100]
test_df = df[-100:]

In [81]:
class TimeSeriesLSTM(nn.Module):
    def __init__(self, input_size, hidden_size, num_layers, output_size, dropout_rate):
        super().__init__()
        
        self.hidden_size = hidden_size
        self.num_layers = num_layers

        self.bn = nn.BatchNorm1d(4)
        
        # Define the LSTM layer
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
        
        # Define the fully connected layer
        self.dense1 = nn.Linear(hidden_size, hidden_size * 2)
        self.dense2 = nn.Linear(hidden_size * 2, hidden_size * 2)
        self.dense3 = nn.Linear(hidden_size * 2, output_size)

        self.relu = nn.ReLU()
        self.elu = nn.ELU(alpha=0.5)
        self.lrelu = nn.LeakyReLU(negative_slope=0.02)
        self.sm = nn.Sigmoid()
        self.tanh = nn.Tanh()
        # add dropout layer
        self.dropout = nn.Dropout(dropout_rate)

        
    
    def forward(self, x):
        # Initialize hidden state and cell state
        # h0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(x.device)
        # c0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(x.device)

        # # apply the dropout layer
        # x = self.dropout(x)
        x = self.bn(x)
        # Forward propagate LSTM
        op , _ = self.lstm(x)

        a1 = self.dense1(op)
        o1 = self.dropout(a1)
        a2 = self.relu(self.dense2(o1))
        # o2 = self.dropout(a2)
        a3 = self.dense3(a2)
        
        # Decode the hidden state of the last time stepout = self.fc(out[:, -1, :])
        return self.tanh(a3) 

In [46]:
class TimeSeriesDataSet(Dataset):
    def __init__(self, csv_path):
        super().__init__()
        self.data = pd.read_csv(csv_path)
        self.data = self.data.loc[:, 'Close'].values.reshape(-1, 1)
        self.seq_len = len(self.data)

    def __len__(self):
        return self.seq_len
    
    def __getitem__(self, idx):
        features = self.data[idx, :-1]
        output = self.data[idx, -1]
        return features, output 
    

def create_dataset(dataset, lookback):
    """Transform a time series into a prediction dataset
    
    Args:
        dataset: A numpy array of time series, first dimension is the time steps
        lookback: Size of window for prediction
    """
    X, y = [], []
    for i in range(len(dataset)-lookback):
        feature = dataset[i:i+lookback].reshape(lookback)
        target = dataset[i+lookback]

        X.append(feature)
        y.append(target)

    return torch.tensor(X, dtype=torch.float32), torch.tensor(y, dtype=torch.float32)


def load_data(file_path, batch_size, shuffle=False):
    # Create the dataset
    dataset = TimeSeriesDataSet(file_path)
    
    # Create DataLoader
    dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=shuffle)
    
    return dataloader

In [48]:
train_df.to_numpy()

array([[  4.27921724],
       [  4.17967987],
       [  4.09316444],
       ...,
       [362.4145813 ],
       [362.18518066],
       [360.10025024]])

In [58]:
X_train, y_train = create_dataset(train_df.to_numpy(), lookback = 4)
print(X_train)
X_test, y_test = create_dataset(test_df.to_numpy(), lookback = 4)

tensor([[  4.2792,   4.1797,   4.0932,   4.1806],
        [  4.1797,   4.0932,   4.1806,   4.4197],
        [  4.0932,   4.1806,   4.4197,   4.3713],
        ...,
        [366.7141, 355.7509, 359.1426, 363.0231],
        [355.7509, 359.1426, 363.0231, 362.4146],
        [359.1426, 363.0231, 362.4146, 362.1852]])


In [83]:
model = TimeSeriesLSTM(
    input_size = 4,
    num_layers=5,
    hidden_size=64,
    output_size=1,
    dropout_rate=0.5
)

loss_fn = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters())
# loader = load_data("data/processed/mastercard_stock_history_processed.csv", batch_size=8)
loader = DataLoader(TensorDataset(X_train, y_train), shuffle = False, batch_size = 8)
 
n_epochs = 100
for epoch in range(n_epochs):
    model.train()
    for X_batch, y_batch in loader:
        y_pred = model(X_batch)
        loss = loss_fn(y_pred, y_batch)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
    # Validation
    if epoch % 1 != 0:
        continue
    model.eval()
    with torch.no_grad():
        y_pred = model(X_train)
        train_rmse = np.sqrt(loss_fn(y_pred, y_train))
        y_pred = model(X_test)
        test_rmse = np.sqrt(loss_fn(y_pred, y_test))
    print("Epoch %d: train RMSE %.4f, test RMSE %.4f" % (epoch, train_rmse, test_rmse))

Epoch 0: train RMSE 138.5047, test RMSE 362.2318


KeyboardInterrupt: 