In [1]:
import os
from typing import Any, Dict, Tuple

import numpy as np
import pytorch_lightning as pl
import torch
import torchmetrics
from numpy.lib.stride_tricks import sliding_window_view
from torch import nn
from torch.utils.data import DataLoader, Dataset

# Download data

In [28]:
if not all(
    os.path.isfile(path) for path in ["data/train.txt", "data/val.txt", "test.txt"]
):
    # data paths
    train_path = "data/Train_Dst_NoAuction_DecPre_CF_7.txt"
    test_paths = [
        "data/Test_Dst_NoAuction_DecPre_CF_7.txt",
        "data/Test_Dst_NoAuction_DecPre_CF_8.txt",
        "data/Test_Dst_NoAuction_DecPre_CF_9.txt",
    ]

    # download data
    if not os.path.isfile("data/data.zip"):
        !wget "https://raw.githubusercontent.com/zcakhaa/DeepLOB-Deep-Convolutional-Neural-Networks-for-Limit-Order-Books/master/data/data.zip" -P data/
        !unzip -n data/data.zip -d data/

    # load training + validation data
    train_val_data = np.loadtxt(train_path, unpack=True)

    # split into train and validation
    train_slice = slice(0, int(0.8 * train_val_data.shape[0]))
    val_slice = slice(int(0.8 * train_val_data.shape[0]), train_val_data.shape[0])

    train_data = train_val_data[train_slice, :]
    val_data = train_val_data[val_slice, :]

    # load test data
    test_data = np.concatenate([np.loadtxt(path, unpack=True) for path in test_paths])

    # save train, val, test data to single
    np.savetxt("data/train.txt", train_data.T)
    np.savetxt("data/val.txt", val_data.T)
    np.savetxt("data/test.txt", test_data.T)

else:
    # data paths
    train_path = "data/train.txt"
    val_path = "data/val.txt"
    test_path = "data/test.txt"

    # load train, val, test data
    train_data = np.loadtxt(train_path)
    val_data = np.loadtxt(val_path)
    test_data = np.loadtxt(test_path)

In [3]:
train_data.shape, val_data.shape, test_data.shape

((203800, 149), (50950, 149), (139587, 149))

In [50]:
class LobDataset(Dataset):
    def __init__(
        self,
        data: np.ndarray,
        window_length: int = 100,
        prediction_horizon_index: int = 4,
    ) -> None:
        super(LobDataset, self).__init__()
        data_copy = data.copy()
        # As input, we select the first 40 columns. These are the first 10 levels
        # of the orderbook, containing price and volume for both bid and ask
        input_data = data_copy[:, :40]
        # As labels, we select the last 5 columns of the orderbook.
        # The labels are (1, 2, 3), which respectively represent
        # (positive percentage change, stationary, negative percentage change).
        labels = data_copy[:, -5:]
        # Make the labels start from 0
        labels -= 1

        # Each of the 5 column of the labels represents
        # a different prediction horizon (i.e., 1, 2, 3, 5, 10).
        # We keep just one of those
        labels = labels[:, prediction_horizon_index]

        # Split the input data in windows of length `window_length`,
        # and trim the first `window_length` elements of the labels
        input_windows, labels_trimmed = self.sliding_window_data(
            input_data, labels, window_length
        )

        # Cast np arrays into tensors and add one dimension to input to account for convolutions
        self.input_windows = torch.tensor(input_windows, dtype=torch.float).unsqueeze(1)
        self.labels = torch.tensor(labels_trimmed, dtype=torch.long)

    def __len__(self) -> int:
        return self.input_windows.shape[0]

    def __getitem__(self, idx: int) -> Dict[str, torch.Tensor]:
        return {"input": self.input_windows[idx], "labels": self.labels[idx]}

    @staticmethod
    def sliding_window_data(
        input_data: np.ndarray, labels: np.ndarray, window_length: int
    ) -> Tuple[np.ndarray, np.ndarray]:
        input_data = np.array(input_data)
        labels = np.array(labels)
        input_windows = sliding_window_view(
            input_data, window_length, axis=0
        ).transpose((0, 2, 1))
        labels_trimmed = labels[window_length - 1 :]
        return input_windows, labels_trimmed

# Model

In [46]:
class ConvolutionBlock(nn.Module):
    def __init__(self, in_channels: int, out_channels: int) -> None:
        super(ConvolutionBlock, self).__init__()

        self.conv_block = nn.Sequential(
            nn.Conv2d(in_channels, out_channels, (1, 2), (1, 2)),
            nn.LeakyReLU(0.01),
            nn.Conv2d(out_channels, out_channels, (4, 1), padding="same"),
            nn.LeakyReLU(0.01),
            nn.Conv2d(out_channels, out_channels, (4, 1), padding="same"),
            nn.LeakyReLU(0.01),
        )

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        return self.conv_block(x)


class InceptionBlock(nn.Module):
    def __init__(self, in_channels: int, out_channels: int) -> None:
        super(InceptionBlock, self).__init__()

        self.block1 = nn.Sequential(
            nn.Conv2d(in_channels, out_channels, (1, 1), padding="same"),
            nn.LeakyReLU(0.01),
            nn.Conv2d(out_channels, out_channels, (3, 1), padding="same"),
            nn.LeakyReLU(0.01),
        )
        self.block2 = nn.Sequential(
            nn.Conv2d(in_channels, out_channels, (1, 1), padding="same"),
            nn.LeakyReLU(0.01),
            nn.Conv2d(out_channels, out_channels, (5, 1), padding="same"),
            nn.LeakyReLU(0.01),
        )
        self.block3 = nn.Sequential(
            nn.MaxPool2d((3, 1), stride=(1, 1), padding=(1, 0)),
            nn.Conv2d(in_channels, out_channels, (1, 1), padding="same"),
            nn.LeakyReLU(0.01),
        )

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        out_1 = self.block1(x)
        out_2 = self.block2(x)
        out_3 = self.block3(x)

        return torch.cat((out_1, out_2, out_3), dim=1)


class DeepLOB(nn.Module):
    def __init__(self, num_classes: int = 3) -> None:
        super(DeepLOB, self).__init__()

        # convolution blocks
        self.conv_block1 = ConvolutionBlock(1, 32)
        self.conv_block2 = ConvolutionBlock(32, 32)

        # convolution block 3 is not standard
        self.conv_block3 = self.conv_block = nn.Sequential(
            nn.Conv2d(32, 32, (1, 10)),
            nn.LeakyReLU(0.01),
            nn.Conv2d(32, 32, (4, 1), padding="same"),
            nn.LeakyReLU(0.01),
            nn.Conv2d(32, 32, (4, 1), padding="same"),
            nn.LeakyReLU(0.01),
        )

        # inception block
        self.inception_block = InceptionBlock(32, 64)

        # lstm layer
        self.lstm = nn.LSTM(input_size=192, hidden_size=64, batch_first=True)
        self.fc1 = nn.Linear(64, num_classes)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        # conv blocks
        out = self.conv_block1(x)

        out = self.conv_block2(out)
        out = self.conv_block3(out)

        # inception block
        out = self.inception_block(out)

        # reshape data to feed lstm
        out = out.permute(0, 2, 1, 3)
        out = out.reshape(-1, out.shape[1], out.shape[2])

        # use lstm and take last item
        out, _ = self.lstm(out)
        out = out[:, -1, :]

        # linear and softmax to output probabilities
        return self.fc1(out)

# Pytorch Lightning modules

## Data module

In [47]:
class LobDataModule(pl.LightningDataModule):
    def __init__(
        self, train_data: np.ndarray, val_data: np.ndarray, batch_size: int
    ) -> None:
        super(LobDataModule, self).__init__()
        self.train_data = train_data
        self.val_data = val_data
        self.batch_size = batch_size

    def setup(self, stage=None):
        self.trainset = LobDataset(self.train_data)
        self.devset = LobDataset(self.val_data)

    def train_dataloader(self):
        return DataLoader(
            self.trainset,
            batch_size=self.batch_size,
            shuffle=True,
            num_workers=8,
            pin_memory=True,
        )

    def val_dataloader(self):
        return DataLoader(
            self.devset,
            batch_size=self.batch_size,
            shuffle=False,
            num_workers=8,
            pin_memory=True,
        )

## Model module

In [53]:
class DeepLobModel(pl.LightningModule):
    def __init__(self, hparams: Dict[str, Any]) -> None:
        super(DeepLobModel, self).__init__()
        self.save_hyperparameters(hparams)
        self.model = DeepLOB(self.hparams.num_classes)

        self.criterion = nn.CrossEntropyLoss()

        self.train_acc = torchmetrics.Accuracy()
        self.valid_acc = torchmetrics.Accuracy()
        self.train_f1 = torchmetrics.F1()
        self.val_f1 = torchmetrics.F1()

    def forward(self, batch: torch.Tensor) -> torch.Tensor:
        return self.model(batch["input"])

    def step(self, batch: Dict[str, torch.Tensor]) -> Dict[str, torch.Tensor]:
        labels = batch["labels"].view(-1)
        logits = self(batch)
        predictions = torch.argmax(logits, dim=1)
        loss = self.criterion(logits, labels)

        return {"loss": loss, "predictions": predictions}

    def training_step(self, batch: Dict[str, torch.Tensor]) -> Dict[str, torch.Tensor]:
        step_output = self.step(batch)
        accuracy = self.train_acc(step_output["predictions"], batch["labels"])
        f1_score = self.train_f1(step_output["predictions"], batch["labels"])

        self.log_dict(
            {
                "train_loss": step_output["loss"],
                "train_acc": accuracy,
                "train_f1": f1_score,
            },
            prog_bar=True,
            on_step=True,
            on_epoch=True,
        )

        return step_output

    def validation_step(
        self, batch: Dict[str, torch.Tensor]
    ) -> Dict[str, torch.Tensor]:
        step_output = self.step(batch)
        accuracy = self.val_acc(step_output["predictions"], batch["labels"])
        f1_score = self.val_f1(step_output["predictions"], batch["labels"])

        self.log_dict(
            {"val_loss": step_output["loss"], "val_acc": accuracy, "val_f1": f1_score},
            prog_bar=True,
            on_step=True,
            on_epoch=True,
        )

        return step_output

    def configure_optimizers(self) -> torch.optim.Optimizer:
        return torch.optim.Adam(
            self.parameters(),
            lr=self.hparams.lr,
            weight_decay=self.hparams.weight_decay,
        )

# Training

In [54]:
hparams = {
    "lr": 0.0001,
    "weight_decay": 0.0,
    "num_classes": 3,
    "batch_size": 128,
}
model = DeepLobModel(hparams)

datamodule = LobDataModule(train_data, val_data, hparams["batch_size"])

trainer = pl.Trainer(
    gpus=1,
    val_check_interval=1.0,
    max_epochs=100,
    num_sanity_val_steps=0,
)
trainer.fit(model=model, datamodule=datamodule)

GPU available: True, used: True
TPU available: False, using: 0 TPU cores
IPU available: False, using: 0 IPUs
LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]

  | Name      | Type             | Params
-----------------------------------------------
0 | model     | DeepLOB          | 142 K 
1 | criterion | CrossEntropyLoss | 0     
2 | train_acc | Accuracy         | 0     
3 | valid_acc | Accuracy         | 0     
4 | train_f1  | F1               | 0     
5 | val_f1    | F1               | 0     
-----------------------------------------------
142 K     Trainable params
0         Non-trainable params
142 K     Total params
0.571     Total estimated model params size (MB)


Epoch 0:   2%|▏         | 48/1990 [00:18<12:32,  2.58it/s, loss=1.09, v_num=8, train_loss_step=1.090, train_acc_step=0.391, train_f1_step=0.391]