In [13]:
import torch
import numpy as np
import matplotlib.pyplot as plt
import torch.nn as nn
import torch.optim as optim
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import confusion_matrix, accuracy_score
import pandas as pd
import tkinter as tk
from tkinter import filedialog

root = tk.Tk()
root.withdraw()

# model, data prep, and model run

# normalize data and assign movement direction values
def prep(dataset):
  scaler = StandardScaler()
  features = ['Open', 'High', 'Low', 'Close', 'Volume']
  dataset[features] = dataset[features].astype(float)
  dataset[features] = scaler.fit_transform(dataset[features])

  dataset['Target'] = np.where(dataset['Close'].shift(-1) > dataset['Close'], 1, 0)
  prepared_data = dataset.dropna()
  return prepared_data, scaler

# create LSTM model class
class LSTM_Model(nn.Module):
  def __init__(self, input_layer, hidden_layer, output_layer):
    super(LSTM_Model, self).__init__()
    self.hidden_layer = hidden_layer
    self.lstm = nn.LSTM(input_layer, hidden_layer, batch_first=True)
    self.linear_layer = nn.Linear(hidden_layer, output_layer)
    self.hidden_cell = (torch.zeros(1,1,self.hidden_layer),
                        torch.zeros(1,1,self.hidden_layer))

  # Define the forward pass of the LSTM_Model
  def forward(self, input_tensor):

    self.hidden_cell = (torch.zeros(1, input_tensor.size(0), self.hidden_layer), 
                        torch.zeros(1, input_tensor.size(0), self.hidden_layer))
    
    out, self.hidden_cell = self.lstm(input_tensor, self.hidden_cell)
    lstm_out_last = out[:, -1, :]
    predicted_values = self.linear_layer(lstm_out_last)
    return predicted_values

# create sequences for input data and corresponding labels
def create_sequence(input_data, sequence_length):
  sequences = []
  for i in range(len(input_data) - sequence_length):
    sequence = input_data[i : i + sequence_length, :-1]
    label = input_data[i + sequence_length, -1]
    sequences.append((sequence, label))
  return sequences

# train the model with data provided
def trainer(model, train_data, loss_func, opt, epochs):
  for epoch in range(epochs):
    for sequence, labels, in train_data:
      opt.zero_grad()
      model.hidden_cell = (torch.zeros(1, 1, model.hidden_layer),
                           torch.zeros(1, 1, model.hidden_layer))
      
      sequence = torch.tensor(sequence).float()
      labels = torch.tensor(labels).float().view(-1, 1)

      # Initialize the hidden state at the start of each sequence
      model.hidden_cell = (torch.zeros(1, sequence.size(0), 
                                       model.hidden_layer),
                            torch.zeros(1, sequence.size(0), 
                                        model.hidden_layer))

      y = model(sequence)
      loss = loss_func(y, labels)
      loss.backward()
      opt.step()

    # print progress as the model runs
    if epoch % 25 == 1:
      print(f'Epoch {epoch} loss: {loss.item()}')

# make predictions using trained model
def predictor(model, test_data):
    model.eval()
    with torch.no_grad():
        predictions = []
        for sequence, _ in test_data:
            sequence = torch.tensor(sequence).float()

            # Initialize the hidden state at the start of each sequence
            model.hidden_cell = (torch.zeros(1, sequence.size(0), 
                                             model.hidden_layer),
                                 torch.zeros(1, sequence.size(0), 
                                             model.hidden_layer))
            
            y = model(sequence)
            predictions.append(torch.round(torch.sigmoid(y)).item())
    return predictions


# load and prep data

# get dataset
file_path = filedialog.askopenfilename()

ticker = pd.read_csv(file_path)
ticker, scaler = prep(ticker)

# create sequences
sequence_length = 10
sequences = create_sequence(ticker[['Open', 'High', 'Low', 'Close', 'Volume', 
                                    'Target']].values, sequence_length)

# split test/train and create dataloader
train_size = int(len(sequences) * 0.8) # set train size
train_sequences = sequences[ : train_size]
test_sequences = sequences[train_size : ]

train_data = torch.utils.data.DataLoader(train_sequences, shuffle=True, batch_size=1)
test_data = torch.utils.data.DataLoader(test_sequences, shuffle=True, batch_size=1)

# initialise model
model = LSTM_Model(input_layer=5, hidden_layer=50, output_layer=1)
loss_func = nn.BCEWithLogitsLoss() # is this the best one?
opt = optim.Adam(model.parameters(), lr=0.001) # and this?

# train
epochs = 150 # is this optimal?
trainer(model, train_data, loss_func, opt, epochs)

# run model and predict values
test_labels = [label for _, label in test_sequences]
predictions = predictor(model, test_data)

# calcluate statistics
accuracy = accuracy_score(test_labels, predictions)
cm = confusion_matrix(test_labels, predictions)


print(f'Confusion Matrix:\n{cm}')
print(f'Accuracy: {accuracy}') 



  sequence = torch.tensor(sequence).float()
  labels = torch.tensor(labels).float().view(-1, 1)


Epoch 1 loss: 0.833328366279602
Epoch 26 loss: 0.6814336180686951
Confusion Matrix:
[[47 59]
 [75 69]]
Accuracy: 0.464


  sequence = torch.tensor(sequence).float()


In [15]:
cm

array([[47, 59],
       [75, 69]], dtype=int64)