In [2]:
import numpy as np

# Define file paths (update if needed)
linear_synthetic_eeg_eog_path = "/home/tulgaa/Desktop/denoisenet/Linear_Mixing/EEG+EOG/Linear_synthetic_eeg_eog.npy"
eeg_all_epochs_path = "/home/tulgaa/Desktop/denoisenet/Linear_Mixing/EEG+EOG/EEG_all_epochs.npy"

# Load datasets
linear_synthetic_data = np.load(linear_synthetic_eeg_eog_path, allow_pickle=True).item()
eeg_clean_data = np.load(eeg_all_epochs_path)

# Print dataset keys (SNR levels)
print("Noisy EEG+EOG SNR Levels:", linear_synthetic_data.keys())

# Check dataset shapes
print("Shape of Clean EEG Data:", eeg_clean_data.shape)
for snr in linear_synthetic_data.keys():
    print(f"Shape of Noisy EEG+EOG Data at SNR {snr}: {linear_synthetic_data[snr].shape}")


Noisy EEG+EOG SNR Levels: dict_keys([-7, -6, -5, -4, -3, -2, -1, 0, 1, 2])
Shape of Clean EEG Data: (3400, 512)
Shape of Noisy EEG+EOG Data at SNR -7: (3400, 512)
Shape of Noisy EEG+EOG Data at SNR -6: (3400, 512)
Shape of Noisy EEG+EOG Data at SNR -5: (3400, 512)
Shape of Noisy EEG+EOG Data at SNR -4: (3400, 512)
Shape of Noisy EEG+EOG Data at SNR -3: (3400, 512)
Shape of Noisy EEG+EOG Data at SNR -2: (3400, 512)
Shape of Noisy EEG+EOG Data at SNR -1: (3400, 512)
Shape of Noisy EEG+EOG Data at SNR 0: (3400, 512)
Shape of Noisy EEG+EOG Data at SNR 1: (3400, 512)
Shape of Noisy EEG+EOG Data at SNR 2: (3400, 512)


In [3]:
from sklearn.model_selection import train_test_split
import numpy as np

# Initialize lists to store training and testing data
X_train, X_test, Y_train, Y_test = [], [], [], []

# Loop through each SNR level and perform train-test split
for snr in linear_synthetic_data.keys():
    noisy_signals = linear_synthetic_data[snr]  # Noisy EEG+EOG
    clean_signals = eeg_clean_data  # Ground truth (same for all SNRs)

    # Train-test split (80% train, 20% test)
    X_train_snr, X_test_snr, Y_train_snr, Y_test_snr = train_test_split(
        noisy_signals, clean_signals, test_size=0.2, random_state=42
    )

    # Append to lists
    X_train.append(X_train_snr)
    X_test.append(X_test_snr)
    Y_train.append(Y_train_snr)
    Y_test.append(Y_test_snr)

# Convert lists to numpy arrays
X_train = np.concatenate(X_train, axis=0)
X_test = np.concatenate(X_test, axis=0)
Y_train = np.concatenate(Y_train, axis=0)
Y_test = np.concatenate(Y_test, axis=0)

# Print final dataset sizes
print("Final Dataset Sizes:")
print("X_train (Noisy):", X_train.shape)
print("Y_train (Clean):", Y_train.shape)
print("X_test  (Noisy):", X_test.shape)
print("Y_test  (Clean):", Y_test.shape)


Final Dataset Sizes:
X_train (Noisy): (27200, 512)
Y_train (Clean): (27200, 512)
X_test  (Noisy): (6800, 512)
Y_test  (Clean): (6800, 512)


In [4]:
import torch
import torch.nn as nn

# Corrected 1D-ResCNN Model
class ResCNN1D(nn.Module):
    def __init__(self):
        super(ResCNN1D, self).__init__()

        # Parallel Convolutional Layers (Different Kernel Sizes)
        self.conv3 = nn.Conv1d(in_channels=1, out_channels=64, kernel_size=3, stride=1, padding=1)
        self.conv5 = nn.Conv1d(in_channels=1, out_channels=64, kernel_size=5, stride=1, padding=2)
        self.conv9 = nn.Conv1d(in_channels=1, out_channels=64, kernel_size=9, stride=1, padding=4)

        # Batch Normalization
        self.bn3 = nn.BatchNorm1d(64)
        self.bn5 = nn.BatchNorm1d(64)
        self.bn9 = nn.BatchNorm1d(64)

        # Activation Function
        self.relu = nn.ReLU()

        # Residual Connection (Merging Parallel Conv Outputs)
        self.res_conv = nn.Conv1d(in_channels=192, out_channels=64, kernel_size=1, stride=1)

        # Second Convolutional Block (Refining Features)
        self.conv_out = nn.Conv1d(in_channels=64, out_channels=1, kernel_size=9, stride=1, padding=4)

    def forward(self, x):
        x = x.unsqueeze(1)  # Add channel dimension (Batch, 1, 512)

        # Apply Parallel Convolutions
        x3 = self.relu(self.bn3(self.conv3(x)))
        x5 = self.relu(self.bn5(self.conv5(x)))
        x9 = self.relu(self.bn9(self.conv9(x)))

        # Concatenate Parallel Outputs
        x = torch.cat([x3, x5, x9], dim=1)  # Shape: (Batch, 192, 512)

        # Residual Connection (Merging Features)
        x = self.res_conv(x)  # Shape: (Batch, 64, 512)

        # Output Layer
        x = self.conv_out(x)  # Shape: (Batch, 1, 512)
        x = x.squeeze(1)  # Remove channel dimension (Back to (Batch, 512))
        
        return x

# Initialize Model and Print Summary
model = ResCNN1D()
print(model)


ResCNN1D(
  (conv3): Conv1d(1, 64, kernel_size=(3,), stride=(1,), padding=(1,))
  (conv5): Conv1d(1, 64, kernel_size=(5,), stride=(1,), padding=(2,))
  (conv9): Conv1d(1, 64, kernel_size=(9,), stride=(1,), padding=(4,))
  (bn3): BatchNorm1d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (bn5): BatchNorm1d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (bn9): BatchNorm1d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (relu): ReLU()
  (res_conv): Conv1d(192, 64, kernel_size=(1,), stride=(1,))
  (conv_out): Conv1d(64, 1, kernel_size=(9,), stride=(1,), padding=(4,))
)


In [5]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
import numpy as np

# Define EEG dataset class
class EEGDataset(Dataset):
    def __init__(self, X, Y):
        self.X = torch.tensor(X, dtype=torch.float32)
        self.Y = torch.tensor(Y, dtype=torch.float32)

    def __len__(self):
        return len(self.X)

    def __getitem__(self, idx):
        return self.X[idx], self.Y[idx]

# Convert data to PyTorch tensors
train_dataset = EEGDataset(X_train, Y_train)
test_dataset = EEGDataset(X_test, Y_test)

# Create DataLoaders
batch_size = 64
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

# Define model, loss function, and optimizer
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = ResCNN1D().to(device)
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Early stopping parameters
patience = 5  # Stop training if validation loss does not improve for 'patience' epochs
min_val_loss = float('inf')
patience_counter = 0

# Training loop
num_epochs = 50
for epoch in range(num_epochs):
    model.train()
    train_loss = 0

    for X_batch, Y_batch in train_loader:
        X_batch, Y_batch = X_batch.to(device), Y_batch.to(device)

        optimizer.zero_grad()
        outputs = model(X_batch)
        loss = criterion(outputs, Y_batch)
        loss.backward()
        optimizer.step()

        train_loss += loss.item()

    # Compute average training loss
    train_loss /= len(train_loader)

    # Validation Phase
    model.eval()
    val_loss = 0
    with torch.no_grad():
        for X_val, Y_val in test_loader:
            X_val, Y_val = X_val.to(device), Y_val.to(device)
            val_outputs = model(X_val)
            val_loss += criterion(val_outputs, Y_val).item()

    # Compute average validation loss
    val_loss /= len(test_loader)

    # Print epoch results
    print(f"Epoch {epoch+1}/{num_epochs} - Train Loss: {train_loss:.6f} - Val Loss: {val_loss:.6f}")

    # Early stopping check
    if val_loss < min_val_loss:
        min_val_loss = val_loss
        patience_counter = 0  # Reset counter
        torch.save(model.state_dict(), "1D_ResCNN_Best.pth")  # Save best model
    else:
        patience_counter += 1

    if patience_counter >= patience:
        print("Early stopping triggered! 🚀")
        break

print("Training complete! Best model saved as '1D_ResCNN_Best.pth'.")


NameError: name 'X_train' is not defined

In [4]:
import torch
import numpy as np
from scipy.fft import fft
import pandas as pd

# Load the best-trained model (with early stopping)
model.load_state_dict(torch.load("1D_ResCNN_Best.pth"))
model.eval()

# Move model to GPU if available
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

# Storage for results per SNR
snr_results = {}

# Loop over each SNR level and compute evaluation metrics
for snr in sorted(linear_synthetic_data.keys()):  # Ensure SNR levels are in order
    print(f"Evaluating SNR Level: {snr}")

    # Get noisy signals at this SNR
    X_test_snr = linear_synthetic_data[snr]
    Y_test_snr = eeg_clean_data  # Clean EEG remains the same

    # Convert to PyTorch tensors
    X_test_tensor = torch.tensor(X_test_snr, dtype=torch.float32).to(device)
    Y_test_tensor = torch.tensor(Y_test_snr, dtype=torch.float32).to(device)

    # Get model predictions
    with torch.no_grad():
        Y_pred_tensor = model(X_test_tensor)

    # Convert tensors to numpy for evaluation
    Y_clean = Y_test_tensor.cpu().numpy()
    Y_denoised = Y_pred_tensor.cpu().numpy()

    # Compute RRMSE_Temporal (Time Domain)
    rrmse_t = np.linalg.norm(Y_clean - Y_denoised) / np.linalg.norm(Y_clean)

    # Compute RRMSE_Spectral (Frequency Domain)
    clean_fft = np.abs(fft(Y_clean, axis=-1))  # Compute FFT of clean EEG
    denoised_fft = np.abs(fft(Y_denoised, axis=-1))  # Compute FFT of denoised EEG
    rrmse_s = np.linalg.norm(clean_fft - denoised_fft) / np.linalg.norm(clean_fft)

    # Compute Correlation Coefficient (CC)
    num = np.sum((Y_clean - np.mean(Y_clean)) * (Y_denoised - np.mean(Y_denoised)))
    den = np.sqrt(np.sum((Y_clean - np.mean(Y_clean))**2) * np.sum((Y_denoised - np.mean(Y_denoised))**2))
    cc = num / den

    # Compute T&S Metric
    T_S = 10 * np.log10(np.sum(Y_clean**2) / np.sum((Y_clean - Y_denoised)**2))

    # Store results
    snr_results[snr] = {
        "RRMSE_T (Temporal)": rrmse_t,
        "RRMSE_S (Spectral)": rrmse_s,
        "CC (Correlation Coefficient)": cc,
        "T&S Metric (dB)": T_S
    }

    # Print results for this SNR
    print(f"  RRMSE_T: {rrmse_t:.4f}")
    print(f"  RRMSE_S: {rrmse_s:.4f}")
    print(f"  Correlation Coefficient (CC): {cc:.4f}")
    print(f"  T&S Metric: {T_S:.4f} dB\n")

# Convert results to Pandas DataFrame for easy viewing
df_results = pd.DataFrame.from_dict(snr_results, orient="index")
df_results.index.name = "SNR Level"
df_results = df_results.sort_index()  # Sort by SNR level

# Compute Average RRMSE_T and RRMSE_S across all SNR levels
avg_rrmse_t = df_results["RRMSE_T (Temporal)"].mean()
avg_rrmse_s = df_results["RRMSE_S (Spectral)"].mean()

# Print Final Summary
print("\n==== Final Evaluation Summary ====")
print(f"Average RRMSE_T (Temporal): {avg_rrmse_t:.4f}")
print(f"Average RRMSE_S (Spectral): {avg_rrmse_s:.4f}")

# Display final results as a table
print("\n==== Evaluation Results per SNR ====")
print(df_results)

NameError: name 'model' is not defined