<a href="https://colab.research.google.com/github/sayarghoshroy/Recurrent_NN_Modelling/blob/master/Auto_Regressive_Model.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [0]:
# RNN for auto-regressive model

In [0]:
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from tqdm import tqdm
from sklearn.preprocessing import MinMaxScaler

In [0]:
# Synthetic AR(3) series:
#   X(t) = a_1 * X(t-1) + a_2 * X(t-2) + a_3 * X(t-3) + noise
SEED = 0
# Private generator, so re-running this cell reproduces exactly the same series
# without touching NumPy's global random state.
noise_rng = np.random.RandomState(SEED)

X = [-1e10, 1e10, 5e10]
# X(0), X(1), X(2) being -1e10, 1e10, and 5e10
n_initial = len(X)

a_1 = 0.6
a_2 = - 0.5
a_3 = -0.2

train_size = 2000
test_size = 2000

# Generating Samples
# Each new value is the weighted sum of the previous three values plus
# uniform noise drawn from [0, 0.1).
for generate in range(train_size + test_size):
    value = a_1 * X[-1] + a_2 * X[-2] + a_3 * X[-3] + noise_rng.uniform(0, 0.1)
    X.append(value)

# The hand-picked initial values are excluded from both splits; the first
# `train_size` generated values are for training, the next `test_size` for testing.
X_train = np.asarray(X[n_initial: n_initial + train_size])
X_test = np.asarray(X[n_initial + train_size: n_initial + train_size + test_size])

In [0]:
# Scale the training series into [-1, 1]; the scaler is fitted on the training values only.
scaler = MinMaxScaler(feature_range = (-1, 1))
train_column = X_train.reshape(-1, 1)  # scikit-learn expects a 2-D (samples, features) array
train_normalized = scaler.fit_transform(train_column)

In [0]:
# Convert the scaled column to a float tensor, then flatten it to one dimension.
train_tensor = torch.FloatTensor(train_normalized)
normalized_train_set = train_tensor.view(-1)

In [0]:
def make_sequence(input_sequence, window):
    """Slice a series into (window, next-value) pairs.

    For every start position ``i`` in ``range(len(input_sequence) - window)``
    the slice ``input_sequence[i : i + window]`` is paired with the length-one
    slice that immediately follows it, i.e. the item to predict.

    Returns a list of ``(inputs, label)`` tuples ordered by start position;
    the list is empty when the series is not longer than ``window``.
    """
    n_pairs = len(input_sequence) - window
    return [
        (input_sequence[start: start + window],
         input_sequence[start + window: start + window + 1])
        for start in range(n_pairs)
    ]

In [0]:
# Number of past values fed to the model for each prediction.
window_size = 3

train_X_y = make_sequence(normalized_train_set, window_size)

# Build the test windows from the held-out series, not from the training data.
# The scaler fitted on the training set is reused (transform only, no re-fit)
# so that no information from the test split enters the normalisation.
normalized_test_set = torch.FloatTensor(scaler.transform(X_test.reshape(-1, 1))).view(-1)
test_X_y = make_sequence(normalized_test_set, window_size)

In [0]:
class LSTM(nn.Module):
    """Single-layer LSTM followed by a linear read-out of the last time step."""

    def __init__(self, input_size = 1, hidden_layer_size = 100, output_size = 1):
        """Create the recurrent layer, the linear head and a zeroed (h, c) state."""
        super().__init__()
        self.hidden_layer_size = hidden_layer_size

        # Sub-modules are created in this order: recurrent layer, then linear head.
        self.lstm = nn.LSTM(input_size, hidden_layer_size)
        self.linear = nn.Linear(hidden_layer_size, output_size)

        # (hidden state, cell state), each of shape (1, 1, hidden_layer_size).
        state_shape = (1, 1, self.hidden_layer_size)
        self.hidden_cell = (torch.zeros(*state_shape), torch.zeros(*state_shape))

    def forward(self, input_sequence):
        """Run one sequence through the network and return the final step's output.

        The input is viewed as (sequence length, 1, features). ``self.hidden_cell``
        is used as the initial state and is overwritten with the state returned
        by the LSTM; it is not reset here.
        """
        n_steps = len(input_sequence)
        lstm_input = input_sequence.view(n_steps, 1, -1)
        rnn_out, self.hidden_cell = self.lstm(lstm_input, self.hidden_cell)
        per_step = self.linear(rnn_out.view(n_steps, -1))
        return per_step[-1]

In [0]:
# Network with its default sizes, a mean-squared-error objective, and Adam at learning rate 0.01.
model = LSTM()
loss_function = nn.MSELoss()
optimizer = optim.Adam(params = model.parameters(), lr = 1e-2)

In [10]:
# training:
num_epochs = 2
losses = []  # one entry per optimisation step, across all epochs

# Put the network in training mode explicitly, so that re-running this cell
# after the prediction cell (which calls model.eval()) still trains correctly.
model.train()

for epoch in tqdm(range(num_epochs)):
    # The loop variables are deliberately not named X / y: the notebook-level
    # list `X` holds the generated series and must not be overwritten by a
    # three-element tensor window.
    for window_seq, target in train_X_y:
        optimizer.zero_grad()
        # Every window is treated as an independent sample, so the recurrent
        # state is reset to zeros before each forward pass.
        model.hidden_cell = (torch.zeros(1, 1, model.hidden_layer_size),
                             torch.zeros(1, 1, model.hidden_layer_size))

        y_pred = model(window_seq)
        loss = loss_function(y_pred, target)
        loss.backward()
        optimizer.step()
        losses.append(loss.item())

100%|██████████| 2/2 [00:08<00:00,  4.20s/it]


In [0]:
import pickle

# Persist the per-step training losses. The context manager guarantees the
# file is flushed and closed even if pickling raises.
with open('losses_auto_reg.pickle', 'wb') as f:
    pickle.dump(losses, f)

In [0]:
# getting into prediction mode
model.eval()

# Start the roll-out from the last three normalised training values.
test_sequence = normalized_train_set[-3: ].tolist()

# Autoregressive roll-out: each prediction is appended to the sequence and
# becomes part of the input window for the following step.
for index in range(test_size):
    seq = torch.FloatTensor(test_sequence[-3: ])
    with torch.no_grad():
        # Reset `hidden_cell`, the attribute that forward() actually reads.
        # Assigning to `model.hidden` would only create an unused attribute and
        # let the recurrent state leak from one window into the next, unlike
        # in training where every window starts from a zero state.
        model.hidden_cell = (torch.zeros(1, 1, model.hidden_layer_size),
                             torch.zeros(1, 1, model.hidden_layer_size))
        test_sequence.append(model(seq).item())

In [0]:
# Map the generated values back to the original scale; the first three entries
# of test_sequence are skipped and the result is flattened to 2000 values.
predicted_normalized = np.asarray(test_sequence[3: ]).reshape(-1, 1)
X_test_predicted = scaler.inverse_transform(predicted_normalized).reshape(2000, )

In [0]:
def R_squared(true, pred):
    """Coefficient of determination of ``pred`` with respect to ``true``.

    Computes ``1 - SS_res / SS_tot`` where ``SS_res`` is the sum of squared
    differences between ``true`` and ``pred`` and ``SS_tot`` is the sum of
    squared deviations of ``true`` from its own mean.

    Parameters
    ----------
    true, pred : array-like
        Observed and predicted values of matching shape (lists are accepted).

    Returns
    -------
    float
        1.0 for a perfect fit, 0.0 when ``pred`` is no better than predicting
        the mean of ``true``, negative when it is worse. If ``true`` is
        constant ``SS_tot`` is zero and the result is nan or -inf.
    """
    true = np.asarray(true)
    pred = np.asarray(pred)

    # The mean is computed once and broadcast, instead of being recomputed
    # for every element of `true`.
    error_regr = np.sum(np.square(true - pred))
    error_mean = np.sum(np.square(true - np.mean(true)))

    return (1 - (error_regr / error_mean))

In [0]:
# Metrics:
prediction_error = X_test_predicted - X_test

# Mean squared error is the average of the squared errors. The L2 norm divided
# by the sample count is a different quantity (sqrt of the summed squares / n).
MSE = np.mean(np.square(prediction_error))
# Mean absolute error: average magnitude of the errors.
MAE = np.mean(np.absolute(prediction_error))
R2 = R_squared(X_test, X_test_predicted)

print("MSE: " + str(MSE))
print("MAE: " + str(MAE))
print("R-squared: " + str(R2))

In [0]:
# ^_^ Thank You