#### DeepLOB: CNN(Convolutional Blocks, inceptions) and LSTM
#### CNN1: Convolutional Layers, Pooling Layers and Fully Connected Layers
#### BinCTABL: BiN, BL_layer, TABL_layer

#### The models are selected from the paper "LOB-based Deep Learning Models for Stock Price Trend Prediction: A Benchmark Study". GitHub reference: https://github.com/matteoprata/LOBCAST

## Part 1: Preparation

In [9]:
# load packages
import os
import requests
import zipfile
import pandas as pd
import pickle
import numpy as np
import matplotlib.pyplot as plt
from datetime import datetime
from tqdm import tqdm 
from sklearn.metrics import accuracy_score, classification_report

import torch
import torch.nn.functional as F
from torch.utils import data
from torchinfo import summary
import torch.nn as nn
import torch.optim as optim

In [10]:
# device
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print(device)

cpu


In [11]:
# Functions
def prepare_x(data):
    df1 = data[:40, :].T
    return np.array(df1)

def get_label(data):
    lob = data[-5:, :].T
    return lob

def data_classification(X, Y, T):
    [N, D] = X.shape
    df = np.array(X)

    dY = np.array(Y)

    dataY = dY[T - 1:N]

    dataX = np.zeros((N - T + 1, T, D))
    for i in range(T, N + 1):
        dataX[i - T] = df[i - T:i, :]

    return dataX, dataY

def torch_data(x, y):
    x = torch.from_numpy(x)
    x = torch.unsqueeze(x, 1)
    y = torch.from_numpy(y)
    y = F.one_hot(y, num_classes=3)
    return x, y

In [12]:
# Custom Dataset Class
class Dataset(data.Dataset):
    """Characterizes a dataset for PyTorch"""
    def __init__(self, data, k, num_classes, T):
        """Initialization""" 
        self.k = k
        self.num_classes = num_classes
        self.T = T
            
        x = prepare_x(data)
        y = get_label(data)
        x, y = data_classification(x, y, self.T)
        y = y[:,self.k] - 1
        self.length = len(x)

        x = torch.from_numpy(x)
        self.x = torch.unsqueeze(x, 1)
        self.y = torch.from_numpy(y)

    def __len__(self):
        """Denotes the total number of samples"""
        return self.length

    def __getitem__(self, index):
        """Generates samples of data"""
        return self.x[index], self.y[index]

## Part 2: Data Download and Extraction

In [13]:
def download_and_extract_data(data_url, data_zip_path, data_folder):
    if not os.path.isfile(data_zip_path):
        print('Downloading data...')
        response = requests.get(data_url, stream=True)
        with open(data_zip_path, 'wb') as f:
            for chunk in response.iter_content(chunk_size=1024):
                if chunk:
                    f.write(chunk)
        print('Data download completed.')

        # Extract data.zip
        print('Extracting data...')
        with zipfile.ZipFile(data_zip_path, 'r') as zip_ref:
            zip_ref.extractall('.')
        print('Data extraction completed.')
    else:
        print('data.zip already exists.')
        # Check if the extracted folder exists
        if not os.path.exists(data_folder):
            print('Extracting data...')
            with zipfile.ZipFile(data_zip_path, 'r') as zip_ref:
                zip_ref.extractall('.')
            print('Data extraction completed.')
        else:
            print('Data already extracted.')

# Set data URL and paths
data_url = 'https://raw.githubusercontent.com/zcakhaa/DeepLOB-Deep-Convolutional-Neural-Networks-for-Limit-Order-Books/master/data/data.zip'
data_zip_path = 'data.zip'
data_folder = 'data'  # Extracted folder name

# Download and extract data
download_and_extract_data(data_url, data_zip_path, data_folder)

data.zip already exists.
Data already extracted.


## Part 3: Data Loading and Preprocessing

In [14]:
# Load training and validation data
dec_data = np.loadtxt('Train_Dst_NoAuction_DecPre_CF_7.txt')
dec_train = dec_data[:, :int(np.floor(dec_data.shape[1] * 0.8))]
dec_val = dec_data[:, int(np.floor(dec_data.shape[1] * 0.8)):]

# Load test data
dec_test1 = np.loadtxt('Test_Dst_NoAuction_DecPre_CF_7.txt')
dec_test2 = np.loadtxt('Test_Dst_NoAuction_DecPre_CF_8.txt')
dec_test3 = np.loadtxt('Test_Dst_NoAuction_DecPre_CF_9.txt')
dec_test = np.hstack((dec_test1, dec_test2, dec_test3))

# Print data shapes
print(dec_train.shape, dec_val.shape, dec_test.shape)

(149, 203800) (149, 50950) (149, 139587)


In [15]:
# Creating Dataset Instances and Data Loaders

batch_size = 64
dataset_train = Dataset(data=dec_train, k=4, num_classes=3, T=100)
dataset_val = Dataset(data=dec_val, k=4, num_classes=3, T=100)
dataset_test = Dataset(data=dec_test, k=4, num_classes=3, T=100)

train_loader = torch.utils.data.DataLoader(dataset=dataset_train, batch_size=batch_size, shuffle=True)
val_loader = torch.utils.data.DataLoader(dataset=dataset_val, batch_size=batch_size, shuffle=False)
test_loader = torch.utils.data.DataLoader(dataset=dataset_test, batch_size=batch_size, shuffle=False)

print(dataset_train.x.shape, dataset_train.y.shape)

torch.Size([203701, 1, 100, 40]) torch.Size([203701])


In [16]:
# Viewing dataset_train
tmp_loader = torch.utils.data.DataLoader(dataset=dataset_train, batch_size=1, shuffle=True)

for x, y in tmp_loader:
    print(x)
    print(y)
    print(x.shape, y.shape)
    break

tensor([[[[0.1738, 0.0080, 0.1737,  ..., 0.0634, 0.1728, 0.0126],
          [0.1739, 0.0010, 0.1737,  ..., 0.0634, 0.1728, 0.0126],
          [0.1739, 0.0010, 0.1737,  ..., 0.0634, 0.1728, 0.0126],
          ...,
          [0.1741, 0.0339, 0.1739,  ..., 0.0623, 0.1730, 0.0100],
          [0.1741, 0.0203, 0.1739,  ..., 0.0623, 0.1730, 0.0100],
          [0.1741, 0.0203, 0.1740,  ..., 0.0623, 0.1731, 0.0495]]]],
       dtype=torch.float64)
tensor([0.], dtype=torch.float64)
torch.Size([1, 1, 100, 40]) torch.Size([1])


## Part 4: Model

#### 1. DeepLOB

In [28]:
class deeplob(nn.Module):
    def __init__(self, y_len):
        super().__init__()
        self.y_len = y_len
        
        # convolution blocks
        self.conv1 = nn.Sequential(
            nn.Conv2d(in_channels=1, out_channels=32, kernel_size=(1,2), stride=(1,2)),
            nn.LeakyReLU(negative_slope=0.01),
#             nn.Tanh(),
            nn.BatchNorm2d(32),
            nn.Conv2d(in_channels=32, out_channels=32, kernel_size=(4,1)),
            nn.LeakyReLU(negative_slope=0.01),
            nn.BatchNorm2d(32),
            nn.Conv2d(in_channels=32, out_channels=32, kernel_size=(4,1)),
            nn.LeakyReLU(negative_slope=0.01),
            nn.BatchNorm2d(32),
        )
        self.conv2 = nn.Sequential(
            nn.Conv2d(in_channels=32, out_channels=32, kernel_size=(1,2), stride=(1,2)),
            nn.Tanh(),
            nn.BatchNorm2d(32),
            nn.Conv2d(in_channels=32, out_channels=32, kernel_size=(4,1)),
            nn.Tanh(),
            nn.BatchNorm2d(32),
            nn.Conv2d(in_channels=32, out_channels=32, kernel_size=(4,1)),
            nn.Tanh(),
            nn.BatchNorm2d(32),
        )
        self.conv3 = nn.Sequential(
            nn.Conv2d(in_channels=32, out_channels=32, kernel_size=(1,10)),
            nn.LeakyReLU(negative_slope=0.01),
            nn.BatchNorm2d(32),
            nn.Conv2d(in_channels=32, out_channels=32, kernel_size=(4,1)),
            nn.LeakyReLU(negative_slope=0.01),
            nn.BatchNorm2d(32),
            nn.Conv2d(in_channels=32, out_channels=32, kernel_size=(4,1)),
            nn.LeakyReLU(negative_slope=0.01),
            nn.BatchNorm2d(32),
        )
        
        # inception moduels
        self.inp1 = nn.Sequential(
            nn.Conv2d(in_channels=32, out_channels=64, kernel_size=(1,1), padding='same'),
            nn.LeakyReLU(negative_slope=0.01),
            nn.BatchNorm2d(64),
            nn.Conv2d(in_channels=64, out_channels=64, kernel_size=(3,1), padding='same'),
            nn.LeakyReLU(negative_slope=0.01),
            nn.BatchNorm2d(64),
        )
        self.inp2 = nn.Sequential(
            nn.Conv2d(in_channels=32, out_channels=64, kernel_size=(1,1), padding='same'),
            nn.LeakyReLU(negative_slope=0.01),
            nn.BatchNorm2d(64),
            nn.Conv2d(in_channels=64, out_channels=64, kernel_size=(5,1), padding='same'),
            nn.LeakyReLU(negative_slope=0.01),
            nn.BatchNorm2d(64),
        )
        self.inp3 = nn.Sequential(
            nn.MaxPool2d((3, 1), stride=(1, 1), padding=(1, 0)),
            nn.Conv2d(in_channels=32, out_channels=64, kernel_size=(1,1), padding='same'),
            nn.LeakyReLU(negative_slope=0.01),
            nn.BatchNorm2d(64),
        )
        
        # lstm layers
        self.lstm = nn.LSTM(input_size=192, hidden_size=64, num_layers=1, batch_first=True)
        self.fc1 = nn.Linear(64, self.y_len)

    def forward(self, x):
        # h0: (number of hidden layers, batch size, hidden size)
        h0 = torch.zeros(1, x.size(0), 64).to(device)
        c0 = torch.zeros(1, x.size(0), 64).to(device)
    
        x = self.conv1(x)
        x = self.conv2(x)
        x = self.conv3(x)
        
        x_inp1 = self.inp1(x)
        x_inp2 = self.inp2(x)
        x_inp3 = self.inp3(x)  
        
        x = torch.cat((x_inp1, x_inp2, x_inp3), dim=1)
        
#         x = torch.transpose(x, 1, 2)
        x = x.permute(0, 2, 1, 3)
        x = torch.reshape(x, (-1, x.shape[1], x.shape[2]))
        
        x, _ = self.lstm(x, (h0, c0))
        x = x[:, -1, :]
        x = self.fc1(x)
        forecast_y = torch.softmax(x, dim=1)
        
        return forecast_y

#### 2. CNN1

In [7]:
class CNN1(nn.Module):
    def __init__(self, input_dim, output_dim):
        super().__init__()

        n_features = input_dim[2]

        # Convolutional layers
        self.conv1 = nn.Conv2d(in_channels=1, out_channels=16, kernel_size=(4, n_features), padding=(3, 0), dilation=(2, 1))
        self.conv2 = nn.Conv1d(in_channels=16, out_channels=16, kernel_size=4)
        self.conv3 = nn.Conv1d(in_channels=16, out_channels=32, kernel_size=3, padding=2)
        self.conv4 = nn.Conv1d(in_channels=32, out_channels=32, kernel_size=3, padding=2)

        # Max pooling layers
        self.maxpool1 = nn.MaxPool1d(kernel_size=2)
        self.maxpool2 = nn.MaxPool1d(kernel_size=2)

        # Fully connected layers
        self.fc1 = nn.Linear(26 * 32, 32)
        self.fc2 = nn.Linear(32, output_dim)

        # Activation function
        self.relu = nn.LeakyReLU()

    def forward(self, x):
        # Apply convolutional layers with activation
        out = self.relu(self.conv1(x))                  # [batch_size, 16, H_out, W_out]
        out = out.permute(0, 1, 3, 2).reshape(out.size(0), out.size(1), -1)  # [batch_size, 16, W_out * H_out]

        out = self.relu(self.conv2(out))                # [batch_size, 16, L_out]
        out = self.maxpool1(out)                        # [batch_size, 16, L_out//2]

        out = self.relu(self.conv3(out))                # [batch_size, 32, L_out//2]
        out = self.relu(self.conv4(out))                # [batch_size, 32, L_out//2]
        out = self.maxpool2(out)                        # [batch_size, 32, L_out//4]

        # Flatten
        out = out.view(out.size(0), -1)                 # [batch_size, 32 * (L_out//4)]

        # Fully connected layers
        out = self.relu(self.fc1(out))                  # [batch_size, 32]
        out = self.fc2(out)                             # [batch_size, output_dim]

        return out

#### 3. BinCTABL

In [39]:
class BiN(nn.Module):
    def __init__(self, d2, d1, t1, t2):
        super().__init__()
        self.t1 = t1
        self.d1 = d1
        self.t2 = t2
        self.d2 = d2

        bias1 = torch.Tensor(t1, 1)
        self.B1 = nn.Parameter(bias1)
        nn.init.constant_(self.B1, 0)

        l1 = torch.Tensor(t1, 1)
        self.l1 = nn.Parameter(l1)
        nn.init.xavier_normal_(self.l1)

        bias2 = torch.Tensor(d1, 1)
        self.B2 = nn.Parameter(bias2)
        nn.init.constant_(self.B2, 0)

        l2 = torch.Tensor(d1, 1)
        self.l2 = nn.Parameter(l2)
        nn.init.xavier_normal_(self.l2)

        y1 = torch.Tensor(1, )
        self.y1 = nn.Parameter(y1)
        nn.init.constant_(self.y1, 0.5)

        y2 = torch.Tensor(1, )
        self.y2 = nn.Parameter(y2)
        nn.init.constant_(self.y2, 0.5)

    def forward(self, x):
        if (self.y1[0] < 0):
            y1 = torch.zeros(1, ).to(x.device)
            self.y1 = nn.Parameter(y1)
            nn.init.constant_(self.y1, 0.01)

        if (self.y2[0] < 0):
            y2 = torch.zeros(1, ).to(x.device)
            self.y2 = nn.Parameter(y2)
            nn.init.constant_(self.y2, 0.01)

        T2 = torch.ones([self.t1, 1], device=x.device)
        x2 = torch.mean(x, dim=2)
        x2 = torch.reshape(x2, (x2.shape[0], x2.shape[1], 1))

        std = torch.std(x, dim=2)
        std = torch.reshape(std, (std.shape[0], std.shape[1], 1))
        std[std < 1e-4] = 1

        diff = x - (x2 @ T2.T)
        Z2 = diff / (std @ T2.T)

        X2 = self.l2 @ T2.T
        X2 = X2 * Z2
        X2 = X2 + (self.B2 @ T2.T)

        T1 = torch.ones([self.d1, 1], device=x.device)
        x1 = torch.mean(x, dim=1)
        x1 = torch.reshape(x1, (x1.shape[0], x1.shape[1], 1))

        std = torch.std(x, dim=1)
        std = torch.reshape(std, (std.shape[0], std.shape[1], 1))

        op1 = x1 @ T1.T
        op1 = torch.permute(op1, (0, 2, 1))

        op2 = std @ T1.T
        op2 = torch.permute(op2, (0, 2, 1))

        z1 = (x - op1) / (op2)
        X1 = (T1 @ self.l1.T)
        X1 = X1 * z1
        X1 = X1 + (T1 @ self.B1.T)

        x = self.y1 * X1 + self.y2 * X2

        return x

class TABL_layer(nn.Module):
    def __init__(self, d2, d1, t1, t2):
        super().__init__()
        self.t1 = t1

        weight = torch.Tensor(d2, d1)
        self.W1 = nn.Parameter(weight)
        nn.init.kaiming_uniform_(self.W1, nonlinearity='relu')

        weight2 = torch.Tensor(t1, t1)
        self.W = nn.Parameter(weight2)
        nn.init.constant_(self.W, 1 / t1)

        weight3 = torch.Tensor(t1, t2)
        self.W2 = nn.Parameter(weight3)
        nn.init.kaiming_uniform_(self.W2, nonlinearity='relu')

        bias1 = torch.Tensor(d2, t2)
        self.B = nn.Parameter(bias1)
        nn.init.constant_(self.B, 0)

        l = torch.Tensor(1, )
        self.l = nn.Parameter(l)
        nn.init.constant_(self.l, 0.5)

        self.activation = nn.ReLU()

    def forward(self, X):
        if (self.l[0] < 0):
            l = torch.tensor([0.0], device=X.device)
            self.l = nn.Parameter(l)
            nn.init.constant_(self.l, 0.0)

        if (self.l[0] > 1):
            l = torch.tensor([1.0], device=X.device)
            self.l = nn.Parameter(l)
            nn.init.constant_(self.l, 1.0)

        X = self.W1 @ X

        W = self.W - self.W * torch.eye(self.t1, device=X.device) + torch.eye(self.t1, device=X.device) / self.t1

        E = X @ W

        A = torch.softmax(E, dim=-1)

        X = self.l[0] * X + (1.0 - self.l[0]) * X * A

        y = X @ self.W2 + self.B
        return y

class BL_layer(nn.Module):
    def __init__(self, d2, d1, t1, t2):
        super().__init__()
        weight1 = torch.Tensor(d2, d1)
        self.W1 = nn.Parameter(weight1)
        nn.init.kaiming_uniform_(self.W1, nonlinearity='relu')

        weight2 = torch.Tensor(t1, t2)
        self.W2 = nn.Parameter(weight2)
        nn.init.kaiming_uniform_(self.W2, nonlinearity='relu')

        bias1 = torch.zeros((d2, t2))
        self.B = nn.Parameter(bias1)
        nn.init.constant_(self.B, 0)

        self.activation = nn.ReLU()

    def forward(self, x):
        x = self.activation(self.W1 @ x @ self.W2 + self.B)
        return x

class LOBCAST_model(nn.Module):
    def __init__(self, input_dim, output_dim):
        super().__init__()
        self.input_dim = input_dim
        self.output_dim = output_dim

class BinCTABL(LOBCAST_model):
    def __init__(self, input_dim, output_dim, d2, d1, t1, t2, d3, t3, d4, t4):
        super().__init__(input_dim, output_dim)

        self.BiN = BiN(d2, d1, t1, t2)
        self.BL = BL_layer(d2, d1, t1, t2)
        self.BL2 = BL_layer(d3, d2, t2, t3)
        self.TABL = TABL_layer(d4, d3, t3, t4)
        self.dropout = nn.Dropout(0.1)

    def forward(self, x):
        x = x.squeeze(1)  

        # Permute to [batch_size, features, time]
        x = torch.permute(x, (0, 2, 1))  # [batch_size, 40, 100]

        x = self.BiN(x)

        # Apply max norm to BL layers
        self.max_norm_(self.BL.W1.data)
        self.max_norm_(self.BL.W2.data)
        x = self.BL(x)
        x = self.dropout(x)

        self.max_norm_(self.BL2.W1.data)
        self.max_norm_(self.BL2.W2.data)
        x = self.BL2(x)
        x = self.dropout(x)

        # Apply max norm to TABL layers
        self.max_norm_(self.TABL.W1.data)
        self.max_norm_(self.TABL.W.data)
        self.max_norm_(self.TABL.W2.data)
        x = self.TABL(x)
        x = torch.squeeze(x) 

        return x

    def max_norm_(self, w):
        with torch.no_grad():
            if (torch.linalg.matrix_norm(w) > 10.0):
                norm = torch.linalg.matrix_norm(w)
                desired = torch.clamp(norm, min=0.0, max=10.0)
                w *= (desired / (1e-8 + norm))

In [29]:
Model_Deeplob = deeplob(y_len = dataset_train.num_classes)
summary(Model_Deeplob, input_size=(1, 1, 100, 40))

Layer (type:depth-idx)                   Output Shape              Param #
deeplob                                  [1, 3]                    --
├─Sequential: 1-1                        [1, 32, 94, 20]           --
│    └─Conv2d: 2-1                       [1, 32, 100, 20]          96
│    └─LeakyReLU: 2-2                    [1, 32, 100, 20]          --
│    └─BatchNorm2d: 2-3                  [1, 32, 100, 20]          64
│    └─Conv2d: 2-4                       [1, 32, 97, 20]           4,128
│    └─LeakyReLU: 2-5                    [1, 32, 97, 20]           --
│    └─BatchNorm2d: 2-6                  [1, 32, 97, 20]           64
│    └─Conv2d: 2-7                       [1, 32, 94, 20]           4,128
│    └─LeakyReLU: 2-8                    [1, 32, 94, 20]           --
│    └─BatchNorm2d: 2-9                  [1, 32, 94, 20]           64
├─Sequential: 1-2                        [1, 32, 88, 10]           --
│    └─Conv2d: 2-10                      [1, 32, 94, 10]           2,080
│    └

In [33]:
input_dim = (1, 100, 40)  
output_dim = dataset_train.num_classes  
Model_CNN1 = CNN1(input_dim=input_dim, output_dim=output_dim)
summary(Model_CNN1, input_size=(1, 1, 100, 40))

Layer (type:depth-idx)                   Output Shape              Param #
CNN1                                     [1, 3]                    --
├─Conv2d: 1-1                            [1, 16, 100, 1]           2,576
├─LeakyReLU: 1-2                         [1, 16, 100, 1]           --
├─Conv1d: 1-3                            [1, 16, 97]               1,040
├─LeakyReLU: 1-4                         [1, 16, 97]               --
├─MaxPool1d: 1-5                         [1, 16, 48]               --
├─Conv1d: 1-6                            [1, 32, 50]               1,568
├─LeakyReLU: 1-7                         [1, 32, 50]               --
├─Conv1d: 1-8                            [1, 32, 52]               3,104
├─LeakyReLU: 1-9                         [1, 32, 52]               --
├─MaxPool1d: 1-10                        [1, 32, 26]               --
├─Linear: 1-11                           [1, 32]                   26,656
├─LeakyReLU: 1-12                        [1, 32]                   --

In [40]:
d1 = 40
d2 = 40
t1 = 100
t2 = 100
d3 = 120 
t3 = 50 
d4 = 3
t4 = 1

input_dim = (1, 100, 40)  # [channels, time, features]
output_dim = 3

Model_BinCTABL = BinCTABL(input_dim=input_dim,
                output_dim=output_dim,
                d2=d2, d1=d1, t1=t1, t2=t2,
                d3=d3, t3=t3, d4=d4, t4=t4)
summary(Model_BinCTABL, input_size=(1, 1, 100, 40))

Layer (type:depth-idx)                   Output Shape              Param #
BinCTABL                                 [3]                       --
├─BiN: 1-1                               [1, 40, 100]              282
├─BL_layer: 1-2                          [1, 40, 100]              15,600
│    └─ReLU: 2-1                         [1, 40, 100]              --
├─Dropout: 1-3                           [1, 40, 100]              --
├─BL_layer: 1-4                          [1, 120, 50]              15,800
│    └─ReLU: 2-2                         [1, 120, 50]              --
├─Dropout: 1-5                           [1, 120, 50]              --
├─TABL_layer: 1-6                        [1, 3, 1]                 2,914
Total params: 34,596
Trainable params: 34,596
Non-trainable params: 0
Total mult-adds (Units.MEGABYTES): 0
Input size (MB): 0.02
Forward/backward pass size (MB): 0.03
Params size (MB): 0.00
Estimated Total Size (MB): 0.05

In [31]:
# setting up the loss function and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(Model_Deeplob.parameters(), lr=0.0001)

## Model Training

In [22]:
# A function to encapsulate the training loop
def batch_gd(model, criterion, optimizer, train_loader, test_loader, epochs):
    
    train_losses = np.zeros(epochs)
    test_losses = np.zeros(epochs)
    best_test_loss = np.inf
    best_test_epoch = 0

    for it in tqdm(range(epochs)):
        
        model.train()
        t0 = datetime.now()
        train_loss = []
        for inputs, targets in train_loader:
            # move data to GPU
            inputs, targets = inputs.to(device, dtype=torch.float), targets.to(device, dtype=torch.int64)
            # print("inputs.shape:", inputs.shape)
            # zero the parameter gradients
            optimizer.zero_grad()
            # Forward pass
            # print("about to get model output")
            outputs = model(inputs)
            # print("done getting model output")
            # print("outputs.shape:", outputs.shape, "targets.shape:", targets.shape)
            loss = criterion(outputs, targets)
            # Backward and optimize
            # print("about to optimize")
            loss.backward()
            optimizer.step()
            train_loss.append(loss.item())
        # Get train loss and test loss
        train_loss = np.mean(train_loss) # a little misleading
    
        model.eval()
        test_loss = []
        for inputs, targets in test_loader:
            inputs, targets = inputs.to(device, dtype=torch.float), targets.to(device, dtype=torch.int64)      
            outputs = model(inputs)
            loss = criterion(outputs, targets)
            test_loss.append(loss.item())
        test_loss = np.mean(test_loss)

        # Save losses
        train_losses[it] = train_loss
        test_losses[it] = test_loss
        
        if test_loss < best_test_loss:
            torch.save(model, './best_val_model_pytorch')
            best_test_loss = test_loss
            best_test_epoch = it
            print('model saved')

        dt = datetime.now() - t0
        print(f'Epoch {it+1}/{epochs}, Train Loss: {train_loss:.4f}, \
          Validation Loss: {test_loss:.4f}, Duration: {dt}, Best Val Epoch: {best_test_epoch}')

    return train_losses, test_losses

In [None]:
train_losses, val_losses = batch_gd(Model_BinCTABL, criterion, optimizer, 
                                    train_loader, val_loader, epochs=50)

In [None]:
plt.figure(figsize=(15,6))
plt.plot(train_losses, label='train loss')
plt.plot(val_losses, label='validation loss')
plt.legend()

## Model Testing

In [None]:
model = torch.load('best_val_model_pytorch')

n_correct = 0.
n_total = 0.
for inputs, targets in test_loader:
    # Move to GPU
    inputs, targets = inputs.to(device, dtype=torch.float), targets.to(device, dtype=torch.int64)

    # Forward pass
    outputs = model(inputs)
    
    # Get prediction
    # torch.max returns both max and argmax
    _, predictions = torch.max(outputs, 1)

    # update counts
    n_correct += (predictions == targets).sum().item()
    n_total += targets.shape[0]

test_acc = n_correct / n_total
print(f"Test acc: {test_acc:.4f}")

In [None]:
# model = torch.load('best_val_model_pytorch')
all_targets = []
all_predictions = []

for inputs, targets in test_loader:
    # Move to GPU
    inputs, targets = inputs.to(device, dtype=torch.float), targets.to(device, dtype=torch.int64)

    # Forward pass
    outputs = model(inputs)
    
    # Get prediction
    # torch.max returns both max and argmax
    _, predictions = torch.max(outputs, 1)

    all_targets.append(targets.cpu().numpy())
    all_predictions.append(predictions.cpu().numpy())

all_targets = np.concatenate(all_targets)    
all_predictions = np.concatenate(all_predictions) 

In [None]:
print('accuracy_score:', accuracy_score(all_targets, all_predictions))
print(classification_report(all_targets, all_predictions, digits=4))

In [None]:
# Assume data has been successfully loaded using np.loadtxt()
print('Processing data...')
X = dec_data[:, :-1]  # Features
y = dec_data[:, -1]   # Labels

# Split into training, validation, and test sets
from sklearn.model_selection import train_test_split

X_train, X_temp, y_train, y_temp = train_test_split(X, y, test_size=0.3, random_state=42)
X_val, X_test, y_val, y_test = train_test_split(X_temp, y_temp, test_size=0.5, random_state=42)

# Convert to PyTorch tensors
import torch

X_train_tensor = torch.tensor(X_train, dtype=torch.float32)
y_train_tensor = torch.tensor(y_train, dtype=torch.long)
X_val_tensor = torch.tensor(X_val, dtype=torch.float32)
y_val_tensor = torch.tensor(y_val, dtype=torch.long)
X_test_tensor = torch.tensor(X_test, dtype=torch.float32)
y_test_tensor = torch.tensor(y_test, dtype=torch.long)

# Create data loaders
from torch.utils.data import DataLoader, TensorDataset

batch_size = 64
train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
val_dataset = TensorDataset(X_val_tensor, y_val_tensor)
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)

# Step 3: Model construction
print('Building model...')
class LOBNet(nn.Module):
    def __init__(self, input_size, hidden_size, output_size):
        super(LOBNet, self).__init__()
        self.fc1 = nn.Linear(input_size, hidden_size)
        self.relu = nn.ReLU()
        self.fc2 = nn.Linear(hidden_size, output_size)
    
    def forward(self, x):
        x = self.fc1(x)
        x = self.relu(x)
        x = self.fc2(x)
        return x

input_size = X_train.shape[1]
hidden_size = 64
output_size = len(np.unique(y))  # Number of label classes
model = LOBNet(input_size, hidden_size, output_size)

# Step 4: Train the model
print('Training model...')
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

epochs = 10
for epoch in range(epochs):
    model.train()
    running_loss = 0.0
    for inputs, labels in train_loader:
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        running_loss += loss.item()
    
    print(f'Epoch [{epoch+1}/{epochs}], Loss: {running_loss/len(train_loader):.4f}')

# Step 5: Evaluate the model
print('Evaluating model...')
model.eval()
correct = 0
total = 0
with torch.no_grad():
    for inputs, labels in val_loader:
        outputs = model(inputs)
        _, predicted = torch.max(outputs.data, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

accuracy = correct / total
print(f'Validation Accuracy: {accuracy:.4f}')