# Lab 7 Report: 
## Stock Prediction AI with Encoder-Decoder RNN

### Name:

In [None]:
%matplotlib inline

import numpy as np
import matplotlib.pyplot as plt
import torch
import pandas as pd
import seaborn as sns

In [None]:
from IPython.display import Image # For displaying images in colab jupyter cell

In [None]:
# Display the lab exercise figure inline, rendered 1000 pixels wide.
Image('lab7_exercise.png', width = 1000)

In [None]:
# Seaborn plot styling: 'white' style with fonts scaled by 2, applied to the
# plots created after this call.
sns.set(style = 'white', font_scale = 2)

## Prepare Data

In [None]:
# Load the three candidate stock datasets, both as DataFrames and as NumPy
# arrays. Only one of them needs to be used to train the model; the lab asks
# for the 'closing price' column to be used for training and testing.
tesla, google, dji = (
    pd.read_csv(csv_path) for csv_path in ('TSLA.csv', 'GOOGL.csv', 'DJI.csv')
)
tesla_np, google_np, dji_np = (
    frame.to_numpy() for frame in (tesla, google, dji)
)

# Quick look at the (rows, columns) size of the Tesla table.
print(tesla_np.shape)

In [None]:
# Normalize the price series and select the training dataset
# (all the days except for the last 100 days, which are held out).
from sklearn.preprocessing import StandardScaler

num_holdout_days = 100  # trailing days excluded from training

# Column 5 of the raw Tesla table is used as the price series, reshaped to a
# single-feature column as the scaler expects 2-D input.
# NOTE(review): the lab asks for the 'closing price' column -- confirm that
# index 5 is that column in TSLA.csv.
price_series = tesla_np[:, 5].reshape(-1, 1)

# Fit the scaler on the training days only, so the statistics of the held-out
# days do not leak into the normalization.
scaler = StandardScaler()
training_raw = scaler.fit_transform(price_series[:-num_holdout_days])
print(training_raw.shape)

In [None]:
# Sequence lengths: how many timesteps the encoder reads, how many the decoder
# predicts, and how long the testing sequence is.
encoder_inputseq_len = 7
decoder_outputseq_len = 3
testing_sequence_len = 50

# Every training sample is one sliding window over the series: the first
# `encoder_inputseq_len` values are the encoder input and the following
# `decoder_outputseq_len` values are the decoder target.
window_len = encoder_inputseq_len + decoder_outputseq_len
num_samples = training_raw.shape[0] - window_len + 1

train_input_seqs = np.zeros((num_samples, encoder_inputseq_len, 1))
train_output_seqs = np.zeros((num_samples, decoder_outputseq_len, 1))

for start in range(num_samples):
    split = start + encoder_inputseq_len  # first index of the decoder target
    train_input_seqs[start] = training_raw[start:split]
    train_output_seqs[start] = training_raw[split:start + window_len]

In [None]:
# Sanity-check the dataset dimensions; both arrays are expected to be laid out
# as (sample size, sequence length, # of features / timestep).
for shape_label, seq_array in (
    ("Encoder Training Inputs Shape: ", train_input_seqs),
    ("Decoder Training Outputs Shape: ", train_output_seqs),
):
    print(shape_label, seq_array.shape)

## Define Model

In [None]:
class Encoder(torch.nn.Module):
    """LSTM encoder that reads an input sequence and exposes its final state.

    Args:
        input_size: Number of features per timestep.
        hidden_size: Number of features in the LSTM hidden state.
        num_layers: Number of stacked LSTM layers.
    """

    def __init__(self, input_size, hidden_size, num_layers):
        super().__init__()

        # batch_first=True: tensors are laid out as (batch, sequence, feature).
        self.lstm = torch.nn.LSTM(
            input_size=input_size,
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True,
        )

    def forward(self, input_seq, hidden_state):
        """Run the LSTM over ``input_seq`` starting from ``hidden_state``.

        Args:
            input_seq: Batch of input sequences fed to the LSTM.
            hidden_state: Initial LSTM state, handed to the LSTM unchanged
                (``None`` is accepted).

        Returns:
            The ``(output, state)`` pair produced by the LSTM.
        """
        return self.lstm(input_seq, hidden_state)

class Decoder(torch.nn.Module):
    """LSTM decoder followed by a linear read-out layer.

    Args:
        input_size: Number of features per timestep of the decoder input.
        hidden_size: Number of features in the LSTM hidden state.
        output_size: Number of features produced per timestep.
        num_layers: Number of stacked LSTM layers.
    """

    def __init__(self, input_size, hidden_size, output_size, num_layers):
        super().__init__()

        # batch_first=True: tensors are laid out as (batch, sequence, feature).
        self.lstm = torch.nn.LSTM(
            input_size=input_size,
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True,
        )
        # Maps every LSTM output vector to the prediction for that timestep.
        self.fc_decoder = torch.nn.Linear(hidden_size, output_size)

    def forward(self, input_seq, encoder_hidden_states):
        """Decode ``input_seq`` starting from ``encoder_hidden_states``.

        Args:
            input_seq: Batch of decoder input sequences.
            encoder_hidden_states: Initial LSTM state, handed to the LSTM
                unchanged.

        Returns:
            A ``(prediction, state)`` pair: the linear read-out of the LSTM
            outputs and the updated LSTM state.
        """
        lstm_out, state = self.lstm(input_seq, encoder_hidden_states)
        return self.fc_decoder(lstm_out), state

class Encoder_Decoder(torch.nn.Module):
    """Container holding an ``Encoder`` and a ``Decoder`` as sub-modules.

    The class defines no ``forward``; the two halves are reached through the
    ``Encoder`` and ``Decoder`` attributes.

    Args:
        input_size: Number of features per timestep, shared by both halves.
        hidden_size: LSTM hidden-state size, shared by both halves.
        decoder_output_size: Number of features the decoder emits per timestep.
        num_layers: Number of stacked LSTM layers, shared by both halves.
    """

    def __init__(self, input_size, hidden_size, decoder_output_size, num_layers):
        super().__init__()

        # Settings common to the encoder and the decoder.
        shared = dict(
            input_size=input_size,
            hidden_size=hidden_size,
            num_layers=num_layers,
        )
        self.Encoder = Encoder(**shared)
        self.Decoder = Decoder(output_size=decoder_output_size, **shared)
        


## Define Hyperparameters

In [None]:
# Training hyperparameters.
learning_rate = 0.01
epochs = 50
batchsize = 10
num_features = 1

# Seed the generator before building the model so that weight initialization
# is reproducible.
torch.manual_seed(2)
Encoder_Decoder_RNN = Encoder_Decoder(
    input_size=1,
    hidden_size=20,
    decoder_output_size=1,
    num_layers=1,
)

# Mean-squared-error objective, minimized with Adam over all model parameters.
loss_func = torch.nn.MSELoss()
optimizer = torch.optim.Adam(Encoder_Decoder_RNN.parameters(), lr=learning_rate)



## Identify Tracked Values

In [None]:
# Training loss history: the training loop appends one loss value per
# mini-batch.
train_loss_list=[]

## Train Model

In [None]:
# Convert training data into float32 torch tensors.
# torch.as_tensor accepts NumPy arrays as well as tensors, so this cell can be
# re-run after the conversion has already happened.
train_input_seqs = torch.as_tensor(train_input_seqs, dtype=torch.float32)
train_output_seqs = torch.as_tensor(train_output_seqs, dtype=torch.float32)

# Split training data into mini-batches and keep only the full-size ones, so
# that every mini-batch has exactly `batchsize` samples (the training loop
# relies on a fixed batch size). A trailing partial batch is dropped; a
# trailing batch that happens to be complete is kept.
train_batches_features = tuple(
    batch for batch in torch.split(train_input_seqs, batchsize)
    if batch.shape[0] == batchsize
)
train_batches_targets = tuple(
    batch for batch in torch.split(train_output_seqs, batchsize)
    if batch.shape[0] == batchsize
)

# Compute total number of mini-batches in training data
batch_split_num = len(train_batches_features)


In [None]:

# Train the encoder-decoder with teacher forcing: at every decoding step the
# decoder is fed the ground-truth value of the previous step rather than its
# own prediction.
for epoch in range(epochs):

    for batch_inputs, batch_targets in zip(train_batches_features, train_batches_targets):

        hidden_state = None  # no initial state is supplied to the encoder
        decoder_output_seq = torch.zeros(batchsize, decoder_outputseq_len, 1)
        optimizer.zero_grad()

        # Encode the input window; its final state seeds the decoder.
        encoder_output, encoder_hidden = Encoder_Decoder_RNN.Encoder(batch_inputs, hidden_state)
        decoder_hidden = encoder_hidden

        # First decoder input: the last encoder timestep, with a length-1
        # sequence axis re-inserted after the feature axis was indexed.
        decoder_input = torch.unsqueeze(batch_inputs[:, -1, :], 2)

        for t in range(decoder_outputseq_len):

            decoder_output, decoder_hidden = Encoder_Decoder_RNN.Decoder(decoder_input, decoder_hidden)

            decoder_output_seq[:, t, :] = torch.squeeze(decoder_output, 2)

            # Teacher forcing: the next input is the ground-truth target at t.
            decoder_input = torch.unsqueeze(batch_targets[:, t, :], 2)

        # Predictions and targets share one shape, so they are compared
        # element-wise without any reshaping.
        loss = loss_func(decoder_output_seq, batch_targets)

        train_loss_list.append(loss.item())

        loss.backward()

        optimizer.step()

    # Average over the losses recorded during this epoch only.
    print("Averaged Training Loss for Epoch ", epoch,": ", np.mean(train_loss_list[-batch_split_num:]))


## Visualize & Evaluate Model

In [None]:
# Plot the per-iteration training loss smoothed with a 100-sample window.
plt.figure(figsize=(12, 7))

rolling_loss = np.convolve(train_loss_list, np.ones(100), 'valid') / 100
plt.plot(rolling_loss, linewidth=3, label='Rolling Averaged Training Loss')

plt.ylabel("training loss")
plt.xlabel("Iterations")
plt.legend()
sns.despine()

In [None]:
# Define the testing sequence: the window from 200 to 150 entries before the
# end of the normalized series.
# NOTE(review): this window is sliced from `training_raw`, the same array the
# training sequences are built from -- confirm that overlapping the training
# data is intended rather than using held-out days.
test_window = slice(-200, -150)
test_input_seq = training_raw[test_window]
print(type(test_input_seq))

In [None]:
# Visualize the testing sequence on its own figure.
test_fig, test_ax = plt.subplots(figsize=(10, 5))
test_ax.plot(test_input_seq, linewidth=3)
test_ax.set_title('Test Sequence')
sns.despine()

### Generate signal predictions for testing sequence with trained Encoder-Decoder

In [None]:
# Generate predictions for the testing sequence with the trained
# Encoder-Decoder.
#
# The first `encoder_inputseq_len` values are copied from the ground truth to
# seed the encoder. After that the window slides forward by
# `decoder_outputseq_len` steps at a time: the encoder reads the most recent
# `encoder_inputseq_len` values of the output sequence and the decoder fills in
# the next values one step at a time. Each decoder input has shape (1, 1, 1),
# i.e., num_samples = 1, sequence_len = 1, num_features = 1.
#
# NOTE(review): the decoder is fed its own previous prediction here, not the
# ground truth; the lab instructions mention teacher forcing -- confirm which
# behavior is intended at test time.

# torch.as_tensor accepts NumPy arrays as well as tensors, so this cell can be
# re-run after the conversion has already happened.
test_input_seq = torch.as_tensor(test_input_seq, dtype=torch.float32)

# Use the actual length of the test sequence so the predicted sequence always
# lines up with the ground truth.
test_len = test_input_seq.shape[0]

decoder_output_seq = torch.zeros(test_len, num_features)
decoder_output_seq[:encoder_inputseq_len] = test_input_seq[:encoder_inputseq_len]

pred_start_ind = 0

with torch.no_grad():

    # Keep going while at least one position after the encoder window is
    # still unpredicted.
    while pred_start_ind + encoder_inputseq_len < test_len:

        hidden_state = None
        window_end = pred_start_ind + encoder_inputseq_len

        input_test_seq = decoder_output_seq[pred_start_ind:window_end]
        input_test_seq = torch.unsqueeze(input_test_seq, 0)

        encoder_output, encoder_hidden = Encoder_Decoder_RNN.Encoder(input_test_seq, hidden_state)
        decoder_hidden = encoder_hidden

        # First decoder input: the last value of the encoder window.
        decoder_input = input_test_seq[:, -1, :]
        decoder_input = torch.unsqueeze(decoder_input, 2)

        # Clamp the final window so the tail of the sequence is predicted too
        # instead of being left at zero.
        steps = min(decoder_outputseq_len, test_len - window_end)

        for t in range(steps):

            decoder_output, decoder_hidden = Encoder_Decoder_RNN.Decoder(decoder_input, decoder_hidden)
            decoder_output_seq[window_end + t] = decoder_output.reshape(-1)
            decoder_input = decoder_output

        pred_start_ind += decoder_outputseq_len


In [None]:
# Visualize the predicted stock sequence against the ground truth on shared
# axes.
comparison_fig, comparison_ax = plt.subplots(figsize=(10, 5))

for series, series_label in (
    (test_input_seq, 'GroundTruth'),
    (decoder_output_seq, 'RNN Predicted'),
):
    comparison_ax.plot(series, linewidth=3, label=series_label)

comparison_ax.set_title('RNN Predicted vs GroundTruth')
comparison_ax.legend()
sns.despine()

In [None]:
# Compute the MSE between the predicted sequence (decoder_output_seq) and the
# ground-truth test sequence (test_input_seq), and report it as the
# Test MSE Error.
test_mse = loss_func(decoder_output_seq, test_input_seq).item()
print("Test MSE Error: ", test_mse)