<a href="https://colab.research.google.com/github/savindu29/NeuralNet/blob/main/2dcnnlstm.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [49]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [50]:
%%capture
!pip install mne
!cp /content/drive/MyDrive/nuralnet/BCICIV_2a_gdf.zip /content
!unzip /content//BCICIV_2a_gdf.zip -d data

In [3]:
import os
import numpy as np
import mne
from mne.preprocessing import ICA
from sklearn.preprocessing import LabelEncoder
from tensorflow.keras.utils import to_categorical
from sklearn.preprocessing import StandardScaler


In [4]:

class EEGPreprocessor:
    def __init__(self, l_freq=8.0, h_freq=30.0, notch_freq=50, tmin=0, tmax=2, overlap=0.5):
        """
        Initialize the EEG Preprocessor with adjustable parameters.
        :param l_freq: Lower bound of the bandpass filter (Hz).
        :param h_freq: Upper bound of the bandpass filter (Hz).
        :param notch_freq: Frequency to apply the notch filter (e.g., 50 or 60 Hz).
        :param tmin: Start time for epoching (seconds).
        :param tmax: End time for epoching (seconds).
        :param overlap: Overlap ratio for epoching (0.0 to 1.0).
        """
        self.l_freq = l_freq
        self.h_freq = h_freq
        self.notch_freq = notch_freq
        self.tmin = tmin
        self.tmax = tmax
        self.overlap = overlap

    def read_data(self, path):
        """
        Load and preprocess EEG data from a GDF file, including filtering, applying ICA,
        epoching, and extracting spectral features.
        """
        # Load raw EEG data from GDF file
        raw = mne.io.read_raw_gdf(path, preload=True)

        # Apply bandpass filter to remove frequencies outside the EEG range
        raw = raw.filter(l_freq=self.l_freq, h_freq=self.h_freq)

        # Apply notch filter to remove power line noise (50Hz or 60Hz depending on region)
        raw.notch_filter(freqs=self.notch_freq)

        # Apply ICA for artifact rejection (eye blinks, muscle artifacts, etc.)
        ica = ICA(n_components=20, random_state=97, max_iter=800)
        ica.fit(raw)

        # Dynamically detect and handle EOG channels if present
        try:
            eog_indices, scores = ica.find_bads_eog(raw)
            print(f"EOG component indices identified: {eog_indices}")
            ica.exclude = eog_indices
        except RuntimeError as e:
            print(f"Warning: No EOG channels found. Skipping EOG artifact removal. Error: {e}")

        # Apply ICA to remove the artifacts (based on the ICA components)
        raw_cleaned = ica.apply(raw)

        # Set the EEG reference (common average reference)
        raw_cleaned.set_eeg_reference()

        # Extract events and annotations
        events, _ = mne.events_from_annotations(raw_cleaned)

        # Print out available event IDs for debugging
        print(f"Available events for {path}: {set(events[:, -1])}")

        # Dynamically set valid event IDs based on the available events in the data
        available_event_ids = set(events[:, -1])
        valid_event_ids = list(available_event_ids)

        if not valid_event_ids:
            raise ValueError(f"No valid event IDs found in the data for {path}. Available event IDs: {available_event_ids}")

        # Create epochs with overlap based on valid event IDs
        epochs = mne.make_fixed_length_epochs(
            raw_cleaned, duration=(self.tmax - self.tmin), overlap=self.overlap, preload=True
        )

        # Get labels and features
        labels = epochs.events[:, -1]  # Last column contains labels
        features = epochs.get_data()   # EEG data from the epochs

        return features, labels

    def process_multiple_files(self, directory_path):
        """
        Process only motor imagery (task) .gdf files in the given directory.
        Filters out resting (eyes-closed) files and processes only task files.
        """
        features = []
        labels = []
        groups = []

        for filename in os.listdir(directory_path):
            if filename.endswith('.gdf') and 'T' in filename:  # Only motor imagery files
                file_path = os.path.join(directory_path, filename)
                print(f"Processing file: {file_path}")
                feature, label = self.read_data(file_path)
                features.append(feature)
                labels.append(label)
                subject_group = filename[:3]  # Extract subject group (e.g., 'A01')
                groups.append([subject_group] * len(label))

        features = np.concatenate(features, axis=0)
        labels = np.concatenate(labels, axis=0)
        groups = np.concatenate(groups, axis=0)

        # Normalize the features (standardization)
        features = self.standardize_data(features)

        # Encode labels to integers
        label_encoder = LabelEncoder()
        labels = label_encoder.fit_transform(labels)

        # Convert labels to one-hot encoding for multi-class classification
        labels = to_categorical(labels)

        # Reshape features for CNN-LSTM hybrid input
        features = features.reshape(features.shape[0], features.shape[1], features.shape[2], 1)

        return features, labels, groups

    def standardize_data(self, data):
        """
        Standardize the EEG data: mean=0, std=1 across each feature (channel).
        This helps normalize the signal amplitude and speed up training convergence.
        """
        num_samples, num_channels, num_time_points = data.shape
        standardized_data = np.zeros_like(data)

        # Apply standardization across each channel
        for i in range(num_channels):
            for j in range(num_samples):
                standardized_data[j, i, :] = (data[j, i, :] - np.mean(data[j, i, :])) / np.std(data[j, i, :])

        return standardized_data

    def extract_psd_features(self, raw_data):
        """
        Extract Power Spectral Density (PSD) features from the raw EEG data.
        This provides a frequency-domain representation of the EEG signal.
        """
        psd, freqs = mne.time_frequency.psd_welch(raw_data, fmin=self.l_freq, fmax=self.h_freq)
        return psd

    def extract_connectivity_features(self, raw_data):
        """
        Extract functional connectivity features such as coherence or correlation between EEG channels.
        This measures how synchronously different channels are working together.
        """
        # For simplicity, let's extract coherence between pairs of channels (you can extend to other measures)
        connectivity_matrix = mne.connectivity.envelope_correlation(raw_data.get_data())
        return connectivity_matrix


In [5]:
%%capture

data_directory = '/content/data'  # Path to the directory containing .gdf files
preprocessor = EEGPreprocessor()

# Process all .gdf files in a directory
features, labels, groups = preprocessor.process_multiple_files(data_directory)



In [6]:
# Define the path to save processed data
save_path = '/content/eeg_preprocessed_data'  # Local path in Colab

# Ensure the directory exists
os.makedirs(save_path, exist_ok=True)

# Save the processed data to disk
np.savez_compressed(f"{save_path}/eeg_data.npz", features=features, labels=labels, groups=groups)
print("Preprocessed data saved to disk.")

Preprocessed data saved to disk.


In [8]:
# To load the saved data in future sessions, use the following:
loaded_data = np.load(f"{save_path}/eeg_data.npz")
features = loaded_data['features']
labels = loaded_data['labels']
groups = loaded_data['groups']

# Print shapes to verify data loading
print(f"Features shape: {features.shape}")
print(f"Labels shape: {labels.shape}")
print(f"Groups shape: {groups.shape}")

Features shape: (16010, 25, 500, 1)
Labels shape: (16010, 1)
Groups shape: (16010,)


In [9]:
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv2D, MaxPooling2D, LSTM, Dense, Dropout, Flatten, Reshape, BatchNormalization
from tensorflow.keras.optimizers import Adam

def create_cnn_lstm_model(input_shape, num_classes):
    model = Sequential()

    # 2D Convolutional Layer
    model.add(Conv2D(32, (3, 3), activation='relu', padding='same', input_shape=input_shape))
    model.add(BatchNormalization())
    model.add(MaxPooling2D(pool_size=(2, 2)))
    model.add(Dropout(0.3))  # Reduce overfitting

    # Additional Convolutional Layer
    model.add(Conv2D(64, (3, 3), activation='relu', padding='same'))
    model.add(BatchNormalization())
    model.add(MaxPooling2D(pool_size=(2, 2)))
    model.add(Dropout(0.3))

    # Additional Convolutional Layer for deeper feature extraction
    model.add(Conv2D(128, (3, 3), activation='relu', padding='same'))
    model.add(BatchNormalization())
    model.add(MaxPooling2D(pool_size=(2, 2)))
    model.add(Dropout(0.4))

    # Flatten the output and prepare for LSTM
    model.add(Flatten())
    model.add(Reshape((-1, 128)))  # Reshape for LSTM (batch_size, time_steps, features)

    # LSTM Layer
    model.add(LSTM(128, return_sequences=False, activation='tanh'))
    model.add(Dropout(0.5))  # Dropout to prevent overfitting

    # Fully connected output layer for multi-class classification
    model.add(Dense(num_classes, activation='softmax'))

    # Compile the model with categorical cross-entropy loss for multi-class classification
    model.compile(optimizer=Adam(learning_rate=0.001), loss='categorical_crossentropy', metrics=['accuracy'])

    return model


In [None]:
from sklearn.model_selection import train_test_split

# Assuming features and labels are already loaded as NumPy arrays

# Set input shape for the model
input_shape = features.shape[1:]  # (time_steps, channels, 1)
num_classes = labels.shape[1]     # Number of classes

# Split data into training and testing sets
X_train, X_test, y_train, y_test = train_test_split(features, labels, test_size=0.2, random_state=42)

# Create the model
model = create_cnn_lstm_model(input_shape, num_classes)

# Train the model
history = model.fit(X_train, y_train, epochs=30, batch_size=64, validation_data=(X_test, y_test))


  super().__init__(activity_regularizer=activity_regularizer, **kwargs)


Epoch 1/30


  return self.fn(y_true, y_pred, **self._fn_kwargs)


[1m 63/201[0m [32m━━━━━━[0m[37m━━━━━━━━━━━━━━[0m [1m7:09[0m 3s/step - accuracy: 1.0000 - loss: 0.0000e+00