## Here's the CNN-LSTM model with comments.

(we put it seperate to make it more clear compare to the experiment.ipynb, the output of the code can be seen in the original file)

In [None]:
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import torch
from torch import nn
import time
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, classification_report

# Define the number of days to use for training in each sample
DAYS_FOR_TRAIN = 40 

# Compute Relative Strength Index (RSI) for a given series
def compute_RSI(series, window=14):
    delta = series.diff().dropna()
    up = delta.where(delta > 0, 0.0)
    down = -delta.where(delta < 0, 0.0)
    roll_up = up.rolling(window).mean()
    roll_down = down.rolling(window).mean()
    RS = roll_up / roll_down
    RSI = 100.0 - (100.0 / (1.0 + RS))
    return RSI

# Create dataset for training and testing
def create_dataset(data, close_prices, days_for_train):
    dataset_x, dataset_y = [], []
    for i in range(len(data) - days_for_train):
        _x = data[i:(i + days_for_train), :]
        dataset_x.append(_x)
        dataset_y.append(close_prices[i + days_for_train])
    return np.array(dataset_x), np.array(dataset_y)

# Define the Enhanced CNN-LSTM model
class Enhanced_CNN_LSTM(nn.Module):
    def __init__(self, input_size, hidden_size=32, output_size=1, cnn_filters=64):
        super().__init__()
        # Different branches of CNN with different kernel sizes
        self.branch1 = nn.Sequential(
            nn.Conv1d(in_channels=input_size, out_channels=cnn_filters, kernel_size=3),
            nn.ReLU(),
            nn.Conv1d(in_channels=cnn_filters, out_channels=cnn_filters, kernel_size=3),
            nn.ReLU()
        )
        self.branch2 = nn.Sequential(
            nn.Conv1d(in_channels=input_size, out_channels=cnn_filters, kernel_size=5),
            nn.ReLU()
        )

        self.bn = nn.BatchNorm1d(cnn_filters * 2)  # Batch Normalization

        # LSTM layer
        self.lstm = nn.LSTM(cnn_filters * 2, hidden_size, num_layers=2, batch_first=False, dropout=0.2, bidirectional=False)
        self.fc = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        # Reshape x for CNN
        x = x.permute(1,2,0)  # (batch, feature_dim, seq_len)
        out1 = self.branch1(x)  # (batch, cnn_filters, seq_len-4)
        out2 = self.branch2(x)  # (batch, cnn_filters, seq_len-4)
        
        # Ensure the sequences have the same length
        min_len = min(out1.shape[2], out2.shape[2])
        out1 = out1[:, :, :min_len]
        out2 = out2[:, :, :min_len]

        # Concatenate the outputs from both branches
        out = torch.cat([out1, out2], dim=1)  # (batch, 2*cnn_filters, seq_len_x)
        out = self.bn(out)

        # Reshape back for LSTM
        out = out.permute(2, 0, 1)  # (seq_len, batch, features)
        out, _ = self.lstm(out)
        out = out[-1,:,:]  # Taking the last output
        out = self.fc(out)  # Fully connected layer
        return out

if __name__ == '__main__':
    t0 = time.time()  # Start time for training duration

    # Read and preprocess the data
    data = pd.read_csv('tsla_history.csv')
    data['Date'] = pd.to_datetime(data['Date'])
    data.sort_values('Date', inplace=True)
    data.reset_index(drop=True, inplace=True)

    # Calculate additional features: Returns, Moving Averages, and RSI
    data['Return'] = data['Close'].pct_change()
    data['MA5'] = data['Close'].rolling(5).mean()
    data['MA10'] = data['Close'].rolling(10).mean()
    data['RSI'] = compute_RSI(data['Close'], window=14)

    # Drop rows with NA values
    data.dropna(inplace=True)

    # Select features and normalize the data
    features = ['Close', 'Open', 'High', 'Low', 'Volume', 'MA5', 'MA10', 'RSI']
    data_features = data[features].astype('float32').values
    close_prices = data['Close'].astype('float32').values

    max_features = np.max(data_features, axis=0)
    min_features = np.min(data_features, axis=0)
    data_features = (data_features - min_features) / (max_features - min_features)

    max_close = np.max(close_prices)
    min_close = np.min(close_prices)
    close_prices = (close_prices - min_close) / (max_close - min_close)

    # Create dataset for training, validation, and testing
    dataset_x, dataset_y = create_dataset(data_features, close_prices, DAYS_FOR_TRAIN)
    total_size = len(dataset_x)
    train_size = int(total_size * 0.7)
    val_size = int(total_size * 0.2)

    train_x = dataset_x[:train_size]
    train_y = dataset_y[:train_size]
    val_x = dataset_x[train_size:train_size + val_size]
    val_y = dataset_y[train_size:train_size + val_size]
    test_x = dataset_x[train_size + val_size:]
    test_y = dataset_y[train_size + val_size:]

    # Extract dates for plotting
    dates = data['Date'].values
    train_dates = dates[DAYS_FOR_TRAIN:train_size + DAYS_FOR_TRAIN]
    val_dates = dates[train_size + DAYS_FOR_TRAIN:train_size + val_size + DAYS_FOR_TRAIN]
    test_dates = dates[train_size + val_size + DAYS_FOR_TRAIN:]

    # Transpose and reshape the datasets for training
    train_x = train_x.transpose(1, 0, 2)
    train_y = train_y.reshape(-1, 1)
    val_x = val_x.transpose(1, 0, 2)
    val_y = val_y.reshape(-1)
    test_x = test_x.transpose(1, 0, 2)
    test_y = test_y.reshape(-1)

    # Convert numpy arrays to torch tensors
    train_x = torch.from_numpy(train_x).float()
    train_y = torch.from_numpy(train_y).float()
    val_x = torch.from_numpy(val_x).float()
    test_x = torch.from_numpy(test_x).float()

    # Initialize the model, loss function, and optimizer
    model = Enhanced_CNN_LSTM(input_size=len(features), hidden_size=32, output_size=1, cnn_filters=64)
    loss_function = nn.MSELoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)

    # Training loop
    for i in range(400):
        model.train()
        out = model(train_x)
        loss = loss_function(out, train_y)
        loss.backward()
        optimizer.step()
        optimizer.zero_grad()
        if (i + 1) % 10 == 0:
            print(f'Epoch: {i + 1}, Loss: {loss.item():.5f}')

    # Evaluate the model on validation and test sets
    model.eval()
    with torch.no_grad():
        train_pred = model(train_x).view(-1).data.numpy()
        val_pred = model(val_x).view(-1).data.numpy()
        test_pred = model(test_x).view(-1).data.numpy()

    # Rescale predictions back to original scale
    train_pred_real = train_pred * (max_close - min_close) + min_close
    val_pred_real = val_pred * (max_close - min_close) + min_close
    test_pred_real = test_pred * (max_close - min_close) + min_close

    train_y_real = train_y.numpy().flatten() * (max_close - min_close) + min_close
    val_y_real = val_y * (max_close - min_close) + min_close
    test_y_real = test_y * (max_close - min_close) + min_close

    # Compute directional movements for accuracy metrics
    def get_direction(y):
        return (np.diff(y) > 0).astype(int)

    train_true_dir = get_direction(train_y_real)
    train_pred_dir = get_direction(train_pred_real)

    val_true_dir = get_direction(val_y_real)
    val_pred_dir = get_direction(val_pred_real)

    test_true_dir = get_direction(test_y_real)
    test_pred_dir = get_direction(test_pred_real)

    # Helper function to print evaluation metrics
    def print_metrics(name, y_true, y_pred):
        acc = accuracy_score(y_true, y_pred)
        prec = precision_score(y_true, y_pred, zero_division=0)
        rec = recall_score(y_true, y_pred, zero_division=0)
        f1 = f1_score(y_true, y_pred, zero_division=0)
        print(f"{name} Set Metrics:")
        print(f"Accuracy: {acc:.4f}, Precision: {prec:.4f}, Recall: {rec:.4f}, F1-Score: {f1:.4f}")
        print(classification_report(y_true, y_pred, zero_division=0))

    # Print metrics for train, validation, and test sets
    print_metrics("Train", train_true_dir, train_pred_dir)
    print_metrics("Validation", val_true_dir, val_pred_dir)
    print_metrics("Test", test_true_dir, test_pred_dir)

    # Plot the real and predicted prices
    plt.figure(figsize=(12, 8))
    plt.plot(train_dates, train_y_real, label='Train Set', color='blue')
    plt.plot(train_dates, train_pred_real, label='Prediction', color='green')

    plt.plot(val_dates, val_y_real, label='Validation Set', color='orange')
    plt.plot(val_dates, val_pred_real, color='green')

    plt.plot(test_dates, test_y_real, label='Test Set', color='red')
    plt.plot(test_dates, test_pred_real, color='green')

    plt.title('Enhanced CNN+LSTM Regression Predicted Prices')
    plt.xlabel('Date')
    plt.ylabel('Price($)')
    plt.legend(fontsize=12)
    plt.grid()
    plt.show()

    # Print the duration of the training process
    t1 = time.time()
    print(f'Training completed in {(t1 - t0) / 60:.2f} minutes.')