In [37]:
import os
import yfinance as yf
import pandas as pd
import numpy as np
import torch, math
from torch import nn
from torch.utils.data import Dataset, DataLoader
from scipy import stats

class ReturnsDataset(Dataset):
    def __init__(self, series, m):
        X, y = [], []
        for i in range(m, len(series) - 1):
            X.append(series[i - m:i].values.astype(np.float32))
            y.append((series[i + 1] ** 2).astype(np.float32))
        self.X = torch.tensor(np.array(X))
        self.y = torch.tensor(np.array(y)).unsqueeze(-1)
    def __len__(self): return len(self.y)
    def __getitem__(self, idx): return self.X[idx], self.y[idx]

class PositionalEncoding(nn.Module):
    def __init__(self, d, L):
        super().__init__()
        P = torch.zeros(L, d)
        pos = torch.arange(0, L).unsqueeze(1)
        div = torch.exp(torch.arange(0, d, 2) * (-math.log(10000.0) / d))
        P[:, 0::2] = torch.sin(pos * div)
        P[:, 1::2] = torch.cos(pos * div)
        self.register_buffer("P", P.unsqueeze(0))
    def forward(self, x):
        return x + self.P[:, : x.size(1)]

class VolTransformer(nn.Module):
    def __init__(self, d=64, h=8, L=4, m=60):
        super().__init__()
        self.prj = nn.Linear(1, d)
        self.pos = PositionalEncoding(d, m)
        enc = nn.TransformerEncoderLayer(d, h, batch_first=True)
        self.enc = nn.TransformerEncoder(enc, L)
        self.head = nn.Sequential(nn.Linear(d, d), nn.ReLU(), nn.Linear(d, 1), nn.Softplus())
    def forward(self, x):
        z = self.prj(x.unsqueeze(-1))
        z = self.pos(z)
        z = self.enc(z)[:, -1]
        return self.head(z).squeeze(-1)

m = 60
df = yf.download("BTC-USD", start="2025-05-04", end="2025-05-12", interval="1m", progress=False)
df["r"] = np.log(df["Close"] / df["Close"].shift(1))
r = df["r"].dropna()

ds = ReturnsDataset(r, m)
n_train = int(0.8 * len(ds))
train_ds, test_ds = torch.utils.data.random_split(
    ds, [n_train, len(ds) - n_train], generator=torch.Generator().manual_seed(0)
)
loader_tr = DataLoader(train_ds, 128, True, drop_last=True)
loader_te = DataLoader(test_ds, 128, False)

dev = "cuda" if torch.cuda.is_available() else "cpu"
net = VolTransformer(m=m).to(dev)
opt = torch.optim.Adam(net.parameters(), 3e-4)
loss_fn = nn.MSELoss()

model_path = "voltransformer.pt"
if os.path.exists(model_path):
    net.load_state_dict(torch.load(model_path, map_location=dev))
else:
    net.train()
    for e in range(500):
        total_loss = 0.0
        for x, y in loader_tr:
            x, y = x.to(dev), y.to(dev)
            opt.zero_grad()
            loss = loss_fn(net(x), y.squeeze(-1))
            loss.backward()
            opt.step()
            total_loss += loss.item() * y.size(0)
        print(f"Epoch {e}: train loss = {total_loss / n_train:.6f}")
    torch.save(net.state_dict(), model_path)

net.eval()
pv, tr = [], []
with torch.no_grad():
    for x, y in loader_te:
        preds = net(x.to(dev)).cpu().numpy()
        pv.extend(preds)
        tr.extend(y.numpy())
pv = np.array(pv).flatten()
tr = np.array(tr).flatten()

start_test = m + n_train + 1
ret_test = r.iloc[start_test : start_test + len(test_ds)]

sigma = np.sqrt(pv)
alpha = 0.01
VaR99 = sigma * stats.norm.ppf(alpha)
viol = (ret_test.values < VaR99).astype(int)
n, x = len(viol), viol.sum()
p_hat = max(x / n, 1e-10)
kupiec = -2 * ((x * np.log(alpha) + (n - x) * np.log(1 - alpha))
               - (x * np.log(p_hat) + (n - x) * np.log(1 - p_hat)))
p_val = stats.chi2.sf(kupiec, 1)

print(f"Number of violations: {x}/{n}")
print(f"Kupiec test statistic: {kupiec:.4f}, p-value = {p_val:.4f}")


  y.append((series[i + 1] ** 2).astype(np.float32))
  net.load_state_dict(torch.load(model_path, map_location=dev))


Number of violations: 1/1901
Kupiec test statistic: 30.3019, p-value = 0.0000


In [None]:
import yfinance as yf
import pandas as pd
import numpy as np
import torch, math
from torch import nn
from torch.utils.data import Dataset, DataLoader
from scipy import stats

class ReturnsDataset(Dataset):
    def __init__(self, series, m):
        X, y = [], []
        for i in range(m, len(series) - 1):
            X.append(series.iloc[i - m:i].values.astype(np.float32))
            y.append(np.float32(series.iloc[i + 1] ** 2))
        self.X = torch.from_numpy(np.array(X, dtype=np.float32))
        self.y = torch.from_numpy(np.array(y, dtype=np.float32)).unsqueeze(-1)
    def __len__(self): return len(self.y)
    def __getitem__(self, idx): return self.X[idx], self.y[idx]

class PositionalEncoding(nn.Module):
    def __init__(self, d, L):
        super().__init__()
        P = torch.zeros(L, d)
        pos = torch.arange(L).unsqueeze(1)
        div = torch.exp(torch.arange(0, d, 2) * (-math.log(10000.0) / d))
        P[:, 0::2] = torch.sin(pos * div)
        P[:, 1::2] = torch.cos(pos * div)
        self.register_buffer("P", P.unsqueeze(0))
    def forward(self, x): return x + self.P[:, : x.size(1)]

class VolTransformer(nn.Module):
    def __init__(self, d=64, h=8, L=4, m=120):
        super().__init__()
        self.prj = nn.Linear(1, d)
        self.pos = PositionalEncoding(d, m)
        enc = nn.TransformerEncoderLayer(d, h, dim_feedforward=4 * d, dropout=0.1, batch_first=True, activation="gelu")
        self.enc = nn.TransformerEncoder(enc, L)
        self.head = nn.Sequential(nn.Linear(d, d), nn.GELU(), nn.Linear(d, 1), nn.Softplus())
    def forward(self, x):
        z = self.prj(x.unsqueeze(-1))
        z = self.pos(z)
        z = self.enc(z)[:, -1]
        return self.head(z).squeeze(-1)

m = 120
df = yf.download("BTC-USD", start="2025-05-04", end="2025-05-12", interval="1m", progress=False)
df["r"] = np.log(df["Close"] / df["Close"].shift(1))
r = df["r"].dropna()

ds = ReturnsDataset(r, m)
n_train = int(0.8 * len(ds))
train_ds, test_ds = torch.utils.data.random_split(ds, [n_train, len(ds) - n_train], generator=torch.Generator().manual_seed(0))
loader_tr = DataLoader(train_ds, 128, True, drop_last=True)
loader_te = DataLoader(test_ds, 128, False)

dev = "cuda" if torch.cuda.is_available() else "cpu"
print(f"Using device: {dev}")
net = VolTransformer(m=m).to(dev)
opt = torch.optim.AdamW(net.parameters(), 1e-3, weight_decay=1e-4)
loss_fn = nn.MSELoss()

best, wait = float("inf"), 0
for e in range(800):
    net.train()
    s = 0.0
    for x, y in loader_tr:
        x, y = x.to(dev), y.to(dev)
        opt.zero_grad()
        l = loss_fn(net(x), y.squeeze(-1))
        l.backward()
        torch.nn.utils.clip_grad_norm_(net.parameters(), 1.0)
        opt.step()
        s += l.item() * y.size(0)
    val = s / n_train
    if val < best * 0.995:
        best, wait = val, 0
        torch.save(net.state_dict(), "best.pt")
    else:
        wait += 1
    if wait == 10: break
    print(e, val)

net.load_state_dict(torch.load("best.pt"))
net.eval()
pv, tr = [], []
with torch.no_grad():
    for x, y in loader_te:
        pv.extend(net(x.to(dev)).cpu().numpy())
        tr.extend(y.numpy())
pv = np.array(pv, dtype=np.float32).flatten()
tr = np.array(tr, dtype=np.float32).flatten()

start_test = m + n_train + 1
ret_test = r.iloc[start_test : start_test + len(test_ds)]

alpha = 0.01
sigma = np.sqrt(pv)
VaR99 = sigma * stats.norm.ppf(alpha)

viol = (ret_test.values < VaR99).astype(int)
n, x = len(viol), viol.sum()
p_hat = max(x / n, 1e-10)
kupiec = -2 * ((x * np.log(alpha) + (n - x) * np.log(1 - alpha)) - (x * np.log(p_hat) + (n - x) * np.log(1 - p_hat)))
p_val = stats.chi2.sf(kupiec, 1)

rmse = np.sqrt(np.mean((pv - (ret_test.values ** 2)) ** 2))

print(f"Violations: {x}/{n} ({x / n:.4%})")
print(f"Kupiec statistic: {kupiec:.4f}")
print(f"Kupiec p-value: {p_val:.4f}")
print(f"Transformer RMSE: {rmse:.6e}")


Using device: cuda
0 0.022635164195305462
1 0.00013216198458389686
2 7.158312318935609e-05
3 4.4876560627995906e-05
4 3.0669619833714405e-05
5 2.220820541352164e-05
6 1.6848729925463685e-05
7 1.3180688560807033e-05
8 1.0601936314981512e-05
9 8.69892465768686e-06
10 7.24431780857062e-06
11 6.146290805067497e-06
12 5.280572633922009e-06
13 4.572427456003182e-06
14 3.989281407294172e-06
15 3.514904333880265e-06
16 3.1224552175262173e-06
17 2.7804354329772396e-06
18 2.5023947341714582e-06
19 2.256648363121368e-06
20 2.0440398877829128e-06
21 1.8573396862444441e-06
22 1.6938306598823114e-06
23 1.5539800055162516e-06
24 1.4262138237495073e-06
25 1.3102064044561888e-06
26 1.2106712773211934e-06
27 1.124964095590451e-06
28 1.0424808056173777e-06
29 9.674302093114018e-07
30 9.006687513298351e-07
31 8.413783888125763e-07
32 7.863532142271248e-07
33 7.350961581937521e-07
34 6.89379695235909e-07
35 6.475729492691372e-07
36 6.074129115546231e-07
37 5.727306305010729e-07
38 5.388726529257275e-07
39 

In [38]:
alpha       = 0.01
z_alpha     = stats.norm.ppf(alpha)
pdf_norm    = stats.norm.pdf(z_alpha)
es_factor_n = pdf_norm / alpha

es_series_n = sigma * es_factor_n

print(f'Average ES at {alpha*100:.1f}% level: {es_series_n.mean():.6f}')


Average ES at 1.0% level: 0.003090


In [39]:
pred_var_tr = pv
actual_sq_tr = (ret_test.values ** 2)
rmse_tr = np.sqrt(np.mean((pred_var_tr - actual_sq_tr) ** 2))
print(f"Transformer RMSE: {rmse_tr:.6e}")

Transformer RMSE: 1.329211e-06


In [26]:
torch.save(net.state_dict(), "voltransformer.pt")