In [3]:
import pandas as pd
import numpy as np
import torch
import matplotlib.pyplot as plt
import yfinance as yf
from sklearn.preprocessing import MinMaxScaler

def read_data_function(file_name):
    """Load a comma-separated price file and return its "open" series by date.

    Parameters
    ----------
    file_name : str or path-like
        CSV file containing at least a "date" column (formatted
        ``YYYY-MM-DD``) and an "open" column.

    Returns
    -------
    pandas.DataFrame
        A single-column frame ("open") indexed by the parsed "date" values.
    """
    raw_table = pd.read_csv(file_name, sep = ",")
    # Work on an explicit copy of the two columns that are actually used.
    prices = raw_table.loc[:, ["date", "open"]].copy()
    prices["date"] = pd.to_datetime(prices["date"], format = "%Y-%m-%d")
    return prices.set_index("date")

In [4]:
import numpy as np
import pandas as pd
def data_prep_function(sequence_dataset,window_size):
    """Build a sliding-window table of lagged "open" values.

    For every lag ``k`` in ``1..window_size`` a column ``"open-k"`` is added
    that holds the "open" column shifted down by ``k`` rows. Rows that contain
    any missing value (including the first ``window_size`` rows, which have no
    complete history) are dropped.

    The input frame is NOT modified: a new frame is returned. This lets the
    same source frame be reused for several window sizes without the lag
    columns or row drops of one call leaking into the next.

    Parameters
    ----------
    sequence_dataset : pandas.DataFrame
        Frame with an "open" column.
    window_size : int
        Number of lagged columns to create.

    Returns
    -------
    pandas.DataFrame
        New frame with the original columns followed by ``open-1`` ..
        ``open-<window_size>``, with incomplete rows removed.
    """
    open_prices = sequence_dataset["open"]
    lag_columns = {
        f"open-{lag}": open_prices.shift(lag)
        for lag in range(1, window_size + 1)
    }
    # assign() works on a copy, so the caller's frame stays untouched.
    windowed = sequence_dataset.assign(**lag_columns)
    return windowed.dropna()

In [5]:
from sklearn.preprocessing import MinMaxScaler
def matrix_formatting_and_normalize_dataset(sliding_window_dataset):
    """Convert a frame to a NumPy matrix and min-max scale it.

    Parameters
    ----------
    sliding_window_dataset : pandas.DataFrame
        Table to convert; every column is scaled by a ``MinMaxScaler`` with
        default settings that is fitted on this same table.

    Returns
    -------
    numpy.ndarray
        The scaled matrix. The fitted scaler is not returned.
    """
    raw_matrix = sliding_window_dataset.to_numpy()
    return MinMaxScaler().fit_transform(raw_matrix)

In [6]:
from sklearn.model_selection import train_test_split
import numpy as np
from torch.utils.data import TensorDataset, DataLoader, random_split
import torch
def feature_target_modeling(dataset, batch_size = 16):
    """Split a 2-D array into train / validation / test DataLoaders.

    Column 0 of ``dataset`` is used as the target and the remaining columns as
    the input sequence. Features are reshaped to ``(samples, columns, 1)`` and
    targets to ``(samples, 1)`` before being wrapped as float32 tensors.

    10% of the rows are held out for testing via ``train_test_split``; the
    remaining rows are divided 80/20 into training and validation subsets with
    ``random_split``. The training loader iterates ONLY over the training
    subset, so validation samples are never used for gradient updates.

    NOTE(review): both splits are random, so rows are not kept in their
    original order — confirm this is intended for sequential data.

    Parameters
    ----------
    dataset : numpy.ndarray
        2-D array with the target in column 0.
    batch_size : int, optional
        Batch size used by all three loaders (default 16).

    Returns
    -------
    tuple of DataLoader
        ``(train_loader, val_loader, test_loader)``. Only the training loader
        shuffles.
    """
    features = dataset[:, 1:]
    target = dataset[:, 0]
    X_train, X_test, y_train, y_test = train_test_split(features, target, test_size = 0.1)

    def _as_tensor_dataset(X_part, y_part):
        # Add a trailing feature axis for X and a trailing column axis for y.
        X_shaped = np.reshape(X_part, (X_part.shape[0], X_part.shape[1], 1))
        y_shaped = np.reshape(y_part, (y_part.shape[0], 1))
        return TensorDataset(
            torch.tensor(X_shaped, dtype = torch.float32),
            torch.tensor(y_shaped, dtype = torch.float32),
        )

    train_dataset = _as_tensor_dataset(X_train, y_train)
    test_dataset = _as_tensor_dataset(X_test, y_test)

    train_size = int(0.8 * len(train_dataset))
    val_size = len(train_dataset) - train_size
    train_sub, val_sub = random_split(train_dataset, [train_size, val_size])

    # Train on train_sub only; using the full train_dataset here would also
    # feed the validation samples to the optimizer.
    train_loader = DataLoader(train_sub, batch_size = batch_size, shuffle = True)
    val_loader = DataLoader(val_sub, batch_size = batch_size, shuffle = False)
    test_loader = DataLoader(test_dataset, batch_size = batch_size, shuffle = False)
    return train_loader, val_loader, test_loader

In [7]:
import math
import torch.nn as nn
class CustomLSTM(nn.Module):
    """Hand-written single-layer LSTM followed by a linear output layer.

    The four gates share one input weight matrix ``W``, one recurrent weight
    matrix ``U`` and one bias vector, each ``4 * hidden_sz`` wide. The gate
    order along that axis is: input, forget, cell candidate, output.
    """

    def __init__(self, input_sz, hidden_sz, output_sz = 1):
        """Create the parameters and initialise them.

        input_sz  -- number of features per time step.
        hidden_sz -- size of the hidden and cell states.
        output_sz -- size of the final linear projection (default 1).
        """
        super().__init__()
        self.input_sz = input_sz
        self.hidden_size = hidden_sz
        gate_width = hidden_sz * 4
        self.W = nn.Parameter(torch.Tensor(input_sz, gate_width))
        self.U = nn.Parameter(torch.Tensor(hidden_sz, gate_width))
        self.bias = nn.Parameter(torch.Tensor(gate_width))
        self.linear = nn.Linear(hidden_sz, output_sz)
        self.init_weights()

    def init_weights(self):
        """Fill every parameter (including the linear layer) uniformly in
        ``[-1/sqrt(hidden_size), 1/sqrt(hidden_size))``."""
        bound = 1.0 / math.sqrt(self.hidden_size)
        for param in self.parameters():
            param.data.uniform_(-bound, bound)

    def forward(self, x, init_states=None):
        """Run the sequence through the cell and project the last hidden state.

        Assumes x is of shape (batch, sequence, feature). ``init_states`` may
        be a ``(h_0, c_0)`` pair; when omitted both start as zeros on
        ``x``'s device. Returns the linear layer applied to the hidden state
        of the final time step.
        """
        batch_size, seq_len, _ = x.size()
        if init_states is not None:
            h_t, c_t = init_states
        else:
            h_t = torch.zeros(batch_size, self.hidden_size).to(x.device)
            c_t = torch.zeros(batch_size, self.hidden_size).to(x.device)

        width = self.hidden_size
        step_states = []
        for step in range(seq_len):
            # All four gate pre-activations from a single fused matmul.
            pre_act = x[:, step, :] @ self.W + h_t @ self.U + self.bias
            input_gate = torch.sigmoid(pre_act[:, :width])
            forget_gate = torch.sigmoid(pre_act[:, width:width * 2])
            candidate = torch.tanh(pre_act[:, width * 2:width * 3])
            output_gate = torch.sigmoid(pre_act[:, width * 3:])
            c_t = forget_gate * c_t + input_gate * candidate
            h_t = output_gate * torch.tanh(c_t)
            step_states.append(h_t[None])

        # (sequence, batch, feature) -> (batch, sequence, feature)
        seq_first = torch.cat(step_states, dim=0)
        batch_first = seq_first.permute(1, 0, 2).contiguous()
        return self.linear(batch_first[:, -1, :])

In [8]:
import torch
import torch.nn as nn
import numpy as np
from torch.utils.data import TensorDataset, DataLoader

def train_model(model: torch.nn.Module, train_dataloader : DataLoader,val_dataloader: DataLoader, epochs: int):
    """Train ``model`` with Adam and MSE loss, validating after every epoch.

    Each epoch switches the model to training mode, performs one optimizer
    step per training batch, then switches to evaluation mode and computes the
    validation loss without gradients. Re-entering training mode at the start
    of every epoch matters for models with mode-dependent layers: a single
    call before the loop would leave all epochs after the first in eval mode.

    Parameters
    ----------
    model : torch.nn.Module
        Model to optimise; it is updated in place.
    train_dataloader : DataLoader
        Yields ``(inputs, targets)`` batches used for gradient updates.
    val_dataloader : DataLoader
        Yields ``(inputs, targets)`` batches used only for evaluation.
    epochs : int
        Number of passes over ``train_dataloader``.

    Returns
    -------
    tuple
        ``(train_losses, val_losses, sample_target)`` where the two loss
        arrays hold the mean batch loss per epoch (0.0 for an epoch whose
        loader yielded no batches) and ``sample_target`` is element 1 of the
        last training target batch, or ``None`` when no such element exists
        (no batches were seen or the last batch had fewer than two items).
    """
    optimizer = torch.optim.Adam(model.parameters())
    loss_function = torch.nn.MSELoss()
    loss_vectorize_train = np.zeros(epochs)
    loss_vectorize_val = np.zeros(epochs)
    y_batch = None

    for epoch in range(epochs):
        # Restore training mode: the previous epoch ended in eval mode.
        model.train()
        epoch_loss_train = 0.0
        train_batches = 0
        for x_batch, y_batch in train_dataloader:
            optimizer.zero_grad()
            preds = model(x_batch)
            loss = loss_function(preds, y_batch)
            loss.backward()
            optimizer.step()
            epoch_loss_train += loss.item()
            train_batches += 1
        if train_batches:
            loss_vectorize_train[epoch] = epoch_loss_train / train_batches

        model.eval()
        epoch_loss_val = 0.0
        val_batches = 0
        with torch.no_grad():
            for x_batch_val, y_batch_val in val_dataloader:
                preds_val = model(x_batch_val)
                epoch_loss_val += loss_function(preds_val, y_batch_val).item()
                val_batches += 1
        if val_batches:
            loss_vectorize_val[epoch] = epoch_loss_val / val_batches

        print(f"Epoch {epoch}: Train loss {loss_vectorize_train[epoch] : .6f} || Val loss {loss_vectorize_val[epoch] : .6f}")

    # Kept for backward compatibility: callers unpack a third value that is
    # the second target of the last training batch.
    if y_batch is not None and len(y_batch) > 1:
        sample_target = y_batch[1]
    else:
        sample_target = None

    return loss_vectorize_train, loss_vectorize_val, sample_target

In [9]:
import torch
import torch.nn as nn
import numpy as np
from torch.utils.data import TensorDataset, DataLoader
def prediction_printer(test_loader : DataLoader, model):
    """Run ``model`` over every batch of ``test_loader`` and return all predictions.

    Predictions from all batches are concatenated along the first axis, in
    loader order (previously each batch overwrote the one before it, so only
    the last batch was returned). Inference runs without gradient tracking.
    If ``model`` is a ``torch.nn.Module`` it is put in evaluation mode for the
    duration of the call and its previous mode is restored afterwards.

    Parameters
    ----------
    test_loader : DataLoader
        Yields ``(inputs, targets)`` batches; the targets are ignored.
    model : callable
        Maps an input batch to a tensor of predictions.

    Returns
    -------
    numpy.ndarray
        Stacked predictions for the whole loader; an empty array when the
        loader yields no batches.
    """
    is_module = isinstance(model, torch.nn.Module)
    was_training = is_module and model.training
    if is_module:
        model.eval()

    batch_predictions = []
    try:
        with torch.no_grad():
            for x_batch, _ in test_loader:
                # .cpu() so the final .numpy() also works for non-CPU tensors.
                batch_predictions.append(model(x_batch).cpu())
    finally:
        if was_training:
            model.train()

    if not batch_predictions:
        return torch.empty(0).numpy()
    return torch.cat(batch_predictions, dim=0).numpy()



In [10]:
# Load the date-indexed "open" price frame from the CSV.
# NOTE(review): the recorded run of this cell raised FileNotFoundError, so the
# file has to exist at this path before the cells that use `data` can run.
data = read_data_function("/content/all_stocks_5yr.csv")

FileNotFoundError: [Errno 2] No such file or directory: '/content/all_stocks_5yr.csv'

In [None]:
# Add 10 lagged "open" columns and drop the rows that contain missing values.
window_data_10 = data_prep_function(data, window_size = 10)

In [None]:
# Convert the windowed frame to a NumPy matrix and min-max scale it.
formated_data_10 = matrix_formatting_and_normalize_dataset(window_data_10)

In [None]:
# Build the train / validation / test DataLoaders (column 0 is the target).
train_loader_10, val_loader_10, test_loader_10 = feature_target_modeling(formated_data_10)

In [None]:
# One input feature per time step, hidden state of size 6, default output size 1.
model = CustomLSTM(1, 6)

In [None]:
# Train for 4 epochs; returns per-epoch train and validation losses plus one
# target value taken from the last training batch.
loss_train_10, loss_val_10, y_train_10 = train_model(model, train_loader_10, val_loader_10,4)

In [None]:
# Run the trained model on the test loader.
predictions_10 = prediction_printer(test_loader_10, model)

In [None]:
# Repeat the pipeline with a 50-step window.
window_data_50 = data_prep_function(data, window_size = 50)

In [None]:
# Convert the windowed frame to a NumPy matrix and min-max scale it.
formated_data_50 = matrix_formatting_and_normalize_dataset(window_data_50)

In [None]:
# Build the train / validation / test DataLoaders (column 0 is the target).
train_loader_50, val_loader_50, test_loader_50 = feature_target_modeling(formated_data_50)

In [None]:
# NOTE(review): `model` is the instance already trained on the 10-step
# windows; it is not re-created here, so training continues from those weights.
loss_train_50, loss_val_50, y_train_50 = train_model(model, train_loader_50, val_loader_50,4)

In [None]:
# Run the model on the 50-step test loader and keep the result. The original
# line used `-` instead of `=` on a misspelled, undefined name, which raised
# NameError and discarded the predictions.
prediction_50 = prediction_printer(test_loader_50, model)

In [None]:
# Repeat the pipeline with a 20-step window.
window_data_20 = data_prep_function(data, window_size = 20)

In [None]:
# Convert the windowed frame to a NumPy matrix and min-max scale it.
formated_data_20 = matrix_formatting_and_normalize_dataset(window_data_20)

In [None]:
# Build the train / validation / test DataLoaders (column 0 is the target).
train_loader_20, val_loader_20, test_loader_20 = feature_target_modeling(formated_data_20)

In [None]:
# NOTE(review): `model` is still the same instance used in the earlier
# training cells; it is not re-created before this run.
loss_train_20, loss_val_20, y_train_20 = train_model(model, train_loader_20, val_loader_20,4)

In [None]:
# Run the model on the 20-step test loader.
prediction_20 = prediction_printer(test_loader_20, model)

In [None]:
# Colab-only: mount Google Drive under /content/drive.
# NOTE(review): this is the last cell, after the data load in In [10]; the
# path read there ("/content/all_stocks_5yr.csv") is not under this mount point.
from google.colab import drive
drive.mount('/content/drive')