# LSTM (Long short-term memory)

In [34]:
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
import numpy as np
from sklearn.metrics import root_mean_squared_error, r2_score

In [36]:
def sigmoid(x):
    return 1 / (1 + np.exp(-x))

def sigmoid_derivative(x):
    return sigmoid(x) * (1 - sigmoid(x))

def tanh(x):
    return np.tanh(x)

def tanh_derivative(x):
    return 1 - np.tanh(x)**2

In [37]:
class LSTM:
    def __init__(self, input_size, hidden_size, output_size):
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.output_size = output_size
        
        self.weights_forget_gate = self.__init_weights((input_size + hidden_size, hidden_size))
        self.weights_input_gate = self.__init_weights((input_size + hidden_size, hidden_size))
        self.weights_state = self.__init_weights((input_size + hidden_size, hidden_size))
        self.weights_output_gate = self.__init_weights((input_size + hidden_size, hidden_size))
        self.weights_output = self.__init_weights((hidden_size, output_size))
        
        self.biases_forget_gate = np.zeros((hidden_size,))
        self.biases_input_gate = np.zeros((hidden_size,))
        self.biases_state = np.zeros((hidden_size,))
        self.biases_output_gate = np.zeros((hidden_size,))
        self.biases_output = np.zeros((output_size,))
        
        self.c_prev = np.zeros((hidden_size,))
        self.h_prev = np.zeros((hidden_size,))

    def __forward_propagation(self, x):
        x_and_h_prev = np.hstack([x, self.h_prev])
        ft = sigmoid(np.dot(x_and_h_prev, self.weights_forget_gate) + self.biases_forget_gate)
        it = sigmoid(np.dot(x_and_h_prev, self.weights_input_gate) + self.biases_input_gate)
        ct = tanh(np.dot(x_and_h_prev, self.weights_state) + self.biases_state)
        ot = sigmoid(np.dot(x_and_h_prev, self.weights_output_gate) + self.biases_output_gate)
        
        self.c_prev = ft * self.c_prev + it * ct
        self.h_prev = ot * tanh(self.c_prev)
        
        y = np.dot(self.h_prev, self.weights_output) + self.biases_output
        return y

    def __backward_propagation(self, x, dy, learning_rate):
        x_and_h_prev = np.hstack([x, self.h_prev])
        ft = sigmoid(np.dot(x_and_h_prev, self.weights_forget_gate) + self.biases_forget_gate)
        it = sigmoid(np.dot(x_and_h_prev, self.weights_input_gate) + self.biases_input_gate)
        ct = tanh(np.dot(x_and_h_prev, self.weights_state) + self.biases_state)
        ot = sigmoid(np.dot(x_and_h_prev, self.weights_output_gate) + self.biases_output_gate)
        
        dh = np.dot(dy, self.weights_output.T)
        dc = dh * ot * tanh_derivative(tanh(self.c_prev))
        
        dwf = np.outer(x_and_h_prev, ft * (1 - ft) * self.c_prev * dc)
        dwi = np.outer(x_and_h_prev, it * (1 - it) * ct * dc)
        dwc = np.outer(x_and_h_prev, (1 - ct**2) * it * dc)
        dwo = np.outer(x_and_h_prev, ot * (1 - ot) * tanh(self.c_prev) * dh)
        
        dbf = ft * (1 - ft) * self.c_prev * dc
        dbi = it * (1 - it) * ct * dc
        dbc = (1 - ct**2) * it * dc
        dbo = ot * (1 - ot) * tanh(self.c_prev) * dh
        
        self.weights_forget_gate -= learning_rate * dwf
        self.weights_input_gate -= learning_rate * dwi
        self.weights_state -= learning_rate * dwc
        self.weights_output_gate -= learning_rate * dwo
        
        self.biases_forget_gate -= learning_rate * dbf
        self.biases_input_gate -= learning_rate * dbi
        self.biases_state -= learning_rate * dbc
        self.biases_output_gate -= learning_rate * dbo
        
        self.c_prev = self.c_prev * ft + it * tanh(ct)


    def fit(self, x_train, y_train, epochs=100, learning_rate=0.001, batch_size=16):
        for epoch in range(epochs):
            perm = np.random.permutation(len(x_train))
            for i in range(0, len(x_train), batch_size):
                batch_indices = perm[i:i + batch_size]
                batch_x = x_train[batch_indices]
                batch_y = y_train[batch_indices]
                outputs = []
                for x in batch_x:
                    out = self.__forward_propagation(x)
                    outputs.append(out)
                gradients = 2 * (outputs - batch_y) / batch_size
                for j in range(len(gradients) - 1, 0, -1):
                    self.__backward_propagation(batch_x[j], gradients[j], learning_rate)
                    
    def predict(self, x):
        return self.__forward_propagation(x)
        
    def __init_weights(self, size):
        return np.random.uniform(-1, 1, size)

In [38]:
df = pd.read_csv('resources/Clear_steel_industry_data.csv')

In [39]:
x = df.drop('Usage_kWh', axis=1).values
y = df['Usage_kWh'].values.reshape(-1, 1)

In [40]:
scaler_x = StandardScaler()
scaler_y = StandardScaler()
x_scaled = scaler_x.fit_transform(x)
y_scaled = scaler_y.fit_transform(y)

In [41]:
x_train, x_test, y_train, y_test = train_test_split(x_scaled, y_scaled, test_size=0.2, shuffle=False, random_state=42)

In [42]:
lstm = LSTM(x_train.shape[1], 10, 1)
lstm.fit(x_train, y_train, epochs=100, learning_rate=0.001, batch_size=16)

In [43]:
def test_model(lstm, x_test, y_test):
    predictions = [lstm.predict(x.reshape(1, -1)) for x in x_test]
    predictions = np.array(predictions).reshape(-1, 1)

    y_test_inverse = scaler_y.inverse_transform(y_test)
    predictions_inverse = scaler_y.inverse_transform(predictions)
    print("R2 Score:\t", r2_score(y_test_inverse, predictions_inverse))
    print("RMSE:\t\t", root_mean_squared_error(y_test_inverse, predictions_inverse))

In [44]:
test_model(lstm, x_test, y_test)

R2 Score:	 0.7973077467102601
RMSE:		 14.124068777129098
