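Description: a Long Short-Term Memory (LSTM) network, a type of artificial recurrent neural network.<br>
&emsp;&emsp;Uses the closing prices of the previous `size` (60) days to predict Apple's closing price `nxt` days ahead (for example, `nxt` = 7 targets the 67th day; the actual value is set in the configuration cell below).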

In [None]:
# import
import math
# import pandas_datareader as web
from yahoo_download import get_yahoo_hist_df, DATETIME_FORMAT
import numpy as np
import pandas as pd
from sklearn.preprocessing import MinMaxScaler
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, LSTM
import matplotlib.pyplot as plt
# Apply the 'fivethirtyeight' matplotlib style sheet to the figures drawn below.
plt.style.use('fivethirtyeight')

In [None]:
# Fetch the AAPL price history starting from 2009-01-01.
df = get_yahoo_hist_df('AAPL', start_str='2009-01-01')
# Parse the 'Date' column into datetimes using the format constant shipped with yahoo_download.
df['Date'] = pd.to_datetime(df['Date'], format=DATETIME_FORMAT)
# Keep only the columns needed for plotting and for joining predictions back by date.
actual_df = df.loc[:, ['Date', 'Close']]

In [None]:
# Plot the closing price over time for a first look at the series.
fig, ax = plt.subplots(figsize=(16, 8))
ax.set_title('Close Price History')
ax.plot(actual_df['Date'], actual_df['Close'])
ax.set_xlabel('Date', fontsize=18)
ax.set_ylabel('Close Price', fontsize=18)
plt.show()

In [None]:
# Windowing configuration.
# `size` is the number of consecutive days fed to the model as one input window;
# `nxt` is how far past the end of that window the target price lies.
#   size=60, nxt=1 -> days 1..60 predict the price on day 61
#   size=60, nxt=7 -> the same 60 days predict the price on day 67
size, nxt = 60, 10

In [None]:
# Isolate the 'Close' column as a one-column frame, then as a numpy array.
data = df.filter(items=['Close'])
dataset = data.to_numpy()
# Rescale the prices into the [0, 1] range.
# NOTE(review): the scaler is fitted on the whole series, including the rows
# later used for testing — confirm whether this look-ahead is acceptable.
scaler = MinMaxScaler(feature_range=(0, 1))
scaled_data = scaler.fit(dataset).transform(dataset)
scaled_data

In [None]:
# Number of leading rows used for training; the last 200 rows are held out for testing.
# Alternative split (80% of the rows):
# training_data_len = math.ceil(len(dataset) * 0.8)
training_data_len = len(dataset) - 200
print(f'training_data={training_data_len}, size={size}, next={nxt}')
# Abort when either side of the split cannot hold a usable window:
#   * training_data_len <= size -> no training window fits (range(size, training_data_len) is empty);
#   * the hold-out tail is not longer than one window plus the prediction horizon.
# The second condition alone simplifies to `size + nxt >= 200` and never looks at the
# dataset length, so the first condition is what actually catches a short dataset.
if training_data_len <= size or training_data_len + size + nxt >= len(dataset):
    raise SystemExit('dataset is too small !!!')

In [None]:
# Build the supervised training set with a sliding window.
# For each window end `end`, the sample is the `size` scaled prices in rows
# [end - size, end) and the label is the scaled price in row end + (nxt - 1).
window_ends = range(size, training_data_len)
x_train = [scaled_data[end - size:end, 0] for end in window_ends]
y_train = [scaled_data[end + (nxt - 1), 0] for end in window_ends]

In [None]:
# Stack the per-window lists into numpy arrays.
x_train = np.array(x_train)
y_train = np.array(y_train)
x_train.shape

In [None]:
#x_train.shape.to_list()

In [None]:
# Add a trailing axis of length 1 so the array is (samples, timesteps, 1),
# matching the input_shape the first LSTM layer is built with.
n_samples, n_steps = x_train.shape[0], x_train.shape[1]
x_train = x_train.reshape((n_samples, n_steps, 1))
x_train.shape

In [None]:
# Assemble the network: two stacked 50-unit LSTM layers (the first returns the
# full sequence so the second can consume it), then Dense(25) and a single-value output.
model = Sequential([
    LSTM(50, return_sequences=True, input_shape=(x_train.shape[1], 1)),
    LSTM(50, return_sequences=False),
    Dense(25),
    Dense(1),
])

In [None]:
# Configure training: Adam optimizer minimizing mean squared error.
model.compile(loss='mean_squared_error', optimizer='adam')

In [None]:
# Fit the model on the training windows for 20 epochs (default batch size).
# Training can take a long time. An earlier variant is kept for reference:
# model.fit(x_train, y_train, batch_size=1, epochs=1)
model.fit(x=x_train, y=y_train, epochs=20)

## done model training

In [None]:
# Run the model on the training windows and map the outputs back to price units.
# The DataFrame for these predictions is built in the following cell; the copy that
# used to be created here (column 'Train') was overwritten immediately and never read.
predictions = model.predict(x_train)
predictions = scaler.inverse_transform(predictions)

In [None]:
# Pair each training-set prediction with the date of the row it is predicting.
train_df = pd.DataFrame(predictions, columns=['TrainData'])
# The training window ending at row i is labelled with row i + (nxt - 1), and the
# first window ends at i = size, so the target dates start at row size + nxt - 1
# (the same offset the test-set dates use), not at row `size`.
first_target = size + nxt - 1
train_df['Date'] = actual_df['Date'].iloc[first_target:first_target + train_df.shape[0]].values
train_df.head(3)

In [None]:
# Build the test windows the same way as the training windows.
# Window ends start at the first row after the training range and stop at
# y_date_len, the last position whose target row end + (nxt - 1) still exists.
y_date_len = len(scaled_data) - (nxt - 1)
test_ends = range(training_data_len, y_date_len)
x_test = [scaled_data[end - size:end, 0] for end in test_ends]
y_test = [scaled_data[end + (nxt - 1), 0] for end in test_ends]

In [None]:
# Stack the test windows into one array and add the trailing axis of length 1.
x_test = np.array(x_test)
x_test = x_test.reshape((x_test.shape[0], x_test.shape[1], 1))

In [None]:
# Predict on the test windows and undo the scaling to get values in price units.
predictions = scaler.inverse_transform(model.predict(x_test))

In [None]:
# Pair each test prediction with the date of its target row; the first test
# window ends at training_data_len, so its target is nxt - 1 rows later.
target_dates = actual_df['Date'].iloc[training_data_len + nxt - 1:].values
test_df = pd.DataFrame(predictions, columns=['TestData']).assign(Date=target_dates)
test_df.head(3)

In [None]:
# Root mean square error (RMSE) of the test predictions, in price units.
# `predictions` is an (n, 1) array already mapped back to prices, while y_test is a
# flat list of scaled values. Subtracting them directly would broadcast (n, 1)
# against (n,) into an (n, n) matrix and mix two different units, so first turn
# y_test into an (n, 1) column and undo the scaling.
actual_prices = scaler.inverse_transform(np.array(y_test).reshape(-1, 1))
errors = predictions - actual_prices
# `youtube` squares the mean error instead of averaging squared errors
# (presumably the formula from a video tutorial), so it equals the absolute
# mean error (bias), not an RMSE. It is kept only for comparison.
youtube = np.sqrt(np.mean(errors) ** 2)
rmse = np.sqrt(np.mean(errors ** 2))
print(f'youtube={youtube}, rmse={rmse}')

In [None]:
# Join the training predictions onto the actual prices by date and compute the
# signed error (prediction minus actual close) as a vectorized column operation,
# which also works when the merge yields no rows.
train_delta_df = pd.merge(actual_df, train_df, on='Date')
train_delta_df['TrainDelta'] = train_delta_df['TrainData'] - train_delta_df['Close']

In [None]:
# Join the test predictions onto the actual prices by date and compute the
# signed error (prediction minus actual close) as a vectorized column operation,
# which also works when the merge yields no rows.
test_delta_df = pd.merge(actual_df, test_df, on='Date')
test_delta_df['TestDelta'] = test_delta_df['TestData'] - test_delta_df['Close']

In [None]:
# Overview plot: actual closes, train/test predictions and the test error.
# Each line carries its own label so the legend cannot drift out of sync with
# the plot order (this also fixes the 'TestDelat' legend typo).
plt.figure(figsize=(16, 8))
plt.title('Model')
plt.xlabel('Date', fontsize=18)
plt.ylabel('Close Price', fontsize=18)
plt.plot(actual_df['Date'], actual_df['Close'], label='Actual')
plt.plot(train_df['Date'], train_df['TrainData'], label='Train')
plt.plot(test_df['Date'], test_df['TestData'], label='Test')
plt.plot(test_delta_df['Date'], test_delta_df['TestDelta'], label='TestDelta')
plt.legend(loc='upper left')
plt.show()

In [None]:
# Zoom in on the test period: actual close, predicted close and their difference.
# The title typo 'Predication' is corrected to 'Prediction'.
title = f'{nxt} Trading Days Prediction'
plt.figure(figsize=(16, 8))
plt.title(title)
plt.xlabel('Date', fontsize=18)
# NOTE(review): the axis is labelled 'Delta' although price series are drawn on it too — confirm the intended label.
plt.ylabel('Delta', fontsize=18)
plt.plot(test_delta_df['Date'], test_delta_df['Close'], label='Close')
plt.plot(test_delta_df['Date'], test_delta_df['TestData'], label='TestData')
plt.plot(test_delta_df['Date'], test_delta_df['TestDelta'], label='TestDelta')
plt.legend(loc='upper left')
plt.show()

In [None]:
# Display the merged test frame (Date, Close, TestData, TestDelta) as the cell output.
test_delta_df