In [1]:
from collections import defaultdict
from typing import Tuple

import torch
from sklearn.linear_model import LinearRegression
from torch import nn
import signatory

from lib.augmentations import augment_path_and_compute_signatures,AddTime,LeadLag
from lib.datasets import get_stock_price
from lib.utils import sample_indices, to_numpy, set_seed, regression_on_linear_functional, predict
from lib.test_metrics import get_standard_test_metrics
from lib.network.arfnn import ArFNN

In [2]:
sigcwgan = {
    "device" : "cuda:0",
    "epoch" : 1000,
    "batch_size" : 200,
    "window_size_past" : 3,
    "window_size_future" : 3,
}
data_config = {
    "ticker" : "^GSPC",
    "interval" : "1d",
    "column" : 0,  
    "window_size" : 30,
    "dir" : "datasets",
    "subdir" : "stock"
}
sig_config = {
    "depth" : 2,
    "augmentations" : tuple([LeadLag()]),
}
arfnn_config = {
    "hidden_dims" : 3 * (50,),
}

set_seed(2024)

In [3]:
x_real_rolled = get_stock_price(data_config)
x_real_rolled = x_real_rolled.to(sigcwgan['device'])
print('Total data: ', list(x_real_rolled.shape))

Rolled data for training, shape torch.Size([1480, 30, 1])
Total data:  [1480, 30, 1]


In [4]:
test_metrics_test = get_standard_test_metrics(x_real_rolled)

print(test_metrics_test)

[SigW1Loss()]


In [5]:
def sigcwgan_loss(sig_pred: torch.Tensor, sig_fake_conditional_expectation: torch.Tensor):
    return torch.norm(sig_pred - sig_fake_conditional_expectation, p=2, dim=1).mean()

def calibrate_sigw1_metric(config, x_future, x_past):
    sigs_past = config.compute_sig_past(x_past)
    sigs_future = config.compute_sig_future(x_future)
    assert sigs_past.size(0) == sigs_future.size(0)
    X, Y = to_numpy(sigs_past), to_numpy(sigs_future)
    lm = LinearRegression()
    lm.fit(X, Y)
    sigs_pred = torch.from_numpy(lm.predict(X)).float().to(x_future.device)
    return sigs_pred

In [6]:
class SigCWGAN(nn.Module):
    def __init__(self, G, x_real: torch.Tensor, test_metrics, sigconfig: dict, **kargs):
        super(SigCWGAN, self).__init__()
        self.G = G
        self.p, self.q = kargs["window_size_past"], kargs["window_size_future"]
        self.epoch = kargs['epoch']
        self.device = kargs['device']

        self.x_real = x_real
        self.dim = self.latent_dim = x_real.shape[-1]

        self.losses_history = defaultdict(list)
        self.test_metrics = test_metrics
        self.sig_config = sigconfig
        self.batch_size = kargs["batch_size"]

        self.x_past = x_real[:, :self.p]
        x_future = x_real[:, self.p:]

        linear_functional = regression_on_linear_functional(x_future=x_future,x_past=self.x_past,sig_config=self.sig_config)
        sig_past = augment_path_and_compute_signatures(self.x_past,self.sig_config["depth"])
        
        self.sigs_pred = predict(linear_functional=linear_functional,sig_past=sig_past)

        self.G_optimizer = torch.optim.Adam(self.G.parameters(), lr=1e-2)
        self.G_scheduler = torch.optim.lr_scheduler.StepLR(self.G_optimizer, step_size=100, gamma=0.9)

    def sample_batch(self, ):
        random_indices = sample_indices(self.sigs_pred.shape[0], self.batch_size)  # sample indices
        # sample the least squares signature and the log-rtn condition
        sigs_pred = self.sigs_pred[random_indices].clone().to(self.device)
        x_past = self.x_past[random_indices].clone().to(self.device)
        return sigs_pred, x_past

    def step(self):
        self.G.train()
        self.G_optimizer.zero_grad()  # empty 'cache' of gradients
        sigs_pred, x_past = self.sample_batch()

        # Compute S(x_fake)
        # sigs_fake_ce, x_fake = sample_sig_fake(self.G, self.q, self.sig_config, x_past)
        x_past = x_past.repeat(self.sig_config["batch_size"], 1, 1).requires_grad_()
        x_fake = self.G.sample(self.q, x_past)
        sigs_fake_future = augment_path_and_compute_signatures(x_fake,self.sig_config["augmentations"])
        sigs_fake_ce = sigs_fake_future.reshape(self.sig_config["batch_size"], x_past.size(0), -1).mean(0)
        
        loss = sigcwgan_loss(sigs_pred, sigs_fake_ce)
        loss.backward()
        total_norm = torch.nn.utils.clip_grad_norm_(self.G.parameters(), 10)
        self.losses_history['loss'].append(loss.item())
        self.losses_history['total_norm'].append(total_norm)
        self.G_optimizer.step()
        self.G_scheduler.step()  # decaying learning rate slowly.
        self.evaluate(x_fake)

    def evaluate(self, x_fake):
        for test_metric in self.test_metrics:
            with torch.no_grad():
                test_metric(x_fake[:100])
            self.losses_history[test_metric.name].append(
                to_numpy(self.test_metrics.loss_componentwise)
            )

In [7]:
x_real_dim = x_real_rolled.shape[-1]
input_dim = sigcwgan["window_size_past"]*x_real_dim
arfnn_config.update(input_dim=input_dim)
arfnn_config.update(output_dim=x_real_dim)
arfnn_config.update(latent_dim=x_real_dim)
print(arfnn_config)

{'hidden_dims': (50, 50, 50), 'input_dim': 3, 'output_dim': 1, 'latent_dim': 1}


In [8]:
AR_FNN = ArFNN(input_dim=arfnn_config["input_dim"],output_dim=arfnn_config["output_dim"],hidden_dims=arfnn_config["hidden_dims"],latent_dim=arfnn_config["latent_dim"])

In [9]:
sigcwgan = SigCWGAN(G=AR_FNN,x_real=x_real_rolled,test_metrics=test_metrics_test,sigconfig=sig_config,**sigcwgan)

TypeError: 'int' object is not subscriptable