In [None]:
import warnings
warnings.filterwarnings('ignore')
!pip install QuantLib
import QuantLib as ql
import numpy as np
import pandas as pd
import itertools

from scipy.stats import norm
from matplotlib import pyplot as plt
from mpl_toolkits.mplot3d import Axes3D
from scipy.interpolate import CubicSpline, interp1d

!pip install seaborn
import seaborn as sns
sns.set_theme(style="white")
import math
from sklearn.kernel_ridge import KernelRidge
from scipy.stats import linregress, t
import statsmodels.api as sm


######################################################################################################
# functions to plot 3D vol surfaces, generate paths, and generate vol surface from Heston parameters #
######################################################################################################

def plot_vol_surface(vol_surface, plot_years=np.arange(0.1, 3, 0.1), plot_strikes=np.arange(70, 130, 1), funct='blackVol'):
    if type(vol_surface) != list:
        surfaces = [vol_surface]
    else:
        surfaces = vol_surface

    fig = plt.figure()
    ax = fig.add_subplot(projection='3d')
    X, Y = np.meshgrid(plot_strikes, plot_years)

    for surface in surfaces:
        method_to_call = getattr(surface, funct)

        Z = np.array([method_to_call(float(y), float(x))
                      for xr, yr in zip(X, Y)
                          for x, y in zip(xr,yr) ]
                     ).reshape(len(X), len(X[0]))

        surf = ax.plot_surface(X,Y,Z, rstride=1, cstride=1, linewidth=0.1)

    fig.colorbar(surf, shrink=0.5, aspect=5)

def generate_multi_paths_df(sequence, num_paths):
    spot_paths = []
    vol_paths = []

    for i in range(num_paths):
        sample_path = seq.next()
        values = sample_path.value()

        spot, vol = values

        spot_paths.append([x for x in spot])
        vol_paths.append([x for x in vol])

    df_spot = pd.DataFrame(spot_paths, columns=[spot.time(x) for x in range(len(spot))])
    df_vol = pd.DataFrame(vol_paths, columns=[spot.time(x) for x in range(len(spot))])

    return df_spot, df_vol

def create_vol_surface_mesh_from_heston_params(today, calendar, spot, v0, kappa, theta, rho, sigma,
                                               rates_curve_handle, dividend_curve_handle,
                                               strikes = np.linspace(40, 200, 161), tenors = np.linspace(0.1, 3, 60)):
    quote = ql.QuoteHandle(ql.SimpleQuote(spot))

    heston_process = ql.HestonProcess(rates_curve_handle, dividend_curve_handle, quote, v0, kappa, theta, sigma, rho)
    heston_model = ql.HestonModel(heston_process)
    heston_handle = ql.HestonModelHandle(heston_model)
    heston_vol_surface = ql.HestonBlackVolSurface(heston_handle)

    data = []
    for strike in strikes:
        data.append([heston_vol_surface.blackVol(tenor, strike) for tenor in tenors])

    expiration_dates = [calendar.advance(today, ql.Period(int(365*t), ql.Days)) for t in tenors]
    implied_vols = ql.Matrix(data)
    feller = 2 * kappa * theta - sigma ** 2

    return expiration_dates, strikes, implied_vols, feller

# World State for Vanilla Pricing
spot = 100
rate = 0.00
today = ql.Date(8, 7, 2024)
calendar = ql.NullCalendar()
day_count = ql.Actual365Fixed()

# Setting up the flat risk-free curves
riskFreeCurve = ql.FlatForward(today, rate, ql.Actual365Fixed())

ql.Settings.instance().evaluationDate = today
flat_ts = ql.YieldTermStructureHandle(
    ql.FlatForward(today, rate, day_count))
dividend_ts = ql.YieldTermStructureHandle(
    ql.FlatForward(today, rate, day_count))


#############################################################################################################
#Firstly, create the volatility surface 'observed in the market' using already calibrated Heston parameters
#############################################################################################################

dates, strikes, vols, feller = create_vol_surface_mesh_from_heston_params(today, calendar, spot, 0.0094, 1.4124, 0.0137, -0.1194, 0.2988, flat_ts, dividend_ts)

market_data = ql.BlackVarianceSurface(today, calendar, dates, strikes, vols, day_count)

plot_vol_surface(market_data)
plt.title('Artificial "market" volatility surface')

# For given maturity 1Y, plot the volatility against strikes
v0, kappa, theta, rho, sigma = 0.0094, 1.4124, 0.0137, -0.1194, 0.2988

strikes = np.linspace(70,130,58)
tenors = np.arange(0.1, 3, 0.05)

quote = ql.QuoteHandle(ql.SimpleQuote(spot))
heston_process = ql.HestonProcess(flat_ts, dividend_ts, quote, v0, kappa, theta, sigma, rho)
model = ql.HestonModel(heston_process)
heston_handle = ql.HestonModelHandle(model)
heston_vol_surface = ql.HestonBlackVolSurface(heston_handle)

data = []
for strike in strikes:
    data.append([heston_vol_surface.blackVol(tenor, strike) for tenor in tenors])

expiration_dates = [calendar.advance(today, ql.Period(int(365*t), ql.Days)) for t in tenors]
implied_vols = ql.Matrix(data)
feller = 2 * kappa * theta - sigma ** 2

strikes = np.linspace(40, 200, 161)
strikes_grid = np.arange(strikes[0], strikes[-1],10)
expiry = 1.0
implied_vols = [heston_vol_surface.blackVol(1, s) for s in strikes_grid]
fig, ax = plt.subplots()
ax.plot(strikes_grid, implied_vols, label="Implied Volatility Surface")
ax.set_xlabel("Strikes", size=12)
ax.set_ylabel("Volatilities", size=12)
legend = ax.legend(loc="upper right")

plt.show()

#####################################################################################################
# Construct the European Option (these are the true values that we will be comparing our results to)
#####################################################################################################

option = ql.Option.Call
strikes = np.linspace(80,120,101)
expiration= calendar.advance(today, ql.Period(int(365), ql.Days))
prices=[]
for s in strikes:
    payoff = ql.PlainVanillaPayoff(option, s)
    exercise = ql.EuropeanExercise(expiration)
    european_option = ql.VanillaOption(payoff, exercise)
    v0, kappa, theta, rho, sigma = 0.0094, 1.4124, 0.0137, -0.1194, 0.2988
    tenors = np.arange(0.1, 3, 0.05)

    quote = ql.QuoteHandle(ql.SimpleQuote(spot))
    heston_process = ql.HestonProcess(flat_ts, dividend_ts, quote, v0, kappa, theta, sigma, rho)
    model = ql.HestonModel(heston_process)

    european_option.setPricingEngine(ql.AnalyticHestonEngine(model))
    hs_price = european_option.NPV()
    prices.append(hs_price)

#########################################
# STORE THE GENERATED VALUES OF THE STOCK
#########################################

length = 1
n_timesteps = [5,10,20,40,80,160]
list_stock_prices = []
for timestep in n_timesteps:
    times = ql.TimeGrid(length, timestep)
    dimension = heston_process.factors()

    rng = ql.GaussianRandomSequenceGenerator(ql.UniformRandomSequenceGenerator(dimension * timestep, ql.UniformRandomGenerator()))
    seq = ql.GaussianMultiPathGenerator(heston_process, list(times), rng, False)

    df_spot, df_vol = generate_multi_paths_df(seq, 1)
    df_spot.head()

    stock_prices = df_spot.values.tolist()
    list_stock_prices.append(stock_prices)
print(list_stock_prices[0][0][1])
print("The theoretical price is ", prices)

#######################################################

from sklearn.preprocessing import PolynomialFeatures
from sklearn.linear_model import LinearRegression
poly = PolynomialFeatures(degree=2, include_bias=False)
poly_features = poly.fit_transform(strikes.reshape(-1, 1))
mymodel_1 = np.poly1d(np.polyfit(strikes,prices, 2))
myline_1 = np.linspace(80, 120, 161)
plt.scatter(strikes,prices)
plt.plot(myline_1, mymodel_1(myline_1))
plt.title('T = 1Y')
plt.xlabel('Strikes')
plt.ylabel('Market Prices')

plt.show()

#######################################################
# 1. Calculate the Dupire volatility
#######################################################

spot_quote = ql.QuoteHandle(ql.SimpleQuote(spot))

market_data.setInterpolation("bicubic")
local_vol_handle = ql.BlackVolTermStructureHandle(market_data)
local_vol = ql.LocalVolSurface(local_vol_handle, flat_ts, dividend_ts, spot_quote)
local_vol.enableExtrapolation()

# Plot the Dupire surface
plot_vol_surface(local_vol, funct='localVol')
plt.title('Local volatility surface')

######################################################################################################
# Create pure Heston model using calibrated params (in our case perturbed). We aim to fix this difference
######################################################################################################

v0, kappa, theta, rho, sigma, spot = 0.01, 1.5, 0.01, -0.1, 0.3, 100
feller = 2 * kappa * theta - sigma ** 2

heston_process = ql.HestonProcess(flat_ts, dividend_ts, spot_quote, v0, kappa, theta, sigma, rho)
heston_model = ql.HestonModel(heston_process)

heston_handle = ql.HestonModelHandle(heston_model)
heston_vol_surface = ql.HestonBlackVolSurface(heston_handle)

# Plot the vol surface
plot_vol_surface([market_data, heston_vol_surface])
plt.title('Implied (blue) and Heston (orange) volatility surfaces')


##############################################################################################################
# We now define some functions that will be useful for the implementation of the particle method.
##############################################################################################################

def gauss_const(epsilon):
    return 1/(epsilon*np.sqrt(np.pi*2))

def gauss_exp(Xi, Xj, epsilon):
    num =  - 0.5*np.square(np.add(Xj,-Xi))
    den = epsilon*epsilon
    return num/den

def kernel_function(epsilon,Xi,Xj):
    const = gauss_const(epsilon)
    gauss_val = const*np.exp(gauss_exp(Xi,Xj,epsilon))
    return gauss_val

def g(Y):
    return np.exp(Y)

def g_squared(Yi):
    return np.square(np.exp(Yi))

def conditionalexpectation(KernelFunction,g_squared,delta):
    numerator = np.sqrt(np.sum(KernelFunction)+delta)
    S = np.multiply(KernelFunction,g_squared)
    denomenator = np.sqrt(np.sum(S)+delta)
    return numerator/denomenator

In [None]:
from sklearn.preprocessing import PolynomialFeatures
from sklearn.linear_model import LinearRegression

# World State for Vanilla Pricing
spot = 100
rate = 0.00
today = ql.Date(5, 2, 2025)
calendar = ql.NullCalendar()
day_count = ql.Actual365Fixed()

# Setting up the flat risk-free curves
riskFreeCurve = ql.FlatForward(today, rate, ql.Actual365Fixed())

#flat_ts = ql.YieldTermStructureHandle(riskFreeCurve)
#dividend_ts = ql.YieldTermStructureHandle(riskFreeCurve)

#EXTRAS
ql.Settings.instance().evaluationDate = today
flat_ts = ql.YieldTermStructureHandle(ql.FlatForward(today, rate, day_count))
dividend_ts = ql.YieldTermStructureHandle(ql.FlatForward(today, rate, day_count))

n_timesteps = [5,10,20,40,80,160]
n_particles = 1000
delta = 0.01
h_1 = 0.1
t_0 = 0
T  = 1.0

# Initialize heston parameters
v0, theta, rho, sigma, spot = 0.0094, 0.01, -0.1, 0.3, 100

r = 0

results_by_kappa = {}

for kappa in [1.5,6,18]:

   dates, strikes, vols, feller = create_vol_surface_mesh_from_heston_params(today, calendar, spot, 0.0094, kappa+0.1, 0.0137, -0.1194, 0.2988, flat_ts, dividend_ts)

   market_data = ql.BlackVarianceSurface(today, calendar, dates, strikes, vols, day_count)

   plot_vol_surface(market_data)
   plt.title('Artificial "market" volatility surface')

# For given maturity 1Y, plot the volatility against Strikes
   v0, k, theta, rho, sigma = 0.0094, kappa+0.1, 0.0137, -0.1194, 0.2988

   strikes = np.linspace(70,130,58)
   tenors = np.arange(0.1, 3, 0.05)

   quote = ql.QuoteHandle(ql.SimpleQuote(spot))
   heston_process = ql.HestonProcess(flat_ts, dividend_ts, quote, v0, k, theta, sigma, rho)
   model = ql.HestonModel(heston_process)
   heston_handle = ql.HestonModelHandle(model)
   heston_vol_surface = ql.HestonBlackVolSurface(heston_handle)

   data = []
   for strike in strikes:
     data.append([heston_vol_surface.blackVol(tenor, strike) for tenor in tenors])

   expiration_dates = [calendar.advance(today, ql.Period(int(365*t), ql.Days)) for t in tenors]
   implied_vols = ql.Matrix(data)
   feller = 2 * k * theta - sigma ** 2

# black_var_surface = ql.BlackVarianceSurface(
#     today, calendar,
#     expiration_dates, strikes,
#     implied_vols, day_count)

   strikes = np.linspace(40, 200, 161)
   strikes_grid = np.arange(strikes[0], strikes[-1],10)
   expiry = 1.0 # years
# implied_vols = [market_data.blackVol(expiry, s)
#                 for s in strikes_grid] # can interpolate here
   implied_vols = [heston_vol_surface.blackVol(1, s) for s in strikes_grid]
   fig, ax = plt.subplots()
   ax.plot(strikes_grid, implied_vols, label="Implied Volatility Surface")
   ax.set_xlabel("Strikes", size=12)
   ax.set_ylabel("Volatilities", size=12)
   legend = ax.legend(loc="upper right")

   plt.show()

#####################################################################################################
# Construct the European Option (these are the true values that we will be comparing our results to)
#####################################################################################################

   option = ql.Option.Call
   strikes = np.linspace(80,120,101)
   expiration= calendar.advance(today, ql.Period(int(365), ql.Days))
   prices=[]
   for s in strikes:
    payoff = ql.PlainVanillaPayoff(option, s)
    exercise = ql.EuropeanExercise(expiration)
    european_option = ql.VanillaOption(payoff, exercise)
    v0, k, theta, rho, sigma = 0.0094, kappa+0.1, 0.0137, -0.1194, 0.2988
    tenors = np.arange(0.1, 3, 0.05)

    quote = ql.QuoteHandle(ql.SimpleQuote(spot))
    heston_process = ql.HestonProcess(flat_ts, dividend_ts, quote, v0, k, theta, sigma, rho)
    model = ql.HestonModel(heston_process)

    european_option.setPricingEngine(ql.AnalyticHestonEngine(model))
    hs_price = european_option.NPV()
    prices.append(hs_price)

################################################################
# STORE THE GENERATED VALUES OF THE STOCK AS A PANDAS DATAFRAME
################################################################

   length = 1
   n_timesteps = [5,10,20,40,80,160]
   list_stock_prices = []
   for timestep in n_timesteps:
      times = ql.TimeGrid(length, timestep)
      dimension = heston_process.factors()

      rng = ql.GaussianRandomSequenceGenerator(ql.UniformRandomSequenceGenerator(dimension * timestep, ql.UniformRandomGenerator()))
      seq = ql.GaussianMultiPathGenerator(heston_process, list(times), rng, False)

      df_spot, df_vol = generate_multi_paths_df(seq, 1)
      df_spot.head()

      stock_prices = df_spot.values.tolist()
      list_stock_prices.append(stock_prices)
   print(list_stock_prices[0][0][1])
   print("The theoretical price is ", prices)
   poly = PolynomialFeatures(degree=2, include_bias=False)
   poly_features = poly.fit_transform(strikes.reshape(-1, 1))
   mymodel_1 = np.poly1d(np.polyfit(strikes,prices, 2))
   myline_1 = np.linspace(80, 120, 161)
   plt.scatter(strikes,prices)
   plt.plot(myline_1, mymodel_1(myline_1))
   plt.title('T = 1Y')
   plt.xlabel('Strikes')
   plt.ylabel('Market Prices')

   plt.show()

   spot_quote = ql.QuoteHandle(ql.SimpleQuote(spot))

   market_data.setInterpolation("bicubic")
   local_vol_handle = ql.BlackVolTermStructureHandle(market_data)
   local_vol = ql.LocalVolSurface(local_vol_handle, flat_ts, dividend_ts, spot_quote)
   local_vol.enableExtrapolation()

   plot_vol_surface(local_vol, funct='localVol')
   plt.title('Local volatility surface')

   for m in range(len(n_timesteps)):
    theta, rho, sigma, spot = 0.01, -0.1, 0.3, 100

    print(m)
    M = n_timesteps[m]
    dt = (T - t_0)/M
    tk = np.arange(t_0, T+dt, dt)
    err_em = np.zeros((n_particles,len(tk)))

    particles_X_values = 100 * np.ones(n_particles)
    particles_Y_values = 0.0094 * np.ones(n_particles)

    data = np.ones((M+1 ,n_particles))
    X_k = np.ones((M+1 ,n_particles))
    Y_k = np.ones((M+1 ,n_particles))

    volatility = np.ones((M+1 ,M+1))
    X_k[0] = particles_X_values
    Y_k[0] = particles_Y_values

    lsv_i = np.ones(M+1)
    strong_errors = []
    for k in range(1, M+1):

        t_k = tk[k]
        Xj = X_k[k - 1]
        Yj = Y_k[k - 1]
        dW1 = np.random.normal(0.0,np.sqrt(dt),len(Yj))

        dW2 = rho * dW1 + math.sqrt(1 - rho ** 2) * dW1


        for i in range (n_particles):
            Xj = [xj for xj in X_k[k - 1] if gauss_const(h_1)*np.exp(- 0.5*np.square(xj-X_k[k - 1][i])/(h_1*h_1))>0.001]
            Xi = np.full((1, len(Xj)),particles_X_values[i])
            Yj = [Y_k[k - 1][m] for m in range(n_particles) if gauss_const(h_1)*np.exp(- 0.5*np.square(X_k[k - 1][m]-particles_X_values[i])/(h_1*h_1))>0.001]

            theta = np.full((1,len(Yj)),0.01)

            alpha_i = conditionalexpectation(kernel_function(h_1, Xi, Xj),(Yj),delta)

            tenor = (t_k)
            strike = particles_X_values[i]
            sigma_dup = np.abs([local_vol.localVol(tenor, strike)][0])

            a = np.multiply(particles_X_values[i],alpha_i)*sigma_dup
            b = np.multiply(np.sqrt(particles_Y_values[i]),dW1[i])
            c = np.multiply(a,b)

            X_k[k][i] = particles_X_values[i] + (r*particles_X_values[i])*dt + c

            err_em[i,k]  = abs(list_stock_prices[m][0][k] - X_k[k][i])

        av_err_em = np.sum(err_em, axis = 1)/n_particles
        strong_errors.append(av_err_em)
        particles_X_values = X_k[k]

        theta= np.full((1,len(Y_k[k - 1])),0.01)
        Y_k[k] = np.abs(Y_k[k - 1] + kappa*(np.add(theta,-Y_k[k - 1]))*dt + sigma*np.sqrt(Y_k[k - 1])*dW2)

        particles_Y_values = Y_k[k]

    results_by_kappa[kappa] = max(strong_errors[0])


sns.set_theme(style="whitegrid")
xlog = np.log([0.2, 0.1, 0.05, 0.025, 0.0125, 0.00625])
plt.figure()

for kappa, errors in results_by_kappa.items():
    errors = np.asarray(errors)
    ylog = np.log(errors)

    slope, intercept, r_value, p_value, std_err = linregress(xlog, ylog)

    y_fit = slope * xlog + intercept
    y_ref = 0.5 * xlog + intercept

    # 95% CI for the slope
    n = len(xlog)
    dof = n - 2
    alpha = 0.05
    t_crit = t.ppf(1 - alpha/2, dof)
    margin = t_crit * std_err
    ci_lower, ci_upper = slope - margin, slope + margin

    # Plot data and fitted line
    plt.plot(xlog, ylog, 'x', label=f'κ={kappa}')
    plt.plot(xlog, y_fit, label=f'κ={kappa} fit (slope={slope:.3f}, 95% CI [{ci_lower:.3f}, {ci_upper:.3f}])')
    # reference slope (dashed)
    plt.plot(xlog, y_ref, linestyle='dashed', label=f'κ={kappa} ref slope 0.5')

plt.title('Strong Convergence of Euler–Maruyama Scheme')
plt.xlabel('$\\log(\\Delta t)$')
plt.ylabel('$\\log(\\text{Error})$')
plt.legend(loc='upper left')
plt.grid(which='both', color='gray', linestyle='--', linewidth=0.5, alpha=0.5)
plt.show()