In [1]:
import os
os.chdir("/home/martin/s3ts")

In [2]:
from storage.har_datasets import STSDataset, StreamingTimeSeries, StreamingTimeSeriesCopy

In [3]:
from s3ts.api.ucr import load_ucr_classification
from s3ts.api.ts2sts import finite_random_STS
import numpy as np
import torch

X, Y, mapping = load_ucr_classification("BasicMotions")
#X, Y, mapping = load_ucr_classification("GunPoint")
print(X.shape, Y.shape, len(np.unique(Y)))

STS, SCS = finite_random_STS(X, Y, length=60)
print(STS.shape, SCS.shape)

Loading 'BasicMotions' from cache...
(80, 6, 100) (80,) 4
(6, 6000) (6000,)


In [4]:
ds = StreamingTimeSeries(STS, SCS, wsize=32, wstride=1)

In [5]:
from pytorch_lightning import LightningDataModule
from torch.utils.data import DataLoader

class LSTSDataset(LightningDataModule):

    """ Data module for the experiments. """

    STS: np.ndarray     # data stream
    SCS: np.ndarray     # class stream
    DM: np.ndarray      # dissimilarity matrix

    data_split: dict[str: np.ndarray]    
                        # train / val / test split
    batch_size: int     # dataloader batch size

    def __init__(self,
            stsds: StreamingTimeSeries,    
            data_split: dict, batch_size: int, 
            random_seed: int = 42, 
            num_workers: int = 1
            ) -> None:

        # save parameters as attributes
        super().__init__()
        
        self.batch_size = batch_size
        self.random_seed = random_seed
        self.num_workers = num_workers

        self.stsds = stsds
        self.wdw_len = self.stsds.wsize
        self.wdw_str = self.stsds.wstride
        self.sts_str = False

        # gather dataset info   
        self.n_dims = self.stsds.STS.shape[1]
        self.n_classes = len(np.unique(self.stsds.SCS))

        # convert to tensors
        if not torch.is_tensor(self.stsds.STS):
            self.stsds.STS = torch.from_numpy(self.stsds.STS).to(torch.float32)
        if not torch.is_tensor(self.stsds.SCS):
            self.stsds.SCS = torch.from_numpy(self.stsds.SCS).to(torch.int64)

        train_indices = self.stsds.indices[data_split["train"](self.stsds.indices)]
        test_indices = self.stsds.indices[data_split["test"](self.stsds.indices)]
        val_indices = self.stsds.indices[data_split["val"](self.stsds.indices)]

        self.ds_train = StreamingTimeSeriesCopy(self.stsds, train_indices)
        self.ds_test = StreamingTimeSeriesCopy(self.stsds, test_indices)
        self.ds_val = StreamingTimeSeriesCopy(self.stsds, val_indices)
        
    def train_dataloader(self) -> DataLoader:
        """ Returns the training DataLoader. """
        return DataLoader(self.ds_train, batch_size=self.batch_size, 
            num_workers=self.num_workers, shuffle=True,
            pin_memory=True, persistent_workers=True)

    def val_dataloader(self) -> DataLoader:
        """ Returns the validation DataLoader. """
        return DataLoader(self.ds_val, batch_size=self.batch_size, 
            num_workers=self.num_workers, shuffle=False,
            pin_memory=True, persistent_workers=True)

    def test_dataloader(self) -> DataLoader:
        """ Returns the test DataLoader. """
        return DataLoader(self.ds_test, batch_size=self.batch_size, 
            num_workers=self.num_workers, shuffle=False,
            pin_memory=True, persistent_workers=True)
    
    def predict_dataloader(self) -> DataLoader:
        """ Returns the test DataLoader. """
        return DataLoader(self.ds_test, batch_size=self.batch_size, 
            num_workers=self.num_workers, shuffle=False,
            pin_memory=True, persistent_workers=True)

In [7]:
indices_shuffled = np.arange(ds.indices.shape[0])[:5000]
np.random.shuffle(indices_shuffled)

data_split = {
    "train": lambda x: np.isin(x, indices_shuffled),
    "val": lambda x: np.isin(x, np.arange(ds.indices.shape[0])[5064:5500]),
    "test": lambda x: np.isin(x, np.arange(ds.indices.shape[0])[5064:]),
}

dm = LSTSDataset(ds, data_split=data_split, batch_size=32, random_seed=42, num_workers=16)

In [8]:
from typing import List

In [9]:
@torch.jit.script
def dtw_compute(dist_tensor: torch.Tensor, grad_tensor: torch.Tensor, w: float) -> None:
    for i in range(1, dist_tensor.shape[2]):
        for j in range(1, dist_tensor.shape[3]):
            # elements has shape (n, k, 3)
            elements = torch.stack([w * dist_tensor[:, :, i, j-1], dist_tensor[:, :, i-1, j], w * dist_tensor[:, :, i-1, j-1]], dim=2)

            value, id = torch.min(elements, dim=2) # shape (n, k)

            dist_tensor[:,:, i, j] += value

            grad_tensor[id==0][:, :, i, j] += w * grad_tensor[id==0][:, :, i, j-1]

@torch.jit.script
def dtw_compute_by_index(dist_tensor: torch.Tensor, grad_tensor: torch.Tensor, w: float, n: int, s: int) -> None:
    for i in range(1, dist_tensor.shape[2]):
        for j in range(1, dist_tensor.shape[3]):
            elements = torch.stack([w * dist_tensor[n, s, i, j-1], dist_tensor[n, s, i-1, j], w * dist_tensor[n, s, i-1, j-1]], dim=0)

            value, id = torch.min(elements, dim=0)

            dist_tensor[n, s, i, j] += value

            grad_tensor[id==0][n, s, i, j] += w * grad_tensor[id==0][n, s, i, j-1]

@torch.jit.script
def torch_dtw_fast(x: torch.Tensor, y: torch.Tensor, w: float, eps: float = 1e-5):
    # shape of x (n, dim, x_len) y (m, dim, y_len)    
    # performs convolution-like operation, for each kernel the DF
    # (of shape (kernel_size, T)) is computed, then summed across channels
    # x has shape (batch, c, time_dimension)

    # compute pairwise diffs (squared)
    p_diff = x[:,None,:,None,:] - y[None,:,:,:,None] # shape (n, n_kernel, d, Kernel_size, T)
    euc_d = torch.square(p_diff).sum(2) # shape (n, n_kernel, kernel_size, T)

    # compute dtw
    DTW = euc_d.clone()
    DTW[:,:,0,:] = torch.cumsum(DTW[:,:,0,:], dim=2)
    DTW[:,:,:,0] = torch.cumsum(DTW[:,:,:,0], dim=2)

    # p_diff contains the partial derivatives of DTW[n, k, i, j] wrt K[k, d, i] (dims (n, k, d, i, j))
    p_diff = p_diff / torch.sqrt(euc_d[:,:, None, :, :] + eps)


    dtw_compute(DTW, p_diff, w)
    # futures : List[torch.jit.Future[None]] = []
    # for n in range(DTW.shape[0]):
    #     for s in range(DTW.shape[1]):
    #         futures.append(torch.jit.fork(dtw_compute_by_index, DTW, p_diff, w, n, s))

    # for future in futures:
    #     torch.jit.wait(future)

    return DTW.sqrt(), p_diff

class torch_dtw(torch.autograd.Function):

    @staticmethod
    def forward(x, y, w):
        DTW, p_diff = torch_dtw_fast(x, y, w)
        return DTW, p_diff
    
    @staticmethod
    def setup_context(ctx, inputs, output):
        DTW, p_diff = output
        ctx.save_for_backward(p_diff)
    
    @staticmethod
    def backward(ctx, dtw_grad, p_diff_grad):
        p_diff, = ctx.saved_tensors
        mult = (p_diff * dtw_grad[:,:,None,:,:])
        return mult.mean(dim=(1, 3)), mult.mean(dim=(0, 4)), None

In [10]:
class DTWLayer(torch.nn.Module):
    def __init__(self, n_patts, d_patts, l_patts, l_out: int = None, rho: float = 1) -> None:
        super().__init__()

        if not l_out is None:
            self.l_out = l_patts
        else:
            self.l_out = l_patts

        self.w: torch.float32 = rho ** (1/l_patts)
        self.patts = torch.nn.Parameter(torch.randn(n_patts, d_patts, l_patts))
    
    def forward(self, x):
        return torch_dtw.apply(x, self.patts, self.w)[0][:,:,:,self.l_out:]

In [11]:
model = torch.nn.Sequential(
    DTWLayer(8, 6, 16, l_out=16, rho=0.01),
    torch.nn.Conv2d(in_channels=8, out_channels=32, kernel_size=5),
    torch.nn.ReLU(),
    torch.nn.Conv2d(in_channels=32, out_channels=64, kernel_size=5),
    torch.nn.ReLU(),
    torch.nn.MaxPool2d(kernel_size=2),
    torch.nn.Conv2d(in_channels=64, out_channels=128, kernel_size=4),
    torch.nn.Flatten(),
    torch.nn.Linear(in_features=128, out_features=4)
)

In [12]:
from pytorch_lightning import Trainer, LightningModule
import torchmetrics as tm
from torch.optim.lr_scheduler import ReduceLROnPlateau
from torch.nn import functional as F
from pytorch_lightning.loggers import TensorBoardLogger

In [13]:
class lmodel(LightningModule):

    def __init__(self, model: torch.nn.Module, num_classes: int, lr: float = 0.001) -> None:

        name = f"Test model for DTW-layer"

        self.n_classes = num_classes
        # save parameters as attributes
        super().__init__()

        # select model architecture class
        self.model = model

        self.softmax = torch.nn.Softmax()

        self.lr = lr
        # create metrics
        for phase in ["train", "val", "test"]: 
            self.__setattr__(f"{phase}_acc", tm.Accuracy(num_classes=num_classes, task="multiclass"))
            self.__setattr__(f"{phase}_f1",  tm.F1Score(num_classes=num_classes, task="multiclass"))
            if phase != "train":
                self.__setattr__(f"{phase}_auroc", tm.AUROC(num_classes=num_classes, task="multiclass"))

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """ Forward pass. """
        x = self.model(x)
        x = self.softmax(x)

        return x

    def _inner_step(self, batch: dict[str: torch.Tensor], stage: str = None):

        """ Inner step for the training, validation and testing. """
        output = self.model(batch["series"])
        loss = F.cross_entropy(output, batch["label"])

        # compute metrics
        acc = self.__getattr__(f"{stage}_acc")(output, batch["label"])
        f1  = self.__getattr__(f"{stage}_f1")(output, batch["label"])
        if stage != "train":
            auroc = self.__getattr__(f"{stage}_auroc")(output, batch["label"])  

        # log loss and metrics
        on_step = True if stage == "train" else False
        self.log(f"{stage}_loss", loss, on_epoch=True, on_step=on_step, prog_bar=True, logger=True)

        self.log(f"{stage}_acc", acc, on_epoch=True, on_step=False, prog_bar=True, logger=True)
        self.log(f"{stage}_f1", f1, on_epoch=True, on_step=False, prog_bar=False, logger=True)
        if stage != "train":
            self.log(f"{stage}_auroc", auroc, on_epoch=True, on_step=False, prog_bar=True, logger=True)

        # return loss
        return loss

    def training_step(self, batch: dict[str: torch.Tensor], batch_idx: int):
        """ Training step. """
        return self._inner_step(batch, stage="train")
        
    def validation_step(self, batch: dict[str: torch.Tensor], batch_idx: int):
        """ Validation step. """
        return self._inner_step(batch, stage="val")

    def configure_optimizers(self):
        """ Configure the optimizers. """
        mode = "max"
        monitor = "val_acc"
        optimizer = torch.optim.Adam(self.parameters(), lr=self.lr)
        return {
            "optimizer": optimizer,
            "lr_scheduler": {
                "scheduler": ReduceLROnPlateau(optimizer, 
                    mode=mode, factor=np.sqrt(0.1), patience=5, min_lr=0.5e-7),
                "interval": "epoch",
                "monitor": monitor,
                "frequency": 10
                # If "monitor" references validation metrics, then "frequency" should be set to a
                # multiple of "trainer.check_val_every_n_epoch".
            },
        }

In [14]:
trainer = Trainer(max_epochs=50, accelerator="auto")

GPU available: True (cuda), used: True
TPU available: False, using: 0 TPU cores
IPU available: False, using: 0 IPUs
HPU available: False, using: 0 HPUs


In [15]:
model_lightning = lmodel(model, 4, lr=0.0001)

In [16]:
list(model_lightning.model.parameters())[0].min()

tensor(-2.6968, grad_fn=<MinBackward1>)

In [17]:
trainer.fit(model=model_lightning, datamodule=dm)

You are using a CUDA device ('NVIDIA GeForce RTX 3060') that has Tensor Cores. To properly utilize them, you should set `torch.set_float32_matmul_precision('medium' | 'high')` which will trade-off precision for performance. For more details, read https://pytorch.org/docs/stable/generated/torch.set_float32_matmul_precision.html#torch.set_float32_matmul_precision


LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]

  | Name       | Type               | Params
--------------------------------------------------
0 | model      | Sequential         | 190 K 
1 | softmax    | Softmax            | 0     
2 | train_acc  | MulticlassAccuracy | 0     
3 | train_f1   | MulticlassF1Score  | 0     
4 | val_acc    | MulticlassAccuracy | 0     
5 | val_f1     | MulticlassF1Score  | 0     
6 | val_auroc  | MulticlassAUROC    | 0     
7 | test_acc   | MulticlassAccuracy | 0     
8 | test_f1    | MulticlassF1Score  | 0     
9 | test_auroc | MulticlassAUROC    | 0     
--------------------------------------------------
190 K     Trainable params
0         Non-trainable params
190 K     Total params
0.761     Total estimated model params size (MB)


Epoch 6:   4%|▍         | 7/156 [00:00<00:05, 29.13it/s, v_num=3, train_loss_step=0.618, val_loss=0.655, val_acc=0.828, val_auroc=0.134, train_loss_epoch=0.611, train_acc=0.827]  