In [1]:
import torch
import numpy as np
import torch.nn as nn
import random
import itertools
import pickle
import torch.nn.functional as F
import os

In [2]:
BATCH_SIZE=32
EPOCHS=200

In [18]:
device='cuda'
class mark(nn.Module):
    def __init__(self, in_channels, out_channels):
        super(mark, self).__init__()
        
        # Depthwise Convolution1
        self.depthwise1 = nn.Conv2d(
            in_channels, in_channels, kernel_size=3, stride=1, padding=1, bias=False
        )
        self.bn1 = nn.BatchNorm2d(in_channels)
        # Pointwise Convolution1
        self.pointwise1 = nn.Conv2d(
            in_channels, out_channels, kernel_size=1, stride=1, padding=0, bias=False
        )
        self.bn2 = nn.BatchNorm2d(out_channels)
        self.maxpool1 = nn.MaxPool2d(kernel_size=3, stride=2)

        # Depthwise Convolution2
        self.depthwise2 = nn.Conv2d(
            out_channels, out_channels, kernel_size=3, stride=1, padding=1, bias=False
        )
        self.bn3 = nn.BatchNorm2d(out_channels)
        # Pointwise Convolution2
        self.pointwise2 = nn.Conv2d(
            out_channels, out_channels, kernel_size=1, stride=1, padding=0, bias=False
        )
        self.bn4 = nn.BatchNorm2d(out_channels)
        self.maxpool2 = nn.MaxPool2d(kernel_size=3, stride=2)

        #Linear layers
        self.fc1=nn.Linear(32*55*55, 256)
        self.bn5 = nn.BatchNorm1d(256)
        self.fc2=nn.Linear(256, 256)
        self.bn6 = nn.BatchNorm1d(256)
        self.fc3=nn.Linear(256, 5)
    def forward(self, chart):
        chart = chart.to(device)
        chart_out=F.relu(self.bn1(self.depthwise1(chart)))
        chart_out=F.relu(self.maxpool1(self.bn2(self.pointwise1(chart_out))))   
        chart_out=F.relu(self.bn3(self.depthwise2(chart_out)))
        chart_out=F.relu(self.maxpool2(self.bn4(self.pointwise2(chart_out))))
        chart_out = chart_out.view(chart.shape[0],-1)
        
        chart_out = F.relu(self.bn5(self.fc1(chart_out)))
        chart_out = F.relu(self.bn6(self.fc2(chart_out)))
        chart_out = F.softmax(self.fc3(chart_out))
        return chart_out


model = mark(3,32).to(device)
best_model = mark(3,32).to(device)
optimizer=torch.optim.Adam(model.parameters(), lr=1e-2,weight_decay=5e-4)
best_model.load_state_dict(model.state_dict())


<All keys matched successfully>

In [5]:
def get_filenames_without_extension(folder_path, extension=".pkl"):
    filenames = []
    for filename in os.listdir(folder_path):
        if filename.endswith(extension):
            filenames.append(filename[:-len(extension)])
    return filenames
tickers = get_filenames_without_extension("data")

In [6]:
#stacking all data in x (features, image in our case ) and y (labels one hot encoded)
x=[]
y=[]
for i in range(0,len(tickers)):
    with open(f'data//{tickers[i]}.pkl', 'rb') as f:
        data_list = pickle.load(f)
    data=data_list[0]
    chart=data[0]
    chart=torch.from_numpy(chart).float()
    chart = chart.permute(2,0,1)
    x.append(chart)
    target=data[1]
    target=torch.from_numpy(target).float()
    y.append(target)
    del data_list
x = torch.stack(x)
y = torch.stack(y)

In [7]:
#Split our dataset into training and testing sets. I'm using del to free the memory space
x_train=x[0:int(len(x)*0.8)]
y_train=y[0:int(len(x)*0.8)]
x_test=x[int(len(x)*0.8):]
y_test=y[int(len(x)*0.8):]
del x
del y

In [None]:
best_accuracy=0
crit=nn.CrossEntropyLoss()
torch.cuda.empty_cache()
x_train=x_train.to(device)
y_train=y_train.to(device)
for epoch in range(EPOCHS):
    correct = 0
    total = 0
    for i in range(0,len(x_train)-BATCH_SIZE,BATCH_SIZE):
        #forward
        y_hat=model(x_train[i:i+BATCH_SIZE]).to(device)
        loss=crit(y_train[i:i+BATCH_SIZE],y_hat)
        #backward pass
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        _, predicted = torch.max(y_hat, 1)
        y_train_indices = torch.argmax(y_train[i:i + BATCH_SIZE], dim=1)
        correct += (predicted == y_train_indices).sum().item()
        total += BATCH_SIZE
    
    accuracy = correct / total
    if accuracy>=best_accuracy:
        best_model.load_state_dict(model.state_dict())
        best_accuracy=accuracy
    print('accuracy: ',accuracy)
    print('best_accuracy: ',best_accuracy)
