# LSTM from Scratch for TA

## import packages

In [1]:
import numpy as np
from numpy import *
from numpy.random import *
import matplotlib.pyplot as plt
import random
import math
from datetime import datetime
import pickle
import pandas as pd
%matplotlib inline

## Define Function of Activation Function

In [2]:
#predefined activation function and its derivative
def Sigmoid(x): 
    return 1. / (1 + np.exp(-x))

def dSigmoid(values): 
    return values*(1-values)

def Tanh(x):
    return (np.exp(x) - np.exp(-x))/(np.exp(x) + np.exp(-x))

def dtanh(values): 
    return 1. - values**2

def rand_arr(a, b, *args): 
    seed(0)
    return rand(*args)*(b - a) + a

## Define LSTM Class

In [3]:
class LSTM:
    def __init__(self, X_dim, y_dim, neuron):
        
        self.X_dim = X_dim 
        self.y_dim = y_dim
        self.neuron = neuron
        
        #initiate weight
        self.weight_f = rand_arr(-0.1, 0.1, neuron, X_dim + neuron)
        self.weight_i = rand_arr(-0.1, 0.1, neuron, X_dim + neuron)
        self.weight_g = rand_arr(-0.1, 0.1, neuron, X_dim + neuron) 
        self.weight_o = rand_arr(-0.1, 0.1, neuron, X_dim + neuron)
        self.weight_out = rand_arr(-0.1, 0.1, y_dim, neuron)
        
        #initiate bias
        self.bias_f = rand_arr(-0.1, 0.1, neuron) 
        self.bias_i = rand_arr(-0.1, 0.1, neuron) 
        self.bias_g = rand_arr(-0.1, 0.1, neuron) 
        self.bias_o = rand_arr(-0.1, 0.1, neuron)
        self.bias_out = rand_arr(-0.1, 0.1, y_dim)
    
    def forward(self, inputs, outputs):
        X = inputs 
        y = outputs
        
        self.X = X
        self.y = y
        #make list to save the values of every unit 
        self.X_concat = []
        self.f = []
        self.i = []
        self.g = []
        self.o = []
        self.s = [zeros(self.neuron)] #first s input is 0
        self.h = [zeros(self.neuron)] #first h input is 0
        self.out = []
        self.error = []
        for i in range(len(inputs)):
            #forward for every unit
            X_concat = np.hstack((X[i],  self.h[i]))
            self.X_concat.append(X_concat)
            self.f.append(Sigmoid(self.weight_f@X_concat + self.bias_f))
            self.i.append(Sigmoid(self.weight_i@X_concat + self.bias_i))
            self.g.append(Tanh(self.weight_g@X_concat + self.bias_g))
            self.o.append(Sigmoid(self.weight_o@X_concat + self.bias_o))
            self.s.append(self.f[i]*self.s[i] + self.i[i]*self.g[i])
            self.h.append(Tanh(self.s[i+1])*self.o[i])
            self.out.append(self.weight_out@self.h[i+1] + self.bias_out)
            self.error.append(abs(self.out[-1] - y[i]))
        #show error
        #print(np.array(self.error).mean())
    
    def backward(self, inputs, outputs):
        self.diff_out = []
        self.diff_h = []
        self.diff_h_bottom = [zeros(self.neuron)]
        self.diff_s_up = [zeros(self.neuron)]
        self.diff_s = []
        self.diff_o = []
        self.diff_g = []
        self.diff_i = []
        self.diff_f = []
        for i in range(len(inputs)-1,-1,-1):
            #derivate of output
            if i == len(inputs)-1:
                self.diff_out  = [2*(self.out[i] - outputs[i])] + self.diff_out
                self.diff_h = [self.diff_out[0]*dSigmoid(self.weight_out@self.h[i+1] + self.bias_out)*
                               self.weight_out + self.diff_h_bottom[0]] + self.diff_h
            else:
                self.diff_h = [self.diff_h_bottom[0]] + self.diff_h
            
            
            #derivative of gate/neural network operation
            self.diff_s = [self.diff_h[0]*self.o[i]*dtanh(self.s[i+1]) + self.diff_s_up[0]] + self.diff_s
            self.diff_o = [self.diff_h[0]*Tanh(self.s[i+1])] + self.diff_o
            self.diff_g = [self.i[i]*self.diff_s[0]] + self.diff_g
            self.diff_i = [self.g[i]*self.diff_s[0]] + self.diff_i
            self.diff_f = [self.s[i]*self.diff_s[0]] + self.diff_f
            
            #derivative of concatenation of input dan previous output value
            self.dX_concat = (self.diff_o[0].T*(dSigmoid(self.o[i]).reshape(self.neuron,1))).T@self.weight_o
            self.dX_concat += (self.diff_g[0].T*(dSigmoid(self.g[i]).reshape(self.neuron,1))).T@self.weight_g
            self.dX_concat += (self.diff_i[0].T*(dSigmoid(self.i[i]).reshape(self.neuron,1))).T@self.weight_i
            self.dX_concat += (self.diff_f[0].T*(dSigmoid(self.f[i]).reshape(self.neuron,1))).T@self.weight_f
            
            #update value of long and short term memory
            self.diff_h_bottom = [self.dX_concat[-1][self.X_dim:]] + self.diff_h_bottom
            self.diff_s_up = [self.diff_s[0]*self.f[i]] + self.diff_s_up
            
    def update(self, alpha):
        
        #alpha 
        
        #update everyweight and bias
        self.weight_out -= alpha*self.diff_out[0].reshape(1,1)@self.h[1].T.reshape(1,self.neuron)
        self.bias_out -= alpha*self.diff_out[0]
        
        self.weight_f -= alpha*(self.diff_f[0]*dSigmoid(self.weight_f@self.X_concat[0] + self.bias_f)).T@self.X_concat[0].reshape(1,self.X_dim + self.neuron)
        self.bias_f -= alpha*(self.diff_f[0]*dSigmoid(self.weight_f@self.X_concat[0] + self.bias_f)).reshape(self.neuron,)
        
        self.weight_i -= alpha*(self.diff_i[0]*dSigmoid(self.weight_i@self.X_concat[0] + self.bias_i)).T@self.X_concat[0].reshape(1,self.X_dim + self.neuron)
        self.bias_i -= alpha*(self.diff_i[0]*dSigmoid(self.weight_i@self.X_concat[0] + self.bias_i)).reshape(self.neuron,)
        
        self.weight_g -= alpha*(self.diff_g[0]*dSigmoid(self.weight_g@self.X_concat[0] + self.bias_g)).T@self.X_concat[0].reshape(1,self.X_dim + self.neuron)
        self.bias_g -= alpha*(self.diff_g[0]*dSigmoid(self.weight_g@self.X_concat[0] + self.bias_g)).reshape(self.neuron,)
        
        self.weight_o -= alpha*(self.diff_o[0]*dSigmoid(self.weight_o@self.X_concat[0] + self.bias_o)).T.reshape(self.neuron,1)@self.X_concat[0].reshape(1,self.X_dim + self.neuron)
        self.bias_o -= alpha*(self.diff_o[0]*dSigmoid(self.weight_o@self.X_concat[0] + self.bias_o)).reshape(self.neuron,)
    
    def predict(self, inputs, outputs):
        X = inputs
        y = outputs
        self.X = X
        self.y = y
        #make list to save the values of every unit 
        self.X_concat = []
        self.f = []
        self.i = []
        self.g = []
        self.o = []
        self.s = [zeros(self.neuron)] #first s input is 0
        self.h = [zeros(self.neuron)] #first h input is 0
        self.out = []
        self.error = []
        for i in range(len(inputs)):
            #forward for every unit
            X_concat = np.hstack((X[i],  self.h[i]))
            self.X_concat.append(X_concat)
            self.f.append(Sigmoid(self.weight_f@X_concat + self.bias_f))
            self.i.append(Sigmoid(self.weight_i@X_concat + self.bias_i))
            self.g.append(Tanh(self.weight_g@X_concat + self.bias_g))
            self.o.append(Sigmoid(self.weight_o@X_concat + self.bias_o))
            self.s.append(self.f[i]*self.s[i] + self.i[i]*self.g[i])
            self.h.append(Tanh(self.s[i+1])*self.o[i])
            self.out.append(self.weight_out@self.h[i+1] + self.bias_out)
            self.error.append(abs(self.out[-1] - y[i]))
        
        return [self.out[-1][0],self.y[-1]]
    
    def show_progress(self):
        return abs(self.y[-1][0] - self.out[-1])

## Change the Indicators Here!

In [4]:
#hyperparameters
epochs  = 10
nc      = 10 #units

#datasets
name_dataset = 'UNVR-long'

#column_dataset_obs = 'Close'

## Import Dataset

In [5]:
df = pd.read_csv('../Datasets/'+name_dataset+'.csv')
# df.drop('Volume', inplace=True, axis=1)
df

Unnamed: 0,Date,Open,High,Low,Close,Adj Close,Volume
0,2017-01-02,7760.0,7760.0,7760.0,7760.0,6857.312500,0
1,2017-01-03,7800.0,7840.0,7760.0,7765.0,6861.731445,5014500
2,2017-01-04,7770.0,8070.0,7770.0,8040.0,7104.741699,10642000
3,2017-01-05,8100.0,8110.0,8005.0,8110.0,7166.599121,10493500
4,2017-01-06,8060.0,8190.0,7965.0,8120.0,7175.434570,5400500
...,...,...,...,...,...,...,...
1257,2021-12-24,4090.0,4210.0,4080.0,4200.0,4200.000000,11979500
1258,2021-12-27,4200.0,4230.0,4170.0,4210.0,4210.000000,7238100
1259,2021-12-28,4210.0,4230.0,4180.0,4180.0,4180.000000,7006900
1260,2021-12-29,4180.0,4190.0,4120.0,4120.0,4120.000000,9030700


In [6]:
df.describe()
new_df = df.drop(["Date","Adj Close"], axis=1)

In [7]:
new_df

Unnamed: 0,Open,High,Low,Close,Volume
0,7760.0,7760.0,7760.0,7760.0,0
1,7800.0,7840.0,7760.0,7765.0,5014500
2,7770.0,8070.0,7770.0,8040.0,10642000
3,8100.0,8110.0,8005.0,8110.0,10493500
4,8060.0,8190.0,7965.0,8120.0,5400500
...,...,...,...,...,...
1257,4090.0,4210.0,4080.0,4200.0,11979500
1258,4200.0,4230.0,4170.0,4210.0,7238100
1259,4210.0,4230.0,4180.0,4180.0,7006900
1260,4180.0,4190.0,4120.0,4120.0,9030700


In [8]:
#plt.plot(new_df)


## MinMax Scaler

In [9]:
from sklearn.preprocessing import MinMaxScaler

# Get the number of rows in the data
nrows = new_df.shape[0]
print(nrows)
# Convert the data to numpy values
np_data_unscaled = np.array(new_df)
print(np_data_unscaled)
np_data = np.reshape(np_data_unscaled, (nrows, -1))
print(np_data.shape)

# Transform the data by scaling each feature to a range between 0 and 1
scaler = MinMaxScaler(feature_range=(0,1))
np_data_scaled = scaler.fit_transform(np_data_unscaled)

np_data_scaled

1262
[[7.7600e+03 7.7600e+03 7.7600e+03 7.7600e+03 0.0000e+00]
 [7.8000e+03 7.8400e+03 7.7600e+03 7.7650e+03 5.0145e+06]
 [7.7700e+03 8.0700e+03 7.7700e+03 8.0400e+03 1.0642e+07]
 ...
 [4.2100e+03 4.2300e+03 4.1800e+03 4.1800e+03 7.0069e+06]
 [4.1800e+03 4.1900e+03 4.1200e+03 4.1200e+03 9.0307e+06]
 [4.1300e+03 4.1700e+03 4.1100e+03 4.1100e+03 7.6331e+06]]
(1262, 5)


array([[0.53261601, 0.50193548, 0.53658537, 0.53532609, 0.        ],
       [0.53799597, 0.51225806, 0.53658537, 0.53600543, 0.01998575],
       [0.533961  , 0.54193548, 0.53794038, 0.57336957, 0.04241466],
       ...,
       [0.05514459, 0.04645161, 0.05149051, 0.04891304, 0.02792664],
       [0.05110962, 0.04129032, 0.04336043, 0.04076087, 0.03599268],
       [0.04438467, 0.03870968, 0.04200542, 0.03940217, 0.03042242]])

In [10]:

# scaler = MinMaxScaler(feature_range=(0,1))
# new_df = scaler.fit_transform(np.array(new_df).reshape(-1,1))
# new_df

In [11]:
xs = np_data_scaled

In [12]:
xs

array([[0.53261601, 0.50193548, 0.53658537, 0.53532609, 0.        ],
       [0.53799597, 0.51225806, 0.53658537, 0.53600543, 0.01998575],
       [0.533961  , 0.54193548, 0.53794038, 0.57336957, 0.04241466],
       ...,
       [0.05514459, 0.04645161, 0.05149051, 0.04891304, 0.02792664],
       [0.05110962, 0.04129032, 0.04336043, 0.04076087, 0.03599268],
       [0.04438467, 0.03870968, 0.04200542, 0.03940217, 0.03042242]])

In [26]:
#Standarized the dataset
N       = xs.shape[0]

#hypreparameter
Ts      = 1
x_dim   = 1
alpha   = 0.01 #learningrate

xt      = xs[0:N-x_dim,:]

for i in range(x_dim):
    xt  = hstack((xt, xs[i+1:N-x_dim+i+1]))   



In [27]:
xt.shape[0]

1261

## Splitting Training and Testing

In [28]:
training_size = int(len(new_df)*0.8)
test_size = len(new_df)-training_size

In [29]:
training_size, test_size

(1009, 253)

In [145]:
X_train = xt[:training_size, 0:x_dim]     
y_train = xt[:training_size, x_dim:x_dim+1]  
X_test = xt[-test_size:, 0:x_dim]     
y_test = xt[-test_size:, x_dim:x_dim+1]

In [146]:
y_test

array([[0.49096774],
       [0.49419355],
       [0.48774194],
       [0.47483871],
       [0.4683871 ],
       [0.46516129],
       [0.47483871],
       [0.4683871 ],
       [0.4683871 ],
       [0.45548387],
       [0.43612903],
       [0.43935484],
       [0.44903226],
       [0.43935484],
       [0.42967742],
       [0.41354839],
       [0.41032258],
       [0.4716129 ],
       [0.53290323],
       [0.49096774],
       [0.4683871 ],
       [0.45548387],
       [0.44580645],
       [0.46516129],
       [0.44903226],
       [0.42645161],
       [0.43290323],
       [0.41677419],
       [0.42      ],
       [0.44580645],
       [0.44580645],
       [0.44903226],
       [0.43612903],
       [0.42645161],
       [0.42322581],
       [0.42645161],
       [0.42645161],
       [0.41677419],
       [0.41677419],
       [0.41677419],
       [0.40064516],
       [0.40387097],
       [0.39419355],
       [0.39741935],
       [0.39096774],
       [0.40387097],
       [0.40387097],
       [0.384

## Build LSTM Model

In [147]:
me = LSTM(x_dim,1,nc)
for i in range(epochs):
    print('-------------',i+1,'-------------')
    for j in range(X_train.shape[0]-Ts):
        me.forward(X_train[j:j+Ts],y_train[j:j+Ts])
        me.backward(X_train[j:j+Ts],y_train[j:j+Ts])
        me.update(alpha)
    hehe = []
    for j in range(X_test.shape[0]-Ts):
        me.forward(X_test[j:j+Ts],y_test[j:j+Ts])
        me.backward(X_test[j:j+Ts],y_test[j:j+Ts])
        me.update(alpha)
        hehe.append(me.show_progress())    
    print(np.array(hehe).mean())

------------- 1 -------------
0.08985839442542537
------------- 2 -------------
0.08981778768745734
------------- 3 -------------
0.08977720524952713
------------- 4 -------------
0.08973664983463546
------------- 5 -------------
0.0896961240824531
------------- 6 -------------
0.08965563056229947
------------- 7 -------------
0.08961517176530295
------------- 8 -------------
0.0895747500967086
------------- 9 -------------
0.08953436786833936
------------- 10 -------------
0.08949402729121676


In [148]:
X_test.shape[0]

253

In [149]:
import matplotlib.pyplot as plt

out = []
for j in range(X_test.shape[0]-Ts):
    out.append(me.predict(X_test[j:j+Ts],y_test[j:j+Ts]))

new_output = []
for k in out:
    real_val = k[0].tolist()
    predict_val=k[1][0].tolist()
    new_output.append(real_val)
    new_output.append(predict_val)
    
new_output = np.reshape(new_output, (-1,2))
new_output

array([[0.09579386, 0.49096774],
       [0.09586221, 0.49419355],
       [0.09587196, 0.48774194],
       [0.0958427 , 0.47483871],
       [0.09582317, 0.4683871 ],
       [0.09580363, 0.46516129],
       [0.09580363, 0.47483871],
       [0.09580363, 0.4683871 ],
       [0.09581341, 0.4683871 ],
       [0.0957743 , 0.45548387],
       [0.09570574, 0.43612903],
       [0.09572534, 0.43935484],
       [0.09569593, 0.44903226],
       [0.09572534, 0.43935484],
       [0.09570574, 0.42967742],
       [0.09563701, 0.41354839],
       [0.09562718, 0.41032258],
       [0.0956075 , 0.4716129 ],
       [0.09594013, 0.53290323],
       [0.09586221, 0.49096774],
       [0.09578408, 0.4683871 ],
       [0.0957743 , 0.45548387],
       [0.09573514, 0.44580645],
       [0.09573514, 0.46516129],
       [0.09575473, 0.44903226],
       [0.09566648, 0.42645161],
       [0.09565666, 0.43290323],
       [0.09558782, 0.41677419],
       [0.09566648, 0.42      ],
       [0.09566648, 0.44580645],
       [0.

In [151]:
for l in new_output:
    y_test_unscaled = scaler.inverse_transform(l[0].reshape(1, 5))
    y_pred = scaler.inverse_transform(l[1].reshape(1, 5))

ValueError: cannot reshape array of size 1 into shape (1,5)

## Report Performance of LSTM Predictions

In [None]:
from math import sqrt

import math
from sklearn.metrics import mean_squared_error, mean_absolute_error, mean_absolute_percentage_error

In [None]:
# mse = mean_squared_error(y, predictions)
# print('MSE: '+str(mse))
print('epoch: ' + str(epochs))
print('units: ' + str(nc))

rmse = math.sqrt(mean_squared_error(np.array(out)[:,1], np.array(out)[:,0]))
print('RMSE: '+ str("{:.2f}".format(rmse)))
mae = mean_absolute_error(np.array(out)[:,1], np.array(out)[:,0])
print('MAE: '+ str("{:.2f}".format(mae)))
mape = mean_absolute_percentage_error(np.array(out)[:,1], np.array(out)[:,0])
print('MAPE: '+ str("{:.2f}".format(mape*100)))

## Visualize Test Data the Prediction and Real Stock

In [None]:
from matplotlib.pylab import rcParams
rcParams['figure.figsize'] = 15, 6

In [None]:
plt.figure()
plt.plot(np.array(out)[:,1], color = 'red', label = 'Real Stock Price')
plt.plot(np.array(out)[:,0], color = 'blue', label = 'Predicted Stock Price')
plt.title(name_dataset +' Term Stock Prediction, epochs: '+ str(epochs) +'; units: '+str(nc))
plt.xlabel('Time')
plt.ylabel(name_dataset +' Stock Price '+ column_dataset_obs)
plt.legend()

## Save as a new Dataset

In [None]:
real_price = np.array(out)[:,1]
close_lstm = np.array(out)[:,0]

In [None]:
new_data = {'real_price': real_price,
            'Close_LSTM': close_lstm}


In [None]:
df_new_data = pd.DataFrame(new_data, columns = ['real_price', 'Close_LSTM'])


In [None]:
df_new_data.to_csv('../Datasets/dataset after prediction/'+name_dataset
                   +'_LSTM_'+ str(epochs) +'_'+ str(nc) + '.csv', index=False)