This notebook implements the market simulators for Deep Hedging. Two models are implemented and constrasted: (1) GBM and (2) Heston Market Model

First, the code structure is arranged here in accordance to OOP principles. This will allow easy development of testing and also that of transferring the code to external FASTApi apps. However, jupyter notebooks will be the research base for clear communication and noting of research/design decisions.

In [1]:
import torch
import torch.nn as nn
from abc import ABC, abstractmethod

class MarketSimulator(nn.Module, ABC):
    """
    Abstract Base Class for Market Simulators. 
    To fascilitate interfacing between different models interchangeably with the rest of the system
    
    Output Shape for simulation data: [Batch_Size, Time_Steps, Features]
    """
    def __init__(self, dt, device):
        super().__init__()
        self.dt = dt
        self.device = device

    @abstractmethod
    def forward(self, num_paths, num_steps, initial_prices):
        """
        Generates the market paths.
        
        Args:
            num_paths (int): Number of Monte Carlo simulations. This is the Batch Size of the model.
            num_steps (int): Number of discrete time-steps (T).
            initial_prices (Tensor): Starting prices S_0 [Batch, Assets].
            
        Returns:
            Tensor: The simulated market states.
        """
        pass


The first model: Geometric Brownian Motion (GBM)

This model is defined by the Stochastic Differential Equation: $$dS_t = \mu S_t dt + \sigma S_t dW_t$$

Where, 
* $S_t$: Asset price at time $t$.
* $\mu$: Drift (constant trend of the asset).
* $\sigma$: Volatility (constant standard deviation of returns).
* $dW_t$: A standard Brownian Motion (Wiener Process).

This SDE is discretized as:  $S_{t+\Delta t} = S_t (1 + \mu \Delta t + \sigma \sqrt{\Delta t} Z)$

The above was in price terms, instead of prices themselves log of prices maybe utilized

Log-Price Space ($x_t = \ln S_t$) 

For which, the GBM SDE can be subjected to It√¥'s Lemma:$$dx_t = \left( \mu - \frac{1}{2}\sigma^2 \right) dt + \sigma dW_t$$
This SDE is discretized as: $x_{t+\Delta t} = x_t + (\mu - 0.5\sigma^2)\Delta t + \sigma \sqrt{\Delta t} Z$

In [2]:
import torch

def pyt_gbm(S0, mu, sigma, T, dt, n_paths, use_log=True):
    steps = int(T / dt)
    n_paths = int(n_paths)
    
    # Generate noise by drawing from std-normal-distribution, hence with mean=0, var=1
    Z = torch.randn(steps, n_paths) #[steps x paths matrix, so each column in a path]

    # Paths generated in either price and log-price  
    if use_log:
        # Log-price: provides numerical stability with non-negative values due to log and also due to additive updates
        drift = (mu - 0.5 * sigma**2) * dt
        diffusion = sigma * torch.sqrt(torch.tensor(dt)) * Z
        log_increments = drift + diffusion
        
        # cumulative sum of log-returns using cumsum
        x = torch.log(torch.tensor(S0)) + torch.cumsum(log_increments, dim=0)
        return torch.exp(x)
    else:
        # Price-space: disadvantaged relative to log-prices due to potential for negative values and also multiplicative updates
        # S_{t+1} = S_t * (1 + mu*dt + sigma*sqrt(dt)*Z)
        increments = 1 + mu * dt + sigma * torch.sqrt(torch.tensor(dt)) * Z
        prices = S0 * torch.cumprod(increments, dim=0) #return values in proces and not in log-prices
        return prices

To test the above method, its compliance to this maybe checked: $E[S_T] = S_0 e^{\mu T}$

this results from the analytical solution to the GBM SDE. 

In [3]:
# Test for the above method
# To check the compliance of the method with the Expected value, the mean across a large sample is tested

def test_gbm_mean_pytorch():
    S0, mu, sigma, T, dt = 100.0, 0.05, 0.2, 1.0, 0.001
    n_paths = 100000

    # generate price-paths
    prices = pyt_gbm(S0, mu, sigma, T, dt, n_paths)
    terminal_prices = prices[-1] # consider only the final prices
    
    theoretical_mean = S0 * torch.exp(torch.tensor(mu * T)) # value for generated data to be checked against 
    theoretical_mean = theoretical_mean.cpu()
    sample_mean = terminal_prices.mean().item() 
    
    # standard error of the mean = sigma_sample / sqrt(n)
    sem = terminal_prices.std().item() / torch.sqrt(torch.tensor(n_paths))

    # log findings 
    print(f'The Std. Error of the Mean is {sem}')

    # assert within 3 standard errors (99.7% confidence)
    assert abs(sample_mean - theoretical_mean) < 3 * sem

# Perform the test
test_gbm_mean_pytorch()

The Std. Error of the Mean is 0.06745587289333344


### Heston Model

The Heston models differs primarily from the GBM in that it models volatility as a non-constant stochastic process.
This increases the realism of the simulation by being able to capture market dynamics such as spikes in volatility when there are price drops etc.


The model:
       $$ dS_t = \mu  S_t  dt + \sqrt(v_t) S_t  dW1_t $$
       $$  dv_t = \kappa  (\theta - v_t)  dt + \sigma_v  \sqrt(v_t)  dW2_t $$ 
        
where:
       $$ S_t - \text{is the  stock price} $$
       $$ v_t -  \text{variance} $$
       $$ dW1_t, dW2_t -  \text{correlated Brownian motions with correlation rho} $$
       $$ \text{the two Brownian motions are linked by the correlation } \rho \text{ by } d\langle W^S, W^v \rangle_t = \rho dt $$
       

The Parameters in the equations above:
* $\mu$ : The expected return of the asset (often set to $r$, the risk-free rate which should hold as a risk-neutral measure).
* $\sqrt{v_t}$ (Stochastic Volatility): this is the volatility at any time-step or $d_t$, i.e., the instantaneous volatility.
* $\kappa$ (Mean Reversion Speed): the rate at which the volatility returns to the long-run average.
* $\theta$ (Long-Run Variance): The mean level that variance gravitates toward.
* $\xi$ (Vol of Vol): The volatility that the variance process follows.
* $\rho$ (Correlation): This is the correlation between the two Brownian motions for asset prices and asset variances.
  $\rho$ is usually negative to capture market dynamics of variance of asset prices rising as the asset prices fall. 

In [4]:
def heston_simulation(
        n_paths: int = 5,
        S0: float = 100.0,          # starting stock price
        v0: float = 0.04,           # starting variance (vol^2, so sqrt(0.04)=0.2 or 20% volatility)
        mu: float = 0.05,           # drift of the process
        kappa: float = 2.0,         # mean reversion speed
        theta: float = 0.04,        # long-term variance
        sigma_v: float = 0.3,       # volatility of volatility 
        rho: float = -0.7,          # correlation between asset price and variance
        T: float = 1.0,             # overall time horizon
        dt: float = 0.01,           # time step
        scheme: str = 'euler',       # 'euler' first then 'milstein' or 'truncated' ##TODO: implement Milstein and Truncated methods
        device = 'cpu'
    ):
    """
        Simulates Heston model given asset price paths
        
        Returns:
            S - torch.Tensor of shape (n_paths, n_steps + 1) containing price paths
            v - torch.Tensor of shape (n_paths, n_steps + 1) containing variance paths
    """
    import torch

    device = 'cuda' if torch.cuda.is_available() else 'cpu'

    
    n_steps = int(T/dt) #torch.tensor(int (T / dt))
    n_paths = int(n_paths) #n_paths = torch.tensor(n_paths)
    rho = torch.tensor(rho)

    #generate dW_S, dW_v
    sqrt_dt = torch.sqrt(torch.tensor(dt))
    # independent normal distributions as starting points
    Z1 = torch.randn(n_paths, n_steps, device=device)
    Z2 = torch.randn(n_paths, n_steps, device=device)
    # induce the correlation rho in the Brownian motions
    dW_S = sqrt_dt * Z1 # dW_S = sqrt(dt) * Z1
    dW_v = sqrt_dt * (rho * Z1 + torch.sqrt(1 - rho**2) * Z2) # dW_v = sqrt(dt) * (rho * Z1 + sqrt(1 - rho^2) * Z2)

    S = torch.zeros(n_paths, n_steps + 1, device=device)
    v = torch.zeros(n_paths, n_steps + 1, device=device)
    
    S[:, 0] = S0 #set the first price to starting price
    v[:, 0] = v0 #set the first variance to starting variance

    #turn to torch for cuda usage
    kappa = torch.tensor(kappa)
    theta = torch.tensor(theta)
    sigma_v = torch.tensor(sigma_v)
    mu = torch.tensor(mu)

    for t in range(n_steps):
        v_pos = torch.clamp(v[:, t], min=0.0) #keep variance positive, since the next column is being updated per loop
        sqrt_v = torch.sqrt(v_pos)
        # update variance as per the Heston equation dv = kappa*(theta - v)*dt + sigma_v*sqrt(v)*dW_v
        v[:, t+1] = v[:, t] + kappa * (theta - v_pos) * dt + sigma_v * sqrt_v * dW_v[:, t]
        # update stock price as per Heston equation dS = mu*S*dt + sqrt(v)*S*dW_S  <-- 
        S[:, t+1] = S[:, t] * (1 + mu * dt + sqrt_v * dW_S[:, t])
    
    return S, v
    

In [5]:
S, v = heston_simulation()
S.shape, v.shape
#S, v

(torch.Size([5, 101]), torch.Size([5, 101]))

In [6]:
# fit simulation into a simple NN
from torch.nn import Module


class DeepHedgingMLPHeston(nn.Module):

    def __init__(self, input_dim: int = 4, # [S_t -> asset price, v_t -> asset variance, delta_{t-1} -> asset volume held, pi_{t-1} -> PnL]
                 hidden_dims: list = [64, 64], #hidden layers of the NN
                 output_dim: int = 1): #output -> hedging decision
        super().__init__()
        layers = [] #to hold hidden dims and feed into nn.Sequential
        prev_dim = input_dim
        # pack in layers starting from input_dim through to the end of hidden dims, with a relu
        for hidden_dim in hidden_dims:
            layers.append(nn.Linear(prev_dim, hidden_dim))
            layers.append(nn.ReLU())
            prev_dim = hidden_dim
        layers.append(nn.Linear(prev_dim, output_dim)) #pack in output layer
        self.network = nn.Sequential(*layers) #pack in the hidden layers

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        return self.network(x)

class EuropeanCallPayoff:
    def __init__(self, strike: float = 100.0):
        self.strike = strike
        
    def __call__(self, S_T: torch.Tensor) -> torch.Tensor:
        '''
        Args: S_T: Terminal stock prices, shape (n_paths,)
        Returns: Payoffs, shape (n_paths,)
        '''
        return torch.relu(S_T - self.strike) # max(S-K, 0)

def simple_euro_payoff(S_T: torch.Tensor, strike: float):
    return torch.relu(S_T - strike)

In [7]:
from typing import Tuple

class DeepHedgingAgentHeston:
    def __init__(self, model: nn.Module, transaction_cost: float = 0.001):
        self.model = model
        self.tc = transaction_cost
        
    def hedge_path(
        self, 
        S_path: torch.Tensor, # asset price path from Heston model
        v_path: torch.Tensor, # asset variance "path" from Heston model
        payoff: torch.Tensor  # final PnL amount
    ) -> Tuple[torch.Tensor, torch.Tensor]:
        '''
        execute hedging strategy on the given price and variance paths
        
        Args:
            S_path: Price path with shape (batch_size, n_steps + 1)
            v_path: Variance path with shape (batch_size, n_steps + 1)
            payoff: Terminal payoff with shape (batch_size,)
            
        Returns:
            final_pnl: Final PnL of shape (batch_size,)
            hedge_positions: Hedging positions over time
        '''
        batch_size, n_steps_plus_1 = S_path.shape
        n_steps = n_steps_plus_1 - 1
        device = S_path.device # decide cuda if from cuda
        
        delta = torch.zeros(batch_size, 1, device=device)
        pnl = torch.zeros(batch_size, device=device)
        hedge_positions = torch.zeros(batch_size, n_steps, device=device)
        
        for t in range(n_steps):
            S_t = S_path[:, t].unsqueeze(1)
            v_t = v_path[:, t].unsqueeze(1)  
            
            # network input: [S_t, v_t, delta_{t-1}, pnl_{t-1}]
            net_input = torch.cat([S_t, v_t, delta, pnl.unsqueeze(1)], dim=1)
            
            delta_new = self.model(net_input)
            trade = delta_new - delta
            tc_cost = self.tc * torch.abs(trade * S_t).squeeze()
            
            if t > 0:
                price_change = S_path[:, t] - S_path[:, t-1]
                pnl += delta.squeeze() * price_change
            
            pnl -= tc_cost
            delta = delta_new
            hedge_positions[:, t] = delta.squeeze()
        
        # final PnL
        final_price_change = S_path[:, -1] - S_path[:, -2]
        pnl += delta.squeeze() * final_price_change
        pnl -= payoff
        
        return pnl, hedge_positions

In [8]:
from typing import Tuple

class DeepHedgingAgentHeston:
    def __init__(self, model: nn.Module, transaction_cost: float = 0.001):
        self.model = model
        self.tc = transaction_cost
        
    def hedge_path(
        self, 
        S_path: torch.Tensor, # asset price path from Heston model
        v_path: torch.Tensor, # asset variance "path" from Heston model
        payoff: torch.Tensor  # final PnL amount
    ) -> Tuple[torch.Tensor, torch.Tensor]:
        '''
        execute hedging strategy on the given price and variance paths
        
        Args:
            S_path: Price path with shape (batch_size, n_steps + 1)
            v_path: Variance path with shape (batch_size, n_steps + 1)
            payoff: Terminal payoff with shape (batch_size,)
            
        Returns:
            final_pnl: Final PnL of shape (batch_size,)
            hedge_positions: Hedging positions over time
        '''
        batch_size, n_steps_plus_1 = S_path.shape
        n_steps = n_steps_plus_1 - 1
        device = S_path.device # decide cuda if from cuda
        
        delta = torch.zeros(batch_size, 1, device=device)
        pnl = torch.zeros(batch_size, device=device)
        hedge_positions = torch.zeros(batch_size, n_steps, device=device)
        
        for t in range(n_steps):
            S_t = S_path[:, t].unsqueeze(1)
            v_t = v_path[:, t].unsqueeze(1)  
            
            # network input: [S_t, v_t, delta_{t-1}, pnl_{t-1}]
            net_input = torch.cat([S_t, v_t, delta, pnl.unsqueeze(1)], dim=1)
            
            delta_new = self.model(net_input)
            trade = delta_new - delta
            tc_cost = self.tc * torch.abs(trade * S_t).squeeze()
            
            if t > 0:
                price_change = S_path[:, t] - S_path[:, t-1]
                pnl += delta.squeeze() * price_change
            
            pnl -= tc_cost
            delta = delta_new
            hedge_positions[:, t] = delta.squeeze()
        
        # final PnL
        final_price_change = S_path[:, -1] - S_path[:, -2]
        pnl += delta.squeeze() * final_price_change
        pnl -= payoff
        
        return pnl, hedge_positions

In [9]:
#CVar loss function
def cvar_loss(pnl: torch.Tensor, alpha: float = 0.1) -> torch.Tensor:
    '''
    computes Conditional Value At Risk (CVar) as a loss function
    CVar gives losses accrued at worst x% of the tail. 

    inputs:
    pnl: tensor holding the current PnL
    alpha: the x% over which to compute

    return average value of alpha% of pnl samples with lowest values
    '''
    import torch
    device = 'cuda' if torch.cuda.is_available() else 'cpu'
    batch_size = pnl[0] # first value in tensor 
    num_rows_worst = max(1, int(alpha * batch_size))
    sorted_pnl_rows, _ = torch.sort(pnl)
    return sorted_pnl_rows[:num_rows_worst].mean() #return avg of alpha% worst values
    

In [11]:
from typing import Callable

def train_deep_hedging_heston__(
    model: nn.Module,
    payoff_fn: Callable = EuropeanCallPayoff(),
    n_epochs: int = 100,
    n_paths_per_epoch: int = 1024,
    batch_size: int = 256,
    lr: float = 0.001,
    transaction_cost: float = 0.001,
    alpha: float = 0.05,
    device: str = 'cpu'
):
    """Train Deep Hedging model for Heston"""

    device = 'cuda' if torch.cuda.is_available() else 'cpu'
    model = model.to(device)
    optimizer = torch.optim.Adam(model.parameters(), lr=lr)
    agent = DeepHedgingAgentHeston(model, transaction_cost)
    
    losses = []

    
    for epoch in range(1): #n_epochs
        epoch_losses = []
        n_batches = n_paths_per_epoch // batch_size
        
        for batch_idx in range(n_batches):
            # simulate price and variance paths
            S_paths, v_paths = heston_simulation(n_paths = batch_size, device = device) 
            payoffs = payoff_fn(S_paths[:, -1])
            print(f'For batch_idx-{batch_idx}, payoffs.shape is {payoffs.shape}\n \
            S_paths[:3, :3] is {S_paths[:3, :3] } and v_paths[:3, :3] is { v_paths[:3, :3]}, and \
            payoffs[:3] is {payoffs[:3]}')
            #print(payoffs.shape)
            
            # execute hedging strategy
            pnl, _ = agent.hedge_path(S_paths, v_paths, payoffs)
            
            # compute CVaR loss
            loss = cvar_loss(pnl, alpha)
            
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            
            epoch_losses.append(loss.item())
        
        avg_loss = torch.mean(torch.tensor(epoch_losses))
        losses.append(avg_loss)
        
        if (epoch + 1) % 10 == 0:
            print(f'At Epoch No. {epoch+1}/{n_epochs}, CVaR Loss is {avg_loss:.4f}')
    
    return model, losses

In [16]:
model, losses =  train_deep_hedging_heston__(DeepHedgingMLPHeston())

For batch_idx-0, payoffs.shape is torch.Size([256])
             S_paths[:3, :3] is tensor([[100.0000,  96.2059,  92.3677],
        [100.0000, 100.1506,  99.5477],
        [100.0000, 101.5181, 101.3800]], device='cuda:0') and v_paths[:3, :3] is tensor([[0.0400, 0.0526, 0.0600],
        [0.0400, 0.0353, 0.0397],
        [0.0400, 0.0374, 0.0362]], device='cuda:0'), and             payoffs[:3] is tensor([ 0.0000, 10.6997,  0.0000], device='cuda:0')
For batch_idx-1, payoffs.shape is torch.Size([256])
             S_paths[:3, :3] is tensor([[100.0000, 102.2138, 101.5347],
        [100.0000, 103.3844, 105.7877],
        [100.0000, 100.7267,  99.7834]], device='cuda:0') and v_paths[:3, :3] is tensor([[0.0400, 0.0262, 0.0270],
        [0.0400, 0.0312, 0.0307],
        [0.0400, 0.0387, 0.0348]], device='cuda:0'), and             payoffs[:3] is tensor([ 7.5789,  1.9780, 13.8036], device='cuda:0')
For batch_idx-2, payoffs.shape is torch.Size([256])
             S_paths[:3, :3] is tensor([[100.000

In [14]:
from typing import Callable

def train_deep_hedging_heston(
    model: nn.Module,
    payoff_fn: Callable = EuropeanCallPayoff(),
    n_epochs: int = 100,
    n_paths_per_epoch: int = 1024,
    batch_size: int = 256,
    lr: float = 0.001,
    transaction_cost: float = 0.001,
    alpha: float = 0.05,
    device: str = 'cpu'
):
    """Train Deep Hedging model for Heston"""

    device = 'cuda' if torch.cuda.is_available() else 'cpu'
    model = model.to(device)
    optimizer = torch.optim.Adam(model.parameters(), lr=lr)
    agent = DeepHedgingAgentHeston(model, transaction_cost)
    
    losses = []
    
    for epoch in range(n_epochs):
        epoch_losses = []
        n_batches = n_paths_per_epoch // batch_size
        
        for batch_idx in range(n_batches):
            # simulate price and variance paths
            S_paths, v_paths = heston_simulation(n_paths = batch_size, device = device) 
            payoffs = payoff_fn(S_paths[:, -1])
            #print(payoffs.shape)
            
            # execute hedging strategy
            pnl, _ = agent.hedge_path(S_paths, v_paths, payoffs)
            
            # compute CVaR loss
            loss = cvar_loss(pnl, alpha)
            
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            
            epoch_losses.append(loss.item())
        
        avg_loss = torch.mean(torch.tensor(epoch_losses))
        losses.append(avg_loss)
        
        if (epoch + 1) % 10 == 0:
            print(f'At Epoch No. {epoch+1}/{n_epochs}, CVaR Loss is {avg_loss:.4f}')
    
    return model, losses

In [15]:
cvar_trained_model, losses = train_deep_hedging_heston(DeepHedgingMLPHeston())

At Epoch No. 10/100, CVaR Loss is -1131528585216.0000
At Epoch No. 20/100, CVaR Loss is -93539482785508491264.0000
At Epoch No. 30/100, CVaR Loss is -3744417705085489905664.0000
At Epoch No. 40/100, CVaR Loss is -221937723888352886784.0000
At Epoch No. 50/100, CVaR Loss is -2301204444534013952.0000
At Epoch No. 60/100, CVaR Loss is -120961812955637219328.0000
At Epoch No. 70/100, CVaR Loss is -336770489594859749376.0000
At Epoch No. 80/100, CVaR Loss is -9854394954174955520.0000
At Epoch No. 90/100, CVaR Loss is -215157906493067493376.0000
At Epoch No. 100/100, CVaR Loss is -9768062400673611776.0000


In [42]:
# Entropic loss function
def entropic_risk_loss(pnl: torch.Tensor, beta: float = 0.1) -> torch.Tensor:
    '''
    computes entropic loss as a loss function
    entropic loss := (1/beta) * log(E[exp(-beta * PnL)])

    inputs:
    pnl: tensor holding the current PnL
    beta: risk sensitivity parameter, it penalizes losses in PnL, the higher the beta more severly will PnL losses be penalized

    return aentropic loss as scalar tensor
    '''
    import torch
    device = 'cuda' if torch.cuda.is_available() else 'cpu'
    return (1.0 / beta) * torch.log(torch.exp(-beta * pnl).mean())

In [43]:
from typing import Callable

def train_deep_hedging_heston_entropic(
    model: nn.Module,
    payoff_fn: Callable = EuropeanCallPayoff(),
    n_epochs: int = 100,
    n_paths_per_epoch: int = 1024,
    batch_size: int = 256,
    lr: float = 0.001,
    transaction_cost: float = 0.001,
    alpha: float = 0.05,
    device: str = 'cpu'
):
    """Train Deep Hedging model for Heston"""

    device = 'cuda' if torch.cuda.is_available() else 'cpu'
    model = model.to(device)
    optimizer = torch.optim.Adam(model.parameters(), lr=lr)
    agent = DeepHedgingAgentHeston(model, transaction_cost)
    
    losses = []
    
    for epoch in range(n_epochs):
        epoch_losses = []
        n_batches = n_paths_per_epoch // batch_size
        
        for batch_idx in range(n_batches):
            # simulate price and variance paths
            S_paths, v_paths = heston_simulation(n_paths = batch_size, device = device) #simulator.simulate_with_variance(batch_size, device=device)
            payoffs = payoff_fn(S_paths[:, -1])
            #print(payoffs.shape)
            
            # execute hedging strategy
            pnl, _ = agent.hedge_path(S_paths, v_paths, payoffs)
            
            # compute entropic loss
            loss = entropic_risk_loss(pnl, alpha)
            
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            
            epoch_losses.append(loss.item())
        
        avg_loss = torch.mean(torch.tensor(epoch_losses))
        losses.append(avg_loss)
        
        if (epoch + 1) % 10 == 0:
            print(f'At Epoch No. {epoch+1}/{n_epochs}, Entropic Loss is {avg_loss:.4f}')
    
    return model, losses

In [44]:
entropic_trained_model, losses = train_deep_hedging_heston_entropic(DeepHedgingMLPHeston())

At Epoch No. 10/100, Entropic Loss is 8.0561
At Epoch No. 20/100, Entropic Loss is 8.0629
At Epoch No. 30/100, Entropic Loss is 7.8561
At Epoch No. 40/100, Entropic Loss is 8.0338
At Epoch No. 50/100, Entropic Loss is 8.0583
At Epoch No. 60/100, Entropic Loss is 7.8276
At Epoch No. 70/100, Entropic Loss is 7.5737
At Epoch No. 80/100, Entropic Loss is 7.6689
At Epoch No. 90/100, Entropic Loss is 8.1905
At Epoch No. 100/100, Entropic Loss is 7.7507


In [50]:
#simple performance check
import yfinance as yf
import pandas
from typing import Dict, List, Tuple, Optional, Callable
import matplotlib.pyplot as plt
from datetime import datetime, timedelta


In [55]:
def load_market_data(ticker='AAPL', start='2020-01-01', end='2023-12-31'):
    '''Get historical stock data to run model on actual data not simulated ones.'''
    print(f'Downloading {ticker} data for testing...')
    stock = yf.Ticker(ticker)
    data = stock.history(start=start, end=end)
    print(f'Downloaded data for asset {ticker} of {len(data)} days of data.\nStarting date {start} and ending date {end}\n')
    return data
historical_data = load_market_data()

Downloading AAPL data for testing...
Downloaded data for asset AAPL of 1006 days of data.
Starting date 2020-01-01 and ending date 2023-12-31



In [56]:
def compute_realized_variance(data, window=21):
    '''
    Estimate variance from historical data
    Args:
        data: DataFrame of actual stock -- with 'Close' prices
        window: Rolling window for variance calculation, default value of 21 to approximate trading days in a month
    '''
    import torch

    device = 'cuda' if torch.cuda.is_available() else 'cpu'
    prices = torch.tensor(data['Close'], device=device)
    prices_rolled = torch.roll(prices, shifts=1, dims=0) 
    returns = torch.log(prices / prices.shift(1))
    
    # Realized variance - annualized
    realized_var = returns.rolling(window=window).var() * 252
    
    return pd.DataFrame({
        'Price': prices,
        'Returns': returns,
        'Variance': realized_var
    }).dropna()

market_df = compute_realized_variance(historical_data, window=21)
print(f'\nProcessed data: {len(market_df)} days')
print(f'Average realized volatility: {np.sqrt(market_df['Variance'].mean()):.2%}')

ValueError: could not determine the shape of object type 'Series'

Attic Below: Delete 

In [15]:
from collections.abc import Callable

def evaluate_hedging_heston(
    model: nn.Module,
    simulator: Callable,
    payoff_fn: Callable,
    n_paths: int = 10000,
    transaction_cost: float = 0.001,
    device: str = 'cpu'
):
    
    model.eval()
    agent = DeepHedgingAgentHeston(model, transaction_cost)
    
    with torch.no_grad():
        S_paths, v_paths = simulator(n_paths, device=device)
        payoffs = payoff_fn(S_paths[:, -1])
        pnl, hedge_positions = agent.hedge_path(S_paths, v_paths, payoffs)
        
        pnl_np = pnl.cpu().numpy()
        
        print("\n" + "="*50)
        print("HESTON MODEL EVALUATION RESULTS")
        print("="*50)
        print(f"Mean P&L: ${pnl_np.mean():.4f}")
        print(f"Std P&L: ${pnl_np.std():.4f}")
        print(f"Min P&L: ${pnl_np.min():.4f}")
        print(f"Max P&L: ${pnl_np.max():.4f}")
        print(f"VaR(5%): ${np.percentile(pnl_np, 5):.4f}")
        print(f"CVaR(5%): ${pnl_np[pnl_np <= np.percentile(pnl_np, 5)].mean():.4f}")
        
        return pnl_np, hedge_positions.cpu().numpy(), S_paths.cpu().numpy(), v_paths.cpu().numpy()


In [11]:

class GBMSimulator(MarketSimulator):
    """
    Simulates markets assuming Constant Volatility.
    
    Equation: dS_t = mu * S_t * dt + sigma * S_t * dW_t
    """
    def __init__(self, mu, sigma, dt, device='cpu'):
        super().__init__(dt, device)
        self.mu = mu
        self.sigma = sigma

    def simulate(self, num_paths, num_steps, s0):
        pass


In [10]:
class HestonSimulator(MarketSimulator):
    """
    Simulates markets assuming Stochastic Volatility.
    
    Equation 1 (Price):    dS_t = mu * S_t * dt + sqrt(v_t) * S_t * dW_S
    Equation 2 (Variance): dv_t = kappa * (theta - v_t) * dt + xi * sqrt(v_t) * dW_v
    """
    def __init__(self, mu, kappa, theta, xi, rho, dt, device='cpu'):
        super().__init__(dt, device)
        self.mu = mu
        self.kappa = kappa  # Mean reversion speed
        self.theta = theta  # Long-run variance
        self.xi = xi        # Vol of Vol
        self.rho = rho      # Correlation (Leverage Effect)

    def simulate(self, num_paths, num_steps, s0, v0):
        # todo: Implementation of Euler-Maruyama with Full Truncation -- PyTorch
        pass