In [1]:
%%capture
%pip install keras-tuner

In [2]:
from google.colab import drive
import pandas as pd
import keras
import scipy
import torch
import numpy as np
from tensorflow.keras.models import Sequential, Model
from tensorflow.keras.layers import Input, Embedding, LSTM, Dense, Dropout
from tensorflow.keras.callbacks import TensorBoard, EarlyStopping
from tensorflow.keras.preprocessing.sequence import TimeseriesGenerator
from keras_tuner.tuners import RandomSearch
from sklearn.preprocessing import MinMaxScaler
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score
import datetime as datetime
import plotly.express as px
import matplotlib.pyplot as plt

# Select the torch device once, then report which one was picked.
# NOTE(review): this probes PyTorch only; the models below are built with Keras,
# so confirm separately that the Keras backend sees the same GPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
if device.type == "cuda":
    print("GPU is available and being used.")
else:
    print("GPU is not available, using CPU instead.")

Mounted at /content/drive
GPU is available and being used.


In [None]:
# Use google drive to save files - specify location here if local saving needed too.
# drive.mount comes from google.colab, so this cell only works inside Colab.
drive.mount('/content/drive')
# Root folder for every path built in this notebook (data/, models/ and results/ sub-folders).
file_location = '/content/drive/MyDrive/Colab Notebooks/Dissertation/ic_fspml'
# NOTE(review): later cells rebuild f'{file_location}/models' inline instead of reading log_dir.
log_dir = f'{file_location}/models/' # TensorBoard logs

<h1>LSTM Modelling</h1>

<h2> Load in dataset </h2>

In [3]:
# Load in prepared dataset, with news, sector, sentiment and price information populated
# Columns read by later cells: Ticker, sector, Date, article_headline, sentiment, Open, Adj Close.
stock_news_and_price_df = pd.read_csv(f'{file_location}/data/ic_fspml_senti_price.csv')

In [4]:
# Function to get the top news tickers per sector (optional) based on the number of news articles associated
def get_top_news_tickers_per_sector(df, count_of_tickers, sector=''):
    """Return the tickers with the most news articles, taken sector by sector.

    Args:
        df: frame with 'sector', 'Ticker' and 'article_headline' columns.
        count_of_tickers: how many tickers to keep from each sector.
        sector: optional sector name; the default '' keeps every sector.

    Returns:
        List of tickers, ordered by sector name and then by descending article count.
    """
    scoped = df if sector == '' else df[df['sector'] == sector]

    # Non-null headlines per (sector, ticker), busiest tickers first within each sector
    ranked_counts = (
        scoped.groupby(['sector', 'Ticker'])['article_headline']
        .count()
        .reset_index(name='NewsCount')
        .sort_values(by=['sector', 'NewsCount'], ascending=[True, False])
    )

    # head() on the grouped frame keeps the first rows of every sector in sorted order
    leaders = ranked_counts.groupby('sector').head(count_of_tickers)

    return list(leaders['Ticker'])

<h2> Preprocessing the Data </h2>

In [8]:
# Function to preprocess data for model training, by adding sentiment & scaling data
def preprocess_data(df, feature_columns, target_column):
    """Encode sentiment, add summed-sentiment columns and min-max scale the model columns.

    Args:
        df: frame with 'Date', 'sector' and 'sentiment' columns plus every column named
            in target_column and (apart from the derived ones) in feature_columns.
            A copy is worked on; the caller's frame is left untouched.
        feature_columns: list of input column names, possibly empty. The derived
            'sentiment_sector' and 'sentiment_global' columns may be requested.
        target_column: list holding the name of the column to predict.

    Returns:
        Tuple (processed_df, price_scaler). processed_df contains only
        feature_columns + target_column, each scaled to [0, 1]; price_scaler is the
        scaler fitted on the target, so predictions can be mapped back to prices.

    NOTE(review): both sentiment sums are taken over the rows of `df` as passed in.
    generate_datasets passes a single-ticker frame, so in that call path the "sector"
    and "global" sums only cover that ticker - confirm this is intended.
    NOTE(review): the scalers are fitted on all rows, i.e. before create_generators
    splits train/validation/test, so test-period ranges influence the scaling.
    """
    df = df.copy()

    # Map sentiment labels to numerical values, needed for input to LSTM.
    # The result is assigned back instead of calling fillna(inplace=True) on the column
    # selection: that chained in-place form is deprecated and does not update `df` when
    # pandas Copy-on-Write is active, which would leave NaN for unmapped/missing labels.
    sentiment_mapping = {'negative': -1, 'neutral': 0, 'positive': 1}
    df['sentiment'] = df['sentiment'].map(sentiment_mapping).fillna(0)

    # Calculate the sum of sentiment per Date and sector
    sector_sentiment = df.groupby(['Date', 'sector'])['sentiment'].sum().reset_index()
    # Merge the sector sentiment back into the original DataFrame (new column: 'sentiment_sector')
    df = df.merge(sector_sentiment, on=['Date', 'sector'], suffixes=('', '_sector'))

    # Calculate the sum of sentiment per Date, 'globally'
    global_sentiment = df.groupby(['Date'])['sentiment'].sum().reset_index()
    # Merge the global sentiment back into the original DataFrame (new column: 'sentiment_global')
    df = df.merge(global_sentiment, on=['Date'], suffixes=('', '_global'))

    # LSTMs are sensitive to the scale of the input data.
    # Normalising the features helps to stabilise and speed up the training process.
    if len(feature_columns) > 0:
        scaler = MinMaxScaler(feature_range=(0, 1))
        df[feature_columns] = scaler.fit_transform(df[feature_columns])

    # Scale the target column separately, so its scaler can be used to unscale the predictions
    price_scaler = MinMaxScaler(feature_range=(0, 1))
    df[target_column] = price_scaler.fit_transform(df[target_column])

    df = df[feature_columns + target_column]

    return df, price_scaler

<h3> Generate Sequences </h3>

In [11]:
# Function to filter input df to a specific stock
def filter_to_stock(df, stock_name):
    """Return a new frame with only the rows of `stock_name`, minus the 'Ticker' column."""
    is_requested_stock = df['Ticker'] == stock_name
    # drop() without inplace returns a fresh frame, so the caller's df is never modified
    return df.loc[is_requested_stock].drop(columns='Ticker')


# Function to split the data into train, validate and test for features & target
# Returns the datasets as generators, for memory efficiency and ease of use with Keras
def create_generators(input_data, feature_columns, target_column, sequence_length=5, test_size=0.2, val_size=0.5, batch_size=16, shuffle=False):
    """Split the rows into train/validation/test and wrap each part in a TimeseriesGenerator.

    Args:
        input_data: frame containing feature_columns and target_column.
        feature_columns: list of input column names; the target column is appended,
            so the target's own history is always part of the model input.
        target_column: list holding the name of the column to predict.
        sequence_length: number of consecutive rows in each input window.
        test_size: fraction of rows held out from training (validation + test together).
        val_size: fraction of that hold-out assigned to the test set; the remainder
            becomes the validation set.
        batch_size: number of windows per generated batch.
        shuffle: forwarded to both train_test_split calls only; the generators
            themselves never shuffle.

    Returns:
        Tuple (train_generator, val_generator, test_generator).
    """
    model_inputs = input_data[feature_columns + target_column].values
    model_targets = input_data[target_column].values

    # First cut: training rows vs. hold-out rows (splitting happens before windowing)
    X_train, X_holdout, y_train, y_holdout = train_test_split(
        model_inputs, model_targets, test_size=test_size, shuffle=shuffle
    )
    # Second cut: the hold-out is divided into validation and test rows
    X_val, X_test, y_val, y_test = train_test_split(
        X_holdout, y_holdout, test_size=val_size, shuffle=shuffle
    )

    def _windowed(inputs, targets):
        # Note - DO NOT SHUFFLE: windows must stay in their original row order
        return TimeseriesGenerator(inputs, targets, length=sequence_length, batch_size=batch_size, shuffle=False)

    return _windowed(X_train, y_train), _windowed(X_val, y_val), _windowed(X_test, y_test)


# Function to generate datasets for multiple tickers and feature sets
# Returns datasets and scalers, named with ticker & feature set name.
def generate_datasets(df, tickers, feature_set_dict, target_column, sequence_length=5, test_size=0.2, val_size=0.5, shuffle=False, batch_size=32):
    """Build train/validation/test generators for every (feature set, ticker) pair.

    Args:
        df: full frame containing a 'Ticker' column and the columns preprocess_data needs.
        tickers: iterable of ticker symbols to build datasets for.
        feature_set_dict: maps a feature-set name (used as key suffix) to its list of
            feature columns.
        target_column: list holding the name of the column to predict.
        sequence_length, test_size, val_size, shuffle: forwarded to create_generators.
        batch_size: windows per batch, forwarded to create_generators. Defaults to 32,
            the value this function previously hard-coded.

    Returns:
        Tuple (datasets, scalers), both keyed by ticker + feature_set_name. datasets
        holds (train_generator, val_generator, test_generator); scalers holds the
        target scaler returned by preprocess_data.

    NOTE(review): each ticker is filtered out before preprocess_data runs, so the
    sentiment sums that preprocess_data derives only see that ticker's rows - confirm
    this is the intended meaning of the sector/global sentiment features.
    """
    datasets = {}
    scalers = {}

    # Materialise once so a one-shot iterable can be walked for every feature set
    tickers = list(tickers)

    # Filtering does not depend on the feature set, so do it once per ticker.
    # preprocess_data works on a copy, so the cached frames can be reused safely.
    frames_by_ticker = {ticker: filter_to_stock(df, ticker) for ticker in tickers}

    # Loop over each feature set, then each ticker (this fixes the key order of the result)
    for feature_set_name, feature_set_columns in feature_set_dict.items():
        for ticker in tickers:
            processed_df, price_scaler = preprocess_data(
                frames_by_ticker[ticker], feature_set_columns, target_column
            )

            # Generate train, validate and test timeseries data
            train_generator, val_generator, test_generator = create_generators(
                processed_df, feature_set_columns, target_column, sequence_length, test_size, val_size, batch_size, shuffle
            )

            # Store the timeseries and scalers in dictionaries
            dataset_key = ticker + feature_set_name
            datasets[dataset_key] = (train_generator, val_generator, test_generator)
            scalers[dataset_key] = price_scaler

    return datasets, scalers

In [12]:
# Generate a list of top tickers with the most news articles per sector
# The function returns the top 2 tickers from each sector in the DataFrame
ticker_list = get_top_news_tickers_per_sector(stock_news_and_price_df, 2, sector='')
# Kept as a list because it is concatenated with the feature column lists downstream
target_column = ['Adj Close']

# Dictionary of feature sets to be used for generating datasets
# Each key is a feature set, with the associated list of feature columns
# The target column will automatically be included as input in each feature set
# The key is appended to the ticker symbol to name each dataset, hence the leading underscore
feature_set_dict = {'_close_only' : [],
                    '_open_close' : ['Open'],
                    '_open_close_sentiment' : ['Open', 'sentiment'],
                    '_open_close_sentiment_sector' : ['Open', 'sentiment', 'sentiment_sector'],
                    '_open_close_sentiment_sector_global' : ['Open', 'sentiment', 'sentiment_sector','sentiment_global']
                    }

# Preprocess, then generate datasets and corresponding scalers for the specified tickers and feature sets
# Both dictionaries are keyed by ticker + feature set name
datasets, scalers = generate_datasets(stock_news_and_price_df, ticker_list, feature_set_dict, target_column)

<h2> Create LSTM Model </h2>

In [13]:
# Load the TensorBoard notebook extension
# Can comment out without issue if not being used.
%load_ext tensorboard
%tensorboard --logdir '{file_location}/models/'

In [14]:
# Function to build the LSTM model with hyperparameter tuning using Keras-Tuner
def build_model(hp, input_shape):
    """Build and compile a two-layer LSTM regressor whose sizes are chosen by Keras-Tuner.

    Args:
        hp: Keras-Tuner hyperparameter container used to sample 'units' and 'dropout'.
        input_shape: shape of one input sample, (timesteps, num_features).

    Returns:
        A compiled Keras model with a single-unit output, trained on mean squared error
        with the Adam optimiser.
    """
    sequence_input = Input(shape=input_shape)

    # Both LSTM layers use the hyperparameter named 'units' and both Dropout layers the
    # one named 'dropout'; a repeated name resolves to the same tuned value, so each
    # value is requested once and shared.
    lstm_units = hp.Int('units', min_value=32, max_value=512, step=32)
    dropout_rate = hp.Float('dropout', min_value=0.1, max_value=0.5, step=0.1)

    # First LSTM returns the full sequence so the second LSTM can consume it
    hidden = LSTM(units=lstm_units, return_sequences=True)(sequence_input)
    hidden = Dropout(rate=dropout_rate)(hidden)

    # Second LSTM collapses the sequence to its final state; dropout again against overfitting
    hidden = LSTM(units=lstm_units)(hidden)
    hidden = Dropout(rate=dropout_rate)(hidden)

    # Dense output layer with a single unit, for adj close price prediction
    price_output = Dense(1)(hidden)

    model = Model(inputs=sequence_input, outputs=price_output)
    model.compile(optimizer='adam', loss='mean_squared_error')

    return model


# Function to train and evaluate the model using hyperparameter tuning
def train_and_evaluate_model(dataset_name, train_generator, val_generator, test_generator,
                             max_trials=10, executions_per_trial=3, epochs=150, patience=10):
    """Run a Keras-Tuner random search for one dataset, then evaluate and save the best model.

    Args:
        dataset_name: label used in the tuner project name, log folder and model filename.
        train_generator: batches used to fit each candidate model.
        val_generator: batches used for 'val_loss', the tuning and early-stopping metric.
        test_generator: batches used once, to report the loss of the best model.
        max_trials: number of hyperparameter combinations to try.
        executions_per_trial: number of models trained per combination.
        epochs: upper bound on epochs per execution; early stopping usually ends sooner.
        patience: epochs without 'val_loss' improvement before early stopping triggers.

    Returns:
        The best model found by the tuner. It is also saved under
        {file_location}/models/ic_fspml_{dataset_name}_model.keras.
    """
    # Automatically calculate input_shape from the generator, from the shape of the features (timesteps, num_features)
    input_shape = train_generator[0][0].shape[1:]

    # Define the Keras-Tuner per dataset name, passing input_shape to build_model, finding the best hyperparameters
    tuner = RandomSearch(
        lambda hp: build_model(hp, input_shape),
        objective='val_loss',
        max_trials=max_trials,
        executions_per_trial=executions_per_trial,
        directory=f'{file_location}/models',
        project_name=f'stock_price_prediction_{dataset_name}'
    )

    # Start the hyperparameter search - the epoch budget is generous, as early stopping will prevent excessive fruitless runs
    tuner.search(train_generator, epochs=epochs, validation_data=val_generator,
                 callbacks=[
                     # Log training metrics for visualisation in TensorBoard, like ensuring training loss convergence
                     TensorBoard(log_dir=f'{file_location}/models/logs_{dataset_name}', histogram_freq=1),
                     # Early stopping to prevent overfitting, with restoration of the best model weights. High patience, as GPUs are available.
                     EarlyStopping(monitor='val_loss', patience=patience, restore_best_weights=True)
                 ]
    )

    # Retrieve the best model found by Keras-Tuner
    best_model = tuner.get_best_models(num_models=1)[0]

    # Output & evaluate the model on the test set using the test generator
    test_loss = best_model.evaluate(test_generator)
    print(f'{dataset_name} Test Loss: {test_loss}')

    # Save the best model with the dataset name in the filename
    best_model.save(f'{file_location}/models/ic_fspml_{dataset_name}_model.keras')

    return best_model

In [None]:
# Maps each dataset name (the keys of `datasets`) to its best tuned model
best_models = {}

# Loop over each of the datasets, training and validating the model, and storing the best model in the dictionary.
# Every iteration runs a complete Keras-Tuner search, so this cell dominates the notebook's run time.
for dataset_name, (train_generator, val_generator, test_generator) in datasets.items():
    print(f"Training and evaluating model for {dataset_name}...")
    best_model = train_and_evaluate_model(dataset_name, train_generator, val_generator, test_generator)
    best_models[dataset_name] = best_model

<h1> Model Evaluation </h1>

In [18]:
# Function to get model predictions and true values from the test generator
def get_model_predictions(model, test_generator, scaler):
    """Return (y_true, y_pred) for the test generator, both back on the original price scale.

    Args:
        model: object with a predict(generator) method returning scaled predictions.
        test_generator: indexable sequence of (inputs, targets) batches.
        scaler: fitted target scaler providing inverse_transform.

    Returns:
        Two flat arrays: the true target values and the model's predictions.
    """
    scaled_predictions = model.predict(test_generator)

    # Walk the generator batch by batch, keeping only the target half of each pair
    batches = (test_generator[batch_index] for batch_index in range(len(test_generator)))
    scaled_targets = np.array([
        target_row
        for _batch_inputs, batch_targets in batches
        for target_row in batch_targets
    ])

    # The scaler expects a single column, so reshape before undoing the scaling
    y_true = scaler.inverse_transform(scaled_targets.reshape(-1, 1)).flatten()
    y_pred = scaler.inverse_transform(scaled_predictions.reshape(-1, 1)).flatten()

    return y_true, y_pred


# Function to calculate the mean percentage error
def percentage_error(y_true, y_pred):
    """Return the mean absolute percentage error of y_pred against y_true, in percent."""
    actual = np.array(y_true)
    predicted = np.array(y_pred)

    # Absolute error relative to the magnitude of the true value, per element, in percent
    relative_errors = (np.abs(actual - predicted) / np.abs(actual)) * 100

    return np.mean(relative_errors)


# Function to evaluate model performance using various metrics
def evaluate_model_performance(y_true, y_pred):
    """Compute the regression metrics reported for each model.

    Returns:
        Dict with keys 'mse', 'rmse', 'mae', 'r2' and 'percentage_error', in that order.
    """
    squared_error = mean_squared_error(y_true, y_pred)

    # RMSE is derived from the MSE rather than recomputed from the raw values
    return {
        'mse': squared_error,
        'rmse': np.sqrt(squared_error),
        'mae': mean_absolute_error(y_true, y_pred),
        'r2': r2_score(y_true, y_pred),
        'percentage_error': percentage_error(y_true, y_pred)
    }


# Function to create a comparison table of model performance metrics
def create_comparison_table(results):
    """Build a one-row-per-model table of metrics, save it as CSV and return it sorted.

    Args:
        results: dict mapping model name to a dict that has a 'metrics' entry
            (itself a dict of metric name -> value).

    Returns:
        DataFrame with a 'ModelName' column followed by one column per metric,
        sorted by 'ModelName'. The CSV written to
        {file_location}/results/performance_metrics.csv keeps the order of `results`.
    """
    import os  # local import: only needed to make sure the output folder exists

    # Collect the metrics dict of every model
    comparison_data = {model_name: data['metrics'] for model_name, data in results.items()}

    # A dict of dicts yields models as columns; transpose so each model becomes a row
    # and move the model names out of the index into a 'ModelName' column
    comparison_df = (
        pd.DataFrame(comparison_data)
        .transpose()
        .reset_index()
        .rename(columns={'index': 'ModelName'})
    )

    # Save performance metrics for future use; create the folder first so the
    # write does not fail on a fresh Drive/location
    results_dir = f'{file_location}/results'
    os.makedirs(results_dir, exist_ok=True)
    comparison_df.to_csv(f'{results_dir}/performance_metrics.csv', index=False)

    return comparison_df.sort_values('ModelName')


# Function to plot and compare actual vs predicted stock prices
def plot_predictions(y_true, y_predicted, dataset_name=None):
    """Plot actual against predicted prices, save the figure as PNG and display it.

    Args:
        y_true: sequence of actual prices.
        y_predicted: sequence of predicted prices.
        dataset_name: label used in the output filename. When omitted, the
            notebook-level variable `dataset_name` is used, which is what existing
            two-argument calls relied on.

    Raises:
        NameError: if dataset_name is omitted and no notebook-level `dataset_name` exists.
    """
    import os  # local import: only needed to make sure the output folder exists

    if dataset_name is None:
        # Backward compatibility: earlier versions read the caller's loop variable implicitly
        try:
            dataset_name = globals()['dataset_name']
        except KeyError:
            raise NameError(
                "plot_predictions needs a dataset_name argument "
                "(no notebook-level 'dataset_name' is defined)"
            ) from None

    fig, ax = plt.subplots(figsize=(12, 6))
    ax.plot(y_true, label='Actual')
    ax.plot(y_predicted, label='Predicted')
    ax.set_title('Actual vs Predicted Stock Prices')
    ax.set_xlabel('Time')
    ax.set_ylabel('Stock Adjusted Close Price')
    ax.legend()

    # Create the folder first so saving does not fail on a fresh location
    results_dir = f'{file_location}/results'
    os.makedirs(results_dir, exist_ok=True)
    fig.savefig(f'{results_dir}/plot_{dataset_name}.png')

    plt.show()
    # Release the figure; this function is called once per dataset in a loop
    plt.close(fig)

In [None]:
# Maps each dataset name to {'model': best model, 'metrics': dict of test-set metrics}
results = {}

# Loop over each dataset in datasets, generate performance metrics
# Requires `best_models` from the training cell; only the test generator is used here
for dataset_name, (train_generator, val_generator, test_generator) in datasets.items():
    print(f"Evaluating performance for {dataset_name}...")
    best_model = best_models[dataset_name]
    # Predictions are mapped back to the price scale with the scaler fitted for this dataset
    y_true, y_predicted = get_model_predictions(best_model, test_generator, scalers[dataset_name])
    metrics = evaluate_model_performance(y_true, y_predicted)

    results[dataset_name] = {
        'model': best_model,
        'metrics': metrics
    }

# Create a comparison table of the performance metrics across all datasets
# (this also writes the table to the results folder as CSV)
comparison_table = create_comparison_table(results)
comparison_table

<h1> Plotting Results </h1>

In [None]:
# Plot actual vs predicted test-set prices for every dataset; one PNG is saved per dataset.
# plot_predictions names its output file from the loop variable `dataset_name` in notebook scope.
# NOTE(review): predictions are recomputed here rather than reused from the evaluation cell,
# and the progress message below still says "Evaluating performance".
for dataset_name, (train_generator, val_generator, test_generator) in datasets.items():
    print(f"Evaluating performance for {dataset_name}...")
    best_model = best_models[dataset_name]
    y_true, y_predicted = get_model_predictions(best_model, test_generator, scalers[dataset_name])
    plot_predictions(y_true, y_predicted)