In [1]:
import os, re, datetime as dt, time
from pathlib import Path

import numpy as np
import pandas as pd
import cudf                 # GPU Parquet
import cupy as cp            # companion to cudf
import torch
from torch import nn
from torch.utils.data import Dataset, DataLoader
from torch.cuda import amp
from sklearn.metrics import f1_score                # ①

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Using", device)
torch.backends.cudnn.benchmark = True    # gyorsítja a fix input-méretű CNN-t
torch.backends.cuda.matmul.allow_tf32 = True

Using cuda


In [2]:
DEBUG_PRINT = True  # master switch – set False when prints are no longer needed

def p(msg: str):
    """Wrapped print so we can globally silence if needed."""
    if DEBUG_PRINT:
        print(f"[DEBUG] {msg}", flush=True)


In [3]:
# 0. Device & torch settings
# -----------------------------------------------------------------------------
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Using", device)
if device.type == "cuda":
    torch.backends.cudnn.benchmark = True
    torch.backends.cuda.matmul.allow_tf32 = True

Using cuda


In [4]:
def load_book_chunk(
    start_date: dt.datetime,
    end_date:   dt.datetime,
    symbol: str,
    data_dir: str = "./szakdolgozat-high-freq-btc-prediction/data",
):
    """Load LOB parquet chunks between *start_date* and *end_date* (inclusive)."""
    p(f"load_book_chunk(): symbol={symbol}, range=({start_date} … {end_date})")
    t0 = time.perf_counter()

    sym_pat = symbol.lower().replace("-", "_")
    rex = re.compile(rf"book_{sym_pat}_(\d{{8}})_(\d{{8}})\.parquet$", re.I)

    sd, ed = pd.to_datetime(start_date), pd.to_datetime(end_date)
    frames = []

    for fn in sorted(os.listdir(data_dir)):
        m = rex.match(fn)
        fp = Path(data_dir) / fn
        if not m or not fp.is_file():
            continue

        f_sd, f_ed = pd.to_datetime(m.group(1)), pd.to_datetime(m.group(2))
        if f_ed < sd or f_sd > ed:
            continue  # out of range

        p(f"  reading {fn}")
        df = cudf.read_parquet(fp)
        msk = (df["received_time"] >= sd) & (df["received_time"] <= ed)
        if bool(msk.any()):
            p(f"    -> {msk.sum()} rows kept from this file")
            frames.append(df[msk])
        else:
            p("    -> 0 rows in range – skipped")

    if not frames:
        p("No data loaded – returning empty DataFrame")
        return cudf.DataFrame()

    df = cudf.concat(frames, ignore_index=True)
    p(f"Concatenated {len(frames)} chunks – total rows: {len(df)} – took {time.perf_counter()-t0:.2f} s")
    return df


In [5]:
class LobDataset(Dataset):
    def __init__(self, df: cudf.DataFrame,
                 depth:int = 10,
                 window:int = 100,
                 horizon:int = 100,
                 alpha:float = 0.002,
                 stride:int = 5):

        t0 = time.perf_counter()
        self.depth, self.window = depth, window
        self.horizon, self.alpha = horizon, alpha
        self.stride = stride

        p(f"LobDataset: depth={depth}, window={window}, horizon={horizon}, stride={stride}")

        pat = rf'(bid|ask)_[0-9]{{1,2}}_(price|size)'
        feat_cols = [c for c in df.columns
                     if re.match(pat, c) and int(c.split('_')[1]) < depth]
        p(f"  selected {len(feat_cols)} feature columns for LOB levels")

        # ---- rolling z‑score over ~5 days (~6.12 Hz sampling) ----
        wnd = int(5*24*60*60*6.12)
        p(f"  starting 5‑day z‑score normalisation (window={wnd}) on GPU …")
        t_norm = time.perf_counter()
        mu  = df[feat_cols].rolling(wnd, 1).mean()
        sig = df[feat_cols].rolling(wnd, 1).std() + 1e-8
        df[feat_cols] = ((df[feat_cols] - mu) / sig).astype(cp.float16)
        p(f"  normalisation finished in {time.perf_counter()-t_norm:.2f} s")

        # ---- single torch tensor on CPU (fp16) ----
        self.X = torch.from_dlpack(df[feat_cols].T.to_dlpack())\
                       .view(-1, depth*4)\
                       .to(torch.float16, device='cpu', non_blocking=True)
        mid = (df["bid_0_price"] + df["ask_0_price"]) / 2
        self.mid = torch.from_dlpack(mid.to_dlpack())\
                         .to(torch.float16, device='cpu', non_blocking=True)

        p(f"  LobDataset ready – {len(self)} samples – setup took {time.perf_counter()-t0:.2f} s")

    # ------------------------------------------------------------------
    def __len__(self):
        return (len(self.X) - self.window - self.horizon) // self.stride

    def __getitem__(self, i):
        idx = i * self.stride
        j   = idx + self.window

        x  = self.X[idx:j].view(self.window, self.depth*4)
        r  = (self.mid[j+self.horizon-1] - self.mid[j-1]) / self.mid[j-1]
        y  = 2 if r >  self.alpha else 0 if r < -self.alpha else 1
        return x, torch.tensor(y, dtype=torch.long)

In [6]:
# Cell 3 – új verzió
def make_loaders(df,
                 depth:   int = 10,
                 window:  int = 100,
                 horizon: int = 100,
                 valid_frac: float = 0.1,
                 batch: int = 32):

    ds = LobDataset(df,
                    depth   = depth,
                    window  = window,
                    horizon = horizon)

    n = len(ds)
    idx = np.arange(n);  np.random.shuffle(idx)
    split = int(n * (1 - valid_frac))

    train_sampler = torch.utils.data.SubsetRandomSampler(idx[:split])
    val_sampler   = torch.utils.data.SubsetRandomSampler(idx[split:])

    train_loader = DataLoader(ds,
                              batch_size = batch,
                              sampler    = train_sampler,
                              num_workers= 2,
                              prefetch_factor = 4,
                              pin_memory = True,
                              persistent_workers = True)
    val_loader   = DataLoader(ds,
                              batch_size = batch,
                              sampler    = val_sampler,
                              num_workers= 2,
                              pin_memory = True,
                              persistent_workers = True)
    return train_loader, val_loader

In [7]:
class InceptionModule(nn.Module):
    """DeepLOB‑style Inception@32 with detailed timing prints."""
    def __init__(self, in_ch: int, out_ch: int = 32,
                 ratio: tuple = (0.25, 0.375, 0.25, 0.125)):
        super().__init__()
        b1, b3, b5, bp = [int(out_ch * r) for r in ratio]

        self.branch1 = nn.Conv1d(in_ch, b1, 1)
        self.branch3 = nn.Sequential(
            nn.Conv1d(in_ch, max(1, b3 // 2), 1),
            nn.LeakyReLU(0.01, inplace=True),
            nn.Conv1d(max(1, b3 // 2), b3, 3, padding=1)
        )
        self.branch5 = nn.Sequential(
            nn.Conv1d(in_ch, max(1, b5 // 2), 1),
            nn.LeakyReLU(0.01, inplace=True),
            nn.Conv1d(max(1, b5 // 2), b5, 5, padding=2)
        )
        self.branch_pool = nn.Sequential(
            nn.MaxPool1d(3, stride=1, padding=1),
            nn.Conv1d(in_ch, bp, 1)
        )
        self.act = nn.LeakyReLU(0.01, inplace=True)

    def forward(self, x):               # x: (B, T, C)
        t0 = time.perf_counter()
        x = x.transpose(1, 2).contiguous()
        y = torch.cat((
            self.branch1(x),
            self.branch3(x),
            self.branch5(x),
            self.branch_pool(x)
        ), dim=1)
        y = self.act(y)
        y = y.transpose(1, 2).contiguous()
        p(f"    InceptionModule forward() took {time.perf_counter()-t0:.4f} s – output {tuple(y.shape)}")
        return y


In [8]:
# -----------------------------------------------------------------------------
# 5. DeepLOB model with per‑stage timing
# -----------------------------------------------------------------------------
class DeepLOB(nn.Module):
    def __init__(self, depth: int = 10):
        super().__init__()
        self.depth = depth

        self.cnn = nn.Sequential(
            nn.Conv2d(1, 32, (1, 2), (1, 2)),
            nn.LeakyReLU(0.01, True),
            nn.Conv2d(32, 32, (1, 2), (1, 2)),
            nn.LeakyReLU(0.01, True),
            nn.Conv2d(32, 32, (1, depth), (1, 1)),
            nn.LeakyReLU(0.01, True),
        )
        self.inception = InceptionModule(in_ch=32, out_ch=32)
        self.lstm = nn.LSTM(input_size=32,
                            hidden_size=64,
                            batch_first=True,
                            dropout=0.1)
        self.head = nn.Linear(64, 3)

    def forward(self, x):              # x: (B, 100, 40)
        t_fwd = time.perf_counter()

        # CNN expects (B, C=1, H=100, W=40)
        t0 = time.perf_counter()
        x = x.unsqueeze(1)
        x = self.cnn(x)
        p(f"    CNN took {time.perf_counter()-t0:.4f} s – output {tuple(x.shape)}")

        # prepare for Inception (B, T, C)
        x = x.squeeze(-1).permute(0, 2, 1)  # (B, 100, 32)

        # Inception
        x = self.inception(x)

        # LSTM
        t0 = time.perf_counter()
        out, _ = self.lstm(x)
        p(f"    LSTM took {time.perf_counter()-t0:.4f} s – output {tuple(out.shape)}")

        y = self.head(out[:, -1])
        p(f"    Total forward {time.perf_counter()-t_fwd:.4f} s")
        return y

In [9]:
# -----------------------------------------------------------------------------
# 6. Training loop with epoch/step timing and metrics
# -----------------------------------------------------------------------------

def train(model, train_loader, val_loader,
          epochs=40, lr=1e-3, patience_lim=5,
          accum_steps=2):

    opt    = torch.optim.AdamW(model.parameters(), lr=lr, betas=(0.9, 0.999),
                               eps=1e-8, weight_decay=1e-4, fused=True)
    scaler = amp.GradScaler()
    ce     = nn.CrossEntropyLoss()

    best_f1, wait = 0.0, 0
    p("Starting training …")
    for ep in range(1, epochs + 1):
        ep_t0 = time.perf_counter()
        p(f"\n===== EPOCH {ep}/{epochs} =====")

        # ---------- TRAIN ----------
        model.train()
        running_loss = 0.0
        t_train = time.perf_counter()
        for step, (xb, yb) in enumerate(train_loader):
            step_t0 = time.perf_counter()
            xb = xb.to(device, non_blocking=True).contiguous(memory_format=torch.channels_last)
            yb = yb.to(device, non_blocking=True)

            with amp.autocast():
                logits = model(xb)
                loss   = ce(logits, yb) / accum_steps

            scaler.scale(loss).backward()
            if (step + 1) % accum_steps == 0:
                scaler.step(opt)
                scaler.update()
                opt.zero_grad(set_to_none=True)

            running_loss += loss.item() * accum_steps
            p(f"      step {step:04d} took {time.perf_counter()-step_t0:.3f} s")
        p(f"  TRAIN epoch took {time.perf_counter()-t_train:.2f} s")

        # ---------- VALID ----------
        model.eval()
        y_true, y_pred = [], []
        t_val = time.perf_counter()
        with torch.no_grad(), amp.autocast():
            for xb, yb in val_loader:
                logits = model(xb.to(device, non_blocking=True))
                y_true.append(yb)
                y_pred.append(logits.argmax(1).cpu())
        p(f"  VALID epoch took {time.perf_counter()-t_val:.2f} s")

        macro_f1 = f1_score(torch.cat(y_true), torch.cat(y_pred), average='macro')
        print(f"Ep {ep:02d}  loss={running_loss/len(train_loader.sampler):.4f}  F1={macro_f1:.4f}")

        # ---------- EARLY STOP ----------
        if macro_f1 > best_f1 + 1e-4:
            best_f1, wait = macro_f1, 0
            torch.save(model.state_dict(), "best_deeplob.pt")
            p("  New best model saved.")
        else:
            wait += 1
        if wait >= patience_lim:
            print("Early stop triggered – no improvement.")
            break
        p(f"EPOCH {ep} finished in {time.perf_counter()-ep_t0:.2f} s")

In [None]:
# -----------------------------------------------------------------------------
# 7. Convenience harness (example run)
# -----------------------------------------------------------------------------
if __name__ == "__main__":
    df = load_book_chunk(dt.datetime(2025, 2, 26), dt.datetime(2025, 2, 28), "BTC-USDT")

    train_loader, val_loader = make_loaders(df,
        depth   = 10,
        window  = 100,
        horizon = 100,
        batch   = 32)

    model = DeepLOB(depth=10).to(device, memory_format=torch.channels_last)
    model = torch.compile(model, mode="reduce-overhead")

    train(model, train_loader, val_loader,
          epochs       = 40,
          lr           = 1e-3,
          patience_lim = 5,
          accum_steps  = 2)

[DEBUG] load_book_chunk(): symbol=BTC-USDT, range=(2025-02-26 00:00:00 … 2025-02-28 00:00:00)
[DEBUG]   reading book_btc_usdt_20250215_20250228.parquet
[DEBUG]     -> 924647 rows kept from this file
[DEBUG] Concatenated 1 chunks – total rows: 924647 – took 9.49 s
[DEBUG] LobDataset: depth=10, window=100, horizon=100, stride=5
[DEBUG]   selected 40 feature columns for LOB levels
[DEBUG]   starting 5‑day z‑score normalisation (window=2643840) on GPU …


In [None]:
# Cell 7 ────────────────────────────────────────────────────────────────────
model = DeepLOB().to(device)
model.load_state_dict(torch.load("best_deeplob.pt"))
model.eval()

def predict(window_100x40: np.ndarray) -> np.ndarray:
    x = torch.from_numpy(window_100x40.astype(np.float32)).unsqueeze(0).to(device)
    with torch.no_grad(), amp.autocast():
        logits = model(x)
    return torch.softmax(logits, 1).cpu().numpy()[0]   # (p_down, p_stationary, p_up)


FileNotFoundError: [Errno 2] No such file or directory: 'best_deeplob.pt'