In [1]:
import numpy as np
from dataclasses import dataclass

# =========================
# Heston + Carr–Madan (FFT)
# =========================

@dataclass
class HestonParams:
    kappa: float
    theta: float
    sigma: float
    rho: float
    v0: float

def heston_cf(u, T, S0, r, q, p: HestonParams):
    """
    Characteristic function φ(u) = E[exp(i u ln S_T)] under Q.
    Uses the Little Heston Trap parametrization.
    u can be scalar or numpy array.
    """
    i = 1j
    x0 = np.log(S0)
    a  = p.kappa * p.theta
    b  = p.kappa - p.rho * p.sigma * i * u
    d  = np.sqrt(b*b + (p.sigma**2) * (i*u + u*u))
    g  = (b - d) / (b + d)

    eDT = np.exp(-d * T)
    one_minus_g_eDT = 1 - g * eDT
    one_minus_g     = 1 - g
    # small guards
    one_minus_g_eDT = np.where(np.abs(one_minus_g_eDT) < 1e-15, 1e-15, one_minus_g_eDT)
    one_minus_g     = np.where(np.abs(one_minus_g)     < 1e-15, 1e-15, one_minus_g)

    C = i*u*(r - q)*T + (a/(p.sigma**2)) * ((b - d)*T - 2.0*np.log(one_minus_g_eDT/one_minus_g))
    D = ((b - d)/(p.sigma**2)) * ((1 - eDT) / one_minus_g_eDT)
    return np.exp(C + D*p.v0 + i*u*x0)

def _simpson_weights(N: int):
    """Simpson weights on an N-point uniform grid (N must be even)."""
    if N % 2 != 0:
        raise ValueError("N must be even for Simpson weights.")
    w = np.ones(N)
    w[1:N-1:2] = 4
    w[2:N-2:2] = 2
    return w

def heston_fft_calls(
    S0: float,
    T: float,
    r: float,
    q: float,
    p: HestonParams,
    N: int = 4096,      # power-of-two recommended; must be even
    eta: float = 0.25,  # frequency step Δv
    alpha: float = 1.5  # damping (>0)
):
    """
    Carr–Madan FFT for call prices across a K-grid.
    IMPORTANT: k = ln K (log-STRIKE), not ln(K/S0).

    Returns
    -------
    K : (N,) ascending strikes
    C : (N,) call prices for these K
    """
    n = np.arange(N)
    v = eta * n  # frequency grid

    i = 1j
    # ψ(v) = e^{-rT} φ(v - i(α+1)) / [(α + iv)(α + iv + 1)]
    phi_shift = heston_cf(v - (alpha + 1)*i, T, S0, r, q, p)
    denom = (alpha**2 + alpha - v**2 + i*(2*alpha + 1)*v)  # (α+iv)(α+iv+1)
    psi = np.exp(-r*T) * phi_shift / denom

    # Simpson weights for the v-integral
    w = _simpson_weights(N) * (eta / 3.0)

    # FFT coupling
    lam = 2.0 * np.pi / (N * eta)   # Δk (log-strike step)
    b   = 0.5 * N * lam             # half-width in k
    x   = psi * np.exp(1j * b * v) * w

    F   = np.fft.fft(x)
    F   = np.real(F)

    j = np.arange(N)
    k = -b + j * lam                 # k = ln K
    K = np.exp(k)

    calls = np.exp(-alpha * k) / np.pi * F
    order = np.argsort(K)
    return K[order], np.maximum(calls[order], 0.0)

def heston_fft_call_price(
    S0: float, K: float, T: float, r: float, q: float, p: HestonParams,
    N: int = 4096, eta: float = 0.25, alpha: float = 1.5
):
    """Price a single call via FFT + linear interpolation on the K-grid."""
    K_grid, C_grid = heston_fft_calls(S0, T, r, q, p, N=N, eta=eta, alpha=alpha)
    if K <= K_grid[0]:
        return C_grid[0]
    if K >= K_grid[-1]:
        return C_grid[-1]
    idx = np.searchsorted(K_grid, K)
    x0, x1 = K_grid[idx-1], K_grid[idx]
    y0, y1 = C_grid[idx-1], C_grid[idx]
    return y0 + (y1 - y0) * (K - x0) / (x1 - x0)

def heston_fft_put_price(
    S0: float, K: float, T: float, r: float, q: float, p: HestonParams,
    N: int = 4096, eta: float = 0.25, alpha: float = 1.5
):
    """Put via put–call parity."""
    C = heston_fft_call_price(S0, K, T, r, q, p, N=N, eta=eta, alpha=alpha)
    return C - S0*np.exp(-q*T) + K*np.exp(-r*T)




In [2]:
# parameters
S0, r, q, T = 100.0, 0.01, 0.00, 1.0
hp = HestonParams(kappa=1.5, theta=0.04, sigma=0.3, rho=-0.7, v0=0.04)

# full grid of calls
K, C = heston_fft_calls(S0, T, r, q, hp, N=4096, eta=0.25, alpha=1.5)

# single strikes
call_100 = heston_fft_call_price(S0, 100, T, r, q, hp)

call_100


np.float64(8.07774347291964)

In [3]:
# Simulate Heston price with given parameters
S0 = 100          # Initial stock price
K = 100           # Strike price
T = 1             # Time to maturity
r = 0.01          # Risk-free rate
v0 = 0.04         # Initial variance
kappa = 2         # Mean reversion speed
theta = 0.04      # Long-run variance
sigma = 0.3       # Volatility of variance
rho = -0.7        # Correlation
N = 16384         # Number of grid points
alpha = 1.5       # Damping factor

# Simulation parameters
n_paths = 10000   # Number of paths
n_steps = 252     # Number of time steps (daily)
dt = T/n_steps    # Time step size

# Initialize arrays for paths
S = np.zeros((n_paths, n_steps + 1))
v = np.zeros((n_paths, n_steps + 1))

# Set initial values
S[:, 0] = S0
v[:, 0] = v0

# Generate correlated random numbers
rng = np.random.default_rng()
dW1 = rng.normal(0, np.sqrt(dt), (n_paths, n_steps))
dW2 = rho * dW1 + np.sqrt(1 - rho**2) * rng.normal(0, np.sqrt(dt), (n_paths, n_steps))

# Euler-Maruyama simulation
for t in range(n_steps):
    # Ensure variance stays positive
    v[:, t] = np.maximum(v[:, t], 0)

    # Update stock price
    S[:, t+1] = S[:, t] * np.exp((r - 0.5*v[:, t])*dt + np.sqrt(v[:, t])*dW1[:, t])

    # Update variance
    v[:, t+1] = v[:, t] + kappa*(theta - v[:, t])*dt + sigma*np.sqrt(v[:, t])*dW2[:, t]

# Calculate call option prices
payoffs = np.maximum(S[:, -1] - K, 0)
discounted_payoffs = np.exp(-r*T) * payoffs
price = np.mean(discounted_payoffs)

# Calculate 95% confidence interval
std_error = np.std(discounted_payoffs) / np.sqrt(n_paths)
ci_lower = price - 1.96 * std_error
ci_upper = price + 1.96 * std_error

print(f"Heston call option price: {price:.8f}")
print(f"95% Confidence Interval: [{ci_lower:.8f}, {ci_upper:.8f}]")



Heston call option price: 8.10766805
95% Confidence Interval: [7.89253969, 8.32279640]


In [4]:
# Generate strike grid centered around S0
strikes = np.arange(S0-20, S0+25, 5)

# Get FFT prices for all strikes
K_fft, C_fft = heston_fft_calls(S0, T, r, q, hp, N=4096, eta=0.25, alpha=1.5)

# Interpolate FFT prices to our strike grid
fft_prices = np.interp(strikes, K_fft, C_fft)

# Calculate MC prices for all strikes
mc_prices = []
mc_ci_lower = []
mc_ci_upper = []

for K in strikes:
    payoffs = np.maximum(S[:, -1] - K, 0)
    discounted_payoffs = np.exp(-r*T) * payoffs
    price = np.mean(discounted_payoffs)

    # Calculate 95% confidence interval
    std_error = np.std(discounted_payoffs) / np.sqrt(n_paths)
    mc_prices.append(price)
    mc_ci_lower.append(price - 1.96 * std_error)
    mc_ci_upper.append(price + 1.96 * std_error)

# Print comparison table
print("\nPrice Comparison:")
print("Strike   FFT Price   MC Price   MC 95% CI")
print("-" * 45)
for i in range(len(strikes)):
    print(f"{strikes[i]:6.1f}   {fft_prices[i]:9.4f}   {mc_prices[i]:8.4f}   [{mc_ci_lower[i]:8.4f}, {mc_ci_upper[i]:8.4f}]")




Price Comparison:
Strike   FFT Price   MC Price   MC 95% CI
---------------------------------------------
  80.0     22.3804    22.3013   [ 21.9792,  22.6234]
  85.0     18.2617    18.2023   [ 17.9013,  18.5033]
  90.0     14.4595    14.4111   [ 14.1351,  14.6871]
  95.0     11.0425    11.0176   [ 10.7704,  11.2648]
 100.0      8.0777     8.1077   [  7.8925,   8.3228]
 105.0      5.6161     5.7018   [  5.5203,   5.8833]
 110.0      3.6817     3.7996   [  3.6514,   3.9479]
 115.0      2.2629     2.4006   [  2.2837,   2.5175]
 120.0      1.2993     1.4408   [  1.3519,   1.5296]


In [5]:
# pip install torch --upgrade
import torch
import torch.nn as nn
import torch.nn.functional as F

# =========================
# Torch Heston + Carr–Madan
# =========================

class HestonPricingLayer(nn.Module):
    """
    Differentiable Heston pricer (Carr–Madan FFT) in torch.
    Vectorized over batch.
    """
    def __init__(self, N=4096, eta=0.25, alpha=1.5):
        super().__init__()
        if N % 2 != 0:
            raise ValueError("N must be even.")
        if alpha <= 0:
            raise ValueError("alpha must be > 0.")
        self.N = N
        self.eta = eta
        self.alpha = alpha

        # Precompute frequency grid and Simpson weights as buffers
        n = torch.arange(N, dtype=torch.float64)  # use float64 for stability
        v = eta * n
        w = torch.ones(N, dtype=torch.float64)
        w[1:N-1:2] = 4.0
        w[2:N-2:2] = 2.0
        w = w * (eta / 3.0)

        self.register_buffer("v", v)
        self.register_buffer("w", w)

        dk = 2.0 * torch.pi / (N * eta)
        b = 0.5 * N * dk
        self.dk = dk
        self.b = b

    def _heston_cf(self, u, T, S0, r, q, kappa, theta, sigma, rho, v0):
        """
        Little Heston Trap CF in torch, vectorized over batch and frequency.
        Inputs:
            u: (..., N) complex
            scalars or tensors broadcastable for T,S0,r,q,kappa,theta,sigma,rho,v0
        """
        i = torch.complex(torch.tensor(0., dtype=torch.float64, device=u.device),
                          torch.tensor(1., dtype=torch.float64, device=u.device))
        x0 = torch.log(S0)

        a = kappa * theta
        b = kappa - rho * sigma * i * u
        d = torch.sqrt(b*b + (sigma**2) * (u*u + i*u))

        g = (b - d) / (b + d)
        # trap: enforce |g|<1
        mask = (g.abs() >= 1.0)
        g = torch.where(mask, 1.0 / g, g)

        eDT = torch.exp(-d * T)
        one_minus_g = 1.0 - g
        one_minus_g_eDT = 1.0 - g * eDT

        eps = torch.tensor(1e-15, dtype=torch.float64, device=u.device)
        one_minus_g = torch.where(one_minus_g.abs() < eps, eps, one_minus_g)
        one_minus_g_eDT = torch.where(one_minus_g_eDT.abs() < eps, eps, one_minus_g_eDT)

        C = i*u*(r - q)*T + (a/(sigma**2)) * ((b - d)*T - 2.0*torch.log(one_minus_g_eDT/one_minus_g))
        D = ((b - d)/(sigma**2)) * ((1.0 - eDT)/one_minus_g_eDT)

        return torch.exp(C + D*v0 + i*u*x0)

    def forward(self, S0, K, T, r, q, kappa, theta, sigma, rho, v0):
        """
        Returns call price C(S0,K,T,r,q; params)
        All inputs are tensors of shape (batch,)
        """
        device = S0.device
        dtype = torch.float64

        v = self.v.to(device=device)  # (N,)
        w = self.w.to(device=device)  # (N,)
        N = self.N
        alpha = torch.as_tensor(self.alpha, dtype=dtype, device=device)

        # Build shifted CF on the batch × N grid
        # u = v - i(α+1)
        i = torch.complex(torch.tensor(0., dtype=dtype, device=device),
                          torch.tensor(1., dtype=dtype, device=device))
        u_shift = v - (alpha + 1.0)*i  # (N,) complex

        # Broadcast u over batch: (B, N)
        B = S0.shape[0]
        u = u_shift.unsqueeze(0).expand(B, N)

        # Cast all reals to float64
        S0 = S0.to(dtype)
        K  = K.to(dtype)
        T  = T.to(dtype)
        r  = r.to(dtype)
        q  = q.to(dtype)
        kappa = kappa.to(dtype)
        theta = theta.to(dtype)
        sigma = sigma.to(dtype)
        rho   = rho.to(dtype)
        v0    = v0.to(dtype)

        phi_shift = self._heston_cf(u, T[:,None], S0[:,None], r[:,None], q[:,None],
                                    kappa[:,None], theta[:,None], sigma[:,None],
                                    rho[:,None], v0[:,None])  # (B,N) complex

        denom = (alpha**2 + alpha - v**2 + (2*alpha + 1.0)*i*v)  # (N,) complex
        denom = torch.where(denom.abs() < 1e-30, torch.complex(torch.tensor(1e-30, dtype=dtype, device=device),
                                                               torch.tensor(0., dtype=dtype, device=device)),
                            denom)
        psi = torch.exp(-r[:,None]*T[:,None]) * phi_shift / denom  # (B,N) complex

        # FFT coupling
        dk = self.dk.to(device=device)
        b  = self.b.to(device=device)
        x  = psi * torch.exp(1j * b * v) * w  # (B,N) complex

        # torch.fft.fft over last dim
        F = torch.fft.fft(x, dim=-1)  # (B,N) complex
        F = F.real  # (B,N)

        j = torch.arange(N, dtype=dtype, device=device)
        k_grid = -b + j * dk      # (N,)
        K_grid = torch.exp(k_grid)

        calls_grid = torch.exp(-alpha * k_grid)[None, :] / torch.pi * F  # (B,N)

        # Interp lineaire en log-strike
        k_target = torch.log(K).to(dtype)  # (B,)
        # local indices
        # convert k_target to position in grid
        # pos = (k_target + b)/dk
        pos = (k_target + b) / dk
        idx1 = torch.clamp(pos.floor().long(), 0, N-2)     # left index
        idx2 = idx1 + 1

        k0 = k_grid[idx1]  # (B,)
        k1 = k_grid[idx2]
        c0 = calls_grid.gather(1, idx1.view(-1,1)).squeeze(1)
        c1 = calls_grid.gather(1, idx2.view(-1,1)).squeeze(1)
        w1 = (k_target - k0) / (k1 - k0 + 1e-16)
        price = c0 + (c1 - c0) * w1  # (B,)

        # Optionnel: clamp léger pour stabilité numérique, mais on évite de couper négatifs microscopiques
        return price


# =============
# Neural network
# =============

class HestonCalibrator(nn.Module):
    """
    NN -> Heston params, then pricing layer -> call price.
    Inputs per sample: [r, T, S0, K, iv_bs]  # évite C_mkt en entrée
    """
    def __init__(self, hidden=128, N=4096, eta=0.25, alpha=1.5):
        super().__init__()
        self.feature_net = nn.Sequential(
            nn.Linear(5, hidden),
            nn.ReLU(),
            nn.Linear(hidden, hidden),
            nn.ReLU(),
            nn.Linear(hidden, 6)  # raw outputs: kappa, theta, sigma, rho, v0, q_shift
        )
        self.pricer = HestonPricingLayer(N=N, eta=eta, alpha=alpha)

        # On inclut q comme variable exogène? Ici on suppose q=0
        # Si tu veux apprendre un petit décalage de dividende effectif sur les données, q_shift sert à ça.
        # Sinon mets-le à zéro lors du forward.

    def forward(self, x, C_mkt=None):
        """
        x: tensor (B,5) with columns [r, T, S0, K, iv_bs]
        C_mkt: optional (B,) market call price (only needed for loss)
        Returns:
          params dict and C_pred
        """
        r, T, S0, K, iv = x.unbind(dim=1)  # each (B,)
        raw = self.feature_net(x)

        raw_kappa, raw_theta, raw_sigma, raw_rho, raw_v0, raw_qshift = raw.unbind(dim=1)

        # domain constraints
        softplus = F.softplus
        kappa = softplus(raw_kappa) + 1e-6
        theta = softplus(raw_theta) + 1e-8
        sigma = softplus(raw_sigma) + 1e-8
        rho   = 0.999 * torch.tanh(raw_rho)
        v0    = softplus(raw_v0) + 1e-10
        q     = torch.zeros_like(r) + 0.0 + 0.0*raw_qshift  # fixe q=0 ; ou utilise petite correction: 1e-3*tanh(raw_qshift)

        C_pred = self.pricer(S0, K, T, r, q, kappa, theta, sigma, rho, v0)

        out = {
            "kappa": kappa, "theta": theta, "sigma": sigma, "rho": rho, "v0": v0,
            "C_pred": C_pred
        }
        if C_mkt is None:
            return out
        loss = F.mse_loss(C_pred, C_mkt)
        out["loss"] = loss
        return out


# =========
# Training
# =========


def bs_vega(S0, K, T, r, q, vol):
    # vega = ∂C/∂vol sous Black–Scholes (approx pour pondérer la perte)
    from math import erf, sqrt, log, exp
    def ncdf_pdf(x):
        # retourne (phi, Phi)
        from math import pi
        Phi = 0.5*(1.0 + erf(x/sqrt(2.0)))
        phi = (1.0/sqrt(2.0*pi))*torch.exp(-0.5*torch.as_tensor(x)**2)
        return phi, Phi
    S0 = torch.as_tensor(S0); K = torch.as_tensor(K); T = torch.as_tensor(T)
    r = torch.as_tensor(r); q = torch.as_tensor(q); vol = torch.as_tensor(vol)
    eps = 1e-12
    d1 = (torch.log(S0/K) + (r - q + 0.5*vol*vol)*T) / (vol*torch.sqrt(T+eps)+eps)
    phi, _ = ncdf_pdf(d1)
    return S0*torch.exp(-q*T)*torch.as_tensor(phi)*torch.sqrt(T+eps)

def weighted_mse(C_pred, C_mkt, vega, eps=1e-8):
    w = 1.0/(vega.abs() + eps)  # plus de poids là où Vega est petit
    diff = (C_pred - C_mkt)
    return torch.mean((w*diff)**2)

def feller_penalty(kappa, theta, sigma):
    # Feller: 2*kappa*theta >= sigma^2 ; pénalise les violations
    viol = torch.relu(sigma*sigma - 2.0*kappa*theta)
    return viol


def train_step(model, optimizer, batch_x, batch_Cmkt, lambda_feller=1e-3, lambda_ridge=1e-6):
    """
    batch_x: (B,5) = [r, T, S0, K, iv_bs]
    batch_Cmkt: (B,)
    """
    model.train()
    optimizer.zero_grad()

    out = model(batch_x, C_mkt=batch_Cmkt)
    C_pred = out["C_pred"]

    r, T, S0, K, iv = batch_x.unbind(dim=1)
    vega = bs_vega(S0, K, T, r, torch.zeros_like(r), iv)  # q≈0 pour le poids

    mse = weighted_mse(C_pred, batch_Cmkt, vega)

    # pénalités sur paramètres
    pen_feller = feller_penalty(out["kappa"], out["theta"], out["sigma"]).mean()
    pen_ridge  = (out["kappa"].mean() + out["theta"].mean() + out["sigma"].mean()
                  + out["v0"].mean() + out["rho"].pow(2).mean())  # léger l2

    loss = mse + lambda_feller*pen_feller + lambda_ridge*pen_ridge
    loss.backward()

    torch.nn.utils.clip_grad_norm_(model.parameters(), 5.0)
    optimizer.step()

    return {
        "loss": loss.detach(),
        "mse": mse.detach(),
        "feller": pen_feller.detach()
    }



# =================
# Example dummy use
# =================
if __name__ == "__main__":
    torch.set_default_dtype(torch.float64)

    # Fake batch (remplace par tes données réelles)
    B = 32
    r  = torch.full((B,), 0.01)
    T  = torch.linspace(0.1, 2.0, B)
    S0 = torch.full((B,), 100.0)
    K  = torch.linspace(60.0, 140.0, B)
    iv = torch.full((B,), 0.2)

    # Prix marché factices: par exemple Black-Scholes comme cible
    # En pratique, mets tes vrais prix (mid) ici
    from math import erf, sqrt, log, exp
    def ncdf(x): return 0.5*(1.0 + erf(x/sqrt(2.0)))
    def bs_call(S0,K,T,r,q,vol):
        if T<=0 or vol<=0: return max(0.0, S0 - K*exp(-r*T))
        d1 = (log(S0/K)+(r - q + 0.5*vol*vol)*T)/(vol*sqrt(T))
        d2 = d1 - vol*sqrt(T)
        return S0*exp(-q*T)*ncdf(d1) - K*exp(-r*T)*ncdf(d2)

    C_mkt = torch.tensor([bs_call(S0[i].item(), K[i].item(), T[i].item(), r[i].item(), 0.0, iv[i].item())
                          for i in range(B)], dtype=torch.float64)

    X = torch.stack([r, T, S0, K, iv], dim=1)

    model = HestonCalibrator(hidden=128, N=2048, eta=0.20, alpha=1.5)  # N=2048 pour accélérer l’exemple
    opt = torch.optim.Adam(model.parameters(), lr=1e-3)

    for epoch in range(5):  # démo courte
        stats = train_step(model, opt, X, C_mkt)
        print(f"epoch {epoch} | loss {stats['loss'].item():.6e}")

    with torch.no_grad():
        preds = model(X)
        print("kappa mean:", preds["kappa"].mean().item(),
              "theta mean:", preds["theta"].mean().item(),
              "sigma mean:", preds["sigma"].mean().item(),
              "rho mean:", preds["rho"].mean().item(),
              "v0 mean:", preds["v0"].mean().item())


AttributeError: 'float' object has no attribute 'to'