In [1]:
# Unpack the training archive into the current working directory (-q: no per-file listing).
# NOTE(review): later cells read from /content/training2017, so this presumably runs with /content as the cwd — confirm.
!unzip -q /content/training2017.zip -d ./

In [27]:
import pandas as pd

# Count how many recordings the reference sheet lists for each class label.
# The CSV has no header row, so column names are supplied explicitly.
file_path = '/content/REFERENCE.csv'
data = pd.read_csv(
    file_path,
    names=['Name', 'Class'],
    header=None,
)

class_distribution = data['Class'].value_counts()
print(class_distribution)

N    5050
O    2456
A     738
~     284
Name: Class, dtype: int64


# 1- Load Data

In [28]:
import os
import pandas as pd
import numpy as np
import scipy.io

def load_and_crop_ecg_data(class_label, reference_csv_path, data_dir, target_length=9000):
    """Load every recording of one class and force each to a fixed length.

    The reference CSV (no header; columns are record name, class label) is
    filtered to ``class_label``. For each matching record the file
    ``<data_dir>/<name>.mat`` is read and the first row of its ``'val'``
    variable is taken as the signal. Signals longer than ``target_length``
    are truncated; shorter ones are zero-padded at the end.

    Args:
        class_label: label to select from the second CSV column.
        reference_csv_path: path to the header-less reference CSV.
        data_dir: directory containing the ``.mat`` recordings.
        target_length: number of samples every returned row will have.

    Returns:
        Array of shape ``(n_records, target_length)``. When no record has
        ``class_label`` the result is an empty ``(0, target_length)`` array,
        so the second axis is always ``target_length``.

    Note:
        Amplitudes are returned exactly as stored in the files; no scaling or
        normalisation is applied here.
    """
    reference_data = pd.read_csv(reference_csv_path, header=None, names=['Name', 'Class'])
    record_names = reference_data.loc[reference_data['Class'] == class_label, 'Name']

    ecg_data_list = []
    for record_name in record_names:
        file_path = os.path.join(data_dir, f"{record_name}.mat")
        # Slicing is a no-op for short signals, so it can be applied unconditionally.
        signal = scipy.io.loadmat(file_path)['val'][0][:target_length]

        shortfall = target_length - len(signal)
        if shortfall > 0:
            # Right-pad with zeros up to target_length.
            signal = np.pad(signal, (0, shortfall), 'constant', constant_values=0)

        ecg_data_list.append(signal)

    if not ecg_data_list:
        # np.array([]) would have shape (0,); keep the documented 2-D layout instead.
        return np.empty((0, target_length))

    return np.array(ecg_data_list)



# 2- Build GAN Model

In [29]:
import tensorflow as tf
from tensorflow.keras import layers, models, optimizers

# Shape of one ECG sample as seen by the generator output and discriminator input.
# 9000 equals the default target_length that load_and_crop_ecg_data crops/pads to.
ecg_shape = (9000,)  # one flat vector of 9000 samples per recording
# Length of the random noise vector fed to the generator.
latent_dim = 100  # Example latent dimension for generator input

# Generator Model
def build_generator(latent_dim):
    """Build the fully connected generator network.

    The network maps a ``latent_dim``-long noise vector through three ReLU
    dense layers (256, 512, 1024 units), each followed by batch normalisation
    with momentum 0.8, and ends with a tanh dense layer sized to the number of
    elements in the module-level ``ecg_shape``, reshaped to ``ecg_shape``.

    Args:
        latent_dim: length of the input noise vector.

    Returns:
        An uncompiled Keras ``Sequential`` model.
    """
    net = models.Sequential()

    # The first hidden layer also declares the input width.
    net.add(layers.Dense(256, activation='relu', input_dim=latent_dim))
    net.add(layers.BatchNormalization(momentum=0.8))

    for width in (512, 1024):
        net.add(layers.Dense(width, activation='relu'))
        net.add(layers.BatchNormalization(momentum=0.8))

    # tanh bounds every generated value to [-1, 1].
    net.add(layers.Dense(np.prod(ecg_shape), activation='tanh'))
    net.add(layers.Reshape(ecg_shape))
    return net

# Discriminator Model
def build_discriminator():
    """Build the fully connected real-vs-fake classifier.

    Inputs of the module-level ``ecg_shape`` are flattened and passed through
    two ReLU dense layers (512, 256 units) to a single sigmoid unit.

    Returns:
        An uncompiled Keras ``Sequential`` model.
    """
    net = models.Sequential()
    net.add(layers.Flatten(input_shape=ecg_shape))
    for width in (512, 256):
        net.add(layers.Dense(width, activation='relu'))
    # Single sigmoid output, trained against 1 = real / 0 = generated labels.
    net.add(layers.Dense(1, activation='sigmoid'))
    return net

# GAN Model
def build_gan(generator, discriminator):
    """Chain ``generator`` into ``discriminator`` as one stacked model.

    Side effect: sets ``discriminator.trainable`` to ``False`` on the object
    that was passed in, before the stacked model is assembled.

    Args:
        generator: model producing samples from noise.
        discriminator: model scoring samples as real or fake.

    Returns:
        An uncompiled Keras ``Sequential`` model of generator then discriminator.
    """
    discriminator.trainable = False

    stacked = models.Sequential()
    stacked.add(generator)
    stacked.add(discriminator)
    return stacked


# 3- Compile the model

In [30]:
# The discriminator is compiled first, before build_gan() switches its
# `trainable` flag off for the stacked model.
discriminator = build_discriminator()
discriminator.compile(
    optimizer=optimizers.Adam(learning_rate=0.0002, beta_1=0.5),
    loss='binary_crossentropy',
    metrics=['accuracy'],
)

# The generator is never compiled by itself.
generator = build_generator(latent_dim)

# It is only trained through the stacked generator -> discriminator model.
gan = build_gan(generator, discriminator)
gan.compile(
    optimizer=optimizers.Adam(learning_rate=0.0002, beta_1=0.5),
    loss='binary_crossentropy',
)


# 4- Train the model

In [1]:
import numpy as np

def train_gan(class_data, generator, discriminator, gan, epochs, batch_size, latent_dim):
    """Alternate discriminator and generator updates on one class of ECG data.

    Every iteration:
      1. draws ``batch_size // 2`` real rows from ``class_data`` (sampled with
         replacement) and generates the same number of fake rows;
      2. updates the discriminator once on the real rows (label 1) and once on
         the fake rows (label 0);
      3. updates the generator through ``gan`` on ``batch_size`` noise vectors
         labelled 1;
      4. prints the averaged discriminator loss/accuracy and the generator loss.

    Args:
        class_data: array of real samples, indexed along axis 0.
        generator: model exposing ``predict(noise, verbose=...)``.
        discriminator: compiled model whose ``train_on_batch`` returns
            ``[loss, accuracy]``.
        gan: compiled stacked model whose ``train_on_batch`` returns the loss.
        epochs: number of iterations. Each iteration is a single batch, not a
            full pass over ``class_data``.
        batch_size: batch size for the generator step; half of it is used for
            each discriminator step.
        latent_dim: length of each noise vector.

    Raises:
        ValueError: if ``class_data`` contains no samples.

    NOTE(review): ``class_data`` is used exactly as passed in. If the generator
    ends in tanh (as build_generator in this notebook does), the real samples
    presumably need to be scaled to [-1, 1] beforehand — confirm upstream.
    """
    n_real = class_data.shape[0]
    if n_real == 0:
        raise ValueError("class_data is empty; cannot draw real samples for training")

    half_batch = batch_size // 2

    # Label arrays do not change between iterations, so build them once.
    real_y = np.ones((half_batch, 1))
    fake_y = np.zeros((half_batch, 1))
    valid_y = np.ones((batch_size, 1))  # Labels for generator training

    for epoch in range(epochs):
        # ---------------------
        #  Train Discriminator
        # ---------------------
        # Randomly select real ECG samples
        idx = np.random.randint(0, n_real, half_batch)
        real_ecgs = class_data[idx]

        # Generate fake ECG data; verbose=0 keeps Keras from printing a
        # progress bar on every single iteration.
        noise = np.random.normal(0, 1, (half_batch, latent_dim))
        gen_ecgs = generator.predict(noise, verbose=0)

        # Train the discriminator
        d_loss_real = discriminator.train_on_batch(real_ecgs, real_y)
        d_loss_fake = discriminator.train_on_batch(gen_ecgs, fake_y)
        d_loss = 0.5 * np.add(d_loss_real, d_loss_fake)

        # ---------------------
        #  Train Generator
        # ---------------------
        noise = np.random.normal(0, 1, (batch_size, latent_dim))

        # Train the generator (to have the discriminator label samples as valid)
        g_loss = gan.train_on_batch(noise, valid_y)

        # Print the progress
        print(f"{epoch} [D loss: {d_loss[0]}, acc.: {100*d_loss[1]}%] [G loss: {g_loss}]")


# 5- Generate new data from trained model

In [33]:
def generate_synthetic_ecg(generator, num_samples, latent_dim):
    """Draw standard-normal noise and return the generator's output for it.

    Args:
        generator: model exposing ``predict``.
        num_samples: number of noise vectors (rows) to draw.
        latent_dim: length of each noise vector.

    Returns:
        Whatever ``generator.predict`` returns for the
        ``(num_samples, latent_dim)`` noise batch.
    """
    latent_batch = np.random.normal(loc=0, scale=1, size=(num_samples, latent_dim))
    return generator.predict(latent_batch)

# Read data of class N

In [1]:
import numpy as np


# Class 'N' is loaded as-is; no synthetic samples are added to it in this cell.
class_label = 'N'
data_dir = '/content/training2017'
reference_csv_path = '/content/REFERENCE.csv'

first_class_data = load_and_crop_ecg_data(
    class_label,
    reference_csv_path,
    data_dir,
    target_length=9000,
)

print(
    "\n\n ****************************************",
    "Original data shape of class N:\n",
    first_class_data.shape,
    sep="\n",
)

# Generate new data for class A

In [1]:
import numpy as np


# Train a GAN on the real class 'A' recordings, then add enough synthetic
# recordings to bring the class up to the size of class 'N'.
class_label = 'A'
reference_csv_path = '/content/REFERENCE.csv'
data_dir = '/content/training2017'

class_data = load_and_crop_ecg_data(class_label, reference_csv_path, data_dir, target_length=9000)

epochs = 5  # Number of training iterations (one batch each), not passes over the data
batch_size = 32  # Size of the batch
latent_dim = 100  # Dimensionality of the random noise input to the generator

# Start from freshly initialised networks so that re-running this cell (or
# running it after another class was trained) does not continue from weights
# and optimizer state fitted to other data.
discriminator = build_discriminator()
discriminator.compile(loss='binary_crossentropy', optimizer=optimizers.Adam(0.0002, 0.5), metrics=['accuracy'])
generator = build_generator(latent_dim)
gan = build_gan(generator, discriminator)
gan.compile(loss='binary_crossentropy', optimizer=optimizers.Adam(0.0002, 0.5))

train_gan(class_data, generator, discriminator, gan, epochs, batch_size, latent_dim)

# Number of synthetic samples needed to match class 'N' (loaded earlier as
# first_class_data); derived from the data instead of a hard-coded count.
num_samples = first_class_data.shape[0] - class_data.shape[0]

synthetic_ecg_data = generate_synthetic_ecg(generator, num_samples, latent_dim)


print("\n\n ****************************************")
print("synthetic data shape of class A:\n")
print(synthetic_ecg_data.shape)


print("\n\n ****************************************")
print("Original data shape of class A:\n")
print(class_data.shape)

print("\n\n ****************************************")
print("all data shape of class A: \n")
# Synthetic rows come first, followed by the real recordings.
second_class_data = np.concatenate((synthetic_ecg_data, class_data), axis=0)
print(second_class_data.shape)


# Generate new data for class ~



In [1]:
import numpy as np


# Train a GAN on the real class '~' recordings, then add enough synthetic
# recordings to bring the class up to the size of class 'N'.
class_label = '~'
reference_csv_path = '/content/REFERENCE.csv'
data_dir = '/content/training2017'

class_data = load_and_crop_ecg_data(class_label, reference_csv_path, data_dir, target_length=9000)

epochs = 5  # Number of training iterations (one batch each), not passes over the data
batch_size = 32  # Size of the batch
latent_dim = 100  # Dimensionality of the random noise input to the generator

# Build new networks for this class. Reusing the existing generator and
# discriminator would continue training from weights (and optimizer state)
# already fitted to a previously trained class.
discriminator = build_discriminator()
discriminator.compile(loss='binary_crossentropy', optimizer=optimizers.Adam(0.0002, 0.5), metrics=['accuracy'])
generator = build_generator(latent_dim)
gan = build_gan(generator, discriminator)
gan.compile(loss='binary_crossentropy', optimizer=optimizers.Adam(0.0002, 0.5))

train_gan(class_data, generator, discriminator, gan, epochs, batch_size, latent_dim)

# Number of synthetic samples needed to match class 'N' (loaded earlier as
# first_class_data); derived from the data instead of a hard-coded count.
num_samples = first_class_data.shape[0] - class_data.shape[0]

synthetic_ecg_data = generate_synthetic_ecg(generator, num_samples, latent_dim)

print("\n\n ****************************************")
print("synthetic data shape of class ~:\n")
print(synthetic_ecg_data.shape)



print("\n\n ****************************************")
print("Original data shape of class ~:\n")
print(class_data.shape)


print("\n\n ****************************************")
print("all data shape of class ~: \n")
# Synthetic rows come first, followed by the real recordings.
third_class_data = np.concatenate((synthetic_ecg_data, class_data), axis=0)
print(third_class_data.shape)

# Generate new data for class O

In [1]:
import numpy as np


# Train a GAN on the real class 'O' recordings, then add enough synthetic
# recordings to bring the class up to the size of class 'N'.
class_label = 'O'
reference_csv_path = '/content/REFERENCE.csv'
data_dir = '/content/training2017'

class_data = load_and_crop_ecg_data(class_label, reference_csv_path, data_dir, target_length=9000)

epochs = 5  # Number of training iterations (one batch each), not passes over the data
batch_size = 32  # Size of the batch
latent_dim = 100  # Dimensionality of the random noise input to the generator

# Build new networks for this class. Reusing the existing generator and
# discriminator would continue training from weights (and optimizer state)
# already fitted to a previously trained class.
discriminator = build_discriminator()
discriminator.compile(loss='binary_crossentropy', optimizer=optimizers.Adam(0.0002, 0.5), metrics=['accuracy'])
generator = build_generator(latent_dim)
gan = build_gan(generator, discriminator)
gan.compile(loss='binary_crossentropy', optimizer=optimizers.Adam(0.0002, 0.5))

train_gan(class_data, generator, discriminator, gan, epochs, batch_size, latent_dim)

# Number of synthetic samples needed to match class 'N' (loaded earlier as
# first_class_data); derived from the data instead of a hard-coded count.
num_samples = first_class_data.shape[0] - class_data.shape[0]

synthetic_ecg_data = generate_synthetic_ecg(generator, num_samples, latent_dim)

print("\n\n ****************************************")
print("synthetic data shape of class O:\n")
print(synthetic_ecg_data.shape)


print("\n\n ****************************************")
print("Original data shape of class O:\n")
print(class_data.shape)

print("\n\n ****************************************")
print("all data shape of class O: \n")
# Synthetic rows come first, followed by the real recordings.
fourth_class_data = np.concatenate((synthetic_ecg_data, class_data), axis=0)
print(fourth_class_data.shape)

# Merge all data

In [1]:
import pandas as pd

# Expected shapes of the per-class arrays once each minority class has been
# topped up to the size of class 'N':
# first_class_data.shape -> (5050, 9000) for class 'N'
# second_class_data.shape -> (5050, 9000) for class 'A'
# third_class_data.shape -> (5050, 9000) for class '~'
# fourth_class_data.shape -> (5050, 9000) for class 'O'

# Stack the four classes in a fixed order: N, A, ~, O.
merge_data = np.concatenate((first_class_data, second_class_data, third_class_data, fourth_class_data), axis=0)
print(merge_data.shape)


# Build one label per row, in the same class order as the concatenation above.
# Counts come from the arrays themselves so the labels cannot drift out of
# sync with the data if a class size changes.
labels = (
    ['N'] * len(first_class_data)
    + ['A'] * len(second_class_data)
    + ['~'] * len(third_class_data)
    + ['O'] * len(fourth_class_data)
)
assert len(labels) == merge_data.shape[0], "label count does not match number of samples"

# Create a DataFrame for labels
df_labels = pd.DataFrame(labels, columns=['label'])

# 1-based identifier for each sample, in row order of merge_data
df_labels['file_name'] = range(1, len(df_labels) + 1)

# Show the size of the label table (rows, columns)
print(df_labels.shape)

# Save all data in new directory

In [1]:
import os
import scipy.io
import pandas as pd

# Define the new directory path
new_dir = '/content/final_data_directory'

# Create the output directory; exist_ok makes the cell safe to re-run.
os.makedirs(new_dir, exist_ok=True)

# Save each sample in merge_data as a .mat file named by its 1-based row index,
# which is the same numbering used for `file_name` in df_labels.
# The loop variables are named `sample` / `mat_path` so they do not overwrite
# the `data` DataFrame and `file_path` defined in the first cell.
for i, sample in enumerate(merge_data):
    mat_path = os.path.join(new_dir, f'{i+1}.mat')
    # Stored as a column vector (length, 1); per the original note this is the
    # input shape expected by the downstream CNN-LSTM. -1 follows the sample
    # length instead of hard-coding 9000.
    scipy.io.savemat(mat_path, {'data': sample.reshape((-1, 1))})

# Save the labels DataFrame as a CSV file in the new directory
labels_csv_path = os.path.join(new_dir, 'class_labels.csv')
df_labels.to_csv(labels_csv_path, index=False)

print(f"All data has been saved to {new_dir}")
