<a href="https://colab.research.google.com/github/wallacelw/Price-Forecasting-With-LSTM/blob/main/Prices_Forecasting_Best.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Datasets available at:

BTC, AAPL, MSFT, TSLA, ^IXIC(NASDAQ), ^BVSP(IBOVESPA):
https://finance.yahoo.com/

S&P 500:
https://www.kaggle.com/datasets/andrewmvd/sp-500-stocks?select=sp500_index.csv

## Preprocessing

In [1]:
import pandas as pd
import datetime

def str_to_datetime(s):
    """Parse a 'YYYY-MM-DD' string into a datetime.datetime (time set to 00:00)."""

    # Exactly three dash-separated integer fields are expected; anything else
    # raises ValueError, either from int() or from the unpacking.
    year, month, day = tuple(int(field) for field in s.split('-'))
    return datetime.datetime(year, month, day)


# CSV headers that hold the price series, depending on the data source;
# every one of them is renamed to the common "Price" column on load.
price_dict = dict.fromkeys(("Adj Close", "S&P500"), "Price")

def load_df(filename):
    """
    Load a price CSV into a single-column dataframe indexed by date.

    The price column of the CSV (any header listed in `price_dict`) is renamed
    to "Price", every other column is dropped, and the "Date" strings are
    converted with `str_to_datetime` and used as the index.

    Parameters
    ----------
    filename : path (or anything else accepted by `pd.read_csv`).

    Returns
    -------
    pandas.DataFrame with one "Price" column and an index named "Date".

    Raises
    ------
    KeyError if the CSV has no "Date" column or no recognised price column.
    """

    raw = pd.read_csv(filename)

    # Univariate analysis: keep only the date and the price.
    # The explicit copy makes the frame independent of `raw`, so assigning the
    # converted dates below does not trigger pandas' SettingWithCopyWarning.
    prices = raw.rename(columns=price_dict)[["Date", "Price"]].copy()

    # Convert date strings to datetime objects
    prices["Date"] = prices["Date"].apply(str_to_datetime)

    # Turn the "Date" column into the dataframe index
    return prices.set_index("Date")


# Expects MSFT.csv (see the dataset links at the top of the notebook) in the
# current working directory; a missing file raises FileNotFoundError.
df = load_df("MSFT.csv")

df

FileNotFoundError: ignored

In [None]:
import matplotlib.pyplot as plt

# Line chart of every price in the loaded dataset, using the explicit Axes API
fig, ax = plt.subplots(figsize=(10, 6))
ax.plot(df.index, df["Price"])
ax.set_title("Full Dataset")
plt.show()

In [None]:
# Number of most recent rows (assumed to be one row per quoted day) to keep.
# 5000 rows is ~13.7 years for an asset quoted every calendar day and
# ~20 years at ~252 trading days per year.
days = 5000

# Number of previous days' prices fed to the model as input
lookback = 15


def df_to_windowed(fullDF, n=lookback, daysSelected=days):
    """
    Create a windowed dataframe (converting the series into a supervised problem).

    Each row holds the prices of the previous `n` days as inputs
    ("Last-n Price" ... "Last-1 Price", oldest first) followed by that day's
    "Price" as the output.

    Parameters
    ----------
    fullDF : dataframe with a "Price" column.
    n : number of lagged price columns to create.
    daysSelected : number of most recent complete rows to return.

    Returns
    -------
    pandas.DataFrame with n + 1 columns. Rows containing NaN (the first `n`
    rows, which lack a full history) are dropped before the last
    `daysSelected` rows are selected.
    """

    prices = fullDF["Price"]

    # Build all columns first and create the frame once: oldest lag first,
    # target column last.
    columns = {f"Last-{i} Price": prices.shift(periods=i) for i in range(n, 0, -1)}
    columns["Price"] = prices
    windowed = pd.DataFrame(columns)

    # tail() rather than [-daysSelected:]: with daysSelected == 0 the slice
    # [-0:] is [0:] and would return every row instead of none.
    return windowed.dropna().tail(daysSelected)


# Build the supervised (lagged) table with the default lookback and row count
windowed_df = df_to_windowed(df)

windowed_df

In [None]:
# Summary statistics of the target column over the selected rows
windowed_df["Price"].describe()

## Models

In [None]:
# Disable the GPU by hiding every CUDA device from this process.
# NOTE(review): presumably this must run before TensorFlow is imported
# (next cell) to take effect — confirm.

import os
os.environ["CUDA_VISIBLE_DEVICES"] = "-1"

In [None]:
from tensorflow.keras.models import Sequential
from tensorflow.keras.optimizers import Adam
from tensorflow.keras import layers

# Candidate architectures compared in this notebook, in the order used as
# "Model 0" .. "Model 3" by the later cells.
# model input: (last {lookback} days prices, 1 feature = "price")
# model output: a single value (the next day's price); every Dense layer is
# created without an explicit activation argument.
models = [
    Sequential([ # CNN+LSTM+Dropout
       layers.Input((lookback, 1)),
       layers.Conv1D(128, kernel_size=3, activation="relu", padding="same"),
       layers.MaxPooling1D(pool_size=2, padding="same"),
       layers.LSTM(128, return_sequences=True),
       layers.Flatten(),
       layers.Dropout(0.3),
       layers.Dense(128),
       layers.Dense(1)
    ]),

    Sequential([ # LSTM
        layers.Input((lookback, 1)),
        layers.LSTM(128, return_sequences=True),
        layers.Flatten(),
        layers.Dropout(0.3),
        layers.Dense(128),
        layers.Dense(1)
    ]),

    Sequential([ # CNN
        layers.Input((lookback, 1)),
        layers.Conv1D(128, kernel_size=3, activation="relu", padding="same"),
        layers.MaxPooling1D(pool_size=2, padding="same"),
        layers.Flatten(),
        layers.Dense(128),
        layers.Dense(1)
    ]),

    Sequential([ # Simple neural network (dense layers only)
        layers.Input((lookback, 1)),
        layers.Flatten(),
        layers.Dense(128),
        layers.Dense(128),
        layers.Dense(1)
    ]),
]

In [None]:
# summary() prints the layer table itself and returns None, so wrapping it in
# print() would add a stray "None" line after every model.
for model in models:
    model.summary()

## Model Training

### Auxiliary Functions

In [None]:
# Default split of each 3000-row window: 2100/450/450 rows,
# i.e. 70% train, 15% validation, 15% test
def sliding_window_generator(windowed, trainSize=2100, valiSize=450, testSize=450, step=60):
    """
    Sliding Window Generator

    Yields (train, validation, test) slices of `windowed`. The three slices
    of a window are contiguous and in that order; consecutive windows start
    `step` rows apart. Only windows that fit entirely inside `windowed` are
    produced, so nothing is yielded when it is shorter than one full window.
    """

    vali_offset = trainSize
    test_offset = trainSize + valiSize
    window_len = test_offset + testSize

    for start in range(0, len(windowed) - window_len + 1, step):
        yield (
            windowed[start : start + vali_offset],
            windowed[start + vali_offset : start + test_offset],
            windowed[start + test_offset : start + window_len],
        )

In [None]:
# Plot windows' intervals and count the number of windows.
# `windows_cnt` stays a module-level name because cross_validation reads it.
plot_generator = sliding_window_generator(windowed_df)
plt.figure(figsize=(10,6))

windows_cnt = 0
legend_handles = None
for train, vali, test in plot_generator:
    # Gray vertical lines mark the first and last date of every window
    plt.axvline(train.index[0], color="tab:gray")
    (train_line,) = plt.plot(train.index, train["Price"], color="tab:blue")
    (vali_line,) = plt.plot(vali.index, vali["Price"], color="tab:orange")
    (test_line,) = plt.plot(test.index, test["Price"], color="tab:green")
    plt.axvline(test.index[-1], color="tab:gray")

    # Keep the three price lines of the first window for the legend.
    # Passing only labels to plt.legend() would pair them with the first
    # artists on the axes, and the first artist is the gray axvline.
    if legend_handles is None:
        legend_handles = [train_line, vali_line, test_line]

    windows_cnt += 1

plt.title(f"Number of Selected Windows: {windows_cnt}")
if legend_handles is not None:
    plt.legend(legend_handles, [
        "Training Observations",
        "Validation Observations",
        "Testing Observations",
    ])
plt.show()

In [None]:
def split_xy(windowedNP):
    """
    Separate a 2-D windowed array into inputs and target.

    Returns the tuple (X, y), where X holds every column except the last
    and y is the last column.
    """

    features, target = windowedNP[:, :-1], windowedNP[:, -1]
    return features, target

In [None]:
import numpy as np
from sklearn.metrics import confusion_matrix

def compute_accuracy_and_cm(y_val, y_test, y_pred):
    """
    Computes the directional accuracy score and the confusion matrix.

    The actual and the predicted price of test day i are each compared with
    the actual price of the previous day (the last validation price for the
    first test day) and labelled +1 (up or unchanged) or -1 (down).
    For simplicity, zero price changes are considered positive.

    Parameters
    ----------
    y_val : validation targets; only the last value is used.
    y_test : actual test prices.
    y_pred : predicted test prices, same length as y_test.

    Returns
    -------
    (accuracy, cm): the fraction of days whose predicted direction matches
    the actual one, and a confusion matrix that is always 2x2 with rows and
    columns ordered [-1, +1] (rows = actual, columns = predicted).

    Raises
    ------
    ZeroDivisionError if y_test is empty.
    """

    y_test = np.asarray(y_test).ravel()
    y_pred = np.asarray(y_pred).ravel()
    sz = len(y_test)

    # Previous-day actual price for every test day
    y_ref = np.append(y_val[-1], y_test)[:-1]

    y_test_label = np.where(y_test - y_ref >= 0, 1, -1)
    y_pred_label = np.where(y_pred - y_ref >= 0, 1, -1)

    # Explicit labels keep the matrix 2x2 even when only one direction occurs
    # in a window; [-1, 1] is the same (sorted) order the default produces.
    cm = confusion_matrix(y_true=y_test_label, y_pred=y_pred_label, labels=[-1, 1])

    acc = np.count_nonzero(y_test_label == y_pred_label) / sz
    return acc, cm

In [None]:
from matplotlib import patches
# Patience (in epochs) shared by the EarlyStopping callback and the loss-curve plot
patienceSelected = 50

def plot_loss_curve(history, model_idx, i, patience=patienceSelected):
    """
    Plot the training, validation and combined loss of one training run.

    Dashed vertical markers show the last trained epoch, the epoch `patience`
    steps before it (both gray) and the epoch with the lowest combined loss
    (red; the earliest one on ties).

    history: object returned by model.fit, whose `history` dict has the keys
             "loss", "val_loss" and "combine_metric".
    model_idx, i: model and window numbers shown in the title.
    """
    combined = history.history['combine_metric']
    best_epoch = min(range(len(combined)), key=lambda epoch: (combined[epoch], epoch))
    last_epoch = len(combined) - 1
    stop_window_start = last_epoch - patience

    plt.figure(figsize=(10,6))
    plt.title(f"Loss Curve for: Model {model_idx}, Window {i}")
    curves = (
        ("loss", "Training Loss"),
        ("val_loss", "Validation Loss"),
        ("combine_metric", "Combined Loss"),
    )
    for key, label in curves:
        plt.plot(history.history[key], label=label)
    plt.ylabel("Loss")
    plt.xlabel("Epoch")

    markers = (
        (last_epoch, "tab:gray"),
        (stop_window_start, "tab:gray"),
        (best_epoch, "tab:red"),
    )
    for epoch, color in markers:
        plt.axvline(epoch, color=color, ymax=0.3, linestyle='--')

    # Legend = the three curves plus one patch per marker colour
    handles, _ = plt.gca().get_legend_handles_labels()
    handles.append(patches.Patch(color="tab:red", label=f"best epoch={best_epoch}"))
    handles.append(
        patches.Patch(color="tab:gray", label=f"Early Stop Limits ({stop_window_start}, {last_epoch})")
    )

    plt.legend(handles=handles, loc="upper right")
    plt.show()

In [None]:
def plot_predictions(dates, ys, metrics, model_idx, i):
    """
    Plot the observed prices of one window together with the test predictions.

    dates:   [dates_train, dates_vali, dates_test]
    ys:      [y_train, y_vali, y_test, y_result] (y_result = test predictions)
    metrics: [rmse, mae, mape, r2, acc]; all but acc are shown in the title
    model_idx, i: model and window numbers shown in the title
    """
    dates_train, dates_vali, dates_test = dates
    y_train, y_vali, y_test, y_result = ys
    rmse, mae, mape, r2, _acc = metrics

    series = (
        (dates_train, y_train, "Training Observations"),
        (dates_vali, y_vali, "Validation Observations"),
        (dates_test, y_test, "Testing Observations"),
        (dates_test, y_result, "Testing Predictions"),
    )

    plt.figure(figsize=(10,6))
    for x_values, y_values, _label in series:
        plt.plot(x_values, y_values)
    plt.legend([label for _x, _y, label in series])
    plt.title(f"Model {model_idx}, Window {i}, RMSE={rmse:.3f}, MAE={mae:.3f}, MAPE={mape:.3f}, R2={r2:.3f}")
    plt.show()

In [None]:
from sklearn.metrics import ConfusionMatrixDisplay

def plot_confusion_matrix(cm, metrics, model_idx, i):
    """
    Display the direction-of-change confusion matrix of one window.

    cm:      2x2 confusion matrix from compute_accuracy_and_cm, whose rows and
             columns follow the sorted label order (-1 = negative change
             first, +1 = positive change second).
    metrics: [rmse, mae, mape, r2, acc]; only acc is shown in the title.
    model_idx, i: model and window numbers shown in the title.
    """
    rmse, mae, mape, r2, acc = metrics

    # Labels must follow the matrix order: index 0 is the -1 (negative) class.
    cm_plt = ConfusionMatrixDisplay(cm, display_labels=["Negative", "Positive"])
    cm_plt.plot()
    cm_plt.ax_.set(
        title= f"Model {model_idx}, Window {i}, Accuracy={acc:.3f}",
        xlabel= "Predicted Price Change",
        ylabel= "Actual Price Change"
    )
    plt.show()

In [None]:
from keras.callbacks import EarlyStopping , Callback, ModelCheckpoint
import h5py

class CombineCallback(Callback):
    """
    Adds a 'combine_metric' entry to the epoch logs: a weighted sum of the
    validation loss and the training loss.

    Callbacks that monitor 'combine_metric' must come after this one in the
    `callbacks` list so that the key exists when they read the logs.

    val_factor: weight of the validation loss; the training loss gets
                1 - val_factor. Default 0.2 (80% training, 20% validation).
    """
    def __init__(self, val_factor=0.2, **kargs):
        super().__init__(**kargs)
        self.val_factor = val_factor

    def on_epoch_end(self, epoch, logs=None):
        # No mutable default argument: a shared dict would leak state between
        # calls. Without logs there is nothing to combine.
        if logs is None:
            return
        logs['combine_metric'] = (
            self.val_factor * logs['val_loss'] + (1 - self.val_factor) * logs['loss']
        )

# Callback instances built once at module level.
# combined_cb writes logs['combine_metric']; the other two monitor that key.
combined_cb = CombineCallback()
# Saves only the weights, and only when combine_metric improves.
# NOTE(review): ModelCheckpoint appears to keep its running "best" value for
# the lifetime of the instance; if one instance is passed to several fit()
# calls, later runs may never overwrite the file — confirm for the Keras
# version in use.
model_checkpoint = ModelCheckpoint(
    filepath="tmp_best_model.h5",
    monitor="combine_metric",
    mode="min",
    save_best_only=True,
    save_weights_only=True,
    verbose=False
)
# Stops training after `patienceSelected` epochs without a lower combine_metric
earlyStop = EarlyStopping(monitor="combine_metric", min_delta=0, patience=patienceSelected, mode="min", verbose=False)

### Main Function

In [None]:
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score, mean_absolute_percentage_error

def cross_validation(model, generator, model_idx, flag_plot=0):
    """
    Train and score one model on every sliding window of `generator`.

    For each (train, vali, test) window:
      1. the data is standardised with a scaler fitted on the training slice;
      2. the model is reset to the weights stored in "empty_model.h5";
      3. it is trained with early stopping on `combine_metric`, and the
         weights of the best epoch are reloaded from "tmp_best_model.h5";
      4. the test predictions are mapped back to price units and scored.

    Accuracy is computed by considering whether the price change for day i
    relative to the previous day was positive or negative.

    Parameters
    ----------
    model : Keras model, expected to be compiled and to have had its initial
        weights saved to "empty_model.h5" by the caller.
    generator : iterable of (train, vali, test) dataframes whose last column
        is the target.
    model_idx : model number used in plot titles.
    flag_plot : 0 = no plots; 1 = plot the last 5 windows (relative to the
        module-level `windows_cnt`) and every window with R2 < 0;
        2 = plot every window.

    Returns
    -------
    pandas.DataFrame with one row per window and the columns
    "RMSE", "MAE", "MAPE", "R2", "Acc".
    """

    cv_score = pd.DataFrame(columns=["RMSE", "MAE", "MAPE", "R2", "Acc"])

    for i, (train, vali, test) in enumerate(generator):

        # Get Dates = [dates_train, dates_vali, dates_test]
        dates = [part.index for part in (train, vali, test)]

        # Get Scaled Data (scaler statistics come from the training slice only)
        scaler = StandardScaler()
        X_train_sc, y_train_sc = split_xy(scaler.fit_transform(train))
        X_vali_sc, y_vali_sc = split_xy(scaler.transform(vali))
        X_test_sc, _ = split_xy(scaler.transform(test))

        # Fresh callbacks for every fit: a ModelCheckpoint instance keeps its
        # best monitored value between fit() calls, so a shared instance would
        # only overwrite the checkpoint when a window beats the best loss of
        # all previous windows/models, and stale weights would be reloaded.
        # CombineCallback goes first so 'combine_metric' exists for the others.
        callbacks = [
            CombineCallback(),
            EarlyStopping(
                monitor="combine_metric",
                min_delta=0,
                patience=patienceSelected,
                mode="min",
                verbose=False,
            ),
            ModelCheckpoint(
                filepath="tmp_best_model.h5",
                monitor="combine_metric",
                mode="min",
                save_best_only=True,
                save_weights_only=True,
                verbose=False,
            ),
        ]

        # Fit, save best model and Predict
        model.load_weights("empty_model.h5", skip_mismatch=True, by_name=True)
        model.reset_states()
        history = model.fit(
            X_train_sc, y_train_sc,
            validation_data=(X_vali_sc, y_vali_sc),
            epochs=200, # maximum number of epochs
            batch_size=64,
            verbose=False,
            callbacks=callbacks
        )
        model.load_weights("tmp_best_model.h5", skip_mismatch=True, by_name=True)
        preds_sc = model.predict(X_test_sc, verbose=False)

        # Get Non-Scaled Data. The scaler was fitted on inputs + target, so the
        # predictions are stacked next to the scaled inputs before inverting.
        _, y_train = split_xy(train.to_numpy())
        _, y_vali = split_xy(vali.to_numpy())
        _, y_test = split_xy(test.to_numpy())
        _, y_result = split_xy(scaler.inverse_transform(np.hstack((X_test_sc, preds_sc))))
        ys = [y_train, y_vali, y_test, y_result]

        # Compute Metrics
        # sqrt of the MSE instead of squared=False, which newer scikit-learn
        # releases deprecate and then remove.
        rmse = np.sqrt(mean_squared_error(y_true=y_test, y_pred=y_result))
        mae = mean_absolute_error(y_true=y_test, y_pred=y_result)
        mape = mean_absolute_percentage_error(y_true=y_test, y_pred=y_result)
        r2 = r2_score(y_true=y_test, y_pred=y_result)
        acc, cm = compute_accuracy_and_cm(y_vali, y_test, y_result)

        metrics = [rmse, mae, mape, r2, acc]

        # flag_plot == 2: plot every window.
        # flag_plot == 1: plot the last 5 windows and the bad fits (R2 < 0).
        # Short-circuiting keeps `windows_cnt` unread unless flag_plot == 1.
        if flag_plot == 2 or (flag_plot == 1 and (i >= (windows_cnt - 5) or r2 < 0)):
            plot_loss_curve(history, model_idx, i)
            plot_predictions(dates, ys, metrics, model_idx, i)
            plot_confusion_matrix(cm, metrics, model_idx, i)

        # Append Result
        cv_score.loc[len(cv_score)] = metrics

    return cv_score


# Cross-validate every model in turn and collect one score table per model.
# flag_plot=1 plots only the last windows and the bad fits.
cv_scores = []
for i, model in enumerate(models):
    model.compile(loss="mean_squared_error", optimizer=Adam(learning_rate=0.0001))

    # Snapshot of the untrained weights; cross_validation reloads it before
    # training on each window.
    model.save_weights("empty_model.h5")

    generator = sliding_window_generator(windowed_df)
    cv_score = cross_validation(model, generator, model_idx=i, flag_plot=1)
    cv_scores.append(cv_score)

## Results

In [None]:
# Output the describe() summary (mean, std, min, max, ...) of each model's
# per-window scores
for i, cv_score in enumerate(cv_scores):
    print(f"Model {i}")
    print(cv_score.describe(), "\n\n")

In [None]:
# Section by Parameter: mean score of every model per metric, shown as two
# bar charts each (linear scale, then log scale).
metrics = ["RMSE", "MAE", "MAPE", "R2", "Acc"]

# evaluation[metric]["Model k"] = mean of that metric over all windows
evaluation = {
    param: {
        f"Model {model_idx}": cv_score[param].mean()
        for model_idx, cv_score in enumerate(cv_scores)
    }
    for param in metrics
}

for param in metrics:
    model_names = list(evaluation[param].keys())
    mean_scores = list(evaluation[param].values())

    # NOTE(review): a log axis cannot show non-positive bars (e.g. a negative
    # mean R2) — confirm the log chart is wanted for every metric.
    chart_variants = (
        (param, {}, False),
        (param + " in Log Scale", {"color": "tab:orange"}, True),
    )
    for title, bar_style, use_log in chart_variants:
        plt.figure(figsize=(10,6))
        plt.title(title)
        plt.bar(model_names, mean_scores, **bar_style)
        plt.xlabel("Models")
        plt.ylabel("Metrics Value")
        if use_log:
            plt.yscale('log')
        plt.show()

In [None]:
# Output complete results: the full per-window score table of every model
for i, cv_score in enumerate(cv_scores):
    print(f"Model {i}")
    print(cv_score)