# **Heston PINN**
---


$$
\frac{\partial V}{\partial t} + rS \frac{\partial V}{\partial S} + \rho\sigma v S \frac{\partial^2 V}{\partial S \partial v} + \frac{1}{2}S^2 v \frac{\partial^2 V}{\partial S^2} + \frac{1}{2}\sigma^2 v \frac{\partial^2 V}{\partial v^2} + \kappa(\theta - v) \frac{\partial V}{\partial v} - rV = 0
$$
<center>

**Heston PDE**
</center>

Where:
* $V$: Option price (a function of $S$, $v$, and $t$)
* $t$: Time
* $S$: Price of the underlying asset
* $v$: Instantaneous variance of the underlying asset (volatility squared)
* $r$: Risk-free interest rate
* $\rho$: Correlation between the Brownian motion of the asset price and its variance
* $\sigma$: Volatility of the variance (volatility of volatility)
* $\kappa$: Rate at which the variance $v$ reverts to its long-term mean $\theta$
* $\theta$: Long-term mean variance

---

## **Import Libraries**

In [15]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.optim.lr_scheduler import ReduceLROnPlateau
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
import time

# Common Parameters (from Black-Scholes)
r = 0.05          # Risk-free rate
sigma = 0.2       # Black-Scholes volatility (used for anchoring)
K = 100.0         # Strike price
T = 1.0           # Time to maturity (in years)

# Heston specific Parameters
# We set theta (long-term variance) and v_0 (initial variance)
# to sigma^2 to make the models comparable.
theta = sigma**2  # Long-term variance (0.04)
v_0 = sigma**2    # Initial variance (0.04)
kappa = 2.0       # Rate of mean reversion for variance
sigma_v = 0.3     # Volatility of variance ("vol of vol")
rho = -0.7        # Correlation between asset and variance

# Domain and Grid Setup
# Spatial domain for Stock Price (S)
S_min = 0.0
S_max = 250.0

# Spatial domain for Variance (v)
v_min = 0.0
v_max = 1.0 # The max variance can be adjusted based on model behavior

# Time domain
t_min = 0.0
t_max = T

# Grid points
N = 500

# Create grids for S, t, and v
# These represent the collocation points for training the PINN
S_grid = torch.linspace(S_min, S_max, N).view(-1, 1).requires_grad_()
t_grid = torch.linspace(t_min, t_max, N).view(-1, 1).requires_grad_()
v_grid = torch.linspace(v_min, v_max, N).view(-1, 1).requires_grad_()

## **Define the Model**

In [16]:
class HestonPINN(nn.Module):
    def __init__(self, input_dim=3, output_dim=1, hidden_dim=4, neurons_per_layer=64, activation_fn=nn.Tanh()):
        super().__init__()
        layers = []

        layers.append(nn.Linear(input_dim, neurons_per_layer))
        layers.append(activation_fn)

        for _ in range(hidden_dim - 1):
            layers.append(nn.Linear(neurons_per_layer, neurons_per_layer))
            layers.append(activation_fn)

        layers.append(nn.Linear(neurons_per_layer, output_dim))
        self.net = nn.Sequential(*layers)

    def forward(self, S, v, t):
        """Concatenates inputs and performs a forward pass."""
        x = torch.cat([S, v, t], dim=1)
        return self.net(x)

## **Define the Losses**

In [17]:
def pde_loss(model, S, v, t):
    # Calculates the residual of the Heston PDE

    C = model(S, v, t)
    
    # First derivatives
    C_t = torch.autograd.grad(C, t, grad_outputs=torch.ones_like(C), create_graph=True)[0]
    C_S = torch.autograd.grad(C, S, grad_outputs=torch.ones_like(C), create_graph=True)[0]
    C_v = torch.autograd.grad(C, v, grad_outputs=torch.ones_like(C), create_graph=True)[0]
    
    # Second derivatives
    C_SS = torch.autograd.grad(C_S, S, grad_outputs=torch.ones_like(C_S), create_graph=True)[0]
    C_vv = torch.autograd.grad(C_v, v, grad_outputs=torch.ones_like(C_v), create_graph=True)[0]
    C_Sv = torch.autograd.grad(C_S, v, grad_outputs=torch.ones_like(C_S), create_graph=True)[0]
    
    # Heston PDE residual
    pde_residual = (
        C_t
        + r * S * C_S
        + kappa * (theta - v) * C_v
        + 0.5 * v * S**2 * C_SS
        + 0.5 * sigma_v**2 * v * C_vv
        + rho * sigma_v * v * S * C_Sv
        - r * C
    )
    
    return torch.mean(pde_residual**2)


def boundary_loss(model, S_boundary, v_boundary, t_boundary):
    #Calculates the loss at the spatial boundaries (S=0 and S=S_max)

    # Loss at S=0 (option is worthless)
    S_zero = torch.zeros_like(t_boundary).requires_grad_()
    C_at_S_zero = model(S_zero, v_boundary, t_boundary)
    loss_S_zero = torch.mean(C_at_S_zero**2)
    
    # Loss at S=S_max (option behaves like S - K*exp(-r(T-t)))
    S_at_max = (torch.ones_like(t_boundary) * S_max).requires_grad_()
    C_at_S_max_pred = model(S_at_max, v_boundary, t_boundary)
    C_at_S_max_true = S_at_max - K * torch.exp(-r * (T - t_boundary))
    loss_S_max = torch.mean((C_at_S_max_pred - C_at_S_max_true)**2)
    
    # Note: For v boundaries, enforcing the PDE is a common strategy.
    # This is implicitly handled by including v_min and v_max points
    # in the collocation points for the pde_loss.
    
    return loss_S_zero + loss_S_max


def terminal_loss(model, S_terminal, v_terminal):
    # Calculates the loss at the terminal condition (t=T), i.e., the payoff
    
    t_terminal = (torch.ones_like(S_terminal) * T).requires_grad_()
    C_pred = model(S_terminal, v_terminal, t_terminal)
    
    # Payoff for a European Call option: max(S - K, 0)
    C_true = torch.clamp(S_terminal - K, min=0)
    
    return torch.mean((C_pred - C_true)**2)




## **Training**

In [None]:
# Hyperparameters
input_dim       = 3        # Fixed num of inputs (S, v, t)
output_dim      = 1        # Fixed num of outputs (The price)
hidden_dim      = 16       # Num hidden layers
num_neurons     = 16       # Number of neurons per layer
num_epochs      = 10000    # Training epochs
learning_rate   = 0.001    # Optimizer learning rate

# Number of points to sample for each loss component
N_pde = 2500
N_boundary = 500
N_terminal = 500

# Loss weights (can be tuned)
pde_weight = 14.46
boundary_weight = 0.086
terminal_weight = 0.05

# Model, Optimizer, Scheduler
model = HestonPINN(
    input_dim=input_dim,
    output_dim=output_dim,
    hidden_dim=hidden_dim,
    neurons_per_layer=num_neurons,
    activation_fn=nn.Tanh()
)

optimizer = optim.Adam(model.parameters(), lr=learning_rate)
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, 'min', patience=500, factor=0.5, verbose=True)

# Training Loop
print("Starting training...")
start_time = time.time()

# Loss History Tracking
loss_history = {
    'total': [],
    'pde': [],
    'boundary': [],
    'terminal': []
}

for epoch in range(num_epochs):
    optimizer.zero_grad()
    
    # 1. PDE Loss (Interior Points)
    # Sample random points in the (S, v, t) domain
    S_pde = torch.rand(N_pde, 1) * (S_max - S_min) + S_min
    v_pde = torch.rand(N_pde, 1) * (v_max - v_min) + v_min
    t_pde = torch.rand(N_pde, 1) * (t_max - t_min) + t_min
    S_pde.requires_grad = True
    v_pde.requires_grad = True
    t_pde.requires_grad = True
    loss_pde = pde_loss(model, S_pde, v_pde, t_pde)

    # 2. Boundary Loss (S=0 and S=S_max)
    # Sample points on the v and t dimensions for the boundaries
    v_bc = torch.rand(N_boundary, 1) * (v_max - v_min) + v_min
    t_bc = torch.rand(N_boundary, 1) * (t_max - t_min) + t_min
    v_bc.requires_grad = True
    t_bc.requires_grad = True
    loss_bc = boundary_loss(model, None, v_bc, t_bc) # S is handled inside the function

    # 3. Terminal Loss (t=T)
    # Sample points on the S and v dimensions for the terminal condition
    S_tc = torch.rand(N_terminal, 1) * (S_max - S_min) + S_min
    v_tc = torch.rand(N_terminal, 1) * (v_max - v_min) + v_min
    S_tc.requires_grad = True
    v_tc.requires_grad = True
    loss_tc = terminal_loss(model, S_tc, v_tc)
    
    # Combine losses
    total_loss = (pde_weight * loss_pde) + (boundary_weight * loss_bc) + (terminal_weight * loss_tc)
    
    # Backpropagation and optimization
    total_loss.backward()
    optimizer.step()
    
    # Update learning rate scheduler
    scheduler.step(total_loss)

    # Record loss
    loss_history['total'].append(total_loss.item())
    loss_history['pde'].append(loss_pde.item())
    loss_history['boundary'].append(loss_bc.item())
    loss_history['terminal'].append(loss_tc.item())
    
    # Print progress
    if (epoch + 1) % 100 == 0:
        print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {total_loss.item():.4e}, "
              f"PDE: {loss_pde.item():.4e}, BC: {loss_bc.item():.4e}, TC: {loss_tc.item():.4e}")

end_time = time.time()
print(f"Training finished in {end_time - start_time:.2f} seconds.")



Starting training...
Epoch [100/10000], Loss: 2.1269e+03, PDE: 3.7584e-02, BC: 2.2134e+04, TC: 4.4576e+03
Epoch [200/10000], Loss: 2.0581e+03, PDE: 9.2667e-02, BC: 2.1479e+04, TC: 4.1909e+03
Epoch [300/10000], Loss: 1.9878e+03, PDE: 1.6042e-01, BC: 2.0929e+04, TC: 3.7104e+03
Epoch [400/10000], Loss: 1.9691e+03, PDE: 2.4000e-01, BC: 2.0469e+04, TC: 4.1055e+03
Epoch [500/10000], Loss: 1.9115e+03, PDE: 3.3112e-01, BC: 1.9993e+04, TC: 3.7461e+03
Epoch [600/10000], Loss: 1.8645e+03, PDE: 4.3344e-01, BC: 1.9566e+04, TC: 3.5100e+03
Epoch [700/10000], Loss: 1.8572e+03, PDE: 5.4613e-01, BC: 1.9178e+04, TC: 4.0001e+03
Epoch [800/10000], Loss: 1.7801e+03, PDE: 6.6879e-01, BC: 1.8778e+04, TC: 3.1091e+03
Epoch [900/10000], Loss: 1.7637e+03, PDE: 8.0098e-01, BC: 1.8418e+04, TC: 3.3639e+03
Epoch [1000/10000], Loss: 1.7213e+03, PDE: 9.4220e-01, BC: 1.8064e+04, TC: 3.0835e+03
Epoch [1100/10000], Loss: 1.7090e+03, PDE: 1.0920e+00, BC: 1.7781e+04, TC: 3.2808e+03
Epoch [1200/10000], Loss: 1.6606e+03, PDE:

### **Plot Losses**

In [None]:
plt.figure(figsize=(12, 6))
epochs_recorded = range(100, num_epochs + 1, 100)
plt.plot(epochs_recorded, loss_history['total'], label='Total Loss')
plt.plot(epochs_recorded, loss_history['pde'], label='PDE Loss', linestyle='--')
plt.plot(epochs_recorded, loss_history['boundary'], label='Boundary Loss', linestyle='--')
plt.plot(epochs_recorded, loss_history['terminal'], label='Terminal Loss', linestyle='--')

plt.title('Training Loss Over Epochs')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.yscale('log')
plt.legend()
plt.grid(True, which="both", ls="--")
plt.show()