In [None]:
!wget http://timeseriesclassification.com/Downloads/EigenWorms.zip
!unzip -f EigenWorms.zip

In [None]:
from isstorch import compute
from utils import compute_signatures

from sktime.utils.data_io import load_from_tsfile_to_dataframe
from sktime.datatypes._panel._convert import from_nested_to_3d_numpy

import torch
import torch.nn as nn
from torch.optim import Adam
from torch.utils.data import DataLoader, TensorDataset

from tqdm import trange

In [None]:
class DenseNet(nn.Module):
    def __init__(self, in_features, width=10, hidden_layers=10):
        super().__init__()
        self.batchnorm = nn.BatchNorm1d(in_features)
        layers = [
            nn.Sequential(
                nn.Linear(in_features, width),
                nn.ReLU()
            )
        ]
        for _ in range(hidden_layers-1):
            layers.append(
                nn.Sequential(
                    nn.Linear(width, width),
                    nn.ReLU()
                )
            )
        
        layers.append(
            nn.Sequential(
                nn.Linear(width, 5),
                nn.Tanh()
            )
        )
        
        self.stack = nn.Sequential(*layers)
    
    def forward(self, x):
        x = self.batchnorm(x)
        return self.stack(x)
    
def train_loop(model, optimizer, loss_fn, dataloader, device):
    for X, y in dataloader:
        X, y = X.to(device), y.to(device)
        pred = model(X)
        loss = loss_fn(pred, y)
        
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        
def test_loop(model, loss_fn, dataloader, device):
    loss, correct = 0.0, 0
    sample_size = len(dataloader.dataset)
    for X, y in dataloader:
        X, y = X.to(device), y.to(device)
        pred = model(X)
        
        loss += loss_fn(pred, y)
        correct += (pred.argmax(1) == y).type(torch.float).sum().item()
    
    return correct / sample_size, loss / sample_size

In [None]:
train_data, train_labels = load_from_tsfile_to_dataframe('EigenWorms_TRAIN.ts')
train_data = from_nested_to_3d_numpy(train_data).transpose((0,2,1))
train_labels = train_labels.astype(int) - 1

test_data, test_labels = load_from_tsfile_to_dataframe('EigenWorms_TEST.ts')
test_data = from_nested_to_3d_numpy(test_data).transpose((0,2,1))
test_labels = test_labels.astype(int) - 1

print(train_data.shape, test_data.shape)

In [None]:
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
train_tensors, train_labels = torch.tensor(train_data, device=device, dtype=torch.float), torch.tensor(train_labels).long()
test_tensors, test_labels = torch.tensor(test_data, device=device, dtype=torch.float), torch.tensor(test_labels).long()
print(f"Data loaded onto device {train_tensors.device}")

In [None]:
%%time
train_sigs = compute_signatures(train_tensors, level=3)
test_sigs = compute_signatures(test_tensors, level=3)

print(train_sigs.shape, test_sigs.shape)
in_features = train_sigs.shape[1]

In [None]:
train_dataloader = DataLoader(TensorDataset(train_sigs, train_labels), batch_size=10, shuffle=True, drop_last=True)
test_dataloader = DataLoader(TensorDataset(test_sigs, test_labels), batch_size=10, shuffle=True, drop_last=True)

In [None]:
model = DenseNet(in_features=in_features, width=100, hidden_layers=3).to(device)
optimizer = Adam(model.parameters(), lr=1e-3)
loss_fn = nn.CrossEntropyLoss()

epochs = 200
with trange(epochs) as pbar:
    for t in pbar:
        pbar.set_description(f"Epoch {t+1}")
        train_loop(model, optimizer, loss_fn, train_dataloader, device)
        acc, loss = test_loop(model, loss_fn, test_dataloader, device)
        pbar.set_postfix(accuracy=f"{acc:.2%}", loss=f"{loss:.3f}")
        