In [1]:
# Importing necessary libraries
import pandas as pd
import numpy as np
import torch
import torch.nn as nn
from sklearn.preprocessing import MinMaxScaler
import plotly.graph_objects as go
from yfinance import download

In [30]:
stock_list =["NIO","XPEV","JOBY","TSLA","LI"]
tick = stock_list[4]
tick

'LI'

In [31]:
# Downloading historical stock market data for the company 'NIO'
# from January 1, 2020, to August 13, 2023, using the 'yfinance' library
data = download(tick, start='2010-01-01', end='2023-11-20')
# Calculate technical indicators (Moving Averages and RSI)
sma_period = 30
data['SMA'] = data['Close'].rolling(window=sma_period).mean()
data['RSI'] = 100 - (100 / (1 + (data['Close'].diff(1).fillna(0).apply(lambda x: max(0, x)).rolling(window=sma_period).mean() / data['Close'].diff(1).fillna(0).apply(lambda x: abs(x)).rolling(window=sma_period).mean())))


[*********************100%%**********************]  1 of 1 completed


In [32]:

# Extracting relevant data columns: Closing prices and Volume
# We focus on the 'Close' column, which represents the stock's closing price,
# and the 'Volume' column, which represents the trading volume of the stock.
df = data[['Close','Volume','SMA','RSI']][sma_period:]
df

Unnamed: 0_level_0,Close,Volume,SMA,RSI
Date,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
2020-09-11,16.209999,3915000,16.698333,33.147942
2020-09-14,17.469999,6429200,16.747333,34.375664
2020-09-15,18.190001,9376700,16.808333,34.611355
2020-09-16,17.520000,5713800,16.836000,33.912861
2020-09-17,17.260000,3141300,16.853000,33.688388
...,...,...,...,...
2023-11-13,39.459999,6057300,35.149667,36.581471
2023-11-14,39.849998,5174100,35.323333,37.275824
2023-11-15,40.529999,5387100,35.502000,37.361896
2023-11-16,38.869999,6367400,35.658000,36.797692


In [33]:
from sklearn.preprocessing import MinMaxScaler
import numpy as np
import torch

# Data normalization: Scaling the data to a common range [0, 1]
scaler = MinMaxScaler()
scaled_data = scaler.fit_transform(df.values)

# Creating the dataset for training, validation, and testing
seq_length = sma_period
X, y = [], []

for i in range(seq_length, len(scaled_data)):
    X.append(scaled_data[i - seq_length:i])
    y.append(scaled_data[i])

X = np.array(X)
y = np.array(y)

# Splitting the dataset into training, validation, and testing sets
train_size = int(0.6 * len(X))  # 60% for training
val_size = int(0.2 * len(X))    # 20% for validation
test_size = len(X) - train_size - val_size  # Remaining for testing

X_train, y_train = X[:train_size], y[:train_size]
X_val, y_val = X[train_size:train_size + val_size], y[train_size:train_size + val_size]
X_test, y_test = X[train_size + val_size:], y[train_size + val_size:]

# Converting data to PyTorch tensors for model training and evaluation
# Reshape and permute to match the expected input shape of the model
X_train_tensor = torch.FloatTensor(X_train).permute(0, 2, 1)
y_train_tensor = torch.FloatTensor(y_train)

X_val_tensor = torch.FloatTensor(X_val).permute(0, 2, 1)
y_val_tensor = torch.FloatTensor(y_val)

X_test_tensor = torch.FloatTensor(X_test).permute(0, 2, 1)
y_test_tensor = torch.FloatTensor(y_test)


In [34]:
X_train_tensor.shape

torch.Size([463, 4, 30])

In [35]:
y_train_tensor.shape

torch.Size([463, 4])

In [36]:
# Check for GPU availability and set the device accordingly
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

In [37]:
class MultiLayerCNNLSTM(nn.Module):
    def __init__(self, input_dim, hidden_dim, num_lstm_layers, output_dim, kernel_size=3):
        super(MultiLayerCNNLSTM, self).__init__()

        # Store the number of LSTM layers and the hidden dimensionality
        self.num_lstm_layers = num_lstm_layers
        self.hidden_dim = hidden_dim

        # CNN Layers
        # ---------------
        self.cnn_layers = nn.Sequential(
            # First convolution layer. Input: [batch_size, input_dim, seq_length]
            nn.Conv1d(input_dim, 32, kernel_size=kernel_size, padding=1), # Output: [batch_size, 32, seq_length]
            nn.ReLU(), # Activation function

            # Second convolution layer
            nn.Conv1d(32, 64, kernel_size=kernel_size, padding=1), # Output: [batch_size, 64, seq_length]
            nn.ReLU(),

            # Max pooling to reduce the sequence length by half
            nn.MaxPool1d(2), # Output: [batch_size, 64, seq_length/2]

            # Third convolution layer
            nn.Conv1d(64, 128, kernel_size=kernel_size, padding=1), # Output: [batch_size, 128, seq_length/2]
            nn.ReLU(),

            # Another max pooling to reduce sequence length further
            nn.MaxPool1d(2)  # Output: [batch_size, 128, seq_length/4]
        )

        # LSTM Layers
        # ---------------
        # LSTM expects input of shape: [batch_size, seq_length, features].
        # Here, features is 128 (the output channels from the previous CNN layer)
        self.lstm = nn.LSTM(128, hidden_dim, num_lstm_layers, batch_first=True)

        # Fully connected layer to produce the final output
        self.fc = nn.Linear(hidden_dim, output_dim)

    def forward(self, x):
        # Pass data through CNN layers
        x = self.cnn_layers(x)

        # Permute tensor dimensions to match the LSTM's expected input shape
        x = x.permute(0, 2, 1)  # Shape: [batch_size, seq_length, features]

        # Initialize LSTM hidden and cell states
        h0 = torch.zeros(self.num_lstm_layers, x.size(0), self.hidden_dim).requires_grad_().to(device)
        c0 = torch.zeros(self.num_lstm_layers, x.size(0), self.hidden_dim).requires_grad_().to(device)

        # Pass data through LSTM layers.
        # Output shape: [batch_size, seq_length, hidden_dim]. We're interested in the last sequence step for the FC layer.
        out, (hn, cn) = self.lstm(x, (h0.detach(), c0.detach()))

        # Pass the output of the last sequence step through the Fully connected layer
        out = self.fc(out[:, -1, :])

        return out.squeeze(-1)


In [38]:
class ConvBiLSTM(nn.Module):
    def __init__(self, input_dim, hidden_dim, num_lstm_layers, output_dim, kernel_size=3):
        super(ConvBiLSTM, self).__init__()

        self.num_lstm_layers = num_lstm_layers
        self.hidden_dim = hidden_dim

        # Multiple CNN layers
        self.cnn_layers = nn.Sequential(
            nn.Conv1d(input_dim, 32, kernel_size=kernel_size, padding=1),
            nn.ReLU(),  # Activation function
            nn.Conv1d(32, 64, kernel_size=kernel_size, padding=1),
            nn.ReLU(),
            nn.MaxPool1d(2),  # Pooling to reduce dimensionality
            nn.Conv1d(64, 128, kernel_size=kernel_size, padding=1),
            nn.ReLU(),
            nn.MaxPool1d(2)
        )

        # Bi-directional LSTM layers
        self.bilstm = nn.LSTM(128, hidden_dim, num_layers=num_lstm_layers, batch_first=True, bidirectional=True)

        # Fully connected layer
        self.fc = nn.Linear(hidden_dim * 2, output_dim)  # Multiplying by 2 for bidirectional

    def forward(self, x):
        # Pass data through CNN layers
        x = self.cnn_layers(x)

        x = x.permute(0, 2, 1)  # Shape: [batch_size, seq_length, features]

        # Initialize LSTM hidden and cell states
        h0 = torch.zeros(self.num_lstm_layers * 2, x.size(0), self.hidden_dim).requires_grad_().to(device)  # * 2 for bidirectional
        c0 = torch.zeros(self.num_lstm_layers * 2, x.size(0), self.hidden_dim).requires_grad_().to(device)

        # Pass data through BiLSTM layers
        out, (hn, cn) = self.bilstm(x, (h0.detach(), c0.detach()))

        # Pass the output of BiLSTM layer to Fully connected layer
        out = self.fc(out[:, -1, :])

        return out.squeeze(-1)

In [39]:
import itertools

# Define ranges for hyperparameters
hidden_dim_range      = [32, 64]
num_lstm_layers_range = [1, 2]
kernel_size_range     = [3, 5]
num_epochs_range      = [300, 500]
lr_rates              = [1e-3, 1e-4]


best_loss = float('inf')
best_params = {}

for hidden_dim, num_lstm_layers, kernel_size, num_epochs, lr in itertools.product(hidden_dim_range,
                                                                              num_lstm_layers_range,
                                                                              kernel_size_range,
                                                                              num_epochs_range,
                                                                              lr_rates):
    print(f"Training with hidden_dim={hidden_dim}, \
    num_lstm_layers={num_lstm_layers},\
    kernel_size={kernel_size}, \
    num_epochs={num_epochs},\
    lr ={lr}")

    # Create an instance of the ConvBiLSTM model
    input_dim = 4  # Two features: closing price and volume
    output_dim = 4
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    model = MultiLayerCNNLSTM(input_dim,
                              hidden_dim,
                              num_lstm_layers,
                              output_dim,
                              kernel_size)
    model.to(device)

    # Training the model
    optimizer = torch.optim.Adam(model.parameters(), lr=lr)
    criterion = nn.MSELoss()

    for epoch in range(num_epochs):
        model.train()
        outputs = model(X_train_tensor.to(device))
        loss = criterion(outputs, y_train_tensor.squeeze().to(device))
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

    # Evaluate model on validation set and store best parameters
    model.eval()
    with torch.no_grad():
        val_outputs = model(X_val_tensor.to(device))
        val_loss = criterion(val_outputs, y_val_tensor.squeeze().to(device))
        if val_loss < best_loss:
            print(f'Best validation loss : Epoch [{epoch+1}/{num_epochs}], Validation Loss: {val_loss.item():.4e}')
            best_loss = val_loss
            best_params = {
                'hidden_dim': hidden_dim,
                'num_lstm_layers': num_lstm_layers,
                'kernel_size': kernel_size,
                'num_epochs': num_epochs,
                'lr':lr
            }

print("Hyperparameter tuning completed.")
print("Best parameters:", best_params)

Training with hidden_dim=32,     num_lstm_layers=1,    kernel_size=3,     num_epochs=300,    lr =0.001
Best validation loss : Epoch [300/300], Validation Loss: 2.0196e-03
Training with hidden_dim=32,     num_lstm_layers=1,    kernel_size=3,     num_epochs=300,    lr =0.0001
Training with hidden_dim=32,     num_lstm_layers=1,    kernel_size=3,     num_epochs=500,    lr =0.001
Training with hidden_dim=32,     num_lstm_layers=1,    kernel_size=3,     num_epochs=500,    lr =0.0001
Training with hidden_dim=32,     num_lstm_layers=1,    kernel_size=5,     num_epochs=300,    lr =0.001
Training with hidden_dim=32,     num_lstm_layers=1,    kernel_size=5,     num_epochs=300,    lr =0.0001
Training with hidden_dim=32,     num_lstm_layers=1,    kernel_size=5,     num_epochs=500,    lr =0.001
Training with hidden_dim=32,     num_lstm_layers=1,    kernel_size=5,     num_epochs=500,    lr =0.0001
Training with hidden_dim=32,     num_lstm_layers=2,    kernel_size=3,     num_epochs=300,    lr =0.001
T

In [50]:
input_dim  = 4
output_dim = 4



hidden_dim      = best_params["hidden_dim"]
num_lstm_layers = best_params["num_lstm_layers"]
kernel_size     = best_params["kernel_size"]
lr              = best_params["lr"]
num_epochs      = best_params["num_epochs"]

print(f"Training with hidden_dim={hidden_dim}, \
num_lstm_layers={num_lstm_layers},\
kernel_size={kernel_size}, \
num_epochs={num_epochs},\
lr ={lr}")

Training with hidden_dim=64, num_lstm_layers=1,kernel_size=3, num_epochs=300,lr =0.001


In [51]:
# Data normalization: Scaling the data to a common range [0, 1]
# This step ensures that all data values are within the same scale,
# which can improve the training process of the neural network.
scaler = MinMaxScaler()
scaled_data = scaler.fit_transform(df.values)

# Creating the dataset for training and testing
# We prepare input sequences of historical data (X) and their corresponding future data (y).
# 'seq_length' is the number of previous data points used to predict the next data point.
seq_length = sma_period
X, y = [], []

# Loop through the scaled data to create input sequences and targets
for i in range(seq_length, len(scaled_data)):
    # For each iteration, collect the previous 'seq_length' data points as features
    X.append(scaled_data[i-seq_length:i])
    # The next data point after those 'seq_length' data points becomes the target variable
    y.append(scaled_data[i])

# Convert the collected data into numpy arrays for further processing
X = np.array(X)
y = np.array(y)

# Splitting the dataset into training and testing sets
# We use 80% of the data for training and the remaining 20% for testing
train_size = int(0.8 * len(X))
X_train, y_train = X[:train_size], y[:train_size]
X_test, y_test = X[train_size:], y[train_size:]

# Converting data to PyTorch tensors for model training and evaluation
# We reshape the data tensors to match the expected input shape of the model:
# (batch_size, num_features, sequence_length), which is different from the given shape.
# Permute is used to swap the last two dimensions.
X_train_tensor = torch.FloatTensor(X_train).permute(0, 2, 1)
y_train_tensor = torch.FloatTensor(y_train)
X_test_tensor = torch.FloatTensor(X_test).permute(0, 2, 1)
y_test_tensor = torch.FloatTensor(y_test)


In [52]:
model = MultiLayerCNNLSTM(input_dim,
                          hidden_dim,
                          num_lstm_layers,
                          output_dim,
                          kernel_size).to(device)


# Training the model
optimizer = torch.optim.Adam(model.parameters(), lr=lr)
criterion = nn.MSELoss()

for epoch in range(num_epochs):
    model.train()
    outputs = model(X_train_tensor.to(device))
    loss = criterion(outputs, y_train_tensor.squeeze().to(device))
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

    if (epoch +1 ) %50 == 0 :
      print(f'Best loss : Epoch [{epoch+1}/{num_epochs}], Validation Loss: {loss.item():.4e}')

Best loss : Epoch [50/300], Validation Loss: 9.2983e-03
Best loss : Epoch [100/300], Validation Loss: 3.4925e-03
Best loss : Epoch [150/300], Validation Loss: 2.2282e-03
Best loss : Epoch [200/300], Validation Loss: 1.8633e-03
Best loss : Epoch [250/300], Validation Loss: 1.6685e-03
Best loss : Epoch [300/300], Validation Loss: 1.5321e-03


In [53]:
# Evaluation and plotting
# Put the model in evaluation mode
model.eval()

# Get predictions for training and testing sets
train_preds = model(X_train_tensor.to(device)).detach().cpu().numpy()
test_preds = model(X_test_tensor.to(device)).detach().cpu().numpy()

# Reshape the predictions and actuals to match shapes
train_preds = train_preds.reshape(-1, 4)  # Two features: closing price and volume
test_preds = test_preds.reshape(-1, 4)

# Inverse scaling of the predictions
train_preds_unscaled = scaler.inverse_transform(train_preds)
test_preds_unscaled = scaler.inverse_transform(test_preds)

# Inverse scaling of the actuals
y_train_unscaled = scaler.inverse_transform(y_train)
y_test_unscaled = scaler.inverse_transform(y_test)

# Extract dates for plotting
train_dates = df.index[seq_length:seq_length+len(y_train)]
test_dates = df.index[seq_length+len(y_train):]

# Create a Plotly figure for closing prices
fig_closing = go.Figure()

# Plotting actual train closing prices
fig_closing.add_trace(go.Scatter(x=train_dates, y=y_train_unscaled[:, 0], mode='lines', name='Actual Train Closing Prices'))

# Plotting predicted train closing prices
fig_closing.add_trace(go.Scatter(x=train_dates, y=train_preds_unscaled[:, 0], mode='lines', name='Predicted Train Closing Prices'))

# Plotting actual test closing prices
fig_closing.add_trace(go.Scatter(x=test_dates, y=y_test_unscaled[:, 0], mode='lines', name='Actual Test Closing Prices'))

# Plotting predicted test closing prices
fig_closing.add_trace(go.Scatter(x=test_dates, y=test_preds_unscaled[:, 0], mode='lines', name='Predicted Test Closing Prices'))

# Customize the layout of the closing price plot
fig_closing.update_layout(title=f'{tick} Stock Closing Price Predictions',
                          xaxis_title='Date',
                          yaxis_title='Closing Price')

# Display the closing price plot
fig_closing.show()

# Create a Plotly figure for volume
fig_volume = go.Figure()

# Plotting actual train volumes
fig_volume.add_trace(go.Scatter(x=train_dates, y=y_train_unscaled[:, 1], mode='lines', name='Actual Train Volumes'))

# Plotting predicted train volumes
fig_volume.add_trace(go.Scatter(x=train_dates, y=train_preds_unscaled[:, 1], mode='lines', name='Predicted Train Volumes'))

# Plotting actual test volumes
fig_volume.add_trace(go.Scatter(x=test_dates, y=y_test_unscaled[:, 1], mode='lines', name='Actual Test Volumes'))

# Plotting predicted test volumes
fig_volume.add_trace(go.Scatter(x=test_dates, y=test_preds_unscaled[:, 1], mode='lines', name='Predicted Test Volumes'))

# Customize the layout of the volume plot
fig_volume.update_layout(title=f'{tick} Stock Volume Predictions',
                         xaxis_title='Date',
                         yaxis_title='Volume')

# Display the volume plot
fig_volume.show()


In [54]:
# Convert the last sequence from testing data to a PyTorch tensor
input_sequence = torch.FloatTensor(X_test_tensor[-1]).unsqueeze(0).to(device)

# Initialize lists to store predicted values for closing price and volume
predictions = []

# Loop over the number of days for which we want to predict (approximately 2 months)
for _ in range(60):
    # Use the model to make a prediction based on the current input_sequence
    prediction = model(input_sequence.to(device))

    # store the predicted values
    predictions.append(prediction.detach().cpu().numpy())

    # Create a new input sequence for the next prediction
    # This is done by removing the oldest values from the input_sequence and appending the current predictions to it
    new_input = prediction.unsqueeze(2)  # Add a new dimension for sequence length
    input_sequence = torch.cat([input_sequence[:, :, 1:], new_input], dim=2)


In [55]:
predictions_unscaled = scaler.inverse_transform(np.array(predictions).reshape(-1, 4))

In [56]:
#predictions_unscaled

In [57]:
closing_pred = predictions_unscaled[:, 0]
volume_pred = predictions_unscaled[:, 1]
mav_pred = predictions_unscaled[:, 2]
rsi_pred = predictions_unscaled[:, 3]


In [58]:
closing_pred.shape

(60,)

In [61]:
# Create a sequence of dates for the next 2 months
last_date = df.index[-1]
next_dates = pd.date_range(start=last_date, periods=60, freq='D')[1:]

# Create a Plotly figure for predicted closing prices
fig_closing_pred = go.Figure()

# Plotting predicted closing prices for the next 2 months
fig_closing_pred.add_trace(go.Scatter(x=next_dates, y=closing_pred.flatten(), mode='lines', name='Predicted Closing Prices'))

# Plotting predicted closing prices for the next 2 months
fig_closing_pred.add_trace(go.Scatter(x=next_dates, y=mav_pred.flatten(), mode='lines', name='Moving Average Prices'))



# Customize the layout of the predicted closing price plot
fig_closing_pred.update_layout(title=f'{tick} Stock Predicted Closing Prices for the Next 2 Months',
                               xaxis_title='Date',
                               yaxis_title='Closing Price')




# Display the predicted closing price plot
fig_closing_pred.show()



# Create a Plotly figure for predicted volumes
fig_volume_pred = go.Figure()

# Plotting predicted volumes for the next 2 months
fig_volume_pred.add_trace(go.Scatter(x=next_dates, y=volume_pred.flatten(), mode='lines', name='Predicted Volumes'))

# Customize the layout of the predicted volume plot
fig_volume_pred.update_layout(title=f'{tick} Stock Predicted Volumes for the Next 2 Months',
                              xaxis_title='Date',
                              yaxis_title='Volume')

# Display the predicted volume plot
fig_volume_pred.show()



# Create a Plotly figure for predicted volumes
fig_rsi_pred = go.Figure()

# Plotting predicted volumes for the next 2 months
fig_rsi_pred.add_trace(go.Scatter(x=next_dates, y=rsi_pred.flatten(), mode='lines', name='Predicted Volumes'))

# Customize the layout of the predicted volume plot
fig_rsi_pred.update_layout(title=f'{tick} Stock Predicted RSI for the Next 2 Months',
                              xaxis_title='Date',
                              yaxis_title='Relative Strength Index (RSI) ')

# Display the predicted volume plot
fig_rsi_pred.show()
