In [None]:
from google.colab import drive
drive.mount('/content/drive')
!pip install mne
import os
import numpy as np
import matplotlib.pyplot as plt
from scipy.signal import butter, filtfilt, iirnotch, decimate
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Bidirectional, Dropout, Dense, BatchNormalization
from sklearn.model_selection import KFold
from tensorflow.keras.regularizers import l2

# Constants
NUM_CHANNELS = 16  # Multi-channel EEG
ORIGINAL_SAMPLING_RATE = 1000  # Hz
DOWNSAMPLED_RATE = 250  # Hz
SEGMENT_DURATION_SEC = 30
N_SAMPLES_PER_SEGMENT = SEGMENT_DURATION_SEC * DOWNSAMPLED_RATE
TRANSIENT_REMOVAL_SAMPLES = 1000
TOTAL_RECORDING_TIME_SEC = 600  # 5 minutes

# Directories
DATA_DIR = "/content/drive/MyDrive/UBCData"
LABELS = {"Female": 1, "Males": 0}

def read_eeg_files(base_folder):
    all_files = []
    for label, value in LABELS.items():
        folder_path = os.path.join(base_folder, label)
        if os.path.exists(folder_path):
            for file in os.listdir(folder_path):
                if file.endswith(".txt"):
                    file_path = os.path.join(folder_path, file)
                    all_files.append((file_path, value))
    return all_files

def butter_bandpass_filter(data, lowcut=1, highcut=50, fs=DOWNSAMPLED_RATE, order=5):
    nyquist = 0.5 * fs
    low = lowcut / nyquist
    high = highcut / nyquist
    b, a = butter(order, [low, high], btype='band')
    return filtfilt(b, a, data, axis=1)  # Process all channels

def notch_filter(data, freq=60, fs=DOWNSAMPLED_RATE, quality=30):
    nyquist = 0.5 * fs
    notch_freq = freq / nyquist
    b, a = iirnotch(notch_freq, quality)
    return filtfilt(b, a, data, axis=1)  # Process all channels

def remove_transients(data, transient_samples=1000):
    return data[:, transient_samples:-transient_samples] if data.shape[1] > 2 * transient_samples else data

def downsample_signal(signal, original_rate=ORIGINAL_SAMPLING_RATE, target_rate=DOWNSAMPLED_RATE):
    factor = original_rate // target_rate  # 1000 -> 250 → factor = 4
    return decimate(signal, factor, axis=1)

def remove_outliers(signal):
    for ch in range(signal.shape[0]):  # Process each channel separately
        q1, q3 = np.percentile(signal[ch, :], [25, 75])
        iqr = q3 - q1
        lower_bound = q1 - 1.5 * iqr
        upper_bound = q3 + 1.5 * iqr
        for i in range(1, signal.shape[1] - 1):
            if signal[ch, i] < lower_bound or signal[ch, i] > upper_bound:
                signal[ch, i] = (signal[ch, i - 1] + signal[ch, i + 1]) / 2
    return signal

def rescale_amplitude(signal):
    return (signal - np.mean(signal, axis=1, keepdims=True)) / np.std(signal, axis=1, keepdims=True)

def process_signal(signal):
    signal = downsample_signal(signal)  # Downsample from 1000Hz to 250Hz
    signal = butter_bandpass_filter(signal)
    signal = notch_filter(signal)
    signal = remove_outliers(signal)
    return rescale_amplitude(signal)

def segment_signal(signal, segment_duration=30, fs=DOWNSAMPLED_RATE):
    segments = []
    total_samples = TOTAL_RECORDING_TIME_SEC * fs  # First 5 minutes
    segment_samples = segment_duration * fs  # 90-second segments

    for start in range(0, total_samples - segment_samples + 1, segment_samples):
        segment = signal[:, start:start + segment_samples]
        segments.append(segment.T)  # Transpose to shape (samples, channels)

    return segments

def load_eeg_data(file_path):
    try:
        data = np.genfromtxt(file_path, delimiter=",", dtype=np.float32)
        if data.size == 0 or len(data.shape) == 0:
            raise ValueError("Empty or malformed file")

        if data.shape[1] != NUM_CHANNELS:  # Ensure we have 16 channels
            raise ValueError(f"Skipping {file_path}: Expected {NUM_CHANNELS} channels, found {data.shape[1]}")

        return data.T  # Transpose to (channels, time)

    except Exception as e:
        print(f"Skipping {file_path}: {e}")
        return None

def prepare_data():
    X, y = [], []
    files = read_eeg_files(DATA_DIR)

    for file_path, label in files:
        signal = load_eeg_data(file_path)
        if signal is None:
            continue

        signal = remove_transients(signal)
        processed_signal = process_signal(signal)
        segments = segment_signal(processed_signal)

        for segment in segments:
            if segment.shape == (N_SAMPLES_PER_SEGMENT, NUM_CHANNELS):  # Ensure correct shape
                X.append(segment)
                y.append(label)
            else:
                print(f"Skipping segment with unexpected shape: {segment.shape}")

    if len(X) == 0:
        print("No valid EEG segments were found!")
        return np.array([]), np.array([])

    print(f"Final dataset shape: {len(X)} samples of shape {X[0].shape}")

    return np.array(X, dtype=np.float32), np.array(y, dtype=np.int32)

def cross_validate_model(X_augmented, y_augmented, k=5, epochs=40, batch_size=32):
    """Perform 5-fold cross-validation with LSTM model."""
    kfold = KFold(n_splits=k, shuffle=True, random_state=42)
    fold_accuracies = []

    for fold, (train_idx, test_idx) in enumerate(kfold.split(X_augmented)):
        print(f"Starting fold {fold + 1}/{k}...")

        X_train, X_test = X_augmented[train_idx], X_augmented[test_idx]
        y_train, y_test = y_augmented[train_idx], y_augmented[test_idx]

        model = Sequential([
            Bidirectional(LSTM(128, return_sequences=True), input_shape=(X_train.shape[1], NUM_CHANNELS)),
            BatchNormalization(),
            Dropout(0.3),
            Bidirectional(LSTM(128)),
            BatchNormalization(),
            Dropout(0.3),
            Dense(64, activation='relu', kernel_regularizer=l2(0.01)),
            Dense(32, activation='relu', kernel_regularizer=l2(0.01)),
            Dense(1, activation='sigmoid', kernel_regularizer=l2(0.01)),
        ])

        model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=0.0005),
                      loss='binary_crossentropy',
                      metrics=['accuracy'])

        model.fit(X_train, y_train, epochs=epochs, batch_size=batch_size, verbose=0)

        loss, accuracy = model.evaluate(X_test, y_test, verbose=0)
        print(f"Fold {fold + 1} accuracy: {accuracy * 100:.2f}%")
        fold_accuracies.append(accuracy)

    print(f"Average accuracy across {k} folds: {np.mean(fold_accuracies) * 100:.2f}%")
    return np.mean(fold_accuracies)

def main():
    X, y = prepare_data()
    if len(X) > 0:
        model = cross_validate_model(X, y, k=5, epochs=40, batch_size=16)
        print(f"Training complete. {len(X)} EEG segments used.")
    else:
        print("No valid EEG segments found.")

if __name__ == "__main__":
    main()


Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
Skipping segment with unexpected shape: (3924, 16)
Skipping segment with unexpected shape: (6711, 16)
Skipping segment with unexpected shape: (6699, 16)
Skipping segment with unexpected shape: (6621, 16)
Skipping segment with unexpected shape: (6600, 16)
Skipping segment with unexpected shape: (6718, 16)
Skipping segment with unexpected shape: (6726, 16)
Skipping segment with unexpected shape: (8722, 16)
Skipping segment with unexpected shape: (0, 16)
Skipping segment with unexpected shape: (0, 16)
Skipping segment with unexpected shape: (0, 16)
Skipping segment with unexpected shape: (0, 16)
Skipping segment with unexpected shape: (6440, 16)
Skipping segment with unexpected shape: (6553, 16)
Skipping segment with unexpected shape: (6667, 16)
Skipping segment with unexpected shape: (6902, 16)
Skipping segment with unexpected shape: (6712, 16)
Skipping segment

  super().__init__(**kwargs)


Fold 1 accuracy: 64.52%
Starting fold 2/5...
Fold 2 accuracy: 56.67%
Starting fold 3/5...


In [2]:
!pip install mne

Collecting mne
  Downloading mne-1.9.0-py3-none-any.whl.metadata (20 kB)
Downloading mne-1.9.0-py3-none-any.whl (7.4 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m7.4/7.4 MB[0m [31m59.7 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: mne
Successfully installed mne-1.9.0


In [1]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive
