In [1]:
!pip install mne

Collecting mne
  Downloading mne-1.6.1-py3-none-any.whl (8.3 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m8.3/8.3 MB[0m [31m20.8 MB/s[0m eta [36m0:00:00[0m
Installing collected packages: mne
Successfully installed mne-1.6.1


In [26]:
import gdown
import os
import numpy as np
import pandas as pd
import mne
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
import glob
from sklearn.preprocessing import LabelBinarizer
import tensorflow as tf
import keras
from tensorflow.keras import layers
import sys
os.environ["TF_CPP_MIN_LOG_LEVEL"]="3"
import matplotlib.pyplot as plt

from tensorflow.keras import layers, models, optimizers
from sklearn.utils import shuffle
from tensorflow.keras.callbacks import EarlyStopping, LearningRateScheduler
from tensorflow.keras.layers.experimental.preprocessing import RandomFlip, RandomRotation, RandomZoom
from scipy import signal

from tensorflow.keras import regularizers

from tensorflow.keras.layers import BatchNormalization
from sklearn.model_selection import KFold
from sklearn.model_selection import LeaveOneOut


In [3]:
def load_data():
  path="dataset"
  if not os.path.exists(path):
      os.mkdir(path)
      print(f"Folder {path} created!")

      file_id = "1hG5v_COjPNzejRaL9XJAFERee9i2_V04"  # Replace this with your file's ID
      output_file = path+"/eeg.zip"  # Replace "data_file.ext" with the desired output filename and extension
      gdown.download(f"https://drive.google.com/uc?id={file_id}", output_file)
      !unzip "dataset/eeg.zip" -d "dataset"
      os.remove("dataset/eeg.zip")

  else:
      print(f"Folder {path} already exists")


  ds_dir = '/content/dataset/'
  scaler = StandardScaler()
  all_eeg=[]

  for (i, item) in enumerate(glob.glob(ds_dir + '*.edf')):
    print(item)
    raw = mne.io.read_raw_edf(item)
    # Filter EEG channels ('P3' and 'C3')
    eeg_channels = [ch for ch in raw.info['ch_names'] if ch in channels]
    if len(eeg_channels) != len(channels):
        print(f"Error: channels not found in {item}. Skipping...")
        continue

    # Create a new Raw object with only EEG channels
    raw = raw.copy().pick_channels(eeg_channels)

    # Apply bandpass filter (example: 0.1 Hz - 40 Hz)
    raw.load_data()  # Load the data into memory
    raw.filter(l_freq=0.1, h_freq=40)
   # print("mean of data is  {:.6f}".format(np.mean(np.mean(raw))))
    # Segment data into epochs (e.g., 1-second epochs)
    events = mne.make_fixed_length_events(raw, duration=1.0)
    epochs = mne.Epochs(raw, events, tmin=0, tmax=1.0, baseline=None)

    X = epochs.get_data()
    # Standardize features (Z-score normalization)
    n_samples, n_channels, n_time_points = X.shape

    # Reshape to 2D (n_samples x (n_channels * n_time_points))
    #X_reshaped = X.reshape(n_samples, -1)
    X_reshaped=X
    # Apply StandardScaler

    # Compute mean and standard deviation along the time axis (axis 2)
    mean = np.mean(X_reshaped, axis=(0, 1, 2))
    std = np.std(X_reshaped, axis=(0, 1, 2))
    print("mean of data is  {:.6f}".format(np.mean(mean)))
     # Perform scaling
    eeg_data_scaled = (X_reshaped - mean) / std
    X_scaled = eeg_data_scaled

    all_eeg.append(X_scaled)


  #all_eeg=np.array(all_eeg)

  lb=LabelBinarizer()
  all_labels = pd.read_excel("/content/dataset/states.xlsx",usecols=["status"])
  all_labels=lb.fit_transform(all_labels)


  return all_eeg, all_labels, n_samples, n_channels ,n_time_points
  #return  X_train, X_test, y_train, y_test, n_samples, n_channels ,n_time_points

In [4]:
def create_vit_model(input_shape, num_classes):
    inputs = layers.Input(shape=input_shape, name="input_layer")

    # Patch creation
    patch_size = 4
    #patches = layers.Conv2D(filters=64, kernel_size=patch_size, strides=patch_size, activation='relu')(inputs)
    patches = layers.Conv2D(filters=16, kernel_size=patch_size, strides=patch_size, activation='relu')(inputs)

    # Flatten patches
    flattened_patches = layers.Flatten()(patches)

    # MLP head
    #mlp_output = layers.Dense(256, activation='gelu')(flattened_patches)
    mlp_output= layers.Dense(32, kernel_regularizer=regularizers.l2(0.01))(flattened_patches)
    mlp_output = BatchNormalization()(mlp_output)


    mlp_output = layers.Dropout(0.2)(mlp_output)

    # Classification head
    outputs = layers.Dense(num_classes, activation='softmax', name='output_layer')(mlp_output)

    model = models.Model(inputs=inputs, outputs=outputs)

    return model

In [5]:
class Patches(layers.Layer):
    def __init__(self, patch_size):
        super().__init__()
        self.patch_size = patch_size

    def call(self, images):
        input_shape = tf.shape(images)
        batch_size = input_shape[0]
        height = input_shape[1]
        width = input_shape[2]
        channels = input_shape[3]  # Assuming RGB images (3 channels)

        num_patches_h = height // self.patch_size
        num_patches_w = width // self.patch_size

        patches = tf.image.extract_patches(images, sizes=[1, self.patch_size, self.patch_size, 1], strides=[1, self.patch_size, self.patch_size, 1], rates=[1, 1, 1, 1], padding="VALID")
        patches = tf.reshape(patches, (batch_size, num_patches_h * num_patches_w, self.patch_size * self.patch_size * channels))

        return patches


In [6]:
def mlp(x, hidden_units, dropout_rate):
    for units in hidden_units:
        x = layers.Dense(units, activation="relu")(x)
        x = layers.Dropout(dropout_rate)(x)
    return x


In [7]:
def apply_augmentation(data, noise_level=0.2):
  np.random.seed(42)
  noise = np.random.normal(scale=noise_level, size=np.shape(data))
  augmented_data=data + noise

  return np.array(augmented_data)

In [8]:
def lr_scheduler(epoch, lr):
    if epoch < 5:
        return lr
    else:
        return lr * tf.math.exp(-0.1)


In [9]:
channels= ['T3', 'T5', 'T4', 'T6','Fp2','F3','Fz','Pz','C3','P3','O1','O2']
all_eeg, all_labels, n_samples, n_channels ,n_time_points = load_data()
#X_train, X_test, y_train, y_test, n_samples, n_channels ,n_time_points = load_data()


Folder dataset created!


Downloading...
From (original): https://drive.google.com/uc?id=1hG5v_COjPNzejRaL9XJAFERee9i2_V04
From (redirected): https://drive.google.com/uc?id=1hG5v_COjPNzejRaL9XJAFERee9i2_V04&confirm=t&uuid=c58b8415-6aac-47be-a723-6543ff5bd5ed
To: /content/dataset/eeg.zip
100%|██████████| 383M/383M [00:02<00:00, 164MB/s]


Archive:  dataset/eeg.zip
  inflating: dataset/15.edf          
  inflating: dataset/16.edf          
  inflating: dataset/17.edf          
  inflating: dataset/18.edf          
  inflating: dataset/19.edf          
  inflating: dataset/20.edf          
  inflating: dataset/21.edf          
  inflating: dataset/22.edf          
  inflating: dataset/23.edf          
  inflating: dataset/24.edf          
  inflating: dataset/25.edf          
  inflating: dataset/26.edf          
  inflating: dataset/27.edf          
  inflating: dataset/1.edf           
  inflating: dataset/2.edf           
  inflating: dataset/3.edf           
  inflating: dataset/4.edf           
  inflating: dataset/5.edf           
  inflating: dataset/6.edf           
  inflating: dataset/7.edf           
  inflating: dataset/8.edf           
  inflating: dataset/9.edf           
  inflating: dataset/10.edf          
  inflating: dataset/11.edf          
  inflating: dataset/12.edf          
  inflating: dataset/13.

In [12]:
print( "n_samples= {} and  n_channels = {} and n_time_points= {}".format(n_samples, n_channels ,n_time_points))

n_samples= 1810 and  n_channels = 12 and n_time_points= 257


In [13]:
all_eeg=np.array(all_eeg)

  all_eeg=np.array(all_eeg)


In [14]:
for x in range(len(all_eeg)):
  all_eeg[x]=all_eeg[x][0:1802,0:12,0:257]


In [15]:
all_eeg=np.stack( all_eeg, axis=0 )




In [24]:
k_folds = 5
kf = KFold(n_splits=k_folds, shuffle=True, random_state=42)
fold_accuracies = []

#X_train, X_test, y_train, y_test = train_test_split(all_eeg, all_labels, test_size=0.2, random_state=42)

In [19]:
mlp_head_units =[16, 8] #=[32, 16]#[64, 32] [128, 64] #   [256, 128]
input_shape =np.shape(all_eeg[0])#  (1802, 12, 257)

In [16]:
np.shape(all_eeg)

(27, 1802, 12, 257)

In [None]:
print("Shapes of training data and labels:")
print("X_train shape:", X_train.shape)
print("y_train shape:", y_train.shape)

print("Shapes of testing data and labels:")
print("X_test shape:", X_test.shape)
print("y_test shape:", y_test.shape)

Shapes of training data and labels:
X_train shape: (21, 1802, 12, 257)
y_train shape: (21, 1)
Shapes of testing data and labels:
X_test shape: (6, 1802, 12, 257)
y_test shape: (6, 1)


In [20]:
lr_callback = LearningRateScheduler(lr_scheduler)


In [21]:
early_stopping = EarlyStopping(monitor='val_loss', patience=3, restore_best_weights=True)


In [22]:
all_eeg_augmented = apply_augmentation(all_eeg)


In [25]:
for fold, (train_index, test_index) in enumerate(kf.split(all_eeg)):
    print(f"Fold {fold+1}/{k_folds}")

    # Split data into training and testing sets
    X_train, X_test = all_eeg[train_index], all_eeg[test_index]
    y_train, y_test = all_labels[train_index], all_labels[test_index]

    # Apply data preprocessing if needed

    # Define and compile your model
    vit_model = create_vit_model(input_shape, 2)
    optimizer = keras.optimizers.Adam(learning_rate=0.001)
    vit_model.compile(optimizer=optimizer, loss="sparse_categorical_crossentropy", metrics=["accuracy"])

    # Train the model
    history = vit_model.fit(X_train, y_train, batch_size=16, epochs=20, validation_data=(X_test, y_test), verbose=0)

    # Evaluate the model on the test set
    test_loss, test_accuracy = vit_model.evaluate(X_test, y_test)
    fold_accuracies.append(test_accuracy)
    print(f"Test Accuracy for Fold {fold+1}: {test_accuracy:.4f}")

# Calculate and print the average accuracy across all folds
avg_accuracy = sum(fold_accuracies) / len(fold_accuracies)
print(f"Average Test Accuracy across {k_folds} folds: {avg_accuracy:.4f}")

Fold 1/5
Test Accuracy for Fold 1: 0.5000
Fold 2/5
Test Accuracy for Fold 2: 0.8333
Fold 3/5
Test Accuracy for Fold 3: 0.4000
Fold 4/5
Test Accuracy for Fold 4: 0.0000
Fold 5/5
Test Accuracy for Fold 5: 0.4000
Average Test Accuracy across 5 folds: 0.4267


In [None]:

loo = LeaveOneOut()

# Initialize lists to store evaluation metrics for each fold
fold_accuracies = []

# Iterate over each fold
for fold, (train_index, test_index) in enumerate(loo.split(all_eeg)):
    print(f"Fold {fold+1}/{len(all_eeg)}")

    # Split data into training and testing sets
    X_train, X_test = all_eeg[train_index], all_eeg[test_index]
    y_train, y_test = all_labels[train_index], all_labels[test_index]

    # Apply data preprocessing if needed

    # Define and compile your model
    vit_model = create_vit_model(input_shape, 2)
    optimizer = keras.optimizers.Adam(learning_rate=0.001)
    vit_model.compile(optimizer=optimizer, loss="sparse_categorical_crossentropy", metrics=["accuracy"])

    # Train the model
    history = vit_model.fit(X_train, y_train, batch_size=16, epochs=20, validation_data=(X_test, y_test), verbose=0)

    # Evaluate the model on the test set
    test_loss, test_accuracy = vit_model.evaluate(X_test, y_test)
    fold_accuracies.append(test_accuracy)
    print(f"Test Accuracy for Fold {fold+1}: {test_accuracy:.4f}")

# Calculate and print the average accuracy across all folds
avg_accuracy = sum(fold_accuracies) / len(fold_accuracies)
print(f"Average Test Accuracy across {len(all_eeg)} folds: {avg_accuracy:.4f}")


Fold 1/27
Test Accuracy for Fold 1: 1.0000
Fold 2/27
Test Accuracy for Fold 2: 0.0000
Fold 3/27
Test Accuracy for Fold 3: 0.0000
Fold 4/27
Test Accuracy for Fold 4: 0.0000
Fold 5/27
Test Accuracy for Fold 5: 0.0000
Fold 6/27
Test Accuracy for Fold 6: 0.0000
Fold 7/27
Test Accuracy for Fold 7: 1.0000
Fold 8/27
Test Accuracy for Fold 8: 0.0000
Fold 9/27
Test Accuracy for Fold 9: 1.0000
Fold 10/27
Test Accuracy for Fold 10: 1.0000
Fold 11/27
Test Accuracy for Fold 11: 1.0000
Fold 12/27
Test Accuracy for Fold 12: 0.0000
Fold 13/27
Test Accuracy for Fold 13: 1.0000
Fold 14/27
