In [1]:
# Import the pandas library (aliased as 'pd') for working with tabular data structures
import pandas as pd

# Import the NumPy library (aliased as 'np') for numerical computing:
import numpy as np

# Import SciPy's 'griddata' function, which interpolates scattered (x, y, ...) data
from scipy.interpolate import griddata

# Import SciPy's 'least_squares' optimizer, which solves non-linear least-squares problems:
from scipy.optimize import least_squares

# Import SciPy's 'minimize_scalar' optimizer, which performs 1D optimization:
from scipy.optimize import minimize_scalar

In [2]:
# Read the training option-chain dataset from a CSV file into a pandas DataFrame.
option_chains_train = pd.read_csv("/kaggle/input/raw-option-chains/aapl_2016_2020.csv")

# Read the test option-chain dataset from a CSV file into a pandas DataFrame.
option_chains_test = pd.read_csv("/kaggle/input/raw-option-chains/aapl_2021_2023.csv")

# Read the Fed Funds rate history from a CSV file into a pandas DataFrame.
risk_free_rates = pd.read_csv("/kaggle/input/fed-funds-rates/FedFunds History.csv", sep=";")

  option_chains_train = pd.read_csv("/kaggle/input/raw-option-chains/aapl_2016_2020.csv")
  option_chains_test = pd.read_csv("/kaggle/input/raw-option-chains/aapl_2021_2023.csv")


In [3]:
# Define a function that cleans and standardizes a raw option-chain DataFrame
def clean_data(df):
    # Specify the exact subset of raw columns to keep from the original dataset
    cols = [" [QUOTE_DATE]", " [EXPIRE_DATE]", " [DTE]", " [C_IV]", " [C_BID]", " [C_ASK]",
            " [STRIKE]", " [P_BID]", " [P_ASK]", " [P_IV]", " [UNDERLYING_LAST]"]
    # Filter the DataFrame to retain only the selected columns
    df = df[cols]
    # Rename the columns to cleaner, standardized names without brackets or leading spaces
    df.columns = ["QUOTE_DATE", "EXPIRE_DATE", "DTE", "C_IV", "C_BID", "C_ASK",
                  "STRIKE", "P_BID", "P_ASK", "P_IV", "UNDERLYING_LAST"]
    # Drop rows containing any missing values to ensure numerical consistency
    df = df.dropna()
    # Convert the quote date column from string/object to pandas datetime format
    df["QUOTE_DATE"] = pd.to_datetime(df["QUOTE_DATE"])
    # Convert the option expiration date column to pandas datetime format
    df["EXPIRE_DATE"] = pd.to_datetime(df["EXPIRE_DATE"])
    # Compute time-to-maturity in years by rounding DTE (days to expiration) and dividing by 365
    df["TTM"] = df["DTE"].round() / 365
    # Compute relative strike (moneyness) as strike price divided by the underlying spot price
    df["RELATIVE_STRIKE"] = df["STRIKE"] / df["UNDERLYING_LAST"]
    # Convert call implied volatility to numeric type, coercing invalid entries to NaN
    df["C_IV"] = df["C_IV"].apply(pd.to_numeric, errors="coerce").astype(float)
    # Convert call bid prices to numeric type, coercing invalid entries to NaN
    df["C_BID"] = df["C_BID"].apply(pd.to_numeric, errors="coerce").astype(float)
    # Convert call ask prices to numeric type, coercing invalid entries to NaN
    df["C_ASK"] = df["C_ASK"].apply(pd.to_numeric, errors="coerce").astype(float)
    # Convert put implied volatility to numeric type, coercing invalid entries to NaN
    df["P_IV"] = df["P_IV"].apply(pd.to_numeric, errors="coerce").astype(float)
    # Convert put bid prices to numeric type, coercing invalid entries to NaN
    df["P_BID"] = df["P_BID"].apply(pd.to_numeric, errors="coerce").astype(float)
    # Convert put ask prices to numeric type, coercing invalid entries to NaN
    df["P_ASK"] = df["P_ASK"].apply(pd.to_numeric, errors="coerce").astype(float)
    # Drop any rows that became invalid after numeric type coercion
    df = df.dropna()
    # Return the cleaned and standardized DataFrame
    return df

In [4]:
# Define a function to clean and convert the raw risk-free rate time series into a daily, fully populated dataset
def rates_clean(df):
  # Convert the "Date" column from string/object into pandas datetime for proper time-series handling
  df["Date"] = pd.to_datetime(df["Date"])
  # Convert the "Rate" column from comma-decimal strings to floats and scale from percent to decimal (e.g., 5.25 -> 0.0525)
  df["Rate"] = df["Rate"].str.replace(",", ".").astype(float) / 100
  # Sort by date and set "Date" as the index to enable time-based reindexing and interpolation
  df = df.sort_values("Date").set_index("Date")
  # Create a complete daily date index spanning from the first to the last observed date
  full_idx = pd.date_range(df.index.min(), df.index.max(), freq="D")
  # Reindex to the full daily calendar, introducing NaNs for dates that were missing in the original series
  df_mod = df.reindex(full_idx)
  # Identify numeric columns (here, primarily "Rate") so interpolation is applied only to numeric data
  num_cols = df_mod.select_dtypes("number").columns
  # Fill missing numeric values using time-aware interpolation, then forward-fill and back-fill edge gaps
  df_mod[num_cols] = df_mod[num_cols].interpolate(method="time").ffill().bfill()
  # Convert the index back into a regular "Date" column and restore a standard DataFrame structure
  df_mod = df_mod.rename_axis("Date").reset_index()
  # Return the cleaned, daily, fully populated rate DataFrame
  return df_mod

In [5]:
# Define a function that augments an option-chain DataFrame (already merged with rates) with forward/moneyness/variance features
def df_option_chains_rates_cleaning(df):
    # Compute the forward price F = S * exp(r*T) using underlying spot, continuously compounded rate, and time-to-maturity
    df["F"] = df["UNDERLYING_LAST"] * np.exp((df["Rate"]) * df["TTM"])
    # Compute log-forward moneyness k = ln(K/F), where K is strike and F is the forward price
    df["LogMK"] = np.log(df["STRIKE"] / df["F"])
    # Compute total implied variance w = (IV^2) * T using call implied vol and time-to-maturity
    df["w"] = (df["C_IV"] ** 2) * df["TTM"]
    # Drop the redundant "Date" column (typically kept only for the merge key with the rate series)
    df.drop(columns=["Date"], inplace=True)
    # Return the enriched DataFrame with the newly created columns
    return df

In [6]:
# Define a function to extract all unique option quote dates in chronological order
def dates_extraction(df):
  # Select the QUOTE_DATE column, remove duplicates, and sort the dates in ascending order
  dates = df["QUOTE_DATE"].drop_duplicates().sort_values(ascending=True)
  # Return the sorted series of unique quote dates
  return dates

In [7]:
# Define a function to retrieve the underlying spot price for a given quote date
def get_price(df, date):
  # Filter rows matching the specified quote date, select the underlying price,
  # and extract the first value (prices are constant within a given quote date)
  price = df.loc[df["QUOTE_DATE"].eq(date), "UNDERLYING_LAST"].iloc[0]
  # Return the spot price for the specified date
  return price

In [8]:
# Define a function to retrieve the risk-free interest rate for a given date
def get_rate(df, date):
  # Filter rows matching the specified date, select the corresponding rate,
  # and extract the first value (rates are unique per calendar date)
  rate = df.loc[df["Date"].eq(date), "Rate"].iloc[0]
  # Return the risk-free rate for the specified date
  return rate

In [9]:
# Compute eSSVI total variance w(k) for given log-forward moneyness k and parameters (theta, psi, rho)
def essvi_total_variance(k, theta, psi, rho):
    # Ensure k is a NumPy float array so vectorized operations behave consistently
    k = np.asarray(k, dtype=float)
    # Enforce basic parameter admissibility; if violated, return NaNs with the same shape as k
    if theta <= 0.0 or psi <= 0.0 or abs(rho) >= 1.0:
        return np.full_like(k, np.nan, dtype=float)
    # Compute phi = psi/theta (a common eSSVI re-parameterization)
    phi = psi / theta
    # Compute the linear term x = phi*k + rho used inside the square-root expression
    x = phi * k + rho
    # Compute the square-root argument: x^2 + (1 - rho^2) which is always nonnegative in theory
    inside = x * x + (1.0 - rho * rho)
    # Numerically floor the argument to avoid sqrt of tiny negative values due to floating point error
    inside = np.maximum(inside, 1e-16)
    # Apply the eSSVI formula for total variance w(k)
    w = 0.5 * theta * (1.0 + rho * phi * k + np.sqrt(inside))
    # Return the total variance array w(k)
    return w

# Interpolate eSSVI parameters to a target maturity T_target using calibrated arrays at maturities Ts
def interpolate_essvi_params(T_target, Ts, theta, psi, rho):
    # Convert maturities array to float NumPy array for safe numeric comparisons and indexing
    Ts = np.asarray(Ts, dtype=float)
    # Convert theta array to float NumPy array
    theta = np.asarray(theta, dtype=float)
    # Convert psi array to float NumPy array
    psi = np.asarray(psi, dtype=float)
    # Convert rho array to float NumPy array
    rho = np.asarray(rho, dtype=float)
    # Ensure T_target is a scalar float
    T_target = float(T_target)
    # Handle extrapolation for maturities shorter than the first calibrated point by scaling (approximately linear in T)
    if T_target <= Ts[0]:
        # Compute scaling factor relative to the first maturity (guarding against division by zero)
        lam = T_target / Ts[0] if Ts[0] > 0 else 0.0
        # Scale theta down proportionally for short maturities
        theta_T = lam * theta[0]
        # Scale psi down proportionally for short maturities
        psi_T = lam * psi[0]
        # Keep rho fixed at the first calibrated rho
        rho_T = rho[0]
        # Return extrapolated parameters
        return theta_T, psi_T, rho_T
    # Handle extrapolation for maturities longer than the last calibrated point
    if T_target >= Ts[-1]:
        # Estimate a nonnegative slope for theta using the last two points if available
        if len(Ts) >= 2 and Ts[-1] > Ts[-2]:
            slope = max((theta[-1] - theta[-2]) / (Ts[-1] - Ts[-2]), 0.0)
        else:
            slope = 0.0
        # Extrapolate theta forward using the slope (clipped to be nondecreasing)
        theta_T = theta[-1] + slope * (T_target - Ts[-1])
        # Hold psi constant beyond the last point
        psi_T = psi[-1]
        # Hold rho constant beyond the last point
        rho_T = rho[-1]
        # Return extrapolated parameters
        return theta_T, psi_T, rho_T
        
    # Find the index i such that Ts[i] <= T_target < Ts[i+1]
    i = np.searchsorted(Ts, T_target) - 1
    # Clamp i to a valid interval index range
    i = max(0, min(i, len(Ts) - 2))
    # Extract the bracketing maturities
    T0, T1 = Ts[i], Ts[i + 1]
    # Compute linear interpolation weight lambda in [0,1]
    lam = (T_target - T0) / (T1 - T0)
    # Linearly interpolate theta between the two surrounding maturities
    theta_T = (1.0 - lam) * theta[i] + lam * theta[i + 1]
    # Linearly interpolate psi between the two surrounding maturities
    psi_T = (1.0 - lam) * psi[i] + lam * psi[i + 1]
    # Interpolate the product rho*psi (more stable than interpolating rho directly)
    rho_psi_T = (1.0 - lam) * rho[i] * psi[i] + lam * rho[i + 1] * psi[i + 1]
    # Recover rho at T_target by dividing by interpolated psi
    rho_T = rho_psi_T / psi_T
    # Return interpolated parameters
    return theta_T, psi_T, rho_T

# Calibrate eSSVI parameters for a single maturity slice given observed (k, w) points and optional previous-slice constraints
def calibrate_essvi_slice(k_obs, w_obs,
                          prev_theta=None, prev_psi=None, prev_rho=None,
                          rho_grid=None):
    # Convert observed k values to a float NumPy array
    k_obs = np.asarray(k_obs, dtype=float)
    # Convert observed total variances to a float NumPy array
    w_obs = np.asarray(w_obs, dtype=float)
    # Find the index of the observation closest to ATM (k ≈ 0)
    idx_star = int(np.argmin(np.abs(k_obs)))
    # Extract that near-ATM log-forward moneyness value
    k_star = float(k_obs[idx_star])
    # Use the near-ATM observed total variance as the anchor theta_star
    theta_star = float(w_obs[idx_star])
    # If no rho grid is provided, create a default grid spanning (-0.99, 0.99)
    if rho_grid is None:
        rho_grid = np.linspace(-0.99, 0.99, 41)
    # Define a large penalty value for constraint violations or invalid model values
    big = 1e9
    # Initialize the best objective value as +infinity
    best_err = np.inf
    # Initialize the best rho candidate
    best_rho = 0.0
    # Initialize the best psi candidate
    best_psi = 1e-4
    # Initialize the best theta candidate (start from theta_star)
    best_theta = theta_star
    # Loop over candidate rho values and optimize psi for each rho
    for rho in rho_grid:
        # Compute denom = 1 + |rho| used in upper bound formulas
        denom = 1.0 + abs(rho)
        # Compute the discriminant-like term used to derive a feasible psi upper bound
        tmp = 4.0 * rho * rho * k_star * k_star / (denom * denom) + 4.0 * theta_star / denom
        # Skip rho values that yield nonpositive tmp (no real sqrt => no feasible psi_plus)
        if tmp <= 0.0:
            continue
        # Compute psi_plus from the derived feasibility condition
        psi_plus = -2.0 * rho * k_star / denom + np.sqrt(tmp)
        # Set an upper bound on psi using both psi_plus and the calendar constraint 4/(1+|rho|)
        psi_upper = min(psi_plus, 4.0 / denom)
        # Set a small positive lower bound on psi to avoid degeneracy
        psi_lower = 1e-8
        # If previous slice parameters are available, enforce monotonic/calendar-style lower bounds on psi
        if prev_theta is not None and prev_psi is not None and prev_rho is not None:
            # Enforce that psi does not decrease (with a small epsilon) across maturities
            psi_lower = max(psi_lower, prev_psi + 1e-8)
            # Precompute denominators for additional calendar constraints
            denom1 = 1.0 - rho
            denom2 = 1.0 + rho
            # Skip invalid rho values that would make denominators nonpositive
            if denom1 <= 0 or denom2 <= 0:
                continue
            # Compute constraint-based minimum psi implied by (1-rho) scaling from previous slice
            psi_min_cal1 = prev_psi * (1.0 - prev_rho) / denom1
            # Compute constraint-based minimum psi implied by (1+rho) scaling from previous slice
            psi_min_cal2 = prev_psi * (1.0 + prev_rho) / denom2
            # Update psi_lower to satisfy both calendar constraints
            psi_lower = max(psi_lower, psi_min_cal1, psi_min_cal2)
        # Skip this rho if feasible interval is empty or inverted
        if psi_lower >= psi_upper:
            continue
        # Define the objective function in psi for a fixed rho
        
        def obj(psi):
            # Compute theta from the ATM anchoring relationship theta = theta_star - rho*psi*k_star
            theta = theta_star - rho * psi * k_star
            # Penalize if theta is too small or negative
            if theta <= 1e-8:
                return big + 1e4 * (1e-8 - theta) ** 2
            # Penalize if theta would decrease relative to previous slice (calendar monotonicity)
            if prev_theta is not None and theta < prev_theta:
                return big + 1e4 * (prev_theta - theta) ** 2
            # Compute model-implied total variance at observed k points
            w_model = essvi_total_variance(k_obs, theta, psi, rho)
            # Penalize any non-finite model output (NaN/inf)
            if not np.all(np.isfinite(w_model)):
                return big
            # Return mean squared error between model and observed total variances
            return np.mean((w_model - w_obs) ** 2)
        # Run bounded 1D optimization to find psi minimizing the objective for this rho
        res = minimize_scalar(
            obj,
            bounds=(psi_lower, psi_upper),
            method="bounded",
            options={"xatol": 1e-8, "maxiter": 200},
        )
        # Skip if optimizer did not converge successfully
        if not res.success:
            continue
        # Extract optimized psi from the optimizer result
        psi_opt = float(res.x)
        # Recompute theta corresponding to the optimized psi
        theta_opt = theta_star - rho * psi_opt * k_star
        # Skip if theta is not strictly positive
        if theta_opt <= 0:
            continue
        # Extract the optimized objective value (MSE)
        err = float(res.fun)
        # Update the best parameters if this rho/psi combination improves the fit
        if err < best_err:
            best_err = err
            best_rho = rho
            best_psi = psi_opt
            best_theta = theta_opt
    # Return calibrated (theta, psi, rho) plus anchor diagnostics and the best MSE
    return best_theta, best_psi, best_rho, k_star, theta_star, best_err

# Calibrate eSSVI parameters across all maturity slices present in the DataFrame
def calibrate_essvi_slices(df):
    # Extract unique maturities (TTM) and sort them ascending for sequential calendar-consistent calibration
    Ts = np.sort(df["TTM"].unique())
    # Initialize a list to collect calibrated parameter rows
    params_rows = []
    # Initialize previous-slice parameters (used to enforce monotonic/calendar constraints)
    prev_theta = prev_psi = prev_rho = None
    # Iterate through each maturity slice in ascending order
    for i, T in enumerate(Ts):
        # Extract the slice of quotes corresponding to the given maturity
        sli = df.loc[df["TTM"] == T].copy()
        # Pull observed log-forward moneyness values as a NumPy array
        k_obs = sli["LogMK"].to_numpy()
        # Pull observed total variances as a NumPy array
        w_obs = sli["w"].to_numpy()
        # Calibrate eSSVI for this maturity slice using constraints from the previous slice if available
        theta, psi, rho, k_star, theta_star, mse = calibrate_essvi_slice(
            k_obs, w_obs,
            prev_theta=prev_theta,
            prev_psi=prev_psi,
            prev_rho=prev_rho,
        )
        # Convert mean squared error into root mean squared error for interpretability
        rmse = float(np.sqrt(mse))
        # Append the calibrated parameters and diagnostics for this maturity to the results list
        params_rows.append({
            "T": T,
            "theta": theta,
            "psi": psi,
            "rho": rho,
            "k_star": k_star,
            "theta_star": theta_star,
            "rmse": rmse,
            "n_quotes": len(sli),
        })
        # Update previous-slice parameters for the next maturity calibration
        prev_theta, prev_psi, prev_rho = theta, psi, rho
    # Create a DataFrame of calibrated parameters and sort by maturity
    essvi_params = (
        pd.DataFrame(params_rows)
        .sort_values("T")
        .reset_index(drop=True)
    )
    # Return the calibrated term-structure parameter DataFrame
    return essvi_params
    
# Build an implied-vol surface on a (moneyness, maturity) grid from calibrated eSSVI parameters and a given rate
def build_essvi_surface_on_grid(essvi_params, moneyness_grid, ttm_grid, rate):
    # Extract calibrated maturities as a NumPy array
    Ts = essvi_params["T"].to_numpy()
    # Extract calibrated theta values as a NumPy array
    theta_arr = essvi_params["theta"].to_numpy()
    # Extract calibrated psi values as a NumPy array
    psi_arr = essvi_params["psi"].to_numpy()
    # Extract calibrated rho values as a NumPy array
    rho_arr = essvi_params["rho"].to_numpy()
    # Convert moneyness grid to float NumPy array for consistent numeric operations
    moneyness_grid = np.asarray(moneyness_grid, dtype=float)
    # Compute log spot-moneyness log(K/S) once, reused across maturities
    log_m = np.log(moneyness_grid)
    # Initialize a list of row dictionaries to build the grid DataFrame
    rows = []
    # Loop over target maturities in the surface grid
    for T in ttm_grid:
        # Interpolate (theta, psi, rho) to this maturity T using the calibrated term structure
        theta_T, psi_T, rho_T = interpolate_essvi_params(
            T, Ts, theta_arr, psi_arr, rho_arr
        )
        # Compute log-forward moneyness k = log(K/S) - log(F/S) = log(m) - r*T
        k_grid = log_m - rate * T
        # If interpolated parameters are invalid, fall back to zero variance on this row
        if theta_T <= 0 or psi_T <= 0:
            w_row = np.zeros_like(k_grid, dtype=float)
        else:
            # Compute total variance across the k grid using eSSVI
            w_row = essvi_total_variance(k_grid, theta_T, psi_T, rho_T)
        # Store each grid point as a row (T, moneyness, k, w) for later pivoting/analysis
        for mg, k_val, wv in zip(moneyness_grid, k_grid, w_row):
            rows.append({
                "T": float(T),
                "moneyness": float(mg),
                "k": float(k_val),
                "w": max(float(wv), 0.0),
            })
    # Create a DataFrame from collected rows and sort for stable ordering
    grid_essvi = (
        pd.DataFrame(rows)
        .sort_values(["T", "moneyness"])
        .reset_index(drop=True)
    )
    # Convert total variance w to implied volatility iv = sqrt(w/T), safely handling T=0
    grid_essvi["iv"] = np.sqrt(
        np.where(grid_essvi["T"] > 0.0, grid_essvi["w"] / grid_essvi["T"], 0.0)
    )
    # Return the gridded eSSVI surface DataFrame
    return grid_essvi

In [10]:
# Define a function that builds a 4D "vol cube" tensor (samples × channels × TTM × moneyness) from daily option data
def formating_vol_cube(df, dates):
    # Initialize an empty cube with a dummy first sample to simplify concatenation inside the loop
    vol_cube = np.empty((1, 5, 20, 20))
    # Loop over each quote date to build one surface (one sample) per date
    for date in dates:
        # Look up the risk-free rate applicable to this quote date (from the external cleaned rate DataFrame)
        rate = get_rate(df_risk_free_rates, date)
        # Look up the underlying spot price for this quote date from the option-chain DataFrame
        price = get_price(df, date)
        # Filter the full option-chain DataFrame down to only rows for the current quote date
        df_data = df[df["QUOTE_DATE"] == date]
        # Calibrate eSSVI parameters across all maturities available on this quote date
        essvi_params = calibrate_essvi_slices(df_data)
        # Build the eSSVI implied-vol surface on the predefined (ttm_grid × moneyness_grid) grid
        grid_essvi = build_essvi_surface_on_grid(
            essvi_params,
            moneyness_grid=moneyness_grid,
            ttm_grid=ttm_grid,
            rate=rate,)
        # Pivot the long-form grid DataFrame into a 2D matrix of implied vols indexed by T and moneyness
        df_grid_pivot = grid_essvi.pivot(index="T", columns="moneyness", values="iv").sort_index().sort_index(axis=1)
        # Convert the pivoted implied-vol surface into a NumPy array (shape: n_T × n_moneyness)
        IV_grid = df_grid_pivot.to_numpy()
        # Add a leading dimension so the surface can be stacked with other 2D "channels"
        IV_grid_expanded = np.expand_dims(IV_grid, axis=0)
        # Create a grid-shaped array filled with the scalar rate so it can be used as a channel
        rate_expanded = np.full(IV_grid_expanded.shape, rate)
        # Create a grid-shaped array filled with the scalar spot price so it can be used as a channel
        price_expanded = np.full(IV_grid_expanded.shape, price)
        # Stack the TTM grid channel with the implied-vol channel (channel dimension grows by concatenation on axis=0)
        IV_grid_expanded = np.concatenate((TTM_grid_expanded, IV_grid_expanded), axis=0)
        # Append the spot price channel to the channel stack
        IV_grid_expanded = np.concatenate((IV_grid_expanded, price_expanded), axis=0)
        # Append the rate channel to the channel stack
        IV_grid_expanded = np.concatenate((IV_grid_expanded, rate_expanded), axis=0)
        # Append the strike grid channel K = S * (K/S) by multiplying spot price by the moneyness grid
        IV_grid_expanded = np.concatenate((IV_grid_expanded, price * M_grid_expanded), axis=0)
        # Add a leading sample dimension so this date becomes one sample in the dataset
        IV_grid_expanded = np.expand_dims(IV_grid_expanded, axis=0)
        # Concatenate this sample onto the growing vol cube along the sample axis
        vol_cube = np.concatenate((vol_cube, IV_grid_expanded), axis=0)
    # Remove the initial dummy sample row that was only used to make concatenation convenient
    vol_cube = np.delete(vol_cube, 0, axis=0)
    # Return the final vol cube tensor (shape: n_dates × 5 × 20 × 20)
    return vol_cube

In [11]:
# Create a 1D grid of spot-moneyness values K/S from 0.90 up to (but not including) 1.09 in steps of 0.01
moneyness_grid = np.arange(0.9, 1.09, 0.01)
# Create a 1D grid of maturities from 30 to 600 days (inclusive) in steps of 30, then convert days to years by dividing by 365
ttm_grid = np.arange(30, 600 + 1, 30) / 365
# Build 2D mesh grids so every maturity is paired with every moneyness (used to construct grid-shaped channels)
M_grid, TTM_grid = np.meshgrid(moneyness_grid, ttm_grid)

In [12]:
# Add a leading channel dimension to the moneyness grid so it can be stacked with other surface channels
M_grid_expanded = np.expand_dims(M_grid, axis=0)
# Add a leading channel dimension to the time-to-maturity grid so it can be stacked with other surface channels
TTM_grid_expanded = np.expand_dims(TTM_grid, axis=0)

In [13]:
# Clean and interpolate the raw risk-free rate dataset to produce a daily, fully populated rate time series
df_risk_free_rates = rates_clean(risk_free_rates)
# Save the cleaned and interpolated daily risk-free rate DataFrame to a CSV file for inspection or reuse
df_risk_free_rates.to_csv("rates_interpolated.csv")

  df["Date"] = pd.to_datetime(df["Date"])


In [14]:
# Clean and standardize the raw training option-chain data (select columns, parse dates, compute TTM/moneyness, coerce numerics)
df_option_chains_train = clean_data(option_chains_train)
# Merge the cleaned option-chain data with the daily risk-free rate series by matching QUOTE_DATE to the rate "Date"
df_option_chains_rates_train = df_option_chains_train.merge(
    df_risk_free_rates,
    left_on="QUOTE_DATE",
    right_on="Date",
    how="left",
)
# Add derived forward/total-variance features (F, LogMK, w) and drop the redundant merge key column
df_option_chains_rates_clean_train = df_option_chains_rates_cleaning(df_option_chains_rates_train)
# Extract the sorted list of unique quote dates present in the enriched training dataset
dates_train = dates_extraction(df_option_chains_rates_clean_train)
# Build the training vol cube tensor by calibrating eSSVI and gridding implied vols for each quote date
vol_cube_train = formating_vol_cube(df_option_chains_rates_clean_train, dates_train)

In [15]:
# Save the cleaned training option-chain DataFrame to CSV for verification or reuse
df_option_chains_train.to_csv("clean_train_chains.csv")
# Save the list of training quote dates to CSV for reference or downstream processing
dates_train.to_csv("dates_train.csv")

In [16]:
# Clean and standardize the raw test option-chain data (select columns, parse dates, compute TTM/moneyness, coerce numerics)
df_option_chains_test = clean_data(option_chains_test)
# Merge the cleaned test option-chain data with the daily risk-free rate series by matching QUOTE_DATE to the rate "Date"
df_option_chains_rates_test = df_option_chains_test.merge(
    df_risk_free_rates,
    left_on="QUOTE_DATE",
    right_on="Date",
    how="left",
)
# Add derived forward/total-variance features (F, LogMK, w) and drop the redundant merge key column
df_option_chains_rates_clean_test = df_option_chains_rates_cleaning(df_option_chains_rates_test)
# Extract the sorted list of unique quote dates present in the enriched test dataset
dates_test = dates_extraction(df_option_chains_rates_clean_test)
# Build the test vol cube tensor by calibrating eSSVI and gridding implied vols for each quote date
vol_cube_test = formating_vol_cube(df_option_chains_rates_clean_test, dates_test)

In [17]:
# Save the cleaned test option-chain DataFrame to CSV for verification or reuse
df_option_chains_test.to_csv("clean_test_chains.csv")
# Save the list of test quote dates to CSV for reference or downstream processing
dates_test.to_csv("dates_test.csv")

In [18]:
# Save the training and test vol cube tensors into a single compressed NumPy archive file
np.savez_compressed("vol_cube.npz", train=vol_cube_train, test=vol_cube_test)

In [19]:
# Define a function that approximates the standard normal CDF Φ(x) for vector inputs
def norm_cdf(x: np.ndarray) -> np.ndarray:
    # Convert input to a NumPy float array to ensure vectorized numerical operations
    x = np.asarray(x, dtype=float)
    # Extract the sign of x so the approximation can be applied to |x| and then re-signed
    sign = np.sign(x)
    # Compute |x|/sqrt(2) because erf-based CDF formulas use x / sqrt(2)
    x_abs = np.abs(x) / np.sqrt(2.0)
    # Set approximation coefficients for a polynomial erf approximation (Abramowitz–Stegun style)
    a1 = 0.254829592
    # Set the second coefficient for the erf approximation polynomial
    a2 = -0.284496736
    # Set the third coefficient for the erf approximation polynomial
    a3 = 1.421413741
    # Set the fourth coefficient for the erf approximation polynomial
    a4 = -1.453152027
    # Set the fifth coefficient for the erf approximation polynomial
    a5 = 1.061405429
    # Set the constant p used in the rational approximation term t = 1/(1 + p*x)
    p  = 0.3275911
    # Compute t = 1/(1 + p*x_abs) which drives the polynomial evaluation
    t = 1.0 / (1.0 + p * x_abs)
    # Compute an approximation of erf(x_abs) using a nested polynomial (Horner form) times exp(-x_abs^2)
    erf_approx = 1.0 - (((((a5 * t + a4) * t) + a3) * t + a2) * t + a1) * t * np.exp(-x_abs * x_abs)
    # Re-apply the sign to extend the approximation from x>=0 to all real x
    erf_approx *= sign
    # Convert erf approximation to normal CDF using Φ(x) = 0.5 * (1 + erf(x/sqrt(2)))
    return 0.5 * (1.0 + erf_approx)

# Define a vectorized Black–Scholes call pricer given spot S, strike K, maturity T, rate r, and volatility sigma
def black_scholes_call(S, K, T, r, sigma):
    # Convert spot to a NumPy float array for broadcasting over grids
    S = np.asarray(S, dtype=float)
    # Convert strike to a NumPy float array for broadcasting over grids
    K = np.asarray(K, dtype=float)
    # Convert maturity to a NumPy float array for broadcasting over grids
    T = np.asarray(T, dtype=float)
    # Convert risk-free rate to a NumPy float array for broadcasting over grids
    r = np.asarray(r, dtype=float)
    # Convert volatility to a NumPy float array for broadcasting over grids
    sigma = np.asarray(sigma, dtype=float)
    # Define a small epsilon to prevent division by zero in sigma and T
    eps = 1e-12
    # Floor volatility at eps to avoid numerical blow-ups when sigma is near zero
    sigma = np.maximum(sigma, eps)
    # Floor maturity at eps to avoid division by zero and log issues when T is near zero
    T = np.maximum(T, eps)
    # Compute sqrt(T) once since it is used multiple times
    sqrtT = np.sqrt(T)
    # Compute the Black–Scholes d1 term
    d1 = (np.log(S / K) + (r + 0.5 * sigma ** 2) * T) / (sigma * sqrtT)
    # Compute the Black–Scholes d2 term
    d2 = d1 - sigma * sqrtT
    # Compute Φ(d1) using the normal CDF approximation
    Nd1 = norm_cdf(d1)
    # Compute Φ(d2) using the normal CDF approximation
    Nd2 = norm_cdf(d2)
    # Return the Black–Scholes call price C = S Φ(d1) - K e^{-rT} Φ(d2)
    return S * Nd1 - K * np.exp(-r * T) * Nd2

# Define a function that checks several static no-arbitrage conditions on a call surface implied by (T,K,vol)
def check_call_surface_no_arbitrage(
    S0: float,
    r: float,
    T_grid: np.ndarray,
    K_grid: np.ndarray,
    vol_grid: np.ndarray,
    tol: float = 1e-8,) -> dict:
    # Convert the maturity grid to a NumPy float array
    T_grid = np.asarray(T_grid, dtype=float)
    # Convert the strike grid to a NumPy float array
    K_grid = np.asarray(K_grid, dtype=float)
    # Convert the volatility surface to a NumPy float array
    vol_grid = np.asarray(vol_grid, dtype=float)
    # Read the surface dimensions: number of maturities and number of strikes
    n_T, n_K = vol_grid.shape
    # Assert that the maturity grid length matches the surface first dimension
    assert T_grid.shape == (n_T,)
    # Assert that the strike grid length matches the surface second dimension
    assert K_grid.shape == (n_K,)
    # Build a maturity matrix by repeating T_grid across strike columns (shape: n_T × n_K)
    T_mat = T_grid[:, None] * np.ones((1, n_K))
    # Build a strike matrix by repeating K_grid across maturity rows (shape: n_T × n_K)
    K_mat = np.ones((n_T, 1)) * K_grid[None, :]
    # Compute the call price surface from the vol surface using Black–Scholes
    C = black_scholes_call(S0, K_mat, T_mat, r, vol_grid)
    # Compute the intrinsic lower bound max(0, S0 - K e^{-rT}) for each grid point
    lower_bound = np.maximum(0.0, S0 - K_mat * np.exp(-r * T_mat))
    # Set the simple upper bound for a call price, which is S0
    upper_bound = S0
    # Flag points where call prices violate the lower bound (with tolerance)
    bound_low_viol  = C + tol < lower_bound
    # Flag points where call prices violate the upper bound (with tolerance)
    bound_high_viol = C - tol > upper_bound
    # Compute first differences in strike direction (proxy for ∂C/∂K up to scaling)
    dC_dK = np.diff(C, axis=1)
    # Flag monotonicity violations: call prices must be nonincreasing in strike, so differences should be <= 0
    monoK_viol = dC_dK > tol
    # Compute strike spacing ΔK for finite-difference slope calculations
    dK = np.diff(K_grid)
    # Compute discrete slopes (C(K_{j+1}) - C(K_j)) / ΔK for each maturity row
    slopes = (C[:, 1:] - C[:, :-1]) / dK[None, :]
    # Compute differences of slopes (proxy for second derivative in K)
    slope_diff = np.diff(slopes, axis=1)
    # Flag convexity violations: call prices must be convex in strike, so slope differences should be >= 0
    convex_viol = slope_diff < -tol
    # Compute first differences in maturity direction (proxy for ∂C/∂T up to scaling)
    dC_dT = np.diff(C, axis=0)
    # Flag calendar violations: call prices should be nondecreasing in maturity (no calendar arbitrage)
    calendar_viol = dC_dT < -tol
    # Return a summary dictionary containing whether any violation occurred and counts of each type
    return {
        "any_violation": (
            bound_low_viol.any()
            or bound_high_viol.any()
            or monoK_viol.any()
            or convex_viol.any()
            or calendar_viol.any()
        ),
        "bound_low_count": int(bound_low_viol.sum()),
        "bound_high_count": int(bound_high_viol.sum()),
        "monoK_count": int(monoK_viol.sum()),
        "convex_count": int(convex_viol.sum()),
        "calendar_count": int(calendar_viol.sum()),
    }

# Define a helper that extracts grids/parameters from one vol-cube sample and runs the no-arbitrage checker
def check_sample_from_vol_cube(sample, tol: float = 1e-8):
    # Extract channel 0: maturity grid replicated across strikes (shape: n_T × n_K)
    ch0 = sample[0]   # T grid replicated across strikes
    # Extract channel 1: implied volatility surface (shape: n_T × n_K)
    ch1 = sample[1]   # vol surface
    # Extract channel 2: spot price grid (constant across the surface)
    ch2 = sample[2]   # spot
    # Extract channel 3: rate grid (constant across the surface)
    ch3 = sample[3]   # r-like scalar (constant on grid)
    # Extract channel 4: strike grid replicated across maturities (shape: n_T × n_K)
    ch4 = sample[4]   # K grid replicated across maturities
    # Recover the 1D maturity vector by taking the first strike column (all columns are identical in this channel)
    T_grid = ch0[:, 0]
    # Recover the 1D strike vector by taking the first maturity row (all rows are identical in this channel)
    K_grid = ch4[0, :]
    # Recover the scalar spot price from any grid entry and cast to Python float
    S0 = float(ch2[0, 0])
    # Recover the scalar rate from any grid entry and cast to Python float
    r = float(ch3[0, 0])
    # Run the no-arbitrage checks on the implied call surface generated from the vol surface
    return check_call_surface_no_arbitrage(
        S0=S0,
        r=r,
        T_grid=T_grid,
        K_grid=K_grid,
        vol_grid=ch1,
        tol=tol,
    )

In [20]:
# Define a function that scans an entire vol-cube dataset and aggregates no-arbitrage violation statistics
def summarize_dataset_no_arb(vol_cube, tol: float = 1e-8):
    # Read the number of samples (dates) stored in the first dimension of the vol cube
    n = vol_cube.shape[0]
    # Initialize a summary dictionary to track dataset-level counts and totals
    summary = {
        "total_samples": n,
        "no_arbitrage_samples": 0,
        "arbitrage_samples": 0,
        "bound_low_total": 0,
        "bound_high_total": 0,
        "monoK_total": 0,
        "convex_total": 0,
        "calendar_total": 0,
    }
    # Track the index of the first sample where any arbitrage is detected
    first_arb_idx = None
    # Store the detailed violation counts for the first arbitrage sample for debugging
    first_arb_details = None
    # Loop over each sample in the dataset
    for i in range(n):
        # Run the per-sample arbitrage checks by extracting grids from vol_cube[i]
        res = check_sample_from_vol_cube(vol_cube[i], tol=tol)
        # If any arbitrage condition is violated, update arbitrage counters and totals
        if res["any_violation"]:
            summary["arbitrage_samples"] += 1
            summary["bound_low_total"]  += res["bound_low_count"]
            summary["bound_high_total"] += res["bound_high_count"]
            summary["monoK_total"]      += res["monoK_count"]
            summary["convex_total"]     += res["convex_count"]
            summary["calendar_total"]   += res["calendar_count"]
            # If this is the first arbitrage sample encountered, record its index and details
            if first_arb_idx is None:
                first_arb_idx = i
                first_arb_details = res
        # Otherwise, count this sample as no-arbitrage
        else:
            summary["no_arbitrage_samples"] += 1
    # Add the index of the first arbitrage sample (or None if no arbitrage was found)
    summary["first_arbitrage_index"] = first_arb_idx
    # Add the detailed results for the first arbitrage sample (or None if no arbitrage was found)
    summary["first_arbitrage_details"] = first_arb_details
    # Return the completed dataset-level summary statistics
    return summary

In [21]:
# Run the no-arbitrage summary checks across all samples in the training vol cube using the default tolerance
summarize_dataset_no_arb(vol_cube_train)

{'total_samples': 1253,
 'no_arbitrage_samples': 1176,
 'arbitrage_samples': 77,
 'bound_low_total': 0,
 'bound_high_total': 0,
 'monoK_total': 0,
 'convex_total': 0,
 'calendar_total': 1811,
 'first_arbitrage_index': 137,
 'first_arbitrage_details': {'any_violation': True,
  'bound_low_count': 0,
  'bound_high_count': 0,
  'monoK_count': 0,
  'convex_count': 0,
  'calendar_count': 45}}

In [22]:
# Run the no-arbitrage summary checks across all samples in the test vol cube using the default tolerance
summarize_dataset_no_arb(vol_cube_test)

{'total_samples': 570,
 'no_arbitrage_samples': 565,
 'arbitrage_samples': 5,
 'bound_low_total': 0,
 'bound_high_total': 0,
 'monoK_total': 0,
 'convex_total': 0,
 'calendar_total': 45,
 'first_arbitrage_index': 128,
 'first_arbitrage_details': {'any_violation': True,
  'bound_low_count': 0,
  'bound_high_count': 0,
  'monoK_count': 0,
  'convex_count': 0,
  'calendar_count': 14}}