# Exploring the LSTM model for stock price prediction

In [None]:
import os 

In [None]:
os.getcwd()

In [None]:
os.chdir('/atlas/data19/guhitj/Erdos_DL/Erdos_v2/Erdos-2024-DL-Newsworthy/models')

### Load the data


In [None]:
import pandas as pd
import datetime
from tensorflow import keras
import data_engineering
import simulation
import numpy as np
import lstm_model

In [None]:
# load data
df_dict = data_engineering.separate_by_stock()

df_dict = data_engineering.fillna(df_dict)

cv_trades = [{}, {}, {}, {}]
cv_opens = [{}, {}, {}, {}]
for tick in df_dict:
    train, test = data_engineering.train_test_split(df_dict[tick])
    
    features = ["frob_comp", "pos_art_count", "total_articles", "Open_Diff", "y", "Open"]

    train, test = train[features], test[features]

    #accs = np.zeros(4)
    i = 0
    for train_idx, test_idx in data_engineering.get_cv_splits(train):
        cv_opens[i][tick] = train.loc[test_idx, "Open"]

        df_tt = train.loc[train_idx].drop(columns=['Open'])
        df_ho = train.loc[test_idx].drop(columns=['Open'])

        pred_change, trades = lstm_model.run_lstm_model(df_tt, df_ho)
        cv_trades[i][tick] = trades
        i+=1
    
for i in range(len(trades)):
    print(simulation.get_performance(cv_trades[i], cv_opens[i]))


In [None]:
def get_performance(trade_dict, test_dict):
    n = len(trade_dict["AAPL"])
    x_t = [1] * n
    for i in range(1,n):
        x_t[i] = x_t[i-1] / 2
        for tick in trade_dict:
            x_t[i] += (x_t[i-1] / 30) * (1 + trade_dict[tick][i-1] * (test_dict[tick][i] - test_dict[tick][i-1]) / test_dict[tick][i-1])
    return x_t[-1]


In [None]:
for i in range(4):
    print(get_performance(cv_trades[i], cv_opens[i]))

In [None]:
samp_trade = np.array([1,1])
samp_open = np.array([1,2])
samp_t_dict = {tick:samp_trade.copy() for tick in df_dict}
samp_o_dict = {tick:samp_open.copy() for tick in df_dict}

print(get_performance(samp_t_dict, samp_o_dict))

samp_trade = np.array([-1,1])
samp_open = np.array([1,2])
samp_t_dict = {tick:samp_trade.copy() for tick in df_dict}
samp_o_dict = {tick:samp_open.copy() for tick in df_dict}

print(get_performance(samp_t_dict, samp_o_dict))



In [None]:
cv_trades = [{}, {}, {}, {}]
cv_opens = [{}, {}, {}, {}]
for tick in df_dict:
    train, test = data_engineering.train_test_split(df_dict[tick])
    
    features = ["finvader_tot", "Open_Diff", "y", "Open"]

    train, test = train[features], test[features]

    #accs = np.zeros(4)
    i = 0
    for train_idx, test_idx in data_engineering.get_cv_splits(train):
        cv_opens[i][tick] = train.loc[test_idx, "Open"]

        df_tt = train.loc[train_idx].drop(columns=['Open'])
        df_ho = train.loc[test_idx].drop(columns=['Open'])

        pred_change, trades = lstm_model.run_lstm_model(df_tt, df_ho, epochs=4)
        cv_trades[i][tick] = trades
        i+=1
    
for i in range(len(trades)):
    print(simulation.get_performance(cv_trades[i], cv_opens[i]))

In [None]:
for i in range(4):
    print(get_performance(cv_trades[i], cv_opens[i]))

In [None]:

train, test = data_engineering.train_test_split(df_dict['AAPL'])
train, test = train[features], test[features]


## Scales the data

In [None]:
from sklearn.preprocessing import MinMaxScaler

X, y = train.drop(columns=['y']).values, train.y.values

X_test, y_test = test.drop(columns=['y']).values, test.y.values

scaler_X = MinMaxScaler(feature_range=(-1,1))
scaler_y = MinMaxScaler(feature_range=(-1,1))

# scales the open_diff column to be between -1 and 1
X[:,1:] = scaler_X.fit_transform(X[:,1:])
y = scaler_y.fit_transform(y.reshape(-1,1)).reshape(-1,)

X_test[:,1:] = scaler_X.fit_transform(X_test[:,1:])
y_test = scaler_y.transform(y_test.reshape(-1,1)).reshape(-1,)



## Fits the model using scaled data

## Creating the LSTM model

In [None]:
def fit_lstm(X, y, batch_size, nb_epoch, neurons):
    X = X.reshape(X.shape[0], 1, X.shape[1])
    model = keras.Sequential()
    model.add(keras.layers.LSTM(neurons, batch_input_shape=(1, X.shape[1], X.shape[2]), stateful=True))
    model.add(keras.layers.Dense(1))
    model.compile(loss='mean_squared_error', optimizer='adam')
    for i in range(nb_epoch):
        model.fit(X, y, epochs=1, batch_size=batch_size, verbose=1, shuffle=False)
        model.reset_states()
    return model

In [None]:
model = fit_lstm(X, y, 1, 10, 3)

In [None]:
def forecast_lstm(model, batch_size, X):
    X = X.reshape(-1, 1, X.shape[1])
    yhat = model.predict(X, batch_size=batch_size)
    return yhat[0,0]

In [None]:
forecast_lstm(model, 1, X_test)

In [None]:
from pandas import DataFrame
from pandas import Series
from pandas import concat
from pandas import read_csv
from pandas import datetime
from sklearn.metrics import mean_squared_error
from sklearn.preprocessing import MinMaxScaler
from keras.models import Sequential
from keras.layers import Dense
from keras.layers import LSTM
from math import sqrt
#from matplotlib import pyplot
import numpy as np
 
# date-time parsing function for loading the dataset
def parser(x):
    return datetime.strptime('190'+x, '%Y-%m')
 
# invert differenced value
def inverse_difference(history, yhat, interval=1):
    return yhat + history[-interval]
 
# scale train and test data to [-1, 1]
def scale(train, test):
    # fit scaler
    scaler = MinMaxScaler(feature_range=(-1, 1))
    scaler = scaler.fit(train)
    # transform train
    train = train.reshape(train.shape[0], train.shape[1])
    train_scaled = scaler.transform(train)
    # transform test
    test = test.reshape(test.shape[0], test.shape[1])
    test_scaled = scaler.transform(test)
    return scaler, train_scaled, test_scaled
 
# inverse scaling for a forecasted value
def invert_scale(scaler, X, value):
    new_row = [x for x in X] + [value]
    array = numpy.array(new_row)
    array = array.reshape(1, len(array))
    inverted = scaler.inverse_transform(array)
    return inverted[0, -1]
 
# fit an LSTM network to training data
def fit_lstm(X, y, batch_size, nb_epoch, neurons):
    X = X.reshape(X.shape[0], 1, X.shape[1])
    model = keras.Sequential()
    model.add(keras.layers.LSTM(neurons, batch_input_shape=(1, X.shape[1], X.shape[2]), stateful=True))
    model.add(keras.layers.Dense(1))
    model.compile(loss='mean_squared_error', optimizer='adam')
    for i in range(nb_epoch):
        model.fit(X, y, epochs=1, batch_size=batch_size, verbose=1, shuffle=False)
        model.reset_states()
    return model
 
# make a one-step forecast
def forecast_lstm(model, batch_size, X):
    X = X.reshape(1, 1, len(X))
    yhat = model.predict(X, batch_size=batch_size)
    return yhat[0,0]


X, y = train.drop(columns=['y']).values, train.y.values

X_test, y_test = test.drop(columns=['y']).values, test.y.values

scaler_X = MinMaxScaler(feature_range=(-1,1))
scaler_y = MinMaxScaler(feature_range=(-1,1))

# scales the open_diff column to be between -1 and 1
X[:,1:] = scaler_X.fit_transform(X[:,1:])
y = scaler_y.fit_transform(y.reshape(-1,1)).reshape(-1,)

X_test[:,1:] = scaler_X.fit_transform(X_test[:,1:])




model = fit_lstm(X, y, 1, 10, 3)



# forecast the entire training dataset to build up state for forecasting
X_reshaped = X.reshape(X.shape[0], 1, X.shape[1])
model.predict(X_reshaped, batch_size=1)
 
# walk-forward validation on the test data
predictions = list()
for i in range(X_test.shape[0]):
    # make one-step forecast
    yhat = forecast_lstm(model, 1, X_test[i,:])
    # invert scaling
    yhat = scaler_y.inverse_transform(np.array([yhat]).reshape(1,1))[0,0]
    
    # invert differencing
    #pred_open = open_prices.iloc[i] + yhat
    # store forecast
    predictions.append(yhat)
    
    actual = y_test[i]
    print('Day=%d, Predicted=%f, Expected=%f' % (i+1, yhat, actual))

print(len(predictions)==len(y_test))
# report performance
rmse = sqrt(mean_squared_error(y_test, predictions))
print('Test RMSE: %.3f' % rmse)
# line plot of observed vs predicted
#pyplot.plot(raw_values[-12:])
#pyplot.plot(predictions)
#pyplot.show()

In [None]:
pred_rise_fall = predictions / np.abs(predictions)
actual_rise_fall = y_test / np.abs(y_test)

In [None]:
from sklearn.metrics import accuracy_score
accuracy_score(pred_rise_fall, actual_rise_fall)

In [None]:
import lstm_model

In [None]:
pred_change, trades = lstm_model.run_lstm_model(train, test)

In [None]:
import numpy as np
from sklearn.metrics import accuracy_score

In [None]:
accuracy_score(test.y / np.abs(test.y), trades)