In [1]:
import torch
import torch.nn as nn
import numpy as np
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D
import plotly.graph_objects as go
import scipy.stats as st

import warnings
warnings.filterwarnings('ignore', category=UserWarning)

In [2]:
# Set random seeds for reproducibility
torch.manual_seed(42)
np.random.seed(42)

In [6]:
volatility_max = 0.5
rate_max = 0.3
T = 1
strike = 100
S_max = 3 * strike
call_put = 'Put'
N_points = 500

In [3]:
# same logic as for 2d case, but this time we add Linear Complementarity Problem in the form of American penalty

def pde_dynamic(pinn, S_max, T, r_max, sigma_max, N_points = 200):

    S = S_max * torch.rand(N_points, 1, requires_grad=True)
    tau = T * torch.rand(N_points, 1, requires_grad=True)
    sigma = sigma_max * torch.rand(N_points, 1, requires_grad=True)
    r = r_max * torch.rand(N_points, 1, requires_grad=True)
    
    V = pinn(S, tau, r, sigma)
    V_tau = torch.autograd.grad(V.sum(), tau, create_graph=True)[0]
    V_S = torch.autograd.grad(V.sum(), S, create_graph=True)[0]
    V_SS = torch.autograd.grad(V_S.sum(), S, create_graph=True)[0]

    residual = V_tau - (0.5 * (sigma * S) ** 2 * V_SS + r * S * V_S - r * V)
    
    return torch.mean(residual ** 2)

def boundary_condition(pinn, strike, S_max, T, r_max, sigma_max, call_put = 'Call', N_points = 200):
    
    S0_tensor = torch.zeros(N_points, 1, requires_grad=True)
    Smax_tensor = S_max * torch.ones(N_points, 1, requires_grad=True)
    taus_tensor = T * torch.rand(N_points, 1, requires_grad=True)
    rs_tensor = r_max * torch.rand(N_points, 1, requires_grad=True)
    sigmas_tensor = sigma_max * torch.rand(N_points, 1, requires_grad=True)

    pred_boundary0 = pinn(S0_tensor, taus_tensor, rs_tensor, sigmas_tensor)
    pred_boundary1 = pinn(Smax_tensor, taus_tensor, rs_tensor, sigmas_tensor)

    pred_delta_v0 = torch.autograd.grad(outputs=pred_boundary0, inputs=S0_tensor, grad_outputs=torch.ones_like(pred_boundary0), create_graph=True, retain_graph=True)[0]
    pred_delta_v1 = torch.autograd.grad(outputs=pred_boundary1, inputs=Smax_tensor, grad_outputs=torch.ones_like(pred_boundary1), create_graph=True, retain_graph=True)[0]

    if call_put == 'Call':

        # boudary at S = 0
        boundary_V0 = torch.zeros(N_points, 1)

        # boundary at S \to \infty
        boundary_V1 = S_max - strike * torch.exp(-rs_tensor * taus_tensor)

        delta_V0 = torch.zeros(N_points, 1)
        delta_V1 = torch.ones(N_points, 1)
    
    elif call_put == 'Put':

        # boudary at S = 0
        boundary_V0 = strike * torch.exp(-rs_tensor * taus_tensor)

        # boundary at S \to \infty
        boundary_V1 = torch.zeros(N_points, 1)

        delta_V0 = -torch.ones(N_points, 1)
        delta_V1 = torch.zeros(N_points, 1)

    else:
        raise ValueError("Unsupported kind of option")

    loss_boundary = torch.mean((pred_boundary0 - boundary_V0)**2) + \
                    torch.mean((pred_boundary1 - boundary_V1)**2) + \
                    torch.mean((pred_delta_v0 - delta_V0) ** 2) + \
                    torch.mean((pred_delta_v1 - delta_V1) ** 2)
    
    return loss_boundary

def terminal_condition(pinn, strike, S_max, r_max, sigma_max, call_put, N_points = 200):
    terminal_S = S_max * torch.rand(N_points, 1, requires_grad=True)
    terminal_tau = torch.zeros(N_points, 1)
    r = r_max * torch.rand(N_points, 1, requires_grad=True)
    sigma = sigma_max * torch.rand(N_points, 1, requires_grad=True)
    
    if call_put == 'Call':
        terminal_V = torch.relu(terminal_S - strike)
    elif call_put == 'Put':
        terminal_V = torch.relu(strike - terminal_S)
    else:
        raise ValueError('Unsupported kind of option')

    pred_terminal = pinn(terminal_S, terminal_tau, r, sigma)
    loss_terminal = torch.mean((pred_terminal - terminal_V)**2)
    return loss_terminal

def american_penalty(pinn, strike, S_max, r_max, sigma_max, call_put, N_points = 200):
    S = S_max * torch.rand(N_points, 1, requires_grad=True)
    tau = torch.rand(N_points, 1, requires_grad=True)  # tau > 0 (not at maturity)
    r = r_max * torch.rand(N_points, 1, requires_grad=True)
    sigma = sigma_max * torch.rand(N_points, 1, requires_grad=True)

    if call_put == 'Call':
        intrinsic_value = torch.relu(S - strike)
    elif call_put == 'Put':
        intrinsic_value = torch.relu(strike - S)
    else:
        raise ValueError('Unsupported kind of option')
    
    pred_price = pinn(S, tau, r, sigma)
    constraint_violation = intrinsic_value - pred_price
    loss_american = torch.mean(torch.relu(constraint_violation)**2)
    
    return loss_american

In [4]:
class PINN(nn.Module):
    def __init__(self, S_max, T, r_max, sigma_max):

        self.S_max = S_max
        self.T = T
        self.r_max = r_max
        self.sigma_max = sigma_max

        super(PINN, self).__init__()
        self.net = nn.Sequential(
            nn.Linear(4, 20),  # Input: (S, tau, r, sigma)
            nn.Tanh(),
            nn.Linear(20, 20),
            nn.Tanh(),
            nn.Linear(20, 20),
            nn.Tanh(),
            nn.Linear(20, 20),
            nn.Tanh(),
            nn.Linear(20, 1),
            nn.Softplus()   # Output: V(S,tau, r, sigma)
        )
        
    def forward(self, S, tau, r, sigma):

        # normalize the inputs
        S_norm = S / self.S_max
        tau_norm = tau / self.T
        r_norm = r / self.r_max
        sigma_norm = sigma / self.sigma_max

        points = torch.cat([S_norm, tau_norm, r_norm, sigma_norm], dim=1)
        return self.net(points)

In [7]:
def train_network(strike = strike, rate_max = 0.3, volatility_max = 0.5, S_max = S_max, T = T, call_put = 'Call', is_american = False, N_points = 200):
    # ================== Training Setup ==================
    pinn = PINN(S_max=S_max, T=T, r_max = rate_max, sigma_max = volatility_max)
    optimizer = torch.optim.Adam(pinn.parameters(), lr=0.001)


    # ================== Training Loop with EMA Early Stopping ==================
    epochs = 15000
    train_losses = []

    # EMA parameters
    ema_loss = None
    alpha = 0.1       # Smoothing factor
    patience = 200     # Epochs to wait before stopping
    min_delta = 1e-5   # Minimum improvement threshold
    wait = 0           # Epochs since last improvement
    min_epochs = 100   # Minimum training epochs before checking

    for epoch in range(epochs):
        optimizer.zero_grad()
        
        # Compute losses
        loss_initial = terminal_condition(pinn, strike = strike, S_max = S_max, call_put = call_put, r_max = rate_max, sigma_max = volatility_max, N_points = N_points)
        loss_boundary = boundary_condition(pinn, strike = strike, S_max = S_max, T = T, r_max = rate_max, sigma_max = volatility_max, call_put = call_put, N_points = N_points)
        loss_physics = pde_dynamic(pinn, S_max = S_max, T = T, r_max = rate_max, sigma_max = volatility_max, N_points = N_points)
        american_loss = american_penalty(pinn, strike = strike, S_max = S_max, r_max = rate_max, sigma_max = volatility_max, call_put = call_put, N_points = N_points) if is_american == True else 0
        loss = loss_initial + loss_boundary + loss_physics + american_loss

        train_losses.append(loss.item())
        
        # Backpropagation
        loss.backward()
        optimizer.step()
        
        # EMA-based early stopping
        ema_loss = alpha * loss.item() + (1-alpha)*ema_loss if ema_loss else loss.item()
        
        if epoch > min_epochs:
            if loss.item() < ema_loss - min_delta:
                wait = 0  # Reset counter if improving
            else:
                wait += 1
                if wait >= patience:
                    print(f"\nEarly stopping at epoch {epoch}")
                    print(f"Final loss: {loss.item():.6f} (EMA: {ema_loss:.6f})")
                    break
        
        if epoch % 500 == 0:
            print(f"Epoch {epoch:4d} | Loss: {loss.item():.6f} | EMA: {ema_loss:.6f}")
    
    return pinn

In [8]:
pinn = train_network(strike = strike, 
                    rate_max = rate_max, 
                    volatility_max = volatility_max, 
                    S_max = S_max, 
                    T = T, 
                    call_put = call_put, 
                    is_american = True, 
                    N_points=N_points)

Epoch    0 | Loss: 10402.850586 | EMA: 10402.850586
Epoch  500 | Loss: 7891.113281 | EMA: 7733.561809
Epoch 1000 | Loss: 5639.455566 | EMA: 5727.917825
Epoch 1500 | Loss: 4268.800781 | EMA: 4221.067233
Epoch 2000 | Loss: 3033.609375 | EMA: 3063.962159
Epoch 2500 | Loss: 2173.137695 | EMA: 2178.949610
Epoch 3000 | Loss: 1487.757690 | EMA: 1480.218959
Epoch 3500 | Loss: 946.949585 | EMA: 971.583117
Epoch 4000 | Loss: 598.384094 | EMA: 608.269614
Epoch 4500 | Loss: 356.013245 | EMA: 361.215648
Epoch 5000 | Loss: 196.960236 | EMA: 207.259437
Epoch 5500 | Loss: 115.366959 | EMA: 114.394047
Epoch 6000 | Loss: 61.207062 | EMA: 63.771077
Epoch 6500 | Loss: 35.714939 | EMA: 37.557740
Epoch 7000 | Loss: 25.755754 | EMA: 25.654392
Epoch 7500 | Loss: 19.868736 | EMA: 19.846193
Epoch 8000 | Loss: 17.173161 | EMA: 16.982141
Epoch 8500 | Loss: 18.316330 | EMA: 16.561793
Epoch 9000 | Loss: 17.098394 | EMA: 15.065187
Epoch 9500 | Loss: 15.381964 | EMA: 14.813926
Epoch 10000 | Loss: 18.166096 | EMA: 14.

In [11]:
def visualize_for_r_and_sigma(pinn, r, sigma, N_points = 100):
    S_test = torch.linspace(0, S_max, N_points)
    tau_test = torch.linspace(0, T, N_points)

    # Create 2D meshgrid
    S_grid, tau_grid = torch.meshgrid(S_test, tau_test, indexing='xy')

    # Flatten and add fixed parameters
    S_flat = S_grid.reshape(-1, 1)
    tau_flat = tau_grid.reshape(-1, 1)
    r_flat = torch.ones_like(S_flat) * r
    sigma_flat = torch.ones_like(S_flat) * sigma

    # Predict
    values = pinn(S_flat, tau_flat, r_flat, sigma_flat).detach().numpy()
    values_2d = values.reshape(len(S_test), len(tau_test))

    # Create interactive 3D plot with plotly
    fig = go.Figure(data=[go.Surface(
        x=S_grid.numpy(),
        y=tau_grid.numpy(),
        z=values_2d,
        colorscale='jet',
        opacity=0.8,
    )])

    # Update layout
    fig.update_layout(
        title='Interactive Black-Scholes PDE solution',
        scene=dict(
            xaxis_title='Position (S)',
            yaxis_title='Time (tau)',
            zaxis_title='V(S,tau)',
        ),
        autosize=True,
        width=900,
        height=700,
    )

    # Show the plot
    fig.show()

In [12]:
visualize_for_r_and_sigma(pinn = pinn, r = 0.3, sigma = 0.4, N_points=100)

In [13]:
def compute_greeks(pinn, r, sigma, S_range=(50, 150), tau_range=(0.1, 1.0), 
                       grid_points=(50, 50)):
    
    S_min, S_max = S_range
    tau_min, tau_max = tau_range
    S_points, tau_points = grid_points
    
    # Create 2D grid
    S_vals = torch.linspace(S_min, S_max, S_points, requires_grad=True)
    tau_vals = torch.linspace(tau_min, tau_max, tau_points, requires_grad=True)

    # Create 2D meshgrid
    S_grid, tau_grid = torch.meshgrid(S_vals, tau_vals, indexing='xy')

    # Flatten and add fixed parameters
    S_flat = S_grid.reshape(-1, 1)
    tau_flat = tau_grid.reshape(-1, 1)
    r_flat = torch.ones_like(S_flat, requires_grad=True) * r
    sigma_flat = torch.ones_like(S_flat, requires_grad=True) * sigma

    # Predict
    values = pinn(S_flat, tau_flat, r_flat, sigma_flat)
    
    delta = torch.autograd.grad(
        outputs=values, inputs=S_flat,
        grad_outputs=torch.ones_like(values),
        create_graph=True,
        retain_graph=True
    )[0].reshape(grid_points)
    
    gamma = torch.autograd.grad(
        outputs=delta, inputs=S_flat,
        grad_outputs=torch.ones_like(delta),
        create_graph=False,
        retain_graph=True
    )[0].reshape(grid_points)
    
    theta = -torch.autograd.grad(
        outputs=values, inputs=tau_flat,
        grad_outputs=torch.ones_like(values),
        create_graph=False,
        retain_graph=True
    )[0].reshape(grid_points)

    vega = torch.autograd.grad(
        outputs=values, inputs=sigma_flat,
        grad_outputs=torch.ones_like(values),
        create_graph=True,
        retain_graph=True
    )[0].reshape(grid_points)

    rho = torch.autograd.grad(
        outputs=values, inputs=r_flat,
        grad_outputs=torch.ones_like(values),
        create_graph=True,
        retain_graph=False
    )[0].reshape(grid_points)

    return S_grid, tau_grid, values, delta, gamma, theta, vega, rho

def visualize_greeks(S_grid, tau_grid, greek):

    fig = go.Figure(data=[go.Surface(
        x=S_grid.detach().numpy(),
        y=tau_grid.detach().numpy(),
        z=greek.detach().numpy(),
        colorscale='jet',
        opacity=0.8,
    )])

    # Update layout
    fig.update_layout(
        title='Interactive Black-Scholes PDE solution',
        scene=dict(
            xaxis_title='Position (S)',
            yaxis_title='Time (tau)',
            zaxis_title='V(S,tau)',
        ),
        autosize=True,
        width=900,
        height=700,
    )

    # Show the plot
    fig.show()

In [15]:
greeks = compute_greeks(pinn, r = 0.05, sigma = 0.4)
visualize_greeks(greeks[0], greeks[1], greeks[-3])