In [None]:
!pip install wfdb --no-deps -q


import numpy as np, sys
print("NumPy:", np.__version__)
print("Python:", sys.version)


[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/163.8 kB[0m [31m?[0m eta [36m-:--:--[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m163.8/163.8 kB[0m [31m6.9 MB/s[0m eta [36m0:00:00[0m
[?25hNumPy: 2.0.2
Python: 3.12.12 (main, Oct 10 2025, 08:52:57) [GCC 11.4.0]


In [None]:
!pip install -q tensorflow



In [None]:
import os, random
import numpy as np
import tensorflow as tf
import wfdb
from wfdb import processing

# Reproducibility: one seed shared by the Python, NumPy and TensorFlow RNGs
SEED = 42
random.seed(SEED)
np.random.seed(SEED)
tf.random.set_seed(SEED)

# MIT-BIH record numbers feeding each class (as in paper, Table 1)
RECORDS = dict(
    NOR=[100, 105, 215],
    LBB=[109, 111, 214],
    RBB=[118, 124, 212],
    PVC=[106, 233],
    APC=[207, 209, 232],
)

# How many segments to draw per class for each split
TARGET_TRAIN = dict(NOR=450, LBB=450, RBB=450, APC=450, PVC=300)
TARGET_TEST  = dict(NOR=90, LBB=90, RBB=90, APC=90, PVC=60)

# Windowing: each segment spans WIN_SEC seconds sampled at FS Hz
FS = 360
WIN_SEC = 10
WIN_SAMPLES = WIN_SEC * FS

# Class name <-> integer label lookups
class_to_idx = {name: i for i, name in enumerate(("NOR", "LBB", "RBB", "PVC", "APC"))}
idx_to_class = dict(zip(class_to_idx.values(), class_to_idx.keys()))

print("Config ready.")


Config ready.


In [None]:
def read_signal(record_num, fs_target=FS):
    """Read one MIT-BIH record and return a single ECG lead at ``fs_target`` Hz.

    Lead selection prefers MLII, then V5, then the first channel. MLII is
    checked first so that records carrying both MLII and V5 are read from the
    same lead as the MLII-only records; preferring V5 would silently mix leads
    within a class.

    Args:
        record_num: MIT-BIH record identifier (converted with ``str``).
        fs_target: Desired sampling rate in Hz.

    Returns:
        ``(sig, fs)``: the 1-D float32 signal and its sampling rate.
    """
    rec = wfdb.rdrecord(str(record_num), pn_dir="mitdb")
    names = [n.lower() for n in rec.sig_name]
    # First preferred lead that is present; channel 0 when neither is found.
    ch = next((names.index(lead) for lead in ("mlii", "v5") if lead in names), 0)
    sig = rec.p_signal[:, ch].astype(np.float32)
    fs = int(rec.fs)
    if fs != fs_target:
        # resample_sig returns (signal, time locations); keep the signal only and
        # restore float32 so both code paths return the same dtype.
        sig = processing.resample_sig(sig, fs, fs_target)[0].astype(np.float32)
        fs = fs_target
    return sig, fs

def extract_segments(record_num, label, win_samples=WIN_SAMPLES):
    """Collect fixed-length windows around every annotated beat of one class.

    For each annotation whose symbol belongs to ``label`` the window starts
    ``win_samples // 2`` samples before the beat (clamped to the record start)
    and is kept only if it fits entirely inside the record.

    NOTE(review): one window is taken per matching beat, so windows of
    neighbouring beats presumably overlap heavily -- confirm this is acceptable
    for the random train/test split performed downstream.

    Returns:
        List of 1-D signal slices, each ``win_samples`` long.
    """
    # Annotation symbols accepted for each class label
    symbols_for = {
        "NOR": ("N",),
        "LBB": ("L",),
        "RBB": ("R",),
        "PVC": ("V",),
        "APC": ("A", "a"),  # APC / atrial premature
    }
    wanted = symbols_for.get(label, ())

    sig, _ = read_signal(record_num)
    ann = wfdb.rdann(str(record_num), "atr", pn_dir="mitdb")

    half = win_samples // 2
    n_total = len(sig)
    segments = []
    for beat_pos, sym in zip(ann.sample, ann.symbol):
        if sym not in wanted:
            continue
        lo = max(beat_pos - half, 0)
        hi = lo + win_samples
        if hi > n_total:
            # Window would run past the end of the record; skip it.
            continue
        segments.append(sig[lo:hi])
    return segments

# Build one pool of candidate segments per class
pool = {}
for cls, recs in RECORDS.items():
    pool[cls] = [seg for r in recs for seg in extract_segments(r, cls)]
    print(cls, "segments extracted:", len(pool[cls]))


NOR segments extracted: 7936
LBB segments extracted: 6600
RBB segments extracted: 5513
PVC segments extracted: 1347
APC segments extracted: 1860


In [None]:
rng = np.random.default_rng(SEED)

def _pick_positions(n_items, n):
    """Draw min(n, n_items) distinct positions in [0, n_items) from the shared rng."""
    return rng.choice(n_items, size=min(n, n_items), replace=False)

def take_n(lst, n):
    """Return up to ``n`` distinct, randomly chosen elements of ``lst``."""
    return [lst[i] for i in _pick_positions(len(lst), n)]

train_X_1d, train_y = [], []
test_X_1d,  test_y  = [], []

# NOTE(review): both splits are drawn from the same per-class pool, so segments
# of the same record can land in train and test -- confirm against the paper.
for cls in ["NOR","LBB","RBB","APC","PVC"]:
    cls_pool = pool[cls]
    label_id = class_to_idx[cls]

    # Training positions first ...
    train_pos = _pick_positions(len(cls_pool), TARGET_TRAIN[cls])
    taken = set(train_pos.tolist())

    # ... then test positions from whatever was not taken (pool order kept)
    leftover_pos = [i for i in range(len(cls_pool)) if i not in taken]
    test_pos = _pick_positions(len(leftover_pos), TARGET_TEST[cls])

    train_segs = [cls_pool[i] for i in train_pos]
    test_segs = [cls_pool[leftover_pos[j]] for j in test_pos]

    train_X_1d.extend(train_segs)
    train_y.extend([label_id] * len(train_segs))
    test_X_1d.extend(test_segs)
    test_y.extend([label_id] * len(test_segs))

# Stack the per-segment slices into 2-D arrays and the labels into int64 vectors
train_X_1d = np.stack(train_X_1d)
test_X_1d  = np.stack(test_X_1d)
train_y    = np.array(train_y, dtype=np.int64)
test_y     = np.array(test_y, dtype=np.int64)

print("Train:", train_X_1d.shape, {k:int((train_y==v).sum()) for k,v in class_to_idx.items()})
print("Test :", test_X_1d.shape,  {k:int((test_y==v).sum()) for k,v in class_to_idx.items()})


Train: (2100, 3600) {'NOR': 450, 'LBB': 450, 'RBB': 450, 'PVC': 300, 'APC': 450}
Test : (420, 3600) {'NOR': 90, 'LBB': 90, 'RBB': 90, 'PVC': 60, 'APC': 90}


In [None]:
# === Dataset counts table (standalone cell) ===
# Expects: train_y, test_y, class_to_idx already defined somewhere above.

import numpy as np

# Try pandas for a tidy table; fall back to plain text if not available
try:
    import pandas as pd
    from IPython.display import display
except Exception:
    pd = None
    display = print

def to_int_labels(y):
    """Return integer class labels from one-hot rows (2-D) or a label vector."""
    arr = np.asarray(y)
    if arr.ndim == 2:
        # One-hot / probability rows: the label is the column of the maximum
        return arr.argmax(axis=1)
    return arr.astype(int)

labels = ["NOR","LBB","RBB","APC","PVC"]
ytr = to_int_labels(train_y)
yte = to_int_labels(test_y)

# Per-class sample counts for each split and their sum
train_counts = {c: int(np.count_nonzero(ytr == class_to_idx[c])) for c in labels}
test_counts  = {c: int(np.count_nonzero(yte == class_to_idx[c])) for c in labels}
total_counts = {c: train_counts[c] + test_counts[c] for c in labels}

if pd is None:
    # Plain-text fallback when pandas could not be imported
    print(f"{'Class':>6} | {'Train':>5} | {'Test':>5} | {'Total':>5}")
    print("-"*32)
    for c in labels:
        print(f"{c:>6} | {train_counts[c]:5d} | {test_counts[c]:5d} | {total_counts[c]:5d}")
    print("-"*32)
    print(f"{'TOTAL':>6} | {sum(train_counts.values()):5d} | {sum(test_counts.values()):5d} | {sum(total_counts.values()):5d}")
else:
    df = pd.DataFrame.from_dict(
        {c: (train_counts[c], test_counts[c], total_counts[c]) for c in labels},
        orient="index",
        columns=["Train", "Test", "Total"],
    )
    # Append a row holding the column sums
    df.loc["Grand total"] = df.sum(numeric_only=True)
    display(df.astype("int64"))

# Optional: compare with the configured target counts when TARGET_* exist
try:
    ok = (all(train_counts[c] == TARGET_TRAIN[c] for c in labels)
          and all(test_counts[c] == TARGET_TEST[c] for c in labels))
    if ok:
        print("Matches paper counts (2100 train, 420 test)")
    else:
        print("Counts differ from paper setup.")
except NameError:
    pass


Unnamed: 0,Train,Test,Total
NOR,450,90,540
LBB,450,90,540
RBB,450,90,540
APC,450,90,540
PVC,300,60,360
Grand total,2100,420,2520


Matches paper counts (2100 train, 420 test)


In [None]:
import numpy as np
from PIL import Image
from scipy import signal
from tensorflow.keras.utils import to_categorical

STFT_NPERSEG  = 512   # paper
STFT_NOVERLAP = 256   # reasonable 50% overlap (paper doesn’t state; this is standard)
SPEC_SIZE      = 256   # paper: 256x256 image

def stft_to_image(seg, fs=FS):
    """Turn one 1-D segment into a SPEC_SIZE x SPEC_SIZE uint8 spectrogram image.

    Pipeline: STFT with a symmetric Hann window -> magnitude -> dB relative to
    the peak -> min-max scaling to [0, 255] -> bicubic resize.

    Args:
        seg: 1-D signal segment.
        fs: Sampling rate in Hz passed to ``scipy.signal.stft``.

    Returns:
        ``np.uint8`` array of shape ``(SPEC_SIZE, SPEC_SIZE)``.
    """
    # Symmetric Hann per paper formula (M-1 in the denominator)
    hann_sym = signal.windows.hann(STFT_NPERSEG, sym=True)

    _, _, Z = signal.stft(
        seg, fs=fs,
        window=hann_sym,           # exact Hanning window
        nperseg=STFT_NPERSEG,      # 512
        noverlap=STFT_NOVERLAP,    # 50% overlap (paper doesn’t specify; 256 is standard)
        nfft=STFT_NPERSEG,         # 512-point FFT
        boundary=None,             # no zero-padding at the ends
        padded=False               # avoid internal padding
    )

    # magnitude → dB → [0,255] grayscale
    S = np.abs(Z) + 1e-12
    S_db = 20 * np.log10(S / S.max())
    span = S_db.max() - S_db.min()
    if span > 0:
        S01 = (S_db - S_db.min()) / span
    else:
        # Flat spectrogram (e.g. an all-zero segment): avoid 0/0 -> NaN, which
        # would make the uint8 cast below undefined. Emit an all-black image.
        S01 = np.zeros_like(S_db)
    img = Image.fromarray((S01 * 255).astype(np.uint8)).resize((SPEC_SIZE, SPEC_SIZE), Image.BICUBIC)
    return np.array(img, np.uint8)

def make_spec_set(X_1d, y):
    """Convert 1-D segments and integer labels into CNN-ready tensors.

    Returns:
        ``(X, Y)``: ``X`` holds the spectrogram images scaled to [0, 1] with a
        trailing channel axis; ``Y`` is the 5-class one-hot encoding of ``y``.
    """
    spectrograms = []
    for seg in X_1d:
        spectrograms.append(stft_to_image(seg, FS))
    scaled = np.array(spectrograms, np.float32) / 255.0
    X = scaled[..., np.newaxis]   # append the channel axis
    Y = to_categorical(y, num_classes=5)
    return X, Y

train_X_2d, train_y_oh = make_spec_set(train_X_1d, train_y)
test_X_2d,  test_y_oh  = make_spec_set(test_X_1d,  test_y)
print("Train 2D:", train_X_2d.shape, "Test 2D:", test_X_2d.shape)

Train 2D: (2100, 256, 256, 1) Test 2D: (420, 256, 256, 1)


In [None]:
from tensorflow import keras
from tensorflow.keras import layers

def make_2d_cnn(input_shape=(SPEC_SIZE, SPEC_SIZE, 1), num_classes=5):
    """Build the small 2-D CNN: three conv + max-pool stages, then two dense layers.

    Args:
        input_shape: Shape of one input image (height, width, channels).
        num_classes: Size of the softmax output layer.

    Returns:
        An uncompiled ``keras.Model`` named ``"ECG_2D_CNN"``.
    """
    # (filters, kernel size) of each convolution; every one is followed by 2x2 pooling
    conv_specs = [(8, (4,4)), (13, (2,2)), (13, (2,2))]

    inp = keras.Input(shape=input_shape)
    x = inp
    for filters, kernel in conv_specs:
        x = layers.Conv2D(filters, kernel, activation='relu', padding='same')(x)
        x = layers.MaxPooling2D((2,2))(x)
    x = layers.Flatten()(x)
    # width not fixed in paper; 64 keeps it small and stable
    x = layers.Dense(64, activation='relu')(x)
    out = layers.Dense(num_classes, activation='softmax')(x)
    return keras.Model(inp, out, name="ECG_2D_CNN")

model_2d = make_2d_cnn()
model_2d.summary()


In [None]:
from tensorflow import keras

# Training hyper-parameters
LR = 0.001
BATCH = 500
EPOCHS = 100

optimizer = keras.optimizers.Adam(learning_rate=LR)
model_2d.compile(
    optimizer=optimizer,
    loss='categorical_crossentropy',
    metrics=['accuracy'],
)

# The test tensors double as the validation set reported after each epoch
hist_2d = model_2d.fit(
    x=train_X_2d,
    y=train_y_oh,
    batch_size=BATCH,
    epochs=EPOCHS,
    validation_data=(test_X_2d, test_y_oh),
    verbose=1,
)


Epoch 1/100
[1m5/5[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m22s[0m 2s/step - accuracy: 0.8630 - loss: 0.6687 - val_accuracy: 0.8310 - val_loss: 0.6357
Epoch 2/100
[1m5/5[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 225ms/step - accuracy: 0.9102 - loss: 0.2533 - val_accuracy: 0.8952 - val_loss: 0.3231
Epoch 3/100
[1m5/5[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 225ms/step - accuracy: 0.9320 - loss: 0.1881 - val_accuracy: 0.9214 - val_loss: 0.2301
Epoch 4/100
[1m5/5[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 223ms/step - accuracy: 0.9542 - loss: 0.1227 - val_accuracy: 0.9643 - val_loss: 0.1229
Epoch 5/100
[1m5/5[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 228ms/step - accuracy: 0.9846 - loss: 0.0659 - val_accuracy: 0.9690 - val_loss: 0.1368
Epoch 6/100
[1m5/5[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 225ms/step - accuracy: 0.9806 - loss: 0.0729 - val_accuracy: 0.9714 - val_loss: 0.1108
Epoch 7/100
[1m5/5[0m [32m━━━━━━━

In [None]:
import numpy as np

# Summarise the accuracy curves recorded by model_2d.fit (requires hist_2d)
history = hist_2d.history
tr = np.array(history['accuracy'])
vl = np.array(history.get('val_accuracy', []))
has_val = bool(vl.size)   # validation metrics exist only if fit() received validation data

print(f"Final train acc     : {tr[-1]*100:.2f}%")
if has_val:
    print(f"Final val acc       : {vl[-1]*100:.2f}%")
print(f"Average train acc   : {tr.mean()*100:.2f}%")
if has_val:
    print(f"Average val acc     : {vl.mean()*100:.2f}%")
    print(f"Best val acc        : {vl.max()*100:.2f}%  (epoch {vl.argmax()+1})")


NameError: name 'hist_2d' is not defined

## FULL CODE

In [None]:
# ============================================================
# ECG Arrhythmia Classification (Paper-faithful full pipeline)
# MIT-BIH: STFT (512 Hann) -> 256x256 spectrogram -> 2D-CNN
# ============================================================

# -------------------------
# 0) Config & dependencies
# -------------------------
import os, random
import numpy as np
import tensorflow as tf
import wfdb
from wfdb import processing

# Reproducibility: one seed shared by the Python, NumPy and TensorFlow RNGs
SEED = 42
random.seed(SEED)
np.random.seed(SEED)
tf.random.set_seed(SEED)

# MIT-BIH record numbers feeding each class (from the paper)
RECORDS = dict(
    NOR=[100, 105, 215],
    LBB=[109, 111, 214],
    RBB=[118, 124, 212],
    PVC=[106, 233],
    APC=[207, 209, 232],
)

# Target split (paper): NOR/LBB/RBB/APC (450 train, 90 test), PVC (300 train, 60 test)
TARGET_TRAIN = dict(NOR=450, LBB=450, RBB=450, APC=450, PVC=300)
TARGET_TEST  = dict(NOR=90, LBB=90, RBB=90, APC=90, PVC=60)

# Windowing: each segment spans WIN_SEC seconds sampled at FS Hz
FS = 360
WIN_SEC = 10
WIN_SAMPLES = WIN_SEC * FS

# STFT / spectrogram settings
STFT_NPERSEG  = 512   # paper window size
STFT_NOVERLAP = 256   # 50% overlap (paper not explicit; standard)
SPEC_SIZE     = 256   # spectrogram image size (HxW)

# Class name <-> integer label lookups
class_to_idx = {name: i for i, name in enumerate(("NOR", "LBB", "RBB", "PVC", "APC"))}
idx_to_class = dict(zip(class_to_idx.values(), class_to_idx.keys()))

print("Config ready.")

# ------------------------------------------
# 1) Read signals & extract labeled segments
# ------------------------------------------
def read_signal(record_num, fs_target=FS):
    """Read one MIT-BIH record and return a single ECG lead at ``fs_target`` Hz.

    Lead selection prefers MLII, then V5, then the first channel. MLII is
    checked first so that records carrying both MLII and V5 are read from the
    same lead as the MLII-only records; preferring V5 would silently mix leads
    within a class.

    Args:
        record_num: MIT-BIH record identifier (converted with ``str``).
        fs_target: Desired sampling rate in Hz.

    Returns:
        ``(sig, fs)``: the 1-D float32 signal and its sampling rate.
    """
    rec = wfdb.rdrecord(str(record_num), pn_dir="mitdb")
    names = [n.lower() for n in rec.sig_name]
    # First preferred lead that is present; channel 0 when neither is found.
    ch = next((names.index(lead) for lead in ("mlii", "v5") if lead in names), 0)
    sig = rec.p_signal[:, ch].astype(np.float32)
    fs = int(rec.fs)
    if fs != fs_target:
        # resample_sig returns (signal, time locations); keep the signal only and
        # restore float32 so both code paths return the same dtype.
        sig = processing.resample_sig(sig, fs, fs_target)[0].astype(np.float32)
        fs = fs_target
    return sig, fs

def extract_segments(record_num, label, win_samples=WIN_SAMPLES):
    """Collect fixed-length windows around every annotated beat of one class.

    Accepted MIT-BIH annotation symbols per label:
      NOR -> 'N', LBB -> 'L', RBB -> 'R', PVC -> 'V', APC -> 'A' or 'a'

    For each matching annotation the window starts ``win_samples // 2`` samples
    before the beat (clamped to the record start) and is kept only if it fits
    entirely inside the record.

    NOTE(review): one window is taken per matching beat, so windows of
    neighbouring beats presumably overlap heavily -- confirm this is acceptable
    for the random train/test split performed downstream.

    Returns:
        List of 1-D signal slices, each ``win_samples`` long.
    """
    symbols_for = {
        "NOR": ("N",),
        "LBB": ("L",),
        "RBB": ("R",),
        "PVC": ("V",),
        "APC": ("A", "a"),
    }
    wanted = symbols_for.get(label, ())

    sig, _ = read_signal(record_num)
    ann = wfdb.rdann(str(record_num), "atr", pn_dir="mitdb")

    half = win_samples // 2
    n_total = len(sig)
    segments = []
    for beat_pos, sym in zip(ann.sample, ann.symbol):
        if sym not in wanted:
            continue
        lo = max(beat_pos - half, 0)
        hi = lo + win_samples
        if hi > n_total:
            # Window would run past the end of the record; skip it.
            continue
        segments.append(sig[lo:hi])
    return segments

# Build one pool of candidate segments per class
pool = {}
for cls, recs in RECORDS.items():
    pool[cls] = [seg for r in recs for seg in extract_segments(r, cls)]
    print(f"{cls}: segments extracted = {len(pool[cls])}")

# ---------------------------------------
# 2) Paper's fixed train/test split (1D)
# ---------------------------------------
rng = np.random.default_rng(SEED)

def _pick_positions(n_items, n):
    """Draw min(n, n_items) distinct positions in [0, n_items) from the shared rng."""
    return rng.choice(n_items, size=min(n, n_items), replace=False)

def take_n(lst, n):
    """Return up to ``n`` distinct, randomly chosen elements of ``lst``."""
    return [lst[i] for i in _pick_positions(len(lst), n)]

train_X_1d, train_y = [], []
test_X_1d,  test_y  = [], []

# NOTE(review): both splits are drawn from the same per-class pool, so segments
# of the same record can land in train and test -- confirm against the paper.
for cls in ["NOR","LBB","RBB","APC","PVC"]:
    cls_pool = pool[cls]
    label_id = class_to_idx[cls]

    # Training positions first ...
    train_pos = _pick_positions(len(cls_pool), TARGET_TRAIN[cls])
    taken = set(train_pos.tolist())

    # ... then test positions from whatever was not taken (pool order kept)
    leftover_pos = [i for i in range(len(cls_pool)) if i not in taken]
    test_pos = _pick_positions(len(leftover_pos), TARGET_TEST[cls])

    train_segs = [cls_pool[i] for i in train_pos]
    test_segs = [cls_pool[leftover_pos[j]] for j in test_pos]

    train_X_1d.extend(train_segs)
    train_y.extend([label_id] * len(train_segs))
    test_X_1d.extend(test_segs)
    test_y.extend([label_id] * len(test_segs))

# Stack the per-segment slices into 2-D arrays and the labels into int64 vectors
train_X_1d = np.stack(train_X_1d)
test_X_1d  = np.stack(test_X_1d)
train_y    = np.array(train_y, dtype=np.int64)
test_y     = np.array(test_y, dtype=np.int64)

print("Train:", train_X_1d.shape, {k:int((train_y==v).sum()) for k,v in class_to_idx.items()})
print("Test :", test_X_1d.shape,  {k:int((test_y==v).sum()) for k,v in class_to_idx.items()})

# Optional: small per-class counts table (skipped when pandas/IPython are missing).
# `display` is imported explicitly so the cell does not depend on the notebook
# injecting it, and only ImportError is tolerated so real errors stay visible.
try:
    import pandas as pd
    from IPython.display import display
except ImportError:
    print("pandas/IPython not available; skipping counts table.")
else:
    df = pd.DataFrame({
        "Train":[int((train_y==class_to_idx[c]).sum()) for c in class_to_idx],
        "Test":[int((test_y==class_to_idx[c]).sum()) for c in class_to_idx]
    }, index=list(class_to_idx.keys()))
    df["Total"] = df["Train"]+df["Test"]
    display(df)

# ------------------------------------------
# 3) STFT -> 256x256 grayscale spectrograms
# ------------------------------------------
from PIL import Image
from scipy import signal
from tensorflow.keras.utils import to_categorical

def stft_to_image(seg, fs=FS):
    """Turn one 1-D segment into a SPEC_SIZE x SPEC_SIZE uint8 spectrogram image.

    Steps: STFT with a symmetric Hann window of STFT_NPERSEG samples and
    STFT_NOVERLAP overlap, magnitude in dB relative to the peak, min-max
    scaling to [0, 255], bicubic resize.
    """
    window = signal.windows.hann(STFT_NPERSEG, sym=True)
    # Only the complex STFT matrix is needed; the frequency/time axes are dropped
    Z = signal.stft(
        seg,
        fs=fs,
        window=window,
        nperseg=STFT_NPERSEG,
        noverlap=STFT_NOVERLAP,
        nfft=STFT_NPERSEG,
        boundary=None,
        padded=False,
    )[2]
    mag = np.abs(Z) + 1e-12            # offset keeps log10 finite
    db = 20*np.log10(mag / mag.max())
    lo, hi = db.min(), db.max()
    unit = (db - lo) / (hi - lo + 1e-12)   # epsilon guards a flat spectrogram
    gray = (unit*255).astype(np.uint8)
    resized = Image.fromarray(gray).resize((SPEC_SIZE, SPEC_SIZE), Image.BICUBIC)
    return np.array(resized, np.uint8)

def make_spec_set(X_1d, y):
    """Convert 1-D segments and integer labels into CNN-ready tensors.

    Returns:
        ``(X, Y)``: ``X`` holds the spectrogram images scaled to [0, 1] with a
        trailing channel axis; ``Y`` is the 5-class one-hot encoding of ``y``.
    """
    spectrograms = []
    for seg in X_1d:
        spectrograms.append(stft_to_image(seg, FS))
    scaled = np.array(spectrograms, np.float32)/255.0
    X = scaled[..., np.newaxis]   # [N,H,W,1]
    Y = to_categorical(y, num_classes=5)
    return X, Y

train_X_2d, train_y_oh = make_spec_set(train_X_1d, train_y)
test_X_2d,  test_y_oh  = make_spec_set(test_X_1d,  test_y)
print("Train 2D:", train_X_2d.shape, "Test 2D:", test_X_2d.shape)

# The 1D segments are no longer referenced below; drop them to save RAM
import gc
del train_X_1d, test_X_1d
gc.collect()

# (Optional) Save spectrogram tensors so restarts don't hurt
# np.savez_compressed("ecg_spectros.npz",
#     train_X_2d=train_X_2d, train_y_oh=train_y_oh,
#     test_X_2d=test_X_2d,   test_y_oh=test_y_oh
# )

# ---------------------------
# 4) 2D-CNN (paper structure)
# ---------------------------
from tensorflow import keras
from tensorflow.keras import layers

def make_2d_cnn(input_shape=(SPEC_SIZE, SPEC_SIZE, 1), num_classes=5):
    """Build the small 2-D CNN: three conv + max-pool stages, then two dense layers.

    Args:
        input_shape: Shape of one input image (height, width, channels).
        num_classes: Size of the softmax output layer.

    Returns:
        An uncompiled ``keras.Model`` named ``"ECG_2D_CNN"``.
    """
    # (filters, kernel size) of each convolution; every one is followed by 2x2 pooling
    conv_specs = [(8, (4,4)), (13, (2,2)), (13, (2,2))]

    inp = keras.Input(shape=input_shape)
    x = inp
    for filters, kernel in conv_specs:
        x = layers.Conv2D(filters, kernel, activation='relu', padding='same')(x)
        x = layers.MaxPooling2D((2,2))(x)
    x = layers.Flatten()(x)
    # dense width not fixed by paper; 64 is a sensible choice
    x = layers.Dense(64, activation='relu')(x)
    out = layers.Dense(num_classes, activation='softmax')(x)
    return keras.Model(inp, out, name="ECG_2D_CNN")

model_2d = make_2d_cnn()
model_2d.summary()

# ------------------------------------
# 5) Train (paper LR/epochs; safe Bsz)
# ------------------------------------
LR = 0.001       # paper
EPOCHS = 100     # paper "iterations"
BATCH = 100      # practical; paper used 2500 (too big for most GPUs)

# (Optional) Mixed precision if VRAM is tight (uncomment next 3 lines)
# from tensorflow.keras import mixed_precision
# mixed_precision.set_global_policy('mixed_float16')
# train_X_2d, test_X_2d = train_X_2d.astype('float16'), test_X_2d.astype('float16')

# Safety: ensure tensors exist if the runtime was restarted
try:
    _ = train_X_2d.shape; _ = test_X_2d.shape; _ = train_y_oh.shape; _ = test_y_oh.shape
except NameError:
    # Uncomment this if you saved earlier:
    # data = np.load("ecg_spectros.npz")
    # train_X_2d, train_y_oh = data["train_X_2d"], data["train_y_oh"]
    # test_X_2d,  test_y_oh  = data["test_X_2d"],  data["test_y_oh"]
    # Or rebuild. The 1D arrays are deleted once the spectrograms are built, so
    # the rebuild can itself hit a NameError; turn that into an actionable
    # error instead of a confusing one raised from inside this handler.
    try:
        train_X_2d, train_y_oh = make_spec_set(train_X_1d, train_y)
        test_X_2d,  test_y_oh  = make_spec_set(test_X_1d,  test_y)
    except NameError as err:
        raise RuntimeError(
            "Spectrogram tensors are missing and the 1D segments needed to rebuild "
            "them are not available; re-run sections 1-3 or load a saved "
            "ecg_spectros.npz before training."
        ) from err

# Helpful callbacks (optional but recommended)
# NOTE(review): the monitored 'val_loss' is computed on the test tensors passed as
# validation_data below, so the LR schedule is tuned on the test set -- confirm
# this is intended, or hold out a separate validation split.
cbs = [
    #keras.callbacks.EarlyStopping(monitor='val_accuracy', patience=10, restore_best_weights=True),
    keras.callbacks.ReduceLROnPlateau(monitor='val_loss', factor=0.5, patience=5, min_lr=1e-6)
]

model_2d.compile(optimizer=keras.optimizers.Adam(learning_rate=LR),
                 loss='categorical_crossentropy',
                 metrics=['accuracy'])

hist_2d = model_2d.fit(
    train_X_2d, train_y_oh,
    validation_data=(test_X_2d, test_y_oh),
    epochs=EPOCHS, batch_size=BATCH, verbose=1,
    callbacks=cbs
)

# -------------------------
# 6) Report & quick plots
# -------------------------
# Averages / best epoch
# Per-epoch curves recorded by fit()
tr_acc = np.array(hist_2d.history['accuracy'])
vl_acc = np.array(hist_2d.history['val_accuracy'])
tr_loss = np.array(hist_2d.history['loss'])
vl_loss = np.array(hist_2d.history['val_loss'])

print(f"\nFinal train acc: {tr_acc[-1]*100:.2f}% | Final val acc: {vl_acc[-1]*100:.2f}%")
print(f"Avg  train acc : {tr_acc.mean()*100:.2f}% | Avg  val acc : {vl_acc.mean()*100:.2f}%")
print(f"Best val acc   : {vl_acc.max()*100:.2f}% (epoch {vl_acc.argmax()+1})")
print(f"Final loss     : train {tr_loss[-1]:.4f} | val {vl_loss[-1]:.4f}")

# Optional: exact paper metrics on TEST set
y_prob = model_2d.predict(test_X_2d, verbose=0)
y_pred = y_prob.argmax(axis=1)
y_true = test_y_oh.argmax(axis=1)

acc_pct = (y_pred == y_true).mean()*100.0
eps = 1e-7   # clip probabilities away from 0/1 so log() stays finite
ce = -np.sum(test_y_oh * np.log(np.clip(y_prob, eps, 1-eps)), axis=1).mean()

print(f"\n[Paper metrics] Test accuracy: {acc_pct:.2f}% | Cross-entropy: {ce:.6f}")

# Confusion matrix / per-class report (needs scikit-learn).
# Only a missing scikit-learn is tolerated; any other failure is a real bug in
# the evaluation and must surface instead of being silently swallowed.
try:
    from sklearn.metrics import confusion_matrix, classification_report
except ImportError:
    print("\nscikit-learn not available; skipping classification report and confusion matrix.")
else:
    class_ids = [0,1,2,3,4]
    cm = confusion_matrix(y_true, y_pred, labels=class_ids)
    # Pass the same explicit label list as above so the report rows always line
    # up with target_names.
    print("\nClassification report:\n",
          classification_report(y_true, y_pred, labels=class_ids,
                                target_names=[idx_to_class[i] for i in class_ids], digits=4))
    # Basic text CM (no seaborn)
    print("Confusion matrix:\n", cm)


Config ready.
NOR: segments extracted = 7936
LBB: segments extracted = 6600
RBB: segments extracted = 5513
PVC: segments extracted = 1347
APC: segments extracted = 1860
Train: (2100, 3600) {'NOR': 450, 'LBB': 450, 'RBB': 450, 'PVC': 300, 'APC': 450}
Test : (420, 3600) {'NOR': 90, 'LBB': 90, 'RBB': 90, 'PVC': 60, 'APC': 90}


Unnamed: 0,Train,Test,Total
NOR,450,90,540
LBB,450,90,540
RBB,450,90,540
PVC,300,60,360
APC,450,90,540


Train 2D: (2100, 256, 256, 1) Test 2D: (420, 256, 256, 1)


Epoch 1/100
[1m21/21[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m5s[0m 114ms/step - accuracy: 0.2017 - loss: 1.6367 - val_accuracy: 0.2143 - val_loss: 1.5947 - learning_rate: 0.0010
Epoch 2/100
[1m21/21[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 54ms/step - accuracy: 0.2739 - loss: 1.5865 - val_accuracy: 0.2786 - val_loss: 1.5664 - learning_rate: 0.0010
Epoch 3/100
[1m21/21[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 63ms/step - accuracy: 0.3497 - loss: 1.5447 - val_accuracy: 0.3714 - val_loss: 1.4855 - learning_rate: 0.0010
Epoch 4/100
[1m21/21[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m3s[0m 79ms/step - accuracy: 0.4620 - loss: 1.4104 - val_accuracy: 0.5571 - val_loss: 1.2169 - learning_rate: 0.0010
Epoch 5/100
[1m21/21[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 58ms/step - accuracy: 0.5746 - loss: 1.1659 - val_accuracy: 0.5714 - val_loss: 1.0646 - learning_rate: 0.0010
Epoch 6/100
[1m21/21[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m

In [None]:
# ============================================================
# ECG Arrhythmia Classification (Paper full pipeline)
# MIT-BIH: STFT (512 Hann) -> 256x256 spectrogram -> 2D-CNN
# ============================================================

# -------------------------
# 0) Config & dependencies
# -------------------------
import os, random
import numpy as np
import tensorflow as tf
import wfdb
from wfdb import processing

# Reproducibility: one seed shared by the Python, NumPy and TensorFlow RNGs
SEED = 42
random.seed(SEED)
np.random.seed(SEED)
tf.random.set_seed(SEED)

# MIT-BIH record numbers feeding each class (from the paper)
RECORDS = dict(
    NOR=[100, 105, 215],
    LBB=[109, 111, 214],
    RBB=[118, 124, 212],
    PVC=[106, 233],
    APC=[207, 209, 232],
)

# Target split (paper): NOR/LBB/RBB/APC (450 train, 90 test), PVC (300 train, 60 test)
TARGET_TRAIN = dict(NOR=450, LBB=450, RBB=450, APC=450, PVC=300)
TARGET_TEST  = dict(NOR=90, LBB=90, RBB=90, APC=90, PVC=60)

# Windowing: each segment spans WIN_SEC seconds sampled at FS Hz
FS = 360
WIN_SEC = 10
WIN_SAMPLES = WIN_SEC * FS

# STFT / spectrogram settings
STFT_NPERSEG  = 512   # paper window size
STFT_NOVERLAP = 256   # 50% overlap (paper not explicit; standard)
SPEC_SIZE     = 256   # spectrogram image size (HxW)

# Class name <-> integer label lookups
class_to_idx = {name: i for i, name in enumerate(("NOR", "LBB", "RBB", "PVC", "APC"))}
idx_to_class = dict(zip(class_to_idx.values(), class_to_idx.keys()))

print("Config ready.")

# ------------------------------------------
# 1) Read signals & extract labeled segments
# ------------------------------------------
def read_signal(record_num, fs_target=FS):
    """Read one MIT-BIH record and return a single ECG lead at ``fs_target`` Hz.

    Lead selection prefers MLII, then V5, then the first channel. MLII is
    checked first so that records carrying both MLII and V5 are read from the
    same lead as the MLII-only records; preferring V5 would silently mix leads
    within a class.

    Args:
        record_num: MIT-BIH record identifier (converted with ``str``).
        fs_target: Desired sampling rate in Hz.

    Returns:
        ``(sig, fs)``: the 1-D float32 signal and its sampling rate.
    """
    rec = wfdb.rdrecord(str(record_num), pn_dir="mitdb")
    names = [n.lower() for n in rec.sig_name]
    # First preferred lead that is present; channel 0 when neither is found.
    ch = next((names.index(lead) for lead in ("mlii", "v5") if lead in names), 0)
    sig = rec.p_signal[:, ch].astype(np.float32)
    fs = int(rec.fs)
    if fs != fs_target:
        # resample_sig returns (signal, time locations); keep the signal only and
        # restore float32 so both code paths return the same dtype.
        sig = processing.resample_sig(sig, fs, fs_target)[0].astype(np.float32)
        fs = fs_target
    return sig, fs

def extract_segments(record_num, label, win_samples=WIN_SAMPLES):
    """Collect fixed-length windows around every annotated beat of one class.

    Accepted MIT-BIH annotation symbols per label:
      NOR -> 'N', LBB -> 'L', RBB -> 'R', PVC -> 'V', APC -> 'A' or 'a'

    For each matching annotation the window starts ``win_samples // 2`` samples
    before the beat (clamped to the record start) and is kept only if it fits
    entirely inside the record.

    NOTE(review): one window is taken per matching beat, so windows of
    neighbouring beats presumably overlap heavily -- confirm this is acceptable
    for the random train/test split performed downstream.

    Returns:
        List of 1-D signal slices, each ``win_samples`` long.
    """
    symbols_for = {
        "NOR": ("N",),
        "LBB": ("L",),
        "RBB": ("R",),
        "PVC": ("V",),
        "APC": ("A", "a"),
    }
    wanted = symbols_for.get(label, ())

    sig, _ = read_signal(record_num)
    ann = wfdb.rdann(str(record_num), "atr", pn_dir="mitdb")

    half = win_samples // 2
    n_total = len(sig)
    segments = []
    for beat_pos, sym in zip(ann.sample, ann.symbol):
        if sym not in wanted:
            continue
        lo = max(beat_pos - half, 0)
        hi = lo + win_samples
        if hi > n_total:
            # Window would run past the end of the record; skip it.
            continue
        segments.append(sig[lo:hi])
    return segments

# Build one pool of candidate segments per class
pool = {}
for cls, recs in RECORDS.items():
    pool[cls] = [seg for r in recs for seg in extract_segments(r, cls)]
    print(f"{cls}: segments extracted = {len(pool[cls])}")

# ---------------------------------------
# 2) Paper's fixed train/test split (1D)
# ---------------------------------------
rng = np.random.default_rng(SEED)

def _pick_positions(n_items, n):
    """Draw min(n, n_items) distinct positions in [0, n_items) from the shared rng."""
    return rng.choice(n_items, size=min(n, n_items), replace=False)

def take_n(lst, n):
    """Return up to ``n`` distinct, randomly chosen elements of ``lst``."""
    return [lst[i] for i in _pick_positions(len(lst), n)]

train_X_1d, train_y = [], []
test_X_1d,  test_y  = [], []

# NOTE(review): both splits are drawn from the same per-class pool, so segments
# of the same record can land in train and test -- confirm against the paper.
for cls in ["NOR","LBB","RBB","APC","PVC"]:
    cls_pool = pool[cls]
    label_id = class_to_idx[cls]

    # Training positions first ...
    train_pos = _pick_positions(len(cls_pool), TARGET_TRAIN[cls])
    taken = set(train_pos.tolist())

    # ... then test positions from whatever was not taken (pool order kept)
    leftover_pos = [i for i in range(len(cls_pool)) if i not in taken]
    test_pos = _pick_positions(len(leftover_pos), TARGET_TEST[cls])

    train_segs = [cls_pool[i] for i in train_pos]
    test_segs = [cls_pool[leftover_pos[j]] for j in test_pos]

    train_X_1d.extend(train_segs)
    train_y.extend([label_id] * len(train_segs))
    test_X_1d.extend(test_segs)
    test_y.extend([label_id] * len(test_segs))

# Stack the per-segment slices into 2-D arrays and the labels into int64 vectors
train_X_1d = np.stack(train_X_1d)
test_X_1d  = np.stack(test_X_1d)
train_y    = np.array(train_y, dtype=np.int64)
test_y     = np.array(test_y, dtype=np.int64)

print("Train:", train_X_1d.shape, {k:int((train_y==v).sum()) for k,v in class_to_idx.items()})
print("Test :", test_X_1d.shape,  {k:int((test_y==v).sum()) for k,v in class_to_idx.items()})

# Optional: small per-class counts table (skipped when pandas/IPython are missing).
# `display` is imported explicitly so the cell does not depend on the notebook
# injecting it, and only ImportError is tolerated so real errors stay visible.
try:
    import pandas as pd
    from IPython.display import display
except ImportError:
    print("pandas/IPython not available; skipping counts table.")
else:
    df = pd.DataFrame({
        "Train":[int((train_y==class_to_idx[c]).sum()) for c in class_to_idx],
        "Test":[int((test_y==class_to_idx[c]).sum()) for c in class_to_idx]
    }, index=list(class_to_idx.keys()))
    df["Total"] = df["Train"]+df["Test"]
    display(df)

# ------------------------------------------
# 3) STFT -> 256x256 grayscale spectrograms
# ------------------------------------------
from PIL import Image
from scipy import signal
from tensorflow.keras.utils import to_categorical

def stft_to_image(seg, fs=FS):
    """Turn one 1-D segment into a SPEC_SIZE x SPEC_SIZE uint8 spectrogram image.

    Steps: STFT with a symmetric Hann window of STFT_NPERSEG samples and
    STFT_NOVERLAP overlap, magnitude in dB relative to the peak, min-max
    scaling to [0, 255], bicubic resize.
    """
    window = signal.windows.hann(STFT_NPERSEG, sym=True)
    # Only the complex STFT matrix is needed; the frequency/time axes are dropped
    Z = signal.stft(
        seg,
        fs=fs,
        window=window,
        nperseg=STFT_NPERSEG,
        noverlap=STFT_NOVERLAP,
        nfft=STFT_NPERSEG,
        boundary=None,
        padded=False,
    )[2]
    mag = np.abs(Z) + 1e-12            # offset keeps log10 finite
    db = 20*np.log10(mag / mag.max())
    lo, hi = db.min(), db.max()
    unit = (db - lo) / (hi - lo + 1e-12)   # epsilon guards a flat spectrogram
    gray = (unit*255).astype(np.uint8)
    resized = Image.fromarray(gray).resize((SPEC_SIZE, SPEC_SIZE), Image.BICUBIC)
    return np.array(resized, np.uint8)

def make_spec_set(X_1d, y):
    """Convert 1-D segments and integer labels into CNN-ready tensors.

    Returns:
        ``(X, Y)``: ``X`` holds the spectrogram images scaled to [0, 1] with a
        trailing channel axis; ``Y`` is the 5-class one-hot encoding of ``y``.
    """
    spectrograms = []
    for seg in X_1d:
        spectrograms.append(stft_to_image(seg, FS))
    scaled = np.array(spectrograms, np.float32)/255.0
    X = scaled[..., np.newaxis]   # [N,H,W,1]
    Y = to_categorical(y, num_classes=5)
    return X, Y

train_X_2d, train_y_oh = make_spec_set(train_X_1d, train_y)
test_X_2d,  test_y_oh  = make_spec_set(test_X_1d,  test_y)
print("Train 2D:", train_X_2d.shape, "Test 2D:", test_X_2d.shape)

# The 1D segments are no longer referenced below; drop them to save RAM
import gc
del train_X_1d, test_X_1d
gc.collect()

# (Optional) Save spectrogram tensors so restarts don't hurt
# np.savez_compressed("ecg_spectros.npz",
#     train_X_2d=train_X_2d, train_y_oh=train_y_oh,
#     test_X_2d=test_X_2d,   test_y_oh=test_y_oh
# )

# ---------------------------
# 4) 2D-CNN (paper structure)
# ---------------------------
from tensorflow import keras
from tensorflow.keras import layers

def make_2d_cnn(input_shape=(SPEC_SIZE, SPEC_SIZE, 1), num_classes=5):
    """
    Build the (uncompiled) spectrogram classifier.

    Three Conv2D(relu, 'same') + 2x2 MaxPooling2D stages (8 filters of 4x4,
    then 13 of 2x2, then 13 of 2x2), followed by Flatten, Dense(64, relu) and
    a softmax layer with num_classes outputs.
    """
    conv_stages = [(8, (4,4)), (13, (2,2)), (13, (2,2))]  # (filters, kernel size)

    inp = keras.Input(shape=input_shape)
    feat = inp
    for n_filters, kernel in conv_stages:
        feat = layers.Conv2D(n_filters, kernel, activation='relu', padding='same')(feat)
        feat = layers.MaxPooling2D((2,2))(feat)

    feat = layers.Flatten()(feat)
    feat = layers.Dense(64, activation='relu')(feat)  # dense width not fixed by paper; 64 is a sensible choice
    out = layers.Dense(num_classes, activation='softmax')(feat)
    return keras.Model(inp, out, name="ECG_2D_CNN")

model_2d = make_2d_cnn()
model_2d.summary()

# ------------------------------------
# 5) Train (paper LR/epochs; safe Bsz)
# ------------------------------------
LR = 0.001       # paper
EPOCHS = 100     # paper "iterations"
BATCH = 250      # practical; paper used 2500 (too big for most GPUs)

# (Optional) Mixed precision if VRAM is tight (uncomment next 3 lines)
# from tensorflow.keras import mixed_precision
# mixed_precision.set_global_policy('mixed_float16')
# train_X_2d, test_X_2d = train_X_2d.astype('float16'), test_X_2d.astype('float16')

# Safety: ensure tensors exist if the runtime was restarted
try:
    _ = train_X_2d.shape; _ = test_X_2d.shape; _ = train_y_oh.shape; _ = test_y_oh.shape
except NameError:
    # Uncomment this if you saved earlier:
    # data = np.load("ecg_spectros.npz")
    # train_X_2d, train_y_oh = data["train_X_2d"], data["train_y_oh"]
    # test_X_2d,  test_y_oh  = data["test_X_2d"],  data["test_y_oh"]
    # Or rebuild. The 1D arrays are deleted right after the spectrograms are
    # built, so only rebuild when they are actually still defined; otherwise
    # fail with an actionable message instead of a second NameError.
    if "train_X_1d" in globals() and "test_X_1d" in globals():
        train_X_2d, train_y_oh = make_spec_set(train_X_1d, train_y)
        test_X_2d,  test_y_oh  = make_spec_set(test_X_1d,  test_y)
    else:
        raise RuntimeError(
            "Spectrogram tensors are missing and the 1D segments have already been freed; "
            "re-run the extraction/split/STFT steps (or load a saved .npz) before training."
        ) from None

# Helpful callbacks (optional but recommended)
# NOTE: validation_data below is the test split, so ReduceLROnPlateau
# (monitoring val_loss) adapts the learning rate using test data.
cbs = [
    #keras.callbacks.EarlyStopping(monitor='val_accuracy', patience=10, restore_best_weights=True),
    keras.callbacks.ReduceLROnPlateau(monitor='val_loss', factor=0.5, patience=5, min_lr=1e-6)
]

model_2d.compile(optimizer=keras.optimizers.Adam(learning_rate=LR),
                 loss='categorical_crossentropy',
                 metrics=['accuracy'])

hist_2d = model_2d.fit(
    train_X_2d, train_y_oh,
    validation_data=(test_X_2d, test_y_oh),
    epochs=EPOCHS, batch_size=BATCH, verbose=1,
    callbacks=cbs
)

# -------------------------
# 6) Report & quick plots
# -------------------------
# Averages / best epoch
# Per-epoch curves recorded by fit(); "val" is the split passed as validation_data.
tr_acc = np.array(hist_2d.history['accuracy'])
vl_acc = np.array(hist_2d.history['val_accuracy'])
tr_loss = np.array(hist_2d.history['loss'])
vl_loss = np.array(hist_2d.history['val_loss'])

print(f"\nFinal train acc: {tr_acc[-1]*100:.2f}% | Final val acc: {vl_acc[-1]*100:.2f}%")
print(f"Avg  train acc : {tr_acc.mean()*100:.2f}% | Avg  val acc : {vl_acc.mean()*100:.2f}%")
print(f"Best val acc   : {vl_acc.max()*100:.2f}% (epoch {vl_acc.argmax()+1})")
print(f"Final loss     : train {tr_loss[-1]:.4f} | val {vl_loss[-1]:.4f}")

# Optional: exact paper metrics on TEST set
y_prob = model_2d.predict(test_X_2d, verbose=0)
y_pred = y_prob.argmax(axis=1)
y_true = test_y_oh.argmax(axis=1)

acc_pct = (y_pred == y_true).mean()*100.0
eps = 1e-7
# Mean categorical cross-entropy; probabilities are clipped so log() stays finite.
ce = -np.sum(test_y_oh * np.log(np.clip(y_prob, eps, 1-eps)), axis=1).mean()

print(f"\n[Paper metrics] Test accuracy: {acc_pct:.2f}% | Cross-entropy: {ce:.6f}")

# Confusion matrix / per-class report (scikit-learn is optional).
# Only a missing scikit-learn is tolerated; any other failure should surface.
try:
    from sklearn.metrics import confusion_matrix, classification_report
except ImportError:
    print("\nscikit-learn not installed; skipping classification report and confusion matrix.")
else:
    class_ids = [0,1,2,3,4]
    cm = confusion_matrix(y_true, y_pred, labels=class_ids)
    # labels= keeps the report aligned with target_names even if a class is
    # absent from both y_true and y_pred.
    print("\nClassification report:\n",
          classification_report(y_true, y_pred, labels=class_ids,
                                target_names=[idx_to_class[i] for i in class_ids], digits=4))
    # Basic text CM (no seaborn)
    print("Confusion matrix:\n", cm)


Config ready.
NOR: segments extracted = 7936
LBB: segments extracted = 6600
RBB: segments extracted = 5513
PVC: segments extracted = 1347
APC: segments extracted = 1860
Train: (2100, 3600) {'NOR': 450, 'LBB': 450, 'RBB': 450, 'PVC': 300, 'APC': 450}
Test : (420, 3600) {'NOR': 90, 'LBB': 90, 'RBB': 90, 'PVC': 60, 'APC': 90}


Unnamed: 0,Train,Test,Total
NOR,450,90,540
LBB,450,90,540
RBB,450,90,540
PVC,300,60,360
APC,450,90,540


Train 2D: (2100, 256, 256, 1) Test 2D: (420, 256, 256, 1)


Epoch 1/100
[1m9/9[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m7s[0m 437ms/step - accuracy: 0.2107 - loss: 1.6340 - val_accuracy: 0.2190 - val_loss: 1.5951 - learning_rate: 0.0010
Epoch 2/100
[1m9/9[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 127ms/step - accuracy: 0.2123 - loss: 1.5925 - val_accuracy: 0.3714 - val_loss: 1.5853 - learning_rate: 0.0010
Epoch 3/100
[1m9/9[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 123ms/step - accuracy: 0.4351 - loss: 1.5759 - val_accuracy: 0.3476 - val_loss: 1.5610 - learning_rate: 0.0010
Epoch 4/100
[1m9/9[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 124ms/step - accuracy: 0.4029 - loss: 1.5433 - val_accuracy: 0.4690 - val_loss: 1.5025 - learning_rate: 0.0010
Epoch 5/100
[1m9/9[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 124ms/step - accuracy: 0.5259 - loss: 1.4641 - val_accuracy: 0.5738 - val_loss: 1.3737 - learning_rate: 0.0010
Epoch 6/100
[1m9/9[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 1

# Adaptive (multi-window) STFT front-end

In [None]:
# ============================================================
# ECG Arrhythmia Classification (Paper-faithful full pipeline)
# MIT-BIH: Adaptive STFT (128/256/512 Hann) -> 256x256x3 spectrogram -> 2D-CNN
# ============================================================

# -------------------------
# 0) Config & dependencies
# -------------------------
import os, random
import numpy as np
import tensorflow as tf
import wfdb
from wfdb import processing

# Reproducibility: seed Python's `random`, NumPy's global RNG and TensorFlow's global RNG
SEED = 42
random.seed(SEED); np.random.seed(SEED); tf.random.set_seed(SEED)

# Records per class (from the paper)
# Keys are class names; values are the record numbers read for that class.
RECORDS = {
    "NOR": [100, 105, 215],
    "LBB": [109, 111, 214],
    "RBB": [118, 124, 212],
    "PVC": [106, 233],
    "APC": [207, 209, 232]
}

# Target split (paper): NOR/LBB/RBB/APC (450 train, 90 test), PVC (300 train, 60 test)
TARGET_TRAIN = {"NOR":450, "LBB":450, "RBB":450, "APC":450, "PVC":300}
TARGET_TEST  = {"NOR": 90, "LBB": 90, "RBB": 90, "APC": 90, "PVC": 60}

# Sampling
FS = 360           # Hz
WIN_SEC = 10       # seconds
WIN_SAMPLES = FS * WIN_SEC   # samples per window (3600)

# Adaptive STFT / Spectrogram (multi-window front-end)
# We use 3 window sizes: 128, 256, 512 (Hann), each with 50% overlap
STFT_WINDOW_SIZES = [128, 256, 512]
SPEC_SIZE         = 256   # spectrogram image size (HxW)

# Class mapping (class name <-> integer label)
class_to_idx = {"NOR":0, "LBB":1, "RBB":2, "PVC":3, "APC":4}
idx_to_class = {v:k for k,v in class_to_idx.items()}

print("Config ready.")

# ------------------------------------------
# 1) Read signals & extract labeled segments
# ------------------------------------------
def read_signal(record_num, fs_target=FS):
    """
    Fetch one record from the PhysioNet "mitdb" directory and return a single
    channel at fs_target Hz.

    Channel choice: "v5" if present, else "mlii", else channel 0 (signal names
    are compared case-insensitively). The channel is cast to float32 before
    any resampling.

    Returns (signal, sampling_rate).
    """
    record = wfdb.rdrecord(str(record_num), pn_dir="mitdb")
    lead_names = [name.lower() for name in record.sig_name]

    # First preferred lead that exists; fall back to the first channel.
    channel = next(
        (lead_names.index(lead) for lead in ("v5", "mlii") if lead in lead_names),
        0,
    )
    samples = record.p_signal[:, channel].astype(np.float32)

    native_fs = int(record.fs)
    if native_fs == fs_target:
        return samples, native_fs
    resampled = processing.resample_sig(samples, native_fs, fs_target)[0]
    return resampled, fs_target

def extract_segments(record_num, label, win_samples=WIN_SAMPLES):
    """
    Extract fixed-length windows centered on annotated beats of a given label.

    Label mapping (MIT-BIH symbols):
      NOR: 'N'
      LBB: 'L'
      RBB: 'R'
      PVC: 'V'
      APC: 'A' or 'a'

    A beat is skipped when a window of win_samples centered on it would run
    past either end of the record. Clamping the start to sample 0 instead would
    give off-center windows and one identical copy of the first window for
    every early beat, which can then land in both the train and test split.

    Returns a list of slices of the record signal, each win_samples long.
    An unrecognised label yields an empty list.
    """
    symbols_for = {
        "NOR": ("N",),
        "LBB": ("L",),
        "RBB": ("R",),
        "PVC": ("V",),
        "APC": ("A", "a"),
    }
    wanted = symbols_for.get(label, ())

    sig, _ = read_signal(record_num)
    ann = wfdb.rdann(str(record_num), "atr", pn_dir="mitdb")
    # NOTE(review): annotation sample indices are used as-is, i.e. assumed to be
    # on the same sample grid as `sig` -- confirm if read_signal ever resamples.

    half = win_samples // 2
    out = []
    for center, sym in zip(ann.sample, ann.symbol):
        if sym not in wanted:
            continue
        start = center - half
        end = start + win_samples
        if start < 0 or end > len(sig):
            continue  # window would not be fully inside the record
        out.append(sig[start:end])
    return out

# Build pools per class
# One list of candidate segments per class, gathered across that class's records.
pool = {}
for cls, recs in RECORDS.items():
    cls_segments = []
    for rec_id in recs:
        cls_segments.extend(extract_segments(rec_id, cls))
    pool[cls] = cls_segments
    print(f"{cls}: segments extracted = {len(cls_segments)}")

# ---------------------------------------
# 2) Paper's fixed train/test split (1D)
# ---------------------------------------
rng = np.random.default_rng(SEED)

def take_n(lst, n):
    """Draw min(n, len(lst)) distinct items from lst at random (no replacement), using the module-level rng."""
    count = min(n, len(lst))
    picks = rng.choice(len(lst), size=count, replace=False)
    return [lst[pos] for pos in picks]

train_X_1d, train_y = [], []
test_X_1d,  test_y  = [], []

for cls in ["NOR","LBB","RBB","APC","PVC"]:
    cls_pool = pool[cls]
    cls_index = class_to_idx[cls]

    # Training windows are drawn first; test windows come from what is left.
    # Segments are told apart by object identity (id), not by content.
    train_segs = take_n(cls_pool, TARGET_TRAIN[cls])
    train_ids = {id(seg) for seg in train_segs}
    test_segs = take_n([seg for seg in cls_pool if id(seg) not in train_ids], TARGET_TEST[cls])

    train_X_1d.extend(train_segs)
    train_y.extend([cls_index] * len(train_segs))
    test_X_1d.extend(test_segs)
    test_y.extend([cls_index] * len(test_segs))

# Lists -> arrays (one row per segment)
train_X_1d = np.stack(train_X_1d)  # [N, 3600]
test_X_1d  = np.stack(test_X_1d)
train_y    = np.array(train_y, dtype=np.int64)
test_y     = np.array(test_y, dtype=np.int64)

# Shapes plus per-class counts for each split
print("Train:", train_X_1d.shape, {name: int((train_y == code).sum()) for name, code in class_to_idx.items()})
print("Test :", test_X_1d.shape,  {name: int((test_y == code).sum()) for name, code in class_to_idx.items()})

# Optional: small counts table
# pandas is optional here: without it the table is skipped with a message
# instead of every possible error being silently swallowed.
try:
    import pandas as pd
except ImportError:
    print("pandas not installed; skipping counts table.")
else:
    df = pd.DataFrame({
        "Train":[int((train_y==class_to_idx[c]).sum()) for c in class_to_idx],
        "Test":[int((test_y==class_to_idx[c]).sum()) for c in class_to_idx]
    }, index=list(class_to_idx.keys()))
    df["Total"] = df["Train"]+df["Test"]
    try:
        display(df)   # rich table when running under IPython/Jupyter
    except NameError:
        print(df)     # plain-text fallback where `display` is not defined

# -------------------------------------------------
# 3) Adaptive STFT -> 256x256x3 spectrogram images
# -------------------------------------------------
from PIL import Image
from scipy import signal
from tensorflow.keras.utils import to_categorical

def stft_single_window_to_image(seg, fs, nperseg, noverlap):
    """
    Render one segment as a grayscale spectrogram for a single STFT window size.

    Pipeline: STFT with a symmetric Hann window of nperseg samples and
    `noverlap` samples of overlap (no boundary extension, no padding)
    -> magnitude -> dB relative to the peak -> min-max scaled to [0, 255]
    -> bicubic resize to SPEC_SIZE x SPEC_SIZE.

    Returns a 2D uint8 image.
    """
    stft_kwargs = dict(
        fs=fs,
        window=signal.windows.hann(nperseg, sym=True),
        nperseg=nperseg,
        noverlap=noverlap,
        nfft=nperseg,
        boundary=None,
        padded=False,
    )
    _, _, spectrum = signal.stft(seg, **stft_kwargs)

    magnitude = np.abs(spectrum) + 1e-12          # small offset keeps log10 finite
    level_db = 20*np.log10(magnitude / magnitude.max())
    scaled = (level_db - level_db.min()) / (level_db.max() - level_db.min() + 1e-12)

    pixels = (scaled*255).astype(np.uint8)
    resized = Image.fromarray(pixels).resize((SPEC_SIZE, SPEC_SIZE), Image.BICUBIC)
    return np.array(resized, np.uint8)  # [H,W]

def stft_multi_to_image(seg, fs=FS):
    """
    Multi-window front-end.

    For every window length in STFT_WINDOW_SIZES, compute a grayscale
    spectrogram image with 50% overlap via stft_single_window_to_image, then
    stack the images along a new last axis (one channel per window length).

    Returns a uint8 array shaped [H, W, len(STFT_WINDOW_SIZES)].
    """
    per_window = [
        stft_single_window_to_image(seg, fs, win_len, win_len // 2)  # 50% overlap
        for win_len in STFT_WINDOW_SIZES
    ]
    return np.stack(per_window, axis=-1)

def make_spec_set(X_1d, y):
    """
    Convert 1D segments and integer labels into CNN-ready tensors.

    Every row of X_1d becomes a multi-channel spectrogram image via
    stft_multi_to_image; the stack is scaled to [0, 1] float32. Labels are
    one-hot encoded over 5 classes.

    Returns (X, Y).
    """
    stacked = np.array([stft_multi_to_image(row, FS) for row in X_1d], np.float32)
    X = stacked / 255.0  # [N,H,W,3]
    Y = to_categorical(y, num_classes=5)
    return X, Y

# Convert both splits into spectrogram tensors (X) and one-hot labels (Y).
train_X_2d, train_y_oh = make_spec_set(train_X_1d, train_y)
test_X_2d,  test_y_oh  = make_spec_set(test_X_1d,  test_y)
print("Train 2D:", train_X_2d.shape, "Test 2D:", test_X_2d.shape)
# Expect: (N, 256, 256, 3)

# Optional: free 1D arrays to save RAM
# NOTE: after the `del`, train_X_1d / test_X_1d no longer exist; anything that
# wants to rebuild the spectrograms later has to re-run the split first.
import gc
del train_X_1d, test_X_1d
gc.collect()

# (Optional) Save spectrogram tensors so restarts don't hurt
# np.savez_compressed("ecg_spectros_multiwin.npz",
#     train_X_2d=train_X_2d, train_y_oh=train_y_oh,
#     test_X_2d=test_X_2d,   test_y_oh=test_y_oh
# )

# ---------------------------
# 4) 2D-CNN (paper structure)
#     Now with 3-channel input
# ---------------------------
from tensorflow import keras
from tensorflow.keras import layers

def make_2d_cnn(input_shape=(SPEC_SIZE, SPEC_SIZE, 3), num_classes=5):
    """
    Build the (uncompiled) classifier for 3-channel spectrogram images.

    Three Conv2D(relu, 'same') + 2x2 MaxPooling2D stages (8 filters of 4x4,
    then 13 of 2x2, then 13 of 2x2), followed by Flatten, Dense(64, relu) and
    a softmax layer with num_classes outputs.
    """
    conv_stages = [(8, (4,4)), (13, (2,2)), (13, (2,2))]  # (filters, kernel size)

    inp = keras.Input(shape=input_shape)
    feat = inp
    for n_filters, kernel in conv_stages:
        feat = layers.Conv2D(n_filters, kernel, activation='relu', padding='same')(feat)
        feat = layers.MaxPooling2D((2,2))(feat)

    feat = layers.Flatten()(feat)
    feat = layers.Dense(64, activation='relu')(feat)  # dense width not fixed by paper; 64 is a sensible choice
    out = layers.Dense(num_classes, activation='softmax')(feat)
    return keras.Model(inp, out, name="ECG_2D_CNN_MultiWindowSTFT")

model_2d = make_2d_cnn()
model_2d.summary()

# ------------------------------------
# 5) Train (paper LR/epochs; safe Bsz)
# ------------------------------------
LR = 0.001       # paper
EPOCHS = 100     # paper "iterations"
BATCH = 250      # practical; paper used 2500 (too big for most GPUs)

# (Optional) Mixed precision if VRAM is tight (uncomment next 3 lines)
# from tensorflow.keras import mixed_precision
# mixed_precision.set_global_policy('mixed_float16')
# train_X_2d, test_X_2d = train_X_2d.astype('float16'), test_X_2d.astype('float16')

# Safety: ensure tensors exist if the runtime was restarted
try:
    _ = train_X_2d.shape; _ = test_X_2d.shape; _ = train_y_oh.shape; _ = test_y_oh.shape
except NameError:
    # Uncomment this if you saved earlier:
    # data = np.load("ecg_spectros_multiwin.npz")
    # train_X_2d, train_y_oh = data["train_X_2d"], data["train_y_oh"]
    # test_X_2d,  test_y_oh  = data["test_X_2d"],  data["test_y_oh"]
    # Or rebuild. The 1D arrays are deleted right after the spectrograms are
    # built, so only rebuild when they are actually still defined; otherwise
    # fail with an actionable message instead of a second NameError.
    if "train_X_1d" in globals() and "test_X_1d" in globals():
        train_X_2d, train_y_oh = make_spec_set(train_X_1d, train_y)
        test_X_2d,  test_y_oh  = make_spec_set(test_X_1d,  test_y)
    else:
        raise RuntimeError(
            "Spectrogram tensors are missing and the 1D segments have already been freed; "
            "re-run the extraction/split/STFT steps (or load a saved .npz) before training."
        ) from None

# Helpful callbacks (optional but recommended)
# NOTE: validation_data below is the test split, so ReduceLROnPlateau
# (monitoring val_loss) adapts the learning rate using test data.
cbs = [
    # keras.callbacks.EarlyStopping(monitor='val_accuracy', patience=10, restore_best_weights=True),
    keras.callbacks.ReduceLROnPlateau(monitor='val_loss', factor=0.5, patience=5, min_lr=1e-6)
]

model_2d.compile(optimizer=keras.optimizers.Adam(learning_rate=LR),
                 loss='categorical_crossentropy',
                 metrics=['accuracy'])

hist_2d = model_2d.fit(
    train_X_2d, train_y_oh,
    validation_data=(test_X_2d, test_y_oh),
    epochs=EPOCHS, batch_size=BATCH, verbose=1,
    callbacks=cbs
)

# -------------------------
# 6) Report & quick plots
# -------------------------
# Averages / best epoch
# Per-epoch curves recorded by fit(); "val" is the split passed as validation_data.
tr_acc = np.array(hist_2d.history['accuracy'])
vl_acc = np.array(hist_2d.history['val_accuracy'])
tr_loss = np.array(hist_2d.history['loss'])
vl_loss = np.array(hist_2d.history['val_loss'])

print(f"\nFinal train acc: {tr_acc[-1]*100:.2f}% | Final val acc: {vl_acc[-1]*100:.2f}%")
print(f"Avg  train acc : {tr_acc.mean()*100:.2f}% | Avg  val acc : {vl_acc.mean()*100:.2f}%")
print(f"Best val acc   : {vl_acc.max()*100:.2f}% (epoch {vl_acc.argmax()+1})")
print(f"Final loss     : train {tr_loss[-1]:.4f} | val {vl_loss[-1]:.4f}")

# Optional: exact paper metrics on TEST set
y_prob = model_2d.predict(test_X_2d, verbose=0)
y_pred = y_prob.argmax(axis=1)
y_true = test_y_oh.argmax(axis=1)

acc_pct = (y_pred == y_true).mean()*100.0
eps = 1e-7
# Mean categorical cross-entropy; probabilities are clipped so log() stays finite.
ce = -np.sum(test_y_oh * np.log(np.clip(y_prob, eps, 1-eps)), axis=1).mean()

print(f"\n[Paper metrics] Test accuracy: {acc_pct:.2f}% | Cross-entropy: {ce:.6f}")

# Confusion matrix / per-class report (scikit-learn is optional).
# Only a missing scikit-learn is tolerated; any other failure should surface.
try:
    from sklearn.metrics import confusion_matrix, classification_report
except ImportError:
    print("\nscikit-learn not installed; skipping classification report and confusion matrix.")
else:
    class_ids = [0,1,2,3,4]
    cm = confusion_matrix(y_true, y_pred, labels=class_ids)
    # labels= keeps the report aligned with target_names even if a class is
    # absent from both y_true and y_pred.
    print("\nClassification report:\n",
          classification_report(y_true, y_pred, labels=class_ids,
                                target_names=[idx_to_class[i] for i in class_ids], digits=4))
    # Basic text CM (no seaborn)
    print("Confusion matrix:\n", cm)


Config ready.
NOR: segments extracted = 7936
LBB: segments extracted = 6600
RBB: segments extracted = 5513
PVC: segments extracted = 1347
APC: segments extracted = 1860
Train: (2100, 3600) {'NOR': 450, 'LBB': 450, 'RBB': 450, 'PVC': 300, 'APC': 450}
Test : (420, 3600) {'NOR': 90, 'LBB': 90, 'RBB': 90, 'PVC': 60, 'APC': 90}


Unnamed: 0,Train,Test,Total
NOR,450,90,540
LBB,450,90,540
RBB,450,90,540
PVC,300,60,360
APC,450,90,540


Train 2D: (2100, 256, 256, 3) Test 2D: (420, 256, 256, 3)


Epoch 1/100
[1m9/9[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m7s[0m 561ms/step - accuracy: 0.2258 - loss: 1.6137 - val_accuracy: 0.4476 - val_loss: 1.5612 - learning_rate: 0.0010
Epoch 2/100
[1m9/9[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 232ms/step - accuracy: 0.4495 - loss: 1.5302 - val_accuracy: 0.5762 - val_loss: 1.4422 - learning_rate: 0.0010
Epoch 3/100
[1m9/9[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 207ms/step - accuracy: 0.6351 - loss: 1.3699 - val_accuracy: 0.6381 - val_loss: 1.1956 - learning_rate: 0.0010
Epoch 4/100
[1m9/9[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 209ms/step - accuracy: 0.7536 - loss: 1.0837 - val_accuracy: 0.8190 - val_loss: 0.8147 - learning_rate: 0.0010
Epoch 5/100
[1m9/9[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 204ms/step - accuracy: 0.8615 - loss: 0.7244 - val_accuracy: 0.8405 - val_loss: 0.5410 - learning_rate: 0.0010
Epoch 6/100
[1m9/9[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 2

# Single-window STFT pipeline with batch size 1000

In [None]:
# ============================================================
# ECG Arrhythmia Classification (Paper-faithful full pipeline)
# MIT-BIH: STFT (512 Hann) -> 256x256 spectrogram -> 2D-CNN
# ============================================================

# -------------------------
# 0) Config & dependencies
# -------------------------
import os, random
import numpy as np
import tensorflow as tf
import wfdb
from wfdb import processing

# Reproducibility: seed Python's `random`, NumPy's global RNG and TensorFlow's global RNG
SEED = 42
random.seed(SEED); np.random.seed(SEED); tf.random.set_seed(SEED)

# Records per class (from the paper)
# Keys are class names; values are the record numbers read for that class.
RECORDS = {
    "NOR": [100, 105, 215],
    "LBB": [109, 111, 214],
    "RBB": [118, 124, 212],
    "PVC": [106, 233],
    "APC": [207, 209, 232]
}

# Target split (paper): NOR/LBB/RBB/APC (450 train, 90 test), PVC (300 train, 60 test)
TARGET_TRAIN = {"NOR":450, "LBB":450, "RBB":450, "APC":450, "PVC":300}
TARGET_TEST  = {"NOR": 90, "LBB": 90, "RBB": 90, "APC": 90, "PVC": 60}

# Sampling
FS = 360           # Hz
WIN_SEC = 10       # seconds
WIN_SAMPLES = FS * WIN_SEC   # samples per window (3600)

# STFT / Spectrogram (paper/stable choices)
STFT_NPERSEG  = 512   # paper window size
STFT_NOVERLAP = 256   # 50% overlap (paper not explicit; standard)
SPEC_SIZE     = 256   # spectrogram image size (HxW)

# Class mapping (class name <-> integer label)
class_to_idx = {"NOR":0, "LBB":1, "RBB":2, "PVC":3, "APC":4}
idx_to_class = {v:k for k,v in class_to_idx.items()}

print("Config ready.")

# ------------------------------------------
# 1) Read signals & extract labeled segments
# ------------------------------------------
def read_signal(record_num, fs_target=FS):
    """
    Fetch one record from the PhysioNet "mitdb" directory and return a single
    channel at fs_target Hz.

    Channel choice: "v5" if present, else "mlii", else channel 0 (signal names
    are compared case-insensitively). The channel is cast to float32 before
    any resampling.

    Returns (signal, sampling_rate).
    """
    record = wfdb.rdrecord(str(record_num), pn_dir="mitdb")
    lead_names = [name.lower() for name in record.sig_name]

    # Walk the preference list; keep channel 0 when neither lead is present.
    channel = 0
    for lead in ("v5", "mlii"):
        if lead in lead_names:
            channel = lead_names.index(lead)
            break
    samples = record.p_signal[:, channel].astype(np.float32)

    native_fs = int(record.fs)
    if native_fs == fs_target:
        return samples, native_fs
    resampled = processing.resample_sig(samples, native_fs, fs_target)[0]
    return resampled, fs_target

def extract_segments(record_num, label, win_samples=WIN_SAMPLES):
    """
    Extract fixed-length windows centered on annotated beats of a given label.

    Label mapping (MIT-BIH symbols):
      NOR: 'N'
      LBB: 'L'
      RBB: 'R'
      PVC: 'V'
      APC: 'A' or 'a'

    A beat is skipped when a window of win_samples centered on it would run
    past either end of the record. Clamping the start to sample 0 instead would
    give off-center windows and one identical copy of the first window for
    every early beat, which can then land in both the train and test split.

    Returns a list of slices of the record signal, each win_samples long.
    An unrecognised label yields an empty list.
    """
    symbols_for = {
        "NOR": ("N",),
        "LBB": ("L",),
        "RBB": ("R",),
        "PVC": ("V",),
        "APC": ("A", "a"),
    }
    wanted = symbols_for.get(label, ())

    sig, _ = read_signal(record_num)
    ann = wfdb.rdann(str(record_num), "atr", pn_dir="mitdb")
    # NOTE(review): annotation sample indices are used as-is, i.e. assumed to be
    # on the same sample grid as `sig` -- confirm if read_signal ever resamples.

    half = win_samples // 2
    out = []
    for center, sym in zip(ann.sample, ann.symbol):
        if sym not in wanted:
            continue
        start = center - half
        end = start + win_samples
        if start < 0 or end > len(sig):
            continue  # window would not be fully inside the record
        out.append(sig[start:end])
    return out

# Build pools per class
# One list of candidate segments per class, gathered across that class's records.
pool = {}
for cls, recs in RECORDS.items():
    cls_segments = []
    for rec_id in recs:
        cls_segments.extend(extract_segments(rec_id, cls))
    pool[cls] = cls_segments
    print(f"{cls}: segments extracted = {len(cls_segments)}")

# ---------------------------------------
# 2) Paper's fixed train/test split (1D)
# ---------------------------------------
rng = np.random.default_rng(SEED)

def take_n(lst, n):
    """Draw min(n, len(lst)) distinct items from lst at random (no replacement), using the module-level rng."""
    count = min(n, len(lst))
    picks = rng.choice(len(lst), size=count, replace=False)
    return [lst[pos] for pos in picks]

train_X_1d, train_y = [], []
test_X_1d,  test_y  = [], []

for cls in ["NOR","LBB","RBB","APC","PVC"]:
    cls_pool = pool[cls]
    cls_index = class_to_idx[cls]

    # Training windows are drawn first; test windows come from what is left.
    # Segments are told apart by object identity (id), not by content.
    train_segs = take_n(cls_pool, TARGET_TRAIN[cls])
    train_ids = {id(seg) for seg in train_segs}
    test_segs = take_n([seg for seg in cls_pool if id(seg) not in train_ids], TARGET_TEST[cls])

    train_X_1d.extend(train_segs)
    train_y.extend([cls_index] * len(train_segs))
    test_X_1d.extend(test_segs)
    test_y.extend([cls_index] * len(test_segs))

# Lists -> arrays (one row per segment)
train_X_1d = np.stack(train_X_1d)  # [N, 3600]
test_X_1d  = np.stack(test_X_1d)
train_y    = np.array(train_y, dtype=np.int64)
test_y     = np.array(test_y, dtype=np.int64)

# Shapes plus per-class counts for each split
print("Train:", train_X_1d.shape, {name: int((train_y == code).sum()) for name, code in class_to_idx.items()})
print("Test :", test_X_1d.shape,  {name: int((test_y == code).sum()) for name, code in class_to_idx.items()})

# Optional: small counts table
# pandas is optional here: without it the table is skipped with a message
# instead of every possible error being silently swallowed.
try:
    import pandas as pd
except ImportError:
    print("pandas not installed; skipping counts table.")
else:
    df = pd.DataFrame({
        "Train":[int((train_y==class_to_idx[c]).sum()) for c in class_to_idx],
        "Test":[int((test_y==class_to_idx[c]).sum()) for c in class_to_idx]
    }, index=list(class_to_idx.keys()))
    df["Total"] = df["Train"]+df["Test"]
    try:
        display(df)   # rich table when running under IPython/Jupyter
    except NameError:
        print(df)     # plain-text fallback where `display` is not defined

# ------------------------------------------
# 3) STFT -> 256x256 grayscale spectrograms
# ------------------------------------------
from PIL import Image
from scipy import signal
from tensorflow.keras.utils import to_categorical

def stft_to_image(seg, fs=FS):
    """
    Render one 1D segment as a square grayscale spectrogram image.

    Pipeline: STFT with a symmetric Hann window of STFT_NPERSEG samples and
    STFT_NOVERLAP samples of overlap (no boundary extension, no padding)
    -> magnitude -> dB relative to the peak -> min-max scaled to [0, 255]
    -> bicubic resize to SPEC_SIZE x SPEC_SIZE.

    Returns the image as a uint8 array.
    """
    stft_kwargs = dict(
        fs=fs,
        window=signal.windows.hann(STFT_NPERSEG, sym=True),
        nperseg=STFT_NPERSEG,
        noverlap=STFT_NOVERLAP,
        nfft=STFT_NPERSEG,
        boundary=None,
        padded=False,
    )
    _, _, spectrum = signal.stft(seg, **stft_kwargs)

    magnitude = np.abs(spectrum) + 1e-12          # small offset keeps log10 finite
    level_db = 20*np.log10(magnitude / magnitude.max())
    scaled = (level_db - level_db.min()) / (level_db.max() - level_db.min() + 1e-12)

    pixels = (scaled*255).astype(np.uint8)
    resized = Image.fromarray(pixels).resize((SPEC_SIZE, SPEC_SIZE), Image.BICUBIC)
    return np.array(resized, np.uint8)

def make_spec_set(X_1d, y):
    """
    Convert 1D segments and integer labels into CNN-ready tensors.

    Every row of X_1d is turned into a spectrogram by stft_to_image; the stack
    is scaled to [0, 1] float32 and given a trailing channel axis. Labels are
    one-hot encoded over 5 classes.

    Returns (X, Y).
    """
    spectrograms = np.array([stft_to_image(row, FS) for row in X_1d], np.float32)
    X = (spectrograms / 255.0)[..., np.newaxis]  # [N,H,W,1]
    Y = to_categorical(y, num_classes=5)
    return X, Y

# Convert both splits into spectrogram tensors (X) and one-hot labels (Y).
train_X_2d, train_y_oh = make_spec_set(train_X_1d, train_y)
test_X_2d,  test_y_oh  = make_spec_set(test_X_1d,  test_y)
print("Train 2D:", train_X_2d.shape, "Test 2D:", test_X_2d.shape)

# Optional: free 1D arrays to save RAM
# NOTE: after the `del`, train_X_1d / test_X_1d no longer exist; anything that
# wants to rebuild the spectrograms later has to re-run the split first.
import gc
del train_X_1d, test_X_1d
gc.collect()

# (Optional) Save spectrogram tensors so restarts don't hurt
# np.savez_compressed("ecg_spectros.npz",
#     train_X_2d=train_X_2d, train_y_oh=train_y_oh,
#     test_X_2d=test_X_2d,   test_y_oh=test_y_oh
# )

# ---------------------------
# 4) 2D-CNN (paper structure)
# ---------------------------
from tensorflow import keras
from tensorflow.keras import layers

def make_2d_cnn(input_shape=(SPEC_SIZE, SPEC_SIZE, 1), num_classes=5):
    """
    Build the (uncompiled) spectrogram classifier.

    Three Conv2D(relu, 'same') + 2x2 MaxPooling2D stages (8 filters of 4x4,
    then 13 of 2x2, then 13 of 2x2), followed by Flatten, Dense(64, relu) and
    a softmax layer with num_classes outputs.
    """
    conv_stages = [(8, (4,4)), (13, (2,2)), (13, (2,2))]  # (filters, kernel size)

    inp = keras.Input(shape=input_shape)
    feat = inp
    for n_filters, kernel in conv_stages:
        feat = layers.Conv2D(n_filters, kernel, activation='relu', padding='same')(feat)
        feat = layers.MaxPooling2D((2,2))(feat)

    feat = layers.Flatten()(feat)
    feat = layers.Dense(64, activation='relu')(feat)  # dense width not fixed by paper; 64 is a sensible choice
    out = layers.Dense(num_classes, activation='softmax')(feat)
    return keras.Model(inp, out, name="ECG_2D_CNN")

model_2d = make_2d_cnn()
model_2d.summary()

# ------------------------------------
# 5) Train (paper LR/epochs; safe Bsz)
# ------------------------------------
LR = 0.001       # paper
EPOCHS = 100     # paper "iterations"
BATCH = 1000     # practical; paper used 2500 (too big for most GPUs)

# (Optional) Mixed precision if VRAM is tight (uncomment next 3 lines)
# from tensorflow.keras import mixed_precision
# mixed_precision.set_global_policy('mixed_float16')
# train_X_2d, test_X_2d = train_X_2d.astype('float16'), test_X_2d.astype('float16')

# Safety: ensure tensors exist if the runtime was restarted
try:
    _ = train_X_2d.shape; _ = test_X_2d.shape; _ = train_y_oh.shape; _ = test_y_oh.shape
except NameError:
    # Uncomment this if you saved earlier:
    # data = np.load("ecg_spectros.npz")
    # train_X_2d, train_y_oh = data["train_X_2d"], data["train_y_oh"]
    # test_X_2d,  test_y_oh  = data["test_X_2d"],  data["test_y_oh"]
    # Or rebuild. The 1D arrays are deleted right after the spectrograms are
    # built, so only rebuild when they are actually still defined; otherwise
    # fail with an actionable message instead of a second NameError.
    if "train_X_1d" in globals() and "test_X_1d" in globals():
        train_X_2d, train_y_oh = make_spec_set(train_X_1d, train_y)
        test_X_2d,  test_y_oh  = make_spec_set(test_X_1d,  test_y)
    else:
        raise RuntimeError(
            "Spectrogram tensors are missing and the 1D segments have already been freed; "
            "re-run the extraction/split/STFT steps (or load a saved .npz) before training."
        ) from None

# Helpful callbacks (optional but recommended)
# NOTE: validation_data below is the test split, so ReduceLROnPlateau
# (monitoring val_loss) adapts the learning rate using test data.
cbs = [
    #keras.callbacks.EarlyStopping(monitor='val_accuracy', patience=10, restore_best_weights=True),
    keras.callbacks.ReduceLROnPlateau(monitor='val_loss', factor=0.5, patience=5, min_lr=1e-6)
]

model_2d.compile(optimizer=keras.optimizers.Adam(learning_rate=LR),
                 loss='categorical_crossentropy',
                 metrics=['accuracy'])

hist_2d = model_2d.fit(
    train_X_2d, train_y_oh,
    validation_data=(test_X_2d, test_y_oh),
    epochs=EPOCHS, batch_size=BATCH, verbose=1,
    callbacks=cbs
)

# -------------------------
# 6) Report & quick plots
# -------------------------
# Averages / best epoch
# Per-epoch curves recorded by fit(); "val" is the split passed as validation_data.
tr_acc = np.array(hist_2d.history['accuracy'])
vl_acc = np.array(hist_2d.history['val_accuracy'])
tr_loss = np.array(hist_2d.history['loss'])
vl_loss = np.array(hist_2d.history['val_loss'])

print(f"\nFinal train acc: {tr_acc[-1]*100:.2f}% | Final val acc: {vl_acc[-1]*100:.2f}%")
print(f"Avg  train acc : {tr_acc.mean()*100:.2f}% | Avg  val acc : {vl_acc.mean()*100:.2f}%")
print(f"Best val acc   : {vl_acc.max()*100:.2f}% (epoch {vl_acc.argmax()+1})")
print(f"Final loss     : train {tr_loss[-1]:.4f} | val {vl_loss[-1]:.4f}")

# Optional: exact paper metrics on TEST set
y_prob = model_2d.predict(test_X_2d, verbose=0)
y_pred = y_prob.argmax(axis=1)
y_true = test_y_oh.argmax(axis=1)

acc_pct = (y_pred == y_true).mean()*100.0
eps = 1e-7
# Mean categorical cross-entropy; probabilities are clipped so log() stays finite.
ce = -np.sum(test_y_oh * np.log(np.clip(y_prob, eps, 1-eps)), axis=1).mean()

print(f"\n[Paper metrics] Test accuracy: {acc_pct:.2f}% | Cross-entropy: {ce:.6f}")

# Confusion matrix / per-class report (scikit-learn is optional).
# Only a missing scikit-learn is tolerated; any other failure should surface.
try:
    from sklearn.metrics import confusion_matrix, classification_report
except ImportError:
    print("\nscikit-learn not installed; skipping classification report and confusion matrix.")
else:
    class_ids = [0,1,2,3,4]
    cm = confusion_matrix(y_true, y_pred, labels=class_ids)
    # labels= keeps the report aligned with target_names even if a class is
    # absent from both y_true and y_pred.
    print("\nClassification report:\n",
          classification_report(y_true, y_pred, labels=class_ids,
                                target_names=[idx_to_class[i] for i in class_ids], digits=4))
    # Basic text CM (no seaborn)
    print("Confusion matrix:\n", cm)


Config ready.
NOR: segments extracted = 7936
LBB: segments extracted = 6600
RBB: segments extracted = 5513
PVC: segments extracted = 1347
APC: segments extracted = 1860
Train: (2100, 3600) {'NOR': 450, 'LBB': 450, 'RBB': 450, 'PVC': 300, 'APC': 450}
Test : (420, 3600) {'NOR': 90, 'LBB': 90, 'RBB': 90, 'PVC': 60, 'APC': 90}


Unnamed: 0,Train,Test,Total
NOR,450,90,540
LBB,450,90,540
RBB,450,90,540
PVC,300,60,360
APC,450,90,540


Train 2D: (2100, 256, 256, 1) Test 2D: (420, 256, 256, 1)


Epoch 1/100
[1m3/3[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m32s[0m 5s/step - accuracy: 0.2186 - loss: 1.6148 - val_accuracy: 0.2143 - val_loss: 1.6012 - learning_rate: 0.0010
Epoch 2/100
[1m3/3[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 327ms/step - accuracy: 0.2141 - loss: 1.6010 - val_accuracy: 0.2143 - val_loss: 1.5984 - learning_rate: 0.0010
Epoch 3/100
[1m3/3[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 325ms/step - accuracy: 0.2141 - loss: 1.5963 - val_accuracy: 0.2143 - val_loss: 1.5933 - learning_rate: 0.0010
Epoch 4/100
[1m3/3[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 327ms/step - accuracy: 0.2186 - loss: 1.5909 - val_accuracy: 0.4143 - val_loss: 1.5870 - learning_rate: 0.0010
Epoch 5/100
[1m3/3[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 325ms/step - accuracy: 0.4079 - loss: 1.5837 - val_accuracy: 0.3167 - val_loss: 1.5791 - learning_rate: 0.0010
Epoch 6/100
[1m3/3[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 423

# full code with different batch sizes

In [None]:
# ============================================================
# ECG Arrhythmia Classification (Paper-faithful full pipeline)
# MIT-BIH: STFT (512 Hann) -> 256x256 spectrogram -> 2D-CNN
# ============================================================

# -------------------------
# 0) Config & dependencies
# -------------------------
import os, random
import numpy as np
import tensorflow as tf
import wfdb
from wfdb import processing

# Reproducibility: seed Python's `random`, NumPy's global RNG and TensorFlow's global RNG
SEED = 42
random.seed(SEED); np.random.seed(SEED); tf.random.set_seed(SEED)

# Records per class (from the paper)
# Keys are class names; values are the record numbers read for that class.
RECORDS = {
    "NOR": [100, 105, 215],
    "LBB": [109, 111, 214],
    "RBB": [118, 124, 212],
    "PVC": [106, 233],
    "APC": [207, 209, 232]
}

# Target split: number of segments drawn per class for train and for test
TARGET_TRAIN = {"NOR":450, "LBB":450, "RBB":450, "APC":450, "PVC":300}
TARGET_TEST  = {"NOR": 90, "LBB": 90, "RBB": 90, "APC": 90, "PVC": 60}

# Sampling
FS = 360   # sampling rate passed to read_signal / the STFT
WIN_SEC = 10   # window length in seconds
WIN_SAMPLES = FS * WIN_SEC   # samples per window (3600)

# STFT params
STFT_NPERSEG  = 512
STFT_NOVERLAP = 256   # half of STFT_NPERSEG, i.e. 50% overlap
SPEC_SIZE     = 256

# Class mapping (class name <-> integer label)
class_to_idx = {"NOR":0, "LBB":1, "RBB":2, "PVC":3, "APC":4}
idx_to_class = {v:k for k,v in class_to_idx.items()}

print("Config ready.")

# ------------------------------------------
# 1) Read signals & extract labeled segments
# ------------------------------------------
def read_signal(record_num, fs_target=FS):
    """
    Fetch one record from the PhysioNet "mitdb" directory and return a single
    channel at fs_target Hz.

    Channel choice: "v5" if present, else "mlii", else channel 0 (signal names
    are compared case-insensitively). The channel is cast to float32 and
    resampled only when the record's rate differs from fs_target.

    Returns (signal, fs_target).
    """
    record = wfdb.rdrecord(str(record_num), pn_dir="mitdb")
    lead_names = [name.lower() for name in record.sig_name]

    # Walk the preference list; keep channel 0 when neither lead is present.
    channel = 0
    for lead in ("v5", "mlii"):
        if lead in lead_names:
            channel = lead_names.index(lead)
            break
    samples = record.p_signal[:, channel].astype(np.float32)

    native_fs = int(record.fs)
    if native_fs != fs_target:
        samples = processing.resample_sig(samples, native_fs, fs_target)[0]
    return samples, fs_target

def extract_segments(record_num, label, win_samples=WIN_SAMPLES):
    """Cut fixed-length windows around the annotated beats of one class.

    Args:
        record_num: MIT-BIH record number (read via `read_signal` / wfdb "atr"
            annotations from the "mitdb" directory).
        label: class name; one of "NOR", "LBB", "RBB", "PVC", "APC". Any other
            value matches no annotation and yields an empty list.
        win_samples: window length in samples.

    Returns:
        List of slices of the record signal, each `win_samples` long, one per
        matching annotation whose window fits inside the record.
    """
    # Annotation symbols that count as a beat of each class
    symbols_for_label = {
        "NOR": ("N",),
        "LBB": ("L",),
        "RBB": ("R",),
        "PVC": ("V",),
        "APC": ("A", "a"),
    }
    wanted = symbols_for_label.get(label, ())

    sig, _ = read_signal(record_num)
    ann = wfdb.rdann(str(record_num), "atr", pn_dir="mitdb")

    half_window = win_samples // 2
    n_samples = len(sig)

    windows = []
    for center, sym in zip(ann.sample, ann.symbol):
        if sym not in wanted:
            continue
        # Windows that would start before sample 0 are shifted to start at 0
        # (so they are no longer centred on the beat) ...
        start = max(center - half_window, 0)
        stop = start + win_samples
        # ... while windows that would run past the end of the record are dropped.
        if stop > n_samples:
            continue
        windows.append(sig[start:stop])
    return windows

# Build pools per class: every window of that class from all of its records
pool = {}
for cls, recs in RECORDS.items():
    pool[cls] = [seg for r in recs for seg in extract_segments(r, cls)]
    print(f"{cls}: segments extracted = {len(pool[cls])}")

# ---------------------------------------
# 2) Paper's fixed train/test split (1D)
# ---------------------------------------
# Dedicated generator for the split so sampling is reproducible from SEED
rng = np.random.default_rng(SEED)

def take_n(lst, n):
    """Return up to `n` items of `lst`, drawn without replacement using `rng`.

    If `lst` holds fewer than `n` items, all of them are returned (in random
    order); no error is raised.
    """
    how_many = min(n, len(lst))
    picks = rng.choice(len(lst), size=how_many, replace=False)
    return [lst[position] for position in picks]

# Per-class picks are accumulated in plain lists and stacked once at the end.
train_X_1d, train_y = [], []
test_X_1d,  test_y  = [], []

for cls in ["NOR","LBB","RBB","APC","PVC"]:
    label_idx = class_to_idx[cls]
    candidates = pool[cls]

    # Training windows are drawn first ...
    train_segs = take_n(candidates, TARGET_TRAIN[cls])

    # ... and test windows come from whatever was not picked. Membership is
    # tracked by object identity: every pool entry is a distinct slice object.
    # NOTE(review): windows cut around neighbouring beats of the same record
    # can overlap in time, so a train window and a test window may share
    # samples even though they are distinct objects - confirm this is acceptable.
    picked_ids = {id(seg) for seg in train_segs}
    leftovers = [seg for seg in candidates if id(seg) not in picked_ids]
    test_segs = take_n(leftovers, TARGET_TEST[cls])

    train_X_1d.extend(train_segs)
    train_y.extend([label_idx] * len(train_segs))
    test_X_1d.extend(test_segs)
    test_y.extend([label_idx] * len(test_segs))

# Lists of equal-length windows -> 2-D arrays; label lists -> 1-D arrays
train_X_1d, test_X_1d = np.stack(train_X_1d), np.stack(test_X_1d)
train_y, test_y = np.array(train_y), np.array(test_y)

print("Train:", train_X_1d.shape)
print("Test :", test_X_1d.shape)

# ------------------------------------------
# 3) STFT -> 256x256 grayscale spectrograms
# ------------------------------------------
from PIL import Image
from scipy import signal
from tensorflow.keras.utils import to_categorical

def stft_to_image(seg, fs=FS):
    """Turn a 1-D segment into a SPEC_SIZE x SPEC_SIZE uint8 spectrogram image.

    Steps: STFT with a symmetric Hann window (STFT_NPERSEG samples,
    STFT_NOVERLAP overlap, no boundary extension or padding) -> magnitude in
    dB relative to the segment's own peak -> min-max scaling to 0..255 ->
    resize with PIL's default resampling.

    Args:
        seg: 1-D signal segment.
        fs: sampling rate passed to `scipy.signal.stft`.

    Returns:
        uint8 array of shape (SPEC_SIZE, SPEC_SIZE).
    """
    window = signal.windows.hann(STFT_NPERSEG, sym=True)
    _, _, spectrum = signal.stft(
        seg,
        fs=fs,
        window=window,
        nperseg=STFT_NPERSEG,
        noverlap=STFT_NOVERLAP,
        nfft=STFT_NPERSEG,
        boundary=None,
        padded=False,
    )

    # Small offset keeps log10 finite where the magnitude is exactly zero
    magnitude = np.abs(spectrum) + 1e-12
    level_db = 20*np.log10(magnitude / magnitude.max())

    # Per-image min-max scaling; the epsilon guards against a flat spectrum
    scaled = (level_db - level_db.min()) / (level_db.max() - level_db.min() + 1e-12)
    pixels = (scaled*255).astype(np.uint8)

    resized = Image.fromarray(pixels).resize((SPEC_SIZE, SPEC_SIZE))
    return np.array(resized, np.uint8)

def make_spec_set(X_1d, y):
    """Convert 1-D segments and integer labels into CNN-ready arrays.

    Args:
        X_1d: iterable of 1-D segments, each passed to `stft_to_image`.
        y: integer class labels, one per segment.

    Returns:
        (X, Y): X is float32 with pixel values divided by 255 and a trailing
        channel axis of size 1; Y is the one-hot encoding of `y` over 5 classes.
    """
    image_stack = np.array([stft_to_image(seg, FS) for seg in X_1d], np.float32)
    X = (image_stack / 255.0)[..., np.newaxis]
    Y = to_categorical(y, num_classes=5)
    return X, Y

# Spectrogram tensors + one-hot labels for both splits (train first, then test)
train_X_2d, train_y_oh = make_spec_set(X_1d=train_X_1d, y=train_y)
test_X_2d, test_y_oh = make_spec_set(X_1d=test_X_1d, y=test_y)

# The raw 1-D windows are not used after this point; drop them to free RAM.
# Re-running this cell section alone will fail because the names are deleted.
import gc
del train_X_1d
del test_X_1d
gc.collect()

# ---------------------------
# 4) 2D-CNN (paper structure)
# ---------------------------
from tensorflow import keras
from tensorflow.keras import layers

def make_2d_cnn(input_shape=(SPEC_SIZE, SPEC_SIZE, 1), num_classes=5):
    """Build the (uncompiled) 2D-CNN used on the spectrogram images.

    Architecture: three Conv2D(relu, 'same') + 2x2 MaxPooling stages with
    8 filters of 4x4, then 13 filters of 2x2 twice; followed by Flatten,
    Dense(64, relu) and a softmax output of `num_classes` units.

    Args:
        input_shape: shape of one input image, channels last.
        num_classes: number of softmax outputs.

    Returns:
        A `keras.Model` mapping the input image to class probabilities.
    """
    # (filters, kernel size) for each convolution stage, in order
    conv_stages = ((8, (4, 4)), (13, (2, 2)), (13, (2, 2)))

    inputs = keras.Input(shape=input_shape)
    features = inputs
    for n_filters, kernel in conv_stages:
        features = layers.Conv2D(n_filters, kernel, activation='relu', padding='same')(features)
        features = layers.MaxPooling2D((2, 2))(features)

    flat = layers.Flatten()(features)
    hidden = layers.Dense(64, activation='relu')(flat)
    probabilities = layers.Dense(num_classes, activation='softmax')(hidden)
    return keras.Model(inputs, probabilities)

# =================================================================
# 5) *** BATCH SIZE SWEEP EXPERIMENT (PAPER REPLICATION) ***
# =================================================================

import matplotlib.pyplot as plt
import pandas as pd

LR = 0.001
EPOCHS = 100
batch_sizes = [100, 250, 500, 750, 1000, 1500]

results = []       # one row per batch size, written to CSV at the end
acc_curves = {}    # batch size -> per-epoch validation accuracy
loss_curves = {}   # batch size -> per-epoch validation loss


def _save_curve(values, title, ylabel, path):
    """Plot a single per-epoch curve and save it to `path`."""
    fig, ax = plt.subplots()
    ax.plot(values)
    ax.set(title=title, xlabel="Epoch", ylabel=ylabel)
    ax.grid(True)
    fig.savefig(path)
    plt.close(fig)  # close explicitly so figures do not pile up in memory


def _save_combined(curves, title, ylabel, path):
    """Overlay the curves of every batch size in one figure and save it to `path`."""
    fig, ax = plt.subplots(figsize=(10, 6))
    for size in batch_sizes:
        ax.plot(curves[size], label=f"batch={size}")
    ax.set(title=title, xlabel="Epoch", ylabel=ylabel)
    ax.legend()
    ax.grid(True)
    fig.savefig(path)
    plt.close(fig)


for BATCH in batch_sizes:
    print(f"\n========== Running Batch Size {BATCH} ==========\n")

    # Drop the previous run's Keras state so six models do not accumulate in
    # memory, and re-seed so every batch size starts from the same seeded RNG
    # state. Without this, each run's result depends on its position in the loop.
    keras.backend.clear_session()
    random.seed(SEED); np.random.seed(SEED); tf.random.set_seed(SEED)

    model = make_2d_cnn()
    model.compile(
        optimizer=keras.optimizers.Adam(LR),
        loss="categorical_crossentropy",
        metrics=["accuracy"]
    )

    # NOTE: the test split doubles as validation data, so the "val_*" curves
    # below are test-set curves; nothing here is used for model selection.
    hist = model.fit(
        train_X_2d, train_y_oh,
        validation_data=(test_X_2d, test_y_oh),
        epochs=EPOCHS,
        batch_size=BATCH,
        verbose=2  # one line per epoch; progress bars flood the saved notebook
    )

    acc = hist.history["val_accuracy"]
    loss = hist.history["val_loss"]

    acc_curves[BATCH] = acc
    loss_curves[BATCH] = loss

    # Test accuracy of the model as it stands after the last epoch
    y_prob = model.predict(test_X_2d, verbose=0)
    y_pred = y_prob.argmax(axis=1)
    y_true = test_y_oh.argmax(axis=1)
    test_acc = (y_pred == y_true).mean() * 100

    # Averages are taken over all epochs of this run
    results.append([BATCH, np.mean(acc)*100, np.mean(loss), test_acc])

    # save individual plots
    _save_curve(acc, f"Accuracy (Batch {BATCH})", "Val Accuracy", f"acc_batch_{BATCH}.png")
    _save_curve(loss, f"Loss (Batch {BATCH})", "Val Loss", f"loss_batch_{BATCH}.png")

# Save combined plots
_save_combined(acc_curves, "Combined Accuracy (LR=0.001)", "Val Accuracy", "combined_accuracy.png")
_save_combined(loss_curves, "Combined Loss (LR=0.001)", "Val Loss", "combined_loss.png")

# Save CSV
df = pd.DataFrame(results, columns=["Batch Size","Avg Val Acc (%)","Avg Val Loss","Final Test Acc (%)"])
df.to_csv("batch_sweep_results.csv", index=False)
print(df)


Config ready.
NOR: segments extracted = 7936
LBB: segments extracted = 6600
RBB: segments extracted = 5513
PVC: segments extracted = 1347
APC: segments extracted = 1860
Train: (2100, 3600)
Test : (420, 3600)


Epoch 1/100
[1m21/21[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m10s[0m 150ms/step - accuracy: 0.2017 - loss: 1.6367 - val_accuracy: 0.2143 - val_loss: 1.5947
Epoch 2/100
[1m21/21[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 56ms/step - accuracy: 0.2702 - loss: 1.5865 - val_accuracy: 0.3048 - val_loss: 1.5662
Epoch 3/100
[1m21/21[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 61ms/step - accuracy: 0.3748 - loss: 1.5445 - val_accuracy: 0.3714 - val_loss: 1.4798
Epoch 4/100
[1m21/21[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 59ms/step - accuracy: 0.4575 - loss: 1.4073 - val_accuracy: 0.5310 - val_loss: 1.2289
Epoch 5/100
[1m21/21[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 58ms/step - accuracy: 0.5762 - loss: 1.1752 - val_accuracy: 0.60