# Implied-Vol Surface Completion (FC-NN + BS layer)
## a lotta words to say that this project IS NOT COMPLETED YET!!!!


**Goal (snapshot t_0):** Given a few option quotes at one timestamp, complete an **arbitrage-free** surface of prices/IV across strikes K and tenors T.  
**Method:** Fully-connected MLP → predicted IV 𝜎̂(K,T) → **Black–Scholes** price layer → fit to observed quotes.  
**Stretch:** Small **PINN** term (BS-PDE residual + simple boundary/terminal penalties) as regularizer.  
**Deliverables:** error/arb metrics, plots, ablation (baseline / +no-arb / +PINN / ensemble).


We will be building a black scholes model calculator in house and following this methodology:
 - Generate data using b-s model for pretraining
 - Utilize data from kaggle dataset(s) for fine-tuning


maybe we can experiment with join curriculum, with 90% synthetic / 10% real and validate/test on real only


## Data generation using the Black-Scholes model
Use b-s to generate "snapshots" of many quotes. Within each snapshot, pick 10-20 quotes as "observed", while treating the rest as targets for completion. This is fixed only once for fair comparison. We then use this phase to debug the model/tune rough ranges

Black Scholes Formula (Call):
$$ C = SN(d_1)-Ke^{rT}N(d_2) $$
Where:
- $C$: Price of the European call option
- $S$: Current price of the underlying asset (spot)
- $K$: Strike price of the option
- $r$: Risk-free interest rate
- $T$: Time to expiration (in years)
- $\sigma$: Volatility of the underlying asset's returns
- $N(d_{1})$ and $N(d_{2})$: The cumulative standard normal distribution function, which gives the probability that a variable will be less than a certain value

Put is different formula (to work on later)

This function will need to be useful in data generation. It will be rebuilt inside the model itself for training.

In [1]:
# Global Imports
from torch.utils.data import Dataset, DataLoader
import torch
import torch.nn as nn
import numpy as np
import pandas as pd
from scipy.stats import norm
import math
import platform
import torch.nn.functional as F


In [2]:
def bs_price(spot, strike, years, r, q, sigma, option="call"):
    T = np.asarray(years, dtype=float)
    S = np.asarray(spot, dtype=float)
    K = np.asarray(strike, dtype=float)
    sig = np.asarray(sigma, dtype=float)

    eps = 1e-12  # fix divide by zero error

    T = np.maximum(T, eps)
    sig = np.maximum(sig, 1e-12)

    d1 = (np.log(S / K) + (r - q + 0.5 * sig**2) * T) / (sig * np.sqrt(T))
    d2 = d1 - sig * np.sqrt(T)

    Nd1 = norm.cdf(d1)
    Nd2 = norm.cdf(d2)

    if option == "call":
        return S * np.exp(-q * T) * Nd1 - K * np.exp(-r * T) * Nd2
    else:  # put
        return K * np.exp(-r * T) * norm.cdf(-d2) - S * np.exp(-q * T) * norm.cdf(-d1)


In [3]:
def make_snapshot(snapshot_id, n_strikes=30, tenors_days=(7,14,30,60,90,180,365),
                  smile=False, rng=None):
    rng = np.random.default_rng(rng)

    S0 = rng.uniform(50, 500)
    r = rng.uniform(0.00, 0.05)
    q = rng.uniform(0.00, 0.03)

    m = rng.uniform(0.6, 1.4, size=n_strikes)
    K = np.sort(S0 * m)
    T = np.array(tenors_days) / 365.0

    K_grid, T_grid = np.meshgrid(K, T, indexing="xy")
    S_grid = np.full_like(K_grid, S0)
    r_grid = np.full_like(K_grid, r)
    q_grid = np.full_like(K_grid, q)

    # Volatility: constant or simple smile
    if not smile:
        sigma_snap = rng.uniform(0.10, 0.60)
        sigma_grid = np.full_like(K_grid, sigma_snap)
    else:
        ell = np.log(S0 / K_grid)
        a = rng.uniform(0.10, 0.50)
        b = rng.uniform(-0.20, 0.20)
        c = rng.uniform(0.00, 0.20)
        d = rng.uniform(-0.05, 0.10)
        sigma_grid = np.clip(a + b*ell + c*ell**2 + d*np.sqrt(T_grid), 0.05, 2.0)

    P_mid = bs_price(S_grid, K_grid, T_grid, r_grid, q_grid, sigma_grid, option="call")

    noise = rng.normal(loc=0.0, scale=0.01*np.maximum(0.1, P_mid))
    P_obs = np.clip(P_mid + noise, 0.0, None)

    df = pd.DataFrame({
        "snapshot_id": snapshot_id,
        "S0": S_grid.ravel(),
        "K": K_grid.ravel(),
        "T": T_grid.ravel(),
        "r": r_grid.ravel(),
        "q": q_grid.ravel(),
        "sigma_true": sigma_grid.ravel(),
        "price_mid": P_mid.ravel(),
        "price_obs": P_obs.ravel(),
    })
    return df


In [4]:
def make_dataset(n_snapshots=2000, smile=False, seed=42):
    rng = np.random.default_rng(seed)
    dfs = []
    for sid in range(n_snapshots):
        df = make_snapshot(sid, smile=smile, rng=rng)
        dfs.append(df)
    return pd.concat(dfs, ignore_index=True)

In [5]:
def split_snapshots(n_snapshots, train=0.8, val=0.1, seed=7):
    rng = np.random.default_rng(seed)
    ids = np.arange(n_snapshots)
    rng.shuffle(ids)
    n_train = int(train*n_snapshots)
    n_val = int(val*n_snapshots)
    return {
        "train": ids[:n_train],
        "val": ids[n_train:n_train+n_val],
        "test":  ids[n_train+n_val:]
    }

In [6]:
def make_observed_mask(df, observed_per_snapshot=15, seed=99):
    rng = np.random.default_rng(seed)
    mask = {}
    for sid, df_s in df.groupby("snapshot_id"):
        idx = df_s.index.values
        choose = min(observed_per_snapshot, len(idx))
        obs_idx = rng.choice(idx, size=choose, replace=False)
        mask[int(sid)] = np.sort(obs_idx)
    return mask

# Model Creation
Now that we have created the B-S Model and functions that will help us generate synthetic code, we can now work on the creation of the model itself. The architecture is as follows:
- Fully connected Multi-Layer Perceptron (MLP)
- Depth $\times$ width: 5 hidden layers $\times$ 256 units
- Activation: SiLU (Sigmoid Linear Unit)

The model has the following inputs:
- log-moneyness: $ln(S0/K)$
	- how far the strike is from the spot
- time to expiry: $\tau$ in years
- rates: risk-free $r$ and dividend yield $q$
- _(optional)_ scaled spot $S_{0} / 100$

The model would then output:
- Implied Volatility $\hat{\sigma}(K,T)>0$
  - We then plug $\hat{\sigma}$ into Black-Scholes to get a price $\hat{C}$.

In [7]:
# Dr white's implementation
# Device selection: CUDA -> MPS (Apple Silicon) -> CPU
if torch.cuda.is_available():
    device = torch.device("cuda")
elif getattr(torch.backends, "mps", None) and torch.backends.mps.is_available():
    device = torch.device("mps")
else:
    device = torch.device("cpu")

print(f"Using device: {device.type}\n")

if device.type == "cuda":
    num_gpus = torch.cuda.device_count()
    print(f"Found {num_gpus} CUDA device(s):")
    for idx in range(num_gpus):
        print(f"  [{idx}] {torch.cuda.get_device_name(idx)}")
elif device.type == "mps":
    print("Apple Metal (MPS) device available")
else:
    print("Running on CPU")

print("\nPython:", platform.python_version(), "| PyTorch:", torch.__version__)

Using device: cuda

Found 1 CUDA device(s):
  [0] NVIDIA GeForce RTX 4070 SUPER

Python: 3.12.3 | PyTorch: 2.4.1


In [8]:
class MLP_IV(nn.Module):
    def __init__(self, in_dim=5, width=256, depth=5, eps=1e-4):
        super().__init__()
        layers = []
        dims = [in_dim] + [width]*depth
        self.backbone = nn.Sequential(
          nn.Linear(in_dim, width), nn.SiLU(),
          nn.Linear(width, width), nn.SiLU(),
          nn.Linear(width, width), nn.SiLU(),
          nn.Linear(width, width), nn.SiLU(),
          nn.Linear(width, width), nn.SiLU(),
        )
        self.head = nn.Linear(width, 1)
        self.softplus = nn.Softplus()
        self.eps = eps

    def forward(self, x):
        h = self.backbone(x)
        sigma = self.softplus(self.head(h)) + self.eps
        return sigma.squeeze(-1)


Reasoning for using only the Call price.
"But wait! Why not make a `bs_put_price` too?"
 - Answer: *They're not independent.* In fact, one can always be computed from the other using the **put-call parity** relationship:

 $$
C - P = S_{0}e^{qT} - Ke^{-rT}
$$
or rearranged for the put:
$$
P = C - S_{0}e^{qT} + Ke^{-rt}.
$$
That means, once our network learns the **call surface** (call prices for all strikes/maturities), we can immediately compute the corresponding put surface **analytically.**

In [9]:
def normal_cdf(x):
    # Stable CDF via PyTorch's error function (keeps it differentiable)
    # Used in calculating the call price in Black-Scholes
    return 0.5 * (1.0 + torch.erf(x / math.sqrt(2.0)))

def bs_call_price(S, K, tau, r, q, sigma):
    # Implementation of Black-Scholes Call price in PyTorch
    S = torch.clamp(S, min=1e-12)
    K = torch.clamp(K, min=1e-12)
    tau = torch.clamp(tau, min=1e-12)
    sqrt_tau = torch.sqrt(tau)

    d1 = (torch.log(S/K) + (r-q + 0.5 * sigma**2) * tau) / (sigma * sqrt_tau)
    d2 = d1 - sigma * sqrt_tau
    Nd1 = normal_cdf(d1)
    Nd2 = normal_cdf(d2)
    disc_q = torch.exp(-q * tau)
    disc_r = torch.exp(-r * tau)
    C = S * disc_q * Nd1 - K * disc_r * Nd2
    return C


To ensure readability and ease-of-use for the results, we will be using Dataloaders.

In [10]:
# Custom dataset for the MLP Model
class OptionDataset(Dataset):
    def __init__(self, df, stats=None, standardize=True):
        self.df = df.reset_index(drop=True)
        # features for the MLP
        log_mny = np.log(self.df["S0"].values / self.df["K"].values)
        T = self.df["T"].values
        r = self.df["r"].values
        q = self.df["q"].values
        S0_scaled = self.df["S0"].values / 100.0

        # fit stats on train; reuse them for val/test
        if stats is None:
            stats = {
              "T_mu": T.mean(), "T_sd": T.std() + 1e-8,
              "r_mu": r.mean(), "r_sd": r.std() + 1e-8,
              "q_mu": q.mean(), "q_sd": q.std() + 1e-8,
            }
        self.stats = stats

        if standardize:
            Tn = (T - stats["T_mu"]) / stats["T_sd"]
            rn = (r - stats["r_mu"]) / stats["r_sd"]
            qn = (q - stats["q_mu"]) / stats["q_sd"]
        else:
            Tn, rn, qn = T, r, q

        X = np.stack([log_mny, Tn, rn, qn, S0_scaled], axis=1).astype(np.float32)

        # store tensors
        self.X = torch.from_numpy(X)
        self.S0 = torch.from_numpy(self.df["S0"].values.astype(np.float32))
        self.K = torch.from_numpy(self.df["K"].values.astype(np.float32))
        self.T = torch.from_numpy(self.df["T"].values.astype(np.float32))   # raw for BS
        self.r = torch.from_numpy(self.df["r"].values.astype(np.float32))
        self.q = torch.from_numpy(self.df["q"].values.astype(np.float32))
        self.P = torch.from_numpy(self.df["price_obs"].values.astype(np.float32))

    def __len__(self): return len(self.df)

    def __getitem__(self, i):
        return {
            "features": self.X[i],
            "S0": self.S0[i], "K": self.K[i], "T": self.T[i],
            "r": self.r[i], "q": self.q[i],
            "P_obs": self.P[i],
            "stats": self.stats,
        }


## Creating the Loss function
Due to the nature of the problem we are trying to solve, it would be naive to implement a single function to act as our Loss. Intuitively, we can approach this with a more modular mindset.
Instead of implementing one large Loss function $L$, we create three separate parts of a greater whole. The benefits of doing this include:
 - Debugging/monitoring each piece separately
 - Turning on/off certain loss functions during training
 - Allowing certain loss functions to be weighted differently

 If we just use a single formula, we can't tell whether the model is missing because it's not fitting the data or because a regularizer is too strong.

 We can proceed with the mental model:
1. A fit-to-market loss function `loss_fit_price`
	- It uses only the quotes observed (our 15 per snapshot)
	- It teaches the model to "Match the market at the points we actually saw."
	- This is the "Primary teacher"
2. No-arbitrage shape penalty `loss_noarb_bounds`
	- Uses the model's predicted prices at supervised points
	- It teaches the model to "stay inside basic bounds!"
		- $0\leq C \leq S_0 e^{-qT}$
	- These are soft constraints/regularizers, small weights. We don't want them to dominate.
3. Physics (PDE) Residual + Boundary/Terminal `loss_pde_and_bc`
	- It uses collocation points we label, autodiff to get $u_t,u_S,u_{SS}$ and check the Black-Scholes PDE and simple boundary/terminal conditions.
	- It teaches the model to "Be locally consistent with the BS dynamics, respecting the payoff at certain points".
	- Since it doesn't use labeled data, it should be a different kind of term
	- **Idea:** Start training without it **(warm-up)** and then turn it on with a small weight, so it regularizes rather than overpowers the fit.


Overall, the total loss is:

$$
L = L_{\text{fit}} + \lambda_{\text{arb}} \space L_{\text{arb}} + \lambda_{\text{pde}} \space L_{\text{pde}} + \lambda_{\text{bc}} \space L_{\text{bc}}
$$

$$
L_{\text{arb}} = \mathbb{E}(\text{ReLU}(0-C)^2 + \text{ReLU}(C-U)^2)
$$

For the PDE Loss function, just the PDE residual alone is not enough. Many surfaces can satisfy it locally. Therefore, we need three panlties to make the solution unique/physically sensible across the whole domain. These are:
1. Terminal Payoff $(\tau \approx 0)$
	- "Initial Condition" in time
    - Enforces the idea that the option is worth its payoff at expiry.
    - Without it, many different surfaces could satisfy the PDE locally, but disagree at expiry.
2. Zero-spot boundary (S = 0)
	- Left boundary in price space
    - Enforces "uselessness" to a call for any $\tau > 0$.
    - Prevents the network from inventing positive values when the call itself is worthless.
    - S -> 0
3. Large-S slope (Delta)
	- Right boundary in price space
    - Enforces the idea that, for large S, a call behaves like stock with divident yield q: $\frac{\partial u}{\partial S} \approx e^{-qT}$
    - Delta is required to prevent the prices to explode/dominate the loss when S is high.
    - S -> $\infty$


In [11]:
# Compute the supervised fit loss
def loss_fit_price(model, batch):

    # Pull the tensors from the batch
    X = batch["features"]
    S0 = batch["S0"]
    K = batch["K"]
    T = batch["T"]
    r = batch["r"]
    q = batch["q"]
    P = batch["P_obs"] # Observed call prices

    # Predict Implied Volatility based on the tensors
    sigma_hat = model(X)

    # Decode to price with the Black-Scholes model
    C_hat = bs_call_price(S0, K, T, r, q, sigma_hat)

    # Compute the fit loss (MAE)
    L_fit = F.l1_loss(C_hat,P)

    # Return BOTH the scalar loss and a small cache to reuse later
    return L_fit, {"sigma_hat": sigma_hat, "C_hat": C_hat}

# Penalize prices that violate the basic no-arbitrage bounds for European Calls
def loss_noarb_bounds(C_hat, S0, T, q):
    # Tiny epsilon to reduce noise
    epsilon = 1e-8
    
    # Compute the upper bound for every contract
    U = S0 * torch.exp(-q * T)

    # Measure violations (Upper/Lower), ReLU keeps penalty 0 when inside bounds
    lower_bound_violation = torch.relu((0.0 - epsilon) - C_hat)
    upper_bound_violation = torch.relu(C_hat - (U + epsilon))

    # Return a penalty scalar based on the bound violations
    return (upper_bound_violation.pow(2) + lower_bound_violation.pow(2)).mean()

# Create synthetic points where we don't have the market quotes
def sample_collocation(batch, n_colloc=None, s_factor=(0.4, 2.5)):

    N = batch["S0"].shape[0]
    if n_colloc is None:
        n_colloc = N
    device = batch["S0"].device

    # sample indices from the current batch
    idx = torch.randint(0, N, (n_colloc,), device=device)

    # gather base tensors
    S0_ref = batch["S0"][idx]
    K = batch["K"][idx]
    r = batch["r"][idx]
    q = batch["q"][idx]

    # tau in [0, T_max]
    T_max = torch.clamp(batch["T"].max(), min=1e-8)
    tau = torch.rand(n_colloc, device=device) * T_max

    # S in [smin, smax] * S0_ref
    smin, smax = s_factor
    S = S0_ref * (smin + (smax - smin) * torch.rand(n_colloc, device=device))

    # build model inputs X_coll with SAME normalization as training
    stats = batch["stats"]
    T_mu = torch.tensor(stats["T_mu"], device=device, dtype=torch.float32)
    T_sd = torch.tensor(stats["T_sd"], device=device, dtype=torch.float32)
    r_mu = torch.tensor(stats["r_mu"], device=device, dtype=torch.float32)
    r_sd = torch.tensor(stats["r_sd"], device=device, dtype=torch.float32)
    q_mu = torch.tensor(stats["q_mu"], device=device, dtype=torch.float32)
    q_sd = torch.tensor(stats["q_sd"], device=device, dtype=torch.float32)

    log_mny = torch.log(S0_ref / K)
    tau_n = (tau - T_mu) / T_sd
    r_n = (r - r_mu) / r_sd
    q_n = (q - q_mu) / q_sd
    S0_scaled = S0_ref / 100.0

    #Covert 5 separate 1-D tensors into one 2-D Tensor
    X_coll = torch.stack([log_mny, tau_n, r_n, q_n, S0_scaled], dim=-1)

    return {
            "S": S, "K": K, "tau": tau, "r": r, "q": q,
            "X_coll": X_coll, "stats": batch["stats"],
            "S0_ref": S0_ref
           }
    


def loss_pde_and_bc(model, coll, lambda_bc=1e-4):

    # Pull tensors from coll
    X_coll = coll["X_coll"]
    S = coll["S"].detach().clone().requires_grad_(True)
    tau = coll["tau"].detach().clone().requires_grad_(True)
    K = coll["K"]
    r = coll["r"]
    q = coll["q"]
    stats = coll["stats"]
    S0_ref = coll["S0_ref"]

    # Predict Implied Volatility at collocation points
    sigma_hat = model(X_coll)

    #Price surface at collocation points
    u = bs_call_price(S, K, tau, r, q, sigma_hat)

    # Take first derivative of u w.r.t S
    u_S = torch.autograd.grad(
        outputs = u, 
        inputs = S, 
        grad_outputs = torch.ones_like(u),
        create_graph = True
    )[0]

    # Take second derivative of u w.r.t S
    u_SS = torch.autograd.grad(
        outputs = u_S,
        inputs = S,
        grad_outputs = torch.ones_like(u_S),
        create_graph = True
    )[0]

    # Take time derivative of u w.r.t tau
    u_tau = torch.autograd.grad(
        outputs = u,
        inputs = tau,
        grad_outputs = torch.ones_like(u),
        create_graph = True
    )[0]

    # Form the Black-Scholes PDE residual
    R_PDE = u_tau + 0.5 * sigma_hat**2 * S.pow(2) * u_SS + (r - q) * S * u_S - (r * u)

    L_pde = torch.mean(R_PDE.pow(2))

    # terminal payoff - Rebuild input features with a small tau value (tau0 = 1e-6) to get a new Implied Volatility
    tau0 = torch.full_like(tau, 1e-6)
    
     # required: previous parameters from batch
    T_mu = torch.tensor(stats["T_mu"], device=device, dtype=torch.float32)
    T_sd = torch.tensor(stats["T_sd"], device=device, dtype=torch.float32)
    r_mu = torch.tensor(stats["r_mu"], device=device, dtype=torch.float32)
    r_sd = torch.tensor(stats["r_sd"], device=device, dtype=torch.float32)
    q_mu = torch.tensor(stats["q_mu"], device=device, dtype=torch.float32)
    q_sd = torch.tensor(stats["q_sd"], device=device, dtype=torch.float32)
    
    log_mny = torch.log(S0_ref / K)
    tau_n = (tau0 - T_mu) / T_sd
    r_n = (r - r_mu) / r_sd
    q_n = (q - q_mu) / q_sd
    S0_scaled = S0_ref / 100.0

    X_tau0 = torch.stack([log_mny, tau_n, r_n, q_n, S0_scaled], dim=-1)

    sigma_hat_tau0 = model(X_tau0)

    u_tau0 = bs_call_price(S, K, tau0, r, q, sigma_hat_tau0)
    payoff = torch.relu(S - K)
    L_term = F.mse_loss(u_tau0, payoff)

    # zero spot boundary (left boundary in price space)
    u_S0 = bs_call_price(torch.zeros_like(S), K, tau, r, q, sigma_hat)
    L_S0 = torch.mean(u_S0**2)

    # Large-S delta behavior (right boundary in price space)
    S_big = 2.5 * S0_ref
    S_big = S_big.detach().clone().requires_grad_(True)
    u_big = bs_call_price(S_big, K, tau, r, q, sigma_hat)
    u_S_big = torch.autograd.grad(
        outputs=u_big,
        inputs=S_big,
        grad_outputs=torch.ones_like(u_big),
        create_graph=False
    )[0]

    delta = torch.exp(-q * tau)

    L_delta = F.mse_loss(u_S_big, delta)

    # Combine all three terms
    L_bc = L_term + L_S0 + L_delta
    
    # return L_pde and L_bc * lambda_bc
    return L_pde, L_bc * lambda_bc
    

