In [1]:
#environment settings
#!pip install tensorflow keras

In [2]:
import os
import math
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np
import yfinance as yf
from keras.models import Sequential
from keras.layers import Dense
from keras.layers import LSTM
from keras.layers import Dropout
from keras.layers import Input
from keras.layers import *
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import mean_squared_error
from sklearn.metrics import mean_absolute_error
from sklearn.model_selection import train_test_split
from keras.callbacks import EarlyStopping
import tensorflow as tf
import tensorflowjs as tfjs
import graphviz
import pydot
from tensorflow.keras.utils import plot_model

In [3]:
#global variables
# Default single ticker; the cells shown here pass tickers explicitly instead of reading this.
stock_name = "AAPL"
# Tickers handed to train_set_of_stock_prediction_model in the final cell.
stock_name_list = ["MSFT", "TSLA"]
# Year bounds; load_dataframe turns them into "<year>-01-01" start/end strings for the download.
start_year = 2002
end_year = 2024
# Look-back window: how many consecutive rows make up one training input sequence.
day_before = 60
# Number of trailing rows treated as the test span.
predict_day = 120
# First row of the training slice.
start_index = 0
# Shared scaler mapping values into [0, 1]. Every helper that calls sc.fit_transform
# re-fits it, so its state always reflects the most recent fit.
sc = MinMaxScaler(feature_range=(0,1))

In [4]:
#common functions
def load_dataframe(**kwargs):
    """Download the price history of one ticker through yfinance.

    Keyword Args:
        stock_name: ticker symbol passed to ``yf.download``.
        start_year: year used to build the ``start`` date string (January 1st).
        end_year: year used to build the ``end`` date string (January 1st).

    Returns:
        Whatever ``yf.download`` returns for that ticker and date range.
    """
    stock_name, start_year, end_year = (
        kwargs[key] for key in ("stock_name", "start_year", "end_year")
    )
    # A local CSV (./stocks/<ticker>.csv via pd.read_csv) was the author's
    # alternative data source; only the online download is active.
    date_range = {"start": f'{start_year}-01-01', "end": f'{end_year}-01-01'}
    return yf.download(stock_name, **date_range)
    
def get_X_train(**kwargs):
    """Build the scaled, windowed training inputs from the opening prices.

    Keyword Args:
        df: price DataFrame (for example the result of ``load_dataframe``).
        start_index: first row of ``df`` to include.
        day_before: window length, i.e. how many consecutive rows form one sample.

    Returns:
        Array of shape ``(n_rows - day_before, day_before, 1)``; sample ``k``
        holds the scaled rows ``k .. k + day_before - 1``.

    Raises:
        ValueError: if the selected slice has ``day_before`` rows or fewer, so
            not even one full window can be built.

    Side effects:
        Re-fits the module-level scaler ``sc`` on the selected prices.
    """
    df = kwargs['df']
    start_index = kwargs['start_index']
    day_before = kwargs["day_before"]
    # Find the "Open" column by label so a different column order (or a
    # (field, ticker) MultiIndex) cannot silently feed another field; frames
    # without that label keep the original positional choice (column 0).
    labels = list(df.columns.get_level_values(0))
    open_col = labels.index("Open") if "Open" in labels else 0
    prices = df.iloc[start_index:len(df), open_col:open_col + 1].values
    if len(prices) <= day_before:
        raise ValueError(
            f"need more than day_before={day_before} rows to build a window, got {len(prices)}"
        )
    scaled = sc.fit_transform(prices)
    windows = np.array([scaled[i - day_before:i, 0] for i in range(day_before, len(scaled))])
    # Trailing axis of size 1 = one feature per time step.
    return windows.reshape(windows.shape[0], day_before, 1)

def get_X_test(**kwargs):
    """Build the scaled, windowed test inputs from the opening prices.

    The slice starts ``day_before`` rows ahead of ``start_index`` so that the
    first test sample already has a full look-back window.

    Keyword Args:
        df: price DataFrame.
        start_index: row of the first day to predict.
        day_before: window length (rows per sample).

    Returns:
        Array of shape ``(n_windows, day_before, 1)``. ``n_windows`` is 0 when
        the slice holds no row beyond the first window.

    Raises:
        ValueError: if a non-negative ``start_index`` leaves fewer than
            ``day_before`` rows of history before it.

    Side effects:
        Re-fits the module-level scaler ``sc`` on the selected prices.
    """
    df = kwargs['df']
    start_index = kwargs['start_index']
    day_before = kwargs["day_before"]
    first_row = start_index - day_before
    # A non-negative start that becomes negative here would wrap around to the
    # end of the frame in iloc and silently select the wrong rows.
    if start_index >= 0 and first_row < 0:
        raise ValueError(
            f"start_index={start_index} leaves fewer than day_before={day_before} rows of history"
        )
    # Find the "Open" column by label; fall back to the original position 0.
    labels = list(df.columns.get_level_values(0))
    open_col = labels.index("Open") if "Open" in labels else 0
    prices = df.iloc[first_row:len(df), open_col:open_col + 1].values
    # NOTE(review): this re-fits the shared scaler on the test slice, so test
    # inputs are scaled by their own min/max rather than the training range - confirm intent.
    scaled = sc.fit_transform(prices)
    windows = [scaled[i - day_before:i, 0] for i in range(day_before, len(scaled))]
    # Explicit target shape also handles the zero-window case.
    return np.array(windows).reshape(len(windows), day_before, 1)

def get_y_train(**kwargs):
    """Build the scaled training targets from the closing prices.

    Keyword Args:
        df: price DataFrame.
        start_index: first row of ``df`` to include.
        day_before: number of leading rows to drop, so that target ``k`` is the
            row that follows a ``day_before``-row input window starting at ``k``.

    Returns:
        Array of shape ``(n_rows - day_before, 1)`` with the scaled closes.

    Side effects:
        Re-fits the module-level scaler ``sc`` on the selected closing prices.
    """
    df = kwargs['df']
    start_index = kwargs['start_index']
    day_before = kwargs['day_before']
    # Find the "Close" column by label so a different column order cannot
    # silently feed another field; otherwise keep the original position 3.
    labels = list(df.columns.get_level_values(0))
    close_col = labels.index("Close") if "Close" in labels else 3
    closes = df.iloc[start_index:len(df), close_col:close_col + 1].values
    # Dropping the first day_before rows aligns the targets with the input windows.
    return sc.fit_transform(closes)[day_before:]

def get_y_test(**kwargs):
    """Return the unscaled closing prices from ``start_index`` to the end of the frame.

    Keyword Args:
        df: price DataFrame.
        start_index: first row to include.
        day_before: accepted for symmetry with the other helpers; not used.

    Returns:
        Array of shape ``(n_rows, 1)`` holding the raw closing prices.
    """
    df = kwargs['df']
    start_index = kwargs['start_index']
    # Find the "Close" column by label so a different column order cannot
    # silently return another field; otherwise keep the original position 3.
    labels = list(df.columns.get_level_values(0))
    close_col = labels.index("Close") if "Close" in labels else 3
    return df.iloc[start_index:len(df), close_col:close_col + 1].values

def save_model(**kwargs):
    """Export a Keras model with the tensorflowjs converter.

    Keyword Args:
        model: the Keras model to export.
        _dir: target directory handed to ``tfjs.converters.save_keras_model``.
    """
    keras_model, output_dir = kwargs["model"], kwargs["_dir"]
    tfjs.converters.save_keras_model(keras_model, output_dir)

In [None]:
def get_model(**kwargs):
    """Build, compile and fit the stacked-LSTM regression network.

    Architecture: four LSTM layers of 50 units, each followed by Dropout(0.2),
    then a single-unit Dense output. Trained with Adam on mean squared error.

    Keyword Args:
        X_train: input array of shape ``(samples, timesteps, 1)``.
        y_train: targets, one per sample.
        epochs: optional number of training epochs (default 100).
        batch_size: optional mini-batch size (default 32).

    Returns:
        The fitted Keras ``Sequential`` model.
    """
    X_train = kwargs['X_train']
    y_train = kwargs['y_train']
    epochs = kwargs.get('epochs', 100)
    batch_size = kwargs.get('batch_size', 32)
    # Note carried over from the original author: an older setup required
    # numpy 1.19.5 (pip install numpy==1.19.5).
    model = Sequential()
    # First LSTM layer fixes the input shape: (timesteps, 1 feature).
    model.add(LSTM(units = 50, return_sequences = True, input_shape = (X_train.shape[1], 1)))
    model.add(Dropout(0.2))
    # Second and third LSTM layers keep returning full sequences for the next layer.
    for _ in range(2):
        model.add(LSTM(units = 50, return_sequences = True))
        model.add(Dropout(0.2))
    # Fourth LSTM layer returns only its last output, feeding the dense head.
    model.add(LSTM(units = 50))
    model.add(Dropout(0.2))
    # Output layer: one regression value per sample.
    model.add(Dense(units = 1))
    model.compile(optimizer = 'adam', loss = 'mean_squared_error')
    # `use_multiprocessing` was dropped: it only concerns generator/Sequence
    # inputs (the data here are in-memory arrays) and Keras 3's Model.fit
    # rejects the argument.
    model.fit(X_train, y_train, epochs = epochs, batch_size = batch_size, verbose=0)
    return model

def train_stock_prediction_model(**kwargs):
    """Download one ticker's history, train the LSTM on it and export the model.

    Reads the module-level ``start_index``, ``day_before`` and ``predict_day``
    settings for the slicing.

    Keyword Args:
        stock_name: ticker symbol.
        start_year: first year of history to download.
        end_year: year bounding the end of the download.

    Returns:
        The trained model (also saved under ``./model/<stock_name>``).
    """
    # arguments
    stock_name = kwargs['stock_name']
    start_year = kwargs['start_year']
    end_year = kwargs['end_year']
    # logging
    print(f"Training {stock_name} stock prediction model")
    # load the historical prices of the ticker through the yfinance library
    df = load_dataframe(stock_name=stock_name, start_year=start_year, end_year=end_year)
    # dataframe size
    df_size = len(df)
    # training set
    # NOTE(review): the training slice runs to the end of the frame, so it
    # includes the last predict_day rows used as the test span - confirm intent.
    X_train = get_X_train(df=df, start_index=start_index, day_before=day_before)
    y_train = get_y_train(df=df, start_index=start_index, day_before=day_before)
    # testing set: the last predict_day rows. The look-back window must be
    # day_before (not predict_day) so the test sequences have the same length
    # as the sequences the network is trained on.
    test_start = df_size - predict_day
    X_test = get_X_test(df=df, start_index=test_start, day_before=day_before)
    y_test = get_y_test(df=df, start_index=test_start, end_index=df_size, day_before=0)
    # X_test / y_test are prepared here but not consumed by this function.
    # train model
    model = get_model(X_train=X_train, y_train=y_train)
    print(f"{stock_name} stock prediction model is trained")
    # save the model
    save_model(model=model, _dir=f"./model/{stock_name}")
    # return the model for ad-hoc inspection
    return model

def train_set_of_stock_prediction_model(**kwargs):
    """Train one prediction model per ticker and collect them in order.

    Keyword Args:
        stock_name_list: iterable of ticker symbols.
        start_year: forwarded to every ``train_stock_prediction_model`` call.
        end_year: forwarded to every ``train_stock_prediction_model`` call.

    Returns:
        List with one trained model per ticker, in input order.
    """
    tickers = kwargs['stock_name_list']
    first_year = kwargs['start_year']
    last_year = kwargs['end_year']
    # One training run per ticker; results keep the order of the input list.
    return [
        train_stock_prediction_model(stock_name=ticker, start_year=first_year, end_year=last_year)
        for ticker in tickers
    ]
        
def plot_result(**kwargs):
    """Plot real versus predicted prices for the trailing test span.

    Keyword Args:
        model: trained model exposing ``predict``.
        X_test: model inputs for the test span.
        y_test: real prices plotted against the predictions.
        stock_name: ticker used in the title, axis label and legend.
        predict_day: number of trailing rows being plotted.
        df_size: total number of rows in the price frame.
        df: optional price DataFrame whose index supplies the x axis. When
            omitted, a module-level ``df`` is used if one exists.
        scaler: optional fitted scaler used to map predictions back to price
            units; defaults to the module-level ``sc``.

    Raises:
        ValueError: if no price frame is passed and no module-level ``df`` exists.
    """
    model = kwargs['model']
    X_test = kwargs['X_test']
    y_test = kwargs['y_test']
    stock_name = kwargs['stock_name']
    predict_day = kwargs['predict_day']
    df_size = kwargs['df_size']
    # The x axis comes from the price frame's index. This function used to read
    # a global `df` that no cell defines (it is local to the training function),
    # so accept it as an argument and keep the global only as a fallback.
    frame = kwargs.get('df')
    if frame is None:
        frame = globals().get('df')
    if frame is None:
        raise ValueError("plot_result needs the price DataFrame: pass df=<DataFrame>")
    scaler = kwargs['scaler'] if 'scaler' in kwargs else sc
    # Predicted stock price, mapped back from the scaled range.
    # NOTE(review): the scaler must be the one fitted on the training targets
    # for the inverse transform to give prices - confirm at the call site.
    predicted_stock_price = scaler.inverse_transform(model.predict(X_test))
    # Real stock price
    real_stock_price = y_test
    # Dates of the last predict_day rows.
    dates = frame.index.values[df_size-predict_day:df_size]
    # Visualising the results
    plt.figure(figsize=(16,6))
    plt.plot(dates, real_stock_price, color = "red", label = f'Real {stock_name} Stock Price')
    plt.plot(dates, predicted_stock_price, color = "blue", label = f'Predicted {stock_name} Stock Price')
    plt.title(f'{stock_name} Stock Price Prediction')
    plt.xlabel('Time')
    plt.ylabel(f'{stock_name} Stock Price')
    plt.legend()
    plt.show()
    
def load_model(**kwargs):
    """Placeholder for model loading: ignores its keyword arguments and returns None."""
    return None

def print_model_summary(**kwargs):
    """Print the layer summary of a model and write its diagram to ``model.png``.

    Keyword Args:
        model: the Keras model to describe.
    """
    model = kwargs['model']
    # summary() prints the table itself and returns None; wrapping it in
    # print() added a stray "None" line to the output.
    model.summary()
    tf.keras.utils.plot_model(model, to_file='model.png', show_shapes=True, dpi=80,show_layer_names=True)

In [None]:
# Train and export one model per ticker in stock_name_list; the returned list of models is this cell's output value.
train_set_of_stock_prediction_model(stock_name_list=stock_name_list, start_year=start_year, end_year=end_year)

Training MSFT stock prediction model
[*********************100%%**********************]  1 of 1 completed
