In [1]:
from torch import optim
from models.VAE import VAE, VAE_train
from lib.mmd import SignatureKernel,mmd_loss
from lib.datasets import get_stock_price,sample_indices,train_test_split
from lib.aug import apply_augmentations,parse_augmentations
import torch
from torch import nn
import torch.nn.functional as F
from typing import List
from lib.utils import sample_indices


In [2]:
data_config = {
    "ticker" : "^GSPC",
    "interval" : "1d",
    "column" : 1,  
    "window_size" : 20,
    "dir" : "datasets",
    "subdir" : "stock"
}
sig_config = {
    "augmentations": [
        {"name": "AddTime"},
        {"name": "LeadLag"},
    ],
    "device" : "cuda:0",
    "depth" : 4,
}

In [3]:
tensor_data = get_stock_price(data_config)
x_real_train, x_real_test = train_test_split(tensor_data, train_test_ratio=0.8, device=sig_config["device"])
if sig_config["augmentations"] is not None:
    sig_config["augmentations"] = parse_augmentations(sig_config.get('augmentations'))
print("Before augmentation shape:",x_real_train.shape)
if sig_config["augmentations"] is not None:
    # Print the tensor shape after each augmentation
    x_aug_sig = apply_augmentations(x_real_train,sig_config["augmentations"])
    # Input dimension of encoder
    # We'll flat the tensor
    input_dim = x_aug_sig.shape[1]*x_aug_sig.shape[2]
print("After augmentation shape:",x_aug_sig.shape)
x_aug_sig = x_aug_sig.to(sig_config["device"])

Rolled data for training, shape torch.Size([1232, 20, 1])
Before augmentation shape: torch.Size([985, 20, 1])
torch.Size([985, 20, 2])
torch.Size([985, 39, 4])
After augmentation shape: torch.Size([985, 39, 4])


In [None]:
# This function has bug.
def compute_mmd(z: torch.Tensor , prior_z: torch.Tensor):
    """ Computes MMD between z and prior_z using RBF kernel. """
    def rbf_kernel(x, y, sigma=1.0):
        x_size = x.size(0)
        y_size = y.size(0)
        dim = x.size(1)

        xx = torch.matmul(x, x.t())  # Shape: (x_size, x_size)
        yy = torch.matmul(y, y.t())  # Shape: (y_size, y_size)
        xy = torch.matmul(x, y.t())  # Shape: (x_size, y_size)

        x_sq = xx.diag().unsqueeze(1).expand_as(xx)
        y_sq = yy.diag().unsqueeze(0).expand_as(yy)

        dist_xx = x_sq + x_sq.t() - 2 * xx
        dist_yy = y_sq + y_sq.t() - 2 * yy
        dist_xy = x_sq + y_sq - 2 * xy

        sigma_sq = sigma ** 2
        k_xx = torch.exp(-dist_xx / (2 * sigma_sq))
        k_yy = torch.exp(-dist_yy / (2 * sigma_sq))
        k_xy = torch.exp(-dist_xy / (2 * sigma_sq))
        print("k_xx shape {}".format(k_xx.shape))
        return k_xx, k_yy, k_xy

    k_xx, k_yy, k_xy = rbf_kernel(z, z), rbf_kernel(prior_z, prior_z), rbf_kernel(z, prior_z)

    mmd = k_xx.mean() + k_yy.mean() - 2 * k_xy.mean()
    return mmd


In [5]:
class InfoVAE(nn.Module):
    def __init__(self, x_aug_sig, epoch, batch_size, hidden_dims: List, device) -> None:
        super(InfoVAE, self).__init__()

        self.x_aug_sig = x_aug_sig
        print("Input tensor shape: {}".format(x_aug_sig.shape))
        self.epoch = epoch
        self.batch_size = batch_size
        self.device = device

        # Assume len(hidden_dims)=3.
        self.encoder_mu = nn.Sequential(
            nn.Linear(hidden_dims[0],hidden_dims[1]),
            nn.LeakyReLU(),
            nn.Linear(hidden_dims[1],hidden_dims[2]),
            nn.LeakyReLU(),
        )
        self.encoder_sigma = nn.Sequential(
            nn.Linear(hidden_dims[0],hidden_dims[1]),
            nn.Tanh(),
            nn.Linear(hidden_dims[1],hidden_dims[2]),
            nn.LeakyReLU(),
        )
        self.decoder = nn.Sequential(
            nn.Linear(hidden_dims[2],hidden_dims[1]),
            nn.LeakyReLU(),
            nn.Linear(hidden_dims[1],hidden_dims[0]),
            nn.LeakyReLU(),
        )

        # To device
        self.encoder_mu.to(device)
        self.encoder_sigma.to(device)
        self.decoder.to(device)
    
    def encode(self, x):
        x_flatten = x.view(x.shape[0],-1)
        mean = self.encoder_mu(x_flatten)
        log_var = self.encoder_sigma(x_flatten)
        # Clipping
        log_var = torch.clamp(log_var, min=-10, max=10)
        noise = torch.randn(x.shape[0],mean.shape[1]).to(self.device)
        z = mean + torch.exp(0.5*log_var).mul(noise)
        return mean, log_var, z
        
    def decode(self,z):
        reconstructed_data = self.decoder(z)
        return reconstructed_data

    def loss(self,mean, log_var, sample_data, reconstructed_data, lambda_mmd=10):
        """ Compute InfoVAE loss with MMD regularization. """
        # Reconstruction loss
        recon_loss = F.mse_loss(reconstructed_data, sample_data, reduction='sum')

        # Sample prior (Standard Normal)
        batch_size, z_dim = mean.shape
        prior_z = torch.randn(batch_size, z_dim).to(mean.device)

        # Compute KL divergence
        kl_div = 0.5 * torch.sum(mean.pow(2) + log_var.exp() - 1 - log_var)

        # Compute MMD
        mmd = compute_mmd(mean, prior_z)

        # InfoVAE Loss
        loss = recon_loss + lambda_mmd * mmd + (lambda_mmd - 1) * kl_div
        return loss

    
    def generate(self,x: torch.Tensor):
        _, _, z = self.encode(x)
        reconstructed_data = self.decode(z)
        return reconstructed_data

In [6]:
def train(model,optimizer):
    early_stop = 500
    cnt = 0
    min_loss = float('inf')
    for i in range(model.epoch):
        # Sample time indices of size equal to the batch size
        # From sefl.x_aug_sig
        time_indics = sample_indices(model.x_aug_sig.shape[0],model.batch_size,"cuda")
        sample_data = model.x_aug_sig[time_indics]
        # print("sample_data shape {}".format(sample_data.shape))
        # Encode 
        mean, log_var, z = model.encode(sample_data)
        # Decode
        reconstructed_data = model.decode(z)
        # print("reconstructed_data shape {},".format(reconstructed_data.shape))
        # Calculate loss
        loss = model.loss(mean,log_var,sample_data.view(sample_data.shape[0],-1),reconstructed_data)
        # Backpropogation
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        # Print loss
        if i%500==0:
            print("Epoch {} loss {}".format(i,loss.item()))
        # Early stop
        if loss.item()<min_loss:
            min_loss = loss.item()
            cnt = 0
        else:
            cnt += 1
            if cnt>early_stop:
                break

In [7]:
lr = 1e-4
batch_size = 200
epoch = 20001
hidden_dims = [input_dim,12,3]
InfoVAE = InfoVAE(x_aug_sig=x_aug_sig,epoch=epoch,batch_size=batch_size,hidden_dims=hidden_dims,device='cuda')
print(InfoVAE)
optimizer = torch.optim.Adam(InfoVAE.parameters(),lr=lr)

Input tensor shape: torch.Size([985, 39, 4])
InfoVAE(
  (encoder_mu): Sequential(
    (0): Linear(in_features=156, out_features=12, bias=True)
    (1): LeakyReLU(negative_slope=0.01)
    (2): Linear(in_features=12, out_features=3, bias=True)
    (3): LeakyReLU(negative_slope=0.01)
  )
  (encoder_sigma): Sequential(
    (0): Linear(in_features=156, out_features=12, bias=True)
    (1): Tanh()
    (2): Linear(in_features=12, out_features=3, bias=True)
    (3): LeakyReLU(negative_slope=0.01)
  )
  (decoder): Sequential(
    (0): Linear(in_features=3, out_features=12, bias=True)
    (1): LeakyReLU(negative_slope=0.01)
    (2): Linear(in_features=12, out_features=156, bias=True)
    (3): LeakyReLU(negative_slope=0.01)
  )
)


In [8]:
train(InfoVAE,optimizer=optimizer)

k_xx shape torch.Size([200, 200])


AttributeError: 'tuple' object has no attribute 'shape'