In [None]:
import matplotlib.pyplot as plt # plotting library
import numpy as np # this module is useful to work with numerical arrays
import pandas as pd 
import random 
import torch
import torchvision
from torchvision import transforms
from torch.utils.data import Dataset,DataLoader,random_split
from torch.utils.data.sampler import SubsetRandomSampler
from torch import nn
import torch.nn.functional as F
import torch.optim as optim
import math
import glob

In [None]:
# Build supervised sliding windows of close-price returns, one block of windows per CSV file.
WINDOW_LEN = 168 + 1    # 168 input returns followed by 1 target return (split into x / y downstream)
                        # NOTE(review): presumably one week of 1h bars (files match *1h.csv) - confirm
TRAIN_FRACTION = 0.8    # leading share of each file's returns used for training (chronological split)


def make_windows(returns, window_len=WINDOW_LEN):
  """Return every length-``window_len`` sliding window over a 1-D series of returns.

  Args:
    returns: 1-D array-like (or single-column Series/DataFrame) of returns.
    window_len: number of consecutive values in each window.

  Returns:
    A float32 array of shape ``(n_windows, window_len)``. ``n_windows`` is 0 when
    the series holds fewer than ``window_len`` values.
  """
  values = np.asarray(returns, dtype=np.float32).reshape(-1)
  if values.shape[0] < window_len:
    # sliding_window_view raises on a too-short series; an empty 2-D block
    # concatenates cleanly with the windows from the other files instead.
    return np.empty((0, window_len), dtype=np.float32)
  # Windowing the flat series keeps the result 2-D even when exactly one window
  # fits (squeezing a multi-axis view would collapse that case to 1-D).
  return np.lib.stride_tricks.sliding_window_view(values, window_len).copy()


train_data_full , train_ma_full= [],[]
test_data_full , test_ma_full= [],[]

# glob returns paths in arbitrary order; sorting makes the dataset order reproducible.
files = sorted(glob.glob("/content/drive/MyDrive/AI_FYP/*1h.csv"))
if not files:
  print("WARNING: no CSV files matched /content/drive/MyDrive/AI_FYP/*1h.csv")

for f in files:
  # Period-over-period returns of the close price; the first row is NaN and is dropped.
  returns = pd.read_csv(f)[["close"]].pct_change().dropna()

  # Chronological split so test windows never precede training windows of the same file.
  split_at = int(returns.shape[0] * TRAIN_FRACTION)
  train_windows = make_windows(returns[:split_at])
  test_windows = make_windows(returns[split_at:])

  if len(train_windows) == 0 or len(test_windows) == 0:
    print("WARNING: {} is too short for a full window in at least one split".format(f))

  train_data_full.append(train_windows)
  test_data_full.append(test_windows)

In [None]:
# Stack the per-file window blocks along the sample axis into one 2-D array per split.
# NOTE(review): the list names are rebound to arrays, so this cell is not idempotent -
# re-run the loading cell before re-running this one.
train_data_full = np.concatenate(train_data_full, axis=0)
test_data_full = np.concatenate(test_data_full, axis=0)

In [None]:
# Every window is [inputs..., target]: all but the last column feed the model,
# the last column is the value the label is derived from.
train_x = train_data_full[:, :-1]
train_y = train_data_full[:, -1]
test_x = test_data_full[:, :-1]
test_y = test_data_full[:, -1]

In [None]:
class ClassDataset(Dataset):
    """Dataset pairing input windows with two-column direction labels.

    Label columns are ``[down, up]``:
      * ``up``   is 1 when the target is greater than ``threshold``;
      * ``down`` is 1 when the target is less than ``-threshold`` (and not ``up``);
      * targets in between get an all-zero row.

    Args:
        input: array-like of samples, indexed along its first axis.
        output: 1-D array-like of target values, one per sample.
        threshold: dead-band half-width used to derive the labels.
    """

    def __init__(self, input,output,threshold):
        self.input = torch.tensor(input)

        # Vectorised labelling: same rules as a per-element if/elif, but the
        # result is always (n, 2) - including n == 0 - and needs no Python loop.
        target = np.asarray(output)
        up = target > threshold
        down = ~up & (target < -threshold)   # `~up` mirrors the elif: up takes precedence
        label = np.stack([down, up], axis=-1).astype(np.float32)

        self.label = torch.tensor(label)
        self.output = torch.tensor(output)   # raw targets, kept alongside the labels

    def __len__(self):
        """Number of samples."""
        return len(self.input)

    def __getitem__(self, idx):
        """Return ``(x, y)``: the transposed sample at ``idx`` and its label row."""
        x = self.input[idx].T
        y = self.label[idx]

        return x,y

class line(nn.Module):
  """Fully connected block: Linear -> BatchNorm1d -> LeakyReLU.

  Args:
    in_lay: number of input features.
    out_lay: number of output features.
  """
  def __init__(self,in_lay,out_lay):
    super().__init__()
    self.lin = nn.Linear(in_lay,out_lay)
    self.norm = nn.BatchNorm1d(out_lay)
    # LeakyReLU's first positional argument is negative_slope, not inplace:
    # passing True there gives slope 1.0, i.e. an identity activation.
    self.act = nn.LeakyReLU(inplace=True)
  
  def forward(self,x):
    """Apply the linear layer, batch normalisation and activation in order."""
    y = self.lin(x)
    y = self.norm(y)
    y = self.act(y)
    return y


class Classifier(nn.Module):
    """Single-layer LSTM encoder followed by a linear head that emits class logits.

    Args:
        input_fea: number of features per time step (LSTM ``input_size``).
        hidden_units: width of the LSTM hidden state.
        num_classes: number of logits produced per sample (default 2).
    """
    def __init__(self, input_fea,hidden_units,num_classes=2):
        super().__init__()
        self.input_fea = input_fea  # this is the number of features
        self.hidden_units = hidden_units
        self.num_layers = 1

        self.lstm = nn.LSTM(
            input_size=input_fea,
            hidden_size=hidden_units,
            batch_first=True,
            num_layers=self.num_layers
        )
        # Head used by forward(); it was referenced there but never created.
        self.linear = nn.Linear(hidden_units, num_classes)

    def forward(self,x):
        """Map a batch-first input ``(batch, seq, input_fea)`` to ``(batch, num_classes)`` logits."""
        # Omitting (h0, c0) lets the LSTM start from zero states created on the
        # same device and dtype as x, so the model also works after .to("cuda").
        _, (hn, _) = self.lstm(x)
        # hn[-1] is the final hidden state of the last layer: (batch, hidden_units).
        # The logits stay 2-D so a per-class loss / argmax over dim=1 can consume them.
        return self.linear(hn[-1])

In [None]:
# Wrap both splits in datasets and batch them.
BATCH_SIZE = 1024

# A trailing singleton axis is added so each stored sample is 2-D; the dataset
# transposes it on access. Threshold 0 labels by the sign of the target.
train_set = ClassDataset(train_x[:, :, np.newaxis], train_y, 0)
test_set = ClassDataset(test_x[:, :, np.newaxis], test_y, 0)

# NOTE(review): the training loader iterates in dataset order (no shuffle) -
# confirm this is intended for SGD.
train_loader = DataLoader(train_set, batch_size=BATCH_SIZE)
test_loader = DataLoader(test_set, batch_size=BATCH_SIZE)

In [None]:
# Sanity check: show the shape of the first training batch (prints nothing if the loader is empty).
first_batch = next(iter(train_loader), None)
if first_batch is not None:
  x, l = first_batch
  print(x.shape)

torch.Size([1024, 1, 168])


In [None]:
def multi_acc(y_pred, y_test):
    """Compute batch accuracy as a whole-number percentage.

    Args:
        y_pred: tensor of per-class scores, classes along dim 1.
        y_test: tensor of per-class targets, classes along dim 1; the arg-max
            column is taken as the true class.

    Returns:
        ``(acc, y_pred_tags)``: a 0-dim tensor holding the accuracy in percent,
        rounded to an integer value, and the predicted class index per row.
    """
    log_probs = torch.log_softmax(y_pred, dim=1)
    y_pred_tags = torch.max(log_probs, dim=1).indices
    true_tags = torch.max(y_test, dim=1).indices

    hits = (y_pred_tags == true_tags).float()
    fraction_correct = hits.sum() / len(hits)

    return torch.round(fraction_correct * 100), y_pred_tags

In [None]:
# Training configuration and model / loss / optimizer construction.
epochs = 50
lr = 0.0001
use_cuda = 1
HIDDEN_UNITS = 32  # width of the LSTM hidden state; a tunable choice
# Logical `and` (not bitwise `&`) so the flag and the availability check short-circuit as booleans.
device = torch.device("cuda" if (use_cuda and torch.cuda.is_available()) else "cpu")

# Classifier requires its input feature count and hidden width. The feature
# count is read from a real sample so it always matches what the loader yields.
# NOTE(review): samples are transposed by the dataset, so the LSTM sees a single
# time step whose features are the whole window - confirm that is intended
# rather than one feature over many time steps.
n_features = train_set[0][0].shape[-1]

model = Classifier(n_features, HIDDEN_UNITS).to(device)
criterion = nn.CrossEntropyLoss().to(device)
optimizer = torch.optim.SGD(model.parameters(), lr=lr)

In [None]:
# Train for `epochs` passes over the training set, evaluating on the test set after each pass.
for epoch in range(epochs):

    # Evaluation below switches to eval mode; switch back before every training pass.
    model.train()
    total_loss = 0.0
    for inputs, labels in train_loader:
        inputs = inputs.to(device)
        labels = labels.to(device)

        # zero the parameter gradients
        optimizer.zero_grad()

        # forward + backward + optimize
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        total_loss += loss.item()

    # The criterion already averages within a batch, so the epoch figure is the
    # mean over batches (not over samples). max() guards an empty loader.
    total_loss /= max(len(train_loader), 1)
    print('[{}/{}] Loss:'.format(epoch+1, epochs), total_loss)

    model.eval()
    test_acc = 0
    with torch.no_grad():
        for inputs, labels in test_loader:
            inputs = inputs.to(device)
            labels = labels.to(device)
            outputs = model(inputs)

            acc, pred_tag = multi_acc(outputs, labels)
            test_acc += acc.item()
    # Unweighted mean of the per-batch percentages returned by multi_acc.
    test_acc = test_acc / max(len(test_loader), 1)
    print('[{}/{}] Test Accuracy:'.format(epoch+1, epochs), test_acc)

torch.Size([1024, 1, 168])
torch.Size([1024, 32, 162])
[1/50] Loss: 0.0


RuntimeError: ignored