In [4]:
import numpy as np
import wfdb
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, Dataset
import os

class ECGDataset(Dataset):
    def __init__(self, raw_signals, noisy_signals):
        self.raw_signals = raw_signals
        self.noisy_signals = noisy_signals

    def __len__(self):
        return len(self.raw_signals)

    def __getitem__(self, idx):
        raw_signal = self.raw_signals[idx]
        noisy_signal = self.noisy_signals[idx]
        return torch.tensor(raw_signal, dtype=torch.float32), torch.tensor(noisy_signal, dtype=torch.float32)

def add_noise(signal, noise, snr):
    signal_power = np.mean(signal ** 2)
    noise_power = np.mean(noise ** 2)
    factor = (signal_power / noise_power) / (10 ** (snr / 10))
    noisy_signal = signal + noise * np.sqrt(factor)
    return noisy_signal

def load_mit_bih_data(records, noise_type, snr_levels, target_length=650000):
    raw_signals = []
    noisy_signals_dict = {snr: [] for snr in snr_levels}
    
    for record in records:
        raw_record = wfdb.rdrecord(f'M:\\Dissertation\\New folder\\mit-bih-arrhythmia-database-1.0.0/{record}')
        raw_signal = raw_record.p_signal[:, 0]  # Use the first channel for simplicity
        
        # Load noise and add it to the raw signal
        noise_record = wfdb.rdrecord(f'M:\\Dissertation\\New folder\\mit-bih-noise-stress-test-database-1.0.0/{noise_type}')
        noise_signal = noise_record.p_signal[:, 0]
        
        # Ensure the signals are of the same length
        min_length = min(len(raw_signal), len(noise_signal), target_length)
        raw_signal = raw_signal[:min_length]
        noise_signal = noise_signal[:min_length]
        
        # Pad signals to target length
        if min_length < target_length:
            raw_signal = np.pad(raw_signal, (0, target_length - min_length), 'constant')
            noise_signal = np.pad(noise_signal, (0, target_length - min_length), 'constant')
        
        raw_signals.append(raw_signal)
        
        for snr in snr_levels:
            noisy_signal = add_noise(raw_signal, noise_signal, snr)
            noisy_signals_dict[snr].append(noisy_signal)
    
    return np.array(raw_signals), {snr: np.array(noisy_signals_dict[snr]) for snr in snr_levels}

# Select records and noise types for the experiment
records = ['103', '105', '111', '116', '122', '205', '213', '219', '223', '230']
noise_types = ['em', 'bw', 'ma']
combined_noise_types = ['em+bw', 'ma+em', 'ma+em+bw']
snr_levels = [0, 1, 2, 3, 4, 5]
target_length = 649984

raw_signals, noisy_signals_dict = {}, {}
for noise_type in noise_types:
    raw_signals[noise_type], noisy_signals_dict[noise_type] = load_mit_bih_data(records, noise_type, snr_levels, target_length)

# For combined noise types, combine the corresponding noises
for combined_noise in combined_noise_types:
    components = combined_noise.split('+')
    combined_raw_signals, combined_noisy_signals = [], {snr: [] for snr in snr_levels}
    for i in range(len(records)):
        combined_signal = np.zeros(target_length)
        for component in components:
            raw_signal = raw_signals[component][i]
            combined_signal += raw_signal / len(components)  # Average the signals
        combined_raw_signals.append(combined_signal)
        for snr in snr_levels:
            combined_noise_signal = np.zeros(target_length)
            for component in components:
                noise_signal = noisy_signals_dict[component][snr][i]
                combined_noise_signal += noise_signal / len(components)  # Average the noises
            combined_noisy_signals[snr].append(combined_noise_signal)
    raw_signals[combined_noise] = np.array(combined_raw_signals)
    noisy_signals_dict[combined_noise] = {snr: np.array(combined_noisy_signals[snr]) for snr in snr_levels}

# Create datasets and dataloaders
datasets = {}
for noise_type in noise_types + combined_noise_types:
    for snr in snr_levels:
        datasets[(noise_type, snr)] = ECGDataset(raw_signals[noise_type], noisy_signals_dict[noise_type][snr])

dataloaders = {key: DataLoader(dataset, batch_size=16, shuffle=True) for key, dataset in datasets.items()}


In [5]:
def calculate_snr(original, denoised):
    noise = original - denoised
    snr = 10 * np.log10(np.sum(original**2) / np.sum(noise**2))
    return snr

def calculate_rmse(original, denoised):
    mse = np.mean((original - denoised) ** 2)
    rmse = np.sqrt(mse)
    return rmse


In [6]:
import pandas as pd

def train(generator, discriminator, dataloaders, num_epochs=5, lr=0.0002):
    criterion = nn.BCELoss()
    optimizer_G = torch.optim.Adam(generator.parameters(), lr=lr, betas=(0.5, 0.999))
    optimizer_D = torch.optim.Adam(discriminator.parameters(), lr=lr, betas=(0.5, 0.999))
    
    results = {'Noise_Type': [], 'SNR_Level': [], 'Epoch': [], 'Batch': [], 'D_loss': [], 'G_loss': [], 'SNR': [], 'RMSE': []}
    
    for (noise_type, snr), dataloader in dataloaders.items():
        for epoch in range(num_epochs):
            for i, (raw_signals, noisy_signals) in enumerate(dataloader):
                batch_size = raw_signals.size(0)
                
                # Ensure the signals have the same length
                min_length = min(raw_signals.shape[-1], noisy_signals.shape[-1])
                raw_signals = raw_signals[:, :min_length]
                noisy_signals = noisy_signals[:, :min_length]

                # Denoise the noisy signals
                noisy_signals = noisy_signals.unsqueeze(1)  # Add channel dimension
                raw_signals = raw_signals.unsqueeze(1)

                # Train Generator
                optimizer_G.zero_grad()
                gen_signals = generator(noisy_signals)
                
                # Update valid and fake labels to match the discriminator output size
                disc_output_size = discriminator(torch.cat((gen_signals, noisy_signals), 1)).size()
                valid = torch.ones(disc_output_size).to(gen_signals.device)
                fake = torch.zeros(disc_output_size).to(gen_signals.device)
                
                g_loss = criterion(discriminator(torch.cat((gen_signals, noisy_signals), 1)), valid)
                g_loss.backward()
                optimizer_G.step()

                # Train Discriminator
                optimizer_D.zero_grad()
                real_loss = criterion(discriminator(torch.cat((raw_signals, noisy_signals), 1)), valid)
                fake_loss = criterion(discriminator(torch.cat((gen_signals.detach(), noisy_signals), 1)), fake)
                d_loss = (real_loss + fake_loss) / 2
                d_loss.backward()
                optimizer_D.step()
                
                # Calculate SNR and RMSE
                snr_value = calculate_snr(raw_signals.squeeze().cpu().numpy(), gen_signals.squeeze().cpu().detach().numpy())
                rmse_value = calculate_rmse(raw_signals.squeeze().cpu().numpy(), gen_signals.squeeze().cpu().detach().numpy())

                # Store results
                results['Noise_Type'].append(noise_type)
                results['SNR_Level'].append(snr)
                results['Epoch'].append(epoch + 1)
                results['Batch'].append(i + 1)
                results['D_loss'].append(d_loss.item())
                results['G_loss'].append(g_loss.item())
                results['SNR'].append(snr_value)
                results['RMSE'].append(rmse_value)
                
                print(f"[{noise_type} SNR {snr}] [Epoch {epoch + 1}/{num_epochs}] [Batch {i + 1}/{len(dataloader)}] [D loss: {d_loss.item()}] [G loss: {g_loss.item()}] [SNR: {snr_value}] [RMSE: {rmse_value}]")

    return results

# Initialize models
generator = Generator()
discriminator = Discriminator()

# Train the models and collect results
results = train(generator, discriminator, dataloaders)

# Create a DataFrame to display results
df_results = pd.DataFrame(results)
print(df_results)

# Display the DataFrame as a table
import ace_tools as tools; tools.display_dataframe_to_user(name="Denoising Results of Proposed Method", dataframe=df_results)

NameError: name 'Generator' is not defined