# Get Necessary Packages

In [None]:
import os
import numpy as np
import shutil
from scipy.io import loadmat
from scipy.signal import cheby1, filtfilt, resample
from tensorflow.keras.utils import to_categorical
from sklearn.model_selection import train_test_split

# Define Main Preprocessing Functions

In [None]:
def chebyshev_bandpass(lowcut, highcut, fs, order=4, rp=0.5):
    """
    Build the coefficients of a Chebyshev Type I bandpass filter.

    Both band edges are divided by the Nyquist frequency (fs / 2) and the
    resulting normalized band is handed to scipy.signal.cheby1.

    Parameters:
    - lowcut: Lower edge of the passband, in the same units as fs.
    - highcut: Upper edge of the passband, in the same units as fs.
    - fs: Sampling frequency of the signal that will be filtered.
    - order: Filter order forwarded to cheby1 (default: 4).
    - rp: Maximum passband ripple in dB (default: 0.5).

    Returns:
    - b, a: Numerator (b) and denominator (a) polynomials of the filter.
    """
    nyquist = 0.5 * fs
    normalized_band = [lowcut / nyquist, highcut / nyquist]
    return cheby1(order, rp, normalized_band, btype='band')

In [None]:
def preprocess_single_subject(file_name, data_dir,output_dir,training_dir,test_dir,val_dir,
                              n_channels=64, n_timepoints=1500, n_classes=40,
                              lowcut=6.0, highcut=90.0, fs=1000,target_fs=250, window_length=250,
                              random_state=None):
    """
    Preprocess EEG data for a single subject and distribute the result into train/val/test folders.

    Each trial is bandpass filtered, resampled from fs to target_fs, cut into
    non-overlapping windows of window_length samples, z-scored per channel and
    saved as one .npy file per window. The saved files are then randomly split
    80/10/10 and moved into training_dir, val_dir and test_dir.

    Parameters:
    - file_name: Name of the .mat file (e.g., "1.mat"). The text before the first "." is used as subject ID.
    - data_dir: Directory containing the .mat file.
    - output_dir: Directory under which a per-subject staging folder is created for the .npy files.
    - training_dir: Destination directory for the training portion (80%). Created if missing.
    - test_dir: Destination directory for the test portion (10%). Created if missing.
    - val_dir: Destination directory for the validation portion (10%). Created if missing.
    - n_channels, n_timepoints, n_classes: Kept for backward compatibility only. All dimensions
      are read from the loaded array, which is indexed as [channel, time, target, block].
    - lowcut: Low frequency cut-off for Chebyshev bandpass filter (default: 6 Hz).
    - highcut: High frequency cut-off for Chebyshev bandpass filter (default: 90 Hz).
    - fs: Sampling frequency assumed for the stored data (default: 1000 Hz).
      NOTE(review): the filter design and the resampling factor both depend on this being the
      true sampling rate of the .mat data - confirm it against the dataset documentation.
    - target_fs: Target sampling frequency after resampling (default: 250 Hz).
    - window_length: Length of each segment window in samples, counted after resampling (default: 250).
    - random_state: Optional seed forwarded to both train_test_split calls so the partition can be
      reproduced (default: None, i.e. NumPy's global random state is used).

    Saves:
    - One .npy file per segment. Each file holds a pickled dict with the keys 'data', 'label',
      'block', 'segment_id' and 'subject_id', so it must be read back with
      np.load(path, allow_pickle=True).item().
    """
    subject_id = file_name.split('.')[0]

    # Create the staging folder and all split folders up front; shutil.move
    # fails if its destination directory does not exist.
    subject_output_dir = os.path.join(output_dir, subject_id)
    for directory in (subject_output_dir, training_dir, val_dir, test_dir):
        os.makedirs(directory, exist_ok=True)

    # Load .mat file (loadmat appends the ".mat" extension when it is missing)
    mat_data = loadmat(os.path.join(data_dir, subject_id))

    # The array under 'data' is indexed below as [channel, time, target, block]
    eeg_data = mat_data['data']

    # Keep a floating dtype as-is, but promote anything else (e.g. integers) to
    # float64 so the filtered values are not truncated.
    if np.issubdtype(eeg_data.dtype, np.floating):
        work_dtype = eeg_data.dtype
    else:
        work_dtype = np.float64

    b, a = chebyshev_bandpass(lowcut, highcut, fs, order=4)  # Define Chebyshev filter

    # Every trial has the same length, so the resampled length is loop-invariant
    n_resampled = int(eeg_data.shape[1] * target_fs / fs)

    saved_paths = []  # Files written by this call, in creation order
    file_counter = 1  # Counter for saved files

    # Process each trial
    for target_idx in range(eeg_data.shape[2]):  # Iterate over target classes
        for block_idx in range(eeg_data.shape[3]):  # Iterate over blocks per target
            trial_data = np.asarray(eeg_data[:, :, target_idx, block_idx], dtype=work_dtype)

            # Zero-phase bandpass filter applied along the time axis of every
            # channel present in the data (not just the first n_channels)
            trial_data_filtered = filtfilt(b, a, trial_data, axis=1).astype(work_dtype, copy=False)

            # Downsample to target sampling rate
            trial_data_downsampled = resample(trial_data_filtered, n_resampled, axis=1)

            # Segment the trial data into non-overlapping windows; a trailing
            # remainder shorter than window_length is dropped
            for start in range(0, trial_data_downsampled.shape[1] - window_length + 1, window_length):
                segment = trial_data_downsampled[:, start:start + window_length]

                # Normalize each channel; a flat channel has zero std, so divide
                # by 1 there (yielding zeros) instead of producing NaN/inf
                channel_mean = segment.mean(axis=1, keepdims=True)
                channel_std = segment.std(axis=1, keepdims=True)
                channel_std[channel_std == 0] = 1.0
                segment_normalized = (segment - channel_mean) / channel_std

                # Save segment as .npy file
                save_path = os.path.join(subject_output_dir, f"{subject_id}_{file_counter}.npy")
                np.save(save_path, {
                    'data': segment_normalized,  # Shape: [channels, window_length]
                    'label': target_idx + 1,  # Target label (1-based)
                    'block': block_idx + 1,  # Block index (1-based)
                    'segment_id': file_counter,  # Segment ID
                    'subject_id': subject_id  # Subject ID from filename
                })
                saved_paths.append(save_path)
                file_counter += 1

    print(f"Processed and saved {file_counter - 1} files for subject {subject_id} in {subject_output_dir}")

    # Split only the files written by this call, so leftovers from an earlier
    # run in the staging folder cannot leak into the partitions.
    all_files = saved_paths
    train_files, temp_files = train_test_split(all_files, test_size=0.2, random_state=random_state)
    val_files, test_files = train_test_split(temp_files, test_size=0.5, random_state=random_state)

    # Move files to corresponding folders
    for files, destination in ((train_files, training_dir),
                               (val_files, val_dir),
                               (test_files, test_dir)):
        for f in files:
            shutil.move(f, os.path.join(destination, os.path.basename(f)))

    print(f"Split {len(all_files)} files into training ({len(train_files)}), val ({len(val_files)}), and test ({len(test_files)}) folders.")


# Run Preprocessing

In [None]:
# Colab only: attach Google Drive so the dataset folders below are reachable.
# force_remount=True is passed so the mount is redone on every run of this cell.
import google.colab.drive

DRIVE_MOUNT_POINT = '/content/drive/'
google.colab.drive.mount(DRIVE_MOUNT_POINT, force_remount=True)

Mounted at /content/drive/


In [None]:
# Seed NumPy's global RNG: scikit-learn's train_test_split falls back to it
# when no random_state is given, so the shuffling inside
# preprocess_single_subject becomes repeatable for a given file order.
SPLIT_SEED = 42
np.random.seed(SPLIT_SEED)

# Choose data_dir for raw Benchmark data
# NOTE(review): the raw data is read from "data_1s" while every output goes to
# "data_4s" - confirm this mix is intentional.
data_dir = '/content/drive/MyDrive/11785-IDL-Project-Team19/data_1s/raw'

# Choose output_dir for preprocessed data
output_dir = '/content/drive/MyDrive/11785-IDL-Project-Team19/data_4s/prep/'

# Choose directories for the partitioned train-val-test datasets
train_dir='/content/drive/MyDrive/11785-IDL-Project-Team19/data_4s/train'
test_dir='/content/drive/MyDrive/11785-IDL-Project-Team19/data_4s/test'
val_dir='/content/drive/MyDrive/11785-IDL-Project-Team19/data_4s/val'

# The split folders must exist before files can be moved into them
for split_dir in (train_dir, val_dir, test_dir):
    os.makedirs(split_dir, exist_ok=True)

# os.listdir returns entries in arbitrary order; sort for a stable subject order
mat_files = sorted(f for f in os.listdir(data_dir) if f.endswith('.mat'))
for file_name in mat_files:
    # Directories are passed by keyword: the function's positional order is
    # (training, test, val), which is easy to mix up.
    preprocess_single_subject(file_name, data_dir, output_dir,
                              training_dir=train_dir, test_dir=test_dir, val_dir=val_dir)

Processed and saved 240 files for subject S2 in /content/drive/MyDrive/11785-IDL-Project-Team19/data_4s/prep/S2
Split 240 files into training (192), val (24), and test (24) folders.
Processed and saved 240 files for subject S3 in /content/drive/MyDrive/11785-IDL-Project-Team19/data_4s/prep/S3
Split 240 files into training (192), val (24), and test (24) folders.
Processed and saved 240 files for subject S4 in /content/drive/MyDrive/11785-IDL-Project-Team19/data_4s/prep/S4
Split 240 files into training (192), val (24), and test (24) folders.
Processed and saved 240 files for subject S6 in /content/drive/MyDrive/11785-IDL-Project-Team19/data_4s/prep/S6
Split 240 files into training (192), val (24), and test (24) folders.
Processed and saved 240 files for subject S7 in /content/drive/MyDrive/11785-IDL-Project-Team19/data_4s/prep/S7
Split 240 files into training (192), val (24), and test (24) folders.
Processed and saved 240 files for subject S8 in /content/drive/MyDrive/11785-IDL-Project-T

In [None]:
# Sanity check: count the regular files that ended up in the training folder
directory = '/content/drive/MyDrive/11785-IDL-Project-Team19/data_4s/train'
with os.scandir(directory) as entries:
    file_count = sum(1 for entry in entries if entry.is_file())
print(f"Total number of files in the directory: {file_count}")

Total number of files in the directory: 6336
