# Heston Calibration - Heatmaps & IV Surfaces

Ce notebook télécharge des options via yfinance, calibre le modèle Heston sur les calls avec PyTorch, puis compare les heatmaps Call/Put Heston avec Black-Scholes. Il affiche également les surfaces d'IV (Market, BS et Heston) pour calls et puts.

## 1. Imports et Configuration

In [None]:
from IPython.display import display
from __future__ import annotations

import math
from pathlib import Path
from typing import Callable, Dict, Tuple

import numpy as np
import pandas as pd
import plotly.graph_objects as go
import torch
import yfinance as yf

from heston_torch import HestonParams, carr_madan_call_torch

torch.set_default_dtype(torch.float64)
DEVICE = torch.device("cpu")
MIN_IV_MATURITY = 0.1

## 2. Fonctions de Téléchargement des Données

In [None]:
def fetch_spot(symbol: str) -> float:
    ticker = yf.Ticker(symbol)
    hist = ticker.history(period="1d")
    if hist.empty:
        raise RuntimeError("Unable to retrieve spot price.")
    return float(hist["Close"].iloc[-1])


def _select_monthly_expirations(expirations, years_ahead: float = 2.5) -> list[str]:
    today = pd.Timestamp.utcnow().date()
    limit_date = today + pd.Timedelta(days=365 * years_ahead)
    monthly: Dict[Tuple[int, int], Tuple[pd.Timestamp, str]] = {}
    for exp in expirations:
        exp_ts = pd.Timestamp(exp)
        exp_date = exp_ts.date()
        if not (today < exp_date <= limit_date):
            continue
        key = (exp_date.year, exp_date.month)
        if key not in monthly or exp_ts < monthly[key][0]:
            monthly[key] = (exp_ts, exp)
    return [item[1] for item in sorted(monthly.values(), key=lambda x: x[0])]


def download_options(symbol: str, option_type: str, years_ahead: float = 2.5) -> pd.DataFrame:
    ticker = yf.Ticker(symbol)
    spot = fetch_spot(symbol)
    expirations = ticker.options
    if not expirations:
        raise RuntimeError(f"No option expirations found for {symbol}")
    selected = _select_monthly_expirations(expirations, years_ahead)
    rows: list[dict] = []
    now = pd.Timestamp.utcnow().tz_localize(None)
    for expiry in selected:
        expiry_dt = pd.Timestamp(expiry)
        T = max((expiry_dt - now).total_seconds() / (365.0 * 24 * 3600), 0.0)
        chain = ticker.option_chain(expiry)
        data = chain.calls if option_type == "call" else chain.puts
        price_col = "C_mkt" if option_type == "call" else "P_mkt"
        for _, row in data.iterrows():
            rows.append(
                {
                    "S0": spot,
                    "K": float(row["strike"]),
                    "T": T,
                    price_col: float(row["lastPrice"]),
                    "iv_market": float(row.get("impliedVolatility", float("nan"))),
                }
            )
    return pd.DataFrame(rows)

## 3. Fonctions de Calibration Heston

In [None]:
def prices_from_unconstrained(
    u: torch.Tensor, S0_t: torch.Tensor, K_t: torch.Tensor, T_t: torch.Tensor, r: float, q: float
) -> torch.Tensor:
    params = HestonParams.from_unconstrained(u[0], u[1], u[2], u[3], u[4])
    prices = []
    for S0_i, K_i, T_i in zip(S0_t, K_t, T_t):
        price_i = carr_madan_call_torch(S0_i, r, q, T_i, params, K_i)
        prices.append(price_i)
    return torch.stack(prices)


def loss(
    u: torch.Tensor,
    S0_t: torch.Tensor,
    K_t: torch.Tensor,
    T_t: torch.Tensor,
    C_mkt_t: torch.Tensor,
    r: float,
    q: float,
) -> torch.Tensor:
    C_heston = prices_from_unconstrained(u, S0_t, K_t, T_t, r, q)
    return torch.mean((C_heston - C_mkt_t) ** 2)


def calibrate_heston_from_calls(
    calls_df: pd.DataFrame,
    r: float = 0.02,
    q: float = 0.0,
    max_points: int = 300,
    max_iters: int = 100,
    lr: float = 5e-3,
    progress_callback: Callable[[int, int, float], None] | None = None,
    log_callback: Callable[[int, float], None] | None = None,
) -> Tuple[dict, list, pd.DataFrame]:
    df = calls_df.copy()
    df = df[df["T"] >= MIN_IV_MATURITY]
    df = df[df["C_mkt"] > 0]
    df = df.sample(n=min(len(df), max_points), random_state=42)

    S0_t = torch.tensor(df["S0"].values, dtype=torch.float64, device=DEVICE)
    K_t = torch.tensor(df["K"].values, dtype=torch.float64, device=DEVICE)
    T_t = torch.tensor(df["T"].values, dtype=torch.float64, device=DEVICE)
    C_mkt_t = torch.tensor(df["C_mkt"].values, dtype=torch.float64, device=DEVICE)

    u = torch.tensor([0.0, 0.0, 0.0, 0.0, 0.0], dtype=torch.float64, device=DEVICE, requires_grad=True)
    optimizer = torch.optim.Adam([u], lr=lr)

    history = []
    for it in range(max_iters):
        optimizer.zero_grad()
        L = loss(u, S0_t, K_t, T_t, C_mkt_t, r, q)
        L.backward()
        optimizer.step()
        curr_loss = float(L.detach().cpu())
        history.append(curr_loss)
        if progress_callback:
            progress_callback(it, max_iters, curr_loss)
        if log_callback and (it % 5 == 0 or it == max_iters - 1):
            log_callback(it, curr_loss)

    params_final = HestonParams.from_unconstrained(u[0], u[1], u[2], u[3], u[4])
    calib = {
        "kappa": float(params_final.kappa.cpu()),
        "theta": float(params_final.theta.cpu()),
        "sigma": float(params_final.sigma.cpu()),
        "rho": float(params_final.rho.cpu()),
        "v0": float(params_final.v0.cpu()),
    }
    summary = pd.DataFrame(
        {
            "points": [len(df)],
            "iters": [max_iters],
            "loss_final": [history[-1] if history else float("nan")],
        }
    )
    return calib, history, summary

## 4. Fonctions Black-Scholes

In [None]:
def bs_d1(S: float, K: float, T: float, r: float, sigma: float) -> float:
    return (math.log(S / K) + (r + 0.5 * sigma**2) * T) / (sigma * math.sqrt(T))


def bs_d2(S: float, K: float, T: float, r: float, sigma: float) -> float:
    return bs_d1(S, K, T, r, sigma) - sigma * math.sqrt(T)


def norm_cdf(x: float) -> float:
    from scipy.stats import norm
    return norm.cdf(x)


def bs_call_price(S: float, K: float, T: float, r: float, sigma: float) -> float:
    d1 = bs_d1(S, K, T, r, sigma)
    d2 = bs_d2(S, K, T, r, sigma)
    return S * norm_cdf(d1) - K * math.exp(-r * T) * norm_cdf(d2)


def bs_put_price(S: float, K: float, T: float, r: float, sigma: float) -> float:
    d1 = bs_d1(S, K, T, r, sigma)
    d2 = bs_d2(S, K, T, r, sigma)
    return K * math.exp(-r * T) * norm_cdf(-d2) - S * norm_cdf(-d1)


def implied_vol(
    price: float, S: float, K: float, T: float, r: float, option_type: str = "call"
) -> float:
    if price <= 0:
        return float("nan")
    func = bs_call_price if option_type == "call" else bs_put_price
    sigma = 0.3
    for _ in range(50):
        theo = func(S, K, T, r, sigma)
        if abs(theo - price) < 1e-6:
            break
        vega = S * math.sqrt(T) * norm_cdf(bs_d1(S, K, T, r, sigma)) / math.sqrt(2 * math.pi)
        if vega < 1e-8:
            break
        sigma -= (theo - price) / vega
        sigma = max(sigma, 1e-4)
    return sigma

## 5. Fonctions Heatmaps

In [None]:
def params_from_calib(calib: dict) -> HestonParams:
    return HestonParams(
        kappa=torch.tensor(calib["kappa"], dtype=torch.float64, device=DEVICE),
        theta=torch.tensor(calib["theta"], dtype=torch.float64, device=DEVICE),
        sigma=torch.tensor(calib["sigma"], dtype=torch.float64, device=DEVICE),
        rho=torch.tensor(calib["rho"], dtype=torch.float64, device=DEVICE),
        v0=torch.tensor(calib["v0"], dtype=torch.float64, device=DEVICE),
    )


def compute_heston_heatmaps(
    calib: dict,
    r: float,
    q: float,
    S0_ref: float,
    span: float = 100.0,
    points: int = 21,
    maturity: float = 1.0,
) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
    S_min, S_max = S0_ref - span, S0_ref + span
    K_min, K_max = S0_ref - span, S0_ref + span
    S_grid = np.linspace(S_min, S_max, points)
    K_grid = np.linspace(K_min, K_max, points)

    params_tensor = params_from_calib(calib)
    call_prices = np.zeros((len(S_grid), len(K_grid)))

    with torch.no_grad():
        for i, S in enumerate(S_grid):
            for j, K in enumerate(K_grid):
                S_t = torch.tensor(S, dtype=torch.float64, device=DEVICE)
                K_t = torch.tensor(K, dtype=torch.float64, device=DEVICE)
                T_t = torch.tensor(maturity, dtype=torch.float64, device=DEVICE)
                call_prices[i, j] = float(
                    carr_madan_call_torch(S_t, r, q, T_t, params_tensor, K_t).cpu()
                )

    put_prices = call_prices - (S_grid[:, None] - K_grid[None, :] * np.exp(-r * maturity))
    return S_grid, K_grid, call_prices, put_prices


def compute_bs_heatmaps(
    S_grid: np.ndarray, K_grid: np.ndarray, maturity: float, r: float, vol: float
) -> Tuple[np.ndarray, np.ndarray]:
    call_prices = np.zeros((len(S_grid), len(K_grid)))
    put_prices = np.zeros((len(S_grid), len(K_grid)))
    for i, S in enumerate(S_grid):
        for j, K in enumerate(K_grid):
            call_prices[i, j] = bs_call_price(S, K, maturity, r, vol)
            put_prices[i, j] = bs_put_price(S, K, maturity, r, vol)
    return call_prices, put_prices


def plot_heatmap(prices: np.ndarray, K_grid: np.ndarray, S_grid: np.ndarray, title: str):
    fig = go.Figure(
        data=go.Heatmap(
            z=prices,
            x=K_grid,
            y=S_grid,
            colorscale="Viridis",
            colorbar=dict(title="Price"),
        )
    )
    fig.update_layout(
        title=title,
        xaxis_title="Strike K",
        yaxis_title="Spot S",
        width=600,
        height=500,
    )
    return fig


def heston_monte_carlo_price(
    S0: float,
    K: float,
    T: float,
    r: float,
    q: float,
    params: HestonParams,
    n_paths: int = 50000,
    n_steps: int = 100,
    option_type: str = "call",
) -> float:
    """Price option using Monte Carlo simulation with Heston model."""
    dt = T / n_steps
    sqrt_dt = np.sqrt(dt)
    
    # Extract Heston parameters
    kappa = float(params.kappa.cpu())
    theta = float(params.theta.cpu())
    sigma = float(params.sigma.cpu())
    rho = float(params.rho.cpu())
    v0 = float(params.v0.cpu())
    
    # Initialize arrays
    S = np.ones(n_paths) * S0
    v = np.ones(n_paths) * v0
    
    # Generate correlated random numbers
    for _ in range(n_steps):
        Z1 = np.random.standard_normal(n_paths)
        Z2 = rho * Z1 + np.sqrt(1 - rho**2) * np.random.standard_normal(n_paths)
        
        # Update variance (with Milstein scheme)
        v_sqrt = np.sqrt(np.maximum(v, 0))
        v = v + kappa * (theta - v) * dt + sigma * v_sqrt * sqrt_dt * Z2
        v = np.maximum(v, 0)  # Ensure non-negative variance
        
        # Update stock price (with Euler scheme)
        S = S * np.exp((r - q - 0.5 * v) * dt + v_sqrt * sqrt_dt * Z1)
    
    # Compute payoff
    if option_type == "call":
        payoff = np.maximum(S - K, 0)
    else:  # put
        payoff = np.maximum(K - S, 0)
    
    # Discount and average
    return np.exp(-r * T) * np.mean(payoff)


def compute_heston_heatmaps_mc(
    calib: dict,
    r: float,
    q: float,
    S0_ref: float,
    span: float = 100.0,
    points: int = 21,
    maturity: float = 1.0,
    n_paths: int = 50000,
    n_steps: int = 100,
) -> tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
    """Compute heatmaps using Monte Carlo simulation."""
    S_min, S_max = S0_ref - span, S0_ref + span
    K_min, K_max = S0_ref - span, S0_ref + span
    S_grid = np.linspace(S_min, S_max, points)
    K_grid = np.linspace(K_min, K_max, points)
    
    params_tensor = params_from_calib(calib)
    call_prices = np.zeros((len(S_grid), len(K_grid)))
    put_prices = np.zeros((len(S_grid), len(K_grid)))
    
    print(f"Computing {len(S_grid)}x{len(K_grid)} = {len(S_grid)*len(K_grid)} option prices with Monte Carlo...")
    total = len(S_grid) * len(K_grid)
    count = 0
    
    for i, S in enumerate(S_grid):
        for j, K in enumerate(K_grid):
            call_prices[i, j] = heston_monte_carlo_price(
                S, K, maturity, r, q, params_tensor, n_paths, n_steps, "call"
            )
            put_prices[i, j] = heston_monte_carlo_price(
                S, K, maturity, r, q, params_tensor, n_paths, n_steps, "put"
            )
            count += 1
            if count % 50 == 0:
                print(f"Progress: {count}/{total} ({100*count/total:.1f}%)")
    
    return S_grid, K_grid, call_prices, put_prices


### Explication: Simulation Monte Carlo

Les fonctions ci-dessus utilisent la **simulation Monte Carlo** pour pricer les options sous le modèle Heston:

1. **Simulation de trajectoires**: Génération de 50,000 trajectoires du processus stochastique de Heston
2. **Corrélation**: Les mouvements browniens du prix et de la variance sont corrélés (paramètre ρ)
3. **Schéma d'Euler/Milstein**: Discrétisation temporelle pour simuler l'évolution du prix et de la variance
4. **Payoff et actualisation**: Calcul du payoff moyen actualisé pour obtenir le prix de l'option

Cette méthode est plus flexible que Carr-Madan et permet de visualiser l'impact des paramètres Heston calibrés.

## 6. Fonctions IV Surfaces

In [None]:
def add_iv_columns(
    df: pd.DataFrame, price_col: str, option_type: str, r: float, q: float, params_tensor: HestonParams
) -> pd.DataFrame:
    df = df.copy()
    iv_bs_list = []
    iv_heston_list = []
    for _, row in df.iterrows():
        S0, K, T, price = row["S0"], row["K"], row["T"], row[price_col]
        iv_bs_list.append(implied_vol(price, S0, K, T, r, option_type))

        with torch.no_grad():
            call_heston = float(
                carr_madan_call_torch(
                    torch.tensor(S0, dtype=torch.float64, device=DEVICE),
                    r,
                    q,
                    torch.tensor(T, dtype=torch.float64, device=DEVICE),
                    params_tensor,
                    torch.tensor(K, dtype=torch.float64, device=DEVICE),
                ).cpu()
            )
        if option_type == "put":
            put_heston = call_heston - (S0 - K * math.exp(-r * T))
            price_heston = put_heston
        else:
            price_heston = call_heston

        iv_heston_list.append(implied_vol(price_heston, S0, K, T, r, option_type))

    df["iv_bs"] = iv_bs_list
    df["iv_heston"] = iv_heston_list
    return df


def prepare_iv_surface(df: pd.DataFrame, iv_col: str, S0_ref: float, span: float) -> pd.DataFrame:
    df = df.copy()
    df = df[df["T"] >= MIN_IV_MATURITY]
    df = df[(df["K"] >= S0_ref - span) & (df["K"] <= S0_ref + span)]
    df = df.dropna(subset=[iv_col])
    df = df[np.isfinite(df[iv_col])]
    if len(df) < 5:
        raise ValueError(f"Pas assez de données pour {iv_col}")
    df["moneyness"] = df["K"] / S0_ref
    return df


def plot_iv_surface(df: pd.DataFrame, S0_ref: float, title: str):
    fig = go.Figure(
        data=go.Scatter3d(
            x=df["moneyness"],
            y=df["T"],
            z=df[df.columns[df.columns.str.contains("iv")]].iloc[:, -1],
            mode="markers",
            marker=dict(size=3, color=df["T"], colorscale="Viridis"),
        )
    )
    fig.update_layout(
        title=title,
        scene=dict(
            xaxis_title="Moneyness (K/S₀)",
            yaxis_title="Maturity T",
            zaxis_title="Implied Vol",
        ),
        width=500,
        height=450,
    )
    return fig

## 7. Paramètres de Configuration

In [None]:
# Configuration
ticker = "SPY"
rf_rate = 0.02
years_ahead = 2.5
max_quotes = 300
max_iters = 80
lr = 5e-3
heatmap_span = 100.0
heatmap_points = 21
heatmap_maturity = 1.0

## 8. Téléchargement des Données

In [None]:
print(f"Téléchargement des options pour {ticker}...")
calls_df = download_options(ticker, "call", years_ahead=years_ahead)
puts_df = download_options(ticker, "put", years_ahead=years_ahead)
print(f"{len(calls_df)} calls et {len(puts_df)} puts téléchargés pour {ticker}.")

S0_ref = float(calls_df["S0"].median())
print(f"Spot de référence: {S0_ref:.2f}")

## 9. Calibration Heston

In [None]:
def log_cb(iter_idx: int, loss_val: float) -> None:
    print(f"Iter {iter_idx:03d} | loss = {loss_val:.6e}")

print("Calibration Heston en cours...")
calib, history, summary = calibrate_heston_from_calls(
    calls_df,
    r=rf_rate,
    q=0.0,
    max_points=max_quotes,
    max_iters=max_iters,
    lr=lr,
    log_callback=log_cb,
)

print("\nRésumé calibration:")
display(summary)

print("\nParamètres Heston calibrés:")
display(pd.Series(calib, name="params").to_frame())

## 10. Calcul des Heatmaps via Monte Carlo

Utilisation de simulations Monte Carlo pour pricer les options avec les paramètres Heston calibrés.


In [None]:
print("Calcul des heatmaps Heston...")
S_grid, K_grid, call_heston, put_heston = compute_heston_heatmaps_mc(
    calib,
    r=rf_rate,
    q=0.0,
    S0_ref=S0_ref,
    span=heatmap_span,
    points=heatmap_points,
    maturity=heatmap_maturity,
)

params_tensor = params_from_calib(calib)
with torch.no_grad():
    atm_call = carr_madan_call_torch(
        torch.tensor(S0_ref, dtype=torch.float64, device=DEVICE),
        rf_rate,
        0.0,
        torch.tensor(heatmap_maturity, dtype=torch.float64, device=DEVICE),
        params_tensor,
        torch.tensor(S0_ref, dtype=torch.float64, device=DEVICE),
    )
vol_bs = implied_vol(float(atm_call.cpu()), S0_ref, S0_ref, heatmap_maturity, rf_rate)

print(f"\nVolatilité BS ATM: {vol_bs:.4f}")
print("Calcul des heatmaps Black-Scholes...")
call_bs, put_bs = compute_bs_heatmaps(S_grid, K_grid, heatmap_maturity, rf_rate, vol_bs)

summary_heatmap = pd.DataFrame(
    {
        "Reference spot": [S0_ref],
        "Rate": [rf_rate],
        "Maturity T": [heatmap_maturity],
        "Strike range": [f"{K_grid[0]:.2f} → {K_grid[-1]:.2f} ({len(K_grid)} pts)"],
        "Spot range": [f"{S_grid[0]:.2f} → {S_grid[-1]:.2f} ({len(S_grid)} pts)"],
        "κ": [calib["kappa"]],
        "θ": [calib["theta"]],
        "σ": [calib["sigma"]],
        "ρ": [calib["rho"]],
        "v₀": [calib["v0"]],
        "σ_BS_ATM": [vol_bs],
    }
)
display(summary_heatmap)

## 11. Visualisation des Heatmaps

In [None]:
fig_call_heston = plot_heatmap(call_heston, K_grid, S_grid, "Call Price (Heston)")
fig_call_bs = plot_heatmap(call_bs, K_grid, S_grid, "Call Price (Black-Scholes)")

fig_call_heston
fig_call_bs


In [None]:
fig_put_heston = plot_heatmap(put_heston, K_grid, S_grid, "Put Price (Heston)")
fig_put_bs = plot_heatmap(put_bs, K_grid, S_grid, "Put Price (Black-Scholes)")

fig_put_heston
fig_put_bs


## 12. Surfaces d'IV

In [None]:
print("Calcul des volatilités implicites...")
calls_with_iv = add_iv_columns(calls_df, "C_mkt", "call", rf_rate, 0.0, params_tensor)
puts_with_iv = add_iv_columns(puts_df, "P_mkt", "put", rf_rate, 0.0, params_tensor)

def _surface_or_none(df: pd.DataFrame, iv_col: str, label: str) -> pd.DataFrame | None:
    try:
        return prepare_iv_surface(df, iv_col, S0_ref, heatmap_span)
    except ValueError as exc:
        print(f"{label}: {exc}")
        return None

surfaces_calls = [
    ("Call Market IV", _surface_or_none(calls_with_iv, "iv_market", "Call Market IV")),
    ("Call BS IV", _surface_or_none(calls_with_iv, "iv_bs", "Call BS IV")),
    ("Call Heston IV", _surface_or_none(calls_with_iv, "iv_heston", "Call Heston IV")),
]
surfaces_puts = [
    ("Put Market IV", _surface_or_none(puts_with_iv, "iv_market", "Put Market IV")),
    ("Put BS IV", _surface_or_none(puts_with_iv, "iv_bs", "Put BS IV")),
    ("Put Heston IV", _surface_or_none(puts_with_iv, "iv_heston", "Put Heston IV")),
]

### 12.1 Surfaces IV des Calls

In [None]:
for title, surface_obj in surfaces_calls:
    if surface_obj is not None:
        fig = plot_iv_surface(surface_obj, S0_ref, title)
        fig
    else:
        print(f"{title}: Surface indisponible")

### 12.2 Surfaces IV des Puts

In [None]:
for title, surface_obj in surfaces_puts:
    if surface_obj is not None:
        fig = plot_iv_surface(surface_obj, S0_ref, title)
        fig
    else:
        print(f"{title}: Surface indisponible")