# AN2DL [2025–2026] — Time Series Classification (Stratified K-Fold, SMOTE)

**NOTEBOOK BY thenegatives**


**Burchini - Collovigh - Corti - Ravasio**

## Google Drive

In [None]:
import sys, os

# Pick the working directory: inside Google Colab, mount Drive and move into the
# project folder; anywhere else, stay in the current directory.
running_in_colab = 'google.colab' in sys.modules
if not running_in_colab:
    current_dir = "."
else:
    from google.colab import drive

    drive.mount("/gdrive")
    current_dir = "/gdrive/My Drive/DeepLearningChallenge/"
    try:
        os.chdir(current_dir)
    except FileNotFoundError:
        # Best effort: keep going from the current directory and tell the user.
        print("‚ö†Ô∏è Aggiorna `current_dir` alla tua cartella dati.")
print("Working dir:", os.getcwd())


Drive already mounted at /gdrive; to attempt to forcibly remount, call drive.mount("/gdrive", force_remount=True).
Working dir: /gdrive/My Drive/DeepLearningChallenge


## Librerie & seed

In [None]:
import os, random, warnings
import numpy as np
import pandas as pd

import torch
from torch import nn
from torch.utils.data import DataLoader, TensorDataset, WeightedRandomSampler
from torch.optim import AdamW
from torch.cuda.amp import GradScaler, autocast

from sklearn.model_selection import StratifiedKFold
from sklearn.utils.class_weight import compute_class_weight
from sklearn.metrics import (
    f1_score,
    classification_report,
    accuracy_score,
    precision_score,
    recall_score,
)


import matplotlib.pyplot as plt

# Global seed shared by the splitter, SMOTE and the RNGs seeded below.
SEED = 42
warnings.simplefilter("ignore")


def set_seed(seed: int = 42):
    """Seed Python, NumPy and PyTorch (CPU and all CUDA devices) with ``seed``.

    Also switches cuDNN to deterministic mode and disables its benchmark
    auto-tuner.
    """
    seeders = (
        random.seed,
        np.random.seed,
        torch.manual_seed,
        torch.cuda.manual_seed_all,
    )
    for seeder in seeders:
        seeder(seed)

    cudnn = torch.backends.cudnn
    cudnn.deterministic = True
    cudnn.benchmark = False


set_seed(SEED)

# Use the GPU when one is visible, otherwise fall back to the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
print("Device:", device)


Device: cuda


## Configurazione

In [None]:
class CFG:
    """Static configuration for the cross-validated MLP experiment."""

    # --- input files (read with pandas in the data-loading cell) ---
    train_csv: str = "pirate_pain_train.csv"
    train_labels_csv: str = "pirate_pain_train_labels.csv"
    test_csv: str = "pirate_pain_test.csv"

    # --- cross-validation / training loop ---
    n_folds: int = 6
    epochs: int = 65
    batch_size: int = 256
    patience: int = 15  # epochs without validation improvement before stopping
    use_weighted_sampler: bool = False

    # --- optimisation and loss ---
    lr: float = 5e-4
    weight_decay: float = 5e-4
    max_grad_norm: float = 2.0
    label_smoothing: float = 0.01

    # --- MLP architecture ---
    hidden1: int = 384
    hidden2: int = 192
    dropout: float = 0.25


cfg = CFG()


## Caricamento dati

In [None]:
# Read the three CSV files named in the configuration (train rows, train labels, test rows).
train_df, labels_df, test_df = (
    pd.read_csv(csv_path)
    for csv_path in (cfg.train_csv, cfg.train_labels_csv, cfg.test_csv)
)

# Attach the label of each sample_index to every training row (left join keeps all rows).
data = train_df.merge(labels_df, on="sample_index", how="left")

print("Train (time-step):", data.shape, "| Test (time-step):", test_df.shape)
print("Colonne:", list(data.columns)[:10], "...")


Train (time-step): (105760, 41) | Test (time-step): (211840, 40)
Colonne: ['sample_index', 'time', 'pain_survey_1', 'pain_survey_2', 'pain_survey_3', 'pain_survey_4', 'n_legs', 'n_hands', 'n_eyes', 'joint_00'] ...


## Selezione feature (35 input)

In [None]:
# Columns that are never model inputs: the identifier, the target (raw and encoded)
# and the explicitly dropped metadata columns. "label_encoded" is excluded as well
# because it is added to `data` by the label-encoding cell: without it, re-running
# this cell afterwards would count the target as a 36th feature and trip the
# assertion below.
drop_cols = ["time", "n_legs", "n_hands", "n_eyes"]
non_feature_cols = {"sample_index", "label", "label_encoded", *drop_cols}
feature_cols = [c for c in data.columns if c not in non_feature_cols]
assert len(feature_cols) == 35, f"Attese 35 feature, trovate {len(feature_cols)}"

print("Numero feature:", len(feature_cols))
print("Esempio:", feature_cols[:8])


Numero feature: 35
Esempio: ['pain_survey_1', 'pain_survey_2', 'pain_survey_3', 'pain_survey_4', 'joint_00', 'joint_01', 'joint_02', 'joint_03']


## Encoding etichette e tabella sequenze

In [None]:
# String label -> integer class id, plus the reverse lookup.
label_encoder = {"no_pain":0, "low_pain":1, "high_pain":2}
inv_label_encoder = {class_id: name for name, class_id in label_encoder.items()}

encoded = data["label"].map(label_encoder)
data["label_encoded"] = encoded.astype(int)

# One row per distinct (sample_index, encoded label) pair.
seq_df = (
    data[["sample_index","label_encoded"]]
    .drop_duplicates()
    .reset_index(drop=True)
)
print("Soggetti:", len(seq_df))
class_distribution = seq_df["label_encoded"].value_counts().sort_index()
print(class_distribution)


Soggetti: 661
label_encoded
0    511
1     94
2     56
Name: count, dtype: int64


## Helper: scaler, dataset, modello (MLP)

In [None]:
import numpy as np
import torch
from torch import nn
from torch.utils.data import DataLoader, TensorDataset, WeightedRandomSampler
from torch.optim import AdamW
from torch.cuda.amp import GradScaler, autocast

def fit_minmax(X):
    """Return the per-column minimum and range of ``X`` for min-max scaling.

    Columns whose range is exactly zero get a range of 1.0 so that a later
    division by the range never divides by zero.
    """
    col_min = np.min(X, axis=0)
    col_range = np.max(X, axis=0) - col_min
    safe_range = np.where(col_range == 0.0, 1.0, col_range)
    return col_min, safe_range

def apply_minmax(X, mn, denom):
    """Scale ``X`` with previously fitted statistics: ``(X - mn) / denom``."""
    shifted = X - mn
    return shifted / denom

class NeuralNetwork(nn.Module):
    """MLP with two hidden layers: Linear -> ReLU -> Dropout, twice, then a Linear head.

    Args:
        in_dim: number of input features.
        h1: width of the first hidden layer.
        h2: width of the second hidden layer.
        out_dim: number of output logits.
        dropout: dropout probability applied after each hidden activation.
    """

    def __init__(self, in_dim=35, h1=128, h2=64, out_dim=3, dropout=0.10):
        super().__init__()
        # Layer order (and therefore the Sequential indices 0..6) is kept fixed
        # so state_dict keys stay "net.0.*", "net.3.*", "net.6.*".
        layers = [
            nn.Linear(in_dim, h1),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(h1, h2),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(h2, out_dim),
        ]
        self.net = nn.Sequential(*layers)

    def forward(self, x):
        """Return the raw logits for input ``x``."""
        return self.net(x)

def make_loader(X, y=None, batch_size=256, shuffle=False, sampler=None):
    """Wrap NumPy arrays in a ``TensorDataset`` and return a ``DataLoader`` over it.

    Args:
        X: feature array; cast to float32.
        y: optional target array; cast to int64. When omitted the dataset
            yields one-element tuples ``(x,)``.
        batch_size: batch size passed to the loader.
        shuffle: shuffle flag; ignored (forced falsy) when ``sampler`` is given.
        sampler: optional sampler passed to the loader.
    """
    tensors = [torch.from_numpy(X.astype(np.float32))]
    if y is not None:
        tensors.append(torch.from_numpy(y.astype(np.int64)))
    dataset = TensorDataset(*tensors)

    # DataLoader rejects shuffle=True together with a sampler.
    do_shuffle = (sampler is None and shuffle)
    return DataLoader(
        dataset,
        batch_size=batch_size,
        shuffle=do_shuffle,
        sampler=sampler,
        num_workers=2,
        pin_memory=True,
    )


## Train & Eval

In [None]:
from sklearn.metrics import f1_score, classification_report
import torch

def train_one_epoch(model, loader, optimizer, criterion, scaler=None, max_grad_norm=2.0):
    """Run one training pass over ``loader`` and return ``(mean_loss, accuracy)``.

    Args:
        model: module to train; put into training mode here.
        loader: iterable yielding ``(x, y)`` batches.
        optimizer: optimizer stepped once per batch.
        criterion: loss callable applied to ``(logits, y)``.
        scaler: optional ``GradScaler``; when given, the forward pass runs under
            autocast and the backward/step go through the scaler.
        max_grad_norm: gradient-norm clipping threshold, or ``None`` to skip clipping.

    Returns:
        Sample-weighted mean loss and fraction of correct argmax predictions.
    """
    use_amp = scaler is not None
    clip_grads = max_grad_norm is not None
    loss_sum = 0.0
    n_correct = 0
    n_seen = 0

    model.train()
    for x, y in loader:
        x = x.to(device)
        y = y.to(device)
        optimizer.zero_grad(set_to_none=True)

        with autocast(enabled=use_amp):
            logits = model(x)
            loss = criterion(logits, y)

        if use_amp:
            scaler.scale(loss).backward()
            if clip_grads:
                # Gradients must be unscaled before their norm is measured.
                scaler.unscale_(optimizer)
                torch.nn.utils.clip_grad_norm_(model.parameters(), max_grad_norm)
            scaler.step(optimizer)
            scaler.update()
        else:
            loss.backward()
            if clip_grads:
                torch.nn.utils.clip_grad_norm_(model.parameters(), max_grad_norm)
            optimizer.step()

        batch_n = x.size(0)
        loss_sum += loss.item() * batch_n
        n_correct += (logits.argmax(1) == y).sum().item()
        n_seen += batch_n

    denom = max(1, n_seen)
    return loss_sum / denom, n_correct / denom

@torch.no_grad()
def evaluate(model, loader, criterion=None):
    """Evaluate ``model`` on ``loader`` without tracking gradients.

    Args:
        model: module to evaluate; put into eval mode here.
        loader: iterable yielding ``(x, y)`` batches.
        criterion: optional loss callable applied to ``(logits, y)``.

    Returns:
        ``(avg_loss, acc, preds, targets)`` where ``preds`` and ``targets`` are
        1-D NumPy arrays in loader order. ``avg_loss`` is ``None`` when no
        ``criterion`` was given (no loss was computed). When the loader yields
        no samples, ``avg_loss`` and ``acc`` are ``None`` and both arrays are
        empty, instead of ``np.concatenate`` raising on an empty list.
    """
    model.eval()
    total_loss, total_correct, total_samples = 0.0, 0, 0
    all_preds, all_tgts = [], []
    for x, y in loader:
        x = x.to(device); y = y.to(device)
        logits = model(x)
        preds = logits.argmax(1)  # computed once, reused for accuracy and output
        if criterion is not None:
            total_loss += criterion(logits, y).item() * x.size(0)
        total_correct += (preds == y).sum().item()
        total_samples += x.size(0)
        all_preds.append(preds.cpu().numpy())
        all_tgts.append(y.cpu().numpy())

    if total_samples == 0:
        empty = np.empty(0, dtype=np.int64)
        return None, None, empty, empty.copy()

    avg_loss = total_loss / total_samples if criterion is not None else None
    acc = total_correct / total_samples
    return avg_loss, acc, np.concatenate(all_preds), np.concatenate(all_tgts)


def _get_x_from_batch(batch):
    # needed as batch may be a tensor, (x,), (x,y) or a dict
    if isinstance(batch, (list, tuple)):
        x = batch[0]
    elif isinstance(batch, dict):
        # try common keys
        for k in ('x', 'inputs', 'features'):
            if k in batch:
                x = batch[k]
                break
        else:
            raise ValueError("Impossibile estrarre le feature dal batch di tipo dict.")
    else:
        x = batch
    return x
@torch.no_grad()
def predict_proba(model, loader):
    """Return softmax class probabilities for every sample in ``loader``.

    The model is put into eval mode and each batch is moved to the module-level
    ``device``. Batches may be labelled or unlabelled; only the features are
    used. The per-batch results are concatenated along axis 0 into one NumPy
    array.
    """
    model.eval()
    prob_chunks = []
    # The decorator already disables gradient tracking for the whole call.
    for batch in loader:
        features = _get_x_from_batch(batch).to(device, non_blocking=True)
        batch_probs = torch.softmax(model(features), dim=1)
        prob_chunks.append(batch_probs.detach().cpu().numpy())
    return np.concatenate(prob_chunks, axis=0)


## Stratified K-Fold (group-by `sample_index`) + OOF

In [None]:
# Row-level (one row per time-step) arrays for train and test.
X_all = data[feature_cols].to_numpy()
y_all = data["label_encoded"].to_numpy()
sid_all = data["sample_index"].to_numpy()
X_test_all = test_df[feature_cols].to_numpy()
sid_test_all = test_df["sample_index"].to_numpy()

# Subject-level view: one id and one label per sample_index, used for the split.
unique_sids = seq_df["sample_index"].values
sid_to_label = dict(zip(seq_df["sample_index"].values, seq_df["label_encoded"].values))
y_seq = np.array([sid_to_label[sid] for sid in unique_sids])

# Out-of-fold buffers (row-level) and per-fold accumulators.
n_rows = len(X_all)
oof_probs = np.zeros((n_rows, 3), dtype=np.float32)
oof_preds = np.zeros(n_rows, dtype=np.int64)
oof_true = y_all.copy()
test_fold_probs, val_metrics = [], []

from sklearn.model_selection import StratifiedKFold
from sklearn.utils.class_weight import compute_class_weight
from imblearn.over_sampling import SMOTE

# Subject-level stratified K-fold: folds are drawn over unique sample_index values
# (stratified on the per-subject label) and then expanded to time-step rows, so all
# rows of a subject land on the same side of the split.
skf = StratifiedKFold(n_splits=cfg.n_folds, shuffle=True, random_state=SEED)
for fold, (tr_seq_idx, va_seq_idx) in enumerate(skf.split(unique_sids, y_seq), start=1):
    tr_sids = unique_sids[tr_seq_idx]; va_sids = unique_sids[va_seq_idx]
    tr_mask = np.isin(sid_all, tr_sids); va_mask = np.isin(sid_all, va_sids)
    X_tr, y_tr = X_all[tr_mask], y_all[tr_mask]
    X_va, y_va = X_all[va_mask], y_all[va_mask]

    # Oversample high_pain (class 2) with SMOTE up to the majority-class row count.
    # Only this fold's training rows are resampled; validation rows are untouched.
    class_counts = np.bincount(y_tr)
    if len(class_counts) > 2:
        majority_cls = int(np.argmax(class_counts))
        minority_cls = 2  # high_pain
        if class_counts[minority_cls] < class_counts[majority_cls]:
            smote = SMOTE(sampling_strategy={minority_cls: int(class_counts[majority_cls])}, random_state=SEED)
            X_tr, y_tr = smote.fit_resample(X_tr, y_tr)

    # Scaling statistics come from the (resampled) training rows only and are
    # reused for validation and test.
    mn, denom = fit_minmax(X_tr)
    X_tr_s = apply_minmax(X_tr, mn, denom)
    X_va_s = apply_minmax(X_va, mn, denom)
    X_te_s = apply_minmax(X_test_all, mn, denom)

    sampler = None
    if cfg.use_weighted_sampler:
        # Count per label id (not per position in np.unique) so that the lookup
        # by label below stays valid even if a class is absent from this fold.
        class_sample_count = np.bincount(y_tr, minlength=3)
        weight_per_class = 1.0 / np.maximum(class_sample_count, 1)
        weights = weight_per_class[y_tr]
        sampler = WeightedRandomSampler(weights=weights, num_samples=len(weights), replacement=True)

    # "Balanced" class weights for the loss; classes missing from the fold keep weight 1.
    classes = np.unique(y_tr)
    class_weights = compute_class_weight(class_weight='balanced', classes=classes, y=y_tr)
    full_w = np.ones(3, dtype=np.float32)
    full_w[classes] = class_weights

    # Extra emphasis on high_pain on top of the balanced weight.
    full_w[2] *= 1.5

    class_weights_t = torch.tensor(full_w, dtype=torch.float32, device=device)

    criterion = nn.CrossEntropyLoss(weight=class_weights_t, label_smoothing=cfg.label_smoothing)

    tr_loader = make_loader(X_tr_s, y_tr, batch_size=cfg.batch_size, shuffle=(sampler is None), sampler=sampler)
    va_loader = make_loader(X_va_s, y_va, batch_size=cfg.batch_size, shuffle=False)
    te_loader = make_loader(X_te_s, batch_size=cfg.batch_size, shuffle=False)

    model = NeuralNetwork(in_dim=len(feature_cols), h1=cfg.hidden1, h2=cfg.hidden2, out_dim=3, dropout=cfg.dropout).to(device)
    optimizer = AdamW(model.parameters(), lr=cfg.lr, weight_decay=cfg.weight_decay)
    scaler = GradScaler()

    # Early stopping on validation micro-F1; the best weights are kept on CPU.
    best_f1, best_state, no_improve = -1.0, None, 0
    print(f"\n===== FOLD {fold}/{cfg.n_folds} ‚Äî train rows: {len(X_tr)}, val rows: {len(X_va)} =====")
    for epoch in range(1, cfg.epochs+1):
        loss_tr, acc_tr = train_one_epoch(model, tr_loader, optimizer, criterion, scaler=scaler, max_grad_norm=cfg.max_grad_norm)
        loss_va, acc_va, preds_va, tgts_va = evaluate(model, va_loader, criterion=criterion)
        f1_va = f1_score(tgts_va, preds_va, average="micro")
        print(f"Epoch {epoch:02d} | loss_tr {loss_tr:.4f} acc_tr {acc_tr:.4f} | loss_va {loss_va:.4f} acc_va {acc_va:.4f} | f1_va {f1_va:.4f}")
        if f1_va > best_f1:
            best_f1 = f1_va
            best_state = {k: v.cpu().clone() for k, v in model.state_dict().items()}
            no_improve = 0
        else:
            no_improve += 1
            if no_improve >= cfg.patience:
                print(f"Early stopping ‚Äî best F1: {best_f1:.4f}")
                break

    if best_state is not None:
        model.load_state_dict({k: v.to(device) for k, v in best_state.items()})
    # Final evaluation on the validation rows with the best weights. The loader is
    # not shuffled, so predictions line up with the rows selected by va_mask.
    _, _, preds_va, tgts_va = evaluate(model, va_loader, criterion=None)
    oof_preds[va_mask] = preds_va
    va_probs = predict_proba(model, va_loader)
    oof_probs[va_mask] = va_probs

    # Report for each fold
    rep = classification_report(
        tgts_va,
        preds_va,
        target_names=["no_pain","low_pain","high_pain"],
        digits=4
    )
    print("Fold report:\n", rep)

    # Validation metrics for this fold, all micro-averaged.
    acc_va  = accuracy_score(tgts_va, preds_va)
    prec_va = precision_score(tgts_va, preds_va, average="micro")
    rec_va  = recall_score(tgts_va, preds_va, average="micro")
    f1_va_micro = f1_score(tgts_va, preds_va, average="micro")

    val_metrics.append({
        "fold": fold,
        "accuracy": acc_va,
        "precision": prec_va,
        "recall": rec_va,
        "f1": f1_va_micro,
    })

    # Test-set probabilities from this fold's model (averaged across folds later).
    test_probs = predict_proba(model, te_loader)
    test_fold_probs.append(test_probs)


from sklearn.metrics import f1_score, classification_report
# Row-level out-of-fold score over all training time-steps.
class_names = ["no_pain","low_pain","high_pain"]
oof_f1_micro = f1_score(oof_true, oof_preds, average="micro")
print(f"\nOOF F1 micro: {oof_f1_micro:.4f}")
print(classification_report(oof_true, oof_preds, target_names=class_names, digits=4))

# Summarise the single fold with the highest validation micro-F1.
best_fold_metrics = max(val_metrics, key=lambda fold_metrics: fold_metrics["f1"])

row = dict([
    ("Model", f"NN (fold {best_fold_metrics['fold']})"),
    ("Accuracy", best_fold_metrics["accuracy"]),
    ("Precision", best_fold_metrics["precision"]),
    ("Recall", best_fold_metrics["recall"]),
    ("F1 Score - Val", best_fold_metrics["f1"]),
])
val_summary = pd.DataFrame([row])

print("\n Metriche del miglior fold di validazione (micro):")
print(val_summary.to_string(index=False, float_format=lambda value: f"{value:.4f}"))




===== FOLD 1/6 ‚Äî train rows: 148640, val rows: 17760 =====
Epoch 01 | loss_tr 0.7536 acc_tr 0.7108 | loss_va 0.6908 acc_va 0.7457 | f1_va 0.7457
Epoch 02 | loss_tr 0.4848 acc_tr 0.8311 | loss_va 0.5910 acc_va 0.7892 | f1_va 0.7892
Epoch 03 | loss_tr 0.3843 acc_tr 0.8702 | loss_va 0.5682 acc_va 0.8033 | f1_va 0.8033
Epoch 04 | loss_tr 0.3249 acc_tr 0.8964 | loss_va 0.5251 acc_va 0.8285 | f1_va 0.8285
Epoch 05 | loss_tr 0.2850 acc_tr 0.9148 | loss_va 0.4964 acc_va 0.8489 | f1_va 0.8489
Epoch 06 | loss_tr 0.2586 acc_tr 0.9263 | loss_va 0.4570 acc_va 0.8725 | f1_va 0.8725
Epoch 07 | loss_tr 0.2374 acc_tr 0.9359 | loss_va 0.4683 acc_va 0.8637 | f1_va 0.8637
Epoch 08 | loss_tr 0.2220 acc_tr 0.9422 | loss_va 0.4634 acc_va 0.8747 | f1_va 0.8747
Epoch 09 | loss_tr 0.2096 acc_tr 0.9463 | loss_va 0.4242 acc_va 0.8819 | f1_va 0.8819
Epoch 10 | loss_tr 0.1987 acc_tr 0.9518 | loss_va 0.4102 acc_va 0.8939 | f1_va 0.8939
Epoch 11 | loss_tr 0.1903 acc_tr 0.9547 | loss_va 0.4181 acc_va 0.8895 | f1_va

## Ensembling → Predizioni Test + Aggregazione per soggetto

In [None]:
# ======================================================
# Post-processing of the OOF predictions
# ======================================================
print("\n=== Post-processing OOF a livello sample_index ===")

# One row per time-step: true label plus the three out-of-fold class probabilities.
prob_cols = ["p0", "p1", "p2"]
df_oof = pd.DataFrame({"sample_index": sid_all, "y_true": oof_true})
for col_idx, col_name in enumerate(prob_cols):
    df_oof[col_name] = oof_probs[:, col_idx]

# Collapse to one row per sample_index: first label, mean probability per class.
agg_spec = {"y_true": "first"}
agg_spec.update({col_name: "mean" for col_name in prob_cols})
agg_oof = df_oof.groupby("sample_index").agg(agg_spec).reset_index()

y_true_seq = agg_oof["y_true"].values
p_seq = agg_oof[prob_cols].values

# Baseline: plain argmax of the averaged probabilities.
baseline_preds = p_seq.argmax(axis=1)
baseline_f1 = f1_score(y_true_seq, baseline_preds, average="micro")
print(f"Seq-level baseline (argmax) F1 micro: {baseline_f1:.4f}")

p2 = p_seq[:, 2]
best_non2 = p_seq[:, :2].argmax(axis=1)  # 0 or 1


def _demote_uncertain_high_pain(threshold):
    """Apply the high_pain confidence rule at sequence level.

    Where the argmax is class 2 but its probability is <= ``threshold``, the
    prediction falls back to the better of classes 0 and 1; otherwise the
    argmax is kept. Returns ``(predictions, mask_of_demoted_rows)``.
    """
    top_class = p_seq.argmax(axis=1)
    demoted = (top_class == 2) & (p2 <= threshold)
    return np.where(demoted, best_non2, top_class), demoted


# Grid search over the threshold, from 0.20 to 0.80 in steps of 0.01.
# The strict ">" keeps the lowest threshold among ties.
best_t, best_f1 = None, -1.0
for t in np.linspace(0.20, 0.80, 61):
    preds, _ = _demote_uncertain_high_pain(t)
    f1_micro = f1_score(y_true_seq, preds, average="micro")
    if f1_micro > best_f1:
        best_t, best_f1 = t, f1_micro

print(f"‚û°Ô∏è  Best F1 micro seq-level con soglia su high_pain: {best_f1:.4f} (t = {best_t:.2f})")

# Final report with the selected threshold.
final_preds, mask_low_conf_2 = _demote_uncertain_high_pain(best_t)

print("\nSeq-level classification report con soglia ottimizzata:")
print(classification_report(
    y_true_seq,
    final_preds,
    target_names=["no_pain","low_pain","high_pain"],
    digits=4
))



=== Post-processing OOF a livello sample_index ===
Seq-level baseline (argmax) F1 micro: 0.9531
‚û°Ô∏è  Best F1 micro seq-level con soglia su high_pain: 0.9561 (t = 0.57)

Seq-level classification report con soglia ottimizzata:
              precision    recall  f1-score   support

     no_pain     0.9639    0.9922    0.9778       511
    low_pain     0.9158    0.9255    0.9206        94
   high_pain     0.9500    0.6786    0.7917        56

    accuracy                         0.9561       661
   macro avg     0.9432    0.8654    0.8967       661
weighted avg     0.9559    0.9561    0.9539       661



In [None]:
# Average the per-fold test probabilities (row-level), then average again per
# sample_index and take the argmax as the subject-level prediction.
# NOTE(review): this uses the plain argmax; the high_pain threshold tuned on the
# OOF predictions is not applied here — confirm that is intended.
test_mean_probs = np.stack(test_fold_probs, axis=0).mean(axis=0)

prob_cols = ["p0", "p1", "p2"]
test_columns = {"sample_index": sid_test_all}
for col_idx, col_name in enumerate(prob_cols):
    test_columns[col_name] = test_mean_probs[:, col_idx]
df_test_pred = pd.DataFrame(test_columns)

agg = df_test_pred.groupby("sample_index")[prob_cols].mean().reset_index()
agg["pred_label_id"] = agg[prob_cols].values.argmax(axis=1)

inv_label_encoder = {0:"no_pain", 1:"low_pain", 2:"high_pain"}
agg["label"] = agg["pred_label_id"].map(inv_label_encoder)
print(agg["label"].value_counts())
agg.head()




label
no_pain      1040
low_pain      183
high_pain     101
Name: count, dtype: int64


Unnamed: 0,sample_index,p0,p1,p2,pred_label_id,label
0,0,0.939205,0.051619,0.009176,0,no_pain
1,1,0.98158,0.016512,0.001908,0,no_pain
2,2,0.970753,0.016666,0.012581,0,no_pain
3,3,0.885723,0.03447,0.079807,0,no_pain
4,4,0.923291,0.016078,0.060631,0,no_pain


## Salvataggio `submission.csv`

In [None]:
# Keep only id and predicted label, ordered by sample_index, and write the CSV.
submission = (
    agg.loc[:, ["sample_index","label"]]
    .sort_values(by="sample_index")
    .reset_index(drop=True)
)
out_path = "submission.csv"
submission.to_csv(out_path, index=False)
print("‚úîÔ∏è Salvato:", os.path.abspath(out_path))
submission.head()


‚úîÔ∏è Salvato: /gdrive/MyDrive/DeepLearningChallenge/submission.csv


Unnamed: 0,sample_index,label
0,0,no_pain
1,1,no_pain
2,2,no_pain
3,3,no_pain
4,4,no_pain
