# *Heston Local Stochastic Volatility Model*

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
import os
os.chdir("/content/drive/MyDrive/Colab Notebooks")
!ls

In [None]:
pip install py_vollib



In [None]:
import sys
sys.path.append("..")
from Utility import Utility
from SV import HestonModel
from LSV_MC import LocalStochasticVolatilityModel, PMParameters
from Utility import CallOption, MarketState, HestonParameters, plot_distribution_confidence_intervals, implied_volatilities_for_appropriate_strikes, time_to_boundary_strikes_mappings, plot_iv, plot_lv_to_one_convergence_report, plot_iv_to_heston_market_convergence_report, dupire_for_appropriate_strikes
from LSV_MC.Presentation_functions import LF_dynamics_vol_of_vol
import matplotlib.pyplot as plt
plt.style.use('https://github.com/dhaitz/matplotlib-stylesheets/raw/master/pitayasmoothie-light.mplstyle')
from tqdm.auto import tqdm, trange
from dataclasses import dataclass
import numpy as np
from copy import deepcopy

  return jit(*jit_args, **jit_kwargs)(fun)


In [None]:
#from py_vollib.black_scholes.implied_volatility import implied_volatility as lets_be_rational_iv

$$dS_t = S_t (rdt +  \sigma(t, S_t) \sqrt{V_t} dW^{S}_t), \qquad S_0=s_0,$$


$$dV_t = \kappa (\bar v - V_t) dt + \gamma \sqrt{V_t} dW^{V}_t, \quad V_0 = v_0,$$

$$dW^{S}_t dW^{V}_t = \rho dt.$$

**Market state** is given by an initial asset price $S_0$, constant interest rate $r$.

In [None]:
market_state = MarketState(asset_price = 1., interest_rate = 0.0)
market_state

MarketState(asset_price=1.0, interest_rate=0.0)

**Heston parameters** are given by a tuple $(\kappa, \gamma, \rho, \bar v, v_0)$, where

$\kappa - \text{speed of mean reversion,}$

$\gamma - \text{vol of vol,}$

$\rho - \text{correlation between } dW^{S}_t \text{and } dW^{V}_t,$

$\bar v - \text{long term mean,}$

$v_0  - \text{initial variance,}$

$\sigma(t, s)  - \text{leverage function.}$


In [None]:
# Market Heston parameters
market_heston_parameters = HestonParameters(kappa=1, gamma=0.3, rho=-0.25, vbar=0.055, v0=0.0225)
# market_heston_parameters = HestonParameters(kappa=0.6, gamma=0.3, rho=-0.25, vbar=0.0625, v0=0.0225)
market_heston_parameters

HestonParameters(kappa=1, gamma=0.3, rho=-0.25, vbar=0.055, v0=0.0225)

In [None]:
print("Feller condition: {}".format(2 * market_heston_parameters.kappa * market_heston_parameters.vbar - market_heston_parameters.gamma**2))

Feller condition: 0.020000000000000004


**Heston model**

In [None]:
# Market Heston model
market_heston_model = HestonModel(heston_parameters=market_heston_parameters,
                                  state=market_state,
                                  random_state=42)

print("Heston parameters: {}".format(market_heston_model.heston_parameters))
print("Market state: {}".format(market_heston_model.state))
print("Random state: {}".format(market_heston_model.random_state))

Heston parameters: HestonParameters(kappa=1, gamma=0.3, rho=-0.25, vbar=0.055, v0=0.0225)
Market state: MarketState(asset_price=1.0, interest_rate=0.0)
Random state: 42


### 2. Market generation

**2.1 Market prices**

In [None]:
number_of_years = 2
number_of_days_per_month = 21
number_of_days_per_year = number_of_days_per_month * 12
pillar_indices = [1, 2, 3, 6, 9, 12, 18, 24]
maturities = [indice * number_of_days_per_month / number_of_days_per_year for indice in pillar_indices]

In [None]:
N_STRIKES = 51
N_TIMES = len(maturities)

options = CallOption(strike=np.linspace(0.5, 1.5, N_STRIKES), #
                     expiration_time=np.array(maturities))


In [None]:
cos_method_call_option_prices = np.empty((N_TIMES, N_STRIKES))
cos_method_put_option_prices = np.empty((N_TIMES, N_STRIKES))


for i, t in enumerate(options.expiration_time):
    cos_method_call_option_prices[i] = market_heston_model.price_vanilla(options.is_call,
                                                                  time_to_maturity=t,
                                                                  strikes=options.strike,
                                                                  method='cos')

for i, t in enumerate(options.expiration_time):
    cos_method_put_option_prices[i] = market_heston_model.price_vanilla(not options.is_call,
                                                                 time_to_maturity=t,
                                                                 strikes=options.strike,
                                                                 method='cos')

**2.2 Market implied volatility**

In [None]:
q_min, q_max = 0.0005, 0.0005

time_ = np.linspace(1 / (number_of_days_per_year * number_of_years), np.max(maturities), number_of_days_per_year * number_of_years)
strikes_ = np.linspace(0.1, 6, 1000)

intervals_with_digitals_market = market_heston_model.distribution_confidence_intervals(time = time_,
                                                                                       strikes = strikes_,
                                                                                       q_min = q_min,
                                                                                       q_max = q_max,
                                                                                       method="cos")

In [None]:
t_to_Kmin, t_to_Kmax = time_to_boundary_strikes_mappings(intervals_with_digitals_market)

t_to_K_min_and_K_max = lambda t: (t_to_Kmin(t), t_to_Kmax(t)) # t -> (K_min, K_max)

implied_vol_surface_market = implied_volatilities_for_appropriate_strikes(options.strike,
                                                                          options.expiration_time,
                                                                          market_state.asset_price,
                                                                          market_state.interest_rate,
                                                                          t_to_K_min_and_K_max,
                                                                          cos_method_call_option_prices,
                                                                          cos_method_put_option_prices,
                                                                          from_parity = False)


ZeroDivisionError: ignored

In [None]:
# vf = np.vectorize(lets_be_rational_iv)
# implied_vol_surface_market_ = np.zeros(cos_method_call_option_prices.shape)

# for i, t in enumerate(options.expiration_time):
#     strikes = options.strike
#     idx_call = strikes >= market_state.asset_price
#     idx_put = strikes < market_state.asset_price
#     put_prices = cos_method_call_option_prices[i] + options.strike * np.exp(- market_state.interest_rate * t) - market_state.asset_price
#     implied_vol_surface_market_[i, idx_call] = vf(cos_method_call_option_prices[i][idx_call], market_state.asset_price, strikes[idx_call], t, market_state.interest_rate, 'c')
#     implied_vol_surface_market_[i, idx_put] = vf(put_prices[idx_put], market_state.asset_price, strikes[idx_put], t, market_state.interest_rate, 'p')

# variance = implied_vol_surface_market_**2 * np.tile(options.expiration_time.reshape(-1, 1), (1, options.strike.shape[0]))
# variance_splined = RectBivariateSpline(options.expiration_time, np.log(options.strike), variance, s=0.1)


In [None]:
fig = plt.figure(figsize=(10, 7))
for i, t in enumerate(implied_vol_surface_market.keys()):
    if i % 3 == 0:
        plt.plot(implied_vol_surface_market[t]['strikes'], implied_vol_surface_market[t]['iv'], label="T = {}".format(round(t, 2)), marker = "^")
plt.title("Market implied volatilities", fontsize=20)
plt.ylabel("Iimplied vol", fontsize=15)
plt.xlabel("Strikes", fontsize=15)
plt.legend(fontsize=15)
plt.show()

**2.3. Market Dupire local volatility**

$$ \sigma^2(T,y) = \frac{w'_T}{ 1 - \frac{y}{w} w'_T + \frac14 (-\frac14 - \frac1w + \frac{y^2}{w^2}) (w'_y)^2  +\frac12 w''_{yy}},$$


where $w(T,y) =  \sigma^2_{BS}(T, s_0 e^{rT + y}) T$ is implied variance.

In [None]:
# # variance_splined = var_spline(implied_vol_surface_market, options, market_state)

# # TO FIX:
# # NOTE: the formula for variance_splined is correct only for zero interest rate! Using market_state with
# # non zero inrerest rate leads to incorrect calculations.


# dV_dt = variance_splined.partial_derivative(dx=1, dy=0)
# dV_dlogK = variance_splined.partial_derivative(dx=0, dy=1)
# d2V_dlogK2 = variance_splined.partial_derivative(dx=0, dy=2)

# local_vol_ = lambda t, logK: dV_dt(t, logK, grid=False) / (
#                 1 - logK / variance_splined(t, logK, grid=False) * dV_dlogK(t, logK, grid=False)
#                 + 0.25 * (-0.25 - 1 / variance_splined(t, logK, grid=False) + logK * logK / (variance_splined(t, logK, grid=False)**2)) * dV_dlogK(t, logK, grid=False)**2
#                 + 0.5 * d2V_dlogK2(t, logK, grid=False))

# local_vol__ = lambda t, s: np.sqrt(local_vol_(t, np.log(s)))

In [None]:
local_vol__ = dupire_for_appropriate_strikes(implied_vol_surface_market, market_state, options.strike)

In [None]:
fig = plt.figure(figsize=(10, 10))
x, y = np.meshgrid(options.strike, options.expiration_time)

ax1 = fig.add_subplot(projection='3d')
ax1.plot_surface(x, y, local_vol__(y, x))
for t, iv_strikes in implied_vol_surface_market.items():
    ax1.plot(iv_strikes["strikes"],
             [t] * len(iv_strikes["strikes"]),
             local_vol__(t, iv_strikes["strikes"]),
             linewidth=2, marker="o", markersize=2, label="$\sigma_{Dup}(t, s)$, t = " + str(round(t, 2)))
ax1.set_title('Market Dupire local volatility surface', fontsize=20)
ax1.set_xlabel('Strike, K', fontsize=15)
ax1.set_ylabel('Time to expiration, T', fontsize=15)
ax1.set_zlabel('Local volatility $\sigma_{Dup}(t, s)$', fontsize=15)
ax1.legend()

### 3. Heston LSV model calibration (market stochastic component)

**3.1 Heston LSV model initialization**

In [None]:
market_heston_lsv_model = LocalStochasticVolatilityModel(state=market_state,
                                                         stochastic_model=market_heston_model,
                                                         random_state=42)

In [None]:
print("Market Heston LSV parameters: {}".format(market_heston_lsv_model.stochastic_model.heston_parameters))
print("Market state: {}".format(market_heston_lsv_model.state))
print("Random state: {}".format(market_heston_lsv_model.random_state))

**3.2 Heston LSV model calibration**

In [None]:
N = 2**15

hpar = PMParameters(
    n_particles=N,
    n_steps=int(number_of_years * number_of_days_per_year),
    kernel_scaling_factor=0.5,
    kernel_t_min=0.25,
    n_interp=100
)

expectation_method = 'quadratic'

In [None]:
market_heston_lsv_model.calibrate(particle_method_parameters=hpar,
                                  options=options,
                                  dupire_lv_function=local_vol__,
                                  expectation_method = expectation_method)

In [None]:
fig = plt.figure(figsize=(10, 10))
x, y = np.meshgrid(options.strike, options.expiration_time, indexing="ij")

ax1 = fig.add_subplot(projection='3d')
indexes = [int(number_of_days_per_month * indice) - 1 for indice in pillar_indices]
ax1.plot_surface(x, y, market_heston_lsv_model.sigma[:, indexes])

for i, (t, iv_strikes) in enumerate(implied_vol_surface_market.items()):
    ax1.plot(iv_strikes["strikes"],
             [t] * len(iv_strikes["strikes"]),
             market_heston_lsv_model.sigma[:, indexes[i]][np.where(np.in1d(options.strike, iv_strikes["strikes"]))[0]],
             linewidth=2, marker="o", markersize=2, label="$\sigma(t, s)$, t = " + str(round(t, 2)))

ax1.set_title('Leverage function $\sigma(t, s)$', fontsize=20)
ax1.set_xlabel('Strike, K', fontsize=15)
ax1.set_ylabel('Time to expiration, T', fontsize=15)
ax1.set_zlabel('Local volatility $\sigma_{Dup}(t, s)$', fontsize=15)
ax1.set_zlim(np.quantile(market_heston_lsv_model.sigma, (.001, .999)))
ax1.legend()
plt.show()


**3.3 Heston LSV  $\sigma(t, s)$  convergence to constant 1**

In [None]:
fig = plt.figure(figsize=(10, 7))

indexes = [number_of_days_per_month * indice - 1 for indice in pillar_indices]
for i, t in enumerate(options.expiration_time):
    plt.plot(options.strike, market_heston_lsv_model.sigma[:, indexes[i]], label="t = {}".format(round(t, 2)), marker = "^")
plt.plot(options.strike, np.ones(options.strike.shape), label="Target leverage", marker = ".", color="black")
plt.title("Leverage functions $\sigma(t, s)$ for different $t$", fontsize=15)
plt.ylabel("Leverage function, $\sigma(t, s)$", fontsize=15)
plt.xlabel("Strikes, K", fontsize=13)
plt.legend(fontsize=15)
plt.show()

In [None]:
Ns = [2 ** i for i in range(12, 16, 1)]
indexes = [number_of_days_per_month * indice - 1 for indice in pillar_indices]
idxs = [0, len(indexes) // 2, len(indexes) - 1]

times = [indexes[i] / number_of_days_per_year for i in idxs]
n_steps = number_of_years * number_of_days_per_year
dupire_lv_function=local_vol__

plot_lv_to_one_convergence_report(Ns, idxs, times, n_steps, options, dupire_lv_function, PMParameters,
                                  market_heston_lsv_model, indexes, expectation_method = expectation_method)

**3.3 Heston LSV implied volatility**

In [None]:
q_min, q_max = 0.0005, 0.0005
intervals_with_sampling = market_heston_lsv_model.distribution_confidence_intervals(strikes = strikes_,
                                                                                    q_min = q_min,
                                                                                    q_max = q_max)

In [None]:
plot_distribution_confidence_intervals(intervals_with_sampling = intervals_with_sampling,
                                       intervals_with_digitals=intervals_with_digitals_market,
                                       title_1="COS (market)", title_2 = "Sampling (calibrated LSV)")

In [None]:
lsv_call_option_prices = np.empty((N_TIMES, N_STRIKES))
lsv_put_option_prices = np.empty((N_TIMES, N_STRIKES))


for i, t in tqdm(enumerate(options.expiration_time)):
    lsv_call_option_prices[i] = market_heston_lsv_model.price_vanilla(options.is_call, time_to_maturity=t, strikes=options.strike)

for i, t in tqdm(enumerate(options.expiration_time)):
    lsv_put_option_prices[i] = market_heston_lsv_model.price_vanilla(not options.is_call, time_to_maturity=t, strikes=options.strike)

In [None]:
t_to_Kmin, t_to_Kmax = time_to_boundary_strikes_mappings(intervals_with_sampling)

t_to_K_min_and_K_max = lambda t: (t_to_Kmin(t), t_to_Kmax(t)) # t -> (K_min, K_max)

implied_vols_lsv_method = implied_volatilities_for_appropriate_strikes(strikes=options.strike,
                                                                      maturities=options.expiration_time,
                                                                      asset_price=market_state.asset_price,
                                                                      interest_rate=market_state.interest_rate,
                                                                      t_to_K_min_and_K_max=t_to_K_min_and_K_max,
                                                                      call_option_prices=lsv_call_option_prices,
                                                                      put_option_prices=lsv_put_option_prices,
                                                                      from_parity=False)

In [None]:
plot_iv(implied_vols_lsv_method, implied_vol_surface_market, "IV from Heston LSV (calibrated)", "IV from COS method (market)")

**3.4 Heston LSV implied volatility convergence**

#### Weights
For $K_i$ we use normalized weights $\frac{w_i}{||w_i||}$, where
$$w_i = P(S_t > K_i) \cdot I[K_i > S_0] + P(S_t < K_i) \cdot I[K_i < S_0]$$
or
$$w_i = f_{S_t}(K_i)$$
where $f_{S_t}(\cdot)$ is density function for r.v.$S_t$.

In [None]:
Ns = [2 ** i for i in range(10, 18, 1)]
indexes = options.expiration_time
idxs = [0, len(indexes) // 2, len(indexes) - 1]

q_min = 0.0005
q_max = 0.0005


times = indexes[idxs]#[indexes[i] / number_of_days_per_year for i in idxs]

time_ = np.linspace(1 / (number_of_days_per_year * number_of_years),
                    np.max(maturities),
                    number_of_days_per_year * number_of_years)
strikes_ = np.linspace(0.1, 6, 1000)


plot_iv_to_heston_market_convergence_report(Ns,idxs,times,time_,strikes_,n_steps,options,dupire_lv_function,
                                            implied_vol_surface_market, market_heston_model, market_state, PMParameters,
                                            market_heston_lsv_model, expectation_method = expectation_method,
                                            weighted = 'proba', q_min = q_min, q_max = q_max)

**3.5 Effect of Heston parameters on leverage function $\sigma(t, s)$**

In [None]:
vol_of_lols = [0.2, 0.25, 0.3, 0.35, 0.4] #0.01, 0.03, 0.05,
rhos = [-0.4, -0.25, -0.2, 0, 0.1, 0.2]

In [None]:

heston_parameters = HestonParameters(kappa=1, gamma=0.3, rho=-0.25, vbar=0.055, v0=0.0225)
heston_parameters

In [None]:
indexes = [number_of_days_per_month * indice - 1 for indice in pillar_indices]
n_steps = number_of_years * number_of_days_per_year

LF_dynamics_vol_of_vol(vol_of_lols, indexes, options, n_steps, market_state, local_vol__, market_heston_parameters, param_type = 'vov')

In [None]:
indexes = [number_of_days_per_month * indice - 1 for indice in pillar_indices]
n_steps = number_of_years * number_of_days_per_year

LF_dynamics_vol_of_vol(rhos, indexes, options, n_steps, market_state, local_vol__, market_heston_parameters, param_type = 'rho')