In [1]:
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense, Dropout, Bidirectional
from tensorflow.keras.callbacks import ModelCheckpoint, TensorBoard
from sklearn import preprocessing
from sklearn.model_selection import train_test_split
from yahoo_fin import stock_info as si
from collections import deque

import os
import numpy as np
import pandas as pd
import random

In [2]:
# set seed, so we can get the same results after rerunning several times
np.random.seed(314)
tf.random.set_seed(314)
random.seed(314)

In [3]:
def shuffle_in_unison(a, b):
    # shuffle two arrays in the same way
    state = np.random.get_state()
    np.random.shuffle(a)
    np.random.set_state(state)
    np.random.shuffle(b)

def load_data(ticker, n_steps=50, scale=True, shuffle=True, lookup_step=1, split_by_date=True,
                test_size=0.2, feature_columns=['adjclose', 'volume', 'open', 'high', 'low']):
    """
    Loads data from Yahoo Finance source, as well as scaling, shuffling, normalizing and splitting.
    Params:
        ticker (str/pd.DataFrame): the ticker you want to load, examples include AAPL, TESL, etc.
        n_steps (int): the historical sequence length (i.e window size) used to predict, default is 50
        scale (bool): whether to scale prices from 0 to 1, default is True
        shuffle (bool): whether to shuffle the dataset (both training & testing), default is True
        lookup_step (int): the future lookup step to predict, default is 1 (e.g next day)
        split_by_date (bool): whether we split the dataset into training/testing by date, setting it 
            to False will split datasets in a random way
        test_size (float): ratio for test data, default is 0.2 (20% testing data)
        feature_columns (list): the list of features to use to feed into the model, default is everything grabbed from yahoo_fin
    """
    # see if ticker is already a loaded stock from yahoo finance
    if isinstance(ticker, str):
        # load it from yahoo_fin library
        df = si.get_data(ticker)
    elif isinstance(ticker, pd.DataFrame):
        # already loaded, use it directly
        df = ticker
    else:
        raise TypeError("ticker can be either a str or a `pd.DataFrame` instances")
    # this will contain all the elements we want to return from this function
    result = {}
    # we will also return the original dataframe itself
    result['df'] = df.copy()
    # make sure that the passed feature_columns exist in the dataframe
    for col in feature_columns:
        assert col in df.columns, f"'{col}' does not exist in the dataframe."
    # add date as a column
    if "date" not in df.columns:
        df["date"] = df.index
    if scale:
        column_scaler = {}
        # scale the data (prices) from 0 to 1
        for column in feature_columns:
            scaler = preprocessing.MinMaxScaler()
            df[column] = scaler.fit_transform(np.expand_dims(df[column].values, axis=1))
            column_scaler[column] = scaler
        # add the MinMaxScaler instances to the result returned
        result["column_scaler"] = column_scaler
    # add the target column (label) by shifting by `lookup_step`
    df['future'] = df['adjclose'].shift(-lookup_step)
    # last `lookup_step` columns contains NaN in future column
    # get them before droping NaNs
    last_sequence = np.array(df[feature_columns].tail(lookup_step))
    # drop NaNs
    df.dropna(inplace=True)
    sequence_data = []
    sequences = deque(maxlen=n_steps)
    for entry, target in zip(df[feature_columns + ["date"]].values, df['future'].values):
        sequences.append(entry)
        if len(sequences) == n_steps:
            sequence_data.append([np.array(sequences), target])
    # get the last sequence by appending the last `n_step` sequence with `lookup_step` sequence
    # for instance, if n_steps=50 and lookup_step=10, last_sequence should be of 60 (that is 50+10) length
    # this last_sequence will be used to predict future stock prices that are not available in the dataset
    last_sequence = list([s[:len(feature_columns)] for s in sequences]) + list(last_sequence)
    last_sequence = np.array(last_sequence).astype(np.float32)
    # add to result
    result['last_sequence'] = last_sequence
    # construct the X's and y's
    X, y = [], []
    for seq, target in sequence_data:
        X.append(seq)
        y.append(target)
    # convert to numpy arrays
    X = np.array(X)
    y = np.array(y)
    if split_by_date:
        # split the dataset into training & testing sets by date (not randomly splitting)
        train_samples = int((1 - test_size) * len(X))
        result["X_train"] = X[:train_samples]
        result["y_train"] = y[:train_samples]
        result["X_test"]  = X[train_samples:]
        result["y_test"]  = y[train_samples:]
        if shuffle:
            # shuffle the datasets for training (if shuffle parameter is set)
            shuffle_in_unison(result["X_train"], result["y_train"])
            shuffle_in_unison(result["X_test"], result["y_test"])
    else:    
        # split the dataset randomly
        result["X_train"], result["X_test"], result["y_train"], result["y_test"] = train_test_split(X, y, 
                                                                                test_size=test_size, shuffle=shuffle)
    # get the list of test set dates
    dates = result["X_test"][:, -1, -1]
    # retrieve test features from the original dataframe
    result["test_df"] = result["df"].loc[dates]
    # remove duplicated dates in the testing dataframe
    result["test_df"] = result["test_df"][~result["test_df"].index.duplicated(keep='first')]
    # remove dates from the training/testing sets & convert to float32
    result["X_train"] = result["X_train"][:, :, :len(feature_columns)].astype(np.float32)
    result["X_test"] = result["X_test"][:, :, :len(feature_columns)].astype(np.float32)
    return result

# This function is long but handy, it accepts several arguments to be as flexible as possible:

The ticker argument is the ticker we want to load, for instance, you can use TSLA for the Tesla stock market, AAPL for Apple, and so on. It can also be a pandas Dataframe with the condition it includes the columns in feature_columns as well as date as an index.

n_steps integer indicates the historical sequence length we want to use, some people call it the window size, recall that we are going to use a recurrent neural network, we need to feed into the network a sequence data, choosing 50 means that we will use 50 days of stock prices to predict the next lookup time step.

scale is a boolean variable that indicates whether to scale prices from 0 to 1, we will set this to True as scaling high values from 0 to 1 will help the neural network to learn much faster and more effectively.
lookup_step is the future lookup step to predict, the default is set to 1 (e.g next day). 15 means the next 15 days, and so on.

split_by_date is a boolean which indicates whether we split our training and testing sets by date, setting it to False means we randomly split the data into training and testing using sklearn's train_test_split() function. If it's True (the default), we split the data in date order.

We will be using all the features available in this dataset, which are the open, high, low, volume and adjusted close. Please check this tutorial to learn more about what these indicators are.

# The above function does the following:

First, it loads the dataset using stock_info.get_data() function in yahoo_fin module.

It adds the "date" column from the index if it doesn't exist, this will help us later to get the features of the testing set.

If the scale argument is passed as True, it will scale all the prices from 0 to 1 (including the volume) using the sklearn's MinMaxScaler class. Note that each column has its own scaler.

It then adds the future column which indicates the target values (the labels to predict, or the y's) by shifting the adjusted close column by lookup_step.

After that, it shuffles and splits the data into training and testing sets, and finally returns the result.

To understand the code even better, I highly suggest you manually print the output variable (result) and see how the features and labels are made.

# Model Creation

In [4]:
def create_model(sequence_length, n_features, units=256, cell=LSTM, n_layers=2, dropout=0.3,
                loss="mean_absolute_error", optimizer="rmsprop", bidirectional=False):
    model = Sequential()
    for i in range(n_layers):
        if i == 0:
            # first layer
            if bidirectional:
                model.add(Bidirectional(cell(units, return_sequences=True), batch_input_shape=(None, sequence_length, n_features)))
            else:
                model.add(cell(units, return_sequences=True, batch_input_shape=(None, sequence_length, n_features)))
        elif i == n_layers - 1:
            # last layer
            if bidirectional:
                model.add(Bidirectional(cell(units, return_sequences=False)))
            else:
                model.add(cell(units, return_sequences=False))
        else:
            # hidden layers
            if bidirectional:
                model.add(Bidirectional(cell(units, return_sequences=True)))
            else:
                model.add(cell(units, return_sequences=True))
        # add dropout after each layer
        model.add(Dropout(dropout))
    model.add(Dense(1, activation="linear"))
    model.compile(loss=loss, metrics=["mean_absolute_error"], optimizer=optimizer)
    return model

Again, this function is flexible too, you can change the number of layers, dropout rate, the RNN cell, loss, and the optimizer used to compile the model.

The above function constructs an RNN that has a dense layer as output layer with 1 neuron, this model requires a sequence of features of sequence_length (in this case, we will pass 50 or 100) consecutive time steps (which are days in this dataset) and outputs a single value which indicates the price of the next time step.

It also accepts n_features as an argument, which is the number of features we will pass on each sequence, in our case, we'll pass adjclose, open, high, low and volume columns (i.e 5 features).

You can tweak the default parameters as you wish, n_layers is the number of RNN layers you want to stack, dropout is the dropout rate after each RNN layer, units are the number of RNN cell units (whether it is LSTM, SimpleRNN, or GRU), bidirectional is a boolean that indicates whether to use bidirectional RNNs, experiment with those!

# Training the Model

In [5]:
import os
import time
from tensorflow.keras.layers import LSTM

# Window size or the sequence length
N_STEPS = 50
# Lookup step, 1 is the next day
LOOKUP_STEP = 15
# whether to scale feature columns & output price as well
SCALE = True
scale_str = f"sc-{int(SCALE)}"
# whether to shuffle the dataset
SHUFFLE = True
shuffle_str = f"sh-{int(SHUFFLE)}"
# whether to split the training/testing set by date
SPLIT_BY_DATE = False
split_by_date_str = f"sbd-{int(SPLIT_BY_DATE)}"
# test ratio size, 0.2 is 20%
TEST_SIZE = 0.2
# features to use
FEATURE_COLUMNS = ["adjclose", "volume", "open", "high", "low"]
# date now
date_now = time.strftime("%Y-%m-%d")
### model parameters
N_LAYERS = 2
# LSTM cell
CELL = LSTM
# 256 LSTM neurons
UNITS = 256
# 40% dropout
DROPOUT = 0.4
# whether to use bidirectional RNNs
BIDIRECTIONAL = False
### training parameters
# mean absolute error loss
# LOSS = "mae"
# huber loss
LOSS = "huber_loss"
OPTIMIZER = "adam"
BATCH_SIZE = 64
EPOCHS = 500
# Amazon stock market
ticker = "AMZN"
ticker_data_filename = os.path.join("data", f"{ticker}_{date_now}.csv")
# model name to save, making it as unique as possible based on parameters
model_name = f"{date_now}_{ticker}-{shuffle_str}-{scale_str}-{split_by_date_str}-\
{LOSS}-{OPTIMIZER}-{CELL.__name__}-seq-{N_STEPS}-step-{LOOKUP_STEP}-layers-{N_LAYERS}-units-{UNITS}"
if BIDIRECTIONAL:
    model_name += "-b"

So the above code is all about defining all the hyperparameters we gonna use, we explained some of them, while we didn't on the others:

TEST_SIZE: The testing set rate. For instance 0.2 means 20% of the total dataset.

FEATURE_COLUMNS: The features we gonna use to predict the next price value.

N_LAYERS: Number of RNN layers to use.

CELL: RNN cell to use, default is LSTM.

UNITS: Number of cell units.

DROPOUT: The dropout rate is the probability of not training a given node in a layer, where 0.0 means no dropout at all. This type of regularization can help the model to not overfit our training data.

BIDIRECTIONAL: Whether to use bidirectional recurrent neural networks.

LOSS: Loss function to use for this regression problem, we're using Huber
loss, you can use mean absolute error (mae) or mean squared error (mse) as well.

OPTIMIZER: Optimization algorithm to use, defaulting to Adam.

BATCH_SIZE: The number of data samples to use on each training iteration.

EPOCHS: The number of times that the learning algorithm will pass through the entire training dataset, we used 500 here, but try to increase it furthermore.

Feel free to experiment with these values to get better results than mine.

In [6]:
pip install numpy==1.19.5

Note: you may need to restart the kernel to use updated packages.


In [7]:
# create these folders if they does not exist
if not os.path.isdir("results"):
    os.mkdir("results")
if not os.path.isdir("logs"):
    os.mkdir("logs")
if not os.path.isdir("data"):
    os.mkdir("data")

In [None]:
# load the data
data = load_data(ticker, N_STEPS, scale=SCALE, split_by_date=SPLIT_BY_DATE, 
                shuffle=SHUFFLE, lookup_step=LOOKUP_STEP, test_size=TEST_SIZE, 
                feature_columns=FEATURE_COLUMNS)
# save the dataframe
data["df"].to_csv(ticker_data_filename)
# construct the model
model = create_model(N_STEPS, len(FEATURE_COLUMNS), loss=LOSS, units=UNITS, cell=CELL, n_layers=N_LAYERS,
                    dropout=DROPOUT, optimizer=OPTIMIZER, bidirectional=BIDIRECTIONAL)
# some tensorflow callbacks
checkpointer = ModelCheckpoint(os.path.join("results", model_name + ".h5"), save_weights_only=True, save_best_only=True, verbose=1)
tensorboard = TensorBoard(log_dir=os.path.join("logs", model_name))
# train the model and save the weights whenever we see 
# a new optimal model using ModelCheckpoint
history = model.fit(data["X_train"], data["y_train"],
                    batch_size=BATCH_SIZE,
                    epochs=EPOCHS,
                    validation_data=(data["X_test"], data["y_test"]),
                    callbacks=[checkpointer, tensorboard],
                    verbose=1)

Epoch 1/500
Instructions for updating:
use `tf.profiler.experimental.stop` instead.
Epoch 00001: val_loss improved from inf to 0.00017, saving model to results\2021-10-03_AMZN-sh-1-sc-1-sbd-0-huber_loss-adam-LSTM-seq-50-step-15-layers-2-units-256.h5
Epoch 2/500
Epoch 00002: val_loss improved from 0.00017 to 0.00017, saving model to results\2021-10-03_AMZN-sh-1-sc-1-sbd-0-huber_loss-adam-LSTM-seq-50-step-15-layers-2-units-256.h5
Epoch 3/500
Epoch 00003: val_loss improved from 0.00017 to 0.00016, saving model to results\2021-10-03_AMZN-sh-1-sc-1-sbd-0-huber_loss-adam-LSTM-seq-50-step-15-layers-2-units-256.h5
Epoch 4/500
Epoch 00004: val_loss did not improve from 0.00016
Epoch 5/500
Epoch 00005: val_loss did not improve from 0.00016
Epoch 6/500
Epoch 00006: val_loss improved from 0.00016 to 0.00015, saving model to results\2021-10-03_AMZN-sh-1-sc-1-sbd-0-huber_loss-adam-LSTM-seq-50-step-15-layers-2-units-256.h5
Epoch 7/500
Epoch 00007: val_loss did not improve from 0.00015
Epoch 8/500
Epo

Epoch 23/500
Epoch 00023: val_loss did not improve from 0.00015
Epoch 24/500
Epoch 00024: val_loss did not improve from 0.00015
Epoch 25/500
Epoch 00025: val_loss did not improve from 0.00015
Epoch 26/500
Epoch 00026: val_loss did not improve from 0.00015
Epoch 27/500
Epoch 00027: val_loss improved from 0.00015 to 0.00014, saving model to results\2021-10-03_AMZN-sh-1-sc-1-sbd-0-huber_loss-adam-LSTM-seq-50-step-15-layers-2-units-256.h5
Epoch 28/500
Epoch 00028: val_loss did not improve from 0.00014
Epoch 29/500
Epoch 00029: val_loss did not improve from 0.00014
Epoch 30/500
Epoch 00030: val_loss did not improve from 0.00014
Epoch 31/500
Epoch 00031: val_loss did not improve from 0.00014
Epoch 32/500
Epoch 00032: val_loss did not improve from 0.00014
Epoch 33/500
Epoch 00033: val_loss did not improve from 0.00014
Epoch 34/500
Epoch 00034: val_loss did not improve from 0.00014
Epoch 35/500
Epoch 00035: val_loss did not improve from 0.00014
Epoch 36/500
Epoch 00036: val_loss improved from 

Epoch 47/500
Epoch 00047: val_loss did not improve from 0.00013
Epoch 48/500
Epoch 00048: val_loss improved from 0.00013 to 0.00013, saving model to results\2021-10-03_AMZN-sh-1-sc-1-sbd-0-huber_loss-adam-LSTM-seq-50-step-15-layers-2-units-256.h5
Epoch 49/500
Epoch 00049: val_loss did not improve from 0.00013
Epoch 50/500
Epoch 00050: val_loss did not improve from 0.00013
Epoch 51/500
Epoch 00051: val_loss did not improve from 0.00013
Epoch 52/500
Epoch 00052: val_loss did not improve from 0.00013
Epoch 53/500
Epoch 00053: val_loss did not improve from 0.00013
Epoch 54/500
Epoch 00054: val_loss did not improve from 0.00013
Epoch 55/500
Epoch 00055: val_loss did not improve from 0.00013
Epoch 56/500
Epoch 00056: val_loss did not improve from 0.00013
Epoch 57/500
Epoch 00057: val_loss improved from 0.00013 to 0.00012, saving model to results\2021-10-03_AMZN-sh-1-sc-1-sbd-0-huber_loss-adam-LSTM-seq-50-step-15-layers-2-units-256.h5
Epoch 58/500
Epoch 00058: val_loss did not improve from 0.

We used ModelCheckpoint which saves our model in each epoch during the training. We also used TensorBoard to visualize the model performance in the training process.

After running the above block of code, it will train the model for 500 epochs (as we set previously), so it will take some time, here is the first output lines:

In [None]:
tensorboard --logdir="logs"

Now this will start a local HTTP server at localhost:6006, after going to the browser, you'll see something similar to this:<img source="https://www.thepythoncode.com/media/articles/stock-price-prediction-in-python-using-tensorflow-2-and-keras/loss-in-tensorboard.PNG"> 

The loss is Huber loss as specified in the LOSS parameter (you can always change it to mean absolute error or mean squared error), the curve is the validation loss. As you can see, it is significantly decreasing over time, you can also increase the number of epochs to get much better results.

# Testing the Model

In [None]:
import matplotlib.pyplot as plt

def plot_graph(test_df):
    """
    This function plots true close price along with predicted close price
    with blue and red colors respectively
    """
    plt.plot(test_df[f'true_adjclose_{LOOKUP_STEP}'], c='b')
    plt.plot(test_df[f'adjclose_{LOOKUP_STEP}'], c='r')
    plt.xlabel("Days")
    plt.ylabel("Price")
    plt.legend(["Actual Price", "Predicted Price"])
    plt.show()

The below function takes the model and the data that was returned by create_model() and load_data() functions respectively, and constructs a dataframe in which it includes the predicted adjclose along with true future adjclose, as well as calculating buy and sell profit, we'll see it in action in a moment:

In [None]:
def get_final_df(model, data):
    """
    This function takes the `model` and `data` dict to 
    construct a final dataframe that includes the features along 
    with true and predicted prices of the testing dataset
    """
    # if predicted future price is higher than the current, 
    # then calculate the true future price minus the current price, to get the buy profit
    buy_profit  = lambda current, pred_future, true_future: true_future - current if pred_future > current else 0
    # if the predicted future price is lower than the current price,
    # then subtract the true future price from the current price
    sell_profit = lambda current, pred_future, true_future: current - true_future if pred_future < current else 0
    X_test = data["X_test"]
    y_test = data["y_test"]
    # perform prediction and get prices
    y_pred = model.predict(X_test)
    if SCALE:
        y_test = np.squeeze(data["column_scaler"]["adjclose"].inverse_transform(np.expand_dims(y_test, axis=0)))
        y_pred = np.squeeze(data["column_scaler"]["adjclose"].inverse_transform(y_pred))
    test_df = data["test_df"]
    # add predicted future prices to the dataframe
    test_df[f"adjclose_{LOOKUP_STEP}"] = y_pred
    # add true future prices to the dataframe
    test_df[f"true_adjclose_{LOOKUP_STEP}"] = y_test
    # sort the dataframe by date
    test_df.sort_index(inplace=True)
    final_df = test_df
    # add the buy profit column
    final_df["buy_profit"] = list(map(buy_profit, 
                                    final_df["adjclose"], 
                                    final_df[f"adjclose_{LOOKUP_STEP}"], 
                                    final_df[f"true_adjclose_{LOOKUP_STEP}"])
                                    # since we don't have profit for last sequence, add 0's
                                    )
    # add the sell profit column
    final_df["sell_profit"] = list(map(sell_profit, 
                                    final_df["adjclose"], 
                                    final_df[f"adjclose_{LOOKUP_STEP}"], 
                                    final_df[f"true_adjclose_{LOOKUP_STEP}"])
                                    # since we don't have profit for last sequence, add 0's
                                    )
    return final_df

The last function we gonna define is the one that's responsible for predicting the next future price:

In [None]:
def predict(model, data):
    # retrieve the last sequence from data
    last_sequence = data["last_sequence"][-N_STEPS:]
    # expand dimension
    last_sequence = np.expand_dims(last_sequence, axis=0)
    # get the prediction (scaled from 0 to 1)
    prediction = model.predict(last_sequence)
    # get the price (by inverting the scaling)
    if SCALE:
        predicted_price = data["column_scaler"]["adjclose"].inverse_transform(prediction)[0][0]
    else:
        predicted_price = prediction[0][0]
    return predicted_price

Now that we have the necessary functions for evaluating our model, let's load the optimal weights and proceed with evaluation:

In [None]:
# load optimal model weights from results folder
model_path = os.path.join("results", model_name) + ".h5"
model.load_weights(model_path)

Calculating loss and mean absolute error using model.evaluate() method:

In [None]:
# evaluate the model
loss, mae = model.evaluate(data["X_test"], data["y_test"], verbose=0)
# calculate the mean absolute error (inverse scaling)
if SCALE:
    mean_absolute_error = data["column_scaler"]["adjclose"].inverse_transform([[mae]])[0][0]
else:
    mean_absolute_error = mae

We also take scaled output values into consideration, so we use the inverse_transform() method from the MinMaxScaler we defined in the load_data() function earlier if the SCALE parameter was set to True.

Now let's call the get_final_df() function we defined earlier to construct our testing set dataframe:

In [None]:
# get the final dataframe for the testing set
final_df = get_final_df(model, data)

Also, let's use predict() function to get the future price:

In [None]:
# predict the future price
future_price = predict(model, data)

The below code calculates the accuracy score by counting the number of positive profits (in both buy profit and sell profit):

In [None]:
# we calculate the accuracy by counting the number of positive profits
accuracy_score = (len(final_df[final_df['sell_profit'] > 0]) + len(final_df[final_df['buy_profit'] > 0])) / len(final_df)
# calculating total buy & sell profit
total_buy_profit  = final_df["buy_profit"].sum()
total_sell_profit = final_df["sell_profit"].sum()
# total profit by adding sell & buy together
total_profit = total_buy_profit + total_sell_profit
# dividing total profit by number of testing samples (number of trades)
profit_per_trade = total_profit / len(final_df)

We also calculate profit per trade which is essentially the total profit divided by the number of testing samples. Printing all the previously calculated metrics:

In [None]:
# printing metrics
print(f"Future price after {LOOKUP_STEP} days is {future_price:.2f}$")
print(f"{LOSS} loss:", loss)
print("Mean Absolute Error:", mean_absolute_error)
print("Accuracy score:", accuracy_score)
print("Total buy profit:", total_buy_profit)
print("Total sell profit:", total_sell_profit)
print("Total profit:", total_profit)
print("Profit per trade:", profit_per_trade)