In [None]:
import torch
import math

### squeeze vs unsqueeze

In [None]:
a = torch.tensor(3)
a

tensor(3)

In [None]:
a.size()

torch.Size([])

In [None]:
a.unsqueeze(-1).unsqueeze(1)

torch.Size([1, 1])

In [None]:
torch.randn([3,2,10]).unsqueeze(-1).size()

torch.Size([3, 2, 10, 1])

# Brownian Motion

In [None]:
def generate_brownian(n_paths, n_steps):
    """Simulate discrete random-walk paths that start at zero.

    Args:
        n_paths: number of independent paths (rows).
        n_steps: number of points per path (columns).

    Returns:
        Tensor of shape (n_paths, n_steps) holding the running sum of
        standard-normal increments; column 0 is always 0. The increments
        are NOT scaled by a time step here.
    """
    increments = torch.randn(n_paths, n_steps)
    # Zero the first increment so that every path starts at the origin.
    increments[:, 0] = 0.0
    return torch.cumsum(increments, dim=-1)


In [None]:
generate_brownian(2,5)

tensor([[0.0000, 0.4881, 1.2043, 0.7738, 0.5763],
        [0.0000, 0.4551, 1.8405, 3.0901, 4.2672]])

In [None]:
def generate_geometric_brownian(n_paths=2, n_steps=5, sigma=0.2, dt=1/250):
    """Simulate driftless geometric Brownian motion paths starting at 1.

    Args:
        n_paths: number of paths.
        n_steps: number of time points per path.
        sigma: volatility used in both the drift correction and the diffusion.
        dt: size of one time step.

    Returns:
        Tensor of shape (n_paths, n_steps) equal to
        exp(-0.5 * sigma**2 * t + sigma * sqrt(dt) * W), where W comes from
        generate_brownian and t = [0, dt, 2*dt, ...].
    """
    time_grid = torch.arange(n_steps) * dt
    random_walk = generate_brownian(n_paths, n_steps)

    # Drift correction term; broadcasts over the path axis.
    drift = (-0.5 * sigma**2) * time_grid
    # The unit-variance walk is rescaled by sqrt(dt) here.
    diffusion = sigma * torch.sqrt(torch.tensor(dt)) * random_walk

    return torch.exp(drift + diffusion)

In [None]:
generate_geometric_brownian()

tensor([[1.0000, 1.0009, 1.0047, 1.0251, 1.0306],
        [1.0000, 1.0083, 1.0206, 1.0143, 1.0306]])

In [None]:
temp = generate_geometric_brownian()
temp.size()

torch.Size([2, 5])

In [None]:
temp.unsqueeze(1).size()

torch.Size([2, 1, 5])

# Derivative payoff

In [None]:
def european_payoff(input, call=True, strike=1.0):
    """Payoff of a European option evaluated on the last point of each path.

    Args:
        input: price tensor; only the final entry along the last axis is used.
        call: True for a call (price - strike), False for a put (strike - price).
        strike: strike price.

    Returns:
        Tensor with the last axis of `input` removed, floored at zero.
    """
    terminal = input[..., -1]
    intrinsic = terminal - strike if call else strike - terminal
    return torch.nn.functional.relu(intrinsic)

In [None]:
spot = generate_geometric_brownian()
spot

tensor([[1.0000, 1.0037, 1.0057, 1.0038, 0.9949],
        [1.0000, 1.0001, 0.9949, 1.0002, 1.0075]])

In [None]:
spot = spot.unsqueeze(1)
spot.size()

torch.Size([2, 1, 5])

In [None]:
payoff = european_payoff(spot)
payoff

tensor([0.0000, 0.0088])

# Profit and loss function (pl)

In [None]:
def pl(spot, unit, cost=None, payoff=None):
    """First draft of the profit and loss: trading gains only.

    Args:
        spot: price tensor; the last axis is time.
        unit: holdings with the same layout as `spot`; the final time entry
            is dropped because no price move follows it.
        cost: accepted but not used by this draft.
        payoff: accepted but not used by this draft.

    Returns:
        Sum over the last two axes of holdings times the next price change.
    """
    holdings = unit[..., :-1]
    price_moves = spot.diff(dim=-1)
    return (holdings * price_moves).sum(dim=(-2, -1))

In [None]:
spot.diff(dim=-1)

tensor([[[ 3.6993e-03,  2.0161e-03, -1.9387e-03, -8.8783e-03]],

        [[ 9.1910e-05, -5.2137e-03,  5.2958e-03,  7.3167e-03]]])

In [None]:
unit = torch.randn_like(spot)
unit

tensor([[[ 1.0331, -1.3795, -0.2519, -0.9670,  1.2934]],

        [[-0.7395, -1.2538,  0.0543, -0.1659, -2.4832]]])

In [None]:
pl(spot, unit)

tensor([0.0101, 0.0055])

In [None]:
def pl(spot, unit, cost=None, payoff=None):
    """Second draft of the profit and loss: trading gains minus the payoff.

    Args:
        spot: price tensor; the last axis is time.
        unit: holdings with the same layout as `spot`.
        cost: accepted but not used by this draft.
        payoff: optional payoff to subtract; a trailing axis of size 1 is
            squeezed away before the subtraction.

    Returns:
        Sum over the last two axes of holdings times the next price change,
        less `payoff` when it is given.
    """
    gains = torch.sum(unit[..., :-1] * spot.diff(dim=-1), dim=(-2, -1))

    if payoff is None:
        return gains

    # In-place subtraction, as in the accumulate-into-output style used throughout.
    gains -= payoff.squeeze(-1)
    return gains

In [None]:
payoff = european_payoff(spot)
payoff.size()

torch.Size([2, 1])

In [None]:
pl(spot, unit, payoff=payoff)

tensor([ 0.0101, -0.0019])

In [None]:
def pl(spot, unit, cost=None, payoff=None):
    """Terminal profit and loss of a hedging strategy.

    Args:
        spot: price tensor; the last axis is time and the last two axes are
            summed over (the notebook uses shape (n_paths, 1, n_steps)).
        unit: holdings with the same layout as `spot`. `unit[..., t]` is the
            position held while the price moves from step t to step t + 1.
        cost: optional proportional transaction-cost rate. A scalar (float or
            0-dim tensor) or a 1-D sequence with one rate per entry of the
            second-to-last axis.
        payoff: optional payoff to subtract; a trailing axis of size 1 is
            squeezed away before the subtraction.

    Returns:
        Tensor with the last two axes of `spot` reduced:
        trading gains - payoff - transaction costs.
    """
    output = unit[..., :-1].mul(spot.diff(dim=-1)).sum(dim=(-2, -1))

    if payoff is not None:
        output -= payoff.squeeze(-1)

    if cost is not None:
        # as_tensor accepts floats, lists and existing tensors without the
        # copy-construct warning, and keeps the rate on the same device as spot.
        c = torch.as_tensor(cost, device=spot.device).unsqueeze(0).unsqueeze(-1)
        # The switch from unit[t] to unit[t + 1] is executed at step t + 1, so
        # it must be priced at spot[t + 1] (spot[..., 1:]), not at spot[t].
        output -= (spot[..., 1:].mul(unit.diff(dim=-1)).abs() * c).sum(dim=(-2, -1))

    return output

In [None]:
torch.tensor(0.1).unsqueeze(0).unsqueeze(-1)

tensor([[0.1000]])

In [None]:
unit = torch.randn_like(spot)
unit

tensor([[[-0.0996, -1.5431, -0.2767,  0.7152,  0.4910]],

        [[-0.3751, -1.8371,  0.6746, -1.8968, -0.8270]]])

In [None]:
pl(spot, unit, cost=0.1)

tensor([0.3844, 0.7594])

In [None]:
pl(spot, unit, cost= 0.1, payoff=payoff)

tensor([0.3844, 0.7520])

# Features

In [None]:
def time_to_maturity(spot, dt):
    """Remaining time until the last step, for every path and time point.

    Args:
        spot: 3-D tensor unpacked as (n_paths, _, n_steps); only its size is used.
        dt: length of one time step.

    Returns:
        Tensor of shape (n_paths, 1, n_steps) whose last axis counts down
        from (n_steps - 1) * dt to 0.
    """
    n_paths, _, n_steps = spot.shape
    elapsed = torch.arange(n_steps) * dt
    remaining = elapsed[-1] - elapsed

    # Add a leading axis, then broadcast to one row per path.
    return remaining.unsqueeze(0).expand(n_paths, 1, -1)

In [None]:
t = torch.arange(5) * 0.01
t

tensor([0.0000, 0.0100, 0.0200, 0.0300, 0.0400])

In [None]:
(t[-1] - t).unsqueeze(0).expand(2, 1, -1)

tensor([[[0.0400, 0.0300, 0.0200, 0.0100, 0.0000]],

        [[0.0400, 0.0300, 0.0200, 0.0100, 0.0000]]])

In [None]:
time_to_maturity(spot, 0.004)

tensor([[[0.0160, 0.0120, 0.0080, 0.0040, 0.0000]],

        [[0.0160, 0.0120, 0.0080, 0.0040, 0.0000]]])

In [None]:
def moneyness(spot, strike):
    """Return the ratio of `spot` to `strike` (element-wise for tensors)."""
    ratio = spot / strike
    return ratio

In [None]:
moneyness(spot, 1.1).size()

torch.Size([2, 1, 5])

In [None]:
def log_moneyness(spot, strike):
    """Return the natural logarithm of `spot / strike`, element-wise."""
    ratio = spot / strike
    return torch.log(ratio)

In [None]:
log_moneyness(spot, 1.0)

tensor([[[ 0.0000e+00,  3.6925e-03,  5.6991e-03,  3.7696e-03, -5.1147e-03]],

        [[ 0.0000e+00,  9.1906e-05, -5.1350e-03,  1.7391e-04,  7.4627e-03]]])

In [None]:
def volatility(spot, vol):
    """Return a tensor shaped like `spot` with every entry equal to `vol`."""
    template = torch.ones_like(spot)
    return template * vol

In [None]:
volatility(spot, 0.2)

tensor([[[0.2000, 0.2000, 0.2000, 0.2000, 0.2000]],

        [[0.2000, 0.2000, 0.2000, 0.2000, 0.2000]]])

# MLP

In [None]:
from torch import nn

class MLP(nn.Module):
    """Fully connected network: three 512-unit ReLU layers and one scalar output.

    Args:
        n_inputs: size of the last axis of the input tensor.
    """

    def __init__(self, n_inputs):
        super().__init__()

        width = 512
        # Layers are created in forward order so that the Sequential indices
        # (0..6) and the parameter initialisation order are unchanged.
        layers = [nn.Linear(n_inputs, width), nn.ReLU()]
        for _ in range(2):
            layers += [nn.Linear(width, width), nn.ReLU()]
        layers.append(nn.Linear(width, 1))

        self.model = nn.Sequential(*layers)

    def forward(self, x):
        """Apply the network along the last axis of `x`; output has last axis 1."""
        return self.model(x)

In [None]:
m = MLP(3)

In [None]:
m

MLP(
  (model): Sequential(
    (0): Linear(in_features=3, out_features=512, bias=True)
    (1): ReLU()
    (2): Linear(in_features=512, out_features=512, bias=True)
    (3): ReLU()
    (4): Linear(in_features=512, out_features=512, bias=True)
    (5): ReLU()
    (6): Linear(in_features=512, out_features=1, bias=True)
  )
)

In [None]:
x = torch.randn(3)
m(x)

hello


tensor([-0.0157], grad_fn=<ViewBackward0>)

In [None]:
m.__call__(x)

hello


tensor([-0.0157], grad_fn=<ViewBackward0>)

# Dataset

### Practice

In [None]:
from torch.utils.data import Dataset, DataLoader

In [None]:
import tensorflow as tf

(x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data()

In [None]:
# x_train = torch.tensor(x_train.reshape(60000, 784)/255, dtype=torch.float32)
# x_test = torch.tensor(x_test.reshape(10000, 784)/255, dtype=torch.float32)

# y_train = torch.tensor(y_train, dtype=torch.long)
# y_test = torch.tensor(y_test, dtype=torch.int64)

In [None]:
x_train = torch.tensor(x_train/255, dtype=torch.float32)
x_test = torch.tensor(x_test/255, dtype=torch.float32)

y_train = torch.tensor(y_train, dtype=torch.long)
y_test = torch.tensor(y_test, dtype=torch.int64)

In [None]:
x_train.shape

torch.Size([60000, 28, 28])

In [None]:
y_train

tensor([5, 0, 4,  ..., 5, 6, 8])

In [None]:
y_train.unsqueeze(-1)

tensor([[5],
        [0],
        [4],
        ...,
        [5],
        [6],
        [8]])

In [None]:
class TestDataset(Dataset):
    """Dataset over a pair (inputs, labels) indexed along the first axis."""

    def __init__(self, data):
        # data[0] holds the inputs, data[1] the matching labels.
        self.data_x, self.data_y = data[0], data[1]

    def __len__(self):
        """Number of samples, taken from the first axis of the inputs."""
        return self.data_x.shape[0]

    def __getitem__(self, index):
        """Return the (input, label) pair stored at `index`."""
        sample = self.data_x[index]
        label = self.data_y[index]
        return sample, label



In [None]:
ds = TestDataset((x_test, y_test))

In [None]:
len(ds)

10000

In [None]:
for i, j in ds:
    print(i.size(), j.size())

In [None]:
ds.__len__()

2

### Our case

In [None]:
from torch.utils.data import Dataset, DataLoader

In [None]:
spot.size()

torch.Size([2, 1, 5])

In [None]:
class MyDataset(Dataset):
    """Dataset that yields one time step of stacked features per item.

    Args:
        data: sequence of tensors concatenated along axis 1; the notebook
            passes tensors of shape (n_paths, 1, n_steps).
    """

    def __init__(self, data):
        # Features are stacked on axis 1, giving (n_paths, n_features, n_steps).
        self.data = torch.cat(data, dim=1)

    def __len__(self):
        """Number of time steps (size of axis 2)."""
        return self.data.shape[2]

    def __getitem__(self, index):
        """Return the features at time `index`, shaped (n_paths, 1, n_features)."""
        snapshot = self.data[:, :, index]
        return torch.unsqueeze(snapshot, 1)

# Build the three per-step input features and wrap them in a dataset.
# NOTE(review): `lm` holds plain moneyness (moneyness(), not log_moneyness()),
# and the strike 1.1 differs from european_payoff's default of 1.0 - confirm
# that both are intended.
lm = moneyness(spot, strike=1.1)
t = time_to_maturity(spot, dt=0.004)
v = volatility(spot, vol=0.2)

ds = MyDataset(data=[lm, t, v])

In [None]:
for i in ds:
    print(i.size())

torch.Size([2, 1, 3])
torch.Size([2, 1, 3])
torch.Size([2, 1, 3])
torch.Size([2, 1, 3])
torch.Size([2, 1, 3])


# compute_hedge

In [None]:
def compute_hedge(model, ds):
    """Run `model` on every item of `ds` and join the results on the last axis.

    Args:
        model: callable applied to each item yielded by `ds`.
        ds: iterable of model inputs (one per time step in the notebook).

    Returns:
        The per-item outputs concatenated along the last axis.
    """
    per_step = [model(features) for features in ds]
    return torch.cat(per_step, dim=-1)

In [None]:
compute_hedge(m, ds).size()

torch.Size([2, 1, 5])

# compute_portfolio

In [None]:
def compute_portfolio(model, ds, payoff):
    """Profit and loss of the strategy produced by `model`.

    Args:
        model: hedging model passed to compute_hedge.
        ds: dataset of per-step features passed to compute_hedge.
        payoff: payoff to subtract from the trading gains, or None to
            report the trading gains alone.

    Returns:
        The result of pl() for the notebook-level `spot` prices.
    """
    unit = compute_hedge(model, ds)

    # The payoff argument used to be accepted and then dropped; forward it so
    # that a non-None payoff is actually deducted. Prices still come from the
    # notebook-level `spot` variable.
    return pl(spot, unit, payoff=payoff)

In [None]:
compute_portfolio(m, ds, None)

tensor([-0.0001,  0.0002], grad_fn=<SumBackward1>)

In [None]:
def compute_portfolio_2(model, ds, payoff):
    """Profit and loss of the model's hedge, net of `payoff`.

    Prices are read from the notebook-level `spot` variable; the hedge is
    obtained by running `model` over `ds` with compute_hedge.
    """
    hedge = compute_hedge(model, ds)
    return pl(spot=spot, unit=hedge, payoff=payoff)

In [None]:
compute_portfolio_2(m, ds, european_payoff(spot))

tensor([-0.0001, -0.0073], grad_fn=<SubBackward0>)

# Optimizer and Training

In [None]:
optimizer = torch.optim.Adam(m.parameters())



In [None]:
# Ten gradient steps on `m`, minimising the mean squared profit and loss.
for i in range(10):
    # Clear the gradients left over from the previous step.
    optimizer.zero_grad()
    # Hedged P&L per path, net of the European payoff on the global `spot`.
    cash = compute_portfolio_2(m, ds, european_payoff(spot))
    # Mean of the squared P&L over paths.
    loss = torch.mean(cash*cash)
    loss.backward()
    optimizer.step()

    # Prints the loss tensor (including its grad_fn) once per step.
    print(loss)

tensor(2.6860e-05, grad_fn=<MeanBackward0>)
tensor(1.4329e-05, grad_fn=<MeanBackward0>)
tensor(9.2919e-06, grad_fn=<MeanBackward0>)
tensor(1.0486e-05, grad_fn=<MeanBackward0>)
tensor(1.2098e-05, grad_fn=<MeanBackward0>)
tensor(1.0574e-05, grad_fn=<MeanBackward0>)
tensor(9.1297e-06, grad_fn=<MeanBackward0>)
tensor(9.0523e-06, grad_fn=<MeanBackward0>)
tensor(9.6647e-06, grad_fn=<MeanBackward0>)
tensor(1.0037e-05, grad_fn=<MeanBackward0>)


# Loss functions

In [None]:
x = -torch.arange(4.0)
x

In [None]:
torch.logsumexp(-x, 0) - math.log(x.size(0))

In [None]:
def entropic_risk_measure(x):
    """Log of the mean of exp(-x) taken over the first axis of `x`.

    Computed as logsumexp(-x) - log(n), where n is the size of axis 0.
    """
    n_samples = x.size(0)
    log_sum = torch.logsumexp(-x, 0)
    return log_sum - math.log(n_samples)

In [None]:
x = -torch.arange(10.0)
x

In [None]:
torch.topk(x, 6, largest=False)

In [None]:
torch.topk(x, 6, largest=False).values.mean()

In [None]:
torch.topk(x, math.ceil(0.6 * x.numel()), largest=False).values.mean()

# Training with entropic risk measure

In [None]:
# Ten more gradient steps on `m`, now minimising the entropic risk measure.
# This reuses the same `optimizer` object as the squared-loss loop.
for i in range(10):
    # Clear the gradients left over from the previous step.
    optimizer.zero_grad()
    # Hedged P&L per path, net of the European payoff on the global `spot`.
    cash = compute_portfolio_2(m, ds, european_payoff(spot))
    loss = entropic_risk_measure(cash)
    loss.backward()
    optimizer.step()

    # Prints the loss tensor (including its grad_fn) once per step.
    print(loss)

# prev_hedge

In [None]:
from torch import nn

class MLP(nn.Module):
    """Feed-forward net: n_inputs -> 512 -> 512 -> 512 -> 1 with ReLU between.

    This re-declares the class of the same name defined earlier in the
    notebook; the architecture is the same.
    """

    def __init__(self, n_inputs):
        super().__init__()

        widths = (n_inputs, 512, 512, 512)
        stack = []
        # One Linear + ReLU pair per consecutive pair of widths, in order.
        for fan_in, fan_out in zip(widths[:-1], widths[1:]):
            stack.append(nn.Linear(fan_in, fan_out))
            stack.append(nn.ReLU())
        # Final projection to a single output, with no activation.
        stack.append(nn.Linear(widths[-1], 1))

        self.model = nn.Sequential(*stack)

    def forward(self, x):
        """Apply the stacked layers to `x`."""
        hedge = self.model(x)
        return hedge

In [None]:
class NewModel(nn.Module):
    """Wrap a model so that it also sees its own previous output.

    On every call the stored previous output is appended to the input as one
    extra feature on the last axis, so the wrapped model must accept one more
    input feature than the caller supplies.

    Args:
        model: module applied to the augmented input; its output is expected
            to have a last axis of size 1.
    """

    def __init__(self, model):
        super().__init__()
        self.model = model
        # Same buffer name as the one read in forward(). The earlier spelling
        # "prev_hegde" never matched, so the stored hedge was replaced by
        # zeros on every call.
        self.register_buffer("prev_hedge", None)

    def reset(self):
        """Forget the stored output so the next call starts from zeros."""
        self.prev_hedge = None

    def forward(self, x):
        """Return model([x, previous output]) and remember the new output."""
        expected_shape = tuple(x.shape[:-1]) + (1,)
        # Start from zeros on the first call, after reset(), or when the
        # leading shape changes; new_zeros matches the dtype and device of x.
        if self.prev_hedge is None or tuple(self.prev_hedge.shape) != expected_shape:
            self.prev_hedge = x.new_zeros(expected_shape)

        new_x = torch.cat([x, self.prev_hedge], dim=-1)
        out = self.model(new_x)
        # Detach so that gradients do not flow through the stored value.
        self.prev_hedge = out.detach()

        return out

In [None]:
mm = NewModel(MLP(4))
mm

In [None]:
# `optimizer` was built from m.parameters(), so stepping it here would leave
# mm untouched and never clear mm's gradients. mm wraps a fresh MLP(4) and
# therefore needs an optimizer over its own parameters.
mm_optimizer = torch.optim.Adam(mm.parameters())

# Ten gradient steps on `mm`, minimising the entropic risk measure.
for i in range(10):
    mm_optimizer.zero_grad()
    # Hedged P&L per path, net of the European payoff on the global `spot`.
    cash = compute_portfolio_2(mm, ds, european_payoff(spot))
    loss = entropic_risk_measure(cash)
    loss.backward()
    mm_optimizer.step()

    print(loss)