In [1]:
# Required Packages
import numpy as np
import pandas as pd
import wfdb
import pywt
from sklearn.preprocessing import MinMaxScaler
import warnings
warnings.filterwarnings("ignore")
import random
random.seed(42)

# Path to MIT-BIH data
data = 'mitdb/'

# List of Patients
patients = ['100','101','102','103','104','105','106','107',
           '108','109','111','112','113','114','115','116',
           '117','118','119','121','122','123','124','200',
           '201','202','203','205','207','208','209','210',
           '212','213','214','215','217','219','220','221',
           '222','223','228','230','231','232','233','234']

# Abnormal & Normal Beat Symbols
abnormal = ['L','R','V','/','A','f','F','j','a','E','J','e','S']
normal = ['N']

# Wavelet denoising
def wavelet_denoising(signal, wavelet='db6', level=1):
    coeff = pywt.wavedec(signal, wavelet, mode='per')
    sigma = np.median(np.abs(coeff[-level])) / 0.6745
    uthresh = sigma * np.sqrt(2 * np.log(len(signal)))
    coeff[1:] = [pywt.threshold(i, value=uthresh, mode='soft') for i in coeff[1:]]
    return pywt.waverec(coeff, wavelet, mode='per')

# Apply denoising to dataset
def denoise_ecg_signals(signals):
    return np.array([wavelet_denoising(signal) for signal in signals])

# Load ECG signal & annotation
def load_ecg(file):
    record = wfdb.rdrecord(file)
    annotation = wfdb.rdann(file, 'atr')
    p_signal = record.p_signal[:, 0]  # First channel
    return p_signal, annotation.symbol, annotation.sample

# Extract X (ECG segment), Y (label), sym (beat symbol)
def build_XY(p_signal, df_ann, num_cols, abnormal):
    num_rows = len(df_ann)
    X = np.zeros((num_rows, num_cols))
    Y = np.zeros((num_rows, 1))
    sym = []
    max_row = 0

    for atr_sample, atr_sym in zip(df_ann.atr_sample.values, df_ann.atr_sym.values):
        left = max(0, atr_sample - num_sec * fs)
        right = min(len(p_signal), atr_sample + num_sec * fs)
        x = p_signal[left:right]
        if len(x) == num_cols:
            X[max_row, :] = x
            Y[max_row, :] = int(atr_sym in abnormal)
            sym.append(atr_sym)
            max_row += 1

    return X[:max_row, :], Y[:max_row, :], sym

# Create dataset from all patients
def make_dataset(patients, num_sec, fs, abnormal):
    num_cols = 2 * num_sec * fs
    X_all = []
    Y_all = []
    sym_all = []

    for pt in patients:
        file = data + pt
        p_signal, atr_sym, atr_sample = load_ecg(file)
        df_ann = pd.DataFrame({'atr_sym': atr_sym, 'atr_sample': atr_sample})
        df_ann = df_ann[df_ann.atr_sym.isin(abnormal + ['N'])]
        X, Y, sym = build_XY(p_signal, df_ann, num_cols, abnormal)
        if len(X) > 0:
            X_all.append(X)
            Y_all.append(Y)
            sym_all += sym

    X_all = np.vstack(X_all)
    Y_all = np.vstack(Y_all)
    return X_all, Y_all, sym_all

# Parameters
num_sec = 3
fs = 360

# Prepare dataset
X_all, Y_all, sym_all = make_dataset(patients, num_sec, fs, abnormal)

# Denoise
X_all_denoised = denoise_ecg_signals(X_all)

# Normalize
scaler = MinMaxScaler()
X_all_normalized = scaler.fit_transform(X_all_denoised)


In [2]:
from collections import Counter

# Your sym_all list contains beat symbols
# Example: sym_all = ['N', 'N', 'V', 'F', '/', 'N', 'V', 'A', ...]

# Step 1: Count occurrences of each beat type
beat_counts = Counter(sym_all)

# Step 2: Set target number of samples per class
TARGET = 8000

# Step 3: Compute number of synthetic samples needed per class
synth_needed = {}
for beat, count in beat_counts.items():
    if beat != 'N':  # We only synthesize abnormal beats
        need = TARGET - count
        synth_needed[beat] = need if need > 0 else 0

# Step 4: Display result
print("Original Beat Counts:")
for beat, count in beat_counts.items():
    print(f"  {beat}: {count}")

print("\nSynthetic Samples Needed (to reach 8000):")
for beat, need in synth_needed.items():
    print(f"  {beat}: {need}")

Original Beat Counts:
  N: 74795
  A: 2536
  V: 7113
  /: 6999
  f: 982
  F: 801
  j: 229
  L: 8052
  a: 150
  J: 83
  R: 7235
  E: 106
  S: 2
  e: 16

Synthetic Samples Needed (to reach 8000):
  A: 5464
  V: 887
  /: 1001
  f: 7018
  F: 7199
  j: 7771
  L: 0
  a: 7850
  J: 7917
  R: 765
  E: 7894
  S: 7998
  e: 7984


In [3]:
import os
import numpy as np

# Create output folder
output_dir = 'generated_S_class'
os.makedirs(output_dir, exist_ok=True)

# Extract class 'S'
X_S = X_all_normalized[Y_all.ravel() == 'S']

print(f"Shape of real class 'S' samples: {X_S.shape}")

Shape of real class 'S' samples: (0, 2160)


In [4]:
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, TensorDataset

# Device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Generator
class Generator(nn.Module):
    def __init__(self, noise_dim, output_dim):
        super(Generator, self).__init__()
        self.model = nn.Sequential(
            nn.Linear(noise_dim, 128),
            nn.ReLU(),
            nn.Linear(128, output_dim),
        )

    def forward(self, z):
        return self.model(z)

# Discriminator
class Discriminator(nn.Module):
    def __init__(self, input_dim):
        super(Discriminator, self).__init__()
        self.model = nn.Sequential(
            nn.Linear(input_dim, 128),
            nn.LeakyReLU(0.2),
            nn.Linear(128, 1),
            nn.Sigmoid(),
        )

    def forward(self, x):
        return self.model(x)

# Dimensions
signal_len = X_S.shape[1]
noise_dim = 100

# Initialize
G = Generator(noise_dim, signal_len).to(device)
D = Discriminator(signal_len).to(device)

# Loss and Optimizers
criterion = nn.BCELoss()
optimizer_G = torch.optim.Adam(G.parameters(), lr=0.0002)
optimizer_D = torch.optim.Adam(D.parameters(), lr=0.0002)


ModuleNotFoundError: No module named 'torch'