# LSTM

In [None]:
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
import datetime
import torch.nn as nn
import torch
import torch.optim as optim

In [None]:
def initWeights(input_size, output_size):
    """Draw a Xavier/Glorot-style uniform weight matrix.

    Samples are taken uniformly from [-1, 1) and scaled by
    sqrt(6 / (input_size + output_size)).

    Returns an array of shape (output_size, input_size).
    """
    scale = np.sqrt(6 / (input_size + output_size))
    draws = np.random.uniform(-1, 1, size=(output_size, input_size))
    return draws * scale

In [None]:
# Activation Functions 
def sigmoid(input, derivative = False):
    """Logistic sigmoid, or its derivative expressed in terms of its output.

    With ``derivative=True`` the argument is treated as an already-activated
    value ``s`` and ``s * (1 - s)`` is returned; the sigmoid itself is not
    re-applied.
    """
    if not derivative:
        decay = np.exp(-input)
        return 1 / (1 + decay)

    return input * (1 - input)

def tanh(input, derivative = False):
    """Hyperbolic tangent, or its derivative expressed in terms of its output.

    With ``derivative=True`` the argument is treated as an already-activated
    value ``t`` and ``1 - t ** 2`` is returned; tanh itself is not re-applied.
    """
    if not derivative:
        return np.tanh(input)

    squared = input ** 2
    return 1 - squared

def softmax(input):
    """Return the softmax of ``input``, normalised over ALL of its elements.

    The maximum score is subtracted before exponentiating. This leaves the
    result mathematically unchanged but keeps ``np.exp`` from overflowing to
    ``inf`` (which previously produced ``nan`` for large scores).

    An empty input yields an empty array.
    """
    scores = np.asarray(input)
    if scores.size == 0:
        # np.max would raise on an empty array; there is nothing to normalise.
        return np.exp(scores)

    # Shift so the largest exponent is exp(0) == 1.
    exps = np.exp(scores - np.max(scores))
    return exps / np.sum(exps)

In [None]:
# Long Short-Term Memory 
class LSTM(nn.Module):
    """Single-layer LSTM written in NumPy and trained with hand-coded BPTT.

    All weights and biases are plain NumPy arrays that ``backward`` updates
    in place; no torch parameters are registered on the module.

    Every gate multiplies its weight matrix with the column vector obtained
    by stacking the previous hidden state on top of the current step input,
    so ``input_size`` must equal ``hidden_size`` plus the number of features
    per time step.

    NOTE: ``train`` below has a different signature from
    ``nn.Module.train(mode)``, so ``nn.Module.eval()`` (which calls
    ``self.train(False)``) cannot be used on this class.
    """

    def __init__(self, input_size, hidden_size, output_size):
        """Create the gate and output weights.

        Args:
            input_size: Length of the concatenated [hidden state; step input]
                vector fed to each gate.
            hidden_size: Number of hidden / cell units.
            output_size: Number of values produced per time step.
        """
        super().__init__()
        # Hyperparameters
        self.hidden_size = hidden_size

        # Forget Gate
        # Must be a NumPy array shaped like the other gates: forward() feeds
        # it to np.matmul and backward() updates it in place.
        self.wf = initWeights(input_size, hidden_size)
        self.bf = np.zeros((hidden_size, 1))

        # Input Gate
        self.wi = initWeights(input_size, hidden_size)
        self.bi = np.zeros((hidden_size, 1))

        # Candidate Gate
        self.wc = initWeights(input_size, hidden_size)
        self.bc = np.zeros((hidden_size, 1))

        # Output Gate
        self.wo = initWeights(input_size, hidden_size)
        self.bo = np.zeros((hidden_size, 1))

        # Final Gate
        self.wy = initWeights(hidden_size, output_size)
        self.by = np.zeros((output_size, 1))

    # Reset Network Memory
    def reset(self):
        """Clear all per-step caches and zero the initial hidden/cell state.

        The states are stored under key -1 so that step 0 can look up
        "the previous step" without a special case.
        """
        self.concat_inputs = {}

        self.hidden_states = {-1: np.zeros((self.hidden_size, 1))}
        self.cell_states = {-1: np.zeros((self.hidden_size, 1))}

        self.activation_outputs = {}
        self.candidate_gates = {}
        self.output_gates = {}
        self.forget_gates = {}
        self.input_gates = {}
        self.outputs = {}

    # Forward Propagation
    def forward(self, X_):
        """Run one sequence through the network, caching every step.

        Args:
            X_: Sequence of step inputs; each element is reshaped to a
                column vector.

        Returns:
            ``np.array`` of the per-step outputs ``wy @ h_q + by``.
        """
        outputs = []
        self.reset()
        for q, x_q in enumerate(X_):
            step_input = np.asarray(x_q).reshape(-1, 1)
            z = np.concatenate((self.hidden_states[q - 1], step_input))
            self.concat_inputs[q] = z

            self.forget_gates[q] = sigmoid(np.matmul(self.wf, z) + self.bf)
            self.input_gates[q] = sigmoid(np.matmul(self.wi, z) + self.bi)
            self.candidate_gates[q] = tanh(np.matmul(self.wc, z) + self.bc)
            self.output_gates[q] = sigmoid(np.matmul(self.wo, z) + self.bo)

            self.cell_states[q] = (
                self.forget_gates[q] * self.cell_states[q - 1]
                + self.input_gates[q] * self.candidate_gates[q]
            )
            self.hidden_states[q] = self.output_gates[q] * tanh(self.cell_states[q])

            outputs.append(np.matmul(self.wy, self.hidden_states[q]) + self.by)

        return np.array(outputs)

    # Backward Propagation
    def backward(self, errors, X, lr):
        """Back-propagate through time and update all weights in place.

        Must be called after ``forward`` for the same sequence, because it
        reads the gate/state caches that ``forward`` filled.

        Args:
            errors: Per-step error columns, indexed by time step. ``train``
                passes ``target - prediction``, which is why the clipped
                gradients are ADDED to the weights below.
            X: Per-step concatenated inputs keyed by time step (``train``
                passes ``self.concat_inputs``).
            lr: Learning rate applied to the clipped gradients.
        """
        steps = len(X)
        if steps == 0:
            # Nothing was propagated forward, so there is nothing to update.
            return

        # Array accumulators (rather than the scalar 0) so the in-place clip
        # below always has an array to write into.
        d_wf, d_bf = np.zeros_like(self.wf), np.zeros_like(self.bf)
        d_wi, d_bi = np.zeros_like(self.wi), np.zeros_like(self.bi)
        d_wc, d_bc = np.zeros_like(self.wc), np.zeros_like(self.bc)
        d_wo, d_bo = np.zeros_like(self.wo), np.zeros_like(self.bo)
        d_wy, d_by = np.zeros_like(self.wy), np.zeros_like(self.by)

        dh_next = np.zeros_like(self.hidden_states[0])
        dc_next = np.zeros_like(self.cell_states[0])

        for q in reversed(range(steps)):
            error = errors[q]
            z_t = X[q].T

            # Final Gate Weights and Biases Errors
            d_wy += np.matmul(error, self.hidden_states[q].T)
            d_by += error

            # Hidden State Error
            d_hs = np.matmul(self.wy.T, error) + dh_next

            # Output Gate Weights and Biases Errors
            cell_act = tanh(self.cell_states[q])
            d_o = cell_act * d_hs * sigmoid(self.output_gates[q], derivative=True)
            d_wo += np.matmul(d_o, z_t)
            d_bo += d_o

            # Cell State Error (the tanh derivative takes the activated value)
            d_cs = tanh(cell_act, derivative=True) * self.output_gates[q] * d_hs + dc_next

            # Forget Gate Weights and Biases Errors
            d_f = d_cs * self.cell_states[q - 1] * sigmoid(self.forget_gates[q], derivative=True)
            d_wf += np.matmul(d_f, z_t)
            d_bf += d_f

            # Input Gate Weights and Biases Errors
            d_i = d_cs * self.candidate_gates[q] * sigmoid(self.input_gates[q], derivative=True)
            d_wi += np.matmul(d_i, z_t)
            d_bi += d_i

            # Candidate Gate Weights and Biases Errors
            d_c = d_cs * self.input_gates[q] * tanh(self.candidate_gates[q], derivative=True)
            d_wc += np.matmul(d_c, z_t)
            d_bc += d_c

            # Concatenated Input Error (Sum of Error at Each Gate!)
            d_z = (
                np.matmul(self.wf.T, d_f)
                + np.matmul(self.wi.T, d_i)
                + np.matmul(self.wc.T, d_c)
                + np.matmul(self.wo.T, d_o)
            )

            # Error of Hidden State and Cell State at Next Time Step.
            # The hidden state occupies the first hidden_size rows of the
            # concatenated input.
            dh_next = d_z[:self.hidden_size, :]
            dc_next = self.forget_gates[q] * d_cs

        # Element-wise clipping to [-1, 1] to limit exploding gradients.
        for d_ in (d_wf, d_bf, d_wi, d_bi, d_wc, d_bc, d_wo, d_bo, d_wy, d_by):
            np.clip(d_, -1, 1, out=d_)

        self.wf += d_wf * lr
        self.bf += d_bf * lr

        self.wi += d_wi * lr
        self.bi += d_bi * lr

        self.wc += d_wc * lr
        self.bc += d_bc * lr

        self.wo += d_wo * lr
        self.bo += d_bo * lr

        self.wy += d_wy * lr
        self.by += d_by * lr

    # predict
    def predict(self, X):
        """Run ``forward`` on every sequence in ``X`` and stack the results."""
        return np.array([self.forward(sample) for sample in X])

    # Train
    def train(self, X, y, epochs, lr):
        """Train for next-step prediction, one sequence at a time.

        The target for step ``q`` of a sequence is its step ``q + 1`` value,
        with ``y`` supplying the value that follows the last step: the
        targets are ``[X | y]`` with the first column dropped.

        Args:
            X: 2-D array of input sequences, one row per sequence.
            y: Value following each sequence (a column, or a 1-D array).
            epochs: Number of passes over ``X``.
            lr: Learning rate forwarded to ``backward``.

        Returns:
            List with the mean absolute error of each epoch, averaged over
            all sequences. Each value is also printed.
        """
        targets = np.column_stack((X, y))[:, 1:]

        history = []
        for _ in range(epochs):
            sample_losses = []
            for sample, target in zip(X, targets):
                prediction = self.forward(sample)
                errors = np.array(
                    [target[q] - prediction[q] for q in range(len(prediction))]
                )
                self.backward(errors, self.concat_inputs, lr)
                sample_losses.append(np.mean(np.abs(errors)))

            if sample_losses:
                # Average over the whole epoch, not just the last sequence.
                current_loss = np.mean(sample_losses)
                print(current_loss)
                history.append(current_loss)

        return history