# Personal Use Notice
The content of this notebook is intended for private use only. It may be shared with friends and family for non-commercial purposes, but must not be posted online, resold, or distributed publicly in any form. Unauthorized public sharing, uploading, or commercial use is strictly prohibited.

© 2025 - Jeremy COCHOY

# PART 1

## Data Exploration

In [None]:
 #!wget https://github.com/jeremycochoy/mlprague-2025-workshop/raw/refs/heads/master/klines_5m_BTC_USDT.parquet

In [None]:
import pandas as pd
import seaborn as sns
import numpy as np
import matplotlib.pyplot as plt

In [None]:
df = pd.read_parquet('klines_5m_BTC_USDT.parquet')
df

Resample the dataframe to a 30 min frequency. Handle carefully first, last, max and min.

In [None]:
df = df.resample('30min').apply({
    'open': 'first',
    'close': 'last',
    'low': 'min',
    'high': 'max',
})
df

Look at the change between open and close of a candle, normalized by the average price of btc at this time. You can take the time to zoom in or out.

In [None]:
data = 2 * (df['close'] - df['open']) / (df['close'] + df['open'])
sns.histplot(data, kde=True).set(xlim=(-0.04, 0.04))

Compute the direction of the price between two consecutive close. A 1 for a price increasing, a -1 for a price decreasing. If the price remain unchanged, the value is 0. This is the information we aim at forecasting.

In [None]:
delta_price = df['close'] - df['close'].shift(1)
direction = (delta_price > 0) * 1.0 - (delta_price < 0) * 1.0
direction

Use a scatter plot to visualize a small temporal slice and get an idea of the amount of up/down/zero.

In [None]:
plt.scatter(direction.iloc[:250].index, direction.iloc[:250])
plt.show()

Visualise the occurrence of each class in the whole dataset

In [None]:
# Count occurrences
values, counts = np.unique(direction, return_counts=True)

# Plot
plt.bar([str(v) for v in values], counts)
plt.xlabel('Class')
plt.ylabel('Count')
plt.title('Class Distribution (-1, 0, 1)')
plt.show()

Compute a normalized price change. This is a multiplicative factor between the close price of two consecutive candles. Make sure the new time serie remain causal.

In [None]:
price_change = df['close'] / df['close'].shift(1)
price_change = price_change.fillna(1)

In [None]:
plt.plot(price_change.iloc[-300:])
plt.title('Price change')
plt.show()

## Dataset

In [None]:
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

Create a training and validation dataset

In [None]:
SEQ_LEN = 512
split_date = '2024-01-01'

# Ensure alignment
assert price_change.index.equals(direction.index)

# Split by date
train_mask = price_change.index < split_date
val_mask = price_change.index >= split_date

price_train, price_val = price_change[train_mask], price_change[val_mask]
dir_train, dir_val = direction[train_mask], direction[val_mask]


In [None]:
class PriceToDirectionDataset(Dataset):
    def __init__(self, price_change_series, direction_series, seq_len=512):
        self.price_change = price_change_series.values.astype(np.float32)
        self.directions = direction_series.values.astype(np.float32)
        self.seq_len = seq_len

        assert len(self.price_change) == len(self.directions)

    def __len__(self):
        return len(self.price_change) - self.seq_len

    def __getitem__(self, idx):
        x = self.price_change[idx:idx + self.seq_len]
        y = self.directions[idx + self.seq_len]
        return torch.tensor(x).unsqueeze(-1), torch.tensor([y])


In [None]:
train_dataset = PriceToDirectionDataset(price_train, dir_train, seq_len=SEQ_LEN)
val_dataset = PriceToDirectionDataset(price_val, dir_val, seq_len=SEQ_LEN)

train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=64)


## Model

We build a simple model taking the price change as input, and returning two classification scores (logits) representing unnormalized probabilities for the next candle to go up or down.

In [None]:
import torch
import torch.nn as nn

class Forecast(nn.Module):
    """
    GRU-based sequence model for binary classification of time series direction.

    Given a sequence of prices (or features), the model outputs two scores (logits)
    at each time step, representing unnormalized probabilities for 'up' and 'down' directions.

    Final softmax is applied externally (e.g., in the loss function like nn.CrossEntropyLoss).
    """

    def __init__(self, input_dim=1, hidden_dim=16, num_layers=3):
        """
        Args:
            input_dim (int): Number of input features per time step.
            hidden_dim (int): Size of the GRU hidden state.
            num_layers (int): Number of stacked GRU layers.
        """
        super().__init__()
        self.gru = nn.GRU(input_dim, hidden_dim, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_dim, 2)  # Output logits for two classes: [up, down]

    def forward(self, x):
        """
        Forward pass through the model.

        Args:
            x (Tensor): Input tensor of shape (batch, sequence_length, input_dim)

        Returns:
            Tensor: Output logits of shape (batch, sequence_length, 2)
                    representing scores for each class at each time step.
        """
        out, _ = self.gru(x)     # out shape: (batch, seq_len, hidden_dim)
        out = self.fc(out)       # out shape: (batch, seq_len, 2)
        return out


Generate some noise and run it through the model to confirm the proper implementation

In [None]:
x = torch.randn(2, 512, 1)  # batch of 2, seq len 512
model = Forecast()
y = model(x)
print(y)  # should be shaped (2, 512, 2)

## Training

Training setup for this model. We use a cross entropy loss to predict the direction.

In [None]:
from tqdm import tqdm

class Trainer:
    def __init__(self, model, train_loader, val_loader, lr=1e-3, loss_fn=None):
        """
        Trainer for binary classification (up/down) based on GRU outputs.

        Model output: logits of shape (batch, time, 2)
        Target: float labels → mapped to class index:
            y >= 0 → 0 (up)
            y <  0 → 1 (down)
        """
        self.model = model.to(device)
        self.train_loader = train_loader
        self.val_loader = val_loader
        self.opt = torch.optim.Adam(self.model.parameters(), lr=lr)
        self.loss_fn = loss_fn if loss_fn else nn.CrossEntropyLoss()

        self.train_loss_steps = []
        self.val_loss_checkpoints = []

    def train(self, epochs=10):
        for epoch in range(epochs):
            self.model.train()
            pbar = tqdm(self.train_loader, desc=f"Epoch {epoch+1}", leave=False)
            val_check_interval = max(1, len(self.train_loader) // 10)

            for i, (x, y) in enumerate(pbar):
                x, y = x.to(device), y.to(device)

                # Convert float targets to class indices: y >= 0 → 0, y < 0 → 1
                y_class = (y.squeeze() < 0).long()  # True (down) → 1, False (up) → 0

                logits = self.model(x)[:, -1, :]  # shape: (batch, 2)
                loss = self.loss_fn(logits, y_class)

                assert not torch.isnan(x).any().any()

                self.opt.zero_grad()
                loss.backward()
                self.opt.step()

                self.train_loss_steps.append(loss.item())

                if (i + 1) % val_check_interval == 0:
                    val_loss = self.evaluate()
                    self.val_loss_checkpoints.append(val_loss)
                else:
                    val_loss = self.val_loss_checkpoints[-1] if len(self.val_loss_checkpoints) > 0 else np.nan
                pbar.set_postfix(train_loss=loss.item(), val_loss=val_loss)

    def evaluate(self):
        self.model.eval()
        val_losses = []

        with torch.no_grad():
            for x, y in self.val_loader:
                x, y = x.to(device), y.to(device)
                y_class = (y.squeeze() < 0).long()
                logits = self.model(x)[:, -1, :]
                loss = self.loss_fn(logits, y_class)
                val_losses.append(loss.item())
        self.model.train()

        return sum(val_losses) / len(val_losses)

    def plot_losses(self, smooth_window=1024):
        """
        Plots training loss (smoothed) and validation loss.
        Aligns both curves over the same training duration.

        Args:
            smooth_window (int): Window size for moving average on train loss.
        """
        plt.figure(figsize=(10, 4))

        # Smooth train loss using moving average
        if len(self.train_loss_steps) >= smooth_window:
            kernel = np.ones(smooth_window) / smooth_window
            smoothed_train = np.convolve(self.train_loss_steps, kernel, mode='valid')
            smoothed_x = np.arange(len(smoothed_train)) + smooth_window // 2
            plt.plot(smoothed_x, smoothed_train, label=f"Train Loss (smoothed, {smooth_window})")
        else:
            smoothed_x = np.arange(len(self.train_loss_steps))
            plt.plot(smoothed_x, self.train_loss_steps, label="Train Loss")

        # Rescale val x to match train loss duration
        val_x = np.linspace(smoothed_x[0], smoothed_x[-1], len(self.val_loss_checkpoints))
        plt.plot(val_x, self.val_loss_checkpoints, 'o-', label="Val Loss")

        plt.title("Training & Validation Loss")
        plt.xlabel("Training Steps")
        plt.ylabel("Loss")
        plt.legend()
        plt.grid(True)
        plt.tight_layout()
        plt.show()


In [None]:
trainer = Trainer(model, train_loader, val_loader, lr=1e-3)
trainer.train(epochs=7)

In [None]:
trainer.plot_losses()

Train a one layer model. Compare with a 3 layer. Try to double or divide by two the latent dimension. Observe how both the train curve and validation change.

# PART 2

## Backtest

We would like to estimate what would be our profits if we try to trade according to the direction predicted every 30 min. Remember that if we trade futures, we can short the asset.

There is two simple policies we can design to trade this information:
1. Each 30 minutes, we allocate all our remaining capital to a directional move aligned with the highest score
2. Each 30 minutes, we compute the probability for the price increasing $p_{up}$ and the price decreasing $p_{down}$ and substract them to obtain a directional multiplicative factor of the capital we want to allocate $-1 \leq p_{up}-p_{down} \leq 1$.

We assume:
* No transaction fee, spread or slipage
* Trades are always executed

In [None]:
def backtest_log_return(model, prices: pd.Series) -> float:
    log_prices = np.log(prices.values)

    model_input = torch.Tensor(prices.values[:-1]).to(device).reshape(-1, 1)
    logits = model(model_input)  # shape: (T-1, 2)
    probs = torch.softmax(logits, axis=1).detach().cpu().numpy()
    trades = (probs[:, 0] >= 0.5) * 1.0 - (probs[:, 1] >= 0.5)  # long - short
    log_returns = log_prices[1:] * trades
    return log_returns.cumsum(), log_prices

In [None]:
r_disc, log_prices = backtest_log_return(model, price_val)
r_disc

In [None]:
plt.plot(r_disc, label='discrete')
plt.show()

## Bonus

Instead of training the model with the cross entropy loss, formulate a loss based on the profits and train directly an "end-to-end" model. How does the profit and risk profile on the train set change? What is the impact on the validation set?