In [3]:
import numpy as np
from scipy.stats import norm, qmc
import time

# =======================================
# 1. Hull-White short-rate path simulator
# =======================================

def simulate_hull_white_paths(
    n_paths,
    n_steps,
    T,
    r0,
    a,
    sigma,
    theta_func=None,
    use_sobol=False,
    antithetic=False,
    seed=None,
):
    """
    Simulate short-rate paths under the one-factor Hull-White model using Euler-Maruyama:

        dr_t = [theta(t) - a r_t] dt + sigma dW_t
    """
    dt = T / n_steps
    t_grid = np.linspace(0.0, T, n_steps + 1)

    if theta_func is None:
        def theta_func(t):
            return a * r0

    if use_sobol:
        sobol_engine = qmc.Sobol(d=n_steps, scramble=True, seed=seed)
        u = sobol_engine.random(n_paths)
        eps = np.finfo(float).eps
        u = np.clip(u, eps, 1 - eps)
        Z = norm.ppf(u)
    else:
        rng = np.random.default_rng(seed)
        if antithetic:
            n_half = (n_paths + 1) // 2
            Z_half = rng.standard_normal(size=(n_half, n_steps))
            Z = np.vstack([Z_half, -Z_half])[:n_paths, :]
        else:
            Z = rng.standard_normal(size=(n_paths, n_steps))

    r_paths = np.zeros((n_paths, n_steps + 1))
    r_paths[:, 0] = r0

    for i in range(n_steps):
        t = t_grid[i]
        theta_t = theta_func(t)
        r_t = r_paths[:, i]
        dr = (theta_t - a * r_t) * dt + sigma * np.sqrt(dt) * Z[:, i]
        r_paths[:, i + 1] = r_t + dr

    return t_grid, r_paths


# ========================================
# 2. Discount factors, bonds, swaps, rates
# ========================================

def compute_discount_factors(r_paths, dt):
    integrals = np.cumsum(r_paths[:, :-1] * dt, axis=1)
    integrals = np.hstack([np.zeros((r_paths.shape[0], 1)), integrals])
    disc = np.exp(-integrals)
    return disc


def bond_price_on_paths(disc, idx_t, idx_T):
    return disc[:, idx_T] / disc[:, idx_t]


def swap_value_on_paths(disc, t_grid, K, pay_indices, idx_t):
    """
    Payer swap value at time index idx_t along each path:

        V_swap(t) = P(t,T_0) - P(t,T_N) - K * sum_i Delta_i P(t,T_i)
    """
    pay_indices = np.asarray(pay_indices, dtype=int)
    idx_T0 = pay_indices[0]
    idx_TN = pay_indices[-1]

    P_tT0 = bond_price_on_paths(disc, idx_t, idx_T0)
    P_tTN = bond_price_on_paths(disc, idx_t, idx_TN)
    P_tTi = disc[:, pay_indices] / disc[:, [idx_t]]

    T_pay = t_grid[pay_indices]
    Delta = np.empty_like(T_pay)
    Delta[0] = T_pay[0] - t_grid[idx_t]
    Delta[1:] = T_pay[1:] - T_pay[:-1]

    fixed_leg = K * np.dot(P_tTi, Delta)
    V_swap = P_tT0 - P_tTN - fixed_leg
    return V_swap


def forward_swap_rate_on_paths(disc, t_grid, pay_indices, idx_t):
    """
    Forward par swap rate at time index idx_t along each path.
    """
    pay_indices = np.asarray(pay_indices, dtype=int)
    idx_T0 = pay_indices[0]
    idx_TN = pay_indices[-1]

    P_tT0 = bond_price_on_paths(disc, idx_t, idx_T0)
    P_tTN = bond_price_on_paths(disc, idx_t, idx_TN)
    P_tTi = disc[:, pay_indices] / disc[:, [idx_t]]

    T_pay = t_grid[pay_indices]
    Delta = np.empty_like(T_pay)
    Delta[0] = T_pay[0] - t_grid[idx_t]
    Delta[1:] = T_pay[1:] - T_pay[:-1]

    denom = np.dot(P_tTi, Delta)
    denom = np.where(denom == 0.0, np.nan, denom)
    F = (P_tT0 - P_tTN) / denom
    return F


# =======================================
# 3. LSM Bermudan swaption pricer
# =======================================

def bermudan_pathwise_pv(
    K,
    exercise_indices,
    pay_indices,
    t_grid,
    r_paths,
):
    """
    LSM Bermudan pricer returning pathwise discounted payoffs X_i.
    """
    n_paths, _ = r_paths.shape
    dt = t_grid[1] - t_grid[0]

    disc = compute_discount_factors(r_paths, dt)
    exercise_indices = np.asarray(exercise_indices, dtype=int)
    pay_indices = np.asarray(pay_indices, dtype=int)

    n_ex = len(exercise_indices)
    swap_values = np.zeros((n_paths, n_ex))
    swap_rates = np.zeros((n_paths, n_ex))
    for j, idx_t in enumerate(exercise_indices):
        swap_values[:, j] = swap_value_on_paths(disc, t_grid, K, pay_indices, idx_t)
        swap_rates[:, j] = forward_swap_rate_on_paths(disc, t_grid, pay_indices, idx_t)

    payoffs = np.maximum(swap_values, 0.0)

    cashflow = np.zeros(n_paths)
    exercise_time_idx = np.full(n_paths, exercise_indices[-1], dtype=int)

    last_ex_idx = exercise_indices[-1]
    last_ex_pos = n_ex - 1
    exercise_now = payoffs[:, last_ex_pos] > 0.0
    cashflow[exercise_now] = payoffs[exercise_now, last_ex_pos]
    exercise_time_idx[exercise_now] = last_ex_idx

    for ex_pos in range(n_ex - 2, -1, -1):
        idx_t = exercise_indices[ex_pos]

        alive = exercise_time_idx > idx_t
        if not np.any(alive):
            continue

        alive_idx = np.where(alive)[0]

        df_ex = disc[alive_idx, exercise_time_idx[alive_idx]]
        df_t = disc[alive_idx, idx_t]
        cont_values = cashflow[alive_idx] * df_ex / df_t

        itm_mask = payoffs[alive_idx, ex_pos] > 0.0
        if np.sum(itm_mask) >= 3:
            Xr = swap_rates[alive_idx, ex_pos][itm_mask]
            Yr = cont_values[itm_mask]
            A = np.vstack([np.ones_like(Xr), Xr, Xr ** 2]).T
            beta, *_ = np.linalg.lstsq(A, Yr, rcond=None)
            X_all = swap_rates[alive_idx, ex_pos]
            A_all = np.vstack([np.ones_like(X_all), X_all, X_all ** 2]).T
            C_hat = A_all @ beta
        else:
            C_hat = np.full_like(cont_values, np.mean(cont_values) if cont_values.size > 0 else 0.0)

        immediate = payoffs[alive_idx, ex_pos]
        exercise_decision = immediate > C_hat

        exercise_paths = alive_idx[exercise_decision]
        cashflow[exercise_paths] = immediate[exercise_decision]
        exercise_time_idx[exercise_paths] = idx_t

    df_0 = disc[np.arange(n_paths), exercise_time_idx]
    pv = cashflow * df_0
    return pv


def price_bermudan_swaption_lsm(
    K,
    exercise_indices,
    pay_indices,
    t_grid,
    r_paths,
):
    pv = bermudan_pathwise_pv(K, exercise_indices, pay_indices, t_grid, r_paths)
    return np.mean(pv)


# =======================================
# 4. European swaption under Hull–White (MC)
# =======================================

def price_european_swaption_hw_mc(
    K,
    exercise_index,
    pay_indices,
    T,
    r0,
    a,
    sigma,
    n_paths,
    n_steps,
    theta_func=None,
    seed=None,
):
    """
    European payer swaption under Hull–White, via Monte Carlo:

        payoff = max(V_swap(T0), 0) discounted back to time 0.
    """
    t_grid, r_paths = simulate_hull_white_paths(
        n_paths=n_paths,
        n_steps=n_steps,
        T=T,
        r0=r0,
        a=a,
        sigma=sigma,
        theta_func=theta_func,
        use_sobol=False,
        antithetic=False,
        seed=seed,
    )
    dt = t_grid[1] - t_grid[0]
    disc = compute_discount_factors(r_paths, dt)

    idx_T0 = exercise_index
    V_swap_T0 = swap_value_on_paths(disc, t_grid, K, pay_indices, idx_T0)
    payoff = np.maximum(V_swap_T0, 0.0)

    df_T0_paths = disc[:, idx_T0]
    Y = payoff * df_T0_paths
    return np.mean(Y)


def estimate_hw_cv_statistics(
    K,
    exercise_indices,
    pay_indices,
    T,
    r0,
    a,
    sigma,
    n_paths,
    n_steps,
    theta_func=None,
    seed=None,
):
    """
    One large HW simulation to get X_i (Bermudan) and Y_i (European),
    then compute rho, rho^2 and beta* = Cov(X,Y)/Var(Y).
    """
    t_grid, r_paths = simulate_hull_white_paths(
        n_paths=n_paths,
        n_steps=n_steps,
        T=T,
        r0=r0,
        a=a,
        sigma=sigma,
        theta_func=theta_func,
        use_sobol=False,
        antithetic=False,
        seed=seed,
    )
    dt = t_grid[1] - t_grid[0]
    disc = compute_discount_factors(r_paths, dt)

    # Bermudan pathwise PV
    X = bermudan_pathwise_pv(K, exercise_indices, pay_indices, t_grid, r_paths)

    # European pathwise PV on same paths
    idx_T0 = exercise_indices[0]
    V_swap_T0 = swap_value_on_paths(disc, t_grid, K, pay_indices, idx_T0)
    payoff_eur = np.maximum(V_swap_T0, 0.0)
    df_T0_paths = disc[:, idx_T0]
    Y = payoff_eur * df_T0_paths

    var_X = np.var(X, ddof=1)
    var_Y = np.var(Y, ddof=1)
    cov_XY = np.cov(X, Y, ddof=1)[0, 1]

    rho_hat = cov_XY / np.sqrt(var_X * var_Y)
    rho2_hat = rho_hat ** 2
    beta_star = cov_XY / var_Y
    vrf_theoretical = 1.0 / (1.0 - rho2_hat) if rho2_hat < 1.0 else np.inf

    return rho_hat, rho2_hat, beta_star, vrf_theoretical


# =======================================
# 5. Pricing wrappers
# =======================================

def price_bermudan_standard_mc(
    K,
    exercise_indices,
    pay_indices,
    T,
    r0,
    a,
    sigma,
    n_paths,
    n_steps,
    theta_func=None,
    seed=None,
):
    t_grid, r_paths = simulate_hull_white_paths(
        n_paths=n_paths,
        n_steps=n_steps,
        T=T,
        r0=r0,
        a=a,
        sigma=sigma,
        theta_func=theta_func,
        use_sobol=False,
        antithetic=False,
        seed=seed,
    )
    return price_bermudan_swaption_lsm(K, exercise_indices, pay_indices, t_grid, r_paths)


def price_bermudan_antithetic(
    K,
    exercise_indices,
    pay_indices,
    T,
    r0,
    a,
    sigma,
    n_paths,
    n_steps,
    theta_func=None,
    seed=None,
):
    t_grid, r_paths = simulate_hull_white_paths(
        n_paths=n_paths,
        n_steps=n_steps,
        T=T,
        r0=r0,
        a=a,
        sigma=sigma,
        theta_func=theta_func,
        use_sobol=False,
        antithetic=True,
        seed=seed,
    )
    return price_bermudan_swaption_lsm(K, exercise_indices, pay_indices, t_grid, r_paths)


def price_bermudan_sobol_qmc(
    K,
    exercise_indices,
    pay_indices,
    T,
    r0,
    a,
    sigma,
    n_paths,
    n_steps,
    theta_func=None,
    seed=None,
):
    t_grid, r_paths = simulate_hull_white_paths(
        n_paths=n_paths,
        n_steps=n_steps,
        T=T,
        r0=r0,
        a=a,
        sigma=sigma,
        theta_func=theta_func,
        use_sobol=True,
        antithetic=False,
        seed=seed,
    )
    return price_bermudan_swaption_lsm(K, exercise_indices, pay_indices, t_grid, r_paths)


def price_bermudan_with_cv_hw(
    K,
    exercise_indices,
    pay_indices,
    T,
    r0,
    a,
    sigma,
    n_paths,
    n_steps,
    mu_Y,       # E_HW[Y] (European price under HW, via MC or analytic)
    beta_cv,    # beta* = Cov(X,Y)/Var(Y) estimated under HW
    theta_func=None,
    seed=None,
):
    """
    Model-consistent control variate:

        V̂_CV = V̂_MC + beta_cv * (mu_Y - Ŷ_MC),

    where both X (Bermudan) and Y (European) are simulated under Hull–White.
    This is (asymptotically) unbiased for the Hull–White Bermudan price.
    """
    t_grid, r_paths = simulate_hull_white_paths(
        n_paths=n_paths,
        n_steps=n_steps,
        T=T,
        r0=r0,
        a=a,
        sigma=sigma,
        theta_func=theta_func,
        use_sobol=False,
        antithetic=False,
        seed=seed,
    )
    dt = t_grid[1] - t_grid[0]
    disc = compute_discount_factors(r_paths, dt)

    # Bermudan estimate V̂_MC
    berm_price_mc = price_bermudan_swaption_lsm(
        K, exercise_indices, pay_indices, t_grid, r_paths
    )

    # European Ŷ_MC on the same paths
    idx_T0 = exercise_indices[0]
    V_swap_T0 = swap_value_on_paths(disc, t_grid, K, pay_indices, idx_T0)
    payoff_eur = np.maximum(V_swap_T0, 0.0)
    df_T0_paths = disc[:, idx_T0]
    Y = payoff_eur * df_T0_paths
    Y_bar = np.mean(Y)

    price_cv = berm_price_mc + beta_cv * (mu_Y - Y_bar)
    return price_cv


# =======================================
# 6. Variance estimation helpers (with timing)
# =======================================

def estimate_variance_for_method(method, M, R, base_kwargs):
    """
    Run a given pricing method R times with different seeds, for n_paths = M.
    Returns (mean, variance, std_dev_of_one_run, elapsed_time).
    """
    prices = np.zeros(R)
    start = time.perf_counter()
    for r in range(R):
        seed = 1000 + r
        kwargs = base_kwargs.copy()
        kwargs.update({"n_paths": M, "seed": seed})
        prices[r] = method(**kwargs)
    elapsed = time.perf_counter() - start

    mean = np.mean(prices)
    var = np.var(prices, ddof=1)
    se = np.sqrt(var)
    return mean, var, se, elapsed


def estimate_variances_all_methods(
    M,
    R,
    K,
    exercise_indices,
    pay_indices,
    T,
    r0,
    a,
    sigma,
    n_steps,
    mu_Y,
    beta_cv,
    theta_func=None,
):
    base_common = dict(
        K=K,
        exercise_indices=exercise_indices,
        pay_indices=pay_indices,
        T=T,
        r0=r0,
        a=a,
        sigma=sigma,
        n_steps=n_steps,
        theta_func=theta_func,
    )

    mean_std, var_std, se_std, t_std = estimate_variance_for_method(
        price_bermudan_standard_mc, M, R, base_common
    )

    mean_anti, var_anti, se_anti, t_anti = estimate_variance_for_method(
        price_bermudan_antithetic, M, R, base_common
    )

    mean_sob, var_sob, se_sob, t_sob = estimate_variance_for_method(
        price_bermudan_sobol_qmc, M, R, base_common
    )

    base_cv = base_common.copy()
    base_cv.update(dict(mu_Y=mu_Y, beta_cv=beta_cv))
    mean_cv, var_cv, se_cv, t_cv = estimate_variance_for_method(
        price_bermudan_with_cv_hw, M, R, base_cv
    )

    return {
        "standard":   (mean_std,  var_std,  se_std, t_std),
        "antithetic": (mean_anti, var_anti, se_anti, t_anti),
        "sobol":      (mean_sob,  var_sob,  se_sob, t_sob),
        "control":    (mean_cv,   var_cv,   se_cv,  t_cv),
    }




In [4]:
# =======================================
# 7. Main experiment
# =======================================

if __name__ == "__main__":
    # Contract and model
    T_swaption = 5.0
    T_swap_maturity = 10.0
    dt = 1.0 / 52.0
    n_steps = int(T_swap_maturity / dt)

    r0 = 0.02
    a = 0.1
    sigma = 0.01
    K = 0.02

    t_grid = np.linspace(0.0, T_swap_maturity, n_steps + 1)
    exercise_times = np.arange(T_swaption, T_swap_maturity + 1e-12, 0.25)
    exercise_indices = [int(round(t / dt)) for t in exercise_times]
    pay_indices = exercise_indices

    # --- European HW price mu_Y (high-precision MC) ---
    n_paths_eur = 200_000
    seed_eur = 42
    mu_Y = price_european_swaption_hw_mc(
        K=K,
        exercise_index=exercise_indices[0],
        pay_indices=pay_indices,
        T=T_swap_maturity,
        r0=r0,
        a=a,
        sigma=sigma,
        n_paths=n_paths_eur,
        n_steps=n_steps,
        theta_func=None,
        seed=seed_eur,
    )
    print(f"European payer swaption HW price (mu_Y) ≈ {mu_Y:.6f}\n")

    # --- CV statistics: rho, beta*, theoretical VRF ---
    n_paths_stat = 200_000
    rho_hat, rho2_hat, beta_star, vrf_theoretical = estimate_hw_cv_statistics(
        K=K,
        exercise_indices=exercise_indices,
        pay_indices=pay_indices,
        T=T_swap_maturity,
        r0=r0,
        a=a,
        sigma=sigma,
        n_paths=n_paths_stat,
        n_steps=n_steps,
        theta_func=None,
        seed=123,
    )
    theoretical_red = 100.0 * rho2_hat

    print(f"Hull–White CV correlation ρ ≈ {rho_hat:.4f}")
    print(f"ρ² ≈ {rho2_hat:.4f}  (theoretical fraction of variance removed)")
    print(f"Estimated β* ≈ {beta_star:.4f}")
    print(f"Theoretical VRF (optimal β) ≈ {vrf_theoretical:.2f}")
    print(f"Theoretical variance reduction for CV ≈ {theoretical_red:.1f}%\n")

    # --- Variance comparison ---
    Ms = [4_096, 8_192, 16_384, 32_768, 65_536, 131_072]
    R = 30

    for M in Ms:
        res = estimate_variances_all_methods(
            M=M,
            R=R,
            K=K,
            exercise_indices=exercise_indices,
            pay_indices=pay_indices,
            T=T_swap_maturity,
            r0=r0,
            a=a,
            sigma=sigma,
            n_steps=n_steps,
            mu_Y=mu_Y,
            beta_cv=beta_star,
            theta_func=None,
        )

        mean_std, var_std, se_std, t_std = res["standard"]
        mean_anti, var_anti, se_anti, t_anti = res["antithetic"]
        mean_sob, var_sob, se_sob, t_sob = res["sobol"]
        mean_cv, var_cv, se_cv, t_cv     = res["control"]

        vrf_anti = var_std / var_anti
        vrf_sob  = var_std / var_sob
        vrf_cv   = var_std / var_cv

        red_anti = 100.0 * (1.0 - 1.0 / vrf_anti)
        red_sob  = 100.0 * (1.0 - 1.0 / vrf_sob)
        red_cv   = 100.0 * (1.0 - 1.0 / vrf_cv)

        print(f"M = {M:6d}")
        print(f"  Standard MC      : mean={mean_std:.6f},  SE={se_std:.6f},  "
              f"time={t_std:.2f}s")
        print(f"  Antithetic MC    : mean={mean_anti:.6f}, SE={se_anti:.6f}, "
              f"VRF={vrf_anti:.2f}, variance red.={red_anti:.1f}%, "
              f"time={t_anti:.2f}s")
        print(f"  Sobol QMC        : mean={mean_sob:.6f},  SE={se_sob:.6f},  "
              f"VRF={vrf_sob:.2f}, variance red.={red_sob:.1f}%, "
              f"time={t_sob:.2f}s")
        print(f"  HW CV (β*)       : mean={mean_cv:.6f},   SE={se_cv:.6f},   "
              f"VRF={vrf_cv:.2f}, variance red.={red_cv:.1f}% "
              f"(theory ≈ {theoretical_red:.1f}%), "
              f"time={t_cv:.2f}s")
        print()


European payer swaption HW price (mu_Y) ≈ 0.027921

Hull–White CV correlation ρ ≈ 0.8649
ρ² ≈ 0.7481  (theoretical fraction of variance removed)
Estimated β* ≈ 1.2478
Theoretical VRF (optimal β) ≈ 3.97
Theoretical variance reduction for CV ≈ 74.8%

M =   4096
  Standard MC      : mean=0.085009,  SE=0.000928,  time=3.81s
  Antithetic MC    : mean=0.084941, SE=0.000391, VRF=5.62, variance red.=82.2%, time=5.00s
  Sobol QMC        : mean=0.084930,  SE=0.000304,  VRF=9.30, variance red.=89.2%, time=11.47s
  HW CV (β*)       : mean=0.085273,   SE=0.000574,   VRF=2.61, variance red.=61.7% (theory ≈ 74.8%), time=9.31s

M =   8192
  Standard MC      : mean=0.084838,  SE=0.000652,  time=13.82s
  Antithetic MC    : mean=0.084863, SE=0.000306, VRF=4.54, variance red.=78.0%, time=17.43s
  Sobol QMC        : mean=0.084809,  SE=0.000208,  VRF=9.85, variance red.=89.8%, time=24.77s
  HW CV (β*)       : mean=0.085133,   SE=0.000375,   VRF=3.02, variance red.=66.8% (theory ≈ 74.8%), time=22.26s

M =  1