In [1]:
from collections import Counter
import random
import numpy as np
import torch
import torch.nn as nn
from torch.nn.utils.rnn import pad_sequence, pack_padded_sequence, pad_packed_sequence
from torch.utils.data import Dataset, DataLoader
from sklearn.metrics import roc_auc_score, accuracy_score, confusion_matrix, classification_report, \
precision_score, recall_score, f1_score, average_precision_score, precision_recall_curve

In [2]:
# Run on the GPU when PyTorch can see one, otherwise fall back to the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
print("Using device:", device)

# On GPU, start from a clean allocator cache (no-op path on CPU).
if device.type == "cuda":
    torch.cuda.empty_cache()

Using device: cpu


# Data

Synthetic (randomly generated) data is used here as a stand-in for a real dataset.

In [None]:
PAD_VALUE = -100 # label value written into padded time steps by collate_fn; filtered out via `labels != PAD_VALUE`
BATCH_SIZE = 16  # number of sequences per mini-batch handed to the DataLoader

class SeqLabelingDataset(Dataset):
    """Synthetic dataset of variable-length sequences with per-step binary labels.

    Each sample is a tuple ``(dyn_feat, static_feat, labels)`` where
    ``dyn_feat`` is ``[seq_len, dyn_feat_dim]`` (standard normal),
    ``static_feat`` is ``[static_feat_dim]`` (standard normal) and
    ``labels`` is ``[seq_len]`` with integer values in {0, 1}.

    Args:
        n_samples: number of sequences to generate.
        min_len: minimum sequence length (inclusive).
        max_len: maximum sequence length (inclusive).
        dyn_feat_dim: number of per-time-step features.
        static_feat_dim: number of per-sequence features.
        seed: optional integer. When given, the data are drawn from private
            random generators seeded with it, so the dataset is reproducible
            and the global ``random`` / ``torch`` RNG states are left alone.
            When ``None`` (default) the global RNGs are used, as before.
    """

    def __init__(self, n_samples=100, min_len=5, max_len=15, dyn_feat_dim=4, static_feat_dim=2, seed=None):
        if seed is None:
            # Unseeded: draw from the global generators (original behaviour).
            py_rng, torch_gen = random, None
        else:
            py_rng = random.Random(seed)
            torch_gen = torch.Generator().manual_seed(seed)

        self.samples = []
        for _ in range(n_samples):
            seq_len = py_rng.randint(min_len, max_len)
            dyn_feat = torch.randn(seq_len, dyn_feat_dim, generator=torch_gen)
            static_feat = torch.randn(static_feat_dim, generator=torch_gen)
            labels = torch.randint(0, 2, (seq_len,), generator=torch_gen)
            self.samples.append((dyn_feat, static_feat, labels))

    def __len__(self):
        return len(self.samples)

    def __getitem__(self, idx):
        return self.samples[idx]

def collate_fn(batch):
    """Merge a list of variable-length samples into one right-padded mini-batch.

    Args:
        batch: list of ``(dyn_feat, static_feat, labels)`` tuples as produced
            by the dataset.

    Returns:
        Tuple ``(padded_dyn, static_feats, lengths, padded_labels)`` with
        shapes ``[B, T, F]``, ``[B, S]``, ``[B]`` and ``[B, T]``, where ``T``
        is the longest sequence in the batch. Padded label positions hold
        ``PAD_VALUE``; padded feature positions use ``pad_sequence``'s default
        padding value.
    """
    dyn_seqs, stat_feats, label_seqs = zip(*batch)

    # True (unpadded) length of every sequence, kept so the model can pack them.
    lengths = torch.tensor(list(map(len, dyn_seqs)))

    features_padded = pad_sequence(dyn_seqs, batch_first=True)  # [B, T, F]
    labels_padded = pad_sequence(label_seqs, padding_value=PAD_VALUE, batch_first=True)  # [B, T]
    statics = torch.stack(stat_feats)  # [B, S]

    return features_padded, statics, lengths, labels_padded

# Model

In [4]:
class LSTMTagger(nn.Module):
    """Per-time-step binary tagger.

    An LSTM encodes the dynamic (per-step) features; its hidden state at every
    step is concatenated with the sequence-level static features and mapped to
    a single logit by a linear layer.

    Args:
        dyn_feat_dim: number of dynamic features per time step.
        static_feat_dim: number of static features per sequence.
        hidden_dim: LSTM hidden size.
    """

    def __init__(self, dyn_feat_dim, static_feat_dim, hidden_dim):
        super().__init__()
        self.lstm = nn.LSTM(dyn_feat_dim, hidden_dim, batch_first=True)
        self.classifier = nn.Linear(hidden_dim + static_feat_dim, 1) # binary classification

    def forward(self, x_dyn, x_static, lengths):
        """Compute one logit per time step.

        Args:
            x_dyn: padded dynamic features, ``[B, T, F]``.
            x_static: static features, ``[B, S]``.
            lengths: true sequence lengths, ``[B]``.

        Returns:
            Logits of shape ``[B, T]``, with ``T`` equal to ``x_dyn.size(1)``.
            Values at padded positions are not meaningful and must be masked
            by the caller.
        """
        # pack_padded_sequence expects the lengths on the CPU.
        packed_input = pack_padded_sequence(x_dyn, lengths.cpu(), batch_first=True, enforce_sorted=False)
        packed_output, _ = self.lstm(packed_input)
        # total_length pins the output time axis to the input's padded length.
        # Without it the output is only max(lengths) long, which no longer
        # lines up with the labels when the batch is padded beyond its
        # longest sequence.
        lstm_out, _ = pad_packed_sequence(
            packed_output, batch_first=True, total_length=x_dyn.size(1)
        )  # [B, T, H]

        # Expand static features to [B, T, S]
        B, T, _ = lstm_out.shape
        x_static_exp = x_static.unsqueeze(1).expand(B, T, -1)

        combined = torch.cat([lstm_out, x_static_exp], dim=-1)  # [B, T, H+S]
        logits = self.classifier(combined).squeeze(-1)  # shape: [B, T]
        return logits

def compute_pos_weight(dataset, pad_value=None):
    """Compute the positive-class weight (neg / pos) for a binary-label dataset.

    Args:
        dataset: iterable of ``(dyn_feat, static_feat, labels)`` tuples; only
            ``labels`` (integer tensor of 0/1, optionally containing padding)
            is read.
        pad_value: label value to ignore. Defaults to the module-level
            ``PAD_VALUE`` when ``None``.

    Returns:
        Float32 tensor of shape ``[1]`` holding ``neg / pos``. Falls back to
        ``1.0`` when either class is absent: with no positives the ratio is
        undefined, and with no negatives it would be ``0.0``, which would
        zero out the loss of every positive example.
    """
    if pad_value is None:
        pad_value = PAD_VALUE

    pos = neg = 0
    for _, _, labels in dataset:
        valid = labels[labels != pad_value]
        pos += int((valid == 1).sum())
        neg += int((valid == 0).sum())

    if pos == 0 or neg == 0:
        pos_weight = 1.0  # degenerate class balance: leave the loss unweighted
    else:
        pos_weight = neg / pos
    return torch.tensor([pos_weight], dtype=torch.float32)

def evaluate(model, dataloader):
    """Score a tagger on every non-padded time step of a dataloader.

    Args:
        model: module called as ``model(x_dyn, x_static, lengths)`` and
            returning per-step logits ``[B, T]``.
        dataloader: yields ``(x_dyn, x_static, lengths, labels)`` batches in
            which padded label positions equal ``PAD_VALUE``.

    Returns:
        Tuple ``(metrics, cm, report, all_probs, all_preds, all_labels)``:
        ``metrics`` is a dict with keys ``acc``, ``precision``, ``recall``,
        ``f1``, ``auc`` and ``aucpr`` (the last two are ``None`` when
        scikit-learn raises ``ValueError``, e.g. a single class present);
        ``cm`` is the 2x2 confusion matrix over classes ``[0, 1]``;
        ``report`` is the text classification report; the three lists hold
        the flattened probabilities, hard predictions (threshold 0.5) and
        labels of all valid steps.

    Note:
        The model is switched to eval mode and left in that mode.
    """
    model.eval()
    all_probs, all_preds, all_labels = [], [], []
    with torch.no_grad():
        for x_dyn, x_static, lengths, labels in dataloader:
            x_dyn = x_dyn.to(device)
            x_static = x_static.to(device)
            lengths = lengths.to(device)
            labels = labels.to(device)

            logits = model(x_dyn, x_static, lengths)  # [B, T]
            probs = torch.sigmoid(logits)             # [B, T]
            preds = (probs > 0.5).long()

            # Keep only real (non-padded) time steps
            mask = labels != PAD_VALUE                # [B, T]
            masked_labels = labels[mask]
            masked_preds = preds[mask]
            masked_probs = probs[mask]

            all_labels.extend(masked_labels.cpu().numpy())
            all_preds.extend(masked_preds.cpu().numpy())
            all_probs.extend(masked_probs.cpu().numpy())

    metrics = {}
    metrics['acc'] = accuracy_score(all_labels, all_preds)
    metrics['precision'] = precision_score(all_labels, all_preds, zero_division=0)
    metrics['recall'] = recall_score(all_labels, all_preds, zero_division=0)
    metrics['f1'] = f1_score(all_labels, all_preds, zero_division=0)

    # Ranking metrics are undefined when only one class is present.
    try:
        metrics['auc'] = roc_auc_score(all_labels, all_probs)
    except ValueError:
        metrics['auc'] = None

    try:
        metrics['aucpr'] = average_precision_score(all_labels, all_probs)
    except ValueError:
        metrics['aucpr'] = None

    # Pin the label set so the matrix is always 2x2, even if one class never
    # shows up in the labels or the predictions.
    cm = confusion_matrix(all_labels, all_preds, labels=[0, 1])
    # zero_division=0 matches the precision/recall/F1 calls above and avoids
    # undefined-metric warnings when a class is never predicted.
    report = classification_report(all_labels, all_preds, digits=4, zero_division=0)
    return metrics, cm, report, all_probs, all_preds, all_labels

In [None]:
dyn_feat_dim = 4       # fixed by data
static_feat_dim = 2    # fixed by data
epochs = 5

# Seed the global RNGs before any random work (synthetic data generation,
# DataLoader shuffling, model initialisation) so that Restart & Run All
# reproduces the same numbers.
SEED = 42
random.seed(SEED)
torch.manual_seed(SEED)

dataset = SeqLabelingDataset()
dataloader = DataLoader(dataset, batch_size=BATCH_SIZE, shuffle=True, collate_fn=collate_fn)

In [6]:
# Sanity check: shapes of one sample's (dynamic features, static features, labels).
d = dataset[3]
tuple(part.shape for part in d)

(torch.Size([13, 4]), torch.Size([2]), torch.Size([13]))

# Training

In [7]:
# Class-balance weight for the positive label, moved to the training device.
pos_weight = compute_pos_weight(dataset)
pos_weight = pos_weight.to(device)

# reduction="none" keeps one loss value per time step so that padded steps
# can be masked out manually in the training loop.
criterion = nn.BCEWithLogitsLoss(pos_weight=pos_weight, reduction="none")

def _format_metric(value):
    """Render a metric with four decimals, or 'n/a' when it is undefined (None)."""
    return "n/a" if value is None else f"{value:.4f}"


def training_loop(hidden_dim, learning_rate, verbose=True):
    """Train a fresh LSTMTagger and return it.

    Relies on the module-level ``dataloader``, ``criterion``, ``epochs``,
    ``device``, ``dyn_feat_dim`` and ``static_feat_dim``.

    Args:
        hidden_dim: LSTM hidden size.
        learning_rate: Adam learning rate.
        verbose: when True, print one summary line per epoch.

    Returns:
        The trained model.
    """
    model = LSTMTagger(dyn_feat_dim, static_feat_dim, hidden_dim).to(device)
    optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)
    for epoch in range(epochs):
        model.train()
        total_loss = 0.0
        all_preds, all_probs, all_labels = [], [], []
        for x_dyn, x_static, lengths, labels in dataloader:
            x_dyn = x_dyn.to(device)
            x_static = x_static.to(device)
            lengths = lengths.to(device)
            labels = labels.to(device)

            mask = labels != PAD_VALUE               # [B, T]
            logits = model(x_dyn, x_static, lengths) # [B, T]

            # Per-step loss; padded steps (target == PAD_VALUE) are dropped
            # by the mask before averaging.
            loss = criterion(logits, labels.float()) # [B, T]
            loss = loss[mask].mean()
            total_loss += loss.item()

            # Bookkeeping for the epoch metrics only: no gradients needed.
            with torch.no_grad():
                probs = torch.sigmoid(logits)
                preds = (probs > 0.5).long()
            all_labels.extend(labels[mask].cpu().numpy())
            all_preds.extend(preds[mask].cpu().numpy())
            all_probs.extend(probs[mask].cpu().numpy())

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

        train_loss = total_loss / len(dataloader)
        train_acc = accuracy_score(all_labels, all_preds)
        # AUC is undefined when only one class was seen; mirror evaluate()
        # and report None instead of aborting training.
        try:
            train_auc = roc_auc_score(all_labels, all_probs)
        except ValueError:
            train_auc = None
        valid_metrics, _, _, _, _, _ = evaluate(model, dataloader) # TODO: change to valid set

        if verbose:
            # evaluate() may return None for AUC; _format_metric keeps the
            # log line from raising TypeError on ``None``.
            train_mesg = (
                f"Train Loss: {train_loss:.4f} | Train Accuracy: {train_acc:.4f} | "
                f"Train AUC: {_format_metric(train_auc)}"
            )
            test_mesg = (
                f"Valid Acc: {valid_metrics['acc']:.4f} | "
                f"Valid AUC: {_format_metric(valid_metrics['auc'])}"
            )
            print(f"Epoch {epoch+1} | {train_mesg} | {test_mesg}")

    return model

In [8]:
# Baseline run: small hidden state, default-ish Adam step size.
trained_model = training_loop(
    hidden_dim=32,
    learning_rate=1e-3,
)

Epoch 1 | Train Loss: 0.6948 | Train Accuracy: 0.4897 | Train AUC: 0.4982 | Valid Acc: 0.4838 | Valid AUC: 0.5041
Epoch 2 | Train Loss: 0.6964 | Train Accuracy: 0.4877 | Train AUC: 0.5044 | Valid Acc: 0.4858 | Valid AUC: 0.5078
Epoch 3 | Train Loss: 0.6943 | Train Accuracy: 0.4936 | Train AUC: 0.5042 | Valid Acc: 0.4995 | Valid AUC: 0.5112
Epoch 4 | Train Loss: 0.6925 | Train Accuracy: 0.4809 | Train AUC: 0.5115 | Valid Acc: 0.4848 | Valid AUC: 0.5153
Epoch 5 | Train Loss: 0.6916 | Train Accuracy: 0.4887 | Train AUC: 0.5150 | Valid Acc: 0.4887 | Valid AUC: 0.5189


In [9]:
# Score the baseline model; `cn` receives the confusion matrix returned by evaluate().
# NOTE: this reuses the training dataloader, so the numbers are not held-out estimates.
(
    metrics,
    cn,
    report,
    all_probs,
    all_preds,
    all_labels,
) = evaluate(trained_model, dataloader)
metrics

{'acc': 0.4887144259077527,
 'precision': 0.48655256723716384,
 'recall': 0.39019607843137255,
 'f1': 0.43307943416757344,
 'auc': 0.5189086636619284,
 'aucpr': 0.5047250808994895}

# Hyperparameter tuning
## Optuna

In [13]:
import optuna

In [14]:
def objective(trial):
    """Optuna objective: train one model for the sampled hyperparameters and score it.

    Args:
        trial: Optuna trial used to sample ``hidden_dim`` and ``lr``.

    Returns:
        The ``'auc'`` entry of ``evaluate``'s metrics for the trained model
        (``None`` if evaluate could not compute it). The score is computed on
        the module-level ``dataloader``, i.e. the same data used for training.
    """
    # Hyperparameters (sampled in a fixed order: hidden size, then learning rate).
    width = trial.suggest_categorical('hidden_dim', [32, 64, 128])
    step_size = trial.suggest_float('lr', 1e-4, 1e-2, log=True)

    candidate = training_loop(hidden_dim=width, learning_rate=step_size, verbose=False)

    # Only the metrics dict (first element of evaluate's result) is needed.
    scores = evaluate(candidate, dataloader)[0]
    return scores['auc']

# Search for the hyperparameters that maximise the AUC returned by `objective`.
study = optuna.create_study(direction='maximize')
study.optimize(
    objective,
    n_trials=20,
    gc_after_trial=True,
    show_progress_bar=False,
)

# Summary of the best trial.
print("\nBest parameters:", study.best_params)
print("Best valid AUC:", study.best_value)

[I 2025-07-05 17:54:30,948] A new study created in memory with name: no-name-1157a158-ea16-42ec-9784-820d78461e33
[I 2025-07-05 17:54:31,330] Trial 0 finished with value: 0.5242651874109172 and parameters: {'hidden_dim': 32, 'lr': 0.0013885605438395297}. Best is trial 0 with value: 0.5242651874109172.
[I 2025-07-05 17:54:31,925] Trial 1 finished with value: 0.5032705420085519 and parameters: {'hidden_dim': 128, 'lr': 0.00014184539233762306}. Best is trial 0 with value: 0.5242651874109172.
[I 2025-07-05 17:54:32,441] Trial 2 finished with value: 0.5009283870719211 and parameters: {'hidden_dim': 32, 'lr': 0.00017293784865628656}. Best is trial 0 with value: 0.5242651874109172.
[I 2025-07-05 17:54:33,160] Trial 3 finished with value: 0.5697368927924805 and parameters: {'hidden_dim': 128, 'lr': 0.003168723832106195}. Best is trial 3 with value: 0.5697368927924805.
[I 2025-07-05 17:54:33,709] Trial 4 finished with value: 0.5126834623829886 and parameters: {'hidden_dim': 32, 'lr': 0.00040302


Best parameters: {'hidden_dim': 64, 'lr': 0.009842926980984207}
Best valid AUC: 0.5952270888709118


Save the Optuna trial results to a CSV file, sorted by objective value (best first).

In [12]:
# One row per trial, best objective value first, written without the index column.
trials_results = (
    study.trials_dataframe()
    .sort_values(by='value', ascending=False)
)
trials_results.to_csv('optuna_trials.csv', index=False)