In [None]:
import pandas as pd
import torch

import matplotlib.dates as mdates
import matplotlib.pyplot as plt
import datetime as dt

In [None]:
# Choose the compute backend in order of preference: first CUDA GPU, then MPS, then CPU.
# NOTE(review): none of the cells visible here move the model or tensors onto `device` — confirm whether that is intended.
if torch.cuda.is_available():
    device = torch.device('cuda:0')
elif torch.backends.mps.is_available():
    device = torch.device('mps')
else:
    device = torch.device('cpu')
device

# Import Dataset
Examine the data a little

In [None]:
# Read the TSLA price history and normalise the column headers to snake_case.
# (Point the path at another ticker's CSV, e.g. "./NVDA.csv", to analyse a different stock.)
maindf = (
    pd.read_csv("./TSLA.csv")
    .rename(columns={
        'Date': 'date',
        'Open': 'open',
        'High': 'high',
        'Low': 'low',
        'Close': 'close',
        'Adj Close': 'adj_close',
        'Volume': 'volume',
    })
)
maindf.shape, maindf.head()

Check for any null or empty values in the data

In [None]:
# Total count of missing cells, followed by a yes/no flag for "any missing cell at all".
print("Null values:", maindf.isnull().to_numpy().sum())
print("NA values:", maindf.isna().to_numpy().any())

# Preprocess Data

In [None]:
# Convert the date column from strings to pandas datetimes.
# Show the first rows before the conversion (head must be *called*; printing
# the bare attribute only shows the bound-method repr).
print(maindf['date'].head())
maindf['date'] = pd.to_datetime(maindf['date'])
maindf['date'].head()

# Data Exploration

In [None]:
# Report the span covered by the data set.
# Select the column by name and the row by position: `maindf.iloc[0][0]` relied on
# positional integer lookup in a label-indexed Series, which pandas has deprecated,
# and on 'date' happening to be the first column.
# NOTE(review): assumes the rows are in chronological order — confirm against the CSV.
first_date = maindf['date'].iloc[0]
last_date = maindf['date'].iloc[-1]
print("Starting date: ", first_date)
print("Ending date: ", last_date)
print("Duration: ", last_date - first_date)

In [None]:
# Open vs. close price over time, with a major tick every 30 days and slanted date labels.
fig, ax = plt.subplots(figsize=(12, 5))
ax.xaxis.set_major_locator(mdates.DayLocator(interval=30))
fig.autofmt_xdate()
ax.plot(maindf['date'], maindf['close'], label="Close")
ax.plot(maindf['date'], maindf['open'], label="Open")
ax.set_xlabel("Time Scale")
ax.set_ylabel("USD")
ax.legend()
plt.show()

In [None]:
from itertools import cycle
import plotly.graph_objects as go
import plotly.express as px
from plotly.subplots import make_subplots

# Preparing Closing Price - target data

In [None]:
# Keep only the timestamp and the closing price; the close is the prediction target.
closedf = maindf.loc[:, ['date', 'close']]
print("Shape of close dataframe:", closedf.shape)
closedf.head()

In [None]:
from sklearn.metrics import mean_squared_error, mean_absolute_error, explained_variance_score, r2_score 
from sklearn.metrics import mean_poisson_deviance, mean_gamma_deviance, accuracy_score
from sklearn.preprocessing import MinMaxScaler
import numpy as np

In [None]:
# Drop the date column so only the closing price remains.
# Rebinding the result of `drop` (with errors='ignore') instead of `del` keeps this cell
# re-runnable: a second execution no longer raises KeyError for the missing column.
closedf = closedf.drop(columns=['date'], errors='ignore')
closedf.head()

In [None]:
# Scale the closing prices with min/max statistics learned from the training portion only.
# Fitting the scaler on the whole series would leak information about the held-out tail
# (its price range) into preprocessing and make test metrics look better than they are.
close_values = np.array(closedf).reshape(-1, 1)
# Must stay equal to the train fraction used when the scaled series is split into train/test.
n_fit_rows = int(len(close_values) * 0.60)
scaler = MinMaxScaler(feature_range=(0, 1))
scaler.fit(close_values[:n_fit_rows])
closedf = scaler.transform(close_values)
print(closedf.shape)
# Rows after the training portion may fall outside [0, 1]; that is expected.
closedf[:5], closedf.min(), closedf.max()

# Prepare train and test data

In [None]:
# Chronological split: the first 60% of the rows train the model, the remainder is held out.
training_size = int(0.60 * len(closedf))
test_size = len(closedf) - training_size
train_data = closedf[:training_size, :]
test_data = closedf[training_size:, :1]
print("train_data: ", train_data.shape)
print("test_data: ", test_data.shape)

In [None]:
# convert an array of values into a dataset matrix
def create_dataset(dataset, time_step=1):
    """Slice a series into sliding windows and their next-step targets.

    Sample ``i`` pairs the window ``dataset[i:i + time_step, 0]`` with the
    target ``dataset[i + time_step, 0]``. Only column 0 of ``dataset`` is read.

    Args:
        dataset: 2-D array indexable as ``dataset[rows, 0]``.
        time_step: Number of consecutive values in each input window (>= 1).

    Returns:
        Tuple ``(X, y)`` of NumPy arrays: ``X`` has shape
        ``(n_samples, time_step)`` and ``y`` has shape ``(n_samples,)``, where
        ``n_samples = max(len(dataset) - time_step - 1, 0)``.

    Raises:
        ValueError: If ``time_step`` is smaller than 1.

    Note:
        The loop stops one window short of the last possible one. That sample
        count is kept on purpose, because the plotting offsets elsewhere in
        this notebook are written against it.
    """
    if time_step < 1:
        raise ValueError(f"time_step must be >= 1, got {time_step}")

    windows, targets = [], []
    for start in range(len(dataset) - time_step - 1):
        stop = start + time_step
        windows.append(dataset[start:stop, 0])
        targets.append(dataset[stop, 0])

    if not windows:
        # Keep X two-dimensional even when the input is too short for a single
        # sample, so callers can still reshape it using X.shape[1].
        return np.empty((0, time_step)), np.empty((0,))
    return np.array(windows), np.array(targets)

In [None]:
# Window length: each sample holds 15 consecutive scaled closes and predicts the next one.
time_step = 15
X_train, y_train = create_dataset(train_data, time_step=time_step)
X_test, y_test = create_dataset(test_data, time_step=time_step)

# Report the shape of every array that was just built.
for label, array in (
    ("X_train: ", X_train),
    ("y_train: ", y_train),
    ("X_test: ", X_test),
    ("y_test", y_test),
):
    print(label, array.shape)

In [None]:
# The LSTM expects [samples, time steps, features]; append a trailing feature axis of size 1.
X_train = X_train.reshape(*X_train.shape[:2], 1)
X_test = X_test.reshape(*X_test.shape[:2], 1)

for label, array in (("X_train: ", X_train), ("X_test: ", X_test)):
    print(label, array.shape)

Convert the data to tensors

In [None]:
# Wrap each NumPy array in a tensor of the default tensor type, in the same order as before.
X_train, X_test, y_train, y_test = (
    torch.from_numpy(array).type(torch.Tensor)
    for array in (X_train, X_test, y_train, y_test)
)
# Give the targets a trailing axis (in place) so they line up with the model's (N, 1) output.
y_train.unsqueeze_(dim=1)
y_test.unsqueeze_(dim=1)

In [None]:
# Final tensor shapes that are fed to the model.
for label, tensor in (
    ('x_train.shape = ', X_train),
    ('y_train.shape = ', y_train),
    ('x_test.shape = ', X_test),
    ('y_test.shape = ', y_test),
):
    print(label, tensor.shape)

# Build the Model

In [None]:
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

In [None]:


# Here we define our model as a class
class LSTM(nn.Module):
    """Stacked LSTM followed by a linear read-out of the last time step.

    Args:
        input_dim: Number of features per time step.
        hidden_dim: Size of the LSTM hidden and cell state.
        num_layers: Number of stacked LSTM layers.
        output_dim: Number of values produced for each input sequence.
    """

    def __init__(self, input_dim, hidden_dim, num_layers, output_dim):
        super().__init__()
        # Kept as attributes because forward() needs them to size the initial states
        self.hidden_dim = hidden_dim
        self.num_layers = num_layers

        # batch_first=True makes input/output tensors (batch_dim, seq_dim, feature_dim)
        self.lstm = nn.LSTM(input_dim, hidden_dim, num_layers, batch_first=True)

        # Readout layer: maps the last hidden state to the prediction
        self.fc = nn.Linear(hidden_dim, output_dim)

    def forward(self, x):
        """Predict one output vector per sequence.

        Args:
            x: Tensor of shape (batch, seq_len, input_dim).

        Returns:
            Tensor of shape (batch, output_dim).
        """
        batch_size = x.size(0)

        # Start every call from zero hidden/cell states. They are allocated from `x`
        # so they share its device and dtype; building them with a bare torch.zeros
        # would pin them to CPU/float32 and fail once the model or input is moved
        # to another device or dtype. new_zeros tensors carry no gradient history,
        # so no detach() is needed.
        h0 = x.new_zeros(self.num_layers, batch_size, self.hidden_dim)
        c0 = x.new_zeros(self.num_layers, batch_size, self.hidden_dim)

        # out: (batch, seq_len, hidden_dim); the final states are not used
        out, _ = self.lstm(x, (h0, c0))

        # Read out only the last time step: (batch, hidden_dim) -> (batch, output_dim)
        return self.fc(out[:, -1, :])


In [None]:
# Model hyper-parameters
input_dim = 1    # one feature per time step
hidden_dim = 32
num_layers = 2
output_dim = 1   # one predicted value per sequence

# Seed before building the model so that weight initialisation is reproducible
torch.manual_seed(42)

model = LSTM(input_dim=input_dim, hidden_dim=hidden_dim, output_dim=output_dim, num_layers=num_layers)

loss_fn = torch.nn.MSELoss()
optimiser = torch.optim.Adam(model.parameters(), lr=0.05)

# Summarise the architecture, the number of parameter tensors and the size of each one.
# (model.parameters() is a generator, so printing it directly shows nothing useful.)
print(model)
print(len(list(model.parameters())))
for param in model.parameters():
    print(param.size())

In [None]:
# Train model: one full-batch optimiser step per epoch, then score the held-out set
num_epochs = 95
hist = {
    "loss": np.zeros(num_epochs),
    "val_loss": np.zeros(num_epochs),
}

for t in range(num_epochs):
    # Training step
    model.train()
    y_train_pred = model(X_train)

    loss = loss_fn(y_train_pred, y_train)
    hist["loss"][t] = loss.item()

    # Zero out gradients, else they will accumulate between epochs
    optimiser.zero_grad()
    # Backward pass
    loss.backward()
    # Update parameters
    optimiser.step()

    # Validation: no gradients are needed, so skip building the autograd graph
    model.eval()
    with torch.no_grad():
        y_test_pred = model(X_test)
        val_loss = loss_fn(y_test_pred, y_test)
    hist["val_loss"][t] = val_loss.item()

    if t % 10 == 0 and t != 0:
        print(f"Epoch {t}  train MSE {loss.item():2.5f}  val MSE {val_loss.item():2.5f}")

# Always report the final epoch as well
print(f"Epoch {t}  train MSE {loss.item():2.5f}  val MSE {val_loss.item():2.5f}")
    

In [None]:
# Training vs. validation loss per epoch
epochs = range(num_epochs)
plt.plot(epochs, hist["loss"], 'r', label="Training loss")
plt.plot(epochs, hist["val_loss"], 'b', label="Validation loss")
plt.title("training and validation loss")
plt.xlabel("Epoch")
plt.ylabel("MSE loss")
plt.legend()
# No plt.figure() here: opening a new figure right before show() only adds an empty plot
plt.show()

# Model Evaluation

In [None]:
# Run the trained model on both splits so performance metrics can be computed.
# Inference only: switch to eval mode and disable gradient tracking.
model.eval()
with torch.no_grad():
    train_predict = model(X_train)
    test_predict = model(X_test)
train_predict.shape, test_predict.shape

In [None]:
# Undo the min-max scaling so predictions and targets are back in the original price units.
train_predict, test_predict, original_ytrain, original_ytest = (
    scaler.inverse_transform(tensor.detach().numpy())
    for tensor in (train_predict, test_predict, y_train, y_test)
)

In [None]:
import math

In [None]:
# Evaluation metrics: RMSE, MSE and MAE for each split
train_mse = mean_squared_error(original_ytrain, train_predict)
train_mae = mean_absolute_error(original_ytrain, train_predict)
print("Train data RMSE: ", math.sqrt(train_mse))
print("Train data MSE: ", train_mse)
print("Train data MAE: ", train_mae)
print("-------------------------------------------------------------------------------------")
test_mse = mean_squared_error(original_ytest, test_predict)
test_mae = mean_absolute_error(original_ytest, test_predict)
print("Test data RMSE: ", math.sqrt(test_mse))
print("Test data MSE: ", test_mse)
print("Test data MAE: ", test_mae)

### Explained variance regression score
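The explained variance score measures how much of the variance of the actual values is accounted for by the predictions:

$$\text{explained variance}(y, \hat{y}) = 1 - \frac{\mathrm{Var}(y - \hat{y})}{\mathrm{Var}(y)}$$

Here $\mathrm{Var}(y - \hat{y})$ and $\mathrm{Var}(y)$ are the variances of the prediction errors and of the actual values, respectively. Scores close to 1.0 are highly desired: they indicate that the variance of the errors is small compared with the variance of the actual values.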

In [None]:
# Explained variance on each split (1.0 is a perfect score)
train_explained_variance = explained_variance_score(original_ytrain, train_predict)
print("Train data explained variance regression score:", train_explained_variance)
test_explained_variance = explained_variance_score(original_ytest, test_predict)
print("Test data explained variance regression score:", test_explained_variance)

<a name="r2"></a>

### R<sup>2</sup> score for regression

R-squared (R2) is a statistical measure that represents the proportion of the variance for a dependent variable that's explained by an independent variable or variables in a regression model.

1 = best <br>
0 or negative = no better (or worse) than always predicting the mean

In [None]:
# Coefficient of determination on each split
train_r2 = r2_score(original_ytrain, train_predict)
print("Train data R2 score:", train_r2)
test_r2 = r2_score(original_ytest, test_predict)
print("Test data R2 score:", test_r2)

<a name="cp"></a>

# Comparison of original stock close price and predicted close price

In [None]:
from itertools import cycle
import plotly.graph_objects as go
import plotly.express as px
from plotly.subplots import make_subplots

In [None]:
# Display the window length used when building the samples; the plotting cell below reuses it as `look_back`
time_step

In [None]:
# Align the predictions with the full price series for plotting.
# A prediction made from the window starting at row i targets row i + look_back, so each
# block of predictions begins look_back rows after the first row of its split.
look_back = time_step

# shift train predictions for plotting
trainPredictPlot = np.full_like(closedf, np.nan)
train_start = look_back
trainPredictPlot[train_start:train_start + len(train_predict), :] = train_predict
print("Train predicted data: ", trainPredictPlot.shape)

# shift test predictions for plotting
# The test split starts at row `training_size` of the series. Sizing the slice from
# len(test_predict) avoids hard-coding how many samples the windowing step produced.
testPredictPlot = np.full_like(closedf, np.nan)
test_start = training_size + look_back
testPredictPlot[test_start:test_start + len(test_predict), :] = test_predict
print("Test predicted data: ", testPredictPlot.shape)

plotdf = pd.DataFrame({'date': maindf['date'],
                       'original_close': maindf['close'],
                       'train_predicted_close': trainPredictPlot.ravel(),
                       'test_predicted_close': testPredictPlot.ravel()})

# Legend label for each plotted column; an explicit mapping does not depend on trace order
trace_names = {
    'original_close': 'Original close price',
    'train_predicted_close': 'Train predicted close price',
    'test_predicted_close': 'Test predicted close price',
}

fig = px.line(plotdf, x='date', y=list(trace_names),
              labels={'value': 'Stock price', 'date': 'Date'})
fig.update_layout(title_text='Comparison between original close price vs predicted close price',
                  plot_bgcolor='white', font_size=15, font_color='black', legend_title_text='Close Price')
fig.for_each_trace(lambda t: t.update(name=trace_names.get(t.name, t.name)))

fig.update_xaxes(showgrid=True)
fig.update_yaxes(showgrid=False)
fig.show()