In [1]:
# Work based on "TranAD: Deep Transformer Networks for Anomaly Detection in Multivariate Time Series Data" by Tuli et al. https://arxiv.org/abs/2201.07284
# Original implementation https://github.com/imperial-qore/TranAD


%load_ext autoreload
%autoreload 2

import os

# Anchor the working directory one level up (the UROP directory) so that the
# relative "datasets/..." paths and the `src` import used by later cells resolve.
# The root is resolved to an absolute path only once per kernel session, which
# makes this cell idempotent: re-running it no longer climbs another directory
# level the way a bare os.chdir("..") does.
if "PROJECT_ROOT" not in globals():
    PROJECT_ROOT = os.path.abspath(os.pardir)
os.chdir(PROJECT_ROOT)

In [2]:
# ---- Run settings ----------------------------------------------------------
SEED = 1            # passed to torch.manual_seed
BATCH_SIZE = 128    # batch size of every DataLoader
NUM_WORKERS = 4     # worker processes per DataLoader
LR = 0.001          # learning rate intended for the optimizer

# ---- Model hyper-parameters ------------------------------------------------
D_MODEL = 64        # transformer embedding width
N_HEAD = 4          # attention heads per encoder layer
NUM_LAYERS = 3      # number of stacked encoder layers
DIM_FF = 128        # hidden width of each encoder layer's feed-forward block
TF_DROPOUT = 0.3    # dropout inside the transformer encoder layers
PE_DROPOUT = 0.3    # dropout applied right after the positional encoding

In [3]:
import torch
from torch.utils.data import DataLoader
from src import LazyWindowedDataset, train_test_split

# Seed torch's global RNG before any dataset or loader is created.
torch.manual_seed(SEED)


def _make_windowed_dataset(split, anomaly_type):
    """Build a RoboticArm LazyWindowedDataset for one split.

    Every dataset in this notebook uses the same root directory, both the
    'source' and 'target' domains, 100 ms windows and a 50 ms stride; only the
    split name and the requested anomaly types differ.
    """
    return LazyWindowedDataset(
        root_dir="datasets/RoboticArm",
        split=split,
        anomaly_type=anomaly_type,
        domain_type=['source', 'target'],
        window_size_ms=100,
        stride_ms=50,
    )


# Options shared by the three loaders below.
_loader_options = dict(batch_size=BATCH_SIZE, num_workers=NUM_WORKERS, drop_last=True)

# Training data holds 'normal' recordings only. Despite the "source" in the
# variable names, both the source and the target domain are included.
full_train_source_dataset = _make_windowed_dataset("train", ['normal'])
train_source_dataset, val_source_dataset = train_test_split(full_train_source_dataset)

train_loader = DataLoader(train_source_dataset, shuffle=True, **_loader_options)
val_loader = DataLoader(val_source_dataset, shuffle=False, **_loader_options)

# Test data mixes normal and anomalous recordings.
test_source_dataset = _make_windowed_dataset("test", ['normal', 'anomaly'])
# NOTE(review): shuffle=True together with drop_last=True means a different
# partial batch of test windows may be discarded on each evaluation run.
# Consider shuffle=False / drop_last=False once it is confirmed that
# `evaluate` copes with ordered data and a short final batch.
test_loader = DataLoader(test_source_dataset, shuffle=True, **_loader_options)

In [4]:
import math
import torch
import torch.nn as nn

class PositionalEncoding(nn.Module):
    """Add fixed sinusoidal positional encodings to a batch-first sequence.

    A table of shape [1, max_len, d_model] is precomputed once and stored as
    the buffer ``pe``: even feature columns hold sines, odd feature columns
    hold cosines of ``position * div_term``. ``forward`` adds the first
    ``seq_len`` rows of that table to the input and then applies dropout.

    Args:
        d_model: Size of the last (feature) dimension of the input. Both even
            and odd values are supported.
        dropout: Dropout probability applied after the encoding is added.
        max_len: Longest sequence length the precomputed table can serve.
    """

    def __init__(self, d_model, dropout=0.1, max_len=6000):
        super().__init__()
        self.dropout = nn.Dropout(p=dropout)

        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)  # [max_len, 1]
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
        angles = position * div_term  # [max_len, ceil(d_model / 2)]

        pe = torch.zeros(max_len, d_model)  # [max_len, d_model]
        # Sine on even feature indices, cosine on odd feature indices.
        pe[:, 0::2] = torch.sin(angles)
        # For an odd d_model there is one fewer odd column than even columns,
        # so the surplus frequency is dropped to keep the shapes compatible.
        pe[:, 1::2] = torch.cos(angles[:, : d_model // 2])

        # Stored as [1, max_len, d_model] so it broadcasts across the batch.
        self.register_buffer('pe', pe.unsqueeze(0))

    def forward(self, x):
        """Return ``dropout(x + pe[:, :seq_len])`` for x of shape [batch, seq_len, d_model].

        Raises:
            ValueError: If ``seq_len`` exceeds the ``max_len`` the table was
                built with (previously this surfaced as an opaque broadcast
                error, or broadcast silently when ``max_len`` was 1).
        """
        seq_len = x.size(1)
        max_len = self.pe.size(1)
        if seq_len > max_len:
            raise ValueError(
                f"Sequence length {seq_len} exceeds positional encoding max_len {max_len}"
            )
        return self.dropout(x + self.pe[:, :seq_len, :])


class TimeSeriesTransformer(nn.Module):
    """Transformer encoder that maps a multivariate window back to its input space.

    Pipeline: linear projection ``input_dim -> d_model``, positional encoding,
    a stack of ``num_layers`` batch-first ``nn.TransformerEncoderLayer``s, and
    a linear projection ``d_model -> input_dim``.

    Args:
        input_dim: Number of features per time step.
        d_model: Embedding width used inside the encoder.
        nhead: Number of attention heads per encoder layer.
        num_layers: Number of stacked encoder layers.
        dim_feedforward: Hidden width of each layer's feed-forward block.
        dropout: Dropout probability inside the encoder layers.
        pe_dropout: Dropout probability applied after the positional encoding.
    """

    def __init__(self, input_dim, d_model=64, nhead=4, num_layers=3, dim_feedforward=128, dropout=0.1, pe_dropout=0.1):
        super().__init__()
        self.input_proj = nn.Linear(input_dim, d_model)
        self.pos_encoder = PositionalEncoding(d_model, dropout=pe_dropout)
        encoder_layer = nn.TransformerEncoderLayer(d_model=d_model, nhead=nhead,
                                                   dim_feedforward=dim_feedforward, dropout=dropout,
                                                   batch_first=True)
        self.transformer_encoder = nn.TransformerEncoder(encoder_layer, num_layers=num_layers)
        self.output_proj = nn.Linear(d_model, input_dim)

        # Constructor arguments, kept so the model can be rebuilt via from_config.
        self.config = {
            'input_dim': input_dim,
            'd_model': d_model,
            'nhead': nhead,
            'num_layers': num_layers,
            'dim_feedforward': dim_feedforward,
            'dropout': dropout,
            'pe_dropout': pe_dropout
        }

    def forward(self, x):
        """Run the model on x of shape (batch_size, seq_len, input_dim).

        Returns a tensor of the same shape as the input.
        """
        x = self.input_proj(x)
        x = self.pos_encoder(x)
        x = self.transformer_encoder(x)
        x = self.output_proj(x)
        return x

    def get_config(self,):
        """Return a copy of the constructor arguments.

        A shallow copy is returned so callers that modify the result cannot
        alter the configuration stored on the model.
        """
        return dict(self.config)

    @classmethod
    def from_config(cls, config):
        """Build a model from a config dict such as the one returned by get_config.

        ``'input_dim'`` is required; every other key falls back to the
        constructor default when absent. As a classmethod this instantiates
        the class it is called on, so subclasses are rebuilt as themselves.
        """
        return cls(
            input_dim=config['input_dim'],
            d_model=config.get('d_model', 64),
            nhead=config.get('nhead', 4),
            num_layers=config.get('num_layers', 3),
            dim_feedforward=config.get('dim_feedforward', 128),
            dropout=config.get('dropout', 0.1),
            pe_dropout=config.get('pe_dropout', 0.1)
        )

In [5]:
# Model configuration assembled from the hyper-parameters in the settings cell.
config = {
    'input_dim': 7,  # features per time step — presumably the RoboticArm channel count; TODO confirm
    'd_model': D_MODEL,
    'nhead': N_HEAD,
    'num_layers': NUM_LAYERS,
    'dim_feedforward': DIM_FF,
    'dropout': TF_DROPOUT,
    'pe_dropout': PE_DROPOUT
}

# Use the GPU when one is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

model = TimeSeriesTransformer.from_config(config).to(device)
# Pass LR explicitly so the value in the settings cell actually controls
# training; previously the optimizer silently used AdamW's built-in default.
optimizer = torch.optim.AdamW(model.parameters(), lr=LR)
# Mean-squared-error loss. The name `criterior` is kept as spelled because
# the training and evaluation cells reference it.
criterior = torch.nn.MSELoss()

In [6]:
from torchinfo import summary
# Summarise the model for one batch of windows. The feature dimension is read
# from `config` so it cannot drift from the value the model was built with.
# 1600 is presumably the number of time steps in one window — TODO confirm.
summary(model, input_size=(BATCH_SIZE, 1600, config['input_dim']))

Layer (type:depth-idx)                        Output Shape              Param #
TimeSeriesTransformer                         [128, 1600, 7]            --
├─Linear: 1-1                                 [128, 1600, 64]           512
├─PositionalEncoding: 1-2                     [128, 1600, 64]           --
│    └─Dropout: 2-1                           [128, 1600, 64]           --
├─TransformerEncoder: 1-3                     [128, 1600, 64]           --
│    └─ModuleList: 2-2                        --                        --
│    │    └─TransformerEncoderLayer: 3-1      [128, 1600, 64]           33,472
│    │    └─TransformerEncoderLayer: 3-2      [128, 1600, 64]           33,472
│    │    └─TransformerEncoderLayer: 3-3      [128, 1600, 64]           33,472
├─Linear: 1-4                                 [128, 1600, 7]            455
Total params: 101,383
Trainable params: 101,383
Non-trainable params: 0
Total mult-adds (Units.MEGABYTES): 6.59
Input size (MB): 5.73
Forward/backward pass 

In [7]:
from src import train_model, evaluate

# Short smoke-test run: a single epoch that stops after 50 batches.
# Interrupting the kernel ends training without a traceback.
try:
    train_model(
        name='abra',
        # What is being optimised.
        model=model, criterion=criterior, optimizer=optimizer,
        # Data: validation is skipped (val_loader=None) to speed things up.
        train_loader=train_loader, val_loader=None,
        # NOTE(review): this keyword is spelled 'merge_startegy', while
        # evaluate() is called with 'merge_strategy'. The recorded run
        # succeeded with this spelling, so it is left untouched — confirm
        # against train_model's signature.
        merge_startegy='stack',
        # Run length and logging.
        num_epochs=1, break_at_batch=50,
        verbose=1,
    )
except KeyboardInterrupt:
    print("Training interrupted by user.")

Training:   3%|▎         | 50/1712 [00:38<21:06,  1.31batch/s]

Epoch [1/1] (Checkpoint Epoch: 1) | Train Loss: 0.435989 | Val Loss: 0.000000 | Val AUC: 0.0000
Time Spent: 38.09s | ETA: 0.00s | Current Time: 2025-05-12 09:10:58
Checkpoint saved at checkpoints/abra_TimeSeriesTransformer_epoch_1.pt





In [8]:
# Score the model on the test loader, which was built with both the 'source'
# and 'target' domains ("S+T" in the printed label presumably refers to that).
loss, auc = evaluate(
    model,
    test_loader,
    criterior,
    merge_strategy='stack',
    verbose=True,
)
print(f"Overall S+T | Loss: {loss:.4f}, AUC: {auc:.4f}")

Evaluation: 100%|██████████| 540/540 [02:03<00:00,  4.38batch/s]

Overall S+T | Loss: 0.0986, AUC: 0.7695



