In [2]:
import numpy as np

# Load the cleaned EEG array and its label vector from .npy files, resolved
# relative to the current working directory.
eeg_data_cleaned = np.load('eeg_data_cleaned.npy')
y = np.load('y.npy')


In [3]:
from sklearn.model_selection import train_test_split

# NOTE(review): csr_matrix / issparse are not referenced in the visible cells;
# kept in case a later cell relies on them.
from scipy.sparse import csr_matrix, issparse


In [4]:
# First carve off a stratified 15% hold-out test set.
X_temp, X_test, y_temp, y_test = train_test_split(
    eeg_data_cleaned,
    y,
    test_size=0.15,
    random_state=42,
    stratify=y,
)

# Then split the remaining 85% into training (~70% of all data) and
# validation (~15% of all data); 0.1765 is approximately 0.15 / 0.85.
X_train, X_val, y_train, y_val = train_test_split(
    X_temp,
    y_temp,
    test_size=0.1765,
    random_state=42,
    stratify=y_temp,
)

In [5]:
# Sanity check: shape of the training labels after the two-stage split.
print(y_train.shape)

(7739,)


In [6]:
# Sanity check: shape of the training features after the two-stage split.
print(X_train.shape)

(7739, 61, 256)


In [7]:
import numpy as np

# --- 3D EEG Augmentation Functions ---

def add_gaussian_noise(X, noise_level=0.02):
    """Return ``X`` plus zero-mean Gaussian noise from NumPy's global RNG.

    Args:
        X: Array to perturb; its ``shape`` determines the shape of the noise.
        noise_level: Standard deviation of the noise.

    Returns:
        A new array holding ``X + noise``; ``X`` itself is not modified.
    """
    return X + np.random.normal(loc=0, scale=noise_level, size=X.shape)

def random_time_shift(X, max_shift=8):
    """Shift every sample along axis 2 by its own random integer offset.

    Each sample's offset is drawn uniformly from ``[-max_shift, max_shift]``
    (both ends inclusive) using NumPy's global RNG. Positions vacated by the
    shift are filled with zeros; values pushed past the edge are dropped.

    Args:
        X: Array with at least three axes; axis 0 indexes samples and the
            shift is applied along axis 2.
        max_shift: Largest absolute shift, in positions along axis 2.
            Must be >= 0; ``0`` returns an unshifted copy.

    Returns:
        A new array with the same shape and dtype as ``X``.

    Raises:
        ValueError: If ``max_shift`` is negative.
    """
    if max_shift < 0:
        raise ValueError("max_shift must be non-negative")

    shifted = np.zeros_like(X)
    for i in range(X.shape[0]):
        # randint's upper bound is exclusive: the +1 makes the range symmetric
        # and lets max_shift=0 act as a no-op instead of raising.
        shift = np.random.randint(-max_shift, max_shift + 1)
        if shift > 0:
            # Shift right: leading positions stay zero.
            shifted[i, :, shift:] = X[i, :, :-shift]
        elif shift < 0:
            # Shift left: trailing positions stay zero.
            shifted[i, :, :shift] = X[i, :, -shift:]
        else:
            shifted[i] = X[i]
    return shifted

def random_amplitude_scaling(X, scale_range=(0.95, 1.05)):
    """Multiply each sample by its own random gain.

    One gain per sample (axis 0) is drawn uniformly from
    ``[scale_range[0], scale_range[1])`` using NumPy's global RNG and applied
    to every element of that sample.

    Args:
        X: Array whose first axis indexes samples, e.g.
            ``(samples, channels, timepoints)`` or
            ``(samples, channels, timepoints, 1)``. Any rank >= 1 works.
        scale_range: ``(low, high)`` bounds for the gain.

    Returns:
        A new array with the same shape as ``X``.
    """
    # Shape the gains as (samples, 1, ..., 1) so they broadcast over all
    # remaining axes whatever the rank of X. A fixed (samples, 1, 1) shape
    # only lines up with axis 0 for 3-D input.
    gain_shape = (X.shape[0],) + (1,) * (X.ndim - 1)
    scales = np.random.uniform(scale_range[0], scale_range[1], gain_shape)
    return X * scales

def channel_reflection(X):
    """Return ``X`` with the order of axis 1 (channels) reversed.

    Uses basic slicing, so for a NumPy array the result is a view on ``X``
    rather than a copy.
    """
    reversed_order = slice(None, None, -1)
    return X[:, reversed_order, :]

def channel_masking(X, mask_prob=0.1):
    """Zero out whole channels at random, independently for every sample.

    Each (sample, channel) pair is masked with probability ``mask_prob``
    using NumPy's global RNG; a masked channel is set to 0 across all of its
    remaining axes.

    Args:
        X: Array whose axis 0 indexes samples and axis 1 indexes channels.
        mask_prob: Probability of masking any given channel.

    Returns:
        A masked copy of ``X``; the input is not modified.
    """
    X_masked = X.copy()
    # One uniform draw per (sample, channel), taken in row-major order -- the
    # same order a nested sample/channel loop would consume them -- but
    # without a Python-level iteration per channel.
    drop = np.random.rand(X.shape[0], X.shape[1]) < mask_prob
    X_masked[drop] = 0
    return X_masked

def signal_flipping(X):
    """Return ``X`` reversed along axis 2 (the time axis).

    Uses basic slicing, so for a NumPy array the result is a view on ``X``
    rather than a copy.
    """
    backwards = slice(None, None, -1)
    return X[:, :, backwards]

def mixup(X, y, alpha=0.2):
    """Blend every sample (and its label) with a randomly chosen partner.

    A single mixing weight ``lam`` is drawn from ``Beta(alpha, alpha)`` and
    shared by the whole batch; partners come from one random permutation of
    the sample indices. Both draws use NumPy's global RNG.

    Args:
        X: Feature array; axis 0 indexes samples.
        y: Label array aligned with ``X`` along axis 0.
        alpha: Concentration parameter of the Beta distribution.

    Returns:
        Tuple ``(X_mix, y_mix)`` where each entry is
        ``lam * original + (1 - lam) * partner``.
    """
    lam = np.random.beta(alpha, alpha)
    partner = np.random.permutation(X.shape[0])

    def blend(values):
        # Convex combination of each row with its permuted partner.
        return lam * values + (1 - lam) * values[partner]

    return blend(X), blend(y)

# --- Apply Augmentations ---

# Only the training split is augmented; X_val / X_test are left untouched.
# Every augmentation in the chain is applied, in this order, to the output of
# the previous one, so each augmented sample has gone through all six steps.
augmentation_chain = (
    add_gaussian_noise,
    random_time_shift,
    random_amplitude_scaling,
    channel_reflection,
    channel_masking,
    signal_flipping,
)
X_aug = X_train
for augment in augmentation_chain:
    X_aug = augment(X_aug)

# Mixup is computed from the un-augmented training set
X_mix, y_mix = mixup(X_train, y_train, alpha=0.2)

# Stack original, augmented and mixup samples along the sample axis. The
# augmented copies reuse y_train because none of the chained steps touch labels.
X_train_combined = np.concatenate((X_train, X_aug, X_mix), axis=0)
y_train_combined = np.concatenate((y_train, y_train, y_mix), axis=0)


# NOTE(review): "Original:" reports the full dataset (eeg_data_cleaned), not
# the X_train split that the augmentations were derived from.
for label, arr in (
    ("Original:", eeg_data_cleaned),
    ("Augmented:", X_aug),
    ("Mixup:", X_mix),
    ("Combined:", X_train_combined),
    ("Labels:", y_train_combined),
):
    print(label, arr.shape)

Original: (11057, 61, 256)
Augmented: (7739, 61, 256)
Mixup: (7739, 61, 256)
Combined: (23217, 61, 256)
Labels: (23217,)


In [8]:
# import numpy as np

# # For training, validation, and test sets
# X_train_combined = np.transpose(X_train_combined, (0, 2, 1))  # (samples, 256, 61)
# X_val = np.transpose(X_val, (0, 2, 1))
# X_test = np.transpose(X_test, (0, 2, 1))


In [9]:
# Confirm the combined training array still has its (samples, 61, 256) layout
# (the transpose cell above is commented out).
print(X_train_combined.shape)

(23217, 61, 256)


In [10]:
import tensorflow as tf
from tensorflow.keras import layers, models

input_shape = (61, 256, 1)  # (channels, timepoints, 1)


def _conv_stage(filters, dropout_rate, **conv_kwargs):
    """Return the layers of one Conv2D -> BatchNorm -> MaxPool -> Dropout stage.

    Extra keyword arguments (e.g. ``input_shape`` for the first stage) are
    forwarded to the Conv2D layer.
    """
    return [
        layers.Conv2D(filters, (3, 3), activation='relu', padding='same', **conv_kwargs),
        layers.BatchNormalization(),
        layers.MaxPooling2D((2, 2)),
        layers.Dropout(dropout_rate),
    ]


# Four convolutional stages with doubling filter counts (dropout rises from
# 0.3 to 0.4 in the two deeper stages), followed by a dense head that ends in
# a single sigmoid unit.
model = models.Sequential(
    _conv_stage(32, 0.3, input_shape=input_shape)
    + _conv_stage(64, 0.3)
    + _conv_stage(128, 0.4)
    + _conv_stage(256, 0.4)
    + [
        layers.Flatten(),
        layers.Dense(256, activation='relu'),
        layers.BatchNormalization(),  # normalizes the hidden Dense output before the final Dense
        layers.Dropout(0.5),
        layers.Dense(1, activation='sigmoid'),
    ]
)


In [11]:
# Print the layer-by-layer summary (output shapes and parameter counts).
model.summary()

Model: "sequential"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 conv2d (Conv2D)             (None, 61, 256, 32)       320       
                                                                 
 batch_normalization (BatchN  (None, 61, 256, 32)      128       
 ormalization)                                                   
                                                                 
 max_pooling2d (MaxPooling2D  (None, 30, 128, 32)      0         
 )                                                               
                                                                 
 dropout (Dropout)           (None, 30, 128, 32)       0         
                                                                 
 conv2d_1 (Conv2D)           (None, 30, 128, 64)       18496     
                                                                 
 batch_normalization_1 (Batc  (None, 30, 128, 64)      2

In [12]:
# Adam optimizer with binary cross-entropy, matching the single sigmoid output.
# NOTE(review): the mixup rows carry fractional labels, so the training
# 'accuracy' figure may be understated for them -- confirm how the metric
# treats non-0/1 targets.
model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])

In [13]:
import tensorflow as tf
# Stop once val_loss has gone 7 epochs without improving, then roll the model
# back to the weights of its best epoch. ('val_accuracy' could be monitored
# instead.)
early_stopping = tf.keras.callbacks.EarlyStopping(
    monitor='val_loss', patience=7, restore_best_weights=True
)

In [14]:
import numpy as np

# Cast training and validation features/labels to float32; np.array() builds a
# fresh array for each, and rebinding the name drops the previous one.
# NOTE(review): X_test / y_test are not cast here.
X_train_combined = np.array(X_train_combined, dtype=np.float32)
y_train_combined = np.array(y_train_combined, dtype=np.float32)  # keep float: the mixup rows hold blended (fractional) labels
X_val = np.array(X_val, dtype=np.float32)
y_val = np.array(y_val, dtype=np.float32)


In [15]:
# Sanity check: features and labels must agree on the number of samples for
# both the training and the validation arrays.
print(X_train_combined.shape)
print(y_train_combined.shape)
print(X_val.shape)
print(y_val.shape)


(23217, 61, 256)
(23217,)
(1659, 61, 256)
(1659,)


In [16]:
import tensorflow as tf
# Report how many GPU devices TensorFlow can see before starting training.
print("Num GPUs Available:", len(tf.config.list_physical_devices('GPU')))


Num GPUs Available: 1


In [17]:
# Train on the combined (original + augmented + mixup) training set and
# evaluate on the validation split after every epoch.
# NOTE(review): the printed shapes show rank-3 inputs (samples, 61, 256) while
# the model declares input_shape=(61, 256, 1); this presumably relies on Keras
# adding the trailing axis -- confirm, or add the axis explicitly.
history = model.fit(
    X_train_combined, y_train_combined,
    epochs=50,  # upper bound; the early-stopping callback may end training sooner
    batch_size=32,
    validation_data=(X_val, y_val),
    callbacks=[early_stopping]
)

Epoch 1/50
Epoch 2/50
Epoch 3/50
Epoch 4/50
Epoch 5/50
Epoch 6/50
Epoch 7/50
Epoch 8/50
Epoch 9/50
Epoch 10/50
Epoch 11/50
Epoch 12/50
Epoch 13/50
Epoch 14/50
Epoch 15/50
Epoch 16/50
Epoch 17/50
Epoch 18/50
Epoch 19/50
Epoch 20/50
Epoch 21/50
Epoch 22/50
Epoch 23/50
Epoch 24/50
Epoch 25/50
Epoch 26/50
Epoch 27/50
Epoch 28/50
Epoch 29/50
Epoch 30/50
Epoch 31/50
Epoch 32/50
Epoch 33/50
Epoch 34/50
Epoch 35/50
Epoch 36/50
Epoch 37/50
Epoch 38/50
Epoch 39/50


In [18]:
# Score the model on the held-out test split; the two returned values are
# unpacked as the loss and the accuracy metric configured in compile().
test_loss, test_acc = model.evaluate(X_test, y_test)
print(f"Test accuracy: {test_acc:.4f}")

Test accuracy: 0.9596


In [19]:
from sklearn.metrics import confusion_matrix, classification_report

# Turn the sigmoid outputs into hard 0/1 predictions with a 0.5 threshold,
# then summarize them against the true test labels.
test_probabilities = model.predict(X_test)
y_pred = (test_probabilities > 0.5).astype("int32")

for report in (confusion_matrix, classification_report):
    print(report(y_test, y_pred))

[[ 566   38]
 [  29 1026]]
              precision    recall  f1-score   support

           0       0.95      0.94      0.94       604
           1       0.96      0.97      0.97      1055

    accuracy                           0.96      1659
   macro avg       0.96      0.95      0.96      1659
weighted avg       0.96      0.96      0.96      1659



In [20]:
# Persist the fitted model to a .keras file, resolved relative to the current
# working directory.
model.save('best_eeg_cnn_model_with_preprocessing_augmentation.keras')