Imports

In [None]:
import torch
import torch.nn as nn
from torch.utils.data import Dataset

import torchvision
import torchvision.models as models
from torchvision import *
from torchvision import datasets
from torchvision.models.feature_extraction import create_feature_extractor

from torchmetrics import *

from torch.utils.data import *

import sklearn as sk
from sklearn import *
from sklearn import svm
from sklearn.preprocessing import MinMaxScaler

import pandas as pd

from itertools import compress


import matplotlib.pyplot as plt


import yfinance as yf


Data Processing

In [None]:
# Builds sliding-window input sequences (time_step long) and the targets that follow them.
# TODO: the train/test split (test_percent) is not implemented yet.
def create_lstm_data(data, time_step=1, future=0, test_percent=.1):
    """Cut column 0 of ``data`` into sliding (input, target) windows.

    For every start index ``i`` the input is ``data[i : i + time_step, 0]``
    and the target is the ``future`` values that come right after it.
    Windows whose target would run past the end of ``data`` are skipped so
    that all targets have the same length and can be concatenated.

    Args:
        data: Indexable with ``[rows, 0]`` and supporting ``.unsqueeze``
            (assumes a 2-D torch tensor -- TODO confirm against caller).
        time_step: Number of past steps in each input window.
        future: Number of steps in each target window.
        test_percent: Currently unused; the train/test split is not
            implemented here.

    Returns:
        Tuple ``(x, y)``; each is the concatenation of its windows along
        dim 0 with a trailing singleton dimension appended for the LSTM.
    """
    inputs, targets = [], []
    for start in range(len(data) - time_step):
        stop = start + time_step
        window_y = data[stop : stop + future, 0]
        # Drop truncated targets near the end of the series.
        if window_y.shape[0] != future:
            continue
        inputs.append(data[start:stop, 0].unsqueeze(0))
        targets.append(window_y.unsqueeze(0))

    # Stack the windows, then add the feature dimension expected by the LSTM.
    x = torch.cat(inputs, 0).unsqueeze(-1)
    y = torch.cat(targets, 0).unsqueeze(-1)
    return x, y

# Wraps already-formatted time-series tensors in a Dataset.
# X and y must be windowed beforehand (presumably with create_lstm_data).
class TimeSeriesDataset(Dataset):
    """Map-style dataset pairing input windows ``X`` with targets ``y``."""

    def __init__(self, X, y):
        # Stored exactly as given; no copy or validation is performed.
        self.X, self.y = X, y

    def __len__(self):
        """Return the number of samples, taken from the length of ``X``."""
        return len(self.X)

    def __getitem__(self, i):
        """Return the ``(input, target)`` pair stored at index ``i``."""
        sample, label = self.X[i], self.y[i]
        return sample, label
    

Model Architecture

In [None]:
#possibly add attention
#possibly change how layers of encoder to decoder are handled (possibly make different sizes)
#add batch norm
class Encoder(nn.Module):
    """Bidirectional, batch-first LSTM encoder.

    The final forward and backward states of every layer are merged by a
    linear projection so the returned hidden/cell states have shape
    ``(n_layers, batch, hidden_dim)`` and can seed a unidirectional decoder
    with the same ``n_layers`` and ``hidden_dim``.

    Args:
        input_dim: Number of features per time step.
        hidden_dim: Hidden size of each LSTM direction.
        n_layers: Number of stacked LSTM layers.
        dropout: Dropout between LSTM layers (only meaningful when
            ``n_layers > 1``).
    """

    def __init__(self, input_dim, hidden_dim, n_layers, dropout = 0.5):
        super().__init__()
        self.hidden_dim = hidden_dim
        self.n_layers = n_layers
        self.input_dim = input_dim
        self.rnn = nn.LSTM(
            input_dim,
            hidden_dim,
            n_layers,
            # nn.LSTM applies dropout only between layers; passing a non-zero
            # value with a single layer has no effect and emits a warning.
            dropout=dropout if n_layers > 1 else 0.0,
            batch_first=True,
            bidirectional=True,
        )

        # Learn how to combine the two directions into one state per layer.
        self.fc_hidden = nn.Linear(hidden_dim * 2, hidden_dim)
        self.fc_cell = nn.Linear(hidden_dim * 2, hidden_dim)

    def forward(self, x):
        """Encode ``x`` of shape ``(batch, seq, input_dim)``.

        Returns:
            Tuple ``(outputs, hidden, cell)`` where ``outputs`` is
            ``(batch, seq, 2 * hidden_dim)`` and ``hidden`` / ``cell`` are
            ``(n_layers, batch, hidden_dim)``.
        """
        outputs, (hidden, cell) = self.rnn(x)

        # h_n / c_n are laid out as [l0_fwd, l0_bwd, l1_fwd, l1_bwd, ...], so
        # even indices are forward states and odd indices backward states.
        # Pairing them per layer keeps every layer instead of only layer 0.
        hidden = self.fc_hidden(torch.cat((hidden[0::2], hidden[1::2]), dim=2))
        cell = self.fc_cell(torch.cat((cell[0::2], cell[1::2]), dim=2))

        return outputs, hidden, cell
    
class Decoder(nn.Module):
    """Single-step LSTM decoder with additive attention over encoder outputs.

    Each call predicts one time step. The attention context (a weighted sum
    of the bidirectional encoder outputs) is concatenated with the previous
    value ``y`` and fed to the LSTM.

    Args:
        input_dim: Number of features in the previous value ``y``.
        hidden_dim: LSTM hidden size; must match the encoder's ``hidden_dim``
            because the encoder outputs are expected to be
            ``2 * hidden_dim`` wide.
        n_layers: Number of stacked LSTM layers.
        dropout: Dropout between LSTM layers (only meaningful when
            ``n_layers > 1``).
    """

    def __init__(self, input_dim, hidden_dim, n_layers, dropout = 0.5):
        super().__init__()
        self.input_dim = input_dim
        self.hidden_dim = hidden_dim
        self.n_layers = n_layers
        # The LSTM consumes the attention context (2 * hidden_dim, from the
        # bidirectional encoder) concatenated with the previous value.
        self.rnn = nn.LSTM(
            hidden_dim * 2 + input_dim,
            hidden_dim,
            n_layers,
            # Dropout is applied between layers only; avoid the PyTorch
            # warning for the single-layer case.
            dropout=dropout if n_layers > 1 else 0.0,
        )

        # Mini attention layer: scores [decoder hidden ; encoder output].
        self.energy = nn.Linear(hidden_dim * 3, 1)
        self.softmax = nn.Softmax(dim=0)
        self.relu = nn.ReLU()

        self.fc_out = nn.Linear(hidden_dim, 1)

    def forward(self, y, encoder_output, prev_hidden, prev_cell):
        """Predict a single time step.

        Args:
            y: Previous value (ground truth or prediction),
                ``(batch, input_dim)``.
            encoder_output: Batch-first encoder outputs,
                ``(batch, seq, 2 * hidden_dim)``.
            prev_hidden: Previous hidden state, ``(n_layers, batch, hidden_dim)``.
            prev_cell: Previous cell state, ``(n_layers, batch, hidden_dim)``.

        Returns:
            Tuple ``(prediction, hidden, cell)`` with ``prediction`` of shape
            ``(batch, 1)`` and the updated LSTM states.
        """
        # The decoder LSTM is sequence-first: (1, batch, input_dim).
        y = y.unsqueeze(0)

        # Work in sequence-first layout for the attention scores.
        encoder_states = encoder_output.transpose(0, 1)  # (seq, batch, 2H)
        sequence_length = encoder_states.shape[0]

        # Score every encoder step against the top-layer decoder state.
        hidden_reshape = prev_hidden[-1:].repeat(sequence_length, 1, 1)  # (seq, batch, H)
        energy = self.relu(
            self.energy(torch.cat((hidden_reshape, encoder_states), dim=2))
        )  # (seq, batch, 1)
        attention = self.softmax(energy)  # normalised over the sequence axis

        # (batch, 1, seq) @ (batch, seq, 2H) -> (batch, 1, 2H) -> (1, batch, 2H)
        attention = attention.permute(1, 2, 0)
        context_vector = torch.bmm(attention, encoder_output).permute(1, 0, 2)

        # Join along the feature axis so the LSTM sees [context ; y].
        rnn_input = torch.cat((context_vector, y), dim=2)

        output, (hidden, cell) = self.rnn(rnn_input, (prev_hidden, prev_cell))

        prediction = self.fc_out(output)  # (1, batch, 1)
        prediction = prediction.squeeze(0)  # (batch, 1)

        return prediction, hidden, cell
    
class EncoderDecoderWrapper(nn.Module):
    """Run an encoder once, then unroll a decoder for ``output_size`` steps.

    Args:
        encoder: Module whose forward returns
            ``(encoder_output, hidden, cell)``.
        decoder: Module whose forward takes
            ``(y, encoder_output, prev_hidden, prev_cell)`` and returns
            ``(prediction, hidden, cell)``.
        output_size: Number of future steps to predict.
        teacher_forcing: Probability, per step, of feeding the ground-truth
            value instead of the model's own prediction to the next step.
        device: Device on which the output buffer is allocated.
    """

    def __init__(self, encoder, decoder, output_size = 1, teacher_forcing=0.3, device = 'cpu'):
        super().__init__()
        self.encoder = encoder
        self.decoder = decoder
        self.teacher_forcing = teacher_forcing
        self.device = device
        self.output_size = output_size
        # The encoder states seed the decoder directly, so their shapes
        # must agree. Might change if differing layer counts prove useful.
        assert (
            encoder.hidden_dim == decoder.hidden_dim
        ), "encoder and decoder must share hidden_dim"
        assert (
            encoder.n_layers == decoder.n_layers
        ), "encoder and decoder must share n_layers"

    def forward(self, source, target=None):
        """Predict ``output_size`` steps following ``source``.

        Args:
            source: Input windows, ``(batch, seq, features)``.
            target: Optional ground truth with ``output_size`` entries along
                dim 1. Only used for teacher forcing; pass ``None`` to let
                the decoder always consume its own predictions.

        Returns:
            Tensor of shape ``(batch, output_size)`` on ``self.device``.
        """
        # Number of elements in the batch.
        batch_size = source.shape[0]

        if target is not None:
            # Target length must match the number of decoded steps.
            assert target.shape[1] == self.output_size, (
                "target length must equal output_size"
            )

        encoder_output, prev_hidden, prev_cell = self.encoder(source)

        # The first decoder input is the last observed value.
        prev_target = source[:, -1]

        outputs = torch.zeros(batch_size, self.output_size).to(self.device)

        for t in range(self.output_size):
            prediction, prev_hidden, prev_cell = self.decoder(
                prev_target, encoder_output, prev_hidden, prev_cell
            )

            outputs[:, t] = prediction.squeeze(1)

            # Teacher forcing needs BOTH a target and a successful coin flip;
            # otherwise the model's own prediction is fed back.
            use_truth = (
                target is not None
                and torch.rand(1).item() < self.teacher_forcing
            )
            if use_truth:
                # Keep a (batch, features) shape even for 2-D targets.
                prev_target = target[:, t].reshape(batch_size, -1)
            else:
                prev_target = prediction

        return outputs