In [None]:
import iisignature
import matplotlib.pyplot as plt
import numpy as np
import sigkernel
import torch
import torchsde
from tqdm.notebook import tqdm

from src.utils.helper_functions.plot_helper_functions import make_grid

In [None]:
# Choose the compute device once; later cells move models and tensors onto it.
# You realistically need GPU access (either natively or via cloud computing) to run this notebook.
is_cuda = torch.cuda.is_available()

if is_cuda:
    device = 'cuda'
else:
    device = 'cpu'
    print("Warning: CUDA not available; falling back to CPU but this is likely to be very slow.")

## 1. Generate data

In [None]:
class HestonModel(torch.nn.Module):
    """Heston stochastic-volatility model in the drift/diffusion form used by ``torchsde``.

    The state is ``y = (S, V)`` (price, variance) stored along the last axis:

        dS = mu * S dt            + sqrt(V) * S dW1
        dV = kappa * (theta - V) dt + sigma * sqrt(V) * (rho dW1 + sqrt(1 - rho^2) dW2)

    so the two state components are driven by Brownian motions with
    instantaneous correlation ``rho``.

    Args:
        mu: drift rate of the price.
        kappa: mean-reversion speed of the variance.
        theta: long-run level the variance reverts to.
        sigma: volatility of the variance process.
        rho: correlation between price and variance shocks.
    """

    def __init__(self, mu, kappa, theta, sigma, rho):
        super(HestonModel, self).__init__()
        # Parameters as tensors
        self.mu = torch.tensor(mu, dtype=torch.float32)
        self.kappa = torch.tensor(kappa, dtype=torch.float32)
        self.theta = torch.tensor(theta, dtype=torch.float32)
        self.sigma = torch.tensor(sigma, dtype=torch.float32)
        self.rho = torch.tensor(rho, dtype=torch.float32)

        # 'general' noise: g returns a full (state, noise) matrix per batch element
        self.noise_type = 'general'
        self.sde_type = 'ito'

    def f(self, t, y):
        """Drift term; returns a tensor with the same shape as ``y``."""
        S, V = y[..., 0], y[..., 1]
        dS = self.mu * S
        dV = self.kappa * (self.theta - V)
        return torch.stack([dS, dV], dim=-1)

    def g(self, t, y):
        """Diffusion matrix of shape ``(..., 2, 2)`` indexed ``[state, noise]``.

        Row 0 (price) loads only on the first noise; row 1 (variance) loads on
        both noises with weights ``rho`` and ``sqrt(1 - rho^2)`` so that the
        price/variance shocks have correlation ``rho``.
        """
        S, V = y[..., 0], y[..., 1]

        # A discretised step can push V slightly below zero; clamp before the
        # square root so a single negative value does not turn the path into NaNs.
        sqrt_V = torch.sqrt(torch.clamp(V, min=0.0))
        zeros = torch.zeros_like(S)

        # Price row: sqrt(V) * S on dW1, nothing on dW2.
        price_row = torch.stack([sqrt_V * S, zeros], dim=-1)

        # Variance row: sigma * sqrt(V) split across dW1 and dW2.
        vol_v = self.sigma * sqrt_V
        variance_row = torch.stack(
            [self.rho * vol_v, torch.sqrt(1 - self.rho ** 2) * vol_v], dim=-1
        )

        # Stack rows along a new second-to-last axis: (batch, state_channels, noise_channels)
        return torch.stack([price_row, variance_row], dim=-2)

In [None]:
# Parameters for the Heston model
mu    = 0.05   # drift rate of the price
kappa = 1.5    # mean-reversion speed of the variance
theta = 0.04   # level the variance reverts to
sigma = 0.2    # volatility of the variance
rho   = -0.7   # correlation parameter passed to the diffusion term

# Initial conditions
S0 = 100  # Initial asset price
v0 = 0.04  # Initial variance
n_heston_paths = 100  # number of paths simulated in one batch
y0 = torch.tensor([[S0, v0]] * n_heston_paths, dtype=torch.float32)

# Simulation settings
t0, T, dt = 0, 1, 0.01  # start time, end time, and time step
# linspace includes both endpoints, so the grid actually reaches T
# (arange(t0, T, dt) would stop one step short at T - dt).
n_steps = round((T - t0) / dt)
ts = torch.linspace(t0, T, n_steps + 1)

# Create an instance of the Heston model and simulate
heston_model = HestonModel(mu, kappa, theta, sigma, rho)
result = torchsde.sdeint(heston_model, y0, ts, dt=dt, method="euler")

In [None]:
# Alternatively, use Brownian motion with constant drift and constant scale.
class BrownianMotionDrift(torch.nn.Module):
    """SDE with constant drift ``mu`` and constant diffusion ``sigma``.

    Both coefficients are stored as trainable parameters. ``f`` and ``g``
    return tensors with the same shape as the state ``y`` (float32), every
    entry equal to ``mu`` or ``sigma`` respectively.

    Args:
        mu: drift coefficient.
        sigma: diffusion coefficient.
        noise_type: noise type string exposed to the SDE solver.
        sde_type: SDE type string exposed to the SDE solver.
    """

    def __init__(self, mu, sigma, noise_type: str, sde_type: str):
        super().__init__()
        self.mu         = torch.nn.Parameter(torch.tensor(mu, dtype=torch.float32), requires_grad=True)
        self.sigma      = torch.nn.Parameter(torch.tensor(sigma, dtype=torch.float32), requires_grad=True)
        self.noise_type = noise_type
        self.sde_type   = sde_type

    def f(self, t, y):
        """Drift: a tensor shaped like ``y`` filled with ``mu``."""
        # Broadcast the parameter itself rather than its .item() so the result
        # stays connected to self.mu for autograd and no host sync is needed.
        return self.mu * torch.ones_like(y, dtype=torch.float32)

    def g(self, t, y):
        """Diffusion: a tensor shaped like ``y`` filled with ``sigma``."""
        # Same reasoning as in f: keep self.sigma in the graph.
        return self.sigma * torch.ones_like(y, dtype=torch.float32)
    

def return_mmd_distributions(h0_paths, h1_paths, kernel, n_atoms=128, n_paths=32, max_batch=32):
    """Sample MMD values under the null and under the alternative.

    For each of ``n_atoms`` repetitions, two non-overlapping index sets of size
    ``n_paths`` are drawn from ``h0_paths`` and one index set of size
    ``n_paths`` is drawn from ``h1_paths``. The first H0 sample is compared
    against the second H0 sample (null value) and against the H1 sample
    (alternative value) via ``kernel.compute_mmd``.

    Args:
        h0_paths: bank of paths for the null model, indexed along axis 0.
        h1_paths: bank of paths for the alternative model, indexed along axis 0.
        kernel: object exposing ``compute_mmd(X, Y, max_batch=...)``.
        n_atoms: number of MMD values to draw for each distribution.
        n_paths: number of paths in each sample.
        max_batch: forwarded unchanged to ``kernel.compute_mmd``.

    Returns:
        ``(h0_dists, h1_dists)``: two 1-D tensors of length ``n_atoms``.
    """
    n_h0 = h0_paths.shape[0]
    n_h1 = h1_paths.shape[0]

    null_values = torch.zeros(n_atoms)
    alt_values = torch.zeros(n_atoms)

    with torch.no_grad():
        for atom in range(n_atoms):
            # One permutation supplies both H0 samples, so they never share a path.
            shuffled_h0 = torch.randperm(n_h0)
            first_idx = shuffled_h0[:n_paths]
            second_idx = shuffled_h0[n_paths:int(2*n_paths)]
            alt_idx = torch.randperm(n_h1)[:n_paths]

            reference = h0_paths[first_idx]
            null_values[atom] = kernel.compute_mmd(reference, h0_paths[second_idx], max_batch=max_batch)
            alt_values[atom] = kernel.compute_mmd(reference, h1_paths[alt_idx], max_batch=max_batch)

    return null_values, alt_values

def expected_type2_error(dist, crit_value):
    """Fraction of entries of ``dist`` that are at or below ``crit_value``.

    Args:
        dist: 1-D tensor of test-statistic values (here: MMD values drawn
            under the alternative).
        crit_value: threshold; entries ``<= crit_value`` count as failures to
            exceed it.

    Returns:
        0-dim tensor equal to ``count(dist <= crit_value) / dist.shape[0]``.
    """
    n_atoms = dist.shape[0]
    # Reduce on the tensor itself; the builtin sum() would loop over the
    # elements one at a time in Python.
    return (dist <= crit_value).sum() / n_atoms

def scale_transform(path: torch.Tensor, scaler: float) -> torch.Tensor:
    """Return a new tensor whose channels ``1:`` are ``path`` multiplied by ``scaler``.

    The input is not modified. Channel 0 along the last axis of the result is
    left at zero: it is neither copied nor scaled.

    NOTE(review): zeroing channel 0 discards whatever the caller stored there
    (the notebook stores time in channel 0). Confirm that this is intended
    rather than an unscaled copy.

    Args:
        path: tensor whose last axis indexes channels.
        scaler: multiplicative factor applied to channels ``1:``.

    Returns:
        Tensor with the shape of ``path``, on ``path``'s device, using torch's
        default dtype.
    """
    device = path.device
    # Allocate directly on the target device instead of on CPU followed by a copy.
    res = torch.zeros(path.shape, device=device)

    scaler_ = torch.as_tensor(scaler, device=device)
    res[..., 1:] = path[..., 1:] * scaler_

    return res

In [None]:
# SDE coefficients: (drift, diffusion) for the null model and for the alternative model
mu0  = 0.
sig  = 0.2
mu1  = 0.
beta = 0.3

sde_type   = "ito"
noise_type = "diagonal"

# Grid params
T          = 1
state_size = 1
batch_size = 32768
dt_scale   = 1e-1  # Finer refinements give better solutions (but slower)

# Both models share the same all-zero initial state, one row per simulated path.
y0 = torch.zeros((batch_size, state_size)).to(device)
h0_model = BrownianMotionDrift(mu0, sig, noise_type, sde_type).to(device)
h1_model = BrownianMotionDrift(mu1, beta, noise_type, sde_type).to(device)

In [None]:
# Experiment grid: every sample size is combined with every number of grid points,
# and n_atoms MMD values are drawn for each combination.
n_atoms       = 1024
n_grid_points = [32, 64, 128]
n_paths       = [32, 64, 128]

# Signature kernel built on top of a linear static kernel.
dyadic_order     = 0
static_kernel    = sigkernel.LinearKernel()
signature_kernel = sigkernel.SigKernel(
    static_kernel=static_kernel,
    dyadic_order=dyadic_order,
)

## 2. Generate distributions

In [None]:
# Results are indexed [grid-point index, sample-size index, atom], which is the
# order the loops below write them in.
mmd_h0 = torch.zeros((len(n_grid_points), len(n_paths), n_atoms))
mmd_h1 = torch.zeros((len(n_grid_points), len(n_paths), n_atoms))
_scaler = 5

for i, gp in enumerate(tqdm(n_grid_points)):
    ts = torch.linspace(0, T, gp).to(device)

    # Solver step: a fraction (dt_scale) of the spacing of the output grid.
    _dt = dt_scale*torch.diff(ts)[0]

    with torch.no_grad():
        h0_paths = torchsde.sdeint(h0_model, y0, ts, method='euler', dt = _dt).to(device)
        h1_paths = torchsde.sdeint(h1_model, y0, ts, method='euler', dt = _dt).to(device)

        # Swap the first two axes of the solver output so the batch axis comes
        # first, then prepend the time grid as channel 0.
        time_channel = ts.unsqueeze(-1).expand(batch_size, ts.size(0), 1)
        h0_paths = torch.cat([time_channel, torch.transpose(h0_paths, 1, 0)], dim=2)
        h1_paths = torch.cat([time_channel, torch.transpose(h1_paths, 1, 0)], dim=2)

        # scale_transform allocates a fresh tensor, so no defensive clone is needed.
        t_h0_paths = scale_transform(h0_paths, _scaler)
        t_h1_paths = scale_transform(h1_paths, _scaler)

        # NOTE(review): scale_transform returns channel 0 filled with zeros, so
        # these divisions currently have no effect on the values. Confirm
        # whether the time channel was meant to survive the transform.
        t_h0_paths[..., 0] /= _scaler*T
        t_h1_paths[..., 0] /= _scaler*T

    # The loop variable must not be called `np`: that would rebind the numpy
    # alias and break every later cell that calls np.<something>.
    for j, n_sample_paths in enumerate(n_paths):
        h0_dists, h1_dists = return_mmd_distributions(
            t_h0_paths,
            t_h1_paths,
            signature_kernel,
            n_atoms=n_atoms,
            n_paths=n_sample_paths,
            max_batch=32
        )

        mmd_h0[i, j] = h0_dists
        mmd_h1[i, j] = h1_dists

In [None]:
np_  = len(n_paths)
ngp_ = len(n_grid_points)

# Rows follow the grid-point counts and columns follow the sample sizes, which
# is how mmd_h0 / mmd_h1 are indexed below. squeeze=False keeps `axes` 2-D even
# when one of the lists has a single entry.
fig, axes = plt.subplots(ngp_, np_, figsize=(6*np_, 6*ngp_), squeeze=False)
n_bins = int(n_atoms/16)

for i in range(ngp_):
    for j in range(np_):
        ax = axes[i, j]
        this_h0 = mmd_h0[i, j]
        this_h1 = mmd_h1[i, j]

        # Critical value: the sorted null sample at position int(0.95 * n_atoms).
        crit_val = this_h0.sort()[0][int(n_atoms*(0.95))]

        gp_ = n_grid_points[i]
        pt_ = n_paths[j]
        ax.set_title(f"l = {gp_}, n = {pt_}. Expected Type II error: {100*expected_type2_error(this_h1, crit_val):.2f}%", fontsize="small")
        # Histogram binning does not depend on input order, so no sort is needed.
        ax.hist(this_h0.numpy(), bins=n_bins, color="dodgerblue", alpha=0.5, label="$H_0$", density=True)
        ax.hist(this_h1.numpy(), bins=n_bins, color="tomato"    , alpha=0.5, label="$H_1$", density=True)
        make_grid(axis=ax)

plt.savefig("type_2_worked_paths_length.png", dpi=300)

## 3. Lead-lag plots for paper

In [None]:
def generate_ar3_path(n, phi1, phi2, phi3, sigma=1, rng=None):
    """Generate one AR(3) path of length ``n``.

    The path follows
    ``x[t] = phi1*x[t-1] + phi2*x[t-2] + phi3*x[t-3] + eps[t]`` for ``t >= 3``,
    with ``eps`` drawn i.i.d. from a normal distribution with mean 0 and
    standard deviation ``sigma``. The first value is 0 and the next two values
    are the raw noise draws ``eps[1]`` and ``eps[2]``.

    Args:
        n: number of points in the path.
        phi1, phi2, phi3: autoregressive coefficients for lags 1, 2 and 3.
        sigma: standard deviation of the noise.
        rng: optional ``numpy.random.Generator`` used for the noise draws, for
            reproducible output. When omitted, numpy's global random state is
            used.

    Returns:
        1-D numpy array of length ``n``.
    """
    sampler = np.random if rng is None else rng
    eps = sampler.normal(0, sigma, n)

    path = np.zeros(n)
    path[1:3] = eps[1:3]

    # The recursion depends on its own previous outputs, so it is run sequentially.
    for t in range(3, n):
        path[t] = phi1 * path[t-1] + phi2 * path[t-2] + phi3 * path[t-3] + eps[t]
    return path

# Parameters for each scenario
n = 128
phi1, phi2, phi3 = 0.3, 0.3, 0.05

correlated_params      = (phi1, phi2, phi3)
uncorrelated_params    = (phi1, -phi2, 0.00)
anti_correlated_params = (-phi1, -phi2, -phi3)

# Generate paths
# A dedicated name is used for the bank size: `n_paths` is the list of sample
# sizes used by the MMD experiment and its plotting cell, and rebinding it to
# an int would break re-running those cells.
n_ar3_paths = 1024
# Each bank is an array with one AR(3) path per row, divided by 100.
correlated_rets      = np.array([generate_ar3_path(n, *correlated_params) for _ in range(n_ar3_paths)])/100
uncorrelated_rets    = np.array([generate_ar3_path(n, *uncorrelated_params) for _ in range(n_ar3_paths)])/100
anti_correlated_rets = np.array([generate_ar3_path(n, *anti_correlated_params) for _ in range(n_ar3_paths)])/100

In [None]:
# Optionally compound the return series into price processes (cumulative
# product of 1 + return along the time axis); otherwise use the returns as-is.
price_processes = False

correlated_paths = (
    (1 + correlated_rets).cumprod(axis=1) if price_processes else correlated_rets
)
uncorrelated_paths = (
    (1 + uncorrelated_rets).cumprod(axis=1) if price_processes else uncorrelated_rets
)
anti_correlated_paths = (
    (1 + anti_correlated_rets).cumprod(axis=1) if price_processes else anti_correlated_rets
)

In [None]:
# Overlay the first 16 paths of each bank, one colour per bank.
bank_colours = ("dodgerblue", "tomato", "seagreen")
first_paths = zip(correlated_paths[:16], uncorrelated_paths[:16], anti_correlated_paths[:16])

for path_triple in first_paths:
    # Draw in the same interleaved order (correlated, uncorrelated, anti-correlated).
    for series, colour in zip(path_triple, bank_colours):
        plt.plot(series, alpha=0.25, color=colour)

In [None]:
def lead_lag_transform(bank: torch.Tensor, **kwargs) -> torch.Tensor:
    """Apply the lead-lag transform to every path in a bank.

    Every time point is repeated twice. For each input channel ``c`` the
    output holds two channels: channel ``2*c`` is the repeated series without
    its last point, and channel ``2*c + 1`` is the repeated series without its
    first point, so the odd channel moves one step ahead of the even one.

    Args:
        bank: tensor of shape ``(n_paths, length, dim)``.
        **kwargs: accepted for call compatibility and ignored.

    Returns:
        Tensor of shape ``(n_paths, 2*length - 1, 2*dim)``, allocated with
        ``torch.zeros`` defaults (default dtype, CPU).
    """
    n_paths, length, dim = bank.size()

    res_length = 2 * length - 1
    res_dim    = 2*dim

    res = torch.zeros((n_paths, res_length, res_dim))

    # Repeat every time step once for all channels at once; the input is only
    # read, so no copy is required.
    doubled = torch.repeat_interleave(bank, repeats=2, dim=1)

    # Even output channels: series without the last point.
    # Odd output channels: series without the first point.
    res[..., 0::2] = doubled[:, :-1]
    res[..., 1::2] = doubled[:, 1:]

    return res

In [None]:
# Append a trailing channel axis to each (paths, time) array, then apply the lead-lag transform.
ll_correlated = lead_lag_transform(
    torch.tensor(correlated_paths)[..., None]
)
ll_uncorrelated = lead_lag_transform(
    torch.tensor(uncorrelated_paths)[..., None]
)
ll_anti_correlated = lead_lag_transform(
    torch.tensor(anti_correlated_paths)[..., None]
)

In [None]:
# Average each lead-lag bank over its first (path) axis.
average_ll_correlated      = torch.mean(ll_correlated, dim=0)
average_ll_uncorrelated    = torch.mean(ll_uncorrelated, dim=0)
average_ll_anti_correlated = torch.mean(ll_anti_correlated, dim=0)

In [None]:
# One panel per scenario: channel 0 of the averaged lead-lag path on the x-axis
# against channel 1 on the y-axis.
fig, (ax1, ax2, ax3) = plt.subplots(1, 3, figsize=(9, 3))

ax1.plot(
    average_ll_correlated[..., 0], average_ll_correlated[..., 1],
    color="dodgerblue", label="correlated", alpha=0.5,
)
ax2.plot(
    average_ll_uncorrelated[..., 0], average_ll_uncorrelated[..., 1],
    color="tomato", label="uncorrelated", alpha=0.5,
)
ax3.plot(
    average_ll_anti_correlated[..., 0], average_ll_anti_correlated[..., 1],
    color="seagreen", label="anti_correlated", alpha=0.5,
)
plt.tight_layout()

In [None]:
# Prepared once with arguments (2, 2) and shared by all three log-signature calls.
s = iisignature.prepare(2, 2)

logsig_correl, logsig_uncorrel, logsig_anticorrel = (
    iisignature.logsig(ll_bank.numpy(), s)
    for ll_bank in (ll_correlated, ll_uncorrelated, ll_anti_correlated)
)

In [None]:
fig, ax = plt.subplots(1, 1, figsize=(6, 3))
# Raw string: "\m" is not a valid escape sequence, so a plain literal triggers
# an invalid-escape warning. The label text itself is unchanged.
ax.set_xlabel(r"$S^{[1, 2]}(\mathsf{x})$")
# Histogram of -0.5 times the last log-signature coordinate of every path, per scenario.
ax.hist(-0.5*logsig_correl[:, -1], bins=64, alpha=0.5, color="dodgerblue", density=True, label="correlated_returns")
ax.hist(-0.5*logsig_uncorrel[:, -1], bins=64, alpha=0.5, color="tomato", density=True, label="uncorrelated_returns")
ax.hist(-0.5*logsig_anticorrel[:, -1], bins=64, alpha=0.5, color="seagreen", density=True, label="anti_correlated_returns")
ax.legend(fontsize="small")
ax.set_title("Distribution of second-order log-signature terms", fontsize="medium")
plt.tight_layout()
plt.savefig("second_order_logdist.png", dpi=300)