# Calibrate all snapshots (BTC and ETH) → Excel workbook

This notebook calibrates **Black**, **Heston**, and **SVCJ** to **all** `deribit_options_snapshot_*.csv` files in `data/`, then persists:

- model parameters (3 sheets)
- train and test datasets used (2 sheets), each with **three additional columns**: `price_black`, `price_heston`, `price_svcj`

Key features:

- **Resume**: if the Excel file already exists, we continue from the first snapshot **after** the latest `timestamp` present in `black_params` for each currency.
- **Periodic saving**: flush to Excel every `SAVE_EVERY_N_FILES` processed snapshots (atomic write).
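- **Warm start**: within a chunk, each worker thread warm-starts a snapshot from the parameters of the snapshot it processed just before; the first snapshot of a chunk is warm-started from the existing workbook (the latest successful parameters strictly before that chunk's first timestamp).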
- **Threading**: multiple worker threads process contiguous chunks; the main thread commits results **in chronological order** to keep resume safe.


In [None]:
# Imports and project path setup
from __future__ import annotations

import os

# Cap native thread pools BEFORE NumPy / pandas are imported. The BLAS / OpenMP /
# numexpr runtimes generally read these variables when they are first loaded, so
# setting them after `import numpy` (as this cell previously did) is typically too
# late to have any effect. If the kernel has already imported NumPy (e.g. the cell
# is re-run), restart the kernel for the limits to apply.
os.environ["OMP_NUM_THREADS"] = "1"
os.environ["MKL_NUM_THREADS"] = "1"
os.environ["OPENBLAS_NUM_THREADS"] = "1"
os.environ["NUMEXPR_NUM_THREADS"] = "1"

import sys  # noqa: E402
import re  # noqa: E402
import math  # noqa: E402
import contextlib  # noqa: E402
import threading  # noqa: E402
import queue  # noqa: E402
from pathlib import Path  # noqa: E402
from tempfile import NamedTemporaryFile  # noqa: E402
from typing import Optional, Dict, Tuple, Any  # noqa: E402

import numpy as np  # noqa: E402
import pandas as pd  # noqa: E402

# threadpoolctl is optional; downstream code must handle `threadpool_limits is None`.
try:
    from threadpoolctl import threadpool_limits  # noqa: E402
except Exception:
    threadpool_limits = None

# --- locate project root (folder that contains `src/` and `data/`)
# Walk from the current working directory upwards and take the first match.
ROOT = Path.cwd().resolve()
candidates = [ROOT, ROOT.parent, *ROOT.parents]
for c in candidates:
    if (c / "src").exists() and (c / "data").exists():
        ROOT = c
        break

assert (ROOT / "src").exists() and (ROOT / "data").exists(), f"Could not find project root from cwd={Path.cwd()}"

# Make project importable
if str(ROOT) not in sys.path:
    sys.path.insert(0, str(ROOT))

from src.calibration import (  # noqa: E402
    filter_liquid_options,
    calibrate_model,
    price_dataframe,
    WeightConfig,
)
from src.inverse_fft_pricer import FFTParams  # noqa: E402

pd.set_option("display.max_columns", 200)

## Global configuration

In [None]:
# =============================
# Output / resume
# =============================
OUTPUT_XLSX = ROOT / "calibration_results.xlsx"
RESUME = True
SAVE_EVERY_N_FILES = 10  # flush workbook every N committed snapshots (per currency run)

# Smoke test switch: an integer (e.g. 1 or 2) limits the number of pending files
# per currency; None processes everything.
SMOKE_TEST_MAX_FILES_PER_CURRENCY: Optional[int] = None

# =============================
# Global filter rules (liquidity cleaning)
# Passed as keyword arguments to `filter_liquid_options`.
# =============================
FILTER_RULES = {
    "require_bid_ask": True,              # drop rows missing bid OR ask (hard requirement)
    "min_time_to_maturity": 1 / 365,
    "max_time_to_maturity": None,
    "min_open_interest": 1.0,
    "min_vega": 0.0,
    "max_rel_spread": 0.50,               # relative spread cap (ask-bid)/mid
    "moneyness_range": (0.5, 2.0),        # K/F0 range
    "drop_synthetic_underlyings": False,  # optionally drop SYN.* futures underlyings
}

# A snapshot/currency is skipped when fewer options than this survive filtering.
MIN_OPTIONS_AFTER_FILTER = 50

# =============================
# Parallelism (per-snapshot processing)
# =============================
# Alternative: N_THREADS = max(1, int((os.cpu_count() or 4) - 2))
N_THREADS = 6

# Prevent oversubscription: each worker may call heavy NumPy/SciPy/FFT kernels.
LIMIT_INTERNAL_THREADS = True
INTERNAL_NUM_THREADS = 1  # recommended = 1 when using N_THREADS > 1

# =============================
# Global weight config (used in calibration residuals)
# r_i = w_i * (P_model_i - P_mkt_i)
# =============================
WEIGHT_CONFIG = WeightConfig(
    use_spread=True,
    use_vega=False,
    use_open_interest=False,
    spread_power=1.0,
    vega_power=0.5,
    oi_power=0.5,
    eps_spread=1e-6,
    eps_other=1e-12,
    cap=1e6,
)

# =============================
# Carr–Madan FFT base parameters
# (calibration is run with dynamic_b=False because a per-expiry b is
#  precomputed for every snapshot)
# =============================
FFT_BASE = FFTParams(
    N=2 ** 12,
    eta=0.10,
    alpha=1.5,
    b=-10.0,  # placeholder; overridden by the per-expiry b
    use_simpson=True,
)

# =============================
# Calibration knobs
# =============================
TRAIN_FRAC = 0.70
GLOBAL_RANDOM_SEED = 123

# max_nfev is the main runtime control (per model per snapshot)
MAX_NFEV = {
    "black": 200,
    "heston": 200,
    "svcj": 200,
}

# =============================
# Runtime guards (optional, recommended when looping many snapshots)
# =============================
RUNTIME_TOP_EXPIRIES_BY_OI = None  # keep only top expiries by total OI per snapshot/currency
RUNTIME_MAX_OPTIONS = None         # cap number of options per snapshot/currency after filtering

# =============================
# Currency list
# =============================
CURRENCIES = ["BTC", "ETH"]


## Excel schema + persistence helpers

In [None]:
# Excel schema and helpers
# Sheet names of the output workbook.
PARAM_SHEET_BLACK = "black_params"
PARAM_SHEET_HESTON = "heston_params"
PARAM_SHEET_SVCJ = "svcj_params"
TRAIN_SHEET = "train_data"
TEST_SHEET = "test_data"

# Columns shared by all three parameter sheets.
PARAM_COLS_COMMON = [
    # identity: snapshot timestamp from filename (ISO string, e.g. 2026-02-04T09:36:20Z)
    "timestamp", "currency",
    # optimizer outcome
    "success", "message", "nfev",
    # error metrics
    "rmse_fit", "mae_fit",
    "rmse_train", "mae_train",
    "rmse_test", "mae_test",
    # sample sizes and reproducibility
    "n_options_total", "n_train", "n_test",
    "random_seed",
]

# Model-specific sheets append their parameter names to the common columns.
PARAM_COLS_BLACK = [*PARAM_COLS_COMMON, "sigma"]
PARAM_COLS_HESTON = [*PARAM_COLS_COMMON, "kappa", "theta", "sigma_v", "rho", "v0"]
PARAM_COLS_SVCJ = [*PARAM_COLS_HESTON, "lam", "ell_y", "sigma_y", "ell_v", "rho_j"]

# Preferred (front) columns for train/test sheets; any other columns will be appended after these.
TRAIN_TEST_FRONT_COLS = [
    "snapshot_ts", "currency",
    "instrument_name", "option_type", "strike",
    "expiry_datetime", "time_to_maturity",
    "futures_price", "bid_price", "ask_price", "mid_price_clean",
    "rel_spread", "open_interest", "vega",
    "random_seed",
    "price_black", "price_heston", "price_svcj",
]

def _empty_df(cols: list[str]) -> pd.DataFrame:
    """Return a zero-row DataFrame with one object-dtype column per name in ``cols``."""
    columns: dict[str, pd.Series] = {}
    for name in cols:
        columns[name] = pd.Series(dtype="object")
    return pd.DataFrame(columns)

def init_empty_workbook() -> dict[str, pd.DataFrame]:
    """Build the in-memory workbook: schema-only param sheets plus blank train/test sheets."""
    workbook: dict[str, pd.DataFrame] = {}
    param_schemas = (
        (PARAM_SHEET_BLACK, PARAM_COLS_BLACK),
        (PARAM_SHEET_HESTON, PARAM_COLS_HESTON),
        (PARAM_SHEET_SVCJ, PARAM_COLS_SVCJ),
    )
    for sheet_name, columns in param_schemas:
        workbook[sheet_name] = _empty_df(columns)
    # Train/test sheets have no fixed schema; their columns come from the data appended later.
    for sheet_name in (TRAIN_SHEET, TEST_SHEET):
        workbook[sheet_name] = pd.DataFrame()
    return workbook

def normalize_timestamp_series(s: pd.Series) -> pd.Series:
    """Normalize timestamps to ISO strings (UTC with trailing 'Z').

    Values that parse as datetimes are rendered as ``YYYY-MM-DDTHH:MM:SSZ``.
    Non-missing values that cannot be parsed are kept as their ``str()`` form.
    Missing values (None / NaN / NaT) stay missing instead of being turned into
    the literal strings ``"None"`` / ``"nan"`` / ``"NaT"``.
    """
    parsed = pd.to_datetime(s, utc=True, errors="coerce")
    formatted = parsed.dt.strftime("%Y-%m-%dT%H:%M:%SZ")
    # Fallback for unparseable entries: raw text, but only where the input was present.
    fallback = s.astype(str).where(s.notna())
    return formatted.where(formatted.notna(), fallback)

def ensure_param_columns(df: pd.DataFrame, expected_cols: list[str]) -> pd.DataFrame:
    """Return a copy of ``df`` restricted to ``expected_cols``, in that order.

    Columns absent from ``df`` are created and filled with NaN; columns not
    listed in ``expected_cols`` are dropped. If a ``timestamp`` column is part
    of the result it is passed through ``normalize_timestamp_series``.
    """
    result = df.copy()
    absent = [name for name in expected_cols if name not in result.columns]
    for name in absent:
        result[name] = np.nan
    result = result[expected_cols]
    if "timestamp" not in result.columns:
        return result
    result["timestamp"] = normalize_timestamp_series(result["timestamp"])
    return result

def order_train_test_columns(df: pd.DataFrame) -> pd.DataFrame:
    """Reorder columns: preferred front columns first, the remainder alphabetically.

    ``None`` yields a fresh empty DataFrame; a zero-row frame is returned untouched.
    """
    if df is None:
        return pd.DataFrame()
    if len(df) == 0:
        return df
    leading = [name for name in TRAIN_TEST_FRONT_COLS if name in df.columns]
    trailing = sorted(name for name in df.columns if name not in leading)
    return df[leading + trailing]

def load_existing_workbook(path: Path) -> dict[str, pd.DataFrame]:
    """Load the workbook at ``path`` into a dict of DataFrames keyed by sheet name.

    A missing file yields an empty workbook. Param sheets found in the file are
    forced onto their expected schema; train/test sheets are copied as-is except
    that ``snapshot_ts`` (when present) is normalized. Sheets absent from the
    file keep their empty defaults.
    """
    if not path.exists():
        return init_empty_workbook()

    sheets = pd.read_excel(path, sheet_name=None, engine="openpyxl")
    wb = init_empty_workbook()

    # params sheets: enforce schema
    param_schemas = (
        (PARAM_SHEET_BLACK, PARAM_COLS_BLACK),
        (PARAM_SHEET_HESTON, PARAM_COLS_HESTON),
        (PARAM_SHEET_SVCJ, PARAM_COLS_SVCJ),
    )
    for sheet_name, columns in param_schemas:
        if sheet_name in sheets:
            wb[sheet_name] = ensure_param_columns(sheets[sheet_name], columns)

    # train/test: keep as-is; columns are aligned when new rows are appended
    for sheet_name in (TRAIN_SHEET, TEST_SHEET):
        if sheet_name not in sheets:
            continue
        frame = sheets[sheet_name].copy()
        if "snapshot_ts" in frame.columns:
            frame["snapshot_ts"] = normalize_timestamp_series(frame["snapshot_ts"])
        wb[sheet_name] = frame

    return wb

def get_latest_processed_timestamp(wb: dict[str, pd.DataFrame], currency: str) -> Optional[pd.Timestamp]:
    """Return the newest parseable ``timestamp`` in the Black param sheet for ``currency``.

    This is the resume key. ``None`` is returned when the sheet is missing or
    empty, has no ``timestamp`` column, has no rows for the currency, or none of
    the matching timestamps can be parsed.
    """
    params = wb.get(PARAM_SHEET_BLACK, pd.DataFrame())
    if params is None or params.empty or "timestamp" not in params.columns:
        return None

    is_currency = params["currency"].astype(str) == str(currency)
    if not is_currency.any():
        return None

    stamps = pd.to_datetime(params.loc[is_currency, "timestamp"], utc=True, errors="coerce").dropna()
    return None if stamps.empty else stamps.max()

def flush_workbook_atomic(wb: dict[str, pd.DataFrame], output_path: Path) -> None:
    """Write the workbook to ``output_path`` via a temp file and an atomic replace.

    The in-memory ``wb`` dict is not modified; sorting and column ordering are
    applied to copies. Param sheets are forced onto their schema and sorted by
    (currency, timestamp); train/test sheets get their preferred column order
    and are sorted by currency, snapshot time, expiry and strike (whichever of
    those columns exist).

    The temporary file is created in the same directory as ``output_path``:
    ``os.replace`` is only atomic, and on some platforms only possible at all,
    when source and destination live on the same filesystem.
    """
    output_path.parent.mkdir(parents=True, exist_ok=True)

    # Shallow copy so that reassigning sheets below leaves the caller's dict untouched.
    wb_to_write = dict(wb)

    param_schemas = (
        (PARAM_SHEET_BLACK, PARAM_COLS_BLACK),
        (PARAM_SHEET_HESTON, PARAM_COLS_HESTON),
        (PARAM_SHEET_SVCJ, PARAM_COLS_SVCJ),
    )
    for sh, cols in param_schemas:
        df = ensure_param_columns(wb_to_write.get(sh, pd.DataFrame()), cols)
        if len(df):
            # Sort on parsed datetimes rather than on the ISO strings.
            df["timestamp_dt"] = pd.to_datetime(df["timestamp"], utc=True, errors="coerce")
            df = df.sort_values(["currency", "timestamp_dt"]).drop(columns=["timestamp_dt"])
        wb_to_write[sh] = df

    for sh in (TRAIN_SHEET, TEST_SHEET):
        df = order_train_test_columns(wb_to_write.get(sh, pd.DataFrame())).copy()
        if len(df):
            if "snapshot_ts" in df.columns:
                df["_snapshot_dt"] = pd.to_datetime(df["snapshot_ts"], utc=True, errors="coerce")
            else:
                df["_snapshot_dt"] = pd.NaT
            if "expiry_datetime" in df.columns:
                df["_expiry_dt"] = pd.to_datetime(df["expiry_datetime"], utc=True, errors="coerce")
            else:
                df["_expiry_dt"] = pd.NaT
            # Only sort on columns that exist, so a sheet lacking e.g. `currency`
            # does not abort the whole flush with a KeyError.
            sort_cols = [c for c in ("currency", "_snapshot_dt", "_expiry_dt", "strike") if c in df.columns]
            df = df.sort_values(sort_cols).drop(columns=["_snapshot_dt", "_expiry_dt"])
        wb_to_write[sh] = df

    # Reserve a unique temp name beside the target; the handle is closed right away
    # and the file is then (re)written by ExcelWriter.
    with NamedTemporaryFile("wb", suffix=".xlsx", dir=output_path.parent, delete=False) as tmp:
        tmp_path = Path(tmp.name)

    try:
        with pd.ExcelWriter(tmp_path, engine="openpyxl") as writer:
            for sheet_name, df in wb_to_write.items():
                if df is None:
                    df = pd.DataFrame()
                df.to_excel(writer, sheet_name=sheet_name, index=False)
        os.replace(tmp_path, output_path)
    finally:
        # After a successful replace the temp path no longer exists; this only
        # cleans up after a failed write. Cleanup itself is best-effort.
        if tmp_path.exists():
            try:
                tmp_path.unlink()
            except OSError:
                pass

def append_df(wb: dict[str, pd.DataFrame], sheet: str, new: pd.DataFrame) -> None:
    """Append the rows of ``new`` to ``wb[sheet]`` in place.

    Nothing happens when ``new`` is None or has no rows. When the sheet is
    missing, None or empty it is replaced by a copy of ``new``; otherwise the
    two frames are concatenated with a fresh integer index.
    """
    if new is None or len(new) == 0:
        return
    current = wb.get(sheet)
    if current is None or current.empty:
        wb[sheet] = new.copy()
    else:
        wb[sheet] = pd.concat([current, new], ignore_index=True, sort=False)

def latest_successful_params_before(
    params_df: pd.DataFrame, *, currency: str, ts0: pd.Timestamp, required_cols: list[str]
) -> Optional[dict[str, float]]:
    """Pick the most recent successful parameter row strictly before ``ts0``.

    Rows are restricted to ``currency``, to parseable timestamps earlier than
    ``ts0`` and (when a ``success`` column exists) to rows where it equals True.
    From the latest such row, every column in ``required_cols`` that is present,
    non-missing and convertible to float is returned. ``None`` is returned when
    no row qualifies or no value could be extracted.
    """
    if params_df is None or params_df.empty:
        return None

    rows = params_df[params_df["currency"].astype(str) == str(currency)]
    if rows.empty:
        return None

    rows = rows.assign(_ts=pd.to_datetime(rows["timestamp"], utc=True, errors="coerce"))
    rows = rows[rows["_ts"].notna() & (rows["_ts"] < ts0)]
    if "success" in rows.columns:
        rows = rows[rows["success"] == True]  # noqa: E712
    if rows.empty:
        return None

    latest = rows.sort_values("_ts").iloc[-1]
    values: dict[str, float] = {}
    for name in required_cols:
        if name not in latest.index or pd.isna(latest[name]):
            continue
        try:
            values[name] = float(latest[name])
        except Exception:
            # Non-numeric cell: skip this parameter.
            continue
    return values or None


## Helper utilities

In [None]:
def list_snapshot_files(data_dir: Path) -> list[Path]:
    """Return the ``deribit_options_snapshot_*.csv`` files directly inside ``data_dir``, sorted by path."""
    matches = data_dir.glob("deribit_options_snapshot_*.csv")
    # Names starting with "._" (macOS metadata files) are excluded.
    return sorted(p for p in matches if not p.name.startswith("._"))

# Captures the compact "YYYYMMDDTHHMMSS" stamp from a snapshot filename ending in "Z.csv".
_TS_RE = re.compile(r"deribit_options_snapshot_(\d{8}T\d{6})Z\.csv$")

def timestamp_from_filename(path: Path) -> pd.Timestamp:
    """Parse the UTC snapshot timestamp embedded in ``path.name``.

    Raises:
        ValueError: if the filename does not match the snapshot naming pattern.
    """
    match = _TS_RE.search(path.name)
    if match is None:
        raise ValueError(f"Cannot parse timestamp from filename: {path.name}")
    compact_stamp = match.group(1)
    return pd.to_datetime(compact_stamp, format="%Y%m%dT%H%M%S", utc=True)

def timestamp_to_iso_z(ts: pd.Timestamp) -> str:
    """Format ``ts`` (converted to UTC) as ``YYYY-MM-DDTHH:MM:SSZ``."""
    return pd.to_datetime(ts, utc=True).strftime("%Y-%m-%dT%H:%M:%SZ")

def restrict_for_runtime(
    df: pd.DataFrame, *, top_expiries: Optional[int], max_options: Optional[int], random_state: int
) -> pd.DataFrame:
    """Optionally shrink a snapshot to bound calibration runtime.

    If ``top_expiries`` is a positive integer, only the expiries with the
    largest summed ``open_interest`` are kept. If ``max_options`` is set and
    more rows remain, a random sample of that size (seeded by ``random_state``)
    is drawn. The input is not modified; the result has a fresh 0..n-1 index.
    """
    selected = df.copy()

    if top_expiries is not None and top_expiries > 0:
        oi_totals = selected.groupby("expiry_datetime")["open_interest"].sum()
        ranked = oi_totals.sort_values(ascending=False)
        in_top = selected["expiry_datetime"].isin(ranked.head(top_expiries).index)
        selected = selected[in_top].copy()

    if max_options is not None and len(selected) > max_options:
        selected = selected.sample(n=max_options, random_state=random_state).copy()

    return selected.reset_index(drop=True)

def compute_errors(y_true: np.ndarray, y_pred: np.ndarray, *, min_finite_frac: float = 0.8) -> dict[str, float]:
    """Compute MSE/MAE, guarding against occasional non-finite outputs.

    Only positions where ``y_pred`` is finite enter the averages. If the share
    of finite predictions is below ``min_finite_frac``, or there is nothing to
    average (empty input), both metrics are NaN.

    Returns:
        ``{"mse": float, "mae": float}``.
    """
    y_true = np.asarray(y_true, dtype=float)
    y_pred = np.asarray(y_pred, dtype=float)

    # Empty input: return NaN explicitly instead of taking the mean of an empty
    # array (which emits RuntimeWarnings and only yields NaN by accident).
    if y_pred.size == 0:
        return {"mse": float("nan"), "mae": float("nan")}

    finite = np.isfinite(y_pred)
    if finite.mean() < min_finite_frac:
        return {"mse": float("nan"), "mae": float("nan")}

    residuals = y_pred[finite] - y_true[finite]
    # Reachable when min_finite_frac <= 0 and no prediction is finite.
    if residuals.size == 0:
        return {"mse": float("nan"), "mae": float("nan")}

    mse = float(np.mean(residuals ** 2))
    mae = float(np.mean(np.abs(residuals)))
    return {"mse": mse, "mae": mae}

def chunk_contiguous(items: list[Any], n_chunks: int) -> list[list[tuple[int, Any]]]:
    """Partition ``items`` into contiguous runs of ``(index, item)`` pairs.

    The number of chunks is clamped to ``1..len(items)``. Chunk sizes differ by
    at most one, with the larger chunks first. An empty input yields ``[]``.
    """
    total = len(items)
    if total == 0:
        return []

    n_chunks = max(1, min(n_chunks, total))
    base_size, n_larger = divmod(total, n_chunks)

    chunks: list[list[tuple[int, Any]]] = []
    cursor = 0
    for k in range(n_chunks):
        size = base_size + 1 if k < n_larger else base_size
        chunks.append([(i, items[i]) for i in range(cursor, cursor + size)])
        cursor += size
    return chunks


## Core routine: process one snapshot and one currency → Excel payload

In [None]:
def process_snapshot_to_excel_payload(
    csv_path: Path,
    *,
    currency: str,
    filter_rules: dict,
    weight_config: WeightConfig,
    fft_base: FFTParams,
    max_nfev: dict,
    train_frac: float,
    random_seed: int,
    runtime_top_expiries_by_oi: Optional[int],
    runtime_max_options: Optional[int],
    min_options_after_filter: int,
    warm_start: Optional[dict[str, dict[str, float]]] = None,
    verbose: bool = False,
) -> dict[str, Any]:
    """
    Calibrate Black, Heston and SVCJ on one snapshot file for one currency.

    Steps: read the CSV, keep rows of ``currency``, apply the liquidity filter
    and the optional runtime restriction, shuffle and split into train/test,
    calibrate each model on the train part, reprice the whole filtered snapshot
    with every successfully calibrated model, and compute unweighted RMSE/MAE
    on train and test.

    Returns a payload dict containing:
      - timestamp_iso, currency
      - param_rows: {black: df1row, heston: df1row, svcj: df1row}
      - train_df, test_df (each includes price_* columns; empty when skipped)
      - warm_next (dict for warm start: the incoming warm start, updated with
        the parameters of every model that calibrated successfully)

    A snapshot with no rows for the currency, or with too few options after
    filtering, is not calibrated: all three param rows carry ``success=False``
    and an explanatory message. An exception raised by ``calibrate_model`` is
    caught and recorded in that model's row; the other models still run.
    """
    ts_iso = timestamp_to_iso_z(timestamp_from_filename(csv_path))

    warm_start = warm_start or {}
    # Copy so that callers' warm-start dicts are never mutated.
    warm_next: dict[str, dict[str, float]] = {k: dict(v) for k, v in warm_start.items()}

    models = ["black", "heston", "svcj"]
    param_cols = {"black": PARAM_COLS_BLACK, "heston": PARAM_COLS_HESTON, "svcj": PARAM_COLS_SVCJ}
    price_cols = {"black": "price_black", "heston": "price_heston", "svcj": "price_svcj"}
    nan = float("nan")

    def _as_float(value: Any) -> float:
        """Convert to float, mapping None to NaN."""
        return float(value) if value is not None else nan

    def _finite_positive(value: Any) -> Optional[float]:
        """Return ``value`` as a float if it is finite and > 0, else None."""
        try:
            number = float(value)
        except (TypeError, ValueError):
            return None
        return number if np.isfinite(number) and number > 0.0 else None

    def _make_param_row(model: str, *, success: bool, message: str, nfev: int,
                        n_total: int, n_train: int, n_test: int,
                        rmse_fit: Any = nan, mae_fit: Any = nan,
                        errors: Optional[dict[str, float]] = None,
                        params: Optional[dict[str, float]] = None) -> pd.DataFrame:
        """Build the one-row param-sheet DataFrame for ``model`` on its sheet schema."""
        errors = errors or {}
        row = dict(
            timestamp=ts_iso,
            currency=currency,
            success=bool(success),
            message=str(message),
            nfev=int(nfev),
            rmse_fit=_as_float(rmse_fit),
            mae_fit=_as_float(mae_fit),
            rmse_train=_as_float(errors.get("rmse_train", nan)),
            mae_train=_as_float(errors.get("mae_train", nan)),
            rmse_test=_as_float(errors.get("rmse_test", nan)),
            mae_test=_as_float(errors.get("mae_test", nan)),
            n_options_total=int(n_total),
            n_train=int(n_train),
            n_test=int(n_test),
            random_seed=int(random_seed),
        )
        row.update(params or {})
        # ensure_param_columns adds any missing parameter columns as NaN.
        return ensure_param_columns(pd.DataFrame([row]), param_cols[model])

    def _skipped_payload(message: str, n_total: int) -> dict[str, Any]:
        """Payload for a snapshot that is not calibrated at all."""
        rows = {
            m: _make_param_row(m, success=False, message=message, nfev=0,
                               n_total=n_total, n_train=0, n_test=0)
            for m in models
        }
        return {
            "timestamp_iso": ts_iso,
            "currency": currency,
            "param_rows": rows,
            "train_df": pd.DataFrame(),
            "test_df": pd.DataFrame(),
            "warm_next": warm_next,
        }

    # --- Load & filter
    df_raw = pd.read_csv(csv_path)
    df_ccy = df_raw[df_raw["currency"].astype(str) == str(currency)].copy()
    if df_ccy.empty:
        return _skipped_payload(f"No rows found for currency={currency} in this snapshot.", 0)

    df_filt = filter_liquid_options(df_ccy, **filter_rules)
    if df_filt is None or df_filt.empty or len(df_filt) < min_options_after_filter:
        n_total = 0 if (df_filt is None) else int(len(df_filt))
        return _skipped_payload(
            f"Skipped: too few options after filtering (n={n_total}, min={min_options_after_filter}).",
            n_total,
        )

    # Optional runtime restriction
    df_filt = restrict_for_runtime(
        df_filt,
        top_expiries=runtime_top_expiries_by_oi,
        max_options=runtime_max_options,
        random_state=random_seed,
    )

    # Precompute per-expiry FFTParams using *all* filtered options in this snapshot.
    def _fft_params_for_expiry(strikes: np.ndarray) -> FFTParams:
        """Copy of ``fft_base`` whose ``b`` is derived from the log of the median strike."""
        N = fft_base.N
        eta = fft_base.eta
        lam = 2.0 * np.pi / (N * eta)
        logK_center = float(np.log(np.median(strikes)))
        b = logK_center - 0.5 * N * lam
        return FFTParams(N=N, alpha=fft_base.alpha, eta=eta, b=b, use_simpson=fft_base.use_simpson)

    fft_params_by_expiry: dict = {}
    for exp, g in df_filt.groupby("expiry_datetime", sort=False):
        fft_params_by_expiry[exp] = _fft_params_for_expiry(g["strike"].to_numpy(dtype=float))

    # Deterministic split (shuffled)
    df_filt = df_filt.sample(frac=1.0, random_state=random_seed).reset_index(drop=True)
    n_train = int(np.floor(train_frac * len(df_filt)))
    train = df_filt.iloc[:n_train].copy()

    # Warm-start initializations
    init_black = warm_start.get("black", None)

    def _init_stoch_vol(sigma: float, prev: Optional[dict[str, float]], *, with_jumps: bool) -> dict[str, float]:
        """Initial guess: previous params where available, Black-sigma-based defaults otherwise."""
        sigma2 = float(sigma) * float(sigma)
        init = dict(prev) if prev else {}
        init.setdefault("kappa", 2.0)
        init.setdefault("theta", 0.5 * sigma2)
        init.setdefault("sigma_v", 0.75 * sigma)
        init.setdefault("rho", -0.5)
        init.setdefault("v0", 0.5 * sigma2)
        if with_jumps:
            # Jump defaults (mild)
            init.setdefault("lam", 0.10)
            init.setdefault("ell_y", -0.05)
            init.setdefault("sigma_y", 0.15)
            init.setdefault("ell_v", 0.10)
            init.setdefault("rho_j", -0.3)
        return init

    def _fallback_sigma() -> float:
        """Sigma seed when this snapshot's Black fit is unusable: warm-start sigma, else 0.6."""
        warm_sigma = _finite_positive(init_black.get("sigma")) if init_black else None
        return warm_sigma if warm_sigma is not None else 0.6

    results: dict[str, Any] = {}
    sigma_seed: Optional[float] = None

    for model in models:
        if verbose:
            print(f"[{currency}] {ts_iso} | {model}: calibrating on n_train={len(train)}")

        if model == "black":
            init_params = init_black
        else:
            if sigma_seed is None:
                sigma_seed = _fallback_sigma()
            init_params = _init_stoch_vol(sigma_seed, warm_start.get(model, None), with_jumps=(model == "svcj"))

        try:
            res = calibrate_model(
                train,
                model,
                weight_config=weight_config,
                fft_params_base=fft_base,
                dynamic_b=False,
                fft_params_by_expiry=fft_params_by_expiry,
                use_cache_in_optimization=False,
                initial_params=init_params,
                max_nfev=int(max_nfev[model]),
                verbose=1 if verbose else 0,
                clear_cache_before=False,
            )
        except Exception as e:
            # Record the failure for this model and keep going with the others.
            results[model] = {"success": False, "message": f"Exception: {repr(e)}", "nfev": 0, "params": {}}
            continue

        results[model] = res
        if not getattr(res, "success", False):
            continue

        warm_next[model] = dict(res.params)
        if model == "black":
            # Seed the stochastic-vol models from the fitted sigma only if it is
            # usable; a NaN here would otherwise poison their initial guesses.
            fitted_sigma = _finite_positive(warm_next[model].get("sigma"))
            if fitted_sigma is not None:
                sigma_seed = fitted_sigma

    # Reprice full snapshot once per model (then split train/test for output & errors)
    df_out = df_filt.copy()
    df_out["snapshot_ts"] = ts_iso
    df_out["currency"] = currency
    df_out["random_seed"] = int(random_seed)

    for model in models:
        col = price_cols[model]
        res = results.get(model)
        if not (hasattr(res, "params") and getattr(res, "success", False)):
            df_out[col] = np.nan
            continue
        try:
            df_out[col] = price_dataframe(
                df_out,
                model,
                dict(res.params),
                fft_params_base=fft_base,
                dynamic_b=False,
                fft_params_by_expiry=fft_params_by_expiry,
                use_cache=True,
            )
        except Exception as e:
            df_out[col] = np.nan
            if verbose:
                print(f"[{currency}] {ts_iso} | {model}: pricing exception: {repr(e)}")

    train_out = df_out.iloc[:n_train].copy()
    test_out = df_out.iloc[n_train:].copy()

    # Errors (unweighted)
    def _split_errors(part: pd.DataFrame, col: str) -> dict[str, float]:
        """MSE/MAE of price column ``col`` against ``mid_price_clean`` on one split."""
        if not len(part):
            return {"mse": nan, "mae": nan}
        return compute_errors(
            part["mid_price_clean"].to_numpy(dtype=float),
            part[col].to_numpy(dtype=float),
        )

    def _rmse(mse: float) -> float:
        return float(math.sqrt(mse)) if np.isfinite(mse) else nan

    errs: dict[str, dict[str, float]] = {}
    for model in models:
        e_tr = _split_errors(train_out, price_cols[model])
        e_te = _split_errors(test_out, price_cols[model])
        errs[model] = dict(
            rmse_train=_rmse(e_tr["mse"]),
            mae_train=float(e_tr["mae"]),
            rmse_test=_rmse(e_te["mse"]),
            mae_test=float(e_te["mae"]),
        )

    n_total = int(len(df_out))
    n_test = int(len(test_out))

    # Parameter rows
    def _row_from_result(model: str) -> pd.DataFrame:
        """Param row for ``model`` from its calibration outcome."""
        res = results.get(model, None)
        counts = dict(n_total=n_total, n_train=n_train, n_test=n_test)
        if res is None or isinstance(res, dict):
            # None: no result object; dict: exception captured during calibration.
            failure = res or {}
            return _make_param_row(model, success=False,
                                   message=failure.get("message", "failed"),
                                   nfev=failure.get("nfev", 0),
                                   errors=errs[model], **counts)
        return _make_param_row(model, success=bool(res.success), message=res.message, nfev=res.nfev,
                               rmse_fit=res.rmse, mae_fit=res.mae,
                               errors=errs[model], params=dict(res.params), **counts)

    param_rows = {model: _row_from_result(model) for model in models}

    train_out["snapshot_ts"] = normalize_timestamp_series(train_out["snapshot_ts"])
    test_out["snapshot_ts"] = normalize_timestamp_series(test_out["snapshot_ts"])

    return {
        "timestamp_iso": ts_iso,
        "currency": currency,
        "param_rows": param_rows,
        "train_df": train_out,
        "test_df": test_out,
        "warm_next": warm_next,
    }


## Runner: loop all snapshots for one currency (resume + threading + ordered commits)

In [None]:
def run_all_snapshots_to_excel_for_currency(
    currency: str,
    *,
    output_xlsx: Path,
    resume: bool,
    verbose: bool = False,
) -> dict[str, pd.DataFrame]:
    """
    Runs calibration for one currency and persists to Excel every SAVE_EVERY_N_FILES commits.

    Workflow:
      1. List the snapshot files under ``ROOT / "data"`` and sort them by the
         timestamp parsed from the filename.
      2. When ``resume`` is true, keep only files strictly newer than the value
         returned by ``get_latest_processed_timestamp`` for ``currency``.
      3. Split the pending files with ``chunk_contiguous`` and start one worker
         thread per chunk. Each worker carries its own warm start forward from
         one snapshot to the next; the warm start for the first snapshot of a
         chunk is looked up in the already-loaded workbook.
      4. The main thread receives results through a queue and commits them
         strictly in index order, flushing with ``flush_workbook_atomic`` every
         ``SAVE_EVERY_N_FILES`` commits and once more at the end.

    A snapshot whose processing raises is still committed, as ``success=False``
    rows carrying the exception text, so the resume key keeps advancing.

    Args:
        currency: Currency code to calibrate; also used to filter the workbook
            history for warm starts.
        output_xlsx: Workbook path that is loaded at start and flushed to.
        resume: Skip snapshots at or before the latest recorded timestamp.
        verbose: Forwarded to ``process_snapshot_to_excel_payload``.

    Returns:
        The in-memory workbook dict (DataFrames) after completion.

    Raises:
        RuntimeError: If no snapshot files are found in the data directory.
    """
    data_dir = ROOT / "data"
    all_files = list_snapshot_files(data_dir)
    if not all_files:
        raise RuntimeError(f"No snapshot files found in {data_dir}")

    all_files_sorted = sorted(all_files, key=timestamp_from_filename)
    # The seed uses the position in the FULL sorted list (not in `pending`), so
    # a given snapshot gets the same seed whether or not the run was resumed.
    file_index_map = {p: i for i, p in enumerate(all_files_sorted)}
    currency_index = CURRENCIES.index(currency) if currency in CURRENCIES else 0

    wb = load_existing_workbook(output_xlsx)
    last_ts = get_latest_processed_timestamp(wb, currency) if resume else None

    pending = [
        f for f in all_files_sorted
        if last_ts is None or timestamp_from_filename(f) > last_ts
    ]

    if SMOKE_TEST_MAX_FILES_PER_CURRENCY is not None:
        pending = pending[: int(SMOKE_TEST_MAX_FILES_PER_CURRENCY)]

    if not pending:
        print(f"[{currency}] Nothing to do. (resume={resume}, last_ts={last_ts})")
        return wb

    print(f"[{currency}] Pending files: {len(pending)} (resume={resume}, last_ts={last_ts})")
    print(f"[{currency}] Output: {output_xlsx}")

    # Each chunk is a list of (idx, path) pairs. The ordered-commit loop below
    # assumes idx is the 0-based position of the file within `pending`
    # (NOTE(review): confirm against chunk_contiguous).
    chunks = chunk_contiguous(pending, N_THREADS)

    # (model key, workbook sheet, columns that must be present for a usable warm start)
    warm_sources = (
        ("black", PARAM_SHEET_BLACK, ["sigma"]),
        ("heston", PARAM_SHEET_HESTON, ["kappa", "theta", "sigma_v", "rho", "v0"]),
        ("svcj", PARAM_SHEET_SVCJ,
         ["kappa", "theta", "sigma_v", "rho", "v0", "lam", "ell_y", "sigma_y", "ell_v", "rho_j"]),
    )

    warm_init_by_chunk: list[dict[str, dict[str, float]]] = []
    for ch in chunks:
        warm_init: dict[str, dict[str, float]] = {}
        if ch:
            _, f0 = ch[0]
            ts0 = timestamp_from_filename(f0)
            for model, sheet, required in warm_sources:
                hist = latest_successful_params_before(
                    wb.get(sheet, pd.DataFrame()),
                    currency=currency,
                    ts0=ts0,
                    required_cols=list(required),
                )
                if hist:
                    warm_init[model] = hist
        warm_init_by_chunk.append(warm_init)

    q: queue.Queue = queue.Queue()
    n_workers = len(chunks)

    def _failure_param_rows(ts_iso: str, msg: str, seed: int) -> dict[str, pd.DataFrame]:
        # One failed row per model sheet, used when processing a snapshot raised.
        base = {
            "timestamp": ts_iso,
            "currency": currency,
            "success": False,
            "message": msg,
            "nfev": 0,
            "rmse_fit": np.nan, "mae_fit": np.nan,
            "rmse_train": np.nan, "mae_train": np.nan,
            "rmse_test": np.nan, "mae_test": np.nan,
            "n_options_total": 0, "n_train": 0, "n_test": 0,
            "random_seed": seed,
        }
        return {
            "black": ensure_param_columns(pd.DataFrame([{**base, "sigma": np.nan}]), PARAM_COLS_BLACK),
            "heston": ensure_param_columns(pd.DataFrame([dict(base)]), PARAM_COLS_HESTON),
            "svcj": ensure_param_columns(pd.DataFrame([dict(base)]), PARAM_COLS_SVCJ),
        }

    def _worker(chunk_id: int, chunk: list[tuple[int, Path]], warm0: dict[str, dict[str, float]]) -> None:
        try:
            # Private copy so this worker's warm start never aliases another chunk's dicts.
            warm = {k: dict(v) for k, v in (warm0 or {}).items()}
            if LIMIT_INTERNAL_THREADS and threadpool_limits is not None:
                cm = threadpool_limits(limits=INTERNAL_NUM_THREADS)
            else:
                cm = contextlib.nullcontext()
            with cm:
                for idx, path in chunk:
                    seed = int(GLOBAL_RANDOM_SEED + 10_000 * currency_index + file_index_map[path])
                    try:
                        payload = process_snapshot_to_excel_payload(
                            path,
                            currency=currency,
                            filter_rules=FILTER_RULES,
                            weight_config=WEIGHT_CONFIG,
                            fft_base=FFT_BASE,
                            max_nfev=MAX_NFEV,
                            train_frac=TRAIN_FRAC,
                            random_seed=seed,
                            runtime_top_expiries_by_oi=RUNTIME_TOP_EXPIRIES_BY_OI,
                            runtime_max_options=RUNTIME_MAX_OPTIONS,
                            min_options_after_filter=MIN_OPTIONS_AFTER_FILTER,
                            warm_start=warm,
                            verbose=verbose,
                        )
                        warm = payload.get("warm_next", warm)
                    except Exception as e:
                        # Catastrophic failure; still advance resume key (Option A)
                        ts_iso = timestamp_to_iso_z(timestamp_from_filename(path))
                        msg = f"Worker exception: {repr(e)}"
                        payload = {
                            "timestamp_iso": ts_iso,
                            "currency": currency,
                            "param_rows": _failure_param_rows(ts_iso, msg, seed),
                            "train_df": pd.DataFrame(),
                            "test_df": pd.DataFrame(),
                            "warm_next": warm,
                        }

                    q.put((idx, payload))
        finally:
            # Always signal completion. Without this, an error escaping the
            # per-snapshot handler (e.g. while building the fallback rows or
            # creating the thread-limit context) would leave the main thread
            # blocked on q.get() forever.
            q.put(("DONE", chunk_id))

    threads = []
    for chunk_id, (chunk, warm0) in enumerate(zip(chunks, warm_init_by_chunk)):
        t = threading.Thread(target=_worker, args=(chunk_id, chunk, warm0), daemon=True)
        t.start()
        threads.append(t)

    # Results can arrive out of order; park them here until their turn comes.
    pending_results: dict[int, dict[str, Any]] = {}
    next_commit = 0
    committed_since_flush = 0
    done_count = 0
    total_pending = len(pending)

    def _commit_payload(payload: dict[str, Any]) -> None:
        # Append one snapshot's rows to the in-memory workbook sheets.
        append_df(wb, PARAM_SHEET_BLACK, payload["param_rows"]["black"])
        append_df(wb, PARAM_SHEET_HESTON, payload["param_rows"]["heston"])
        append_df(wb, PARAM_SHEET_SVCJ, payload["param_rows"]["svcj"])
        append_df(wb, TRAIN_SHEET, payload.get("train_df", pd.DataFrame()))
        append_df(wb, TEST_SHEET, payload.get("test_df", pd.DataFrame()))

    while done_count < n_workers:
        msg = q.get()
        if isinstance(msg[0], str) and msg[0] == "DONE":
            done_count += 1
            continue

        idx, payload = msg
        pending_results[int(idx)] = payload

        # Commit only the contiguous run starting at next_commit, so the
        # workbook never contains a snapshot while an earlier one is missing.
        while next_commit in pending_results:
            payload2 = pending_results.pop(next_commit)
            _commit_payload(payload2)
            committed_since_flush += 1
            next_commit += 1

            last_ts_iso = payload2.get("timestamp_iso", "?")
            print(f"[{currency}] committed {next_commit}/{total_pending} (last={last_ts_iso})")

            if committed_since_flush >= SAVE_EVERY_N_FILES:
                flush_workbook_atomic(wb, output_xlsx)
                committed_since_flush = 0
                print(f"[{currency}] flushed → {output_xlsx}")

    for t in threads:
        t.join(timeout=0.1)

    flush_workbook_atomic(wb, output_xlsx)
    print(f"[{currency}] final flush → {output_xlsx}")

    if pending_results or next_commit < total_pending:
        # A worker stopped before delivering some index. Results after the gap
        # are deliberately not committed: writing them would move the resume
        # timestamp past snapshots that were never processed.
        print(
            f"[{currency}] WARNING: committed {next_commit}/{total_pending} snapshots; "
            f"{len(pending_results)} out-of-order result(s) discarded. "
            f"Re-run with resume enabled to continue."
        )

    return wb


## Run everything: BTC then ETH

In [None]:
# Calibrate each configured currency in turn; every run writes to the same
# workbook path. `wb_final` keeps the in-memory workbook returned by the last
# run and stays None when CURRENCIES is empty.
wb_final = None
for ccy in CURRENCIES:
    wb_final = run_all_snapshots_to_excel_for_currency(
        ccy, output_xlsx=OUTPUT_XLSX, resume=RESUME, verbose=False
    )

print("Done. Workbook at:", OUTPUT_XLSX)
