In [39]:
import os, time, json, asyncio, warnings, uuid, contextlib
warnings.filterwarnings("ignore")

import numpy as np
import pandas as pd

import yfinance as yf
from pandas_datareader import data as web  # Stooq fallback
# Optional FRED fallback if key is set
FRED_API_KEY = os.environ.get("FRED_API_KEY", "").strip()
if FRED_API_KEY:
    try:
        from fredapi import Fred
        _fred = Fred(api_key=FRED_API_KEY)
    except Exception:
        _fred = None
else:
    _fred = None

from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import mean_absolute_error, mean_squared_error

import torch
import torch.nn as nn
import torch.nn.functional as F

try:
    import nest_asyncio
    nest_asyncio.apply()
except Exception:
    pass

from aiokafka import AIOKafkaProducer, AIOKafkaConsumer
from aiokafka.admin import AIOKafkaAdminClient, NewTopic

# ---------- Runtime ----------
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Device:", device)

KAFKA_BOOTSTRAP = os.environ.get("KAFKA_BOOTSTRAP", "127.0.0.1:9094")
RUN_ID = f"{int(time.time())}_{uuid.uuid4().hex[:6]}"
TOPIC  = f"mln_stream_{RUN_ID}"
print("Kafka:", KAFKA_BOOTSTRAP, "| Topic:", TOPIC)

# Sliding-window + horizon
WINDOW  = 16
HORIZON = 1  # one-step ahead

# Evaluation caps
MAX_SCORED = 50     # max scored predictions per asset before stopping


# Map for optional FRED daily series if yfinance fails AND you have a FRED key
FRED_SERIES = {
    "CL=F":     "DCOILWTICO",   # WTI crude oil spot/daily
    "DX-Y.NYB": "DTWEXBGS",     # Broad Dollar Index daily
    "GLD":      "GOLDAMGBD228NLBM",  # London morning fix (daily)
}

def _try_yf(ticker, start, end):
    try:
        df = yf.download(ticker, start=start, end=end, auto_adjust=False, progress=False)
        if isinstance(df, pd.DataFrame) and "Close" in df.columns and not df["Close"].dropna().empty:
            out = df[["Close"]].dropna().copy()
            out.index.name = "Date"
            out.sort_index(inplace=True)
            print(f"[YF] {ticker}: {len(out)} rows")
            return out
    except Exception as e:
        print(f"[YF FAIL] {ticker}: {e}")
    return None

def _try_stooq(ticker, start, end):
    try:
        df = web.DataReader(ticker, "stooq", start=start, end=end)
        df = df.sort_index()
        cols = {c: c.title() for c in df.columns}
        df.rename(columns=cols, inplace=True)
        if "Close" in df.columns and not df["Close"].dropna().empty:
            out = df[["Close"]].dropna().copy()
            out.index.name = "Date"
            print(f"[STQ] {ticker}: {len(out)} rows")
            return out
    except Exception as e:
        print(f"[STQ FAIL] {ticker}: {e}")
    return None

def _try_fred(ticker, start, end):
    if not _fred: 
        return None
    sid = FRED_SERIES.get(ticker)
    if not sid:
        return None
    try:
        s = _fred.get_series(sid, observation_start=start, observation_end=end)
        s = s.dropna()
        if s.empty: 
            return None
        df = pd.DataFrame({"Close": s})
        # FRED dates may be PeriodIndex or DatetimeIndex; ensure DatetimeIndex and daily freq fill
        df.index = pd.to_datetime(df.index)
        df = df.resample("D").ffill().dropna()
        df.index.name = "Date"
        print(f"[FRED] {ticker}->{sid}: {len(df)} rows")
        return df
    except Exception as e:
        print(f"[FRED FAIL] {ticker}: {e}")
        return None

def load_close_series(ticker, start="2023-01-01", end="2024-01-01"):
    # 1) yfinance
    out = _try_yf(ticker, start, end)
    if out is not None: 
        return out
    # 2) stooq fallback (GLD works; CL=F and DX-Y.NYB may not exist on Stooq)
    out = _try_stooq(ticker, start, end)
    if out is not None:
        return out
    # 3) FRED (only if key present and mapped)
    out = _try_fred(ticker, start, end)
    if out is not None:
        return out
    raise RuntimeError(f"No real data available for {ticker}. Provide FRED_API_KEY or adjust tickers/date range.")

START, END = "2023-01-01", "2024-01-01"

ASSETS = {
    "GLD":     {"ticker": "GLD"},
    "WTI":     {"ticker": "CL=F"},
    "DXY":     {"ticker": "DX-Y.NYB"},
}

raw = {}
for name, cfg in ASSETS.items():
    raw[name] = load_close_series(cfg["ticker"], start=START, end=END)

# Per-asset scalers and supervised datasets
scalers, datasets = {}, {}

def build_supervised(series, window=WINDOW, horizon=HORIZON):
    s = np.asarray(series, dtype=np.float32)
    X, y = [], []
    for i in range(len(s) - window - horizon + 1):
        X.append(s[i:i+window])
        y.append(s[i+window:i+window+horizon])
    return np.array(X, dtype=np.float32), np.array(y, dtype=np.float32).reshape(-1, horizon)

for asset, df in raw.items():
    sc = MinMaxScaler()
    scaled = sc.fit_transform(df[["Close"]]).astype(np.float32).reshape(-1)
    scalers[asset] = sc
    X, y = build_supervised(scaled, WINDOW, HORIZON)
    split = int(len(X)*0.8)
    datasets[asset] = {
        "df": df,
        "scaled": scaled,
        "X_train": torch.tensor(X[:split], dtype=torch.float32, device=device),
        "y_train": torch.tensor(y[:split], dtype=torch.float32, device=device),
        "X_test":  torch.tensor(X[split:], dtype=torch.float32, device=device),
        "y_test":  torch.tensor(y[split:], dtype=torch.float32, device=device),
        "split":   split,
        "X_all":   X, "y_all": y   # numpy for stream slicing
    }
    print(f"{asset}: train {X[:split].shape}, test {X[split:].shape}")


# --- KAN wrapper (your package must be installed/importable) ---
from kan import KAN

class KANForecaster(nn.Module):
    def __init__(self, window=WINDOW, hidden=32, horizon=HORIZON, seed=42):
        super().__init__()
        self.kan = KAN(width=[window, hidden, horizon], grid=2, k=3, seed=seed, device=device.type)

    def forward(self, x):                  # x: [B, W]
        return self.kan(x)

# class KANForecaster(nn.Module):
#     def __init__(self, window, hidden=16, horizon=1, grid=1, k=2, seed=42, device_str="cpu"):
#         super().__init__()
#         # Keep width small + grid=1 + k=2 for speed
#         self.kan = KAN(width=[window, hidden, horizon],
#                        grid=grid, k=k, seed=seed, device=device_str)
#     def forward(self, x):
#         return self.kan(x)

# --- LSTM baseline ---
class LSTMForecaster(nn.Module):
    def __init__(self, input_size=1, hidden=64, num_layers=1, horizon=HORIZON):
        super().__init__()
        self.lstm = nn.LSTM(input_size, hidden, num_layers, batch_first=True)
        self.fc   = nn.Linear(hidden, horizon)
    def forward(self, x):                  # x: [B, W]
        x = x.unsqueeze(-1)               # [B, W, 1]
        out, _ = self.lstm(x)
        last = out[:, -1, :]              # [B, H]
        return self.fc(last)              # [B, horizon]

# --- Simple Transformer forecaster (Encoder-only) ---
class TransformerForecaster(nn.Module):
    def __init__(self, window=WINDOW, d_model=32, nhead=4, num_layers=2, dim_ff=64, horizon=HORIZON):
        super().__init__()
        self.embed = nn.Linear(1, d_model)
        enc_layer = nn.TransformerEncoderLayer(d_model=d_model, nhead=nhead, dim_feedforward=dim_ff, batch_first=True)
        self.encoder = nn.TransformerEncoder(enc_layer, num_layers=num_layers)
        self.pos = nn.Parameter(torch.zeros(1, window, d_model))
        self.fc = nn.Linear(d_model, horizon)
    def forward(self, x):                  # x: [B, W]
        x = x.unsqueeze(-1)               # [B, W, 1]
        z = self.embed(x) + self.pos      # [B, W, d_model]
        z = self.encoder(z)               # [B, W, d_model]
        last = z[:, -1, :]                # [B, d_model]
        return self.fc(last)              # [B, horizon]

# --- Simple TS-Mixer forecaster ---
class TSMixer(nn.Module):
    def __init__(self, window=WINDOW, d_feat=16, time_hidden=64, chan_hidden=32, num_blocks=2, horizon=HORIZON):
        super().__init__()
        self.proj_in  = nn.Linear(1, d_feat)
        self.time_mlps = nn.ModuleList([nn.Sequential(
            nn.LayerNorm([window, d_feat]),
            nn.Linear(window, time_hidden), nn.GELU(),
            nn.Linear(time_hidden, window)
        ) for _ in range(num_blocks)])
        self.chan_mlps = nn.ModuleList([nn.Sequential(
            nn.LayerNorm([window, d_feat]),
            nn.Linear(d_feat, chan_hidden), nn.GELU(),
            nn.Linear(chan_hidden, d_feat)
        ) for _ in range(num_blocks)])
        self.head = nn.Sequential(
            nn.Flatten(), nn.Linear(window*d_feat, 64), nn.GELU(), nn.Linear(64, horizon)
        )
    def forward(self, x):                  # x: [B, W]
        x = x.unsqueeze(-1)               # [B, W, 1]
        z = self.proj_in(x)               # [B, W, F]
        for tmlp, cmlp in zip(self.time_mlps, self.chan_mlps):
            z = z + tmlp(z.transpose(1,2)).transpose(1,2)  # mix over time
            z = z + cmlp(z)                                # mix over channels
        return self.head(z)               # [B, horizon]
    
torch.manual_seed(42)

def train_once(model, Xtr, ytr, Xte, yte, steps, lr, name):
    model.to(device)
    opt = torch.optim.Adam(model.parameters(), lr=lr)
    loss_fn = nn.MSELoss()
    model.train()
    for i in range(steps):
        opt.zero_grad()
        out = model(Xtr)
        loss = loss_fn(out, ytr)
        loss.backward()
        opt.step()
        if (i+1) % 50 == 0:
            print(f"{name} step {i+1}/{steps} | loss={loss.item():.6f}")
    model.eval()
    with torch.no_grad():
        tr = loss_fn(model(Xtr), ytr).item()
        te = loss_fn(model(Xte), yte).item()
    print(f"{name} final train MSE={tr:.6f} | test MSE={te:.6f}")

MODELS = ["KAN","LSTM","Transformer","TSMixer"]
models = {m: {} for m in MODELS}

for asset, d in datasets.items():
    Xtr, ytr, Xte, yte = d["X_train"], d["y_train"], d["X_test"], d["y_test"]
    print(f"\n=== Train on {asset} ===")
    m_kan  = KANForecaster(WINDOW).to(device)
    m_lstm = LSTMForecaster().to(device)
    m_trf  = TransformerForecaster(WINDOW).to(device)
    m_mix  = TSMixer(WINDOW).to(device)

    train_once(m_kan,  Xtr, ytr, Xte, yte, steps=150, lr=1e-2, name=f"[{asset}] KAN")
    train_once(m_lstm, Xtr, ytr, Xte, yte, steps=150, lr=1e-3, name=f"[{asset}] LSTM")
    train_once(m_trf,  Xtr, ytr, Xte, yte, steps=150, lr=1e-3, name=f"[{asset}] Transformer")
    train_once(m_mix,  Xtr, ytr, Xte, yte, steps=150, lr=2e-3, name=f"[{asset}] TSMixer")

    models["KAN"][asset]          = m_kan.eval()
    models["LSTM"][asset]         = m_lstm.eval()
    models["Transformer"][asset]  = m_trf.eval()
    models["TSMixer"][asset]      = m_mix.eval()


async def ensure_topic(name, bootstrap=KAFKA_BOOTSTRAP, partitions=1, rf=1):
    admin = AIOKafkaAdminClient(bootstrap_servers=bootstrap)
    await admin.start()
    try:
        topics = await admin.list_topics()
        if name not in topics:
            await admin.create_topics([NewTopic(name=name, num_partitions=partitions, replication_factor=rf)])
            for _ in range(50):
                if name in await admin.list_topics():
                    break
                await asyncio.sleep(0.2)
    finally:
        await admin.close()

async def kafka_smoke(bootstrap=KAFKA_BOOTSTRAP):
    prod = AIOKafkaProducer(bootstrap_servers=bootstrap, value_serializer=lambda v: json.dumps(v).encode())
    await prod.start()
    await prod.stop()
    print("Kafka reachable:", bootstrap)

await ensure_topic(TOPIC, bootstrap=KAFKA_BOOTSTRAP, partitions=1, rf=1)
await kafka_smoke()
print("Topic ready:", TOPIC)

##### cell 8
# ============================
# Cell 8 — Producer & Consumer (multi-asset, with per-asset limits)
# ============================

# ---- Per-asset target scores = min(MAX_SCORED, len(test set)) ----
TARGET_SCORES = {
    asset: int(min(MAX_SCORED, len(datasets[asset]["X_test"])))
    for asset in ASSETS
}
print("TARGET_SCORES:", TARGET_SCORES)

# Buckets (cleared in Cell 9)
PRED     = {m: {a: [] for a in ASSETS} for m in MODELS}
TRUTH    = {a: [] for a in ASSETS}
INFER_MS = {m: {a: [] for a in ASSETS} for m in MODELS}
E2E_MS   = {a: [] for a in ASSETS}

def stream_slice(asset):
    """Return exactly WINDOW + TARGET_SCORES[asset] scaled points."""
    d   = datasets[asset]
    sc  = d["scaled"]
    split = d["split"]
    need = WINDOW + TARGET_SCORES[asset]
    start_idx = max(0, split - (WINDOW - 1))
    end_idx   = start_idx + need
    return sc[start_idx:end_idx]

def round_robin_payloads():
    """Interleave per-asset sequences up to their target lengths."""
    seqs = {a: stream_slice(a) for a in ASSETS}
    idx  = {a: 0 for a in ASSETS}
    produced = 0
    hard_cap = sum(len(v) for v in seqs.values())
    while produced < hard_cap:
        for a in ASSETS:
            if idx[a] < len(seqs[a]):
                yield a, float(seqs[a][idx[a]])
                idx[a] += 1
                produced += 1

async def producer(bootstrap=KAFKA_BOOTSTRAP, topic=TOPIC, sleep_s=0.0):
    prod = AIOKafkaProducer(
        bootstrap_servers=bootstrap,
        value_serializer=lambda v: json.dumps(v).encode(),
        acks=0, linger_ms=0, compression_type=None
    )
    await prod.start()
    try:
        sent = 0
        for asset, val in round_robin_payloads():
            msg = {"ts": time.time(), "asset": asset, "val": val}
            await prod.send_and_wait(topic, msg)
            sent += 1
            if sleep_s > 0:
                await asyncio.sleep(sleep_s)
        await asyncio.sleep(0.25)  # drain
        print("Producer sent:", sent)
    finally:
        await prod.stop()

async def consumer(bootstrap=KAFKA_BOOTSTRAP, topic=TOPIC):
    cons = AIOKafkaConsumer(
        topic,
        bootstrap_servers=bootstrap,
        value_deserializer=lambda v: json.loads(v.decode()),
        auto_offset_reset="latest",
        group_id=None,
        enable_auto_commit=False,
        client_id="mln_eval_consumer",
        fetch_max_wait_ms=1,
        fetch_min_bytes=1,
        request_timeout_ms=15000
    )
    await cons.start()
    # seek to end to ignore stale data
    while not cons.assignment():
        await asyncio.sleep(0.01)
    end_offsets = await cons.end_offsets(cons.assignment())
    for tp, off in end_offsets.items():
        cons.seek(tp, off)
    await asyncio.sleep(0.5)  # allow fetcher to settle

    # Per-asset buffers, pending predictions, and scored counters
    buf     = {a: [] for a in ASSETS}
    pending = {a: None for a in ASSETS}
    scored  = {a: 0 for a in ASSETS}

    try:
        async for msg in cons:
            recv_t = time.time()
            datum  = msg.value
            if not (isinstance(datum, dict) and "asset" in datum and "val" in datum):
                continue
            asset = datum["asset"]
            val   = float(datum["val"])
            sent_ts = float(datum.get("ts", recv_t))

            # If this asset already reached its target, ignore further ticks for it
            if scored[asset] >= TARGET_SCORES[asset]:
                continue

            # finalize previous prediction for this asset with current truth
            if pending[asset] is not None:
                preds_per_model, pred_sent_ts = pending[asset]
                true_val = scalers[asset].inverse_transform([[val]])[0,0]
                TRUTH[asset].append(true_val)

                for mname, yhat_scaled, t0, t1 in preds_per_model:
                    yhat = scalers[asset].inverse_transform([[yhat_scaled]])[0,0]
                    PRED[mname][asset].append(yhat)
                    INFER_MS[mname][asset].append((t1 - t0)*1000.0)

                E2E_MS[asset].append((recv_t - pred_sent_ts)*1000.0)
                scored[asset] += 1
                pending[asset] = None

                # If ALL assets met their targets, we can stop
                if all(scored[a] >= TARGET_SCORES[a] for a in ASSETS):
                    break

            # issue next prediction if window is ready and we still need predictions
            buf[asset].append(val)
            if len(buf[asset]) >= WINDOW and scored[asset] < TARGET_SCORES[asset]:
                xin = torch.tensor(buf[asset][-WINDOW:], dtype=torch.float32, device=device).unsqueeze(0)
                preds = []
                # KAN
                t0 = time.time()
                with torch.no_grad():
                    yk = models["KAN"][asset](xin).detach().cpu().numpy().flatten()[0]
                t1 = time.time()
                preds.append(("KAN", yk, t0, t1))
                # LSTM
                t0 = time.time()
                with torch.no_grad():
                    yl = models["LSTM"][asset](xin).detach().cpu().numpy().flatten()[0]
                t1 = time.time()
                preds.append(("LSTM", yl, t0, t1))
                # Transformer
                t0 = time.time()
                with torch.no_grad():
                    yt = models["Transformer"][asset](xin).detach().cpu().numpy().flatten()[0]
                t1 = time.time()
                preds.append(("Transformer", yt, t0, t1))
                # TSMixer
                t0 = time.time()
                with torch.no_grad():
                    ym = models["TSMixer"][asset](xin).detach().cpu().numpy().flatten()[0]
                t1 = time.time()
                preds.append(("TSMixer", ym, t0, t1))

                pending[asset] = (preds, sent_ts)

    finally:
        await cons.stop()

Device: cpu
Kafka: 127.0.0.1:9094 | Topic: mln_stream_1758713926_2e3ffe
[YF] GLD: 250 rows
[YF] CL=F: 250 rows
[YF] DX-Y.NYB: 250 rows
GLD: train (187, 16), test (47, 16)
WTI: train (187, 16), test (47, 16)
DXY: train (187, 16), test (47, 16)

=== Train on GLD ===
checkpoint directory created: ./model
saving model version 0.0
[GLD] KAN step 50/150 | loss=0.005610
[GLD] KAN step 100/150 | loss=0.003929
[GLD] KAN step 150/150 | loss=0.003341
[GLD] KAN final train MSE=0.003333 | test MSE=0.004660
[GLD] LSTM step 50/150 | loss=0.030687
[GLD] LSTM step 100/150 | loss=0.016926
[GLD] LSTM step 150/150 | loss=0.011082
[GLD] LSTM final train MSE=0.010996 | test MSE=0.013676
[GLD] Transformer step 50/150 | loss=0.013016
[GLD] Transformer step 100/150 | loss=0.007777
[GLD] Transformer step 150/150 | loss=0.004849
[GLD] Transformer final train MSE=0.003557 | test MSE=0.005222
[GLD] TSMixer step 50/150 | loss=0.005804
[GLD] TSMixer step 100/150 | loss=0.003573
[GLD] TSMixer step 150/150 | loss=0.00

In [None]:
# ============================
# Orchestrate & Summarize (consumer FIRST)
# ============================

def summarize():
    def _mape(y, yhat):
        y, yhat = np.asarray(y), np.asarray(yhat)
        if len(y) == 0: return np.nan
        return float(np.mean(np.abs((y - yhat) / (np.abs(y) + 1e-8))) * 100.0)

    for asset in ASSETS:
        y_true = TRUTH[asset]
        print(f"\n=== {asset} ===")
        print(f"scored truths: {len(y_true)}")
        rows = []
        for m in MODELS:
            yhat = PRED[m][asset]
            mae  = mean_absolute_error(y_true, yhat) if len(yhat) else np.nan
            rmse = mean_squared_error(y_true, yhat, squared=False) if len(yhat) else np.nan
            mp   = _mape(y_true, yhat)
            infs = INFER_MS[m][asset]
            rows.append({
                "Model": m,
                "MAPE (%)": mp,
                "RMSE": rmse,
                "MAE": mae,
                "Avg Inference (ms)": float(np.mean(infs)) if infs else np.nan,
                "p50 Inference (ms)": float(np.quantile(infs, 0.50)) if infs else np.nan,
                "p95 Inference (ms)": float(np.quantile(infs, 0.95)) if infs else np.nan,
                "Samples": len(yhat)
            })
        df = pd.DataFrame(rows).sort_values("MAPE (%)")
        print(df.to_string(index=False))

        lat = E2E_MS[asset]
        if len(lat):
            print(f"E2E Latency  avg={np.mean(lat):.2f} ms | p50={np.quantile(lat,0.5):.2f} ms | p95={np.quantile(lat,0.95):.2f} ms")
        else:
            print("E2E Latency  (no samples)")

async def main(timeout_s=90.0, sleep_between_msgs=0.0):
    # Ensure the topic exists (idempotent)
    await ensure_topic(TOPIC, bootstrap=KAFKA_BOOTSTRAP, partitions=1, rf=1)

    # Clear buckets in case of reruns
    for m in MODELS:
        for a in ASSETS:
            PRED[m][a].clear()
            INFER_MS[m][a].clear()
    for a in ASSETS:
        TRUTH[a].clear()
        E2E_MS[a].clear()

    # 1) Start CONSUMER first so it seeks to end and waits for new data
    cons_task = asyncio.create_task(consumer())
    await asyncio.sleep(0.7)  # small barrier to settle

    # 2) Start PRODUCER (bounded stream by TARGET_SCORES)
    prod_task = asyncio.create_task(producer(sleep_s=sleep_between_msgs))

    # 3) Wait bounded time (to avoid forever hanging)
    try:
        await asyncio.wait_for(asyncio.gather(prod_task, cons_task), timeout=timeout_s)
    except (asyncio.TimeoutError, asyncio.CancelledError):
        print(f"[WARN] Timed out after {timeout_s:.1f}s or cancelled; using partial results.")
        with contextlib.suppress(Exception):
            await prod_task
        with contextlib.suppress(Exception):
            await cons_task

    summarize()

# Run
await main(timeout_s=120.0, sleep_between_msgs=0.0)

Producer sent: 189

=== GLD ===
scored truths: 47
      Model  MAPE (%)     RMSE      MAE  Avg Inference (ms)  p50 Inference (ms)  p95 Inference (ms)  Samples
        KAN  0.616697 1.493197 1.126402           22.699331           22.143841           26.733541       47
    TSMixer  0.623301 1.501040 1.140517            0.238231            0.230789            0.336051       47
Transformer  0.660299 1.602007 1.206494            0.326101            0.321865            0.390434       47
       LSTM  1.382118 3.058881 2.508486            0.384818            0.371933            0.543594       47
E2E Latency  avg=841.58 ms | p50=863.36 ms | p95=1644.67 ms

=== WTI ===
scored truths: 47
      Model  MAPE (%)     RMSE      MAE  Avg Inference (ms)  p50 Inference (ms)  p95 Inference (ms)  Samples
        KAN  2.081370 2.082632 1.658810           22.058152           21.957159           23.695064       47
    TSMixer  2.115231 2.085316 1.694512            0.230845            0.226974            0.281

In [None]:
# ============================
# Statistical tests (KAN vs baselines), per asset
# ============================
import numpy as np
import pandas as pd

# SciPy may not be preinstalled in some base envs
try:
    from scipy.stats import ttest_rel
except Exception:
    import sys, subprocess
    subprocess.run([sys.executable, "-m", "pip", "install", "-q", "scipy"], check=True)
    from scipy.stats import ttest_rel

def ape_series(y_true, y_pred, eps=1e-8):
    y_true = np.asarray(y_true, dtype=np.float64)
    y_pred = np.asarray(y_pred, dtype=np.float64)
    n = min(len(y_true), len(y_pred))
    if n == 0:
        return np.array([], dtype=np.float64)
    y_true = y_true[:n]
    y_pred = y_pred[:n]
    return np.abs((y_true - y_pred) / (np.abs(y_true) + eps))

def cohen_d_paired(a, b):
    # effect size for paired samples (mean of diffs / std of diffs)
    d = np.asarray(a) - np.asarray(b)
    if len(d) < 2 or np.std(d, ddof=1) == 0:
        return np.nan
    return float(np.mean(d) / np.std(d, ddof=1))

def holm_bonferroni(pvals, alpha=0.05):
    """
    Holm step-down: returns list of booleans (reject or not) in the *original* order.
    """
    idx_sorted = np.argsort(pvals)
    rejs = [False]*len(pvals)
    for k, i in enumerate(idx_sorted, start=1):
        if pvals[i] <= alpha / (len(pvals) - k + 1):
            rejs[i] = True
        else:
            # once we hit a non-reject, all larger p's are non-reject
            break
    return rejs

def ttest_kan_vs_baselines_for_asset(asset, baselines=("LSTM", "Transformer", "TSMixer")):
    results = []
    y_true = TRUTH[asset]
    ape_kan = ape_series(y_true, PRED["KAN"][asset])

    if len(ape_kan) == 0:
        return pd.DataFrame(columns=["Baseline","n","KAN mean APE","Baseline mean APE","ΔAPE (mean)","t","p","Holm α=0.05","Cohen d"])

    pvals = []
    rows_tmp = []

    for b in baselines:
        ape_b = ape_series(y_true, PRED[b][asset])
        n = min(len(ape_kan), len(ape_b))
        if n < 3:
            rows_tmp.append((b, n, np.nan, np.nan, np.nan, np.nan, np.nan, np.nan))
            pvals.append(1.0)
            continue

        # paired t-test on sample-wise APE
        t_stat, p_val = ttest_rel(ape_kan[:n], ape_b[:n], alternative='less')  
        # 'less' because H1: KAN APE < Baseline APE

        delta = float(np.mean(ape_kan[:n] - ape_b[:n]))  # negative means KAN better
        d_eff = cohen_d_paired(ape_kan[:n], ape_b[:n])

        rows_tmp.append((b, n,
                         float(np.mean(ape_kan[:n]))*100.0,
                         float(np.mean(ape_b[:n]))*100.0,
                         delta*100.0,   # report in %
                         float(t_stat),
                         float(p_val),
                         None,          # placeholder for Holm decision
                         d_eff))
        pvals.append(p_val)

    # Holm–Bonferroni across the baselines for this asset
    decisions = holm_bonferroni(pvals, alpha=0.05)
    out_rows = []
    for (b, n, mkan, mbase, dlt, t_stat, p_val, _, d_eff), rej in zip(rows_tmp, decisions):
        out_rows.append({
            "Baseline": b,
            "n": n,
            "KAN mean APE (%)": mkan,
            f"{b} mean APE (%)": mbase,
            "ΔAPE mean (KAN - Baseline) (%)": dlt,
            "t": t_stat,
            "p": p_val,
            "Holm α=0.05 (KAN better?)": "YES" if rej else "no",
            "Cohen d (paired)": d_eff
        })
    return pd.DataFrame(out_rows)

# ---- Run tests per asset and print nicely
for asset in ASSETS:
    print(f"\n=== Paired t-tests: KAN vs baselines on {asset} ===")
    df_tests = ttest_kan_vs_baselines_for_asset(asset)
    if df_tests.empty:
        print("Not enough samples to test.")
        continue
    # Order by p-value for quick reading
    df_tests = df_tests.sort_values("p")
    # Round a bit for readability
    with pd.option_context('display.max_columns', None):
        print(df_tests.round({
            "KAN mean APE (%)": 3,
            f"LSTM mean APE (%)": 3 if "LSTM" in df_tests.columns else 3,
            f"Transformer mean APE (%)": 3 if "Transformer" in df_tests.columns else 3,
            f"TSMixer mean APE (%)": 3 if "TSMixer" in df_tests.columns else 3,
            "ΔAPE mean (KAN - Baseline) (%)": 3,
            "t": 3, "p": 4, "Cohen d (paired)": 3
        }).to_string(index=False))

    # Quick interpretation line
    for _, r in df_tests.iterrows():
        verdict = "significantly better" if r["Holm α=0.05 (KAN better?)"] == "YES" else "not significantly better"
        print(f"→ Against {r['Baseline']}, KAN is {verdict} on {asset} (p={r['p']:.4g}).")


=== Paired t-tests: KAN vs baselines on GLD ===
   Baseline  n  KAN mean APE (%)  LSTM mean APE (%)  ΔAPE mean (KAN - Baseline) (%)      t      p Holm α=0.05 (KAN better?)  Cohen d (paired)  Transformer mean APE (%)  TSMixer mean APE (%)
       LSTM 47             0.617              1.382                          -0.765 -5.261 0.0000                       YES            -0.767                       NaN                   NaN
Transformer 47             0.617                NaN                          -0.044 -1.021 0.1563                        no            -0.149                      0.66                   NaN
    TSMixer 47             0.617                NaN                          -0.007 -0.257 0.3992                        no            -0.037                       NaN                 0.623
→ Against LSTM, KAN is significantly better on GLD (p=1.82e-06).
→ Against Transformer, KAN is not significantly better on GLD (p=0.1563).
→ Against TSMixer, KAN is not significantly better o