# ECG Master Pipeline - Production Training Ready

Complete notebook-based pipeline for ECG classification with preprocessing, robust training, evaluation and inference.

## Quick Start Commands

**Headless execution (recommended for full runs):**
```powershell
jupyter nbconvert --to notebook --execute notebooks/master_pipeline.ipynb --ExecutePreprocessor.timeout=-1 --output logs/run_$(Get-Date -Format 'yyyyMMdd_HHmmss').ipynb
```

**Interactive modes:**
- Quick smoke test (256 samples): Run the "QUICK SMOKE RUN" cell
- Medium run (5k samples): Set `ECG_PREPROCESS_LIMIT=5000` in config, run training cells
- Full production: Set `ECG_PREPROCESS_LIMIT=0` (no limit), run training cells

## Notes
- Windows asyncio fix applied automatically in environment setup
- GPU auto-detection with fallback to CPU
- Mixed precision training enabled automatically on CUDA
- All artifacts saved to `artifacts/`, logs to `logs/`
- Checkpoints saved every epoch + best model by val F1

---
**Sections:** Config → Environment → Utilities → Mapping → Preprocessing → Dataset → Model → Training → Evaluation → Inference → Runbook

In [2]:
# Environment checks and directory setup
import os, sys, asyncio
if sys.platform == "win32":
    try:
        asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
    except Exception:
        pass

from pathlib import Path
# Find project root by looking for Dataset folder or going up from cwd
ROOT = Path.cwd().resolve()
initial_root = ROOT
# If we're in notebooks/ subdirectory, go up one level
if ROOT.name == 'notebooks' and (ROOT.parent / 'Dataset').exists():
    ROOT = ROOT.parent
    print(f'[Adjusted] ROOT from {initial_root} -> {ROOT}')
# If still no Dataset found, try going up one more level
elif not (ROOT / 'Dataset').exists() and (ROOT.parent / 'Dataset').exists():
    ROOT = ROOT.parent
    print(f'[Adjusted] ROOT from {initial_root} -> {ROOT}')
DATASET_DIR = (ROOT / "Dataset")
ARTIFACTS_DIR = (ROOT / "artifacts")
PROCESSED_DIR = ARTIFACTS_DIR / "processed"
FIGURES_DIR = ARTIFACTS_DIR / "figures"
LOGS_DIR = ROOT / "logs"
for p in [ARTIFACTS_DIR, PROCESSED_DIR, PROCESSED_DIR / "records", FIGURES_DIR, LOGS_DIR]:
    p.mkdir(parents=True, exist_ok=True)

print('ROOT:', ROOT)
print('DATASET_DIR:', DATASET_DIR)
print('DATASET_DIR exists:', DATASET_DIR.exists())
if DATASET_DIR.exists():
    subdirs = [d.name for d in DATASET_DIR.iterdir() if d.is_dir()]
    print(f'Dataset subdirectories: {subdirs}')
print('ARTIFACTS_DIR:', ARTIFACTS_DIR)
print('PROCESSED_DIR:', PROCESSED_DIR)


[Adjusted] ROOT from D:\ecg-research\notebooks -> D:\ecg-research
ROOT: D:\ecg-research
DATASET_DIR: D:\ecg-research\Dataset
DATASET_DIR exists: True
Dataset subdirectories: ['PTB_Diagnostic', 'CinC2017', 'Chapman_Shaoxing', 'ptb-xl']
ARTIFACTS_DIR: D:\ecg-research\artifacts
PROCESSED_DIR: D:\ecg-research\artifacts\processed


## 2. ENVIRONMENT CHECKS & SETUP
Detects hardware, sets deterministic seeds, configures device and mixed precision.


In [None]:
# Environment checks: imports, device detection, seeds, asyncio fix
import os, sys, random, json, time, math, asyncio
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
from collections import Counter, defaultdict
from pathlib import Path

# Windows asyncio policy fix (prevents timeout warnings in nbconvert)
if sys.platform == "win32":
    try:
        asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
        print("[Windows] asyncio policy set to WindowsSelectorEventLoopPolicy")
    except Exception as e:
        print(f"[Warning] Could not set asyncio policy: {e}")

print("=" * 60)
print("ENVIRONMENT CHECK")
print("=" * 60)
print(f"Python: {sys.version.split()[0]}")
print(f"PyTorch: {torch.__version__}")
print(f"NumPy: {np.__version__}")
print(f"Pandas: {pd.__version__}")

# Device detection
DEVICE = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"\nDevice: {DEVICE}")

if DEVICE.type == 'cuda':
    print(f"GPU: {torch.cuda.get_device_name(0)}")
    print(f"CUDA Version: {torch.version.cuda}")
    gpu_mem_gb = torch.cuda.get_device_properties(0).total_memory / 1024**3
    print(f"GPU Memory: {gpu_mem_gb:.2f} GB")
    print(f"Available GPUs: {torch.cuda.device_count()}")

    # Set batch size based on GPU
    BATCH_SIZE = BATCH_SIZE_GPU
    USE_AMP = MIXED_PRECISION
    print(f"\n[GPU Mode] Batch size: {BATCH_SIZE}, Mixed Precision: {USE_AMP}")

    # Optional: enable cudnn benchmark for performance
    torch.backends.cudnn.benchmark = True
    torch.backends.cudnn.deterministic = False  # Slight speed boost, minor non-determinism
else:
    BATCH_SIZE = BATCH_SIZE_CPU
    USE_AMP = False
    print(f"\n[CPU Mode] Batch size: {BATCH_SIZE}, Mixed Precision: disabled")

# Set deterministic seeds (use SEED from config or default)
if 'SEED' not in globals():
    SEED = 42

random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)
if DEVICE.type == 'cuda':
    torch.cuda.manual_seed_all(SEED)

print(f"\nSeed: {SEED} (deterministic mode enabled)")
print("=" * 60)

# Check disk space (optional sanity check)
try:
    import shutil
    total, used, free = shutil.disk_usage(str(ROOT))
    free_gb = free / (1024**3)
    print(f"Free disk space: {free_gb:.2f} GB")
    if free_gb < 50:
        print("[Warning] Less than 50GB free. Preprocessing may require significant space.")
except Exception:
    pass

print("=" * 60)


## 1. CONFIGURATION - USER-TUNABLE HYPERPARAMETERS
Edit this cell to configure your training run. All settings have sensible defaults.


In [5]:
# ===== PRODUCTION TRAINING CONFIGURATION =====
# Adjust these parameters based on your hardware and dataset size

# --- Data preprocessing ---
TARGET_FS = 500                           # Target sampling frequency (Hz)
TARGET_SAMPLES = 5000                     # Signal length (10s @ 500Hz)
ECG_PREPROCESS_LIMIT = 0                  # 0=all data, >0=limit for quick runs (e.g., 5000)

# --- Training hyperparameters ---
BATCH_SIZE_CPU = 8                        # Batch size when running on CPU
BATCH_SIZE_GPU = 64                       # Batch size when GPU available (adjust based on GPU memory)
EPOCHS = 20                               # Number of training epochs
LR = 1e-3                                 # Initial learning rate
WEIGHT_DECAY = 1e-4                       # AdamW weight decay for regularization
GRAD_ACCUM_STEPS = 1                      # Gradient accumulation steps (increase if GPU memory limited)
CLIP_NORM = 1.0                           # Gradient clipping norm (0 to disable)
SCHEDULER_TYPE = 'cosine'                 # 'cosine' or 'step' learning rate schedule
EARLY_STOP_PATIENCE = 0                   # Early stopping patience (0 to disable)

# --- Performance options ---
MIXED_PRECISION = True                    # Enable mixed precision (auto-enabled on CUDA)
NUM_WORKERS = 0                           # DataLoader workers (0=main thread, safe for Windows)
DRY_RUN = False                           # True: run only 10 steps per epoch for quick validation

# --- Paths (auto-detected, override if needed) ---
from pathlib import Path
import os
ROOT = Path(os.environ.get('ECG_ROOT', Path.cwd().parent if Path.cwd().name == 'notebooks' else Path.cwd()))
DATASET_DIR = ROOT / "Dataset"
ARTIFACTS_DIR = ROOT / "artifacts"
PROCESSED_DIR = ARTIFACTS_DIR / "processed"
CHECKPOINT_DIR = PROCESSED_DIR / "checkpoints"
FIGURES_DIR = ARTIFACTS_DIR / "figures"
LOGS_DIR = ROOT / "logs"

# --- Label configuration ---
LABEL_ORDER = ['MI', 'AF', 'BBB', 'NORM', 'OTHER']
LABEL_TO_INT = {label: idx for idx, label in enumerate(LABEL_ORDER)}
INT_TO_LABEL = {idx: label for label, idx in LABEL_TO_INT.items()}

# --- Seeds for reproducibility ---
SEED = int(os.environ.get('ECG_SEED', 42))

# Create required directories
for directory in [ARTIFACTS_DIR, PROCESSED_DIR, CHECKPOINT_DIR, FIGURES_DIR, LOGS_DIR]:
    Path(directory).mkdir(parents=True, exist_ok=True)

print("=" * 60)
print("CONFIGURATION LOADED")
print("=" * 60)
print(f"Dataset limit: {'ALL DATA' if ECG_PREPROCESS_LIMIT == 0 else f'{ECG_PREPROCESS_LIMIT} samples'}")
print(f"Epochs: {EPOCHS}, LR: {LR}, Weight Decay: {WEIGHT_DECAY}")
print(f"Batch size (CPU/GPU): {BATCH_SIZE_CPU}/{BATCH_SIZE_GPU}")
print(f"Scheduler: {SCHEDULER_TYPE}, Grad Accum: {GRAD_ACCUM_STEPS}, Clip: {CLIP_NORM}")
print(f"Mixed Precision: {MIXED_PRECISION}, Workers: {NUM_WORKERS}")
print(f"Dry run mode: {DRY_RUN}")
print(f"Output dirs: {CHECKPOINT_DIR}, {FIGURES_DIR}")
print("=" * 60)


BATCH_SIZE 8 EPOCHS 2 AMP False


In [6]:
# Utilities: IO, normalization, resample, safe save/load
import json, gzip
import numpy as np
from scipy import signal
from pathlib import Path

def zscore_norm(x, eps=1e-6):
    x = np.asarray(x, dtype=np.float32)
    m = x.mean(axis=-1, keepdims=True)
    s = x.std(axis=-1, keepdims=True)
    s[s < eps] = 1.0
    return (x - m) / s

def pad_or_truncate(x, target_len):
    x = np.asarray(x, dtype=np.float32)
    if x.ndim == 1:
        if x.shape[0] >= target_len:
            return x[:target_len]
        else:
            pad = target_len - x.shape[0]
            return np.pad(x, (0, pad), mode='constant')
    elif x.ndim == 2:
        # assume shape (leads, samples)
        if x.shape[1] >= target_len:
            return x[:, :target_len]
        else:
            pad = target_len - x.shape[1]
            return np.pad(x, ((0,0),(0,pad)), mode='constant')
    else:
        raise ValueError('Unexpected signal shape')

def safe_save_npz(path: Path, signal_array, label:int, metadata=None):
    path.parent.mkdir(parents=True, exist_ok=True)
    if metadata is None:
        metadata = {}
    np.savez_compressed(path, signal=signal_array.astype(np.float32), label=int(label), metadata=json.dumps(metadata))

def load_npz(path:Path):
    with np.load(path, allow_pickle=True) as d:
        sig = d['signal'].astype(np.float32)
        lbl = int(d['label'])
        meta = json.loads(d['metadata'].tolist() if hasattr(d['metadata'],'tolist') else d['metadata'])
    return sig, lbl, meta


In [5]:
# Load unified mapping if present; else load candidate, else fallback
from collections import Counter
UNIFIED_CSV = LOGS_DIR / "unified_label_mapping.csv"
CANDIDATE_CSV = LOGS_DIR / "unified_label_mapping.candidate.csv"

mapping_index = {}
if UNIFIED_CSV.exists() and UNIFIED_CSV.stat().st_size>0:
    df_map = pd.read_csv(UNIFIED_CSV, dtype=str).fillna('')
    print('Loaded unified mapping:', UNIFIED_CSV, len(df_map))
else:
    if CANDIDATE_CSV.exists() and CANDIDATE_CSV.stat().st_size>0:
        df_map = pd.read_csv(CANDIDATE_CSV, dtype=str).fillna('')
        print('Loaded candidate mapping:', CANDIDATE_CSV, len(df_map))
    else:
        df_map = pd.DataFrame(columns=['dataset','record_id','mapped_label'])
        print('No mapping CSV found; will default to OTHER')

# Build mapping index (dataset -> key -> label)
for _, row in df_map.iterrows():
    ds = str(row.get('dataset','')).strip()
    rid = str(row.get('record_id','')).strip().replace('\\','/').strip('/')
    lab = str(row.get('mapped_label','')).strip().upper()
    if not ds or not rid:
        continue
    mapping_index.setdefault(ds, {})[rid] = lab

print('Datasets in mapping:', list(mapping_index.keys())[:10])


Loaded unified mapping: D:\ecg-research\logs\unified_label_mapping.csv 84556
Datasets in mapping: ['ptb-xl', 'CinC2017', 'PTB_Diagnostic', 'Chapman_Shaoxing']


In [7]:
# label lookup utility used during preprocessing
def lookup_mapped_label(dataset_name, record_id):
    idx = mapping_index.get(dataset_name, {})
    if record_id in idx:
        lab = idx[record_id].upper()
        return lab if lab in LABEL_TO_INT else 'OTHER'
    # try basename
    base = record_id.split('/')[-1]
    if base in idx:
        lab = idx[base].upper()
        return lab if lab in LABEL_TO_INT else 'OTHER'
    return 'OTHER'


## Preprocessing (streaming). This cell scans supported datasets and writes per-record .npz files into artifacts/processed/records. It is I/O-heavy and may take hours for full dataset.

In [7]:
# Preprocessing: very conservative memory-safe loop
import wfdb
import scipy.io
from pathlib import Path
from tqdm import tqdm
import traceback

RECORDS_DIR = PROCESSED_DIR / "records"
RECORDS_DIR.mkdir(parents=True, exist_ok=True)

# helper to read recordings (WFDB .hea/.dat or .mat)
def read_record_generic(full_path: Path):
    # returns (signal (n_leads, n_samples), fs, meta_dict)
    try:
        if full_path.suffix.lower() == '.mat':
            data = scipy.io.loadmat(str(full_path))
            # try several common keys
            for k in ['val','data','sig','ecg']:
                if k in data:
                    arr = data[k]
                    arr = np.asarray(arr, dtype=np.float32)
                    if arr.ndim==2 and arr.shape[0] > arr.shape[1]:
                        # ensure shape (leads, samples)
                        return arr, int(data.get('fs', TARGET_FS)), {'source':'mat','path':str(full_path)}
            # fallback - find first numeric
            arr = None
            for v in data.values():
                if isinstance(v, np.ndarray) and v.ndim==2:
                    arr = v.astype(np.float32)
                    break
            if arr is None:
                raise RuntimeError('No 2D array found in mat')
            return arr, int(data.get('fs', TARGET_FS)), {'source':'mat','path':str(full_path)}
        else:
            # WFDB read using record name without .hea
            rec_dir = full_path.parent
            rec_name = full_path.stem
            record = wfdb.rdrecord(str(full_path.with_suffix('')))
            sig = np.asarray(record.p_signal.T, dtype=np.float32)  # shape (leads, samples)
            fs = int(getattr(record, 'fs', TARGET_FS))
            return sig, fs, {'source':'wfdb','path':str(full_path)}
    except Exception as e:
        # bubble up
        raise

# iterate datasets (supported minimal set)
candidates = []
if DATASET_DIR.exists():
    for ds in sorted(DATASET_DIR.iterdir()):
        if ds.is_dir():
            candidates.append(ds)
print('Datasets found:', [p.name for p in candidates])

# We'll process with a limit if provided
LIMIT = int(os.environ.get('ECG_PREPROCESS_LIMIT', 0))
print('Processing limit (0 means all):', LIMIT)

manifest = []
skipped = 0
processed = 0

# For speed and safety, define file patterns per dataset (common)
patterns = {
    'ptb-xl': ['**/*.dat','**/*.hea','**/*_hr.mat','**/*_lr.mat'],
    'CinC2017': ['**/*.mat','**/*.hea','**/*.atr','training/*.mat'],
    'PTB_Diagnostic': ['**/*.dat','**/*.hea'],
    'Chapman_Shaoxing': ['**/*.dat','**/*.hea','**/*.mat']
}

# If wfdb package missing, fallback to synthetic creation
if not candidates:
    print('No dataset folders – generating synthetic samples for quick smoke tests')
    t = np.linspace(0, 10, TARGET_SAMPLES, dtype=np.float32)
    for i in range(200):
        s = np.sin(2*np.pi*(1+i*0.1)*t).astype(np.float32)
        out = RECORDS_DIR / f"SYNTH_{i:05d}.npz"
        safe_save_npz(out, s, i%len(LABEL_ORDER), {'dataset':'SYNTH'})
        manifest.append({'path': f"records/{out.name}", 'label': int(i%len(LABEL_ORDER))})
    processed = len(manifest)
else:
    # iterate dataset folders and patterns
    for ds in candidates:
        ds_name = ds.name
        pat_list = patterns.get(ds_name, ['**/*.hea','**/*.mat','**/*.dat'])
        files = []
        for pat in pat_list:
            files.extend(list(ds.rglob(pat)))
        # prefer .hea as index entries: convert to unique set
        files = sorted(set(files))
        if LIMIT and processed >= LIMIT:
            break
        for fpath in tqdm(files, desc=f"Processing {ds_name}", unit='file'):
            try:
                # simple TRY: read using wfdb or mat loader; if fails, skip
                try:
                    sig, fs, meta = read_record_generic(fpath)
                except Exception:
                    # if WFDB read fails try reading .hea by name
                    try:
                        rec = wfdb.rdrecord(str(fpath.with_suffix('')))
                        sig = np.asarray(rec.p_signal.T, dtype=np.float32)
                        fs = int(getattr(rec, 'fs', TARGET_FS))
                        meta = {'source':'wfdb'}
                    except Exception as e:
                        skipped += 1
                        continue

                # resample if needed
                if fs != TARGET_FS:
                    # resample each lead
                    num = int(round(sig.shape[1] * (TARGET_FS / float(fs))))
                    sig = signal.resample(sig, num, axis=1).astype(np.float32)
                    fs = TARGET_FS

                # normalize and pad/truncate
                if sig.ndim == 1:
                    sig = np.expand_dims(sig, 0)
                sig = zscore_norm(sig)
                sig = pad_or_truncate(sig, TARGET_SAMPLES)

                # build record id relative to dataset root
                try:
                    rel = fpath.relative_to(DATASET_DIR).as_posix()
                except Exception:
                    rel = fpath.name
                # lookup mapped label
                mapped = lookup_mapped_label(ds_name, rel)
                label_int = LABEL_TO_INT.get(mapped, LABEL_TO_INT['OTHER'])

                out_file = RECORDS_DIR / f"{ds_name}__{rel.replace('/','__').replace('.','_')}.npz"
                safe_save_npz(out_file, sig, label_int, {'dataset': ds_name, 'src': rel})
                manifest.append({'path': f"records/{out_file.name}", 'label': label_int})
                processed += 1

                if LIMIT and processed >= LIMIT:
                    break
            except Exception as e:
                skipped += 1
                # write short log entry
                with open(LOGS_DIR / "preprocess_errors.log", "a", encoding="utf-8") as fh:
                    fh.write(f"{fpath} -> {repr(e)}\n")
                continue

print('Done. processed:', processed, 'skipped:', skipped)
# persist manifest and splits
import json
with open(PROCESSED_DIR / "manifest.jsonl", "w", encoding="utf-8") as fh:
    for rec in manifest:
        fh.write(json.dumps(rec) + "\n")

# build simple stratified splits
from sklearn.model_selection import train_test_split
paths = [m['path'] for m in manifest]
labels = [m['label'] for m in manifest]
if paths:
    train_p, test_p, y_train, y_test = train_test_split(paths, labels, test_size=0.2, stratify=labels, random_state=SEED)
    val_p, test_p, y_val, y_test = train_test_split(test_p, y_test, test_size=0.5, stratify=y_test, random_state=SEED)
    splits = {'paths': {'train': train_p, 'val': val_p, 'test': test_p}}
    with open(PROCESSED_DIR / "splits.json", "w", encoding="utf-8") as fh:
        json.dump(splits, fh, indent=2)
    print('Splits saved. Train:', len(train_p), 'Val:', len(val_p), 'Test:', len(test_p))
else:
    print('No manifest entries – nothing to split.')


Datasets found: ['Chapman_Shaoxing', 'CinC2017', 'ptb-xl', 'PTB_Diagnostic']
Processing limit (0 means all): 0


Processing Chapman_Shaoxing: 100%|██████████| 90304/90304 [4:02:02<00:00,  6.22file/s]   
Processing CinC2017: 100%|██████████| 17656/17656 [48:34<00:00,  6.06file/s] 
Processing ptb-xl: 100%|██████████| 87196/87196 [5:33:57<00:00,  4.35file/s]   
Processing PTB_Diagnostic: 100%|██████████| 1098/1098 [05:26<00:00,  3.37file/s]


Done. processed: 196252 skipped: 2
Splits saved. Train: 157001 Val: 19625 Test: 19626


## Dataset & DataLoader (lazy loading)

In [8]:
# PyTorch Dataset reading .npz files lazily
import torch
from torch.utils.data import Dataset, DataLoader

class ECGDataset(Dataset):
    def __init__(self, entries, base_dir):
        self.entries = entries
        self.base_dir = Path(base_dir)
    def __len__(self):
        return len(self.entries)
    def __getitem__(self, idx):
        p = self.entries[idx]
        sig, label, meta = load_npz(self.base_dir / p.split('records/')[-1])
        # ensure shape (1, samples)
        if sig.ndim == 2:
            # use mean across leads for single-lead baseline
            sig = sig.mean(axis=0, keepdims=True)
        tensor = torch.from_numpy(sig).float()
        return tensor, torch.tensor(label, dtype=torch.long)

# quick loader constructor
def build_loaders(limit=None):
    import json
    with open(PROCESSED_DIR / 'splits.json','r') as fh:
        splits = json.load(fh)
    train_list = splits['paths']['train']
    val_list = splits['paths']['val']
    test_list = splits['paths']['test']
    if limit:
        train_list = train_list[:limit]
        val_list = val_list[:int(limit*0.2)]
        test_list = test_list[:int(limit*0.2)]
    train_ds = ECGDataset(train_list, PROCESSED_DIR / 'records')
    val_ds = ECGDataset(val_list, PROCESSED_DIR / 'records')
    test_ds = ECGDataset(test_list, PROCESSED_DIR / 'records')
    train_loader = DataLoader(train_ds, batch_size=BATCH_SIZE, shuffle=True, num_workers=0)
    val_loader = DataLoader(val_ds, batch_size=BATCH_SIZE, shuffle=False, num_workers=0)
    test_loader = DataLoader(test_ds, batch_size=BATCH_SIZE, shuffle=False, num_workers=0)
    return train_loader, val_loader, test_loader

# show example batch if available
try:
    tr, va, te = build_loaders(limit=16)
    xb, yb = next(iter(tr))
    print('example batch:', xb.shape, yb.shape)
except Exception as e:
    print('build_loaders failed:', e)


example batch: torch.Size([8, 1, 5000]) torch.Size([8])


## Model (compact 1D ResNet-like). GPU intensive: forward/backward, mixed precision.

In [9]:
# Simple 1D CNN with residual blocks
import torch.nn as nn
class ConvBlock(nn.Module):
    def __init__(self, in_ch, out_ch, k=7, s=2):
        super().__init__()
        self.conv = nn.Conv1d(in_ch, out_ch, kernel_size=k, stride=s, padding=k//2)
        self.bn = nn.BatchNorm1d(out_ch)
        self.act = nn.GELU()
    def forward(self,x):
        return self.act(self.bn(self.conv(x)))

class SmallResNet1D(nn.Module):
    def __init__(self, in_ch=1, num_classes=len(LABEL_ORDER)):
        super().__init__()
        self.stem = nn.Sequential(
            ConvBlock(in_ch, 16, k=11, s=2),
            ConvBlock(16, 32, k=9, s=2),
        )
        self.res1 = nn.Sequential(
            ConvBlock(32, 32, k=7, s=1),
            ConvBlock(32, 32, k=5, s=1),
        )
        self.head = nn.Sequential(
            nn.AdaptiveAvgPool1d(1),
            nn.Flatten(),
            nn.Dropout(0.3),
            nn.Linear(32, num_classes)
        )
    def forward(self,x):
        x = self.stem(x)
        r = self.res1(x)
        x = x + r
        return self.head(x)

model = SmallResNet1D().to(DEVICE)
print(model)
print('num params:', sum(p.numel() for p in model.parameters() if p.requires_grad))


SmallResNet1D(
  (stem): Sequential(
    (0): ConvBlock(
      (conv): Conv1d(1, 16, kernel_size=(11,), stride=(2,), padding=(5,))
      (bn): BatchNorm1d(16, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (act): GELU(approximate='none')
    )
    (1): ConvBlock(
      (conv): Conv1d(16, 32, kernel_size=(9,), stride=(2,), padding=(4,))
      (bn): BatchNorm1d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (act): GELU(approximate='none')
    )
  )
  (res1): Sequential(
    (0): ConvBlock(
      (conv): Conv1d(32, 32, kernel_size=(7,), stride=(1,), padding=(3,))
      (bn): BatchNorm1d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (act): GELU(approximate='none')
    )
    (1): ConvBlock(
      (conv): Conv1d(32, 32, kernel_size=(5,), stride=(1,), padding=(2,))
      (bn): BatchNorm1d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (act): GELU(approximate='none')
    )
  )
  (head): Seq

## 7. PRODUCTION TRAINING LOOP
**=== LONG RUN: USER MUST RUN THIS CELL MANUALLY ===**

Full-featured training with:
- Mixed precision (AMP)
- Gradient accumulation & clipping
- Cosine/step LR scheduler
- Checkpointing & resume
- Early stopping (optional)
- Per-class metrics logging

**Estimated runtime:** ~10-60 min depending on dataset size and hardware
**Disk usage:** ~500MB-2GB for checkpoints


In [10]:
# Production training loop with all features
import torch
import torch.nn as nn
from torch.optim import AdamW
from torch.optim.lr_scheduler import CosineAnnealingLR, StepLR
from sklearn.metrics import f1_score, accuracy_score, confusion_matrix, precision_recall_fscore_support
import json
import time
from tqdm import tqdm
from datetime import datetime

def worker_init_fn(worker_id):
    """Ensure reproducible DataLoader workers"""
    np.random.seed(SEED + worker_id)
    random.seed(SEED + worker_id)

def compute_class_weights(labels):
    """Compute class weights for imbalanced datasets"""
    from collections import Counter
    counts = Counter(labels)
    total = sum(counts.values())
    weights = {cls: total / (len(counts) * count) for cls, count in counts.items()}
    weight_tensor = torch.tensor([weights.get(i, 1.0) for i in range(len(LABEL_ORDER))], dtype=torch.float32)
    return weight_tensor

def evaluate_detailed(model, loader, device):
    """Comprehensive evaluation with per-class metrics"""
    model.eval()
    all_labels = []
    all_preds = []
    all_probs = []

    with torch.no_grad():
        for xb, yb in loader:
            xb, yb = xb.to(device), yb.to(device)

            if USE_AMP:
                with torch.amp.autocast('cuda'):
                    logits = model(xb)
            else:
                logits = model(xb)

            probs = torch.softmax(logits, dim=1)
            preds = logits.argmax(dim=1)

            all_labels.extend(yb.cpu().numpy())
            all_preds.extend(preds.cpu().numpy())
            all_probs.extend(probs.cpu().numpy())

    # Compute metrics
    acc = accuracy_score(all_labels, all_preds)
    f1_macro = f1_score(all_labels, all_preds, average='macro', zero_division=0)
    f1_weighted = f1_score(all_labels, all_preds, average='weighted', zero_division=0)

    # Per-class metrics
    precision, recall, f1, support = precision_recall_fscore_support(
        all_labels, all_preds, average=None, zero_division=0
    )

    per_class = {}
    for idx, label_name in enumerate(LABEL_ORDER):
        per_class[label_name] = {
            'precision': float(precision[idx]) if idx < len(precision) else 0.0,
            'recall': float(recall[idx]) if idx < len(recall) else 0.0,
            'f1': float(f1[idx]) if idx < len(f1) else 0.0,
            'support': int(support[idx]) if idx < len(support) else 0
        }

    cm = confusion_matrix(all_labels, all_preds)

    return {
        'accuracy': float(acc),
        'f1_macro': float(f1_macro),
        'f1_weighted': float(f1_weighted),
        'per_class': per_class,
        'confusion_matrix': cm.tolist(),
        'labels': all_labels,
        'predictions': all_preds,
        'probabilities': all_probs
    }

def train_production(model, train_loader, val_loader, test_loader=None):
    """
    Production training with all features enabled.
    """
    print("=" * 60)
    print("PRODUCTION TRAINING START")
    print("=" * 60)
    print(f"Device: {DEVICE}")
    print(f"Batch size: {BATCH_SIZE}, Epochs: {EPOCHS}")
    print(f"LR: {LR}, Weight decay: {WEIGHT_DECAY}")
    print(f"Gradient accumulation: {GRAD_ACCUM_STEPS}, Clip norm: {CLIP_NORM}")
    print(f"Scheduler: {SCHEDULER_TYPE}, Early stop patience: {EARLY_STOP_PATIENCE}")
    print(f"Mixed precision: {USE_AMP}")
    print(f"Dry run: {DRY_RUN}")
    print("=" * 60)

    # Setup optimizer and scheduler
    optimizer = AdamW(model.parameters(), lr=LR, weight_decay=WEIGHT_DECAY)

    if SCHEDULER_TYPE == 'cosine':
        scheduler = CosineAnnealingLR(optimizer, T_max=EPOCHS, eta_min=LR * 0.01)
    else:
        scheduler = StepLR(optimizer, step_size=max(1, EPOCHS // 3), gamma=0.1)

    # Mixed precision scaler
    scaler = torch.amp.GradScaler('cuda', enabled=USE_AMP and DEVICE.type == 'cuda')

    # Compute class weights for imbalanced data
    all_labels = [item['label'] for item in json.load(open(PROCESSED_DIR / 'splits.json'))['paths']['train']]
    # Actually we need to load from manifest - simplified for now
    try:
        # Try to compute weights from available data
        sample_labels = []
        for i, (_, y) in enumerate(train_loader):
            sample_labels.extend(y.numpy())
            if i >= 10:  # Sample first 10 batches
                break
        class_weights = compute_class_weights(sample_labels)
        print(f"Class weights: {class_weights.tolist()}")
        criterion = nn.CrossEntropyLoss(weight=class_weights.to(DEVICE))
    except Exception as e:
        print(f"[Warning] Could not compute class weights: {e}")
        criterion = nn.CrossEntropyLoss()

    # Training state
    start_epoch = 0
    best_val_f1 = -1.0
    patience_counter = 0
    history = {
        'train_loss': [],
        'train_acc': [],
        'val_loss': [],
        'val_acc': [],
        'val_f1_macro': [],
        'val_f1_weighted': [],
        'learning_rates': []
    }

    # Resume from checkpoint if exists
    resume_ckpt = CHECKPOINT_DIR / "last_checkpoint.pth"
    if resume_ckpt.exists():
        print(f"\n[Resume] Loading checkpoint from {resume_ckpt}")
        try:
            ckpt = torch.load(resume_ckpt, map_location=DEVICE)
            model.load_state_dict(ckpt['model_state'])
            optimizer.load_state_dict(ckpt['optimizer_state'])
            scheduler.load_state_dict(ckpt['scheduler_state'])
            if 'scaler_state' in ckpt and USE_AMP:
                scaler.load_state_dict(ckpt['scaler_state'])
            start_epoch = ckpt.get('epoch', 0)
            history = ckpt.get('history', history)
            best_val_f1 = ckpt.get('best_val_f1', -1.0)
            print(f"[Resume] Continuing from epoch {start_epoch}, best val F1: {best_val_f1:.4f}")
        except Exception as e:
            print(f"[Warning] Could not resume from checkpoint: {e}")
            print("[Warning] Starting from scratch")

    # Training loop
    total_start = time.time()

    for epoch in range(start_epoch, EPOCHS):
        epoch_start = time.time()
        model.train()

        train_loss = 0.0
        train_correct = 0
        train_total = 0

        pbar = tqdm(train_loader, desc=f"Epoch {epoch+1}/{EPOCHS}", disable=False)

        for batch_idx, (xb, yb) in enumerate(pbar):
            if DRY_RUN and batch_idx >= 10:
                break

            xb, yb = xb.to(DEVICE), yb.to(DEVICE)

            # Forward pass with mixed precision
            if USE_AMP:
                with torch.amp.autocast('cuda'):
                    logits = model(xb)
                    loss = criterion(logits, yb) / GRAD_ACCUM_STEPS
            else:
                logits = model(xb)
                loss = criterion(logits, yb) / GRAD_ACCUM_STEPS

            # Backward pass
            if USE_AMP:
                scaler.scale(loss).backward()
            else:
                loss.backward()

            # Gradient accumulation
            if (batch_idx + 1) % GRAD_ACCUM_STEPS == 0:
                if CLIP_NORM > 0:
                    if USE_AMP:
                        scaler.unscale_(optimizer)
                    nn.utils.clip_grad_norm_(model.parameters(), CLIP_NORM)

                if USE_AMP:
                    scaler.step(optimizer)
                    scaler.update()
                else:
                    optimizer.step()

                optimizer.zero_grad()

            # Statistics
            train_loss += loss.item() * GRAD_ACCUM_STEPS
            preds = logits.argmax(dim=1)
            train_correct += (preds == yb).sum().item()
            train_total += yb.size(0)

            # Update progress bar
            pbar.set_postfix({
                'loss': f"{train_loss / (batch_idx + 1):.4f}",
                'acc': f"{100.0 * train_correct / train_total:.2f}%"
            })

        # Epoch metrics
        avg_train_loss = train_loss / len(train_loader)
        train_acc = train_correct / train_total

        # Validation
        print("\n[Validation]")
        val_metrics = evaluate_detailed(model, val_loader, DEVICE)

        # Update history
        history['train_loss'].append(avg_train_loss)
        history['train_acc'].append(train_acc)
        history['val_acc'].append(val_metrics['accuracy'])
        history['val_f1_macro'].append(val_metrics['f1_macro'])
        history['val_f1_weighted'].append(val_metrics['f1_weighted'])
        history['learning_rates'].append(optimizer.param_groups[0]['lr'])

        # Scheduler step
        scheduler.step()

        epoch_time = time.time() - epoch_start

        # Print epoch summary
        print(f"\nEpoch {epoch+1}/{EPOCHS} - {epoch_time:.1f}s")
        print(f"  Train Loss: {avg_train_loss:.4f}, Train Acc: {train_acc:.4f}")
        print(f"  Val Acc: {val_metrics['accuracy']:.4f}, Val F1 (macro): {val_metrics['f1_macro']:.4f}")
        print(f"  LR: {optimizer.param_groups[0]['lr']:.6f}")

        # Save checkpoints
        checkpoint = {
            'epoch': epoch + 1,
            'model_state': model.state_dict(),
            'optimizer_state': optimizer.state_dict(),
            'scheduler_state': scheduler.state_dict(),
            'scaler_state': scaler.state_dict() if USE_AMP else None,
            'history': history,
            'best_val_f1': best_val_f1,
            'config': {
                'lr': LR,
                'batch_size': BATCH_SIZE,
                'epochs': EPOCHS,
                'seed': SEED
            }
        }

        # Save last checkpoint
        torch.save(checkpoint, CHECKPOINT_DIR / "last_checkpoint.pth")

        # Save best checkpoint
        if val_metrics['f1_macro'] > best_val_f1:
            best_val_f1 = val_metrics['f1_macro']
            torch.save(checkpoint, CHECKPOINT_DIR / "best_model.pth")
            print(f"  [Checkpoint] New best model saved! F1: {best_val_f1:.4f}")
            patience_counter = 0
        else:
            patience_counter += 1

        # Early stopping
        if EARLY_STOP_PATIENCE > 0 and patience_counter >= EARLY_STOP_PATIENCE:
            print(f"\n[Early Stop] No improvement for {EARLY_STOP_PATIENCE} epochs. Stopping.")
            break

        # Save metrics per epoch
        epoch_metrics = {
            'epoch': epoch + 1,
            'timestamp': datetime.now().isoformat(),
            'train_loss': avg_train_loss,
            'train_acc': train_acc,
            'val_metrics': val_metrics,
            'lr': optimizer.param_groups[0]['lr']
        }

        metrics_file = CHECKPOINT_DIR / f"metrics_epoch_{epoch+1:03d}.json"
        with open(metrics_file, 'w') as f:
            json.dump(epoch_metrics, f, indent=2)

    # Training complete
    total_time = time.time() - total_start
    print("\n" + "=" * 60)
    print("TRAINING COMPLETE")
    print("=" * 60)
    print(f"Total time: {total_time / 60:.1f} minutes")
    print(f"Best validation F1 (macro): {best_val_f1:.4f}")

    # Save final history
    with open(CHECKPOINT_DIR / "training_history.json", 'w') as f:
        json.dump(history, f, indent=2)

    # Test set evaluation if available
    if test_loader is not None:
        print("\n[Test Set Evaluation]")
        test_metrics = evaluate_detailed(model, test_loader, DEVICE)
        print(f"  Test Acc: {test_metrics['accuracy']:.4f}")
        print(f"  Test F1 (macro): {test_metrics['f1_macro']:.4f}")
        print(f"  Test F1 (weighted): {test_metrics['f1_weighted']:.4f}")

        # Save test metrics
        with open(CHECKPOINT_DIR / "test_metrics.json", 'w') as f:
            json.dump({
                'accuracy': test_metrics['accuracy'],
                'f1_macro': test_metrics['f1_macro'],
                'f1_weighted': test_metrics['f1_weighted'],
                'per_class': test_metrics['per_class'],
                'confusion_matrix': test_metrics['confusion_matrix']
            }, f, indent=2)

    return history, best_val_f1


## 8. VISUALIZATION & EVALUATION
High-quality plots for training curves, confusion matrix, ROC, and PR curves.


In [11]:
# Comprehensive visualization functions
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import roc_curve, auc, precision_recall_curve, average_precision_score
from sklearn.preprocessing import label_binarize
import numpy as np
from datetime import datetime

def plot_training_curves(history, save_dir=FIGURES_DIR):
    """Plot training and validation metrics over epochs"""
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")

    fig, axes = plt.subplots(2, 2, figsize=(14, 10))
    fig.suptitle('Training Progress', fontsize=16, fontweight='bold')

    # Loss
    ax = axes[0, 0]
    ax.plot(history.get('train_loss', []), label='Train Loss', marker='o', markersize=4)
    if 'val_loss' in history:
        ax.plot(history.get('val_loss', []), label='Val Loss', marker='s', markersize=4)
    ax.set_xlabel('Epoch')
    ax.set_ylabel('Loss')
    ax.set_title('Loss Over Time')
    ax.legend()
    ax.grid(True, alpha=0.3)

    # Accuracy
    ax = axes[0, 1]
    ax.plot(history.get('train_acc', []), label='Train Acc', marker='o', markersize=4)
    ax.plot(history.get('val_acc', []), label='Val Acc', marker='s', markersize=4)
    ax.set_xlabel('Epoch')
    ax.set_ylabel('Accuracy')
    ax.set_title('Accuracy Over Time')
    ax.legend()
    ax.grid(True, alpha=0.3)

    # F1 Score
    ax = axes[1, 0]
    if 'val_f1_macro' in history:
        ax.plot(history['val_f1_macro'], label='Val F1 (Macro)', marker='o', markersize=4)
    if 'val_f1_weighted' in history:
        ax.plot(history['val_f1_weighted'], label='Val F1 (Weighted)', marker='s', markersize=4)
    ax.set_xlabel('Epoch')
    ax.set_ylabel('F1 Score')
    ax.set_title('F1 Score Over Time')
    ax.legend()
    ax.grid(True, alpha=0.3)

    # Learning Rate
    ax = axes[1, 1]
    if 'learning_rates' in history:
        ax.plot(history['learning_rates'], marker='o', markersize=4, color='orange')
        ax.set_xlabel('Epoch')
        ax.set_ylabel('Learning Rate')
        ax.set_title('Learning Rate Schedule')
        ax.set_yscale('log')
        ax.grid(True, alpha=0.3)

    plt.tight_layout()
    save_path = save_dir / f"training_curves_{timestamp}.png"
    plt.savefig(save_path, dpi=150, bbox_inches='tight')
    plt.close()
    print(f"[Saved] {save_path}")
    return save_path

def plot_confusion_matrix(cm, labels=LABEL_ORDER, save_dir=FIGURES_DIR, epoch=None):
    """Plot confusion matrix heatmap"""
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")

    fig, ax = plt.subplots(figsize=(10, 8))

    # Normalize for better visualization
    cm_norm = cm.astype('float') / cm.sum(axis=1, keepdims=True)

    sns.heatmap(cm_norm, annot=True, fmt='.2f', cmap='Blues',
                xticklabels=labels, yticklabels=labels, ax=ax,
                cbar_kws={'label': 'Normalized Count'})

    ax.set_xlabel('Predicted Label', fontsize=12, fontweight='bold')
    ax.set_ylabel('True Label', fontsize=12, fontweight='bold')
    title = f'Confusion Matrix'
    if epoch is not None:
        title += f' (Epoch {epoch})'
    ax.set_title(title, fontsize=14, fontweight='bold')

    plt.tight_layout()
    suffix = f"_epoch{epoch}" if epoch else ""
    save_path = save_dir / f"confusion_matrix{suffix}_{timestamp}.png"
    plt.savefig(save_path, dpi=150, bbox_inches='tight')
    plt.close()
    print(f"[Saved] {save_path}")
    return save_path

def plot_per_class_metrics(per_class_metrics, save_dir=FIGURES_DIR):
    """Plot per-class precision, recall, F1 bar chart"""
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")

    labels = list(per_class_metrics.keys())
    precision = [per_class_metrics[l]['precision'] for l in labels]
    recall = [per_class_metrics[l]['recall'] for l in labels]
    f1 = [per_class_metrics[l]['f1'] for l in labels]

    x = np.arange(len(labels))
    width = 0.25

    fig, ax = plt.subplots(figsize=(12, 6))
    ax.bar(x - width, precision, width, label='Precision', alpha=0.8)
    ax.bar(x, recall, width, label='Recall', alpha=0.8)
    ax.bar(x + width, f1, width, label='F1 Score', alpha=0.8)

    ax.set_xlabel('Class', fontsize=12, fontweight='bold')
    ax.set_ylabel('Score', fontsize=12, fontweight='bold')
    ax.set_title('Per-Class Metrics', fontsize=14, fontweight='bold')
    ax.set_xticks(x)
    ax.set_xticklabels(labels)
    ax.legend()
    ax.grid(True, alpha=0.3, axis='y')
    ax.set_ylim([0, 1.05])

    plt.tight_layout()
    save_path = save_dir / f"per_class_metrics_{timestamp}.png"
    plt.savefig(save_path, dpi=150, bbox_inches='tight')
    plt.close()
    print(f"[Saved] {save_path}")
    return save_path

def plot_roc_curves(labels, predictions, probabilities, save_dir=FIGURES_DIR):
    """Plot ROC curves for each class (one-vs-rest)"""
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")

    # Binarize labels
    n_classes = len(LABEL_ORDER)
    y_true = label_binarize(labels, classes=list(range(n_classes)))
    y_score = np.array(probabilities)

    fig, ax = plt.subplots(figsize=(10, 8))

    colors = plt.cm.Set1(np.linspace(0, 1, n_classes))

    for i, (color, label_name) in enumerate(zip(colors, LABEL_ORDER)):
        fpr, tpr, _ = roc_curve(y_true[:, i], y_score[:, i])
        roc_auc = auc(fpr, tpr)
        ax.plot(fpr, tpr, color=color, lw=2,
                label=f'{label_name} (AUC = {roc_auc:.3f})')

    ax.plot([0, 1], [0, 1], 'k--', lw=2, label='Random (AUC = 0.500)')
    ax.set_xlim([0.0, 1.0])
    ax.set_ylim([0.0, 1.05])
    ax.set_xlabel('False Positive Rate', fontsize=12, fontweight='bold')
    ax.set_ylabel('True Positive Rate', fontsize=12, fontweight='bold')
    ax.set_title('ROC Curves (One-vs-Rest)', fontsize=14, fontweight='bold')
    ax.legend(loc='lower right', fontsize=9)
    ax.grid(True, alpha=0.3)

    plt.tight_layout()
    save_path = save_dir / f"roc_curves_{timestamp}.png"
    plt.savefig(save_path, dpi=150, bbox_inches='tight')
    plt.close()
    print(f"[Saved] {save_path}")
    return save_path

def plot_precision_recall_curves(labels, predictions, probabilities, save_dir=FIGURES_DIR):
    """Plot Precision-Recall curves for each class"""
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")

    # Binarize labels
    n_classes = len(LABEL_ORDER)
    y_true = label_binarize(labels, classes=list(range(n_classes)))
    y_score = np.array(probabilities)

    fig, ax = plt.subplots(figsize=(10, 8))

    colors = plt.cm.Set1(np.linspace(0, 1, n_classes))

    for i, (color, label_name) in enumerate(zip(colors, LABEL_ORDER)):
        precision, recall, _ = precision_recall_curve(y_true[:, i], y_score[:, i])
        avg_precision = average_precision_score(y_true[:, i], y_score[:, i])
        ax.plot(recall, precision, color=color, lw=2,
                label=f'{label_name} (AP = {avg_precision:.3f})')

    ax.set_xlim([0.0, 1.0])
    ax.set_ylim([0.0, 1.05])
    ax.set_xlabel('Recall', fontsize=12, fontweight='bold')
    ax.set_ylabel('Precision', fontsize=12, fontweight='bold')
    ax.set_title('Precision-Recall Curves', fontsize=14, fontweight='bold')
    ax.legend(loc='lower left', fontsize=9)
    ax.grid(True, alpha=0.3)

    plt.tight_layout()
    save_path = save_dir / f"precision_recall_curves_{timestamp}.png"
    plt.savefig(save_path, dpi=150, bbox_inches='tight')
    plt.close()
    print(f"[Saved] {save_path}")
    return save_path

def generate_all_plots(history, test_metrics):
    """Generate all visualization plots"""
    print("\n" + "=" * 60)
    print("GENERATING VISUALIZATIONS")
    print("=" * 60)

    # Training curves
    plot_training_curves(history, FIGURES_DIR)

    # Confusion matrix
    plot_confusion_matrix(np.array(test_metrics['confusion_matrix']), LABEL_ORDER, FIGURES_DIR)

    # Per-class metrics
    plot_per_class_metrics(test_metrics['per_class'], FIGURES_DIR)

    # ROC curves
    plot_roc_curves(test_metrics['labels'], test_metrics['predictions'],
                   test_metrics['probabilities'], FIGURES_DIR)

    # PR curves
    plot_precision_recall_curves(test_metrics['labels'], test_metrics['predictions'],
                                 test_metrics['probabilities'], FIGURES_DIR)

    print("=" * 60)


## Smoke tests — quick checks to ensure pipeline integrity

In [13]:
# Basic smoke tests: manifest existence, ability to load one record, model forward pass
errors = []
if not (PROCESSED_DIR / 'manifest.jsonl').exists():
    errors.append('manifest.jsonl missing')
else:
    # try to load first manifest entry
    import json
    with open(PROCESSED_DIR / 'manifest.jsonl','r',encoding='utf-8') as fh:
        first = fh.readline().strip()
    if not first:
        errors.append('manifest empty')
    else:
        rec = json.loads(first)
        path = PROCESSED_DIR / 'records' / Path(rec['path']).name
        try:
            sig, lbl, meta = load_npz(path)
            print('Loaded sample shape', sig.shape, 'label', lbl)
        except Exception as e:
            errors.append(f'load_npz failed: {e}')

# model forward test
try:
    m = model.to(DEVICE)
    m.eval()
    dummy = torch.randn(2,1,TARGET_SAMPLES).to(DEVICE)
    with torch.no_grad():
        out = m(dummy)
    print('Model forward ok, out shape', out.shape)
except Exception as e:
    errors.append(f'model forward failed: {e}')

if errors:
    print('SMOKE TESTS FOUND ISSUES:')
    for e in errors:
        print('-', e)
else:
    print('SMOKE TESTS PASSED')


Loaded sample shape (12, 5000) label 4
Model forward ok, out shape torch.Size([2, 5])
SMOKE TESTS PASSED


## 9. PRE-FLIGHT CHECKLIST
Run this before training to verify all requirements are met.


In [None]:
# Pre-flight checklist
def run_preflight_checks():
    """Verify all prerequisites before training"""
    print("=" * 60)
    print("PRE-FLIGHT CHECKLIST")
    print("=" * 60)

    checks_passed = []
    checks_failed = []

    # Check 1: Dataset directory
    if DATASET_DIR.exists():
        subdirs = [d.name for d in DATASET_DIR.iterdir() if d.is_dir()]
        if subdirs:
            checks_passed.append(f"✓ Dataset directory found with {len(subdirs)} subdirectories")
        else:
            checks_failed.append("✗ Dataset directory exists but is empty")
    else:
        checks_failed.append("✗ Dataset directory not found")

    # Check 2: Unified mapping CSV
    mapping_csv = LOGS_DIR / "unified_label_mapping.csv"
    if mapping_csv.exists():
        df = pd.read_csv(mapping_csv)
        checks_passed.append(f"✓ Unified mapping CSV found ({len(df)} records)")

        # Check mapping coverage
        if 'mapped_label' in df.columns:
            unmapped = df['mapped_label'].isna().sum() + (df['mapped_label'] == '').sum()
            coverage = 100 * (1 - unmapped / len(df))
            if coverage > 80:
                checks_passed.append(f"✓ Mapping coverage: {coverage:.1f}%")
            else:
                checks_failed.append(f"✗ Low mapping coverage: {coverage:.1f}% (recommend >80%)")
    else:
        checks_failed.append("✗ Unified mapping CSV not found")

    # Check 3: Processed records
    manifest_file = PROCESSED_DIR / "manifest.jsonl"
    if manifest_file.exists():
        with open(manifest_file, 'r') as f:
            num_records = sum(1 for _ in f)
        checks_passed.append(f"✓ Manifest found ({num_records} processed records)")
    else:
        checks_failed.append("✗ Manifest not found - run preprocessing first")

    # Check 4: Splits
    splits_file = PROCESSED_DIR / "splits.json"
    if splits_file.exists():
        with open(splits_file, 'r') as f:
            splits = json.load(f)
        train_size = len(splits['paths']['train'])
        val_size = len(splits['paths']['val'])
        test_size = len(splits['paths']['test'])
        checks_passed.append(f"✓ Splits found (train:{train_size}, val:{val_size}, test:{test_size})")
    else:
        checks_failed.append("✗ Splits not found - run preprocessing first")

    # Check 5: GPU availability
    if torch.cuda.is_available():
        mem_gb = torch.cuda.get_device_properties(0).total_memory / 1024**3
        checks_passed.append(f"✓ GPU available: {torch.cuda.get_device_name(0)} ({mem_gb:.1f} GB)")
    else:
        checks_passed.append("⚠ No GPU detected - training will use CPU (slower)")

    # Check 6: Disk space
    try:
        import shutil
        total, used, free = shutil.disk_usage(str(ROOT))
        free_gb = free / (1024**3)
        if free_gb > 50:
            checks_passed.append(f"✓ Sufficient disk space: {free_gb:.1f} GB free")
        else:
            checks_failed.append(f"✗ Low disk space: {free_gb:.1f} GB free (recommend >50GB)")
    except Exception:
        pass

    # Print results
    for check in checks_passed:
        print(check)

    for check in checks_failed:
        print(check)

    print("=" * 60)

    if checks_failed:
        print("\n⚠ WARNINGS DETECTED - Review failed checks before proceeding")
        return False
    else:
        print("\n✓ ALL CHECKS PASSED - Ready to train!")
        return True

# Run checks
run_preflight_checks()
    # final eval
    rep = evaluate(model, te)
    print('Test eval:', rep)
    # confusion matrix plot
    if 'confusion' in rep:
        plot_confusion(rep['confusion'])
    return rep

# Example usage:
# run_full(limit=200, do_preprocess=False, do_train=False)
print('Orchestrator ready. To run: run_full(limit=500, do_preprocess=False, do_train=True)')


In [15]:
# ===== QUICK SMOKE TRAIN (256 samples) =====
# Paste this as a single cell and run it.
import os, json, time
from pathlib import Path
import numpy as np
import torch

# safety: ensure dirs exist
PROCESSED_DIR = Path(PROCESSED_DIR) if 'PROCESSED_DIR' in globals() else Path('artifacts/processed')
FIGURES_DIR = PROCESSED_DIR / 'figures'
PROCESSED_DIR.mkdir(parents=True, exist_ok=True)
FIGURES_DIR.mkdir(parents=True, exist_ok=True)

# small safety aliases
USE_AMP = bool(globals().get('USE_MIXED_PRECISION', False))
BATCH_SIZE = int(globals().get('BATCH_SIZE', 8))
EPOCHS = int(1)  # single epoch smoke test
LIMIT = 256      # number of train samples to use for smoke test

print("SMOKE RUN CONFIG: LIMIT=", LIMIT, "BATCH_SIZE=", BATCH_SIZE, "DEVICE=", globals().get('DEVICE', 'cpu'), "USE_AMP=", USE_AMP)

# helper to run safely
try:
    # build loaders using your notebook's build_loaders() function
    tr_loader, val_loader, te_loader = build_loaders(limit=LIMIT)
    print("Loader sizes (train/val/test):", len(tr_loader.dataset), len(val_loader.dataset), len(te_loader.dataset))
except Exception as e:
    raise RuntimeError("Failed to build loaders — check splits.json and PROCESSED_DIR. Error: " + str(e))

# quick check on first batch shape
xb, yb = next(iter(tr_loader))
print("sample batch shapes:", xb.shape, yb.shape)

# ensure model, train, evaluate functions are present
if 'model' not in globals():
    raise RuntimeError("Model object 'model' not found in notebook. Define it before running this cell.")

# run training (one epoch) and time it
start = time.time()
history = train(model, tr_loader, val_loader, epochs=EPOCHS, lr=float(globals().get('LR', 1e-3)))
elapsed = time.time() - start
print(f"Smoke training finished in {elapsed:.1f}s")

# save history and plot
try:
    from pathlib import Path
    with open(PROCESSED_DIR / "smoke_training_history.json","w",encoding="utf-8") as fh:
        json.dump(history, fh, indent=2)
    plot_history(history, savepath=FIGURES_DIR/'smoke_training_curves.png')
except Exception as e:
    print("Could not save/plot history:", e)

# final quick eval on test split
rep = evaluate(model, te_loader)
print("Test eval:", rep)
if 'confusion' in rep:
    try:
        plot_confusion(rep['confusion'], savepath=FIGURES_DIR/'smoke_confusion.png')
    except Exception as e:
        print("Could not plot confusion:", e)

print("Artifacts (some):", list(PROCESSED_DIR.glob("*"))[:10])
print("Figures (some):", list(FIGURES_DIR.glob("*"))[:10])


SMOKE RUN CONFIG: LIMIT= 256 BATCH_SIZE= 8 DEVICE= cpu USE_AMP= False
Loader sizes (train/val/test): 256 51 51
sample batch shapes: torch.Size([8, 1, 5000]) torch.Size([8])


  scaler = torch.cuda.amp.GradScaler(enabled=USE_AMP)
  with torch.cuda.amp.autocast(enabled=USE_AMP):


Epoch 1/1 loss=1.3237 val_f1=1.0000
Smoke training finished in 30.8s
Saved D:\ecg-research\artifacts\processed\figures\smoke_training_curves.png




Test eval: {'acc': 1.0, 'f1_macro': 1.0, 'confusion': [[51]]}
Saved D:\ecg-research\artifacts\processed\figures\smoke_confusion.png
Artifacts (some): [WindowsPath('D:/ecg-research/artifacts/processed/records'), WindowsPath('D:/ecg-research/artifacts/processed/checkpoints'), WindowsPath('D:/ecg-research/artifacts/processed/manifest.jsonl'), WindowsPath('D:/ecg-research/artifacts/processed/progress.json'), WindowsPath('D:/ecg-research/artifacts/processed/splits.json'), WindowsPath('D:/ecg-research/artifacts/processed/label_map.json'), WindowsPath('D:/ecg-research/artifacts/processed/labels.npy'), WindowsPath('D:/ecg-research/artifacts/processed/figures'), WindowsPath('D:/ecg-research/artifacts/processed/checkpoint_ep1.pth'), WindowsPath('D:/ecg-research/artifacts/processed/best_model.pth')]
Figures (some): [WindowsPath('D:/ecg-research/artifacts/processed/figures/smoke_training_curves.png'), WindowsPath('D:/ecg-research/artifacts/processed/figures/smoke_confusion.png')]


## Final notes

- For a quick smoke run set `ECG_PREPROCESS_LIMIT=5000` in your environment and run the preprocessing cell.
- For full production, run headless overnight: `jupyter nbconvert --to notebook --execute notebooks/master_pipeline.ipynb --output logs/preprocess_run.ipynb`
- If you want me to generate a variant that uses TFRecords, ONNX export, MLflow logging, or multi-label training — say which and I'll produce it.
