<a href="https://colab.research.google.com/github/rohan30497/EEG_Signal_Processing/blob/main/EEG_Classification.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
  pip install mne

Collecting mne
  Downloading mne-1.5.1-py3-none-any.whl (7.7 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m7.7/7.7 MB[0m [31m28.3 MB/s[0m eta [36m0:00:00[0m
Installing collected packages: mne
Successfully installed mne-1.5.1


In [None]:
from glob import glob
import os
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import mne


In [None]:
all_file_path=glob('/content/drive/MyDrive/data_files/*.set')
print(len(all_file_path))

24


In [None]:
healthy_file_path = [i for i in all_file_path if 'C' in i]
len(healthy_file_path)

4

In [None]:
patient_file_path = [i for i in all_file_path if 'A' in i]
len(patient_file_path)

20

In [None]:
def read_data(file_path):
    data = mne.io.read_raw_eeglab(file_path, preload=True)
    data.set_eeg_reference()
    data.filter(l_freq=0.5, h_freq=45)
    epochs = mne.make_fixed_length_epochs(data, duration=5, overlap=1)
    array = epochs.get_data()
    return array

In [None]:
sample_data = read_data(healthy_file_path[0])


EEG channel type selected for re-referencing
Applying average reference.
Applying a custom ('EEG',) reference.
Filtering raw data in 1 contiguous segment
Setting up band-pass filter from 0.5 - 45 Hz

FIR filter parameters
---------------------
Designing a one-pass, zero-phase, non-causal bandpass filter:
- Windowed time-domain design (firwin) method
- Hamming window with 0.0194 passband ripple and 53 dB stopband attenuation
- Lower passband edge: 0.50
- Lower transition bandwidth: 0.50 Hz (-6 dB cutoff frequency: 0.25 Hz)
- Upper passband edge: 45.00 Hz
- Upper transition bandwidth: 11.25 Hz (-6 dB cutoff frequency: 50.62 Hz)
- Filter length: 3301 samples (6.602 s)

Not setting metadata
194 matching events found
No baseline correction applied
0 projection items activated
Using data from preloaded Raw for 194 events and 2500 original time points ...
0 bad epochs dropped


[Parallel(n_jobs=1)]: Done  17 tasks      | elapsed:    0.3s


In [None]:
print(sample_data.shape) #no.of epochs,no.of channels,length of signal

(194, 19, 2500)


In [None]:
%%capture
control_epochs_array=[read_data(i) for i in healthy_file_path]
patient_epochs_array=[read_data(i) for i in patient_file_path]

In [None]:
import mne

def read_data(file_path):
    try:
        if file_path.endswith('.set'):
            data = mne.io.read_raw_eeglab(file_path, preload=True)
        elif file_path.endswith('.mat'):
            # Read the MATLAB .mat file using mne.io.read_raw_eeglab
            data = mne.io.read_raw_eeglab(file_path, preload=True)

        data.set_eeg_reference()
        data.filter(l_freq=0.5, h_freq=45)
        epochs = mne.make_fixed_length_epochs(data, duration=5, overlap=1)
        array = epochs.get_data()
        return array
    except Exception as e:
        print(f"Error processing file: {file_path}")
        print(f"Error message: {str(e)}")
        return []  # Return an empty list if there is an error


In [None]:
control_epochs_array[0].shape

(194, 19, 2500)

In [None]:
patient_epochs_array[1].shape

(198, 19, 2500)

In [None]:
control_epochs_labels=[len(i)*[0] for i in control_epochs_array]


In [None]:
patient_epochs_labels=[len(i)*[1] for i in patient_epochs_array]

In [None]:
data_list=patient_epochs_array+control_epochs_array
label_list=patient_epochs_labels+control_epochs_labels

In [None]:
group_list=[[i]*len(j) for i,j in enumerate(data_list)]

In [None]:
import numpy as np

# Assuming patient_epochs_array and control_epochs_array contain the EEG data arrays

# Ensure patient_epochs_array contains only 3-dimensional arrays (excluding the first element)
patient_epochs_array_3d = [array for array in patient_epochs_array if isinstance(array, np.ndarray) and array.ndim == 3]

# Ensure control_epochs_array contains only 3-dimensional arrays
control_epochs_array_3d = [array for array in control_epochs_array if array.ndim == 3]

# Combine the patient and control arrays
data_list = patient_epochs_array_3d[0:] + control_epochs_array_3d

# Vertically stack the arrays
data_array = np.vstack(data_list)


In [None]:

label_array=np.hstack(label_list)
group_array=np.hstack(group_list)

In [None]:
data_array.shape,label_array.shape,group_array.shape

((4515, 19, 2500), (4515,), (4515,))

In [None]:
from scipy import stats
import numpy as np

# Function to calculate the mean of the signal
def calculate_mean(signal):
    return np.mean(signal, axis=-1)

# Function to calculate the standard deviation of the signal
def calculate_std(signal):
    return np.std(signal, axis=-1)

# Function to calculate the range (peak-to-peak) of the signal
def calculate_ptp(signal):
    return np.ptp(signal, axis=-1)

# Function to calculate the variance of the signal
def calculate_variance(signal):
    return np.var(signal, axis=-1)

# Function to calculate the minimum value of the signal
def calculate_minimum(signal):
    return np.min(signal, axis=-1)

# Function to calculate the maximum value of the signal
def calculate_maximum(signal):
    return np.max(signal, axis=-1)

# Function to get the index of the minimum value of the signal
def get_argmin(signal):
    return np.argmin(signal, axis=-1)

# Function to get the index of the maximum value of the signal
def get_argmax(signal):
    return np.argmax(signal, axis=-1)

# Function to calculate the root mean square (RMS) of the signal
def calculate_rms(signal):
    return np.sqrt(np.mean(signal**2, axis=-1))

# Function to calculate the absolute difference between consecutive signal values
def calculate_abs_diff_signal(signal):
    return np.sum(np.abs(np.diff(signal, axis=-1)), axis=-1)

# Function to calculate the skewness of the signal
def calculate_skewness(signal):
    return stats.skew(signal, axis=-1)

# Function to calculate the kurtosis of the signal
def calculate_kurtosis(signal):
    return stats.kurtosis(signal, axis=-1)

def concatenate_features(signal):
    # Calculate various features
    mean_features = calculate_mean(signal)
    std_features = calculate_std(signal)
    ptp_features = calculate_ptp(signal)
    variance_features = calculate_variance(signal)
    minimum_features = calculate_minimum(signal)
    maximum_features = calculate_maximum(signal)
    argmin_features = get_argmin(signal)
    argmax_features = get_argmax(signal)
    rms_features = calculate_rms(signal)
    abs_diff_signal_features = calculate_abs_diff_signal(signal)
    skewness_features = calculate_skewness(signal)
    kurtosis_features = calculate_kurtosis(signal)

    # Concatenate all the features using np.concatenate
    concatenated_features = np.concatenate((mean_features, std_features, ptp_features, variance_features, minimum_features,
                                             maximum_features, argmin_features, argmax_features, rms_features,
                                             abs_diff_signal_features, skewness_features, kurtosis_features), axis=-1)
    return concatenated_features

features = []
for d in data_array:
    features.append(concatenate_features(d))


In [None]:
features_array=np.array(features)
features_array.shape

NameError: ignored

In [None]:
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import GridSearchCV, GroupKFold
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler

# Define the LogisticRegression classifier
clf = LogisticRegression()

# Define the GroupKFold cross-validator
gkf = GroupKFold(n_splits=5)

# Define the pipeline with steps
pipe = Pipeline([
    ('scaler', StandardScaler()),
    ('clf', clf)
])

# Define the parameter grid for the GridSearchCV
param_grid = {'clf__C': [0.1, 0.5, 0.7, 1, 3, 7]}

# Create the GridSearchCV object
gscv = GridSearchCV(pipe, param_grid, cv=gkf, n_jobs=12, scoring='accuracy')  # Use the desired scoring metric

# Fit the GridSearchCV with the data
gscv.fit(features_array, label_array, groups=group_array)


STOP: TOTAL NO. of ITERATIONS REACHED LIMIT.

Increase the number of iterations (max_iter) or scale the data as shown in:
    https://scikit-learn.org/stable/modules/preprocessing.html
Please also refer to the documentation for alternative solver options:
    https://scikit-learn.org/stable/modules/linear_model.html#logistic-regression
  n_iter_i = _check_optimize_result(


In [None]:
label_array.shape

(2729,)

##**Implementation of CNN**

In [None]:
#epochs_array=data_array
#epochs_labels=label_array
#group_array

In [None]:
data_array=np.moveaxis(data_array,1,2)
data_array.shape

(4515, 2500, 19)

In [None]:
import numpy as np
from keras.models import Sequential
from keras.layers import Conv1D, MaxPooling1D, Flatten, Dense, Dropout


model = Sequential()
model.add(Conv1D(64, kernel_size=3, activation='relu', input_shape=(2500, 19)))
model.add(MaxPooling1D(pool_size=2))
model.add(Flatten())
model.add(Dense(4, activation='relu'))
model.add(Dropout(0.5))
model.add(Dense(1, activation='sigmoid'))
# Compile the model
model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])

# Print model summary
model.summary()



Model: "sequential"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 conv1d (Conv1D)             (None, 2498, 64)          3712      
                                                                 
 max_pooling1d (MaxPooling1D  (None, 1249, 64)         0         
 )                                                               
                                                                 
 flatten (Flatten)           (None, 79936)             0         
                                                                 
 dense (Dense)               (None, 4)                 319748    
                                                                 
 dropout (Dropout)           (None, 4)                 0         
                                                                 
 dense_1 (Dense)             (None, 1)                 5         
                                                        

In [None]:
import numpy as np
from keras.models import Sequential
from keras.layers import Conv1D, MaxPooling1D, Flatten, Dense, Dropout

model_1 = Sequential()
# Block 1
model_1.add(Conv1D(64, kernel_size=3, activation='relu', padding='same', input_shape=(2500, 19)))
model_1.add(Conv1D(64, kernel_size=3, activation='relu', padding='same'))
model_1.add(MaxPooling1D(pool_size=2))

# Block 2
model_1.add(Conv1D(128, kernel_size=3, activation='relu', padding='same'))
model_1.add(Conv1D(128, kernel_size=3, activation='relu', padding='same'))
model_1.add(MaxPooling1D(pool_size=2))

# Block 3
model_1.add(Conv1D(256, kernel_size=3, activation='relu', padding='same'))
model_1.add(Conv1D(256, kernel_size=3, activation='relu', padding='same'))
model_1.add(Conv1D(256, kernel_size=3, activation='relu', padding='same'))
model_1.add(MaxPooling1D(pool_size=2))

model_1.add(Flatten())
model_1.add(Dense(256, activation='relu'))
model_1.add(Dropout(0.5))
model_1.add(Dense(1, activation='sigmoid'))

# Compile the model
model_1.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])

# Print model summary
model_1.summary()


Model: "sequential_2"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 conv1d_1 (Conv1D)           (None, 2500, 64)          3712      
                                                                 
 conv1d_2 (Conv1D)           (None, 2500, 64)          12352     
                                                                 
 max_pooling1d_1 (MaxPooling  (None, 1250, 64)         0         
 1D)                                                             
                                                                 
 conv1d_3 (Conv1D)           (None, 1250, 128)         24704     
                                                                 
 conv1d_4 (Conv1D)           (None, 1250, 128)         49280     
                                                                 
 max_pooling1d_2 (MaxPooling  (None, 625, 128)         0         
 1D)                                                  

In [None]:
from sklearn.model_selection import GroupKFold,LeaveOneGroupOut
from sklearn.preprocessing import StandardScaler
gkf = GroupKFold(n_splits=3)

In [None]:
# Initialize lists to store accuracy values for each split
accuracy_train = []
accuracy_val = []
accuracy_test = []

for train_index, val_test_index in gkf.split(data_array, label_array, group_array):
    # Split into training and validation/test sets
    train_features, train_labels = data_array[train_index], label_array[train_index]
    val_test_features, val_test_labels = data_array[val_test_index], label_array[val_test_index]

    # Further split the validation/test set into validation and test sets
    val_index, test_index = next(GroupKFold(n_splits=2).split(val_test_features, val_test_labels, group_array[val_test_index]))
    val_features, val_labels = val_test_features[val_index], val_test_labels[val_index]
    test_features, test_labels = val_test_features[test_index], val_test_labels[test_index]

    # Standardize the data using the same scaler for training, validation, and test sets
    scaler = StandardScaler()
    train_features = scaler.fit_transform(train_features.reshape(-1, train_features.shape[-1])).reshape(train_features.shape)
    val_features = scaler.transform(val_features.reshape(-1, val_features.shape[-1])).reshape(val_features.shape)
    test_features = scaler.transform(test_features.reshape(-1, test_features.shape[-1])).reshape(test_features.shape)

In [None]:
 # Fit the model on the training data
model.fit(train_features, train_labels, epochs=5, batch_size=50, validation_data=(val_features, val_labels))

Epoch 1/5
Epoch 2/5
Epoch 3/5
Epoch 4/5
Epoch 5/5


<keras.callbacks.History at 0x7b8de8a13760>

In [None]:
# Evaluate the model on the validation and test data
_, acc_train = model.evaluate(train_features, train_labels)
_, acc_val = model.evaluate(val_features, val_labels)
_, acc_test = model.evaluate(test_features, test_labels)

accuracy_train.append(acc_train)
accuracy_val.append(acc_val)
accuracy_test.append(acc_test)



Fitting the VGGNet16 architecture

In [None]:
accuracy = []
for train_index, val_index in gkf.split(data_array, label_array, group_array):
    train_features, train_labels = data_array[train_index], label_array[train_index]
    val_features, val_labels = data_array[val_index], label_array[val_index]
    scaler = StandardScaler()
    train_features = scaler.fit_transform(train_features.reshape(-1, train_features.shape[-1])).reshape(train_features.shape)
    val_features = scaler.transform(val_features.reshape(-1, val_features.shape[-1])).reshape(val_features.shape)
        # Fit the model
    model.fit(train_features, train_labels, epochs=5, batch_size=50, validation_data=(val_features, val_labels))
    _, acc = model.evaluate(val_features, val_labels)
    accuracy.append(acc)


Epoch 1/5
Epoch 2/5
Epoch 3/5
Epoch 4/5
Epoch 5/5
Epoch 1/5
Epoch 2/5
Epoch 3/5
Epoch 4/5
Epoch 5/5
Epoch 1/5
Epoch 2/5
Epoch 3/5
Epoch 4/5
Epoch 5/5


In [None]:
from keras.layers import Conv1D, MaxPooling1D, Flatten, Dense, Dropout, LSTM
from keras.layers import Reshape
from keras.optimizers import Adam
from keras.models import Sequential


# Now, let's build the combined LSTM-CNN model
combined_model = Sequential()

# Add the CNN layers from your existing model
combined_model.add(model)

# Reshape the output of the CNN to match LSTM input
combined_model.add(Reshape((model.output_shape[1], 1)))

# Add the LSTM layer on top with return_sequences=False and 32 units
combined_model.add(LSTM(32, return_sequences=False))

# Compile the combined model
custom_optimizer = Adam(learning_rate=0.01)
combined_model.compile(optimizer=custom_optimizer, loss='binary_crossentropy', metrics=['accuracy'])

# Print the combined model summary
combined_model.summary()


Model: "sequential_1"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 sequential (Sequential)     (None, 1)                 323465    
                                                                 
 reshape (Reshape)           (None, 1, 1)              0         
                                                                 
 lstm (LSTM)                 (None, 32)                4352      
                                                                 
Total params: 327,817
Trainable params: 327,817
Non-trainable params: 0
_________________________________________________________________


In [None]:
# Initialize lists to store accuracy values for each split
accuracy_train = []
accuracy_val = []
accuracy_test = []

for train_index, val_test_index in gkf.split(data_array, label_array, group_array):
    # Split into training and validation/test sets
    train_features, train_labels = data_array[train_index], label_array[train_index]
    val_test_features, val_test_labels = data_array[val_test_index], label_array[val_test_index]

    # Further split the validation/test set into validation and test sets
    val_index, test_index = next(GroupKFold(n_splits=2).split(val_test_features, val_test_labels, group_array[val_test_index]))
    val_features, val_labels = val_test_features[val_index], val_test_labels[val_index]
    test_features, test_labels = val_test_features[test_index], val_test_labels[test_index]

    # Standardize the data using the same scaler for training, validation, and test sets
    scaler = StandardScaler()
    train_features = scaler.fit_transform(train_features.reshape(-1, train_features.shape[-1])).reshape(train_features.shape)
    val_features = scaler.transform(val_features.reshape(-1, val_features.shape[-1])).reshape(val_features.shape)
    test_features = scaler.transform(test_features.reshape(-1, test_features.shape[-1])).reshape(test_features.shape)

In [None]:
 # Fit the model on the training data
combined_model.fit(train_features, train_labels, epochs=5, batch_size=50, validation_data=(val_features, val_labels))

Epoch 1/5
Epoch 2/5
Epoch 3/5
Epoch 4/5
Epoch 5/5


<keras.callbacks.History at 0x7b8bf381b100>

In [None]:
# Evaluate the model on the validation and test data
_, acc_train = combined_model.evaluate(train_features, train_labels)
_, acc_val = combined_model.evaluate(val_features, val_labels)
_, acc_test = combined_model.evaluate(test_features, test_labels)

accuracy_train.append(acc_train)
accuracy_val.append(acc_val)
accuracy_test.append(acc_test)

