In [None]:
# === Cell 1: imports & dataclasses ===
from __future__ import annotations
import numpy as np
from dataclasses import dataclass
from typing import Optional, Tuple, Callable

Array = np.ndarray

@dataclass
class AMCPaths:
    S: Array                 # (P, M+1)
    Z: Optional[Array]       # (P, M)
    t: Array                 # (M+1,)    t[0]=0, t[-1]=T
    disc: float              # exp(-r*dt)
    dt: float                # T/M
    r: float                 # risk-free rate
    q: float                 # dividend yield
    sigma: float             # volatility
    S0: float                # initial price


In [None]:
# === Cell 2: antithetic normals & GBM paths ===
def make_Z_antithetic(P: int, M: int, seed: Optional[int] = None, dtype=np.float64) -> Array:
    rng = np.random.default_rng(seed)
    half = (P + 1) // 2
    Zh = rng.standard_normal((half, M), dtype=dtype)
    Z  = np.concatenate([Zh, -Zh], axis=0)[:P, :]
    return Z

def simulate_gbm_paths(
    S0: float, r: float, q: float, sigma: float, T: float, M: int, P: int,
    *, seed: Optional[int] = None, antithetic: bool = True, return_Z: bool = True,
    dtype=np.float64, Z: Optional[Array] = None,
) -> AMCPaths:
    if M <= 0 or P <= 0:
        raise ValueError("M and P must be positive.")
    if sigma < 0: raise ValueError("sigma must be nonnegative.")
    if T < 0: raise ValueError("T must be nonnegative.")

    dt = T / M
    t  = np.linspace(0.0, T, M + 1, dtype=dtype)
    mu_step  = (r - q - 0.5 * sigma * sigma) * dt
    vol_step = sigma * np.sqrt(dt)

    if Z is not None:
        Z_used = np.asarray(Z, dtype=dtype)
        if Z_used.shape != (P, M):
            raise ValueError(f"Z must have shape (P,M)=({P},{M})")
    else:
        if antithetic:
            Z_used = make_Z_antithetic(P, M, seed=seed, dtype=dtype)
        else:
            rng = np.random.default_rng(seed)
            Z_used = rng.standard_normal((P, M), dtype=dtype)

    log_inc = mu_step + vol_step * Z_used
    np.cumsum(log_inc, axis=1, out=log_inc)
    S = np.empty((P, M + 1), dtype=dtype)
    S[:, 0]  = S0
    np.exp(log_inc, out=log_inc)
    S[:, 1:] = S0 * log_inc

    disc = float(np.exp(-r * dt))
    return AMCPaths(S=S, Z=Z_used if return_Z else None, t=t, disc=disc, dt=dt,
                    r=r, q=q, sigma=sigma, S0=S0)


In [None]:
# === Cell 3: Local Vol ===
@dataclass
class LocalVolSurface:
    t_grid: np.ndarray        # (Nt,)
    s_grid: np.ndarray        # (Ns,)
    sigma_table: np.ndarray   # (Nt, Ns)
    floor: float = 0.02
    cap: float   = 3.00

    def sigma(self, s: float, t: float) -> float:
        t = float(np.clip(t, self.t_grid[0], self.t_grid[-1]))
        s = float(np.clip(s, self.s_grid[0], self.s_grid[-1]))
        i = np.searchsorted(self.t_grid, t)
        j = np.searchsorted(self.s_grid, s)
        i0 = max(0, min(i-1, len(self.t_grid)-1)); i1 = max(0, min(i, len(self.t_grid)-1))
        j0 = max(0, min(j-1, len(self.s_grid)-1)); j1 = max(0, min(j, len(self.s_grid)-1))
        wt = 0.0 if i1==i0 else (t - self.t_grid[i0])/(self.t_grid[i1]-self.t_grid[i0])
        ws = 0.0 if j1==j0 else (s - self.s_grid[j0])/(self.s_grid[j1]-self.s_grid[j0])
        sig0 = (1-ws)*self.sigma_table[i0, j0] + ws*self.sigma_table[i0, j1]
        sig1 = (1-ws)*self.sigma_table[i1, j0] + ws*self.sigma_table[i1, j1]
        sig  = (1-wt)*sig0 + wt*sig1
        return float(np.clip(sig, self.floor, self.cap))

def simulate_localvol_paths(
    S0: float, r: float, q: float, T: float, M: int, P: int,
    lv: LocalVolSurface, *, seed: int | None = None, Z: np.ndarray | None = None,
) -> AMCPaths:
    dtype = np.float64
    if M <= 0 or P <= 0: raise ValueError("M and P must be positive.")
    dt = T / M
    t  = np.linspace(0.0, T, M + 1, dtype=dtype)
    sqrt_dt = np.sqrt(dt, dtype=dtype)

    if Z is not None:
        Z_used = np.asarray(Z, dtype=dtype)
        if Z_used.shape != (P, M):
            raise ValueError(f"Z must have shape (P,M)=({P},{M})")
    else:
        Z_used = make_Z_antithetic(P, M, seed=seed, dtype=dtype)

    S = np.empty((P, M+1), dtype=dtype); S[:, 0] = S0
    for i in range(M):
        ti   = t[i]
        sig0 = np.array([lv.sigma(s, ti) for s in S[:, i]], dtype=dtype)
        mu0  = (r - q) - 0.5*sig0**2
        S_star = S[:, i] * np.exp(mu0*dt + sig0*sqrt_dt*Z_used[:, i])
        sig1 = np.array([lv.sigma(s, ti+dt) for s in S_star], dtype=dtype)
        sig  = 0.5*(sig0 + sig1)
        mu   = (r - q) - 0.5*sig**2
        S[:, i+1] = S[:, i] * np.exp(mu*dt + sig*sqrt_dt*Z_used[:, i])

    disc = float(np.exp(-r * dt))
    return AMCPaths(S=S, Z=Z_used, t=t, disc=disc, dt=dt, r=r, q=q, sigma=float("nan"), S0=S0)


In [None]:
# === Cell 4: Heston & SLV ===
def _heston_qe_step(v: np.ndarray, kappa: float, theta: float, sigma_v: float, dt: float, Z: np.ndarray) -> np.ndarray:
    m  = theta + (v - theta)*np.exp(-kappa*dt)
    s2 = (v*sigma_v**2*np.exp(-kappa*dt)/kappa)*(1-np.exp(-kappa*dt)) \
       + theta*sigma_v**2*(1-np.exp(-kappa*dt))**2/(2*kappa)
    psi = s2/np.maximum(m, 1e-16)**2
    v_next = np.empty_like(v); psi_c = 1.5
    i1 = psi <= psi_c
    if np.any(i1):
        b2 = 2/psi[i1] - 1 + np.sqrt(2/psi[i1]) * np.sqrt(2/psi[i1]-1)
        a  = m[i1]/(1+b2)
        v_next[i1] = a*(np.sqrt(b2)+Z[i1])**2
    i2 = ~i1
    if np.any(i2):
        p  = (psi[i2]-1)/(psi[i2]+1); beta = (1-p)/m[i2]
        U  = 0.5*(Z[i2]+1.0)
        v_next[i2] = np.where(U<=p, 0.0, -np.log((1-p)/(1-U))/beta)
    return np.maximum(v_next, 0.0)

def simulate_heston_paths(
    S0: float, r: float, q: float, T: float, M: int, P: int,
    *, kappa: float, theta: float, sigma_v: float, rho: float, v0: float,
    seed: int | None = None, Zs: np.ndarray | None = None, Zv: np.ndarray | None = None
) -> AMCPaths:
    dtype = np.float64
    if M <= 0 or P <= 0: raise ValueError("M and P must be positive.")
    dt = T / M
    t  = np.linspace(0.0, T, M + 1, dtype=dtype)
    sqrt_dt = np.sqrt(dt, dtype=dtype)

    if (Zs is None) or (Zv is None):
        Z1 = make_Z_antithetic(P, M, seed=seed, dtype=dtype)
        Z2 = make_Z_antithetic(P, M, seed=None, dtype=dtype)
        Zv_used = Z2
        Zs_used = rho*Z2 + np.sqrt(max(1.0 - rho*rho, 0.0))*Z1
    else:
        Zs_used = np.asarray(Zs, dtype=dtype); Zv_used = np.asarray(Zv, dtype=dtype)
        if Zs_used.shape!=(P,M) or Zv_used.shape!=(P,M):
            raise ValueError("Zs/Zv must be shape (P,M)")

    S = np.empty((P, M+1), dtype=dtype); S[:,0] = S0
    v = np.empty((P, M+1), dtype=dtype); v[:,0] = max(v0, 1e-12)

    for i in range(M):
        v[:,i+1] = _heston_qe_step(v[:,i], kappa, theta, sigma_v, dt, Zv_used[:,i])
        diff = np.sqrt(np.maximum(v[:,i], 0.0))
        mu   = (r - q) - 0.5*diff**2
        S[:,i+1] = S[:,i] * np.exp(mu*dt + diff*sqrt_dt*Zs_used[:,i])

    disc = float(np.exp(-r * dt))
    return AMCPaths(S=S, Z=Zs_used, t=t, disc=disc, dt=dt, r=r, q=q, sigma=float("nan"), S0=S0)

def simulate_slv_paths(
    S0: float, r: float, q: float, T: float, M: int, P: int,
    *, lv: LocalVolSurface, kappa: float, theta: float, sigma_v: float, rho: float, v0: float,
    seed: int | None = None, Zs: np.ndarray | None = None, Zv: np.ndarray | None = None
) -> AMCPaths:
    dtype = np.float64
    if M <= 0 or P <= 0: raise ValueError("M and P must be positive.")
    dt = T / M
    t  = np.linspace(0.0, T, M + 1, dtype=dtype)
    sqrt_dt = np.sqrt(dt, dtype=dtype)

    if (Zs is None) or (Zv is None):
        Z1 = make_Z_antithetic(P, M, seed=seed, dtype=dtype)
        Z2 = make_Z_antithetic(P, M, seed=None, dtype=dtype)
        Zv_used = Z2
        Zs_used = rho*Z2 + np.sqrt(max(1.0 - rho*rho, 0.0))*Z1
    else:
        Zs_used = np.asarray(Zs, dtype=dtype); Zv_used = np.asarray(Zv, dtype=dtype)
        if Zs_used.shape!=(P,M) or Zv_used.shape!=(P,M):
            raise ValueError("Zs/Zv must be shape (P,M)")

    S = np.empty((P, M+1), dtype=dtype); S[:,0] = S0
    v = np.empty((P, M+1), dtype=dtype); v[:,0] = max(v0, 1e-12)

    for i in range(M):
        v[:,i+1] = _heston_qe_step(v[:,i], kappa, theta, sigma_v, dt, Zv_used[:,i])
        sig_lv = np.array([lv.sigma(s, t[i]) for s in S[:, i]], dtype=dtype)
        diff   = sig_lv * np.sqrt(np.maximum(v[:,i], 0.0))
        mu     = (r - q) - 0.5*diff**2
        S[:,i+1] = S[:,i] * np.exp(mu*dt + diff*sqrt_dt*Zs_used[:,i])

    disc = float(np.exp(-r * dt))
    return AMCPaths(S=S, Z=Zs_used, t=t, disc=disc, dt=dt, r=r, q=q, sigma=float("nan"), S0=S0)


In [None]:
# === Cell 5: antithetic dispatchers ===
def prepare_antithetic_paths_for_american_gbm(
    S0: float, r: float, q: float, sigma: float, T: float, M: int, P: int,
    *, seed: Optional[int] = None, dtype=np.float64,
) -> Tuple[AMCPaths, AMCPaths]:
    Z = make_Z_antithetic(P, M, seed=seed, dtype=dtype)
    pack_pos = simulate_gbm_paths(S0, r, q, sigma, T, M, P, seed=None,
                                  antithetic=False, return_Z=True, dtype=dtype, Z=Z)
    pack_neg = simulate_gbm_paths(S0, r, q, sigma, T, M, P, seed=None,
                                  antithetic=False, return_Z=True, dtype=dtype, Z=-Z)
    return pack_pos, pack_neg

def prepare_antithetic_paths_for_american_lv(
    S0: float, r: float, q: float, T: float, M: int, P: int, lv: LocalVolSurface,
    *, seed: Optional[int] = None, dtype=np.float64,
) -> Tuple[AMCPaths, AMCPaths]:
    Z = make_Z_antithetic(P, M, seed=seed, dtype=dtype)
    pack_pos = simulate_localvol_paths(S0, r, q, T, M, P, lv, seed=None, Z=Z)
    pack_neg = simulate_localvol_paths(S0, r, q, T, M, P, lv, seed=None, Z=-Z)
    return pack_pos, pack_neg

def prepare_antithetic_paths_for_american_heston(
    S0: float, r: float, q: float, T: float, M: int, P: int,
    *, kappa: float, theta: float, sigma_v: float, rho: float, v0: float,
    seed: Optional[int] = None, dtype=np.float64,
) -> Tuple[AMCPaths, AMCPaths]:
    Z1 = make_Z_antithetic(P, M, seed=seed, dtype=dtype)
    Z2 = make_Z_antithetic(P, M, seed=None, dtype=dtype)
    Zs = rho * Z2 + np.sqrt(max(1.0 - rho*rho, 0.0)) * Z1
    Zv = Z2
    pack_pos = simulate_heston_paths(S0, r, q, T, M, P,
                                     kappa=kappa, theta=theta, sigma_v=sigma_v,
                                     rho=rho, v0=v0, Zs=Zs, Zv=Zv)
    pack_neg = simulate_heston_paths(S0, r, q, T, M, P,
                                     kappa=kappa, theta=theta, sigma_v=sigma_v,
                                     rho=rho, v0=v0, Zs=-Zs, Zv=-Zv)
    return pack_pos, pack_neg

def prepare_antithetic_paths_for_american_slv(
    S0: float, r: float, q: float, T: float, M: int, P: int,
    *, lv: LocalVolSurface, kappa: float, theta: float, sigma_v: float, rho: float, v0: float,
    seed: Optional[int] = None, dtype=np.float64,
) -> Tuple[AMCPaths, AMCPaths]:
    Z1 = make_Z_antithetic(P, M, seed=seed, dtype=dtype)
    Z2 = make_Z_antithetic(P, M, seed=None, dtype=dtype)
    Zs = rho * Z2 + np.sqrt(max(1.0 - rho*rho, 0.0)) * Z1
    Zv = Z2
    pack_pos = simulate_slv_paths(S0, r, q, T, M, P,
                                  lv=lv, kappa=kappa, theta=theta, sigma_v=sigma_v,
                                  rho=rho, v0=v0, Zs=Zs, Zv=Zv)
    pack_neg = simulate_slv_paths(S0, r, q, T, M, P,
                                  lv=lv, kappa=kappa, theta=theta, sigma_v=sigma_v,
                                  rho=rho, v0=v0, Zs=-Zs, Zv=-Zv)
    return pack_pos, pack_neg

def prepare_antithetic_paths_for_american(
    mode: str,
    S0: float, r: float, q: float, T: float, M: int, P: int,
    *, sigma: float = None, lv: LocalVolSurface = None,
    kappa: float = None, theta: float = None, sigma_v: float = None, rho: float = None, v0: float = None,
    seed: Optional[int] = None, dtype=np.float64,
) -> Tuple[AMCPaths, AMCPaths]:
    if mode == "GBM":
        return prepare_antithetic_paths_for_american_gbm(S0, r, q, sigma, T, M, P, seed=seed, dtype=dtype)
    elif mode == "LV":
        return prepare_antithetic_paths_for_american_lv(S0, r, q, T, M, P, lv, seed=seed, dtype=dtype)
    elif mode == "Heston":
        return prepare_antithetic_paths_for_american_heston(S0, r, q, T, M, P,
                                                            kappa=kappa, theta=theta, sigma_v=sigma_v,
                                                            rho=rho, v0=v0, seed=seed, dtype=dtype)
    elif mode == "SLV":
        return prepare_antithetic_paths_for_american_slv(S0, r, q, T, M, P,
                                                         lv=lv, kappa=kappa, theta=theta, sigma_v=sigma_v,
                                                         rho=rho, v0=v0, seed=seed, dtype=dtype)
    else:
        raise ValueError(f"Unknown model {mode}")


In [None]:
# === Cell 6: LSM ===
# ===== Smooth utilities (shared by forward LSM & AAD) =====
TAU = 0.10

def _softplus_np(x):
    # oftplus(x) = log1p(exp(x))
    return np.where(x > 20, x, np.log1p(np.exp(x)))

def _sigmoid_np(x):
    # σ(x) = 1/(1+e^{-x})
    return 1.0 / (1.0 + np.exp(-x))

def smooth_max_np(a, b, tau=TAU):
    # max(a,b) ≈ b + tau * softplus((a-b)/tau)
    return b + tau * _softplus_np((a - b) / tau)

def _ols_beta(X: Array, y: Array) -> Array:
    beta, *_ = np.linalg.lstsq(X, y, rcond=None)
    return beta

def basis_poly2(x: Array) -> Array:
    x = np.asarray(x).reshape(-1, 1)
    return np.column_stack([np.ones_like(x), x, x**2])

def basis_poly3(x: Array) -> Array:
    x = np.asarray(x).reshape(-1, 1)
    return np.column_stack([np.ones_like(x), x, x**2, x**3])

def payoff_call_now(S_slice: Array, K: float, tau: float = TAU) -> Array:
    # max(S-K,0)tau * softplus((S-K)/tau)
    return tau * _softplus_np((S_slice - K) / tau)

def payoff_put_now(S_slice: Array, K: float, tau: float = TAU) -> Array:
    return tau * _softplus_np((K - S_slice) / tau)


@dataclass
class LSMResult:
    price: float
    stderr: float
    C_path: Array
    exercised: Array
    tau_idx: Array
    r2: Array
    itm_counts: Array
    betas: list
    near_boundary_mask: Optional[Array] = None

def _lsm_single_pack(
    pack: AMCPaths, K: float,
    payoff_now: Callable[[Array, float], Array],
    basis_fn: Callable[[Array], Array] = basis_poly3,
) -> LSMResult:
    S, dt, disc = pack.S, pack.dt, pack.disc
    P, M1 = S.shape
    M = M1 - 1

    # continuation value container at each path (starting from maturity)
    C = payoff_now(S[:, M], K).astype(S.dtype, copy=False)

    r2_list, itm_list = [], []
    betas_list = []
    betas_by_m = [None] * (M + 1)
    tau_idx = np.zeros(P, dtype=np.int32)
    exercised = np.zeros(P, dtype=bool)

    # Backward induction (M-1 ... 1)
    for m in range(M - 1, 0, -1):
        Sm = S[:, m]
        E  = payoff_now(Sm, K)
        itm = (E > 0.0)
        itm_list.append(int(itm.sum()))

        if np.any(itm):
            X = basis_fn(Sm[itm])
            y = disc * C[itm]
            beta = _ols_beta(X, y)
            betas_list.append(beta)
            betas_by_m[m] = beta

            y_hat = X @ beta
            ss_res = np.sum((y - y_hat)**2)
            ss_tot = np.sum((y - y.mean())**2) if y.size > 1 else np.nan
            r2 = 1.0 - ss_res / ss_tot if np.isfinite(ss_tot) and ss_tot > 0 else np.nan
            r2_list.append(r2)

            C_hat = np.zeros(P, dtype=S.dtype)
            C_hat[itm] = (basis_fn(Sm[itm]) @ beta)
        else:
            # keep shapes consistent even when no ITM
            empty_beta = np.full(basis_fn(np.array([0.0])).shape[1], np.nan)
            betas_list.append(empty_beta)
            betas_by_m[m] = None
            r2_list.append(np.nan)
            C_hat = np.zeros(P, dtype=S.dtype)
                    # ----- Smooth decision: blend between exercise and continue -----
        # C_new = w*E + (1-w)*(disc*C_old)
        w = _sigmoid_np((E - C_hat) / TAU)        # w in (0,1)
        C = w * E + (1.0 - w) * (disc * C)

        exer_hard = (E > 0.0) & (E >= C_hat)
        new_ex = exer_hard & (~exercised)
        tau_idx[new_ex] = m
        exercised[new_ex] = True


    # Pricing stats at t0
    C0 = C
    price = C0.mean()
    stderr = C0.std(ddof=1) / np.sqrt(P)

    r2_arr  = np.array(r2_list[::-1])
    itm_arr = np.array(itm_list[::-1])
    betas_list.reverse()  # now chronological 1..M-1

    # === after backward loop, before return ===
    #Build near-boundary mask using |payoff - continuation_hat| at the path's τ
    near_boundary_mask = np.zeros(P, dtype=bool)
    boundary_gap = np.full(P, np.inf, dtype=S.dtype)

    for p in range(P):
        m = int(tau_idx[p]) if exercised[p] else 0
        if m <= 0:
            continue
        Sm = S[p, m]
        E  = payoff_now(Sm, K)
        beta_m = betas_by_m[m]
        if (beta_m is not None) and (E > 0.0):
            x_row = basis_fn(np.array([Sm]))           # shape (1, k)
            C_hat_pm = float(x_row @ beta_m)
            boundary_gap[p] = abs(float(E - C_hat_pm))

    tau0 = max(0.02 * K, 3.0 * TAU)                                     #threshold for "near boundary"
    near_boundary_mask = (boundary_gap < tau0)

    return LSMResult(
        price=price,
        stderr=stderr,
        C_path=C0,
        exercised=exercised,
        tau_idx=tau_idx,
        r2=r2_arr,
        itm_counts=itm_arr,
        betas=betas_list,
        near_boundary_mask=near_boundary_mask,
    )


def lsm_price(
    pack: AMCPaths, K: float,
    payoff_now: Callable[[Array, float], Array],
    basis_fn: Callable[[Array], Array] = basis_poly3,
    *, twin_pack: Optional[AMCPaths] = None,
) -> LSMResult:
    res_pos = _lsm_single_pack(pack, K, payoff_now, basis_fn)
    if twin_pack is None:
        return res_pos

    res_neg = _lsm_single_pack(twin_pack, K, payoff_now, basis_fn)

    C_avg = 0.5 * (res_pos.C_path + res_neg.C_path)
    P = C_avg.shape[0]
    price = C_avg.mean()
    stderr = C_avg.std(ddof=1) / np.sqrt(P)

    r2 = np.nanmean(np.vstack([res_pos.r2, res_neg.r2]), axis=0)
    itm_counts = 0.5 * (res_pos.itm_counts + res_neg.itm_counts)

    if (res_pos.near_boundary_mask is not None) and (res_neg.near_boundary_mask is not None):
        nb_mask = (res_pos.near_boundary_mask | res_neg.near_boundary_mask)
    else:
        nb_mask = res_pos.near_boundary_mask if res_pos.near_boundary_mask is not None else None

    return LSMResult(
        price=price, stderr=stderr, C_path=C_avg,
        exercised=(res_pos.exercised | res_neg.exercised),
        tau_idx=np.minimum(res_pos.tau_idx, res_neg.tau_idx),
        r2=r2, itm_counts=itm_counts, betas=[res_pos.betas, res_neg.betas],
        near_boundary_mask=nb_mask,
    )


In [None]:
# === Cell 7: quick self-test (small P for notebook) ===
S0, r, q, sigma, T = 100.0, 0.03, 0.00, 0.2, 1.0
K = 100.0
M, P, seed = 50, 50_000, 7
amc = simulate_gbm_paths(S0, r, q, sigma, T, M, P, seed=seed, antithetic=True, return_Z=True)
lhs = np.exp(-r * T) * amc.S[:, -1].mean()
rhs = S0 * np.exp(-q * T)
print(f"Martingale check: {lhs:.6f} vs {rhs:.6f}, rel.err={abs(lhs-rhs)/rhs:.3e}")

pack_pos, pack_neg = prepare_antithetic_paths_for_american("GBM", S0, r, q, T, M, P, sigma=sigma, seed=seed)
diff_normals = np.max(np.abs(pack_pos.Z + pack_neg.Z))
print(f"Antithetic Z consistency (≈0): {diff_normals:.3e}")

res = lsm_price(amc, K, payoff_put_now, basis_poly3)
print(f"[Single pack] American Put LSM price={res.price:.6f}, SE={res.stderr:.6f}")

res2 = lsm_price(pack_pos, K, payoff_put_now, basis_poly3, twin_pack=pack_neg)
print(f"[Antithetic avg] American Put LSM price={res2.price:.6f}, SE={res2.stderr:.6f}")


Martingale check: 100.035679 vs 100.000000, rel.err=3.568e-04
Antithetic Z consistency (≈0): 0.000e+00


  C_hat_pm = float(x_row @ beta_m)


[Single pack] American Put LSM price=6.539392, SE=0.031400
[Antithetic avg] American Put LSM price=6.539392, SE=0.010165


In [None]:
# === Cell 8: FD Greeks with CRN ===
def simulate_gbm_paths_from_Z(
    S0: float, r: float, q: float, sigma: float, T: float, *, Z: np.ndarray,
    out_log: bool = False, dtype=np.float64,
) -> AMCPaths:
    P, M = Z.shape
    dt = T / M
    mu_step  = (r - q - 0.5 * sigma * sigma) * dt
    vol_step = sigma * np.sqrt(dt)
    log_inc = mu_step + vol_step * Z
    np.cumsum(log_inc, axis=1, out=log_inc)
    S = np.empty((P, M + 1), dtype=dtype)
    S[:, 0] = S0
    np.exp(log_inc, out=log_inc)
    S[:, 1:] = S0 * log_inc
    disc = float(np.exp(-r * dt))
    t = np.linspace(0.0, T, M + 1, dtype=dtype)
    return AMCPaths(S=S, Z=Z, t=t, disc=disc, dt=dt, r=r, q=q, sigma=sigma, S0=S0)

def _scale_pack_S0(pack: AMCPaths, S0_new: float) -> AMCPaths:
    S = np.empty_like(pack.S)
    S[:, 0] = S0_new
    S[:, 1:] = S0_new * (pack.S[:, 1:] / pack.S[:, 0][:, None])
    return AMCPaths(S=S, Z=pack.Z, t=pack.t, disc=pack.disc, dt=pack.dt,
                    r=pack.r, q=pack.q, sigma=pack.sigma, S0=S0_new)

def fd_greeks(
    S0: float, K: float, r: float, q: float, sigma: float, T: float,
    M: int, P: int, payoff_now, basis_fn,
    *, seed: int = 42, h_S: float = 1e-2, h_sig: float = 1e-4
):
    base_pack = simulate_gbm_paths(S0, r, q, sigma, T, M, P, seed=seed, antithetic=False, return_Z=True)
    base_res  = lsm_price(base_pack, K, payoff_now, basis_fn)

    pack_Sp = _scale_pack_S0(base_pack, S0 + h_S)
    pack_Sm = _scale_pack_S0(base_pack, S0 - h_S)
    p_plus  = lsm_price(pack_Sp, K, payoff_now, basis_fn).price
    p_minus = lsm_price(pack_Sm, K, payoff_now, basis_fn).price
    delta   = (p_plus - p_minus) / (2 * h_S)
    gamma   = (p_plus - 2 * base_res.price + p_minus) / (h_S ** 2)

    pack_vp = simulate_gbm_paths_from_Z(S0, r, q, sigma + h_sig, T, Z=base_pack.Z)
    pack_vm = simulate_gbm_paths_from_Z(S0, r, q, sigma - h_sig, T, Z=base_pack.Z)
    p_vp    = lsm_price(pack_vp, K, payoff_now, basis_fn).price
    p_vm    = lsm_price(pack_vm, K, payoff_now, basis_fn).price
    vega    = (p_vp - p_vm) / (2 * h_sig)

    return dict(delta=delta, gamma=gamma, vega=vega), base_res, base_pack


In [None]:
# === Cell 9: AAD Frozen Greeks ===
def aad_frozen_greeks(pack: AMCPaths, lsm_res: "LSMResult", K: float, payoff_now) -> dict:
    S = pack.S; Z = pack.Z
    P, M1 = S.shape
    M = M1 - 1
    dt = pack.dt

    Wcum = np.cumsum(Z, axis=1) * np.sqrt(dt)  # (P, M)

    S0_vec = S[:, 0][:, None]
    dS_dS0 = S[:, 1:] / S0_vec

    m_idx = np.arange(1, M + 1)[None, :]
    t_m = m_idx * dt
    dlogS_dsig = -pack.sigma * t_m + Wcum
    dS_dsig = S[:, 1:] * dlogS_dsig
        # call: σ((S-K)/τ); put: -σ((K-S)/τ)）
    if payoff_now is payoff_call_now:
        dPhi_dS = _sigmoid_np((S[:, 1:] - K) / TAU).astype(S.dtype)
    else:
        dPhi_dS = -_sigmoid_np((K - S[:, 1:]) / TAU).astype(S.dtype)


    tau = lsm_res.tau_idx.copy()
    tau[tau == 0] = M
    rows = np.arange(P)

    disc_pow = (pack.disc ** tau)

    dphi_m  = dPhi_dS[rows, tau - 1]
    dS0_m   = dS_dS0[rows, tau - 1]
    dsig_m  = dS_dsig[rows, tau - 1]

    dPV_dS0  = disc_pow * dphi_m * dS0_m
    dPV_dsig = disc_pow * dphi_m * dsig_m

    return dict(delta=dPV_dS0.mean(), vega=dPV_dsig.mean())


In [None]:
# === Colab: mount drive ===
from google.colab import drive
drive.mount('/content/drive')

import os, sys, types, importlib.util, importlib
ROOT = "/content/drive/MyDrive/aad_edge_pushing/aad"
CORE_DIR = os.path.join(ROOT, "core")
OPS_DIR  = os.path.join(ROOT, "ops")

for p in [ROOT, CORE_DIR, OPS_DIR]:
    print(p, "->", "OK" if os.path.isdir(p) else "MISSING")
    if not os.path.isdir(p):
        raise FileNotFoundError(f"Not a directory: {p}")

for name in ["aad", "aad.core", "aad.ops"]:
    if name in sys.modules:
        del sys.modules[name]

aad = types.ModuleType("aad")
aad.__path__ = []
sys.modules["aad"] = aad

aad_core = types.ModuleType("aad.core")
aad_core.__path__ = []
sys.modules["aad.core"] = aad_core

aad_ops = types.ModuleType("aad.ops")
aad_ops.__path__ = []
sys.modules["aad.ops"] = aad_ops

def _load(fullname: str, path: str):
    if not os.path.isfile(path):
        raise FileNotFoundError(f'{fullname} -> file not found: {path}')
    spec = importlib.util.spec_from_file_location(fullname, path)
    if spec is None or spec.loader is None:
        raise ImportError(f'Cannot load spec for {fullname} from {path}')
    mod = importlib.util.module_from_spec(spec)
    sys.modules[fullname] = mod
    spec.loader.exec_module(mod)
    return mod
core_files_in_order = [
    ("aad.core.node",   os.path.join(CORE_DIR, "node.py")),
    ("aad.core.tape",   os.path.join(CORE_DIR, "tape.py")),
    ("aad.core.var",    os.path.join(CORE_DIR, "var.py")),
    ("aad.core.engine", os.path.join(CORE_DIR, "engine.py")),
    ("aad.core.seeds",  os.path.join(CORE_DIR, "seeds.py")),
    ("aad.core.taylor_backprop", os.path.join(CORE_DIR, "taylor_backprop.py")),
]
for fullname, path in core_files_in_order:
    _load(fullname, path)

ops_files = [
    ("aad.ops.arithmetic",     os.path.join(OPS_DIR, "arithmetic.py")),
    ("aad.ops.transcendental", os.path.join(OPS_DIR, "transcendental.py")),
    ("aad.ops.special",        os.path.join(OPS_DIR, "special.py")),
]
for fullname, path in ops_files:
    _load(fullname, path)

for sub in ("arithmetic", "transcendental", "special"):
    m = importlib.import_module(f"aad.ops.{sub}")
    for k, v in m.__dict__.items():
        if not k.startswith("_"):
            setattr(aad_ops, k, v)

print("✅ Loaded aad from aad_edge_pushing/aad")

try:
    from aad.core.seeds import grads
    from aad.core.engine import edge_push_hessian, hvp_for
    from aad.core.taylor_backprop import taylor_backpropagate
    from aad.ops import exp, log
    f = lambda d: exp(d["x"]) + log(d["x"])
    g = grads(f, {"x": 2.0})
    print("smoke grads:", {k: float(v) for k, v in g.items()})
except Exception as e:
    print("❌ smoke failed:", e)
    raise


Mounted at /content/drive
/content/drive/MyDrive/aad_edge_pushing/aad -> OK
/content/drive/MyDrive/aad_edge_pushing/aad/core -> OK
/content/drive/MyDrive/aad_edge_pushing/aad/ops -> OK
✅ Loaded aad from aad_edge_pushing/aad
smoke grads: {'x': 7.88905609893065}


In [None]:
# === Cell A: build frozen context from your LSM pricer (fixed) ===
import numpy as np

# ---- set problem ----
S0, r, q, sigma, T = 100.0, 0.03, 0.00, 0.20, 1.0
K = 100.0
M, P, seed = 50, 50_000, 42
is_put = True  # choose call/put

# ---- generate base paths & run LSM to get first exercise time tau ----
base_pack = simulate_gbm_paths(
    S0, r, q, sigma, T, M, P,
    seed=seed, antithetic=False, return_Z=True
)
res_base  = lsm_price(
    base_pack, K,
    payoff_put_now if is_put else payoff_call_now,
    basis_poly3
)

# ---- freeze Z, Wcum, tau ----
Z      = np.asarray(base_pack.Z)                  # (P, M)
dt     = float(base_pack.dt)
t_grid = np.asarray(base_pack.t)                  # (M+1,)

tau = np.asarray(res_base.tau_idx, dtype=np.int32).copy()
tau[tau == 0] = M                                 # maturity → step M (no early exercise)

# cumulative Brownian (per path)
Wcum = np.cumsum(Z, axis=1, dtype=np.float64) * np.sqrt(dt)   # (P, M)

# optional near-boundary mask from LSM (may not exist)
nbm = getattr(res_base, "near_boundary_mask", None)
if nbm is not None:
    nbm = np.asarray(nbm, dtype=bool)
    if nbm.shape[0] != P:
        # shape guard
        nbm = None

# ---- store frozen context (plus base params for numeric baselines) ----
frozen_ctx = dict(
    Z=Z, Wcum=Wcum, tau=tau, dt=dt, t_grid=t_grid, P=int(P), M=int(M),
    payoff=("put" if is_put else "call"),
    near_boundary_mask=nbm,
    # stash base scalars for FD / off-tape math
    S0_base=float(S0), sigma_base=float(sigma), r_base=float(r), K_base=float(K),
)

# ---- diagnostics ----
early_ratio = float((tau < M).mean())            # truly early exercised ratio
nb_count    = (int(nbm.sum()) if nbm is not None else "N/A")

print(
    "Frozen context ready:",
    f"P={P}, M={M}, payoff={frozen_ctx['payoff']}, "
    f"early_exercise_ratio={early_ratio:.3f}, near-boundary={nb_count}"
)


  C_hat_pm = float(x_row @ beta_m)


Frozen context ready: P=50000, M=50, payoff=put, early_exercise_ratio=0.851, near-boundary=38900


In [None]:
# === Cell B: Frozen-LSM scalar price with AAD (grads + EP Hessian, smoothed) ===
import numpy as np, pandas as pd
from aad.ops import exp as aexp, log as alog
from aad.core.engine import edge_push_hessian
from aad.core.seeds import grads

# ---------------- Shared smoothing (align with forward LSM) ----------------
TAU = 0.10

def _softplus_np(x):
    return np.where(x > 20, x, np.log1p(np.exp(x)))

def smooth_max_np(a, b, tau=TAU):
    return b + tau * _softplus_np((a - b) / tau)

def softplus_ad(x):
    return alog(1.0 + aexp(x))

def smooth_max_ad(a, b, tau=TAU):
    return b + tau * softplus_ad((a - b) / tau)

def _to_float(x):
    try:
        return float(x)
    except Exception:
        for attr in ("x", "val", "value", "data"):
            if hasattr(x, attr):
                try:
                    return float(getattr(x, attr))
                except Exception:
                    pass
        return float(np.array(x, dtype=float))

def _aad_mean(x):
    try:
        n = int(np.size(x))
        if n <= 1:
            return x.item() if hasattr(x, "item") else x
        total = None
        for xi in np.ravel(x):
            total = xi if total is None else (total + xi)
        return total * (1.0 / n)
    except Exception:
        return x

def _aad_reduce_scalar(x):
    shp = getattr(x, "shape", None)
    if shp is not None and shp != ():
        return _aad_mean(x)
    return x

# ---------------- frozen LSM：Full sample（For sanity check） ----------------
def frozen_lsm_price_aad(params):
    """Scalar frozen-LSM price (τ/Z frozen), AAD ops + smooth payoff/decision."""
    S0, sigma, r, T, K = params["S0"], params["sigma"], params["r"], params["T"], params["K"]
    Wcum, tau, dt, tgrid = (frozen_ctx[k] for k in ["Wcum","tau","dt","t_grid"])
    P, is_put = frozen_ctx["P"], (frozen_ctx["payoff"] == "put")

    total = 0.0
    for p in range(P):
        m = int(tau[p])
        t_m = tgrid[m]
        W_t = Wcum[p, m-1] if m >= 1 else 0.0
        mu  = r - 0.5 * sigma * sigma
        Sm  = S0 * aexp(mu * t_m + sigma * W_t)
        if is_put:
            payoff = smooth_max_ad(K, Sm, TAU)   # max(K-Sm,0)
        else:
            payoff = smooth_max_ad(Sm, K, TAU)   # max(Sm-K,0)
        total += aexp(-r * (m * dt)) * payoff
    return total / P

# ---------------- Frozrn LSM：Per batch ----------------
def frozen_lsm_price_aad_batch(params, batch_idx, index_subset=None):
    S0, sigma, r, K = params["S0"], params["sigma"], params["r"], params["K"]
    tau_all = frozen_ctx["tau"][batch_idx].astype(int)
    dt, tgrid, Wcum = frozen_ctx["dt"], frozen_ctx["t_grid"], frozen_ctx["Wcum"]
    is_put = (frozen_ctx["payoff"] == "put")

    # numeric copies for off-tape math
    S0_, sig_, r_, K_ = (frozen_ctx.get(k, _to_float(v))
                         for k, v in zip(["S0_base","sigma_base","r_base","K_base"],
                                         [S0, sigma, r, K]))

    # active indices（进入 AAD）
    if index_subset is None:
        active = np.arange(len(batch_idx))
    else:
        mask = np.isin(batch_idx, index_subset)
        active = np.nonzero(mask)[0]

    B = len(batch_idx)
    total = 0.0

    if index_subset is None:
        active = np.arange(B, dtype=int)
    else:
        mask = np.isin(batch_idx, index_subset)   # shape = (B,)
        active = np.nonzero(mask)[0]

    t_m_all = tgrid[tau_all]
    W_t_all = np.zeros(B)
    ge1 = tau_all >= 1
    if np.any(ge1):
        W_t_all[ge1] = Wcum[batch_idx[ge1], tau_all[ge1]-1]
    m_dt_all = tau_all * dt

    # --- off-tape ---
    if active.size < B:
        ia = np.setdiff1d(np.arange(B), active, assume_unique=True)
        tm, wt, mdt = t_m_all[ia], W_t_all[ia], m_dt_all[ia]
        mu = r_ - 0.5 * sig_ * sig_
        Sm = S0_ * np.exp(mu * tm + sig_ * wt)
        if is_put:
            payoff = smooth_max_np(K_, Sm, TAU)
        else:
            payoff = smooth_max_np(Sm, K_, TAU)
        total += np.mean(np.exp(-r_ * mdt) * payoff)

    # --- on-tape ---
    if active.size > 0:
        tm, wt, mdt = t_m_all[active], W_t_all[active], m_dt_all[active]
        acc = None
        for k in range(len(tm)):
            mu_k = r - 0.5 * sigma * sigma
            Sm_k = S0 * aexp(mu_k * tm[k] + sigma * wt[k])
            if is_put:
                pay_k = smooth_max_ad(K, Sm_k, TAU)
            else:
                pay_k = smooth_max_ad(Sm_k, K, TAU)
            pv_k  = aexp(-r * mdt[k]) * pay_k
            acc = pv_k if acc is None else (acc + pv_k)
        total += acc * (1.0 / len(tm))
    return _aad_reduce_scalar(total)

# ---------------- EP Hessian(3x3) ----------------
def _ep_partial_block(H_full, inputs, keys=("S0","sigma","r")):
    in_keys = list(inputs.keys())
    idx = [in_keys.index(k) for k in keys]
    H = np.asarray(H_full, float)
    H = 0.5 * (H + H.T)
    return H[np.ix_(idx, idx)]

def edge_push_hessian_batched(inputs, batch_size=256, subset_idx=None):
    """
    按 τ 分桶之后跑 EP。
    - inputs: {"S0":..., "sigma":..., "r":..., "K":...}
    - subset_idx: 如果你只想跑 near-boundary 那些路径，就传它；否则就是全量
    """
    tau_all = np.asarray(frozen_ctx["tau"], dtype=int)
    P = tau_all.shape[0]

    if subset_idx is not None:
        mask = np.isin(np.arange(P), subset_idx)
        tau_used = tau_all[mask]
        idx_used = np.arange(P)[mask]
    else:
        tau_used = tau_all
        idx_used = np.arange(P)
    buckets = {}
    for path_id, tstop in zip(idx_used, tau_used):
        buckets.setdefault(int(tstop), []).append(int(path_id))

    H_acc, n_eff = None, 0

    for tstop, path_ids in sorted(buckets.items(), key=lambda kv: kv[0]):
        path_ids = np.asarray(path_ids, dtype=int)

        for s in range(0, len(path_ids), batch_size):
            batch_ids = path_ids[s:s+batch_size]

            f_batch = (lambda idx=batch_ids:
                       (lambda prm: frozen_lsm_price_aad_batch(prm, idx, index_subset=idx)))(batch_ids)

            H_full = edge_push_hessian(f_batch, inputs, sparse=False)
            H_blk  = _ep_partial_block(H_full, inputs, ("S0","sigma","r"))

            H_acc = H_blk if H_acc is None else (H_acc + H_blk)
            n_eff += 1

    if n_eff == 0:
        raise RuntimeError("No buckets produced EP runs.")
    return H_acc / n_eff


inputs = {"S0": S0, "sigma": sigma, "r": r, "T": T, "K": K}
g = grads(frozen_lsm_price_aad, inputs)
print("Frozen-LSM grads (AAD):", {k: float(v) for k, v in g.items()})

nbm = frozen_ctx.get("near_boundary_mask", None)
subset_idx = np.nonzero(nbm)[0] if (nbm is not None and nbm.any()) else None
nb_count = 0 if subset_idx is None else subset_idx.size
print(f"Active-set size: {nb_count} / {frozen_ctx['P']}")


Frozen-LSM grads (AAD): {'S0': 0.5791020635160776, 'sigma': 39.963220156696394, 'r': -45.14516432961233, 'T': 0.0, 'K': 0.48905756500802056}
Active-set size: 38900 / 50000


In [None]:
# === Cell C: AAD (EP) vs Forward Bumping (CRN) — Greeks & Timing (smoothed) ===
import numpy as np, pandas as pd, time
from aad.core.engine import edge_push_hessian
from aad.core.seeds import grads
from aad.ops import exp as aexp

BATCH       = 1000
MAX_BATCHES = 40
CAP_ACTIVE  = 500
FD_STEP_1   = 1e-4
FD_STEP_2   = 1e-3
np.set_printoptions(suppress=True)
print(f"[Run] batch={BATCH}, max_batches={MAX_BATCHES}, cap_active={CAP_ACTIVE}, fd1={FD_STEP_1}, fd2={FD_STEP_2}")

P      = int(frozen_ctx["P"])
dt     = float(frozen_ctx["dt"])
tgrid  = frozen_ctx["t_grid"]
tau    = frozen_ctx["tau"].astype(int)
Wcum   = frozen_ctx["Wcum"]
is_put = (frozen_ctx["payoff"] == "put")

m_idx  = tau
t_m    = tgrid[m_idx]
W_t    = np.where(m_idx > 0, Wcum[np.arange(P), m_idx - 1], 0.0)
m_dt   = m_idx * dt

S0, sigma, r, T, K = float(frozen_ctx["S0_base"]), float(frozen_ctx["sigma_base"]), float(frozen_ctx["r_base"]), float(frozen_ctx.get("T_base", 1.0)), float(frozen_ctx["K_base"])
names_3 = ["S0","sigma","r"]

def make_batches(P, batch_size, max_batches=None, cap=None, seed=123):
    rng = np.random.default_rng(seed)
    n_full = int(np.ceil(P / batch_size))
    batches = []
    for bi in range(n_full):
        if max_batches is not None and len(batches) >= max_batches:
            break
        s, e = bi * batch_size, min(P, (bi + 1) * batch_size)
        idx = np.arange(s, e, dtype=int)
        if cap is not None and len(idx) > cap:
            idx = np.sort(rng.choice(idx, size=cap, replace=False))
        batches.append(idx)
    if not batches:
        raise RuntimeError("No batches; check knobs.")
    return batches

batches = make_batches(P, BATCH, MAX_BATCHES, CAP_ACTIVE, seed=123)
print(f"[Batches] {len(batches)} × ~{(CAP_ACTIVE or BATCH)} ≈ {len(batches)*(CAP_ACTIVE or BATCH)} paths total")

def price_numeric_on_indices(S0, sigma, r, T, K, indices):
    tm, wt, mdt = t_m[indices], W_t[indices], m_dt[indices]
    mu  = r - 0.5 * sigma * sigma
    Sm  = S0 * np.exp(mu * tm + sigma * wt)
    if is_put:
        payoff = smooth_max_np(K, Sm, TAU)
    else:
        payoff = smooth_max_np(Sm, K, TAU)
    return float(np.mean(np.exp(-r * mdt) * payoff))

def price_aad_on_indices(d, indices):
    S0, sigma, r, T, K = d["S0"], d["sigma"], d["r"], d["T"], d["K"]
    total = 0.0
    for p in indices:
        tm, wt, mdt = float(t_m[p]), float(W_t[p]), float(m_dt[p])
        mu  = r - 0.5 * sigma * sigma
        Sm  = S0 * aexp(mu * tm + sigma * wt)
        if is_put:
            payoff = smooth_max_ad(K, Sm, TAU)
        else:
            payoff = smooth_max_ad(Sm, K, TAU)
        total += aexp(-r * mdt) * payoff
    return total / float(len(indices))

t0 = time.perf_counter()
g_ep_acc = np.zeros(3, float)
for idx in batches:
    inputs3 = {"S0": S0, "sigma": sigma, "r": r}
    f3 = lambda prm3: price_aad_on_indices(
        {"S0": prm3["S0"], "sigma": prm3["sigma"], "r": prm3["r"], "T": float(T), "K": float(K)},
        idx
    )
    g3 = grads(f3, inputs3)
    g_ep_acc += np.array([float(g3["S0"]), float(g3["sigma"]), float(g3["r"])])
g1_ep = g_ep_acc / len(batches)
t_g1_ep = (time.perf_counter() - t0) * 1e3

from aad.core.tape import global_tape

t0 = time.perf_counter()
H_ep_acc = np.zeros((3, 3), float)
for idx in batches:
    global_tape.reset()

    inputs3 = {"S0": S0, "sigma": sigma, "r": r}

    def f3(prm3, idx=idx):
        return price_aad_on_indices(
            {
                "S0": prm3["S0"],
                "sigma": prm3["sigma"],
                "r": prm3["r"],
                "T": float(T),
                "K": float(K),
            },
            idx,
        )

    H_full = edge_push_hessian(f3, inputs3, sparse=False)
    H_full = np.array(H_full, float)
    H_full = 0.5 * (H_full + H_full.T)
    H_ep_acc += H_full

H2_ep = H_ep_acc / len(batches)
t_h2_ep = (time.perf_counter() - t0) * 1e3


def grad_fd_on_indices(x0, idx, h):
    f = lambda x: price_numeric_on_indices(x[0], x[1], x[2], T, K, idx)
    g = np.zeros(3, float)
    for i in range(3):
        xp, xm = x0.copy(), x0.copy()
        xp[i] += h; xm[i] -= h
        g[i] = (f(xp) - f(xm)) / (2*h)
    return g

t0 = time.perf_counter()
x0 = np.array([S0, sigma, r], float)
g_fd_acc = np.zeros(3, float)
for idx in batches:
    g_fd_acc += grad_fd_on_indices(x0, idx, FD_STEP_1)
g1_fd = g_fd_acc / len(batches)
t_g1_fd = (time.perf_counter() - t0) * 1e3

def hess_fd_on_indices(x0, idx, h):
    f = lambda x: price_numeric_on_indices(x[0], x[1], x[2], T, K, idx)
    H = np.zeros((3,3), float)
    f00 = f(x0)
    # diag
    for i in range(3):
        xp, xm = x0.copy(), x0.copy()
        xp[i] += h; xm[i] -= h
        H[i,i] = (f(xp) - 2*f00 + f(xm)) / (h*h)
    # off-diag
    for i in range(3):
        for j in range(i+1,3):
            xpp, xpm, xmp, xmm = [x0.copy() for _ in range(4)]
            xpp[i]+=h; xpp[j]+=h
            xpm[i]+=h; xpm[j]-=h
            xmp[i]-=h; xmm[j]-=h
            xmp[j]+=h; xmm[i]-=h
            H[i,j] = H[j,i] = (f(xpp)-f(xpm)-f(xmp)+f(xmm))/(4*h*h)
    return H

t0 = time.perf_counter()
H_fd_acc = np.zeros((3,3), float)
for idx in batches:
    H_fd_acc += hess_fd_on_indices(x0, idx, FD_STEP_2)
H2_fd = H_fd_acc / len(batches)
t_h2_fd = (time.perf_counter() - t0) * 1e3

df_g_ep = pd.Series(g1_ep, index=names_3, name="AAD-EP")
df_g_fd = pd.Series(g1_fd, index=names_3, name="FD")
df_H_ep = pd.DataFrame(H2_ep, index=names_3, columns=names_3)
df_H_fd = pd.DataFrame(H2_fd, index=names_3, columns=names_3)
df_H_diff = df_H_ep - df_H_fd

print("\n--- Timing (ms) ---")
print(f"AAD 1st (grads): {t_g1_ep:.1f}   |   FD 1st: {t_g1_fd:.1f}")
print(f"AAD 2nd (EP):    {t_h2_ep:.1f}   |   FD 2nd:  {t_h2_fd:.1f}")

print("\n--- First-order Greeks (batch-avg) ---")
print(pd.concat([df_g_ep, df_g_fd], axis=1))

print("\n--- Second-order Hessian 3×3 (AAD-EP) ---")
print(df_H_ep.round(6))
print("\n--- Second-order Hessian 3×3 (FD) ---")
print(df_H_fd.round(6))
print("\n--- Diff (AAD-EP - FD) ---")
print(df_H_diff.round(6))
print(f"\nmax|diff(H)| = {np.abs(df_H_diff.values).max():.3e}")

Gamma = H2_ep[0,0]; Vanna = H2_ep[0,1]; Volga = H2_ep[1,1]
print(f"\n[Key Greeks AAD]  Delta={g1_ep[0]:.4e}, Vega={g1_ep[1]:.4e}, Rho={g1_ep[2]:.4e}")
print(f"[Key Greeks AAD]  Gamma={Gamma:.4e}, Vanna={Vanna:.4e}, Volga={Volga:.4e}")
print(f"[Key Greeks  FD]  Delta={g1_fd[0]:.4e}, Vega={g1_fd[1]:.4e}, Rho={g1_fd[2]:.4e}")
print(f"[Key Greeks  FD]  Gamma={H2_fd[0,0]:.4e}, Vanna={H2_fd[0,1]:.4e}, Volga={H2_fd[1,1]:.4e}")


[Run] batch=1000, max_batches=40, cap_active=500, fd1=0.0001, fd2=0.001
[Batches] 40 × ~500 ≈ 20000 paths total

--- Timing (ms) ---
AAD 1st (grads): 43562.0   |   FD 1st: 17.2
AAD 2nd (EP):    538931.6   |   FD 2nd:  51.6

--- First-order Greeks (batch-avg) ---
          AAD-EP         FD
S0      0.581865   0.581865
sigma  40.115347  40.115347
r     -44.968157 -44.968158

--- Second-order Hessian 3×3 (AAD-EP) ---
             S0      sigma           r
S0     0.009615   0.160793    0.961465
sigma  0.160793  -8.269521  -24.036032
r      0.961465 -24.036032  138.562860

--- Second-order Hessian 3×3 (FD) ---
             S0      sigma           r
S0     0.009615   0.160795    0.961112
sigma  0.160795  -8.269513  -24.030756
r      0.961112 -24.030756  138.539383

--- Diff (AAD-EP - FD) ---
             S0     sigma         r
S0     0.000000 -0.000002  0.000353
sigma -0.000002 -0.000008 -0.005276
r      0.000353 -0.005276  0.023477

max|diff(H)| = 2.348e-02

[Key Greeks AAD]  Delta=5.8187e-

In [None]:
# === Cell D: Taylor-backprop Hessian (batched, clean) ===
import time, numpy as np, pandas as pd
from aad.core.taylor_backprop import taylor_backpropagate
from aad.core.tape import global_tape
from aad.core.var import ADVar

KEYS_3 = ["S0", "sigma", "r"]

def _to_scalar(x):
    if isinstance(x, ADVar):
        return float(x.val)
    return float(x)

def _taylor_hessian_once(f_callable, inputs3):
    global_tape.reset()
    prm_ad = {
        k: ADVar(_to_scalar(inputs3[k]), requires_grad=True, name=k)
        for k in KEYS_3
    }
    y = f_callable(prm_ad)
    if not isinstance(y, ADVar):
        raise TypeError(f"f_callable must return ADVar, got {type(y)}")

    order = [prm_ad[k] for k in KEYS_3]
    g, H = taylor_backpropagate(order)

    return 0.5 * (H + H.T)

def taylor_hessian_batched(inputs3, batches):
    H_acc = np.zeros((3, 3), float)
    n_eff = 0

    for idx in batches:
        def f3(prm3, idx=idx):
            return price_aad_on_indices(
                {
                    "S0": prm3["S0"],
                    "sigma": prm3["sigma"],
                    "r": prm3["r"],
                    "T": float(T),
                    "K": float(K),
                },
                idx,
            )

        H_acc += _taylor_hessian_once(f3, inputs3)
        n_eff += 1

    if n_eff == 0:
        raise RuntimeError("No batches for Taylor.")

    return H_acc / n_eff

inputs_clean = {
    "S0": float(S0.val) if isinstance(S0, ADVar) else float(S0),
    "sigma": float(sigma.val) if isinstance(sigma, ADVar) else float(sigma),
    "r": float(r.val) if isinstance(r, ADVar) else float(r),
}

t0 = time.perf_counter()
H2_taylor = taylor_hessian_batched(inputs_clean, batches)
t_h2_taylor = (time.perf_counter() - t0) * 1e3

names_3 = ["S0", "sigma", "r"]
print("\n--- Second-order Hessian 3×3 (Taylor) ---")
print(pd.DataFrame(H2_taylor, index=names_3, columns=names_3).round(6))
print(f"\n[Timing] Taylor 2nd (batched): {t_h2_taylor:.1f} ms")



--- Second-order Hessian 3×3 (Taylor) ---
             S0      sigma           r
S0     0.009615   0.160793    0.961465
sigma  0.160793  -8.269521  -24.036032
r      0.961465 -24.036032  138.562860

[Timing] Taylor 2nd (batched): 25307.0 ms


In [None]:
# === Cell E: EP vs FD vs Taylor — speed & accuracy comparison ===
import numpy as np, pandas as pd

def frob(A):
    A = np.asarray(A, float)
    return float(np.sqrt(np.sum(A*A)))

print("\n================ SPEED (this run) ================")
print(f"EP (Hessian)   : {t_h2_ep:.1f} ms")
print(f"Taylor (Hess)  : {t_h2_taylor:.1f} ms")
print(f"FD (Hessian)   : {t_h2_fd:.1f} ms")
print(f"×(Taylor vs EP): ×{(t_h2_taylor / max(t_h2_ep,1e-12)):.2f}")
print(f"×(FD vs EP)    : ×{(t_h2_fd / max(t_h2_ep,1e-12)):.2f}")

den = max(frob(H2_ep), 1e-12)
relerr_T = frob(H2_taylor - H2_ep) / den
relerr_F = frob(H2_fd     - H2_ep) / den

print("\n================ ACCURACY (relative to EP) ================")
print(f"relerr(Taylor, EP) = {relerr_T:.3e}")
print(f"relerr(FD,    EP) = {relerr_F:.3e}")

df_H_ep     = pd.DataFrame(H2_ep,     index=names_3, columns=names_3)
df_H_taylor = pd.DataFrame(H2_taylor, index=names_3, columns=names_3)
df_H_fd     = pd.DataFrame(H2_fd,     index=names_3, columns=names_3)

print("\n--- Hessian (Edge-Pushing) ---")
print(df_H_ep.round(6))
print("\n--- Hessian (Taylor) ---")
print(df_H_taylor.round(6))
print("\n--- Hessian (FD) ---")
print(df_H_fd.round(6))

print("\n--- Diff: Taylor - EP ---")
print((df_H_taylor - df_H_ep).round(6))
print("\n--- Diff: FD - EP ---")
print((df_H_fd - df_H_ep).round(6))



EP (Hessian)   : 538931.6 ms
Taylor (Hess)  : 25307.0 ms
FD (Hessian)   : 51.6 ms
×(Taylor vs EP): ×0.05
×(FD vs EP)    : ×0.00

relerr(Taylor, EP) = 2.645e-15
relerr(FD,    EP) = 1.724e-04

--- Hessian (Edge-Pushing) ---
             S0      sigma           r
S0     0.009615   0.160793    0.961465
sigma  0.160793  -8.269521  -24.036032
r      0.961465 -24.036032  138.562860

--- Hessian (Taylor) ---
             S0      sigma           r
S0     0.009615   0.160793    0.961465
sigma  0.160793  -8.269521  -24.036032
r      0.961465 -24.036032  138.562860

--- Hessian (FD) ---
             S0      sigma           r
S0     0.009615   0.160795    0.961112
sigma  0.160795  -8.269513  -24.030756
r      0.961112 -24.030756  138.539383

--- Diff: Taylor - EP ---
        S0  sigma    r
S0     0.0   -0.0  0.0
sigma -0.0    0.0 -0.0
r      0.0   -0.0  0.0

--- Diff: FD - EP ---
             S0     sigma         r
S0    -0.000000  0.000002 -0.000353
sigma  0.000002  0.000008  0.005276
r     -0.00

In [None]:
# === Cell E: EP vs FD vs Taylor — speed & accuracy comparison ===
import numpy as np, pandas as pd

def frob(A):
    A = np.asarray(A, float)
    return float(np.sqrt(np.sum(A*A)))

print("\n================ SPEED (this run) ================")
print(f"EP (Hessian)   : {t_h2_ep:.1f} ms")
print(f"Taylor (Hess)  : {t_h2_taylor:.1f} ms")
print(f"FD (Hessian)   : {t_h2_fd:.1f} ms")
print(f"×(Taylor vs EP): ×{(t_h2_taylor / max(t_h2_ep,1e-12)):.2f}")
print(f"×(FD vs EP)    : ×{(t_h2_fd / max(t_h2_ep,1e-12)):.2f}")

den = max(frob(H2_ep), 1e-12)
relerr_T = frob(H2_taylor - H2_ep) / den
relerr_F = frob(H2_fd     - H2_ep) / den

print("\n================ ACCURACY (relative to EP) ================")
print(f"relerr(Taylor, EP) = {relerr_T:.3e}")
print(f"relerr(FD,    EP) = {relerr_F:.3e}")

df_H_ep     = pd.DataFrame(H2_ep,     index=names_3, columns=names_3)
df_H_taylor = pd.DataFrame(H2_taylor, index=names_3, columns=names_3)
df_H_fd     = pd.DataFrame(H2_fd,     index=names_3, columns=names_3)

print("\n--- Hessian (Edge-Pushing) ---")
print(df_H_ep.round(6))
print("\n--- Hessian (Taylor) ---")
print(df_H_taylor.round(6))
print("\n--- Hessian (FD) ---")
print(df_H_fd.round(6))

print("\n--- Diff: Taylor - EP ---")
print((df_H_taylor - df_H_ep).round(6))
print("\n--- Diff: FD - EP ---")
print((df_H_fd - df_H_ep).round(6))



EP (Hessian)   : 538931.6 ms
Taylor (Hess)  : 25307.0 ms
FD (Hessian)   : 51.6 ms
×(Taylor vs EP): ×0.05
×(FD vs EP)    : ×0.00

relerr(Taylor, EP) = 2.645e-15
relerr(FD,    EP) = 1.724e-04

--- Hessian (Edge-Pushing) ---
             S0      sigma           r
S0     0.009615   0.160793    0.961465
sigma  0.160793  -8.269521  -24.036032
r      0.961465 -24.036032  138.562860

--- Hessian (Taylor) ---
             S0      sigma           r
S0     0.009615   0.160793    0.961465
sigma  0.160793  -8.269521  -24.036032
r      0.961465 -24.036032  138.562860

--- Hessian (FD) ---
             S0      sigma           r
S0     0.009615   0.160795    0.961112
sigma  0.160795  -8.269513  -24.030756
r      0.961112 -24.030756  138.539383

--- Diff: Taylor - EP ---
        S0  sigma    r
S0     0.0   -0.0  0.0
sigma -0.0    0.0 -0.0
r      0.0   -0.0  0.0

--- Diff: FD - EP ---
             S0     sigma         r
S0    -0.000000  0.000002 -0.000353
sigma  0.000002  0.000008  0.005276
r     -0.00

In [None]:
# === Final consolidated output: EP / FD / Taylor full comparison ===
import numpy as np, pandas as pd

names_3 = ["S0", "sigma", "r"]

# --- Run config (echo) ---
print("\n[Run] batch={}, max_batches={}, cap_active={}, fd1={}, fd2={}".format(
    BATCH,
    MAX_BATCHES,
    CAP_ACTIVE,
    FD_STEP_1,
    FD_STEP_2,
))
print("[Batches] {} × ~{} ≈ {} paths total".format(
    len(batches),
    (CAP_ACTIVE or BATCH),
    len(batches) * (CAP_ACTIVE or BATCH),
))

# --- Problem setup (frozen) ---
print("\n=== Problem setup (frozen) ===")
print(f"S0={S0:.4f}, K={K:.4f}, r={r:.4f}, q={q:.4f}, sigma={sigma:.4f}, T={T:.4f}")
print(f"P={frozen_ctx['P']}, M={frozen_ctx['M']}, payoff={frozen_ctx['payoff']}")
early_ratio = float((frozen_ctx["tau"] < frozen_ctx["M"]).mean())
print(f"early-exercise ratio ≈ {early_ratio:.3f}")
nbm = frozen_ctx.get("near_boundary_mask", None)
if nbm is not None:
    print(f"near-boundary size = {int(nbm.sum())}")
else:
    print("near-boundary size = N/A")

# --- Timing summary ---
print("\n--- Timing (ms) ---")
print(f"AAD 1st (grads): {t_g1_ep:.1f}   |   FD 1st: {t_g1_fd:.1f}")
print(f"AAD 2nd (EP):    {t_h2_ep:.1f}   |   FD 2nd:  {t_h2_fd:.1f}")
print(f"Taylor 2nd:      {t_h2_taylor:.1f}")

# --- First-order Greeks ---
df_greeks = pd.concat(
    [
        pd.Series(g1_ep, index=names_3, name="AAD-EP"),
        pd.Series(g1_fd, index=names_3, name="FD"),
    ],
    axis=1,
)
print("\n--- First-order Greeks (batch-avg) ---")
print(df_greeks.round(6))

# --- Second-order Hessians ---
df_H_ep = pd.DataFrame(H2_ep, index=names_3, columns=names_3)
df_H_fd = pd.DataFrame(H2_fd, index=names_3, columns=names_3)
df_H_taylor = pd.DataFrame(H2_taylor, index=names_3, columns=names_3)

print("\n--- Second-order Hessian 3×3 (AAD-EP) ---")
print(df_H_ep.round(6))
print("\n--- Second-order Hessian 3×3 (FD) ---")
print(df_H_fd.round(6))
print("\n--- Second-order Hessian 3×3 (Taylor) ---")
print(df_H_taylor.round(6))

# --- Diff matrices ---
print("\n--- Diff (Taylor - EP) ---")
print((df_H_taylor - df_H_ep).round(6))
print("\n--- Diff (EP - FD) ---")
print((df_H_ep - df_H_fd).round(6))
print("\n--- Diff (Taylor - FD) ---")
print((df_H_taylor - df_H_fd).round(6))

# --- Max difference summary ---
maxdiff_T_EP = np.abs(H2_taylor - H2_ep).max()
maxdiff_EP_FD = np.abs(H2_ep - H2_fd).max()
maxdiff_T_FD  = np.abs(H2_taylor - H2_fd).max()
print(f"\nmax|diff(H_Taylor - EP)| = {maxdiff_T_EP:.3e}")
print(f"max|diff(H_EP - FD)|     = {maxdiff_EP_FD:.3e}")
print(f"max|diff(H_Taylor - FD)| = {maxdiff_T_FD:.3e}")

# --- Key Greeks summary ---
Gamma_E = H2_ep[0, 0];    Vanna_E = H2_ep[0, 1];    Volga_E = H2_ep[1, 1]
Gamma_F = H2_fd[0, 0];    Vanna_F = H2_fd[0, 1];    Volga_F = H2_fd[1, 1]
Gamma_T = H2_taylor[0, 0]; Vanna_T = H2_taylor[0, 1]; Volga_T = H2_taylor[1, 1]

print(f"\n[Key Greeks  AAD]   Delta={g1_ep[0]:.4e}, Vega={g1_ep[1]:.4e}, Rho={g1_ep[2]:.4e}")
print(f"[Key Greeks  AAD]   Gamma={Gamma_E:.4e}, Vanna={Vanna_E:.4e}, Volga={Volga_E:.4e}")
print(f"[Key Greeks   FD]   Delta={g1_fd[0]:.4e}, Vega={g1_fd[1]:.4e}, Rho={g1_fd[2]:.4e}")
print(f"[Key Greeks   FD]   Gamma={Gamma_F:.4e}, Vanna={Vanna_F:.4e}, Volga={Volga_F:.4e}")
print(f"[Key Greeks Taylor] Delta={g1_ep[0]:.4e}, Vega={g1_ep[1]:.4e}, Rho={g1_ep[2]:.4e}")
print(f"[Key Greeks Taylor] Gamma={Gamma_T:.4e}, Vanna={Vanna_T:.4e}, Volga={Volga_T:.4e}")



[Run] batch=1000, max_batches=40, cap_active=500, fd1=0.0001, fd2=0.001
[Batches] 40 × ~500 ≈ 20000 paths total

=== Problem setup (frozen) ===
S0=100.0000, K=100.0000, r=0.0300, q=0.0000, sigma=0.2000, T=1.0000
P=50000, M=50, payoff=put
early-exercise ratio ≈ 0.851
near-boundary size = 38900

--- Timing (ms) ---
AAD 1st (grads): 43562.0   |   FD 1st: 17.2
AAD 2nd (EP):    538931.6   |   FD 2nd:  51.6
Taylor 2nd:      25307.0

--- First-order Greeks (batch-avg) ---
          AAD-EP         FD
S0      0.581865   0.581865
sigma  40.115347  40.115347
r     -44.968157 -44.968158

--- Second-order Hessian 3×3 (AAD-EP) ---
             S0      sigma           r
S0     0.009615   0.160793    0.961465
sigma  0.160793  -8.269521  -24.036032
r      0.961465 -24.036032  138.562860

--- Second-order Hessian 3×3 (FD) ---
             S0      sigma           r
S0     0.009615   0.160795    0.961112
sigma  0.160795  -8.269513  -24.030756
r      0.961112 -24.030756  138.539383

--- Second-order Hessi

In [None]:
import numpy as np, pandas as pd

names_3 = ["S0", "sigma", "r"]

# Convert ms → seconds
t_ep_2nd  = t_h2_ep     / 1000
t_fd_2nd  = t_h2_fd     / 1000
t_tay_2nd = t_h2_taylor / 1000

df_time = pd.DataFrame({
    "AAD-EP (2nd) Time (s)": [t_ep_2nd],
    "FD (2nd) Time (s)":     [t_fd_2nd],
    "Taylor (2nd) Time (s)": [t_tay_2nd],
    "Speedup EP vs FD":      [t_fd_2nd / t_ep_2nd],
    "Speedup Taylor vs FD":  [t_fd_2nd / t_tay_2nd],
}, index=["Value"]).round(4)


# === Other Results (unchanged) ===
df_greeks = pd.DataFrame({
    "AAD-EP": g1_ep,
    "FD":     g1_fd
}, index=names_3).round(6)

df_key = pd.DataFrame({
    "AAD-EP":  [H2_ep[0,0],  H2_ep[0,1],  H2_ep[1,1]],
    "FD":      [H2_fd[0,0],  H2_fd[0,1],  H2_fd[1,1]],
    "Taylor":  [H2_taylor[0,0], H2_taylor[0,1], H2_taylor[1,1]]
}, index=["Gamma", "Vanna", "Volga"]).round(6)

df_diff = pd.DataFrame({
    "max|Taylor - EP|": [np.abs(H2_taylor - H2_ep).max()],
    "max|EP - FD|":     [np.abs(H2_ep - H2_fd).max()],
    "max|Taylor - FD|": [np.abs(H2_taylor - H2_fd).max()],
}).applymap(lambda x: f"{x:.3e}")


# ========= Final Print =========
print("\n=== 2nd-Order Runtime & Speedup Summary ===")
print(df_time)

print("\n=== First-order Greeks (Δ, Vega, Rho) ===")
print(df_greeks)

print("\n=== Key 2nd-order Greeks (Gamma, Vanna, Volga) ===")
print(df_key)

print("\n=== Maximum Absolute Differences Between Methods ===")
print(df_diff)



=== 2nd-Order Runtime & Speedup Summary ===
       AAD-EP (2nd) Time (s)  FD (2nd) Time (s)  Taylor (2nd) Time (s)  \
Value               538.9316             0.0516                 25.307   

       Speedup EP vs FD  Speedup Taylor vs FD  
Value            0.0001                 0.002  

=== First-order Greeks (Δ, Vega, Rho) ===
          AAD-EP         FD
S0      0.581865   0.581865
sigma  40.115347  40.115347
r     -44.968157 -44.968158

=== Key 2nd-order Greeks (Gamma, Vanna, Volga) ===
         AAD-EP        FD    Taylor
Gamma  0.009615  0.009615  0.009615
Vanna  0.160793  0.160795  0.160793
Volga -8.269521 -8.269513 -8.269521

=== Maximum Absolute Differences Between Methods ===
  max|Taylor - EP| max|EP - FD| max|Taylor - FD|
0        3.695e-13    2.348e-02        2.348e-02


  }).applymap(lambda x: f"{x:.3e}")


In [None]:
import numpy as np
import pandas as pd

names_3 = ["S0", "sigma", "r"]

# ===== 1. Runtime + Speedup 表格（秒） =====
# 把 ms 换成 s
time_1st_ep  = t_g1_ep      / 1000.0
time_1st_fd  = t_g1_fd      / 1000.0
time_2nd_ep  = t_h2_ep      / 1000.0
time_2nd_fd  = t_h2_fd      / 1000.0
time_2nd_tay = t_h2_taylor  / 1000.0

df_time = pd.DataFrame({
    "Method": [
        "AAD-EP (1st)", "FD (1st)",
        "AAD-EP (2nd)", "FD (2nd)", "Taylor (2nd)"
    ],
    "Order": [
        "1st", "1st",
        "2nd", "2nd", "2nd"
    ],
    "Time (s)": [
        time_1st_ep, time_1st_fd,
        time_2nd_ep, time_2nd_fd, time_2nd_tay
    ]
})

display(
    df_time.style.format({
        "Time (s)": "{:.4f}"
    })
)


# ===== 2. Key Greeks（Δ, Vega, Rho, Gamma, Vanna, Volga）表格 =====
Gamma_E = H2_ep[0, 0];    Vanna_E = H2_ep[0, 1];    Volga_E = H2_ep[1, 1]
Gamma_F = H2_fd[0, 0];    Vanna_F = H2_fd[0, 1];    Volga_F = H2_fd[1, 1]
Gamma_T = H2_taylor[0, 0]; Vanna_T = H2_taylor[0, 1]; Volga_T = H2_taylor[1, 1]

df_greeks_all = pd.DataFrame(
    data={
        "Delta": [g1_ep[0],  g1_fd[0],  g1_ep[0]],
        "Vega":  [g1_ep[1],  g1_fd[1],  g1_ep[1]],
        "Rho":   [g1_ep[2],  g1_fd[2],  g1_ep[2]],
        "Gamma": [Gamma_E,   Gamma_F,   Gamma_T],
        "Vanna": [Vanna_E,   Vanna_F,   Vanna_T],
        "Volga": [Volga_E,   Volga_F,   Volga_T],
    },
    index=["AAD-EP", "FD", "Taylor"]
)

display(
    df_greeks_all.style.format("{:.6f}")
)


df_H_ep     = pd.DataFrame(H2_ep,     index=names_3, columns=names_3)
df_H_fd     = pd.DataFrame(H2_fd,     index=names_3, columns=names_3)
df_H_taylor = pd.DataFrame(H2_taylor, index=names_3, columns=names_3)

print("Hessian (AAD-EP)")
display(df_H_ep.style.format("{:.6f}"))

print("Hessian (FD)")
display(df_H_fd.style.format("{:.6f}"))

print("Hessian (Taylor)")
display(df_H_taylor.style.format("{:.6f}"))


maxdiff_T_EP = np.abs(H2_taylor - H2_ep).max()
maxdiff_EP_FD = np.abs(H2_ep - H2_fd).max()
maxdiff_T_FD  = np.abs(H2_taylor - H2_fd).max()

df_diff = pd.DataFrame({
    "Pair": ["Taylor - EP", "EP - FD", "Taylor - FD"],
    "max|diff(H)|": [maxdiff_T_EP, maxdiff_EP_FD, maxdiff_T_FD],
})

display(
    df_diff.style.format({"max|diff(H)|": "{:.3e}"})
)


Unnamed: 0,Method,Order,Time (s)
0,AAD-EP (1st),1st,43.562
1,FD (1st),1st,0.0172
2,AAD-EP (2nd),2nd,538.9316
3,FD (2nd),2nd,0.0516
4,Taylor (2nd),2nd,25.307


Unnamed: 0,Delta,Vega,Rho,Gamma,Vanna,Volga
AAD-EP,0.581865,40.115347,-44.968157,0.009615,0.160793,-8.269521
FD,0.581865,40.115347,-44.968158,0.009615,0.160795,-8.269513
Taylor,0.581865,40.115347,-44.968157,0.009615,0.160793,-8.269521


Hessian (AAD-EP)


Unnamed: 0,S0,sigma,r
S0,0.009615,0.160793,0.961465
sigma,0.160793,-8.269521,-24.036032
r,0.961465,-24.036032,138.56286


Hessian (FD)


Unnamed: 0,S0,sigma,r
S0,0.009615,0.160795,0.961112
sigma,0.160795,-8.269513,-24.030756
r,0.961112,-24.030756,138.539383


Hessian (Taylor)


Unnamed: 0,S0,sigma,r
S0,0.009615,0.160793,0.961465
sigma,0.160793,-8.269521,-24.036032
r,0.961465,-24.036032,138.56286


Unnamed: 0,Pair,max|diff(H)|
0,Taylor - EP,3.695e-13
1,EP - FD,0.02348
2,Taylor - FD,0.02348


In [None]:
import pandas as pd
import numpy as np

# === Key Greeks ===
Gamma_E = H2_ep[0, 0];    Vanna_E = H2_ep[0, 1];    Volga_E = H2_ep[1, 1]
Gamma_F = H2_fd[0, 0];    Vanna_F = H2_fd[0, 1];    Volga_F = H2_fd[1, 1]
Gamma_T = H2_taylor[0, 0]; Vanna_T = H2_taylor[0, 1]; Volga_T = H2_taylor[1, 1]

# === Hessian max-diff ===
maxdiff_T_EP = np.abs(H2_taylor - H2_ep).max()
maxdiff_EP_FD = np.abs(H2_ep - H2_fd).max()
maxdiff_T_FD  = np.abs(H2_taylor - H2_fd).max()

df_greeks_all = pd.DataFrame(
    data={
        "Delta": [g1_ep[0],  g1_fd[0],  g1_ep[0]],
        "Vega":  [g1_ep[1],  g1_fd[1],  g1_ep[1]],
        "Rho":   [g1_ep[2],  g1_fd[2],  g1_ep[2]],
        "Gamma": [Gamma_E,   Gamma_F,   Gamma_T],
        "Vanna": [Vanna_E,   Vanna_F,   Vanna_T],
        "Volga": [Volga_E,   Volga_F,   Volga_T],
        "max|diff(H) vs EP|": [
            None,               # EP baseline
            maxdiff_EP_FD,      # FD
            maxdiff_T_EP        # Taylor
        ],
        "max|diff(H) vs FD|": [
            maxdiff_EP_FD,      # EP
            None,               # FD baseline
            maxdiff_T_FD        # Taylor
        ],
    },
    index=["AAD-EP", "FD", "Taylor"]
)

display(
    df_greeks_all.style.format({
        "Delta": "{:.6f}", "Vega": "{:.6f}", "Rho": "{:.6f}",
        "Gamma": "{:.6f}", "Vanna": "{:.6f}", "Volga": "{:.6f}",
        "max|diff(H) vs EP|": "{:.3e}",
        "max|diff(H) vs FD|": "{:.3e}",
    }).set_properties(**{"text-align": "center"})
)


Unnamed: 0,Delta,Vega,Rho,Gamma,Vanna,Volga,max|diff(H) vs EP|,max|diff(H) vs FD|
AAD-EP,0.581865,40.115347,-44.968157,0.009615,0.160793,-8.269521,,0.02348
FD,0.581865,40.115347,-44.968158,0.009615,0.160795,-8.269513,0.02348,
Taylor,0.581865,40.115347,-44.968157,0.009615,0.160793,-8.269521,3.695e-13,0.02348
