In [8]:
# asian_option_pricer.py
import math
from dataclasses import dataclass
from typing import List, Tuple, Optional, Dict

import numpy as np


@dataclass
class MCResult:
    price: float
    stderr: float
    ci95: Tuple[float, float]
    extras: Dict


def _validate_inputs(
    spot: float,
    strike: Optional[float],
    r: float,
    q: float,
    sigma: float,
    T: float,
    N: int,
    option_type: str,
    strike_type: str,
    paths: int,
    observed_fixings: Optional[List[float]],
):
    if not (spot > 0):
        raise ValueError("spot must be > 0.")
    if strike_type == "fixed" and not (strike is not None and strike > 0):
        raise ValueError("fixed strike requires K > 0; use strike=None for 'floating'.")
    if strike_type == "floating" and strike is not None:
        raise ValueError("floating strike requires strike=None.")
    if T <= 0:
        raise ValueError("T must be > 0 (years).")
    if not (isinstance(N, int) and N >= 1):
        raise ValueError("N (number of fixings) must be an integer >= 1.")
    if sigma < 0:
        raise ValueError("sigma must be >= 0.")
    if option_type not in {"call", "put"}:
        raise ValueError("option_type must be 'call' or 'put'.")
    if strike_type not in {"fixed", "floating"}:
        raise ValueError("strike_type must be 'fixed' or 'floating'.")
    if not (isinstance(paths, int) and paths >= 1):
        raise ValueError("paths must be an integer >= 1.")
    if observed_fixings is not None:
        if any(x <= 0 for x in observed_fixings):
            raise ValueError("observed_fixings must contain positive prices.")
        if len(observed_fixings) > N:
            raise ValueError("len(observed_fixings) cannot exceed N.")


def _print_inputs_summary(
    spot: float,
    strike: Optional[float],
    r: float,
    q: float,
    sigma: float,
    T: float,
    N: int,
    option_type: str,
    strike_type: str,
    paths: int,
    seed: Optional[int],
    observed_fixings: Optional[List[float]],
    use_antithetic: bool,
    use_geom_cv: bool,
    delta_bump: float,
    vega_bump: float,
):
    print("=== Inputs Summary ===")
    print(f"S0={spot:.6f}  r={r:.6f}  q={q:.6f}  sigma={sigma:.6f}  T={T:.6f}")
    print(f"N={N}  type={option_type}/{strike_type}  paths={paths}  seed={seed}")
    if strike_type == "fixed":
        print(f"K={strike:.6f}")
    if observed_fixings and len(observed_fixings) > 0:
        print(
            f"seasoning: {len(observed_fixings)}/{N} fixes locked "
            f"(first={observed_fixings[0]:.6f}, last={observed_fixings[-1]:.6f})"
        )
    else:
        print("seasoning: none")
    print(f"antithetic={use_antithetic}  geom_cv={use_geom_cv}")
    print(f"delta_bump={delta_bump:.6f}  vega_bump={vega_bump:.6f}")


def _rng_and_first_step_checks(seed: Optional[int], r: float, q: float, sigma: float, T: float, N: int):
    rng = np.random.default_rng(seed)
    Z = rng.standard_normal(50_000)
    print("=== RNG Check (Z ~ N(0,1)) ===")
    print(f"mean={Z.mean():.4f}  std={Z.std(ddof=1):.4f}  min={Z.min():.4f}  max={Z.max():.4f}")
    dt = T / N
    mu_dt = (r - q - 0.5 * sigma**2) * dt
    pilot = mu_dt + sigma * np.sqrt(dt) * rng.standard_normal(50_000)
    print("=== First-Step Log-Return Check ===")
    print(f"empirical_mean={pilot.mean():.6f}  theory_mean={mu_dt:.6f}")


def _simulate_paths_arith_and_geom_avgs(
    spot: float,
    r: float,
    q: float,
    sigma: float,
    T: float,
    N: int,
    paths: int,
    seed: Optional[int],
    observed_fixings: Optional[List[float]],
    use_antithetic: bool,
) -> Tuple[np.ndarray, np.ndarray]:
    """
    Simulate equally spaced fixings; return arithmetic and geometric averages for each path.
    Observed fixings are included directly; only remaining fixes are simulated.
    """
    rng = np.random.default_rng(seed)
    k = len(observed_fixings) if observed_fixings is not None else 0
    remaining = N - k
    dt = T / N

    # Prepare arrays for arithmetic and geometric accumulators
    # Start with observed portion
    arith_sum = 0.0
    geom_log_sum = 0.0
    if k > 0:
        obs = np.array(observed_fixings, dtype=float)
        arith_sum += obs.sum()
        geom_log_sum += np.log(obs).sum()

    # If no remaining fixings, just replicate across paths
    if remaining == 0:
        arith_avg = np.full(paths, arith_sum / N, dtype=float)
        geom_avg = np.full(paths, np.exp(geom_log_sum / N), dtype=float)
        return arith_avg, geom_avg

    # Number of effective simulated paths after antithetic pairing
    base_paths = paths if not use_antithetic else (paths + (paths % 2)) // 2

    # Precompute drift/vol per step (for equal spacing)
    drift = (r - q - 0.5 * sigma**2) * dt
    vol = sigma * math.sqrt(dt)

    # Simulate remaining fixings
    arith_acc = np.zeros(base_paths, dtype=float)
    geom_log_acc = np.zeros(base_paths, dtype=float)

    # Simulate the price process at each remaining fixing time
    S = np.full(base_paths, spot, dtype=float)
    for _ in range(remaining):
        Z = rng.standard_normal(base_paths)
        S = S * np.exp(drift + vol * Z)
        arith_acc += S
        geom_log_acc += np.log(S)

    # Combine with observed pieces
    arith_avg_base = (arith_sum + arith_acc) / N
    geom_avg_base = np.exp((geom_log_sum + geom_log_acc) / N)

    if not use_antithetic:
        return arith_avg_base, geom_avg_base

    # Build antithetic mates by re-running the same loop with -Z
    rng = np.random.default_rng(seed)  # reset to reproduce Z sequence
    arith_acc_a = np.zeros(base_paths, dtype=float)
    geom_log_acc_a = np.zeros(base_paths, dtype=float)
    S = np.full(base_paths, spot, dtype=float)
    for _ in range(remaining):
        Z = rng.standard_normal(base_paths)
        S = S * np.exp(drift - vol * Z)  # antithetic step
        arith_acc_a += S
        geom_log_acc_a += np.log(S)
    arith_avg_anti = (arith_sum + arith_acc_a) / N
    geom_avg_anti = np.exp((geom_log_sum + geom_log_acc_a) / N)

    # Concatenate base and antithetic to reach ~paths
    arith_all = np.concatenate([arith_avg_base, arith_avg_anti], axis=0)
    geom_all = np.concatenate([geom_avg_base, geom_avg_anti], axis=0)

    # Trim if overshoot
    if arith_all.shape[0] > paths:
        arith_all = arith_all[:paths]
        geom_all = geom_all[:paths]

    return arith_all, geom_all


def _asian_payoff_fixed_from_avgs(arith_avg: np.ndarray, option_type: str, K: float) -> np.ndarray:
    if option_type == "call":
        return np.maximum(arith_avg - K, 0.0)
    else:
        return np.maximum(K - arith_avg, 0.0)


def _discount_factor(r: float, T: float) -> float:
    return math.exp(-r * T)


def _geom_asian_price_closed_form(spot: float, K: float, r: float, q: float, sigma: float, T: float, N: int, option_type: str) -> float:
    """
    Closed-form price for *geometric-average* Asian option (fixed strike) under GBM with equally spaced fixings.
    """
    # Effective parameters for geometric average
    sigma_g = sigma * math.sqrt((2 * N + 1) / (6 * N))
    mu_g = (r - q - 0.5 * sigma**2) * (N + 1) / (2 * N) + q
    F_g = spot * math.exp((mu_g - q) * T)
    d1 = (math.log(F_g / K) + 0.5 * sigma_g**2 * T) / (sigma_g * math.sqrt(T))
    d2 = d1 - sigma_g * math.sqrt(T)
    from math import erf, sqrt

    def norm_cdf(x):
        return 0.5 * (1.0 + erf(x / sqrt(2.0)))

    df = math.exp(-r * T)
    if option_type == "call":
        return df * (F_g * norm_cdf(d1) - K * norm_cdf(d2))
    else:
        return df * (K * norm_cdf(-d2) - F_g * norm_cdf(-d1))


def price_asian_mc(
    spot: float,
    strike: Optional[float],
    r: float,
    q: float,
    sigma: float,
    T: float,
    N: int,
    option_type: str,  # "call" or "put"
    strike_type: str,  # "fixed" or "floating"
    paths: int = 100_000,
    seed: Optional[int] = 42,
    observed_fixings: Optional[List[float]] = None,
    use_antithetic: bool = True,
    use_geom_cv: bool = True,
    delta_bump: Optional[float] = None,
    vega_bump: Optional[float] = None,
    return_payoffs: bool = False,
) -> MCResult:
    """
    Monte Carlo price for arithmetic-average Asian options under risk-neutral GBM.

    - Fixed-strike options supported directly (with geometric-average control variate).
    - Floating-strike needs terminal price; use price_asian_mc_floating for that case.
    """
    _validate_inputs(spot, strike, r, q, sigma, T, N, option_type, strike_type, paths, observed_fixings)

    # Default bumps
    if delta_bump is None:
        delta_bump = 0.01 * spot  # 1% spot bump
    if vega_bump is None:
        vega_bump = 0.01 if sigma == 0 else 0.01 * sigma  # 1 vol point or 1% of sigma

    _print_inputs_summary(
        spot, strike, r, q, sigma, T, N, option_type, strike_type, paths, seed,
        observed_fixings, use_antithetic, use_geom_cv, delta_bump, vega_bump
    )
    _rng_and_first_step_checks(seed, r, q, sigma, T, N)

    df = _discount_factor(r, T)

    if strike_type == "floating":
        raise NotImplementedError(
            "Floating-strike requires terminal price S_T along each path. "
            "Use price_asian_mc_floating(...) instead."
        )

    # Simulate averages
    arith_avg, geom_avg = _simulate_paths_arith_and_geom_avgs(
        spot, r, q, sigma, T, N, paths, seed, observed_fixings, use_antithetic
    )

    # Fixed-strike payoff
    K = float(strike)
    payoff = _asian_payoff_fixed_from_avgs(arith_avg, option_type, K)

    # Control variate with geometric-average payoff (pathwise) using closed-form as control mean
    extras = {"cv_used": False}
    if use_geom_cv:
        if option_type == "call":
            payoff_geom = np.maximum(geom_avg - K, 0.0)
        else:
            payoff_geom = np.maximum(K - geom_avg, 0.0)
        try:
            geom_price_cf = _geom_asian_price_closed_form(spot, K, r, q, sigma, T, N, option_type)
            mY = geom_price_cf / df  # undiscounted control mean
            X = payoff
            Y = payoff_geom
            covXY = np.cov(X, Y, ddof=1)[0, 1]
            varY = np.var(Y, ddof=1)
            beta = 0.0 if varY == 0 else covXY / varY
            adjusted = X - beta * (Y - mY)
            disc_payoffs = df * adjusted
            price = float(np.mean(disc_payoffs))
            stderr = float(np.std(disc_payoffs, ddof=1) / np.sqrt(len(disc_payoffs)))
            extras.update(
                {
                    "cv_used": True,
                    "cv_beta": float(beta),
                    "cv_corr": float(np.corrcoef(X, Y)[0, 1]) if varY > 0 else float("nan"),
                }
            )
        except Exception as e:
            disc_payoffs = df * payoff
            price = float(np.mean(disc_payoffs))
            stderr = float(np.std(disc_payoffs, ddof=1) / np.sqrt(len(disc_payoffs)))
            extras.update({"cv_used": False, "cv_error": str(e)})
    else:
        disc_payoffs = df * payoff
        price = float(np.mean(disc_payoffs))
        stderr = float(np.std(disc_payoffs, ddof=1) / np.sqrt(len(disc_payoffs)))

    # 95% CI
    ci_lo = price - 1.96 * stderr
    ci_hi = price + 1.96 * stderr

    # Greeks via CRN bumps
    def _price_only(S0: float, vol: float) -> float:
        aa, ga = _simulate_paths_arith_and_geom_avgs(
            S0, r, q, vol, T, N, paths, seed, observed_fixings, use_antithetic
        )
        X = _asian_payoff_fixed_from_avgs(aa, option_type, K)
        if use_geom_cv:
            if option_type == "call":
                Y = np.maximum(ga - K, 0.0)
            else:
                Y = np.maximum(K - ga, 0.0)
            try:
                geom_cf = _geom_asian_price_closed_form(S0, K, r, q, vol, T, N, option_type)
                mY = geom_cf / df
                covXY = np.cov(X, Y, ddof=1)[0, 1]
                varY = np.var(Y, ddof=1)
                beta = 0.0 if varY == 0 else covXY / varY
                adj = X - beta * (Y - mY)
                return float(df * np.mean(adj))
            except Exception:
                return float(df * np.mean(X))
        else:
            return float(df * np.mean(X))

    # Delta
    p_up = _price_only(spot + delta_bump, sigma)
    p_dn = _price_only(max(1e-12, spot - delta_bump), sigma)
    delta = (p_up - p_dn) / (2.0 * delta_bump)

    # Vega (absolute vol bump)
    vol_up = sigma + vega_bump
    vol_dn = max(1e-12, sigma - vega_bump)
    p_up_v = _price_only(spot, vol_up)
    p_dn_v = _price_only(spot, vol_dn)
    vega = (p_up_v - p_dn_v) / (2.0 * vega_bump)

    print("=== Results ===")
    print(f"price={price:.6f}  stderr={stderr:.6f}  ci95=({ci_lo:.6f}, {ci_hi:.6f})")
    print("=== Greeks (CRN) ===")
    print(f"Delta={delta:.6f}  Vega={vega:.6f}")
    if extras.get("cv_used", False):
        print(f"control_variate: used  corr={extras.get('cv_corr', float('nan')):.3f}")
    print(f"antithetic={use_antithetic}  seed={seed}")

    extras.update(
        {
            "antithetic": use_antithetic,
            "seed": seed,
            "paths": paths,
            "delta_bump": delta_bump,
            "vega_bump": vega_bump,
        }
    )

    return MCResult(price=price, stderr=stderr, ci95=(ci_lo, ci_hi), extras=extras)


def price_asian_mc_floating(
    spot: float,
    r: float,
    q: float,
    sigma: float,
    T: float,
    N: int,
    option_type: str,  # "call" or "put"
    paths: int = 100_000,
    seed: Optional[int] = 42,
    observed_fixings: Optional[List[float]] = None,
    use_antithetic: bool = True,
    delta_bump: Optional[float] = None,
    vega_bump: Optional[float] = None,
) -> MCResult:
    """
    Floating-strike Asian option under GBM with equally spaced fixings.
    payoff_call = max(S_T - A, 0); payoff_put = max(A - S_T, 0).
    Includes seasoning via observed_fixings. Greeks via CRN bumps.
    """
    _validate_inputs(spot, None, r, q, sigma, T, N, option_type, "floating", paths, observed_fixings)

    if delta_bump is None:
        delta_bump = 0.01 * spot
    if vega_bump is None:
        vega_bump = 0.01 if sigma == 0 else 0.01 * sigma

    _print_inputs_summary(
        spot, None, r, q, sigma, T, N, option_type, "floating", paths, seed,
        observed_fixings, use_antithetic, False, delta_bump, vega_bump
    )
    _rng_and_first_step_checks(seed, r, q, sigma, T, N)

    df = _discount_factor(r, T)
    rng = np.random.default_rng(seed)
    k = len(observed_fixings) if observed_fixings is not None else 0
    remaining = N - k
    dt = T / N
    drift = (r - q - 0.5 * sigma**2) * dt
    vol = sigma * math.sqrt(dt)

    base_paths = paths if not use_antithetic else (paths + (paths % 2)) // 2

    arith_sum_obs = 0.0
    if k > 0:
        arith_sum_obs = float(np.sum(np.array(observed_fixings, dtype=float)))

    def sim_once(sign: float) -> Tuple[np.ndarray, np.ndarray]:
        S = np.full(base_paths, spot, dtype=float)
        arith_acc = np.zeros(base_paths, dtype=float)
        for _ in range(remaining):
            Z = rng.standard_normal(base_paths)
            S *= np.exp(drift + sign * vol * Z)
            arith_acc += S
        S_T = S.copy()
        A = (arith_sum_obs + arith_acc) / N
        return S_T, A

    S_T_base, A_base = sim_once(+1.0)
    if use_antithetic:
        rng = np.random.default_rng(seed)
        S_T_anti, A_anti = sim_once(-1.0)
        S_T_all = np.concatenate([S_T_base, S_T_anti], axis=0)
        A_all = np.concatenate([A_base, A_anti], axis=0)
        if S_T_all.shape[0] > paths:
            S_T_all = S_T_all[:paths]
            A_all = A_all[:paths]
    else:
        S_T_all, A_all = S_T_base, A_base

    if option_type == "call":
        payoff = np.maximum(S_T_all - A_all, 0.0)
    else:
        payoff = np.maximum(A_all - S_T_all, 0.0)

    disc = df * payoff
    price = float(np.mean(disc))
    stderr = float(np.std(disc, ddof=1) / math.sqrt(len(disc)))
    ci_lo, ci_hi = price - 1.96 * stderr, price + 1.96 * stderr

    def _price_only(S0: float, vol_abs: float) -> float:
        rng_local = np.random.default_rng(seed)
        def sim_side(sign: float):
            S = np.full(base_paths, S0, dtype=float)
            arith_acc = np.zeros(base_paths, dtype=float)
            drift_loc = (r - q - 0.5 * vol_abs**2) * dt
            vol_loc = vol_abs * math.sqrt(dt)
            for _ in range(remaining):
                Z = rng_local.standard_normal(base_paths)
                S *= np.exp(drift_loc + sign * vol_loc * Z)
                arith_acc += S
            return S.copy(), (arith_sum_obs + arith_acc) / N

        S_T_b, A_b = sim_side(+1.0)
        if use_antithetic:
            rng_local = np.random.default_rng(seed)
            S_T_a, A_a = sim_side(-1.0)
            S_T_all2 = np.concatenate([S_T_b, S_T_a], axis=0)
            A_all2 = np.concatenate([A_b, A_a], axis=0)
            if S_T_all2.shape[0] > paths:
                S_T_all2 = S_T_all2[:paths]
                A_all2 = A_all2[:paths]
        else:
            S_T_all2, A_all2 = S_T_b, A_b

        if option_type == "call":
            payoff_loc = np.maximum(S_T_all2 - A_all2, 0.0)
        else:
            payoff_loc = np.maximum(A_all2 - S_T_all2, 0.0)
        return float(df * np.mean(payoff_loc))

    p_up = _price_only(spot + delta_bump, sigma)
    p_dn = _price_only(max(1e-12, spot - delta_bump), sigma)
    delta = (p_up - p_dn) / (2.0 * delta_bump)

    vol_up = sigma + vega_bump
    vol_dn = max(1e-12, sigma - vega_bump)
    p_up_v = _price_only(spot, vol_up)
    p_dn_v = _price_only(spot, vol_dn)
    vega = (p_up_v - p_dn_v) / (2.0 * vega_bump)

    print("=== Results ===")
    print(f"price={price:.6f}  stderr={stderr:.6f}  ci95=({ci_lo:.6f}, {ci_hi:.6f})")
    print("=== Greeks (CRN) ===")
    print(f"Delta={delta:.6f}  Vega={vega:.6f}")
    print(f"antithetic={use_antithetic}  seed={seed}")

    extras = {
        "antithetic": use_antithetic,
        "seed": seed,
        "paths": paths,
        "delta_bump": delta_bump,
        "vega_bump": vega_bump,
    }
    return MCResult(price=price, stderr=stderr, ci95=(ci_lo, ci_hi), extras=extras)


def black_scholes_call(S0: float, K: float, r: float, q: float, sigma: float, T: float) -> float:
    """Vanilla Black–Scholes call price (continuous dividend yield q)."""
    if T <= 0 or sigma <= 0:
        F = S0 * math.exp((r - q) * T)
        return math.exp(-r * T) * max(F - K, 0.0)

    d1 = (math.log(S0 / K) + (r - q + 0.5 * sigma**2) * T) / (sigma * math.sqrt(T))
    d2 = d1 - sigma * math.sqrt(T)
    from math import erf, sqrt

    def norm_cdf(x):
        return 0.5 * (1.0 + erf(x / sqrt(2.0)))

    return S0 * math.exp(-q * T) * norm_cdf(d1) - K * math.exp(-r * T) * norm_cdf(d2)




In [9]:
if __name__ == "__main__":
    # Example 1: Unseasoned fixed-strike call
    _ = price_asian_mc(
        spot=100, strike=100, r=0.03, q=0.00, sigma=0.20, T=0.5, N=12,
        option_type="call", strike_type="fixed", paths=200_000, seed=7
    )

    # Example 2: Seasoned floating-strike put
    obs = [98.2, 99.0, 101.3, 100.7]  # observed fixings
    _ = price_asian_mc_floating(
        spot=100, r=0.02, q=0.01, sigma=0.25, T=1.0, N=24, option_type="put",
        paths=250_000, seed=11, observed_fixings=obs
    )

    # Example 3: N=1 sanity vs Black–Scholes (fixed-strike)
    res3 = price_asian_mc(
        spot=100, strike=100, r=0.01, q=0.00, sigma=0.18, T=1.0, N=1,
        option_type="call", strike_type="fixed", paths=300_000, seed=99
    )
    bs = black_scholes_call(100, 100, 0.01, 0.00, 0.18, 1.0)
    print("=== Vanilla Cross-Check ===")
    print(f"MC={res3.price:.6f} vs BS={bs:.6f} (diff={res3.price - bs:.6f})")

=== Inputs Summary ===
S0=100.000000  r=0.030000  q=0.000000  sigma=0.200000  T=0.500000
N=12  type=call/fixed  paths=200000  seed=7
K=100.000000
seasoning: none
antithetic=True  geom_cv=True
delta_bump=1.000000  vega_bump=0.002000
=== RNG Check (Z ~ N(0,1)) ===
mean=-0.0055  std=0.9967  min=-4.5022  max=4.0616
=== First-Step Log-Return Check ===
empirical_mean=0.000532  theory_mean=0.000417
=== Results ===
price=3.500465  stderr=0.000238  ci95=(3.499998, 3.500931)
=== Greeks (CRN) ===
Delta=0.528852  Vega=14.340360
control_variate: used  corr=1.000
antithetic=True  seed=7
=== Inputs Summary ===
S0=100.000000  r=0.020000  q=0.010000  sigma=0.250000  T=1.000000
N=24  type=put/floating  paths=250000  seed=11
seasoning: 4/24 fixes locked (first=98.200000, last=100.700000)
antithetic=True  geom_cv=False
delta_bump=1.000000  vega_bump=0.002500
=== RNG Check (Z ~ N(0,1)) ===
mean=-0.0015  std=0.9998  min=-3.7079  max=4.8749
=== First-Step Log-Return Check ===
empirical_mean=-0.000837  theory