In [1]:
# -*- coding: utf-8 -*-
import numpy as np
from numpy import log, sqrt, exp
from dataclasses import dataclass
from typing import Literal, Optional, Tuple
from scipy.stats import norm
from scipy.optimize import minimize, Bounds
from scipy.integrate import quad

# -----------------------------
# 1) Black–Scholes–Merton (BSM)
# -----------------------------
def bsm_price_cp(S, K, T, r=0.0, q=0.0, sigma=0.2, cp: Literal['call','put']='call'):
    """BSM 欧式定价，支持向量化。"""
    S = np.asarray(S); K = np.asarray(K); T = np.asarray(T)
    eps = 1e-16
    T = np.maximum(T, eps)  # 避免 T=0 数值问题
    sigma = np.maximum(sigma, 1e-12)
    d1 = (np.log(S / K) + (r - q + 0.5 * sigma**2) * T) / (sigma * np.sqrt(T))
    d2 = d1 - sigma * np.sqrt(T)
    df_r = np.exp(-r * T)
    df_q = np.exp(-q * T)
    if cp == 'call':
        return df_q * S * norm.cdf(d1) - df_r * K * norm.cdf(d2)
    else:
        return df_r * K * norm.cdf(-d2) - df_q * S * norm.cdf(-d1)

def bsm_vega(S, K, T, r=0.0, q=0.0, sigma=0.2):
    """BSM Vega（对σ的导数），用于校准加权。"""
    S = np.asarray(S); K = np.asarray(K); T = np.asarray(T)
    T = np.maximum(T, 1e-16)
    d1 = (np.log(S / K) + (r - q + 0.5 * sigma**2) * T) / (sigma * np.sqrt(T))
    return np.exp(-q * T) * S * norm.pdf(d1) * np.sqrt(T)

def bsm_implied_vol(C_mkt, S, K, T, r=0.0, q=0.0, cp='call', tol=1e-8, max_iter=100):
    """Brent 法求隐含波动率（逐点）。返回与 C_mkt 同形状的 σ。"""
    C_mkt = np.asarray(C_mkt, dtype=float)
    S = np.asarray(S); K = np.asarray(K); T = np.asarray(T)
    out = np.full_like(C_mkt, np.nan, dtype=float)

    # 无套利价格区间（近似）检查
    intrinsic = np.maximum(0.0, (S*np.exp(-q*T) - K*np.exp(-r*T)) if cp=='call' else (K*np.exp(-r*T)-S*np.exp(-q*T)))
    upper = S*np.exp(-q*T) if cp=='call' else K*np.exp(-r*T)
    bad = (C_mkt < intrinsic - 1e-12) | (C_mkt > upper + 1e-12)
    if np.any(bad):
        # 越界的点留 NaN；可根据需要 clip 到 [intrinsic, upper]
        pass

    # 对每个点用 Brent
    for idx in np.ndindex(C_mkt.shape):
        if bad[idx] or T[idx] <= 0:
            continue
        target = C_mkt[idx]
        if target <= intrinsic[idx] + 1e-14:
            out[idx] = 1e-12
            continue

        # 构造标的函数
        def f(sig):
            return bsm_price_cp(S[idx], K[idx], T[idx], r, q, sig, cp) - target

        # 扫描找到包根区间
        a, b = 1e-9, 5.0
        fa, fb = f(a), f(b)
        # 如果端点符号相同，扩张上界几次
        expand = 0
        while fa * fb > 0 and expand < 10:
            b *= 2
            fb = f(b)
            expand += 1
        if fa * fb > 0:
            # 仍失败，给个近似（牛顿一步）
            out[idx] = np.nan
            continue

        # Brent
        lo, hi = a, b
        f_lo, f_hi = fa, fb
        for _ in range(max_iter):
            mid = 0.5 * (lo + hi)
            f_mid = f(mid)
            if abs(f_mid) < tol or (hi - lo) < 1e-10:
                out[idx] = max(mid, 1e-12)
                break
            if f_lo * f_mid <= 0:
                hi, f_hi = mid, f_mid
            else:
                lo, f_lo = mid, f_mid
        else:
            out[idx] = np.nan
    return out


# -----------------------------
# 2) Heston 欧式定价（Call）
#     Heston (1993) 半闭式积分：C = S e^{-qT} P1 - K e^{-rT} P2
#     Pj = 1/2 + 1/π ∫ Re{ e^{-i u ln K} φ(u - i(j-1)) / (i u) } du, u∈(0,∞)
# -----------------------------
@dataclass
class HestonParams:
    kappa: float  # 均值回复速度
    theta: float  # 长期方差
    sigma: float  # 波动率的波动（vol of vol）
    rho: float    # 相关系数
    v0: float     # 初始方差

def _heston_cf(u: complex, T: float, r: float, q: float, S0: float, p: HestonParams) -> complex:
    """Heston 特征函数 φ(u) = E[e^{iu ln S_T}] （risk-neutral）。"""
    kappa, theta, sigma, rho, v0 = p.kappa, p.theta, p.sigma, p.rho, p.v0
    x0 = np.log(S0)
    iu = 1j * u
    a = kappa * theta
    b = kappa
    d = np.sqrt((rho * sigma * iu - b)**2 + (sigma**2) * (iu + u**2))
    gp = (b - rho * sigma * iu + d) / (b - rho * sigma * iu - d)
    # 避免数值发散的小扰动
    gp = gp + 0j
    logG = np.log(gp)
    C = (r - q) * iu * T + (a / (sigma**2)) * ((b - rho * sigma * iu + d) * T - 2.0 * logG)
    D = ((b - rho * sigma * iu + d) / (sigma**2)) * (1.0 - np.exp(d * T)) / (1.0 - gp * np.exp(d * T))
    return np.exp(C + D * v0 + iu * x0)

def _Pj(j: int, S0: float, K: float, T: float, r: float, q: float, params: HestonParams) -> float:
    """计算 Heston 的 P1 或 P2 概率项。数值积分使用 quad(0, ∞)。"""
    assert j in (1, 2)
    # 按 Heston 公式，φ(u - i*(j-1))
    shift = 1j * (j - 1)
    lnK = np.log(K)
    def integrand(u):
        u = float(u)
        cf = _heston_cf(u - 1j * (j - 1), T, r, q, S0, params)
        # integrand = Re( e^{-iu lnK} * cf / (i u) )
        val = np.exp(-1j * u * lnK) * cf / (1j * u)
        return np.real(val)

    # 数值积分：在 (0, ∞) 上积分，加入阻尼避免高频震荡
    # 经验上，积分上限用 200~300 对大多数标的已足够，可根据需要调大
    upper = 200.0
    res, _ = quad(integrand, 1e-10, upper, limit=200, epsabs=1e-8, epsrel=1e-6)
    return 0.5 + res / np.pi

def heston_call_price(S, K, T, r=0.0, q=0.0, params: HestonParams = None):
    """Heston 欧式看涨期权价格。支持向量化（逐点积分）。"""
    S = np.atleast_1d(S); K = np.atleast_1d(K); T = np.atleast_1d(T)
    out = np.empty_like(np.broadcast_to(S, np.broadcast(S, K, T).shape), dtype=float)

    it = np.nditer([S, K, T, out], flags=['multi_index'], op_flags=[['readonly']]*3 + [['writeonly']])
    for s, k, tt, o in it:
        tt = max(float(tt), 1e-16)
        P1 = _Pj(1, float(s), float(k), tt, r, q, params)
        P2 = _Pj(2, float(s), float(k), tt, r, q, params)
        price = np.exp(-q * tt) * float(s) * P1 - np.exp(-r * tt) * float(k) * P2
        o[...] = price
    return out

# -----------------------------
# 3) 校准器（最小二乘）
# -----------------------------
@dataclass
class CalibResult:
    x: np.ndarray
    success: bool
    fun: float
    message: str
    model: str

def _ssq(prices_model, prices_mkt, weights=None):
    err = prices_model - prices_mkt
    if weights is None:
        return np.sum(err**2)
    return np.sum((err * weights)**2)

def calibrate_bsm(S, K, T, C_mkt, r=0.0, q=0.0,
                  init_sigma: float = 0.2,
                  weights: Optional[str] = None) -> CalibResult:
    """
    BSM 校准：仅一个参数 sigma。
    weights: None 或 'vega'（用 BSM vega 加权，弱化深度虚值/深度实值点）
    """
    S = np.asarray(S); K = np.asarray(K); T = np.asarray(T); C_mkt = np.asarray(C_mkt)
    def objective(x):
        sigma = max(x[0], 1e-12)
        C_mod = bsm_price_cp(S, K, T, r, q, sigma, 'call')
        if weights == 'vega':
            w = 1.0 / (bsm_vega(S, K, T, r, q, sigma) + 1e-12)
        else:
            w = None
        return _ssq(C_mod, C_mkt, w)

    bnds = Bounds([1e-6], [5.0])
    res = minimize(objective, x0=np.array([init_sigma]), method='L-BFGS-B', bounds=bnds)
    return CalibResult(x=res.x, success=res.success, fun=res.fun, message=res.message, model='BSM')

def calibrate_heston(S, K, T, C_mkt, r=0.0, q=0.0,
                     init: Tuple[float,float,float,float,float]=(1.0, 0.04, 0.5, -0.6, 0.04),
                     bounds=((1e-6, 10.0),   # kappa
                             (1e-6,  2.0),   # theta
                             (1e-6,  5.0),   # sigma
                             (-0.999, 0.999),# rho
                             (1e-6,  2.0)),  # v0
                     vega_weight: bool=True,
                     feller_penalty: float=1e4) -> CalibResult:
    """
    Heston 校准：x = (kappa, theta, sigma, rho, v0)
    - 可选 Vega 加权（用当前点的 BSM vega 近似权重）
    - 可选 Feller 条件惩罚：2*kappa*theta > sigma^2
    """
    S = np.asarray(S); K = np.asarray(K); T = np.asarray(T); C_mkt = np.asarray(C_mkt)

    def objective(x):
        kappa, theta, sigma, rho, v0 = x
        params = HestonParams(kappa, theta, sigma, rho, v0)
        C_mod = heston_call_price(S, K, T, r, q, params)
        if vega_weight:
            # 采用 BSM vega 作为权重（以 Heston 的等效 σ 近似：用整体一个 σ0 或逐点隐含波动率均可；这里用单一σ0）
            # 为简洁，这里用样本平均反推一个 σ0（用 BSM 近似拟合一次）
            # 你也可以改成点对点的 bsm_implied_vol。
            sigma0 = 0.2
            w = 1.0 / (bsm_vega(S, K, T, r, q, sigma0) + 1e-12)
        else:
            w = None
        ssq = _ssq(C_mod, C_mkt, w)

        # Feller 惩罚（违反时加罚）
        feller = 2*kappa*theta - sigma**2
        if feller <= 0:
            ssq += feller_penalty * (1 - np.tanh(10 * feller))  # 软惩罚
        return ssq

    lb = np.array([b[0] for b in bounds])
    ub = np.array([b[1] for b in bounds])
    bnds = Bounds(lb, ub)

    res = minimize(objective, x0=np.array(init), method='L-BFGS-B', bounds=bnds,
                   options=dict(maxiter=200, ftol=1e-9))
    return CalibResult(x=res.x, success=res.success, fun=res.fun, message=res.message, model='Heston')

# -----------------------------
# 4) 最小示例（合成数据）
# -----------------------------
if __name__ == "__main__":
    np.random.seed(0)

    # 市场与网格
    S0 = 100.0
    r, q = 0.01, 0.00
    strikes = np.array([80, 90, 100, 110, 120], dtype=float)
    maturities = np.array([0.25, 0.5, 1.0], dtype=float)
    K, T = np.meshgrid(strikes, maturities)
    K = K.ravel(); T = T.ravel()

    # 用 Heston 真值生成“市场价格”，再加噪声
    true_params = HestonParams(kappa=1.2, theta=0.04, sigma=0.5, rho=-0.6, v0=0.05)
    C_true = heston_call_price(S0, K, T, r, q, true_params)
    noise = 0.01  # 加一点微小噪声
    C_mkt = C_true * (1 + noise * np.random.randn(*C_true.shape))

    # 先用 BSM 拟合一个全局 σ
    bsm_res = calibrate_bsm(S0, K, T, C_mkt, r, q, init_sigma=0.2, weights='vega')
    print("BSM 校准：sigma = %.6f, SSE = %.6f, success=%s" % (bsm_res.x[0], bsm_res.fun, bsm_res.success))

    # 再用 Heston 校准
    heston_init = (1.0, 0.04, 0.4, -0.5, 0.04)
    heston_res = calibrate_heston(S0, K, T, C_mkt, r, q, init=heston_init, vega_weight=True)
    kappa, theta, sigma, rho, v0 = heston_res.x
    print("Heston 校准：kappa=%.4f theta=%.4f sigma=%.4f rho=%.4f v0=%.4f" %
          (kappa, theta, sigma, rho, v0))
    print("SSE=%.6f, success=%s, message=%s" % (heston_res.fun, heston_res.success, heston_res.message))

    # 用校准参数回算拟合价格误差
    C_fit_bsm = bsm_price_cp(S0, K, T, r, q, bsm_res.x[0], 'call')
    C_fit_heston = heston_call_price(S0, K, T, r, q, HestonParams(*heston_res.x))

    mae_bsm = np.mean(np.abs(C_fit_bsm - C_mkt))
    mae_heston = np.mean(np.abs(C_fit_heston - C_mkt))
    print("平均绝对误差：BSM = %.6f, Heston = %.6f" % (mae_bsm, mae_heston))




BSM 校准：sigma = 0.687758, SSE = 721831.997064, success=True


  res, _ = quad(integrand, 1e-10, upper, limit=200, epsabs=1e-8, epsrel=1e-6)


Heston 校准：kappa=0.0622 theta=0.0016 sigma=5.0000 rho=-0.9361 v0=1.2832
SSE=227394.503088, success=False, message=ABNORMAL_TERMINATION_IN_LNSRCH
平均绝对误差：BSM = 5287.846140, Heston = 1809.831361
