In [1]:
# Mount Google Drive inside the Colab runtime at /content/drive.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
from google.colab import drive, files # google colab specific
import requests
import pandas as pd
import os
import warnings
import sys
import matplotlib.pyplot as plt
import json
import numpy as np
from sklearn.preprocessing import MinMaxScaler
from keras.callbacks import EarlyStopping, ModelCheckpoint
import tensorflow as tf
from tensorflow import keras
from keras import layers
import math
from sklearn.metrics import mean_squared_error

In [None]:
# Read in the csv file that contains the time series data of the Shiller Index
# and load it into a DataFrame.
# NOTE(review): the path passed to read_csv is an empty string -- TODO supply
# the actual CSV location before running this cell.

df = pd.read_csv('')
# Preview the first rows to check that the file was parsed.
df.head()

In [None]:
# Pull the 'index value' column out as a Series on a fresh 0..n-1 index.
df1 = df['index value'].reset_index(drop=True)
df1

In [None]:
# Quick reference plot of the series before any scaling is applied.
fig, ax = plt.subplots()
ax.plot(df1)

In [None]:
# Rescale the series into the [0, 1] interval.
# NOTE(review): the scaler is fitted on the whole series, i.e. before the
# train/validation/test split performed further down -- confirm this is intended.
scaler = MinMaxScaler(feature_range=(0, 1))

# The values are arranged as a single column before fitting; df1 is rebound
# to the scaled array returned by the scaler.
df1 = scaler.fit_transform(np.reshape(np.array(df1), (-1, 1)))

# Show the scaled values to confirm the transformation.
print(df1)

In [None]:
# Chronological 70% / 20% / 10% split into train, validation and test sets.
# validation_size and test_size are cumulative end positions in df1 (the 90%
# mark and the full length), not the number of rows in those sets.
training_size = int(0.7 * len(df1))
validation_size = int(0.9 * len(df1))
test_size = len(df1)

# Cut the array at the two boundaries; the pieces stay in time order.
train_set, validation_set, test_set = np.split(df1, [training_size, validation_size])



In [None]:
# Check split boundaries and set shapes.
# A notebook cell only displays its final bare expression, so each value is
# printed explicitly; otherwise everything but the last line is discarded.
print('Split boundaries (train end, validation end, total):', training_size, validation_size, test_size)
print('Train set shape:', train_set.shape)
print('Validation set shape:', validation_set.shape)
print('Test set shape:', test_set.shape)

# The three sets must partition the series without gaps or overlap.
assert len(train_set) + len(validation_set) + len(test_set) == len(df1)

In [None]:
# The current array of values needs to be converted into a dataset matrix
def create_dmatrix(dataset, time_step=1):
  """Slice a series into overlapping input windows and next-step targets.

  Args:
    dataset: 2-D array; only column 0 is read.
    time_step: number of consecutive values in each input window.

  Returns:
    A tuple (data_X, data_Y) of numpy arrays. Row i of data_X holds
    dataset[i:i + time_step, 0] and data_Y[i] holds the value that
    immediately follows that window, dataset[i + time_step, 0]. When the
    dataset is too short to form any window, data_X has shape
    (0, time_step) and data_Y has shape (0,).
  """
  # Every start index i with i + time_step <= len(dataset) - 1 has a target,
  # which gives len(dataset) - time_step windows. The previous bound
  # (len(dataset) - time_step - 1) silently dropped the last valid sample.
  n_windows = max(len(dataset) - time_step, 0)
  if n_windows == 0:
    # Keep the feature matrix 2-D so downstream reshapes still work.
    return np.empty((0, time_step)), np.empty((0,))

  data_X = [dataset[i:i + time_step, 0] for i in range(n_windows)]
  data_Y = [dataset[i + time_step, 0] for i in range(n_windows)]
  return np.array(data_X), np.array(data_Y)

In [None]:
# Turn each split into (window, next value) pairs with create_dmatrix.
# time_step is the number of past observations fed to the model per sample.
time_step = 20
X_train, Y_train = create_dmatrix(train_set, time_step)
X_val, Y_val = create_dmatrix(validation_set, time_step)
X_test, Y_test = create_dmatrix(test_set, time_step)

# Report the shape of every feature matrix and target vector.
for x_label, y_label, features, targets in (
  ('X Train set shape:', 'y Train set shape:', X_train, Y_train),
  ('X Validation set shape:', 'y Validation set shape:', X_val, Y_val),
  ('X Test set shape:', 'y Test set shape:', X_test, Y_test),
):
  print(x_label, features.shape)
  print(y_label, targets.shape)


In [None]:
# The network is fed 3-D input, so each feature matrix gets a trailing axis
# of size 1 appended to its existing two dimensions.

X_train = np.reshape(X_train, (X_train.shape[0], X_train.shape[1], 1))
X_val = np.reshape(X_val, (X_val.shape[0], X_val.shape[1], 1))
X_test = np.reshape(X_test, (X_test.shape[0], X_test.shape[1], 1))

In [None]:
# Hyperparameters
batch_size = 64  # passed to model.fit as batch_size
epochs = 500  # upper bound on training epochs passed to model.fit; early stopping may end training sooner
learning_rate = 0.001  # learning rate handed to the Adam optimizer

In [None]:
# LSTM MODEL ARCHITECTURE
#
# Four stacked LSTM layers followed by a single-unit Dense layer. The input
# is one window of `time_step` observations with a single feature each.
# The first three LSTM layers return the full sequence so the next LSTM gets
# one vector per time step. The last LSTM returns only its final state, which
# makes the model emit one value per window -- matching the scalar targets
# and producing 2-D predictions that scaler.inverse_transform can accept.

model = keras.models.Sequential([
  # Window length comes from time_step so it stays in sync with the data prep.
  keras.Input(shape=(time_step, 1)),
  # Masking layer, to ignore zeros.
  # keras.layers.Masking(),
  # 4 LSTM Layers with 16 units.
  keras.layers.LSTM(units=16, return_sequences=True),
  keras.layers.LSTM(units=16, return_sequences=True),
  keras.layers.LSTM(units=16, return_sequences=True),
  # Final recurrent layer: collapse the sequence to its last output.
  keras.layers.LSTM(units=16, dropout=0.5, return_sequences=False),
  # Fully Connected Layer
  keras.layers.Dense(units=1)
])
model.summary()

In [None]:
# Compile the model: mean squared error as the training loss, root mean
# squared error as the reported metric, Adam as the optimizer.
mse = tf.keras.losses.MeanSquaredError()
rmse = tf.keras.metrics.RootMeanSquaredError()
adam = keras.optimizers.Adam(learning_rate=learning_rate)
model.compile(optimizer=adam, loss=mse, metrics=[rmse])

In [None]:
# Callbacks.
# EarlyStopping ends training once val_loss has not improved for 100 epochs.
# restore_best_weights=True asks Keras to put the best-val_loss weights back
# on the in-memory model; without it the later evaluation cells would run on
# the weights from the last epoch rather than the best one.
# ModelCheckpoint additionally writes the best full model to disk.

callbacks = [
  EarlyStopping(monitor='val_loss', patience=100, verbose=1, restore_best_weights=True),
  ModelCheckpoint(
    filepath='/content/sample_data/lstm-model.h5',
    verbose=1,
    monitor='val_loss',
    save_best_only=True,
    save_weights_only=False,
  ),
]

# Train the model
history = model.fit(
  x=X_train,
  y=Y_train,
  validation_data=(X_val, Y_val),
  epochs=epochs,
  batch_size=batch_size,
  shuffle=True,
  callbacks=callbacks,
)

In [None]:
# Model evaluation
# Run the trained network on the test windows. The model was fitted on the
# scaled series, so these predictions are still in the scaler's units.
test_predicted = model.predict(X_test)

In [None]:
# Apply inverse transformation
# Map the predictions from the scaler's [0, 1] range back to original units.
# NOTE(review): this rebinds test_predicted, so running the cell a second
# time would apply the inverse transform twice.
# NOTE(review): inverse_transform expects 2-D input -- confirm the model
# output has shape (n_samples, 1).
test_predicted= scaler.inverse_transform(test_predicted)


In [None]:
# Calculate RMSE
# test_predicted has already been mapped back to original units, while Y_test
# is still in the scaler's [0, 1] range. Bring the targets back to original
# units too, so the error compares like with like.
Y_test_actual = scaler.inverse_transform(Y_test.reshape(-1, 1))
math.sqrt(mean_squared_error(Y_test_actual, test_predicted))


In [None]:
# Plotting everything together

# Window length used when the test samples were built.
look_back = time_step

# NaN-filled canvas the same shape as the full series, so only the predicted
# stretch is drawn.
test_predicted_plot = np.full_like(df1, np.nan)

# Prediction i is the forecast for the value right after test window i, i.e.
# position validation_size + look_back + i of the full series. Sizing the
# slice from len(test_predicted) guarantees the assignment shapes match.
plot_start = validation_size + look_back
plot_stop = plot_start + len(test_predicted)
test_predicted_plot[plot_start:plot_stop, :] = test_predicted

# plot baseline and predictions (both in original units)
plt.plot(scaler.inverse_transform(df1), label='Actual')
plt.plot(test_predicted_plot, label='Test prediction')
plt.legend()
plt.show()