# Combined ECG Pipeline: MIT-BIH (3-class, 2-lead) & INCART (2-lead subset to match MIT-BIH)

This notebook merges the workflows of two earlier notebooks:

- **MIT-BIH:** start from raw WFDB records → extract two leads → map to **3 classes (N/S/V)** → segment around R-peaks → **SMOTE** → train.
- **INCART:** select **two leads analogous to MIT-BIH** (Lead II + V1 if available, otherwise V5) → map to **3 classes (N/S/V)** → segment around R-peaks → **SMOTE** → train.

It uses a **shared model** (1D CNN + BatchNorm) and produces comparable metrics for each dataset.

## Requirements

Run the cell below if you're in a fresh environment (e.g., Colab). If the packages are already installed, you can skip it.

In [None]:
# Pinned install for Colab or a minimal environment.
# If the tensorflow pin cannot be resolved, use the next cell instead.
!pip install wfdb==4.1.2 imbalanced-learn==0.12.2 numpy pandas scipy scikit-learn tensorflow==2.15.0 matplotlib


Collecting wfdb==4.1.2
  Using cached wfdb-4.1.2-py3-none-any.whl.metadata (4.3 kB)
Collecting imbalanced-learn==0.12.2
  Using cached imbalanced_learn-0.12.2-py3-none-any.whl.metadata (8.2 kB)
ERROR: Could not find a version that satisfies the requirement tensorflow==2.15.0 (from versions: 2.16.0rc0, 2.16.1, 2.16.2, 2.17.0rc0, 2.17.0rc1, 2.17.0, 2.17.1, 2.18.0rc0, 2.18.0rc1, 2.18.0rc2, 2.18.0, 2.18.1, 2.19.0rc0, 2.19.0, 2.19.1, 2.20.0rc0, 2.20.0)
ERROR: No matching distribution found for tensorflow==2.15.0

In [None]:
# upgrade pip first (often fixes resolver issues)
!pip install -U pip

# TensorFlow 2.18–2.20 works with this notebook
!pip install "tensorflow>=2.18,<2.21" wfdb imbalanced-learn numpy pandas scipy scikit-learn matplotlib
# Optional GPU build on supported NVIDIA setups:
# !pip install "tensorflow[and-cuda]>=2.18,<2.21"




In [None]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


## Configuration

- **Data locations:** point `MITBIH_DIR` and `INCART_DIR` to folders with WFDB records (`.dat`/`.hea`) and annotations (`.atr`).
- **Leads:** We harmonize to **Lead II** + **V1** if available (fallback to **V5**).
- **Classes:** 3-class mapping (N/S/V) per AAMI-style grouping.

The pipeline:
1. Auto-detect records in each folder (or specify lists).
2. Resample each record to a shared sample rate (`TARGET_FS`) for consistent window sizes across datasets.
3. Extract a fixed window around each annotated beat (`PRE_SEC` before and `POST_SEC` after the R-peak), z-score normalized per window and per lead.
4. Map beat symbols to `{'N','S','V'}`; drop others.
5. Split (patient/record-wise) into **train/test** (configurable).
6. **Apply SMOTE to train only** (flatten → SMOTE → reshape).
7. Train a shared **1D CNN + BatchNorm** and evaluate.
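
Steps 2 and 3 fix the window length in samples and, for records whose native rate differs from `TARGET_FS`, require beat positions to be mapped onto the resampled time axis. The sketch below works through that arithmetic with the configuration values used in this notebook. `rescale_index` is a hypothetical helper introduced only for illustration, and 257 Hz is assumed as the INCART native rate.

```python
from fractions import Fraction

TARGET_FS = 360               # shared sample rate (Hz)
PRE_SEC, POST_SEC = 0.20, 0.40

# Window length in samples at the shared rate.
pre = int(round(PRE_SEC * TARGET_FS))    # 72 samples before the R-peak
post = int(round(POST_SEC * TARGET_FS))  # 144 samples after the R-peak
win_len = pre + post                     # 216 samples per beat

# Up/down factors for polyphase resampling from 257 Hz to 360 Hz.
ratio = Fraction(TARGET_FS, 257)         # 360/257 (already in lowest terms)

def rescale_index(sample, fs, target_fs=TARGET_FS):
    """Map a sample index at the native rate `fs` onto the `target_fs` axis."""
    return int(round(sample * target_fs / fs))

print(win_len)                   # 216
print(rescale_index(1000, 257))  # 1401
```

An index that is not rescaled would point roughly 29% too early in an upsampled 257 Hz record, so the extracted window would miss the beat it is labeled with.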


In [None]:
from pathlib import Path
import numpy as np
import pandas as pd
import wfdb
from wfdb import processing
from scipy.signal import resample_poly
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, confusion_matrix
from imblearn.over_sampling import SMOTE
import tensorflow as tf
from tensorflow.keras import utils

# USER CONFIG
MITBIH_DIR = "/content/drive/MyDrive/ml project/data/mitdb"  # <-- change to your MIT-BIH data folder
INCART_DIR = "/content/drive/MyDrive/ml project/data/incart"   # <-- change to your INCART data folder

MITBIH_RECORDS = None  # e.g., ['100','101','102', ...]
INCART_RECORDS = None  # e.g., ['I01','I02', ...]

TARGET_FS = 360  # MIT-BIH is 360 Hz; INCART will be upsampled from ~257 Hz
PRE_SEC  = 0.20
POST_SEC = 0.40
TEST_SIZE = 0.2
RANDOM_STATE = 42
LOGDIR = "logs/ecg_joint"
EPOCHS = 25
BATCH_SIZE = 128

PRIMARY_LEAD_NAMES = ['MLII', 'II', 'Lead II', 'lead_II', 'le II', 'le2']
SECONDARY_LEAD_PRIORITY = ['V1', 'V5']

N_SET = set(list("NLRej"))
S_SET = set(['A','a','J','S'])
V_SET = set(['V','E'])

def symbol_to_class(sym):
    if sym in N_SET:
        return 'N'
    if sym in S_SET:
        return 'S'
    if sym in V_SET:
        return 'V'
    return None

CLASS_ORDER = ['N','S','V']
CLASS_TO_IDX = {c:i for i,c in enumerate(CLASS_ORDER)}
IDX_TO_CLASS = {i:c for c,i in CLASS_TO_IDX.items()}

def one_hot(y_idx, num_classes=3):
    return utils.to_categorical(y_idx, num_classes=num_classes)


## Utilities: Lead Selection, Resampling, Segmentation, and Loading

In [None]:
def pick_two_leads(signal_names):
    # Choose two leads analogous to MIT-BIH: primary = II/MLII; secondary = V1, fallback V5, else any V*
    names = list(signal_names)
    # Primary
    primary_idx = None
    for alias in PRIMARY_LEAD_NAMES:
        if alias in names:
            primary_idx = names.index(alias)
            break
    if primary_idx is None:
        for i, nm in enumerate(names):
            if nm.strip().upper() in ['MLII','II','LEAD II','LEAD_II']:
                primary_idx = i
                break
    if primary_idx is None:
        raise ValueError(f"Primary lead (II/MLII) not found in {names}")
    # Secondary
    secondary_idx = None
    for sec in SECONDARY_LEAD_PRIORITY:
        if sec in names:
            secondary_idx = names.index(sec)
            break
    if secondary_idx is None:
        for i, nm in enumerate(names):
            if nm.strip().upper().startswith('V'):
                secondary_idx = i
                break
    if secondary_idx is None:
        raise ValueError(f"No chest lead (V1/V5/any V*) found in {names}")
    if secondary_idx == primary_idx:
        for i, nm in enumerate(names):
            if i != primary_idx and nm.strip().upper().startswith('V'):
                secondary_idx = i
                break
    return primary_idx, secondary_idx

def resample_to_target(sig, fs, target_fs=TARGET_FS):
    if fs == target_fs:
        return sig
    from fractions import Fraction
    frac = Fraction(target_fs, int(fs)).limit_denominator(1000)
    up, down = frac.numerator, frac.denominator
    return resample_poly(sig, up, down)

def extract_windows(signals, ann_samples, ann_symbols, fs, pre_sec=PRE_SEC, post_sec=POST_SEC):
    pre = int(round(pre_sec * fs))
    post = int(round(post_sec * fs))
    win_len = pre + post
    X_list, y_list = [], []
    n = signals.shape[0]
    for s, sym in zip(ann_samples, ann_symbols):
        cls = symbol_to_class(sym)
        if cls is None:
            continue
        start = s - pre
        end = s + post
        if start < 0 or end > n:
            continue
        seg = signals[start:end, :]
        seg = (seg - seg.mean(axis=0, keepdims=True)) / (seg.std(axis=0, keepdims=True) + 1e-6)
        X_list.append(seg.astype(np.float32))
        y_list.append(CLASS_TO_IDX[cls])
    if not X_list:
        return np.empty((0, win_len, 2), dtype=np.float32), np.empty((0,), dtype=int)
    return np.stack(X_list, axis=0), np.array(y_list, dtype=int)

def autodetect_records(folder: str):
    p = Path(folder)
    recs = sorted([f.stem for f in p.glob("*.hea")])
    return recs

def load_dataset_from_folder(folder, record_ids=None, dataset_name=""):
    if record_ids is None:
        record_ids = autodetect_records(folder)
    X_all, y_all, rec_all = [], [], []
    for rid in record_ids:
        rec_path = str(Path(folder) / rid)
        try:
            rec = wfdb.rdrecord(rec_path)
        except Exception as e:
            print(f"[WARN] Skipping {rid}: rdrecord failed ({e})")
            continue
        sig = rec.p_signal
        names = rec.sig_name
        fs = rec.fs
        try:
            i_primary, i_secondary = pick_two_leads(names)
        except ValueError as e:
            print(f"[WARN] Skipping {rid}: {e}")
            continue
        sig2 = np.stack([sig[:, i_primary], sig[:, i_secondary]], axis=1)
        sig2_rs = np.column_stack([
            resample_to_target(sig2[:,0], fs, TARGET_FS),
            resample_to_target(sig2[:,1], fs, TARGET_FS),
        ])
        try:
            ann = wfdb.rdann(rec_path, 'atr')
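            # Annotation indices are stored at the record's native rate; rescale them
            # to TARGET_FS so they line up with the resampled signal.
            ann_samples = np.round(np.array(ann.sample) * (TARGET_FS / fs)).astype(int)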
            ann_symbols = np.array(ann.symbol)
        except Exception as e:
            print(f"[WARN] Skipping {rid}: rdann failed ({e})")
            continue
        X_rec, y_rec = extract_windows(sig2_rs, ann_samples, ann_symbols, TARGET_FS)
        if X_rec.shape[0] == 0:
            print(f"[INFO] No usable beats for {rid}")
            continue
        X_all.append(X_rec)
        y_all.append(y_rec)
        rec_all.append(np.full(y_rec.shape, fill_value=rid, dtype=object))
        print(f"[OK] {dataset_name} {rid}: beats={X_rec.shape[0]} lead_primary={names[i_primary]} lead_secondary={names[i_secondary]}")
    if not X_all:
        return np.empty((0, int(round((PRE_SEC+POST_SEC)*TARGET_FS)), 2), dtype=np.float32), \
               np.empty((0,), dtype=int), np.empty((0,), dtype=object)
    X = np.concatenate(X_all, axis=0)
    y = np.concatenate(y_all, axis=0)
    rec_idx = np.concatenate(rec_all, axis=0)
    return X, y, rec_idx

def record_wise_split(rec_idx, test_size=TEST_SIZE, random_state=RANDOM_STATE):
    import numpy as _np
    from sklearn.model_selection import train_test_split as _tts
    uniq = _np.unique(rec_idx)
    train_recs, test_recs = _tts(uniq, test_size=test_size, random_state=random_state, shuffle=True)
    train_mask = _np.isin(rec_idx, train_recs)
    test_mask = _np.isin(rec_idx, test_recs)
    return train_mask, test_mask

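def apply_smote(X, y_idx):
    N, T, C = X.shape
    X_flat = X.reshape(N, T*C)
    smote = SMOTE(random_state=RANDOM_STATE)
    Xr, yr = smote.fit_resample(X_flat, y_idx)
    Xr = Xr.reshape(Xr.shape[0], T, C).astype(np.float32)
    # SMOTE appends synthetic rows after the originals, and Keras validation_split
    # takes the last rows, so shuffle to keep the validation slice mixed.
    perm = np.random.default_rng(RANDOM_STATE).permutation(Xr.shape[0])
    return Xr[perm], yr[perm]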

def build_cnn_model(input_shape, num_classes=3):
    inp = tf.keras.Input(shape=input_shape)
    x = tf.keras.layers.Conv1D(32, 7, padding='same', activation='relu')(inp)
    x = tf.keras.layers.BatchNormalization()(x)
    x = tf.keras.layers.MaxPooling1D(2)(x)
    x = tf.keras.layers.Conv1D(64, 5, padding='same', activation='relu')(x)
    x = tf.keras.layers.BatchNormalization()(x)
    x = tf.keras.layers.MaxPooling1D(2)(x)
    x = tf.keras.layers.Conv1D(128, 3, padding='same', activation='relu')(x)
    x = tf.keras.layers.BatchNormalization()(x)
    x = tf.keras.layers.GlobalAveragePooling1D()(x)
    x = tf.keras.layers.Dense(128, activation='relu')(x)
    x = tf.keras.layers.Dropout(0.3)(x)
    out = tf.keras.layers.Dense(num_classes, activation='softmax')(x)
    model = tf.keras.Model(inp, out)
    model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])
    return model

def describe_distribution(y_idx, title=""):
    import pandas as _pd
    import numpy as _np
    counts = _pd.Series(y_idx).value_counts().reindex([0,1,2], fill_value=0)
    df = _pd.DataFrame({"class": [IDX_TO_CLASS[i] for i in counts.index], "count": counts.values})
    print("\n"+title)
    print(df)
    return df


## MIT-BIH → 3-class, 2-lead → SMOTE → Train
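
SMOTE balances the training set by creating synthetic minority-class samples, each one interpolated between a real minority sample and one of its nearest minority-class neighbors in the flattened feature space. The following is a minimal pure-Python sketch of that interpolation step only. `smote_point` and the toy vectors are hypothetical, and imbalanced-learn's implementation additionally handles neighbor search and the sampling strategy.

```python
import random

def smote_point(x, neighbor, lam):
    """Return a point on the segment from `x` to `neighbor` (lam in [0, 1])."""
    return [a + lam * (b - a) for a, b in zip(x, neighbor)]

# Toy flattened "beats" with 4 features each (illustrative values, not real ECG).
x = [0.0, 1.0, 0.5, -0.5]
nb = [1.0, 3.0, 0.5, 0.5]

rng = random.Random(42)
lam = rng.random()              # random interpolation weight in [0, 1)
synthetic = smote_point(x, nb, lam)

midpoint = smote_point(x, nb, 0.5)
print(midpoint)                 # [0.5, 2.0, 0.5, 0.0]
```

Because every synthetic sample lies between two training samples, SMOTE is applied to the training split only; the test split keeps its original class distribution.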

In [None]:
# Load MIT-BIH
mit_X, mit_y, mit_rec = load_dataset_from_folder(MITBIH_DIR, MITBIH_RECORDS, dataset_name="MIT-BIH")
win_len = mit_X.shape[1] if mit_X.size > 0 else int(round((PRE_SEC+POST_SEC)*TARGET_FS))
print("MIT-BIH shapes:", mit_X.shape, mit_y.shape)

# Split by record
mit_train_mask, mit_test_mask = record_wise_split(mit_rec)
Xtr_mit, ytr_mit = mit_X[mit_train_mask], mit_y[mit_train_mask]
Xte_mit, yte_mit = mit_X[mit_test_mask], mit_y[mit_test_mask]

describe_distribution(ytr_mit, "MIT-BIH Train (pre-SMOTE)")
describe_distribution(yte_mit, "MIT-BIH Test")

# Apply SMOTE to train only
if Xtr_mit.shape[0] > 0:
    Xtr_mit_sm, ytr_mit_sm = apply_smote(Xtr_mit, ytr_mit)
    describe_distribution(ytr_mit_sm, "MIT-BIH Train (post-SMOTE)")
else:
    Xtr_mit_sm, ytr_mit_sm = Xtr_mit, ytr_mit
    print("[WARN] No MIT-BIH training beats; SMOTE skipped.")

# Save preprocessed datasets
np.savez_compressed("mitbih_3class_2lead.npz",
                    X_train=Xtr_mit_sm, y_train=ytr_mit_sm,
                    X_test=Xte_mit, y_test=yte_mit, rec_test=mit_rec[mit_test_mask])
print("Saved -> mitbih_3class_2lead.npz")

# Build & train model
mit_model = build_cnn_model(input_shape=(win_len, 2), num_classes=3)
tb = tf.keras.callbacks.TensorBoard(log_dir=str(Path(LOGDIR) / "mitbih"), histogram_freq=0)
es = tf.keras.callbacks.EarlyStopping(patience=5, restore_best_weights=True, monitor='val_accuracy', mode='max')

if Xtr_mit_sm.shape[0] > 0:
    mit_hist = mit_model.fit(
        Xtr_mit_sm, one_hot(ytr_mit_sm, 3),
        validation_split=0.15,
        epochs=EPOCHS, batch_size=BATCH_SIZE,
        callbacks=[tb, es], verbose=2
    )
else:
    print("[WARN] MIT-BIH training skipped (no data).")

# Evaluate
if Xte_mit.shape[0] > 0:
    yhat = mit_model.predict(Xte_mit, verbose=0).argmax(axis=1)
    print("MIT-BIH Classification Report:")
    print(classification_report(yte_mit, yhat, target_names=CLASS_ORDER, digits=4))
    print("Confusion Matrix:")
    print(confusion_matrix(yte_mit, yhat))


[OK] MIT-BIH 100: beats=2272 lead_primary=MLII lead_secondary=V5
[OK] MIT-BIH 101: beats=1863 lead_primary=MLII lead_secondary=V1
[WARN] Skipping 102: Primary lead (II/MLII) not found in ['V5', 'V2']
[OK] MIT-BIH 103: beats=2083 lead_primary=MLII lead_secondary=V2
[WARN] Skipping 104: Primary lead (II/MLII) not found in ['V5', 'V2']
[OK] MIT-BIH 105: beats=2567 lead_primary=MLII lead_secondary=V1
[OK] MIT-BIH 106: beats=2027 lead_primary=MLII lead_secondary=V1
[OK] MIT-BIH 107: beats=59 lead_primary=MLII lead_secondary=V1
[OK] MIT-BIH 108: beats=1761 lead_primary=MLII lead_secondary=V1
[OK] MIT-BIH 109: beats=2529 lead_primary=MLII lead_secondary=V1
[OK] MIT-BIH 111: beats=2124 lead_primary=MLII lead_secondary=V1
[OK] MIT-BIH 112: beats=2538 lead_primary=MLII lead_secondary=V1
[OK] MIT-BIH 113: beats=1794 lead_primary=MLII lead_secondary=V1
[OK] MIT-BIH 114: beats=1875 lead_primary=MLII lead_secondary=V5
[OK] MIT-BIH 115: beats=1952 lead_primary=MLII lead_secondary=V1
[OK] MIT-BIH 116:

## INCART → 2-lead subset (II + V1/V5) → 3-class → SMOTE → Train

In [None]:
# Load INCART using same functions (lead picking will choose II + V1/V5)
inc_X, inc_y, inc_rec = load_dataset_from_folder(INCART_DIR, INCART_RECORDS, dataset_name="INCART")
print("INCART shapes:", inc_X.shape, inc_y.shape)

# Split by record
inc_train_mask, inc_test_mask = record_wise_split(inc_rec)
Xtr_inc, ytr_inc = inc_X[inc_train_mask], inc_y[inc_train_mask]
Xte_inc, yte_inc = inc_X[inc_test_mask], inc_y[inc_test_mask]

describe_distribution(ytr_inc, "INCART Train (pre-SMOTE)")
describe_distribution(yte_inc, "INCART Test")

# Apply SMOTE to train only
if Xtr_inc.shape[0] > 0:
    Xtr_inc_sm, ytr_inc_sm = apply_smote(Xtr_inc, ytr_inc)
    describe_distribution(ytr_inc_sm, "INCART Train (post-SMOTE)")
else:
    Xtr_inc_sm, ytr_inc_sm = Xtr_inc, ytr_inc
    print("[WARN] No INCART training beats; SMOTE skipped.")

# Save preprocessed datasets
np.savez_compressed("incart_3class_2lead.npz",
                    X_train=Xtr_inc_sm, y_train=ytr_inc_sm,
                    X_test=Xte_inc, y_test=yte_inc, rec_test=inc_rec[inc_test_mask])
print("Saved -> incart_3class_2lead.npz")

# Build & train model (same architecture for comparability)
inc_model = build_cnn_model(input_shape=(win_len, 2), num_classes=3)
tb2 = tf.keras.callbacks.TensorBoard(log_dir=str(Path(LOGDIR) / "incart"), histogram_freq=0)
es2 = tf.keras.callbacks.EarlyStopping(patience=5, restore_best_weights=True, monitor='val_accuracy', mode='max')

if Xtr_inc_sm.shape[0] > 0:
    inc_hist = inc_model.fit(
        Xtr_inc_sm, one_hot(ytr_inc_sm, 3),
        validation_split=0.15,
        epochs=EPOCHS, batch_size=BATCH_SIZE,
        callbacks=[tb2, es2], verbose=2
    )
else:
    print("[WARN] INCART training skipped (no data).")

# Evaluate
if Xte_inc.shape[0] > 0:
    yhat = inc_model.predict(Xte_inc, verbose=0).argmax(axis=1)
    print("INCART Classification Report:")
    print(classification_report(yte_inc, yhat, target_names=CLASS_ORDER, digits=4))
    print("Confusion Matrix:")
    print(confusion_matrix(yte_inc, yhat))


[OK] INCART I01: beats=2757 lead_primary=II lead_secondary=V1
[OK] INCART I02: beats=2673 lead_primary=II lead_secondary=V1
[OK] INCART I03: beats=2451 lead_primary=II lead_secondary=V1
[OK] INCART I04: beats=2406 lead_primary=II lead_secondary=V1
[OK] INCART I05: beats=1767 lead_primary=II lead_secondary=V1
[OK] INCART I06: beats=2493 lead_primary=II lead_secondary=V1
[OK] INCART I07: beats=2705 lead_primary=II lead_secondary=V1
[OK] INCART I08: beats=2129 lead_primary=II lead_secondary=V1
[OK] INCART I09: beats=2988 lead_primary=II lead_secondary=V1
[OK] INCART I10: beats=3682 lead_primary=II lead_secondary=V1
[OK] INCART I11: beats=2081 lead_primary=II lead_secondary=V1
[OK] INCART I12: beats=2807 lead_primary=II lead_secondary=V1
[OK] INCART I13: beats=2023 lead_primary=II lead_secondary=V1
[OK] INCART I14: beats=1865 lead_primary=II lead_secondary=V1
[OK] INCART I15: beats=2634 lead_primary=II lead_secondary=V1
[OK] INCART I16: beats=1522 lead_primary=II lead_secondary=V1
[OK] INC

## Optional: Cross-Dataset Evaluation

Evaluate how each model generalizes across datasets (domain shift).

In [None]:
# MIT-BIH model on INCART test
if 'mit_model' in globals() and 'Xte_inc' in globals() and Xte_inc.shape[0] > 0:
    yhat = mit_model.predict(Xte_inc, verbose=0).argmax(axis=1)
    print("MIT-BIH model on INCART Test:")
    print(classification_report(yte_inc, yhat, target_names=CLASS_ORDER, digits=4))

# INCART model on MIT-BIH test
if 'inc_model' in globals() and 'Xte_mit' in globals() and Xte_mit.shape[0] > 0:
    yhat = inc_model.predict(Xte_mit, verbose=0).argmax(axis=1)
    print("INCART model on MIT-BIH Test:")
    print(classification_report(yte_mit, yhat, target_names=CLASS_ORDER, digits=4))


MIT-BIH model on INCART Test:
              precision    recall  f1-score   support

           N     0.8632    0.5135    0.6439     32330
           S     0.0026    0.1786    0.0051       140
           V     0.2105    0.3392    0.2598      5772

    accuracy                         0.4860     38242
   macro avg     0.3588    0.3438    0.3029     38242
weighted avg     0.7615    0.4860    0.5836     38242

INCART model on MIT-BIH Test:
              precision    recall  f1-score   support

           N     0.9047    0.7885    0.8426     20831
           S     0.0112    0.1827    0.0211       197
           V     0.2841    0.2393    0.2598      2211

    accuracy                         0.7311     23239
   macro avg     0.4000    0.4035    0.3745     23239
weighted avg     0.8380    0.7311    0.7802     23239
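
The gap between the macro and weighted averages in these reports follows from class imbalance. As a small check, the sketch below recomputes both averages from the per-class F1 scores and supports printed in the "MIT-BIH model on INCART Test" report; the variable names are illustrative.

```python
# Per-class F1 and support copied from the "MIT-BIH model on INCART Test" report.
f1 = {"N": 0.6439, "S": 0.0051, "V": 0.2598}
support = {"N": 32330, "S": 140, "V": 5772}

# Macro average: every class counts equally.
macro_f1 = sum(f1.values()) / len(f1)

# Weighted average: each class counts in proportion to its support.
total = sum(support.values())
weighted_f1 = sum(f1[c] * support[c] for c in f1) / total

print(f"macro F1    = {macro_f1:.4f}")     # 0.3029
print(f"weighted F1 = {weighted_f1:.4f}")  # 0.5836
```

Because N accounts for most of the support, the weighted average stays close to the N score, while the macro average exposes the weak S and V performance.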



## Save Trained Models

In [None]:
# Save models after training (if trained)
try:
    mit_model.save("mitbih_3class_2lead_cnn.keras")
    print("Saved -> mitbih_3class_2lead_cnn.keras")
except Exception as e:
    print("MIT-BIH model not saved (maybe no training ran):", e)

try:
    inc_model.save("incart_3class_2lead_cnn.keras")
    print("Saved -> incart_3class_2lead_cnn.keras")
except Exception as e:
    print("INCART model not saved (maybe no training ran):", e)


Saved -> mitbih_3class_2lead_cnn.keras
Saved -> incart_3class_2lead_cnn.keras
