In [26]:
import numpy as np
import pandas as pd
import time

i = 1j

# --- Characteristic functions ---
def chf_merton(u, r, tau, theta):
    sigma, xi, muJ, sigmaJ = theta
    omega_bar = xi * (np.exp(muJ + 0.5*sigmaJ**2) - 1)
    mu = r - 0.5*sigma**2 - omega_bar
    return np.exp(i*u*mu*tau - 0.5*sigma**2*u**2*tau +
                  xi*tau*(np.exp(i*muJ*u - 0.5*sigmaJ**2*u**2) - 1))

def chf_kou(u, r, tau, theta):
    sigma, xi, alpha1, alpha2, p1 = theta
    p2 = 1 - p1
    omega_bar = xi * (p1*alpha1/(alpha1-1) + p2*alpha2/(alpha2+1) - 1)
    mu = r - 0.5*sigma**2 - omega_bar
    return np.exp(i*u*mu*tau - 0.5*sigma**2*u**2*tau + xi*tau*((p1*alpha1)/(alpha1 - i*u) + (p2*alpha2)/(alpha2 + i*u) - 1))

def chf_heston(u, r, tau, theta):
    v0, v_bar, kappa, gamma, rho = theta
    d = np.sqrt(np.power(kappa-gamma*rho*i*u,2)+(u**2+i*u)*gamma**2)
    g  = (kappa-gamma*rho*i*u-d)/(kappa-gamma*rho*i*u+d)
    C  = (1.0-np.exp(-d*tau))/(gamma**2*(1.0-g*np.exp(-d*tau)))*(kappa-gamma*rho*i*u-d)
    A  = r * i*u *tau + kappa*v_bar*tau/gamma**2 *(kappa-gamma*rho*i*u-d) - 2*kappa*v_bar/gamma**2*np.log((1.0-g*np.exp(-d*tau))/(1.0-g))
    return np.exp(A + C*v0) 

def chf_bates(u, r, tau, theta):
    v0, v_bar, kappa, gamma, rho, xi, muJ, sigmaJ = theta
    # Heston Part
    d = np.sqrt(np.power(kappa-gamma*rho*i*u,2)+((u**2)+i*u)*(gamma**2))
    g  = (kappa-gamma*rho*i*u-d)/(kappa-gamma*rho*i*u+d)
    C  = (1.0-np.exp(-d*tau))/((gamma**2)*(1.0-g*np.exp(-d*tau)))*(kappa-gamma*rho*i*u-d)
    AHes = r * i*u *tau + kappa*v_bar*tau/gamma**2 *(kappa-gamma*rho*i*u-d) - 2*kappa*v_bar/gamma**2*np.log((1.0-g*np.exp(-d*tau))/(1.0-g))
    # Merton Jump Part
    omega_bar_jump = xi * (np.exp(muJ + 0.5 * sigmaJ**2) - 1)
    AJump = xi * tau * (np.exp(i * u * muJ - 0.5 * sigmaJ**2 * u**2) - 1) - i * u * tau * omega_bar_jump
    
    return np.exp(C*v0 + AHes + AJump)

# --- COS methods ---
def chi_psi_vec(k, a, b, c, d):
    omega = k * np.pi / (b - a)
    # Initialize arrays with the full size
    xi, psi = np.zeros_like(omega, dtype=float), np.zeros_like(omega, dtype=float)
    
    # Handle k=0 case by assigning to the first row
    xi[0, :] = np.exp(d) - np.exp(c)
    psi[0, :] = d - c
    
    # Handle k > 0 case
    if k.shape[0] > 1:
        omega_nz = omega[1:, :]
        
        # Calculate xi and psi for the non-zero omega slice
        denom = 1.0 + omega_nz**2
        xi_nz = (np.cos(omega_nz * (d - a)) * np.exp(d) + omega_nz * np.sin(omega_nz * (d - a)) * np.exp(d) - 
                 np.cos(omega_nz * (c - a)) * np.exp(c) - omega_nz * np.sin(omega_nz * (c - a)) * np.exp(c)) / denom
        psi_nz = (np.sin(omega_nz * (d - a)) - np.sin(omega_nz * (c - a))) / omega_nz
        
        # Assign the results to the correct slice of the original arrays
        xi[1:, :] = xi_nz
        psi[1:, :] = psi_nz
        
    return xi, psi

def payoff_coefficients_vec(CP, k, a, b):
    # Ensure CP is an array
    CP_arr, is_call = np.atleast_1d(CP), (np.atleast_1d(CP) == 'call')
    c = np.where(is_call, 0.0, a)
    d = np.where(is_call, b, 0.0)
    
    s = np.where(is_call, 1.0, -1.0)
    
    xi, psi = chi_psi_vec(k, a, b, c, d)
    
    H_k = (2.0 / (b - a)) * s * (xi - psi)
    return H_k

def cos_pricer(model, CP, S, K, tau, theta, N, L):
    r = 0.0
    S, K, tau, CP = np.broadcast_arrays(S, K, tau, CP)
    
    x   = np.log(S / K)
    k = np.arange(N, dtype=float)[:, None] 

    # Unified truncation range
    a = -L * np.sqrt(tau)
    b = L * np.sqrt(tau)
    u = k * np.pi / (b - a) 

    # Get the ChF for the model
    model_name = model.lower()
    if model_name == 'merton':
        phi = chf_merton(u, r, tau, theta)
    elif model_name == 'kou':
        phi = chf_kou(u, r, tau, theta)
    elif model_name == 'heston':
        phi = chf_heston(u, r, tau, theta)  
    elif model_name == 'bates':
        phi = chf_bates(u, r, tau, theta)
    else:
        raise ValueError(f"Unknown model: {model}")

    H_k = payoff_coefficients_vec(CP, k, a, b)
    
    w0 = np.real(phi * np.exp(-i * u * (a - x)))
    w0[0, :] *= 0.5
    prices = np.exp(-r * tau) * K * np.sum(w0 * H_k, axis=0)
    return prices.astype(float)


# --- Start main script ---
# Initialize parameters
N_vals = [32, 64, 128, 256, 512] 
L_vals = [4, 6, 8, 10, 12, 15]
models = ['Merton', 'Kou', 'Heston', 'Bates']

# Model parameters based on your thesis median values
theta_params = {
    'Merton': (0.62, 44.91, -0.03, 0.05),
    'Kou': (0.65, 59.93, 28.72, 34.13, 0.06),
    'Heston': (0.44, 0.70, 2.5, 2.0, -0.30),
    'Bates': (0.41, 0.68, 6.39, 2.00, 0.69, 24.93, -0.05, 0.05)
}

# Define options to test (DOTM, OTM, ATM)
S0 = 100.0
tau_val = (1 / 365)
strikes = np.array([140, 120, 100])
option_types = np.array(['call', 'call', 'call'])

# Benchmark price (large N and L)
N_benchmark = 16384
L_benchmark = 12
benchmark_prices = {}
print("Calculating benchmark prices with N=16384...")
for model in models:
    theta = theta_params[model]
    prices = cos_pricer(model, option_types, S0, strikes, tau_val, theta, N_benchmark, L_benchmark)
    benchmark_prices[model] = prices
    print(f"  {model} Benchmarks (DOTM, OTM, ATM): {np.round(prices, 4)}")
print("Benchmark prices calculated.\n")

# Run convergence simulation
results = []
print("Running convergence experiment...")
num_runs_timing = 10

for model in models:
    theta = theta_params[model]
    for l in L_vals:
        for n in N_vals:
            # Time the calculation
            start_time = time.time()
            for _ in range(num_runs_timing):
                current_prices = cos_pricer(model, option_types, S0, strikes, tau_val, theta, n, l)
            avg_time_ms = (time.time() - start_time) / num_runs_timing * 1000

            # Calculate RMSE against the benchmark for this model
            error = np.abs(np.mean(current_prices - benchmark_prices[model]))

            results.append({
                'Model': model,
                'L': l,
                'N': n,
                'Error': error,
                'Time (ms)': avg_time_ms
            })
print("Experiment finished.\n")

# Get results
results_df = pd.DataFrame(results)
for model in models:
    print(f"\n--- Convergence Table for {model} Model ---")
    model_df = results_df[results_df['Model'] == model].copy()
    
    # Create a display string with RMSE and Time
    model_df['display'] = model_df.apply(
        lambda row: f"{row['Error']:.2e} / {row['Time (ms)']:.2f}ms", axis=1
    )
    
    # Pivot the DataFrame to create the desired table format
    pivot_table = model_df.pivot(index='N', columns='L', values='display')
    
    # Add a title for the L values
    pivot_table.columns.name = 'Truncation Range Parameter (L)'
    
    print("Each cell: Error / Time (ms)")
    print(pivot_table.to_markdown())


Calculating benchmark prices with N=16384...
  Merton Benchmarks (DOTM, OTM, ATM): [0.0000e+00 1.0000e-04 1.4577e+00]
  Kou Benchmarks (DOTM, OTM, ATM): [0.000e+00 4.000e-04 1.471e+00]
  Heston Benchmarks (DOTM, OTM, ATM): [0.     0.     1.3848]
  Bates Benchmarks (DOTM, OTM, ATM): [0.     0.     1.4584]
Benchmark prices calculated.

Running convergence experiment...
Experiment finished.


--- Convergence Table for Merton Model ---
Each cell: Error / Time (ms)
|   N | 4                 | 6                 | 8                 | 10                | 12                | 15                |
|----:|:------------------|:------------------|:------------------|:------------------|:------------------|:------------------|
|  32 | 3.88e-02 / 0.07ms | 1.60e-05 / 0.06ms | 7.64e-07 / 0.06ms | 1.05e-05 / 0.05ms | 1.46e-04 / 0.06ms | 2.22e-03 / 0.06ms |
|  64 | 3.88e-02 / 0.07ms | 1.60e-05 / 0.06ms | 2.72e-09 / 0.07ms | 1.37e-12 / 0.06ms | 1.03e-09 / 0.06ms | 6.79e-08 / 0.06ms |
| 128 | 3.88e-02 / 0.08