In [None]:
import math
import numpy as np
import pandas as pd
from typing import Optional
import plotly.graph_objects as go
from dataclasses import dataclass
from sklearn.linear_model import LinearRegression
from sklearn.preprocessing import PolynomialFeatures
from sklearn.metrics import mean_squared_error, r2_score


@dataclass
class HestonBatesParams:
    """Parameters for the Heston-Bates model"""
    kappa: float     # Mean reversion speed of variance
    theta: float     # Long-term variance
    sigma: float     # Volatility of variance
    rho: float       # Correlation between asset and variance
    v0: float        # Initial variance
    lambda_: float   # Jump intensity
    mu_j: float      # Mean jump size
    sigma_j: float   # Jump size volatility

class HestonBatesMC:
    def __init__(
        self,
        S0: float,                 # Initial stock price
        K: float,                  # Strike price
        T: float,                  # Time to maturity
        r: float,                  # Risk-free rate
        params: HestonBatesParams, # Heston Jump parameters
        n_paths: int = 100_000,    # Number of simulation paths
        n_steps: int = 252         # Number of time steps
    ):
        self.S0 = S0
        self.K = K
        self.T = T
        self.r = r
        self.params = params
        self.n_paths = n_paths
        self.n_steps = n_steps
        self.dt = T / n_steps
        self.simulated_paths = None

    def simulate_paths(self) -> np.ndarray:
        """
        Simulate stock price paths using the Heston-Bates model
        Returns: array of shape (n_paths, n_steps + 1)
        """
        # Initialize arrays
        S = np.zeros((self.n_paths, self.n_steps + 1))
        V = np.zeros((self.n_paths, self.n_steps + 1))
        S[:, 0] = self.S0
        V[:, 0] = self.params.v0

        # Generate correlated random numbers
        np.random.seed(0)
        z1 = np.random.standard_normal((self.n_paths, self.n_steps))
        np.random.seed(1)
        z2 = self.params.rho * z1 + np.sqrt(1 - self.params.rho**2) * \
             np.random.standard_normal((self.n_paths, self.n_steps))

        # Generate jump process
        np.random.seed(2)
        N = np.random.poisson(self.params.lambda_ * self.dt, (self.n_paths, self.n_steps))
        np.random.seed(3)
        J = np.random.normal(self.params.mu_j, self.params.sigma_j, (self.n_paths, self.n_steps))

        # Simulate paths
        for t in range(self.n_steps):
            # Variance process
            V[:, t] = np.maximum(V[:, t], 0)  # Ensure positive variance
            dV = self.params.kappa * (self.params.theta - V[:, t]) * self.dt + \
                 self.params.sigma * np.sqrt(V[:, t] * self.dt) * z2[:, t]
            V[:, t + 1] = V[:, t] + dV

            # Price process with jumps
            dS = (self.r - 0.5 * V[:, t]) * self.dt + \
                 np.sqrt(V[:, t] * self.dt) * z1[:, t] + \
                 J[:, t] * N[:, t]
            S[:, t + 1] = S[:, t] * np.exp(dS)

        self.simulated_paths = S
        
        return S

    def price_european_option(self, option_type: str = 'call') -> tuple[float, float]:
        """
        Price European options using Monte Carlo simulation
        Args:
            option_type: 'call' or 'put'
        Returns:
            tuple of (option price, standard error)
        """
        if self.simulated_paths is None:
            print('new simulation')
            S = self.simulate_paths()
        else:
            S = self.simulated_paths
        ST = S[:, -1]  # Terminal stock prices

        # Calculate payoffs
        if option_type.lower() == 'call':
            payoffs = np.maximum(ST - self.K, 0)
        else:  # put
            payoffs = np.maximum(self.K - ST, 0)

        # Discount payoffs
        discount_factor = np.exp(-self.r * self.T)
        discounted_payoffs = payoffs * discount_factor

        # Calculate price and standard error
        option_price = np.mean(discounted_payoffs)
        std_error = np.std(discounted_payoffs) / np.sqrt(self.n_paths)

        return option_price, std_error

def _phi(x: float) -> float:
    """
    Standard normal probability density function (PDF).
    """
    return 1.0 / math.sqrt(2.0 * math.pi) * math.exp(-0.5 * x * x)

def _Phi(z: float) -> float:
    """
    Cumulative distribution function (CDF) for the standard normal distribution.

    We can use math.erf for a more direct implementation:
        Phi(z) = 0.5 * [1 + erf(z / sqrt(2))]
    """
    return 0.5 * (1.0 + math.erf(z / math.sqrt(2.0)))

def black_scholes_price(S: float,
                        K: float,
                        T: float,
                        r: float,
                        sigma: float,
                        option_type: str = 'call') -> float:
    """
    Returns the Black–Scholes price of a European call or put.

    :param S: Current underlying price
    :param K: Strike price
    :param T: Time to maturity (in years)
    :param r: Risk-free interest rate (annualized)
    :param sigma: Volatility (annualized)
    :param option_type: 'call' or 'put'
    :return: Theoretical option price
    """
    if T <= 0:
        # Edge case: if T = 0, payoff is max(S-K, 0) or max(K-S, 0)
        if option_type.lower() == 'call':
            return max(S - K, 0)
        else:
            return max(K - S, 0)

    d1 = (math.log(S / K) + (r + 0.5 * sigma * sigma) * T) / (sigma * math.sqrt(T))
    d2 = d1 - sigma * math.sqrt(T)

    if option_type.lower() == 'call':
        price = S * _Phi(d1) - K * math.exp(-r * T) * _Phi(d2)
    else:  # put
        price = K * math.exp(-r * T) * _Phi(-d2) - S * _Phi(-d1)

    return price

def implied_volatility(S: float,
                       K: float,
                       T: float,
                       r: float,
                       option_market_price: float,
                       option_type: str = 'call',
                       tol: float = 1e-8,
                       max_iterations: int = 200) -> float:
    """
    Computes the implied volatility (sigma) given market parameters and an observed market price
    for a European call or put using a bisection method.

    :param S: Current underlying price
    :param K: Strike price
    :param T: Time to maturity (in years)
    :param r: Risk-free interest rate (annualized)
    :param option_market_price: Observed market price of the option
    :param option_type: 'call' or 'put'
    :param tol: Tolerance for the bisection search
    :param max_iterations: Maximum number of iterations in the bisection search
    :return: Implied volatility (annualized)
    """

    # Some boundary checks:
    # If the market price is 0, implied vol is technically 0, but let's just return 0
    if option_market_price < 1e-15:
        return 0.0

    # Similarly, if the market price is so large it's above intrinsic + time value, 
    # we might guess volatility is quite large. We'll just start with an upper bound guess.
    
    # Lower and upper bounds for sigma
    vol_lower = 1e-10
    vol_upper = 5.0  # 500% annual volatility is often a large enough upper bound

    # Bisection method
    for i in range(max_iterations):
        vol_mid = 0.5 * (vol_lower + vol_upper)
        price_mid = black_scholes_price(S, K, T, r, vol_mid, option_type)

        # Check how close we are to the target price
        if abs(price_mid - option_market_price) < tol:
            return vol_mid

        # If price_mid is too high, we reduce volatility
        if price_mid > option_market_price:
            vol_upper = vol_mid
        else:
            vol_lower = vol_mid

    # If we exit the loop without converging, return the midpoint anyway
    return vol_mid


In [72]:

# Option parameters
S0 = 100.0     # Initial stock price
K = 100.0      # Strike price
T = 1.0        # Time to maturity (1 year)
r = 0.05       # Risk-free rate

# Create model instance
model = HestonBatesMC(S0, K, T, r, params)

model.simulate_paths()

# Price call and put options
call_price, call_se = model.price_european_option('call')
put_price, put_se = model.price_european_option('put')

In [None]:
params = HestonBatesParams(
    kappa=2.0,      # Mean reversion speed
    theta=0.04,     # Long-term variance
    sigma=0.3,      # Volatility of variance
    rho=-0.7,       # Correlation
    v0=0.02,        # Initial variance
    lambda_=0.4,    # Jump intensity
    mu_j=-0.05,     # Mean jump size
    sigma_j=0.05     # Jump size volatility
)

# Option parameters
S0 = 100.0     # Initial stock price
r = 0.05       # Risk-free rate

market_data = list()
for T in [5/250, 20/250, 60/250, 250/250]:

    model = HestonBatesMC(S0, K, T, r, params)
    model.simulate_paths()

    for K in np.linspace(start=90, stop=100, num=30):
        model.K = K
        put_price, put_se = model.price_european_option('put')

        iv = implied_volatility(S0, K, T, r, put_price, option_type='put')

        market_data.append((T, K, put_price, iv))
market_data_df = pd.DataFrame(market_data, columns=['T', 'K', 'P', 'IV'])

In [122]:
# Reshape data for the surface plot
x = sorted(market_data_df['K'].unique())
y = sorted(market_data_df['T'].unique())
z = market_data_df.pivot(index='T', columns='K', values='P').values

# Create the 3D surface plot
fig = go.Figure(data=[go.Surface(z=z, x=x, y=y)])

# Add titles
fig.update_layout(
    title="3D Surface Plot",
    scene=dict(
        xaxis_title="X Axis",
        yaxis_title="Y Axis",
        zaxis_title="Z Axis"
    ),
    width=1000, height=1000,
)

# Show the plot
fig.show()

In [123]:
X = np.log(market_data_df['K']/S0).to_frame('log_strike')
X['sqrt_T'] = np.sqrt(market_data_df['T'])
y = market_data_df['P']#.pow(2)

# Transform features to polynomial terms
degree = 2  # Degree of the polynomial
poly_features = PolynomialFeatures(degree=degree)
X_poly = poly_features.fit_transform(X)
X_poly = pd.DataFrame(X_poly)

In [124]:
# Fit a linear regression model on the transformed features
model = LinearRegression()
model.fit(X_poly, y)
Y_pred = model.predict(X_poly)

In [125]:
r2 = r2_score(y, Y_pred)
r2

0.9936286939881162

In [126]:
model.coef_

array([  0.        ,  15.28465551,   3.27707449, 120.87078992,
        27.96779637,   2.19401974])

In [128]:
plot_df = market_data_df.copy()
plot_df['poly'] = Y_pred

In [129]:
# Reshape data for the surface plot
x = sorted(plot_df['K'].unique())
y = sorted(plot_df['T'].unique())
z = plot_df.pivot(index='T', columns='K', values='poly').values

# Create the 3D surface plot
fig = go.Figure(data=[go.Surface(z=z, x=x, y=y)])

# Add titles
fig.update_layout(
    title="3D Surface Plot",
    scene=dict(
        xaxis_title="X Axis",
        yaxis_title="Y Axis",
        zaxis_title="Z Axis"
    ),
    width=1000, height=1000,
)

# Show the plot
fig.show()

In [130]:
plot_df

Unnamed: 0,T,K,P,IV,poly
0,0.004,90.000000,0.000501,0.515784,-0.034002
1,0.004,90.344828,0.000606,0.505797,-0.064420
2,0.004,90.689655,0.000721,0.495090,-0.091207
3,0.004,91.034483,0.000841,0.483379,-0.114405
4,0.004,91.379310,0.000965,0.470874,-0.134052
...,...,...,...,...,...
145,1.000,98.620690,4.975773,0.198965,5.098632
146,1.000,98.965517,5.101224,0.198668,5.239354
147,1.000,99.310345,5.228931,0.198376,5.382516
148,1.000,99.655172,5.359006,0.198091,5.528091


In [131]:
import pandas as pd
import numpy as np
import plotly.graph_objects as go

# Sample DataFrame for Surface 1
data1 = {
    'x': [1, 1, 1, 2, 2, 2, 3, 3, 3],
    'y': [1, 2, 3, 1, 2, 3, 1, 2, 3],
    'z': [10, 20, 30, 40, 50, 60, 70, 80, 90]
}
df1 = pd.DataFrame(data1)

# Reshape data for Surface 1
x1 = sorted(df1['x'].unique())
y1 = sorted(df1['y'].unique())
z1 = df1.pivot(index='y', columns='x', values='z').values

# Sample DataFrame for Surface 2
data2 = {
    'x': [1, 1, 1, 2, 2, 2, 3, 3, 3],
    'y': [1, 2, 3, 1, 2, 3, 1, 2, 3],
    'z': [15, 25, 35, 45, 55, 65, 75, 85, 95]
}
df2 = pd.DataFrame(data2)

# Reshape data for Surface 2
x2 = sorted(df2['x'].unique())
y2 = sorted(df2['y'].unique())
z2 = df2.pivot(index='y', columns='x', values='z').values

# Create the 3D surface plots
fig = go.Figure()

# Add Surface 1
fig.add_trace(go.Surface(z=z1, x=x1, y=y1, colorscale="Viridis", name="Surface 1"))

# Add Surface 2
fig.add_trace(go.Surface(z=z2, x=x2, y=y2, colorscale="Cividis", name="Surface 2"))

# Update layout
fig.update_layout(
    title="3D Surface Plots",
    scene=dict(
        xaxis_title="X Axis",
        yaxis_title="Y Axis",
        zaxis_title="Z Axis"
    )
)

# Show the plot
fig.show()
