In [None]:
from __future__ import annotations

from dataclasses import dataclass
from pathlib import Path
from typing import Dict, List, Tuple, Optional

import math
import numpy as np
import pandas as pd


# =========================
# Configuration
# =========================
# Input:
CSV_PATH = "spy_all.csv"               # or use concat_daily_csvs(...) below

# Target windows and lags per window (must match between V and Psi):
PARAMS_TARGET = [
    {"window": 60,  "N_lags": 6},     # windows in number of bars (post alignment)
    {"window": 120, "N_lags": 6},
    {"window": 150, "N_lags": 4},
]

# Truncation / pattern:
TRUNCATION_C = 5.0                     # robust truncation multiplier (MAD-based)
USE_PATTERN = True                     # multiplicative intraday U-shape removal

# GMM grid (open interval to avoid edges):
H_MIN, H_MAX, H_MESH = 0.05, 0.45, 1e-3

# Market RTH (for alignment):
RTH_OPEN = (9, 30)                     # 09:30
RTH_CLOSE = (16, 0)                    # 16:00
EXCLUDE_DATES: set[pd.Timestamp] = set()  # add dates to exclude if desired

# Monte Carlo (optional):
DO_MC = False                          # set True to run MC study
N_MC = 200
SEED = 123
FINE_DT_SECONDS = 0.5                  # fine grid for MC sim
SAMPLE_DT_SECONDS = 5                  # post-subsample grid for MC
TRADING_DAYS_PER_YEAR = 252


# =========================
# Utilities: I/O & alignment
# =========================
def concat_daily_csvs(data_dir: Path, out_path: Path) -> Path:
    """Concatenate spy_*.csv* files with columns DT, Price."""
    files = sorted([p for p in data_dir.glob("spy_*.csv*") if p.is_file()])
    if not files:
        raise RuntimeError(f"No files like 'spy_*.csv*' found in {data_dir}")
    parts = []
    for p in files:
        if p.suffix.lower() == ".icloud":
            print(f"⏭️  Skipping placeholder: {p.name}")
            continue
        try:
            df = pd.read_csv(p, usecols=["DT", "Price"])
            df["DT"] = pd.to_datetime(df["DT"], errors="coerce")
            df["Price"] = pd.to_numeric(df["Price"], errors="coerce")
            df = df.dropna(subset=["DT", "Price"])
            if not df.empty:
                parts.append(df)
        except Exception as e:
            print(f"Skipping {p.name}: {e}")
    if not parts:
        raise RuntimeError("No readable CSVs after filtering placeholders/errors.")
    df_all = pd.concat(parts, ignore_index=True)
    df_all = df_all.sort_values("DT").drop_duplicates(subset="DT", keep="last").reset_index(drop=True)
    out_path.parent.mkdir(parents=True, exist_ok=True)
    df_all.to_csv(out_path, index=False)
    print(f"[concat] Saved merged file with {len(df_all):,} rows → {out_path}")
    return out_path


def _align_one_df(df_in: pd.DataFrame, dt_sec: int, verbose: bool) -> Optional[pd.DataFrame]:
    """Align naive-ET dataframe to a strict RTH grid at dt_sec, with forward/backward fill."""
    df = df_in.dropna(subset=["DT", "Price"]).copy()
    df = df.sort_values("DT")
    df["date"] = df["DT"].dt.normalize()

    n_kept, n_skipped = 0, 0
    out = []
    for d, g in df.groupby("date", sort=True):
        day = pd.Timestamp(d)
        if day in EXCLUDE_DATES:
            continue
        start = day.replace(hour=RTH_OPEN[0], minute=RTH_OPEN[1], second=0)
        end = day.replace(hour=RTH_CLOSE[0], minute=RTH_CLOSE[1], second=0)

        gi = g.set_index("DT").sort_index()
        gi_buf = gi.loc[(gi.index >= start - pd.Timedelta("2min")) & (gi.index <= end)]
        if gi_buf.empty:
            n_skipped += 1
            continue

        grid = pd.date_range(start, end, freq=f"{dt_sec}S")
        gi2 = gi_buf.reindex(grid)
        gi2["Price"] = gi2["Price"].ffill().bfill()

        # keep only if at least one real RTH obs
        if gi.loc[(gi.index >= start) & (gi.index <= end)].empty:
            n_skipped += 1
            continue

        gi2 = gi2.rename_axis("DT").reset_index()
        gi2["date"] = day
        out.append(gi2[["DT", "date", "Price"]])
        n_kept += 1

    if verbose:
        print(f"[align] dt={dt_sec}s → kept days: {n_kept} | skipped: {n_skipped}")
    if not out:
        return None
    return pd.concat(out, ignore_index=True)


def load_and_align(path_csv: Path, dt_sec: int, verbose: bool = True) -> Optional[pd.DataFrame]:
    raw = pd.read_csv(path_csv, parse_dates=["DT"]).dropna(subset=["DT", "Price"])

    aligned = _align_one_df(raw, dt_sec, verbose)
    if aligned is not None and not aligned.empty:
        return aligned

    # fallback: treat DT as UTC→New York
    raw2 = raw.copy()
    dt_parsed = pd.to_datetime(raw2["DT"], utc=True, errors="coerce")
    raw2["DT"] = dt_parsed.dt.tz_convert("America/New_York").dt.tz_localize(None)
    aligned2 = _align_one_df(raw2, dt_sec, verbose)
    if aligned2 is not None and not aligned2.empty:
        print("[align] Interpreted timestamps as UTC and converted to New York.")
        return aligned2
    return None


def choose_dt_and_align(path_csv: Path, verbose: bool = True) -> Tuple[pd.DataFrame, int]:
    """Try 5s; if not feasible, try 60s."""
    for cand in (5, 60):
        aligned = load_and_align(path_csv, cand, verbose=verbose)
        if aligned is not None and not aligned.empty:
            if verbose:
                print(f"[align] Using dt_sec={cand}.")
            return aligned, cand
    raise RuntimeError("No usable RTH days found at 5s or 60s.")


def to_daily_returns(aligned: pd.DataFrame) -> List[np.ndarray]:
    """Extract same-length daily log-returns arrays; keep the modal length."""
    rets = []
    for _, g in aligned.groupby("date", sort=True):
        p = g["Price"].to_numpy()
        r = np.diff(np.log(p))
        rets.append(r)
    counts = pd.Series([len(x) for x in rets])
    mode_len = int(counts.mode().iloc[0])
    rets = [r for r in rets if len(r) == mode_len]
    if len(rets) == 0:
        raise RuntimeError("All days had inconsistent lengths after alignment.")
    return rets


# =========================
# Volatility preprocessing
# =========================
def robust_truncate_returns(r_day: np.ndarray, c: float) -> np.ndarray:
    """MAD-based clipping of returns to remove jumps/outliers (per day)."""
    if r_day.size == 0:
        return r_day
    med = np.median(r_day)
    mad = np.median(np.abs(r_day - med)) + 1e-16
    sigma = mad / 0.67448975  # MAD -> std under normality
    thresh = c * sigma + 1e-16
    return np.clip(r_day, -thresh, thresh)


def compute_intraday_pattern_sq(rets_days: List[np.ndarray]) -> np.ndarray:
    """
    Multiplicative U-shape pattern on per-bar variance proxy r^2.
    Returns a length-T vector with mean 1.0 across bars.
    """
    T = len(rets_days[0])
    S = np.zeros(T, dtype=float)
    cnt = 0
    for r in rets_days:
        if len(r) != T:
            continue
        S += r * r
        cnt += 1
    if cnt == 0:
        raise RuntimeError("No days to compute pattern.")
    pattern = S / max(cnt, 1)
    pattern = np.maximum(pattern, 1e-16)
    return pattern / pattern.mean()


def realized_variance_series_from_s2(s2: np.ndarray, K: int) -> np.ndarray:
    """Rolling sum over window K of per-bar variance proxy s2 (already truncated & pattern-normalized)."""
    kernel = np.ones(K, dtype=float)
    rv = np.convolve(s2, kernel, mode="valid")  # length: len(s2)-K+1
    return rv


def volatility_increments(rv: np.ndarray, K: int) -> np.ndarray:
    """
    Volatility increments at window K:
    inc_t = RV[t+K] - RV[t], producing a series of length len(rv)-K.
    """
    if len(rv) < 2 * K:
        return np.array([], dtype=float)
    return rv[K:] - rv[:-K]


def sample_autocov(x: np.ndarray, L: int) -> Dict[int, float]:
    """
    γ_ℓ = 1/(n-ℓ) * sum_{t=0}^{n-ℓ-1} (x_t - μ)(x_{t+ℓ} - μ).
    """
    n = len(x)
    if n == 0:
        return {ell: np.nan for ell in range(L + 1)}
    mu = float(np.mean(x))
    xc = x - mu
    out: Dict[int, float] = {}
    for ell in range(L + 1):
        if ell >= n:
            out[ell] = np.nan
            continue
        x1 = xc[: n - ell]
        x2 = xc[ell:]
        denom = max(n - ell, 1)
        out[ell] = float(np.dot(x1, x2) / denom)
    return out


def build_moment_vector_V(
    days_returns: List[np.ndarray],
    windows_and_lags: List[Dict[str, int]],
    trunc_c: float,
    use_pattern: bool,
) -> Tuple[np.ndarray, Dict[str, Dict[int, float]]]:
    """
    Build the stacked moment vector V across all requested windows,
    using per-day truncated returns, pattern removal, volatility increments,
    and FIRST-LAG correction. Also returns per-window autocovariances for inspection.
    """
    # 1) Truncate per day
    days_trunc = [robust_truncate_returns(r, trunc_c) for r in days_returns]

    # 2) Pattern (U-shape) on per-bar variance proxy
    if use_pattern:
        pattern = compute_intraday_pattern_sq(days_trunc)
    else:
        T = len(days_trunc[0])
        pattern = np.ones(T, dtype=float)

    # 3) Build per-window increments concatenated across days, then autocovs
    V_blocks: List[float] = []
    ac_by_window: Dict[str, Dict[int, float]] = {}

    for spec in windows_and_lags:
        K = int(spec["window"])
        L = int(spec["N_lags"])

        # concatenate increments across days
        inc_all = []
        for r in days_trunc:
            if len(r) != len(pattern):
                continue
            s2 = (r * r) / (pattern + 1e-16)  # multiplicative normalization to mean ~1
            rv = realized_variance_series_from_s2(s2, K)
            inc = volatility_increments(rv, K)
            if inc.size > 0:
                inc_all.append(inc)

        if not inc_all:
            raise RuntimeError(f"No increments for window K={K}. Check data length or K.")
        y = np.concatenate(inc_all, axis=0)

        # autocovariances at lags 0..L
        gamma = sample_autocov(y, L)

        # FIRST-LAG correction + stacking: [γ0+2γ1, γ2, γ3, ..., γL]
        if L < 2:
            raise ValueError(f"N_lags must be at least 2 for first-lag correction (got {L}).")
        first = gamma[0] + 2.0 * gamma[1]
        block = [first] + [gamma[ell] for ell in range(2, L + 1)]

        V_blocks.extend(block)
        ac_by_window[f"K={K}"] = gamma

    V = np.array(V_blocks, dtype=float)
    return V, ac_by_window


# =========================
# Model side: Ψ(H)
# =========================
def phi_H_l(ell: int, H: float) -> float:
    """
    Five-point finite-difference operator Φ_H(ℓ) constructed from powers |·|^{2H}.
    This symmetric discrete stencil captures the shape the estimator expects.
    """
    # Define f(a) = |a|^{2H} with f(0)=0
    def f(a: int) -> float:
        if a == 0:
            return 0.0
        return abs(float(a)) ** (2.0 * H)

    # 5-point symmetric combination (discrete 4th difference; scale not needed for GMM)
    # Φ_H(ℓ) = f(ℓ+2) - 4 f(ℓ+1) + 6 f(ℓ) - 4 f(ℓ-1) + f(ℓ-2)
    return f(ell + 2) - 4.0 * f(ell + 1) + 6.0 * f(ell) - 4.0 * f(ell - 1) + f(ell - 2)


def build_Psi_vector(H: float, windows_and_lags: List[Dict[str, int]]) -> np.ndarray:
    """
    Build Ψ(H) as the concatenation across windows of:
      [ Φ_H(0) + Φ_H(1), Φ_H(2), ..., Φ_H(L) ] * window^(2H)
    """
    blocks: List[float] = []
    for spec in windows_and_lags:
        K = int(spec["window"])
        L = int(spec["N_lags"])
        if L < 2:
            raise ValueError("Each window must have N_lags >= 2.")
        first = phi_H_l(0, H) + phi_H_l(1, H)
        rest = [phi_H_l(ell, H) for ell in range(2, L + 1)]
        block = [first] + rest
        scale = (float(K) ** (2.0 * H))
        blocks.extend([scale * b for b in block])
    return np.array(blocks, dtype=float)


# =========================
# Estimation: GMM over H, closed-form R(H)
# =========================
@dataclass
class GMMResult:
    H: float
    R: float
    obj: float
    H_grid: np.ndarray
    obj_grid: np.ndarray


def gmm_objective_with_closed_form_R(V: np.ndarray, Psi: np.ndarray, W: Optional[np.ndarray]) -> Tuple[float, float]:
    """
    Given V and Psi(H), and weight matrix W (default I), return:
      R*(H) = argmin_R (V - R Psi)' W (V - R Psi) = (Psi' W V)/(Psi' W Psi)
      F(H)  = (V - R* Psi)' W (V - R* Psi)
    """
    n = len(V)
    if W is None:
        W = np.eye(n)
    # Avoid singularities:
    WPsi = W @ Psi
    num = float(Psi @ WPsi)  # Psi' W Psi
    if num <= 1e-30:
        return math.nan, math.inf
    R = float(Psi @ (W @ V)) / num
    resid = V - R * Psi
    obj = float(resid @ (W @ resid))
    return R, obj


def estimate_H_and_R_by_GMM(
    V: np.ndarray,
    windows_and_lags: List[Dict[str, int]],
    H_min: float = H_MIN,
    H_max: float = H_MAX,
    mesh: float = H_MESH,
    W: Optional[np.ndarray] = None,
) -> GMMResult:
    H_grid = np.arange(H_min, H_max + 1e-12, mesh, dtype=float)
    objs = np.empty_like(H_grid)
    Rs = np.empty_like(H_grid)

    for i, H in enumerate(H_grid):
        Psi = build_Psi_vector(H, windows_and_lags)
        R, obj = gmm_objective_with_closed_form_R(V, Psi, W)
        objs[i] = obj
        Rs[i] = R

    idx = int(np.nanargmin(objs))
    H_hat = float(H_grid[idx])
    R_hat = float(Rs[idx])
    obj_star = float(objs[idx])
    return GMMResult(H=H_hat, R=R_hat, obj=obj_star, H_grid=H_grid, obj_grid=objs)


# =========================
# Heston calibration & MC (optional)
# =========================
@dataclass
class HestonParams:
    kappa: float
    theta: float
    xi: float
    rho: float
    v0: float
    s0: float


def calibrate_heston_from_returns(all_rets: List[np.ndarray], dt_years: float, s0: float) -> HestonParams:
    """Simple moment-matching; same spirit as user's version."""
    r = np.concatenate(all_rets)
    v = (r ** 2) / max(dt_years, 1e-16)

    v_t, v_tp = v[:-1], v[1:]
    X = np.vstack([np.ones_like(v_t), v_t]).T
    beta, *_ = np.linalg.lstsq(X, v_tp, rcond=None)
    c, phi = float(beta[0]), float(beta[1])
    phi = min(max(phi, -0.99), 0.999999)

    kappa = max((1.0 - phi) / dt_years, 1e-8)
    theta = max(c / (1.0 - phi), 1e-12)

    e = v_tp - (c + phi * v_t)
    denom = np.sum(v_t * dt_years)
    xi2 = np.sum(e ** 2) / max(denom, 1e-16)
    xi = float(np.sqrt(max(xi2, 1e-16)))

    vt_dt = np.maximum(v_t * dt_years, 1e-16)
    eta = (r[1:] + 0.5 * v_t * dt_years) / np.sqrt(vt_dt)
    zeta = e / (xi * np.sqrt(vt_dt))
    rho = float(np.corrcoef(eta, zeta)[0, 1])
    rho = float(np.clip(rho, -0.999, 0.999))

    return HestonParams(kappa=kappa, theta=theta, xi=xi, rho=rho, v0=float(theta), s0=float(s0))


def heston_qe_step(s, v, dt, p: HestonParams, z1, z2):
    """Quadratic-Exponential step for variance; log-Euler for price."""
    z_v = z1
    z_s = p.rho * z1 + np.sqrt(max(1.0 - p.rho ** 2, 0.0)) * z2

    m = p.theta + (v - p.theta) * np.exp(-p.kappa * dt)
    s2 = (v * p.xi ** 2 * np.exp(-p.kappa * dt) / p.kappa) * (1 - np.exp(-p.kappa * dt)) \
         + (p.theta * p.xi ** 2 / (2 * p.kappa)) * (1 - np.exp(-p.kappa * dt)) ** 2
    psi = s2 / (m ** 2 + 1e-16)

    if psi <= 1.5:
        b2 = 2 / psi - 1 + np.sqrt(max(2 / psi, 0.0)) * np.sqrt(max(2 / psi - 1, 0.0))
        a = m / (1 + b2)
        v_next = a * (np.sqrt(max(b2, 0.0)) * z_v + 1) ** 2
    else:
        p_qe = (psi - 1) / (psi + 1)
        beta = (1 - p_qe) / max(m, 1e-16)
        U = 0.5 * (1 + math.erf(z_v / np.sqrt(2)))
        v_next = 0.0 if U <= p_qe else -np.log((1 - p_qe) / (1 - U + 1e-16)) / max(beta, 1e-16)

    v_bar = max(0.5 * (v + v_next), 0.0)
    s_next = s * np.exp(-0.5 * v_bar * dt + np.sqrt(max(v_bar, 0.0) * dt) * z_s)
    return s_next, max(v_next, 0.0)


def simulate_heston_path_daily_independent(
    p: HestonParams,
    steps_fine: int,
    sub_sample: int,
    rng: np.random.Generator,
) -> Tuple[np.ndarray, np.ndarray]:
    """
    Simulate ONE day independently:
    - Start from unconditional mean for v0 (theta) and provided s0.
    - Simulate on a fine grid (steps_fine), then subsample every 'sub_sample' steps.
    Returns subsampled price and variance paths.
    """
    s = p.s0
    v = p.theta
    Z = rng.standard_normal(size=(2, steps_fine))
    s_path = np.empty(steps_fine + 1, dtype=float)
    v_path = np.empty(steps_fine + 1, dtype=float)
    s_path[0], v_path[0] = s, v
    dt = 1.0 / (TRADING_DAYS_PER_YEAR * (6.5 * 3600.0 / FINE_DT_SECONDS))

    for i in range(steps_fine):
        s, v = heston_qe_step(s, v, dt, p, Z[0, i], Z[1, i])
        s_path[i + 1], v_path[i + 1] = s, v

    # Subsample
    idx = np.arange(0, steps_fine + 1, sub_sample, dtype=int)
    return s_path[idx], v_path[idx]


def mc_generate_returns(
    params: HestonParams,
    n_days: int,
    dt_sec_target: int,
    rng: np.random.Generator,
) -> List[np.ndarray]:
    """
    Generate daily return arrays at target dt by:
    - fine grid of FINE_DT_SECONDS
    - subsample ratio = dt_sec_target / FINE_DT_SECONDS
    - independent days reset to unconditional mean
    """
    steps_fine_per_day = int((6.5 * 3600) // FINE_DT_SECONDS)
    sub = int(round(dt_sec_target / FINE_DT_SECONDS))
    if sub < 1:
        sub = 1

    out = []
    p_today = HestonParams(**params.__dict__)
    for _ in range(n_days):
        s_path, _ = simulate_heston_path_daily_independent(p_today, steps_fine_per_day, sub, rng)
        r = np.diff(np.log(s_path))
        out.append(r)
        # reset s0 to previous close (optional); v0 always resets to theta by design
        p_today.s0 = float(s_path[-1])
    return out


# =========================
# Driver
# =========================
def main():
    # 0) Load & align
    aligned, dt_sec = choose_dt_and_align(Path(CSV_PATH), verbose=True)
    day_rets = to_daily_returns(aligned)
    days_used = len(day_rets)
    steps_per_day = len(day_rets[0])

    print(f"\n[info] Days used: {days_used} | Bars/day: {steps_per_day} | dt={dt_sec}s")

    # 1) Build empirical moments V
    V_emp, ac_emp = build_moment_vector_V(
        day_rets,
        windows_and_lags=PARAMS_TARGET,
        trunc_c=TRUNCATION_C,
        use_pattern=USE_PATTERN,
    )
    print(f"[moments] V length = {len(V_emp)}")

    # 2) Estimate H & R by GMM
    res_emp = estimate_H_and_R_by_GMM(V_emp, PARAMS_TARGET, H_MIN, H_MAX, H_MESH, W=None)
    print("\n========== EMPIRICAL (GMM) ==========")
    print(f"H_hat = {res_emp.H:.3f} | R_hat = {res_emp.R:.4g} | obj* = {res_emp.obj:.4g}")
    # Simple boundary sentry:
    if abs(res_emp.H - H_MIN) < 1e-6 or abs(res_emp.H - H_MAX) < 1e-6:
        print("⚠️  H is at a boundary. Re-check V/Psi stacking, pattern, truncation and windows.")

    # 3) Calibrate Heston from observed returns (for optional MC)
    first_day = aligned["date"].min()
    s0 = float(aligned.loc[aligned["date"] == first_day, "Price"].iloc[0])
    dt_years = dt_sec / (TRADING_DAYS_PER_YEAR * 6.5 * 3600.0)
    heston_params = calibrate_heston_from_returns(day_rets, dt_years, s0)

    print("\n===== HESTON CALIBRATION (from data) =====")
    for k, v in heston_params.__dict__.items():
        if isinstance(v, float):
            print(f"  {k:6s}: {v:.6g}")
        else:
            print(f"  {k:6s}: {v}")

    # 4) Optional Monte Carlo study
    if DO_MC:
        rng = np.random.default_rng(SEED)
        H_list = []
        for _ in range(N_MC):
            mc_rets = mc_generate_returns(heston_params, n_days=days_used, dt_sec_target=dt_sec, rng=rng)
            V_mc, _ = build_moment_vector_V(
                mc_rets,
                windows_and_lags=PARAMS_TARGET,
                trunc_c=TRUNCATION_C,
                use_pattern=USE_PATTERN,
            )
            res_mc = estimate_H_and_R_by_GMM(V_mc, PARAMS_TARGET, H_MIN, H_MAX, H_MESH, W=None)
            H_list.append(res_mc.H)

        H_arr = np.array(H_list, float)
        print("\n========== MONTE CARLO ==========")
        print(f"Runs: {N_MC} | H_mean={np.nanmean(H_arr):.3f} | H_sd={np.nanstd(H_arr, ddof=1):.3f} "
              f"| [p5, p95]=[{np.nanpercentile(H_arr,5):.3f}, {np.nanpercentile(H_arr,95):.3f}]")

    # 5) Quick sanity prints for diagnostics
    print("\n[diagnostics] Per-window γℓ previews (empirical increments):")
    for k, gam in ac_emp.items():
        preview = ", ".join([f"γ{ell}={gam[ell]:.4g}" for ell in range(0, min(5, len(gam)))])
        print(f"  {k:>6s}: {preview}")

    # Check sign/shape vs Psi at a few H values
    for Hprobe in (0.2, 0.3, 0.4):
        Psi_probe = build_Psi_vector(Hprobe, PARAMS_TARGET)
        corr = np.corrcoef(V_emp, Psi_probe)[0, 1]
        print(f"[diagnostics] corr(V, Psi({Hprobe:.2f})) = {corr:.4f}")


if __name__ == "__main__":
    main()
