In [None]:
!nvidia-smi

Mon Aug 25 07:58:16 2025       
+-----------------------------------------------------------------------------------------+
| NVIDIA-SMI 550.54.15              Driver Version: 550.54.15      CUDA Version: 12.4     |
|-----------------------------------------+------------------------+----------------------+
| GPU  Name                 Persistence-M | Bus-Id          Disp.A | Volatile Uncorr. ECC |
| Fan  Temp   Perf          Pwr:Usage/Cap |           Memory-Usage | GPU-Util  Compute M. |
|                                         |                        |               MIG M. |
|   0  Tesla T4                       Off |   00000000:00:04.0 Off |                    0 |
| N/A   56C    P8             12W /   70W |       2MiB /  15360MiB |      0%      Default |
|                                         |                        |                  N/A |
+-----------------------------------------+------------------------+----------------------+
                                                

#1. Import libraries

In [None]:
import torch

import numpy as np
import pandas as pd
import seaborn as sns

In [None]:
# Set random seed
RANDOM_SEED = 42
np.random.seed(RANDOM_SEED)
torch.manual_seed(RANDOM_SEED)

<torch._C.Generator at 0x7fad5d193df0>

# 2. Data loading and preprocessing

The dataset contains 5,000 Time Series examples (obtained with ECG) with 140 timesteps. Each sequence corresponds to a single heartbeat from a single patient with congestive heart failure.

The training set contains 500 samples, while the other 4500 is in the testing set. Both contains all 5 types of heartbeats (classes):

* Normal (N)
* R-on-T Premature Ventricular Contraction (R-on-T PVC)
* Premature Ventricular Contraction (PVC)
* Supra-ventricular Premature or Ectopic Beat (SP or EB)
* Unclassified Beat (UB).

Download the dataset (Already uploaded to google drive)

In [None]:
!gdown 1fIKPykPzZ5pky8B59OYBtH7pzBORuZy3

Downloading...
From: https://drive.google.com/uc?id=1fIKPykPzZ5pky8B59OYBtH7pzBORuZy3
To: /content/ECG5000.zip
100% 10.6M/10.6M [00:00<00:00, 20.3MB/s]


In [None]:
!unzip -qq ECG5000.zip

replace ECG5000.txt? [y]es, [n]o, [A]ll, [N]one, [r]ename: A


In [None]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
device

device(type='cuda')

In [None]:
train_path = f"/content/ECG5000_TRAIN.txt"
test_path = f"/content/ECG5000_TEST.txt"

# Load the datasets
train_data = np.loadtxt(train_path)
test_data = np.loadtxt(test_path)

all_data = np.vstack([train_data, test_data])

# Split into features (X) and labels (y)
X_all, y_all = all_data[:, 1:], all_data[:, 0].astype(int)

print("All data shape:", X_all.shape)
print(np.unique(y_all))

All data shape: (5000, 140)
[1 2 3 4 5]


After that, we'll separate normal vs anomalous sequences

In [None]:
class_names = ['Normal', 'R-on-T PVC', 'PVC', 'SP or EB', 'UB']
CLASS_NORMAL = 1

X_normal = X_all[y_all==CLASS_NORMAL]
X_anomaly = X_all[y_all!=CLASS_NORMAL]

print("Normal data shape:", X_normal.shape)
print("Anomaly data shape:", X_anomaly.shape)

Normal data shape: (2919, 140)
Anomaly data shape: (2081, 140)


We'll be training only normal sequences. The majority of the normals will be used for training, and we'll set aside part of it for validation and testing. The testing set will be used in conjunction with the anomalous sequences later when we are detecting anomalies

In [None]:
from sklearn.model_selection import train_test_split

X_train, X_val = train_test_split(
    X_normal,
    test_size = 0.4,
    random_state = RANDOM_SEED
)

X_val, X_test_normals = train_test_split(
    X_val,
    test_size = 0.5,
    random_state = RANDOM_SEED
)

print("Train normals", X_train.shape)
print("Val normals", X_val.shape)
print("Test normals", X_test_normals.shape)

Train normals (1751, 140)
Val normals (584, 140)
Test normals (584, 140)


In [None]:
# Test = all anomalies + some normals
X_test = np.vstack([X_test_normals, X_anomaly])
y_test = np.hstack([
    np.zeros(len(X_test), dtype=int),   # normals = 0
    np.ones(len(X_anomaly), dtype=int)    # anomalies = 1
])
print("Test set:", X_test.shape, y_test.shape)

Test set: (2665, 140) (4746,)


Since ECG signals vary in amplitude and offset (e.g. , patient differences, electrode placement), data normalisation is necessary to prevent the model from wasting capacity to mdoel absolute scale instead of waveform shape

In [None]:
from sklearn.preprocessing import StandardScaler

scaler = StandardScaler()
scaler.fit(X_train)

X_train = scaler.transform(X_train)
X_val = scaler.transform(X_val)
X_test = scaler.transform(X_test)

print("Train shape:", X_train.shape)
print("Val shape:", X_val.shape)
print("Test shape:", X_test.shape)

Train shape: (1751, 140)
Val shape: (584, 140)
Test shape: (2665, 140)


We'll then need to reshape the datasets into a shape of (N, n_len, features_num) so that they can be used with PyTorch

In [None]:
X_train = X_train[..., None]
X_val = X_val[..., None]
X_test = X_test[..., None]

print("Train shape:", X_train.shape)
print("Val shape:", X_val.shape)
print("Test shape:", X_test.shape)

Train shape: (1751, 140, 1)
Val shape: (584, 140, 1)
Test shape: (2665, 140, 1)


We'll then need to wrap these dataset into DataLoaders

In [None]:
from torch.utils.data import DataLoader, TensorDataset

def to_loader(X, y=None, batch_size=32, shuffle=False):
    X_t = torch.tensor(X, dtype=torch.float32)
    if y is None:
        ds = TensorDataset(X_t)
    else:
        y_t = torch.tensor(y, dtype=torch.long)
        ds = TensorDataset(X_t, y_t)
    return DataLoader(ds, batch_size=batch_size, shuffle=shuffle)

X_train_loader = to_loader(X_train,  batch_size=128, shuffle=True)
X_val_loader   = to_loader(X_val, batch_size=128)
X_test_loader  = to_loader(X_test, y_test, batch_size=256)

AssertionError: Size mismatch between tensors

#5. LSTM-AE

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class LSTMAE(nn.Module):
    def __init__(self, seq_len=140, feat_dim=1, hidden_size=64, latent_dim=16, num_layers=2, dropout=0.0):
        super().__init__()
        self.seq_len = seq_len

        # ---- Encoder ----
        self.encoder_lstm = nn.LSTM(
            input_size=feat_dim,
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True,
            dropout=dropout if num_layers > 1 else 0.0
        )
        self.to_latent = nn.Linear(hidden_size, latent_dim)

        # ---- Decoder ----
        self.decoder_lstm = nn.LSTM(
            input_size=latent_dim,
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True,
            dropout=dropout if num_layers > 1 else 0.0
        )
        self.to_output = nn.Linear(hidden_size, feat_dim)

    def encode(self, x):
        # x: (B, T, F)
        _, (h_n, _) = self.encoder_lstm(x)   # take last hidden state
        h_last = h_n[-1]                     # (B, H)
        z = self.to_latent(h_last)           # (B, D)
        return z

    def decode(self, z):
        # Repeat latent across time to reconstruct sequence
        z_seq = z.unsqueeze(1).repeat(1, self.seq_len, 1)  # (B, T, D)
        y, _ = self.decoder_lstm(z_seq)                    # (B, T, H)
        x_hat = self.to_output(y)                          # (B, T, F)
        return x_hat

    def forward(self, x):
        z = self.encode(x)
        x_hat = self.decode(z)
        return x_hat, z


Model initialization

In [None]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

model = LSTMAE(
    seq_len=140, feat_dim=1,
    hidden_size=64, latent_dim=16,
    num_layers=1, dropout=0.0
).to(device)

print(model)


TRaining setup
* Optimizer: Adam
* Loss: MAE
* Gradient clipping

In [None]:
opt = torch.optim.AdamW(model.parameters(), lr=1e-3, weight_decay=1e-4)

def run_epoch(loader, train=True):
    model.train(train)
    total_loss = 0.0

    for (xb,) in loader:            # train/val loaders yield only X
        xb = xb.to(device)
        x_hat, _ = model(xb)
        loss = F.l1_loss(x_hat, xb)  # MAE loss

        if train:
            opt.zero_grad()
            loss.backward()
            nn.utils.clip_grad_norm_(model.parameters(), 1.0)
            opt.step()

        total_loss += loss.item() * xb.size(0)

    return total_loss / len(loader.dataset)


Training loop:

In [None]:
best_val, patience, bad_epochs = float("inf"), 10, 0

for epoch in range(200):
    train_loss = run_epoch(X_train_loader, train=True)
    val_loss   = run_epoch(X_val_loader, train=False)

    print(f"Epoch {epoch+1}: train={train_loss:.4f}, val={val_loss:.4f}")

    # If val_loss improves by 1e-5, adjust best_val and save model
    if val_loss < best_val - 1e-5:
        best_val, bad_epochs = val_loss, 0
        torch.save(model.state_dict(), "lstm_ae_best.pt")
    else:
        bad_epochs += 1

    # Trigger early stopping and
    if bad_epochs >= patience:
        print("Early stopping triggered")
        break


Using the trained model.

In [None]:
model.load_state_dict(torch.load("lstm_ae_best.pt"))
model.eval()

with torch.no_grad():
    xb, _ = next(iter(X_test_loader))
    xb = xb.to(device)
    x_hat, z = model(xb)

print("Input batch shape:", xb.shape)
print("Reconstruction shape:", x_hat.shape)
print("Latent shape:", z.shape)


# Extract reconstruction error

In [None]:
import numpy as np

@torch.no_grad()
def get_recon_errors(loader, model, device):
    model.eval()
    errors, labels = [], []
    for batch in loader:
        if len(batch) == 2:   # (X, y) for test set
            xb, yb = batch
            labels.append(yb.numpy())
        else:                 # (X,) for train/val (no labels)
            xb = batch[0]

        xb = xb.to(device)
        x_hat, _ = model(xb)

        # error per sample = mean abs diff across time/features
        err = (x_hat - xb).abs().mean(dim=(1,2)).cpu().numpy()
        errors.append(err)
    errors = np.concatenate(errors)
    labels = None if not labels else np.concatenate(labels)
    return errors, labels

train_errs, _        = get_recon_errors(X_train_loader, model, device)
val_errs, _          = get_recon_errors(X_val_loader, model, device)
test_errs, test_lbls = get_recon_errors(X_test_loader, model, device)


In [None]:
thr = np.quantile(train_errs, 0.99)
print(thr)

In [None]:
from sklearn.metrics import precision_recall_fscore_support, roc_auc_score, average_precision_score, confusion_matrix

preds = (test_errs > thr).astype(int)
P, R, F1, _ = precision_recall_fscore_support(test_lbls, preds, average='binary', zero_division=0)
roc  = roc_auc_score(test_lbls, test_errs)
pr   = average_precision_score(test_lbls, test_errs)
cm   = confusion_matrix(test_lbls, preds)

print({"precision": P, "recall": R, "f1": F1, "roc_auc": roc, "pr_auc": pr})
print("Confusion matrix:\n", cm)


In [None]:
import matplotlib.pyplot as plt

# Histograms
plt.figure()
plt.hist(train_errs, bins=50, alpha=0.6, label="train normals")
plt.hist(test_errs[test_lbls==0], bins=50, alpha=0.6, label="test normals")
plt.hist(test_errs[test_lbls==1], bins=50, alpha=0.6, label="test anomalies")
plt.axvline(thr, color='r', linestyle='--', label=f"thr={thr:.4f}")
plt.legend(); plt.title("Reconstruction error distributions"); plt.show()

# PR & ROC curves
from sklearn.metrics import precision_recall_curve, roc_curve, auc
p, r, _ = precision_recall_curve(test_lbls, test_errs)
fpr, tpr, _ = roc_curve(test_lbls, test_errs)
plt.figure(); plt.plot(r, p); plt.title("PR curve"); plt.xlabel("Recall"); plt.ylabel("Precision"); plt.show()
plt.figure(); plt.plot(fpr, tpr); plt.title("ROC curve"); plt.xlabel("FPR"); plt.ylabel("TPR"); plt.show()

# Reconstruction overlays (pick one normal and one anomaly)
import numpy as np
def show_recon(i):
    xb, yb = next(iter(test_loader))
    xb = xb.to(device)
    xh, _ = model(xb)
    plt.figure()
    plt.plot(xb[i].cpu().numpy().squeeze(), label="original")
    plt.plot(xh[i].cpu().numpy().squeeze(), label="reconstructed")
    plt.legend(); plt.title(f"y={yb[i].item()}, err={float((xh[i]-xb[i]).abs().mean()):.4f}")
    plt.show()
