In [2]:
import numpy as np
import os
import pickle
import tensorflow as tf
from tensorflow.keras.layers import Dense, Dropout
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from tensorflow.keras import layers, models, regularizers
from sklearn.metrics import accuracy_score
from scipy.signal import butter, lfilter
import pywt

# Load DEAP dataset
def load_deap_data(data_dir):
    eeg_data = []
    labels = []

    for file in os.listdir(data_dir):
        if file.endswith('.dat'):
            file_path = os.path.join(data_dir, file)
            with open(file_path, 'rb') as f:
                subject_data = pickle.load(f, encoding='latin1')
                eeg_data.append(subject_data['data'])
                labels.append(subject_data['labels'])

    eeg_data = np.concatenate(eeg_data, axis=0)
    labels = np.concatenate(labels, axis=0)
    return eeg_data, labels

# Preprocess data: normalize, filter, and extract delta rhythm
def preprocess_data(eeg_data, lowcut=0.5, highcut=4.0, fs=128, order=5):
    def butter_bandpass(lowcut, highcut, fs, order=5):
        nyquist = 0.5 * fs
        low = lowcut / nyquist
        high = highcut / nyquist
        b, a = butter(order, [low, high], btype='band')
        return b, a

    def bandpass_filter(data, lowcut, highcut, fs, order=5):
        b, a = butter_bandpass(lowcut, highcut, fs, order=order)
        y = lfilter(b, a, data)
        return y

    # Normalizing the data
    eeg_data = eeg_data / np.max(np.abs(eeg_data), axis=(1, 2), keepdims=True)
    filtered_data = []
    for trial in eeg_data:
        filtered_trial = np.array([bandpass_filter(channel, lowcut, highcut, fs, order) for channel in trial])
        filtered_data.append(filtered_trial)
    return np.array(filtered_data)

# Convert EEG signals to images using Continuous Wavelet Transform (CWT)
def eeg_to_cwt_images(eeg_data, scales, waveletname='morl'):
    eeg_images = []
    for trial in eeg_data:
        trial_images = []
        for channel in trial:
            coeffs, _ = pywt.cwt(channel, scales, waveletname)
            trial_images.append(coeffs)
        trial_images = np.stack(trial_images[:3], axis=-1)  # Stacking first 3 channels along the last dimension
        eeg_images.append(trial_images)
    return np.array(eeg_images)

# Label the dataset into four classes based on valence and arousal
def label_data(labels):
    valence = labels[:, 0]
    arousal = labels[:, 1]
    combined_labels = []
    for v, a in zip(valence, arousal):
        if v >= 5 and a >= 5:
            combined_labels.append('Excited')
        elif v >= 5 and a < 5:
            combined_labels.append('Calm')
        elif v < 5 and a >= 5:
            combined_labels.append('Stressed')
        else:
            combined_labels.append('Sad')
    return np.array(combined_labels)

# Generator function to yield batches of data
def data_generator(eeg_data, labels, scales, batch_size, augment_fn=None):
    num_samples = eeg_data.shape[0]
    while True:
        for offset in range(0, num_samples, batch_size):
            batch_data = eeg_data[offset:offset + batch_size]
            batch_labels = labels[offset:offset + batch_size]
            batch_images = eeg_to_cwt_images(batch_data, scales)
            batch_images_resized = np.array([tf.image.resize(img, (224, 224)).numpy() for img in batch_images])
            if batch_images_resized.shape[-1] != 3:
                batch_images_rgb = np.repeat(batch_images_resized[..., np.newaxis], 3, axis=-1)
            else:
                batch_images_rgb = batch_images_resized
            if augment_fn:
                batch_images_rgb = augment_fn(batch_images_rgb)
            yield batch_images_rgb, batch_labels

# Load and preprocess the data
data_dir = '/content/drive/My Drive/data_preprocessed_python'  # Update this path to your dataset in Google Drive
eeg_data, labels = load_deap_data(data_dir)
eeg_data = preprocess_data(eeg_data)
scales = np.arange(1, 129)

# Label the dataset
combined_labels = label_data(labels)
encoder = LabelEncoder()
encoded_labels = encoder.fit_transform(combined_labels)

# Split data into training and testing sets
X_train, X_test, y_train, y_test = train_test_split(eeg_data, encoded_labels, test_size=0.2, random_state=42)
X_train, X_val, y_train, y_val = train_test_split(X_train, y_train, test_size=0.1, random_state=42)

# Data Augmentation
data_augmentation = tf.keras.Sequential([
    layers.RandomFlip("horizontal"),
    layers.RandomRotation(0.1),
    layers.RandomZoom(0.1),
])



In [None]:
from tensorflow.keras import layers, models, regularizers
from tensorflow.keras.applications import EfficientNetB0
from tensorflow.keras.layers import GlobalAveragePooling2D, Dense, Dropout
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import accuracy_score
from scipy.signal import butter, lfilter
# Build the custom model using EfficientNet
base_model = EfficientNetB0(input_shape=(224, 224, 3), include_top=False, weights='imagenet')
base_model.trainable = False

model = models.Sequential([
    base_model,
    GlobalAveragePooling2D(),
    Dense(128, activation='relu', kernel_regularizer=regularizers.l2(0.001)),
    Dropout(0.5),
    Dense(4, activation='softmax')
])

# Compile the model
model.compile(optimizer='adam', loss='sparse_categorical_crossentropy', metrics=['accuracy'])

# Train the model using the data generator
batch_size = 4
train_generator = data_generator(X_train, y_train, scales, batch_size, augment_fn=data_augmentation)
test_generator = data_generator(X_test, y_test, scales, batch_size)
val_generator = data_generator(X_val, y_val, scales, batch_size)

early_stopping = tf.keras.callbacks.EarlyStopping(monitor='val_loss', patience=5, restore_best_weights=True)

steps_per_epoch = len(X_train) // batch_size
validation_steps = len(X_test) // batch_size

# Training the model
history = model.fit(train_generator, epochs=8, steps_per_epoch=steps_per_epoch,
                    validation_data=val_generator, validation_steps=validation_steps, callbacks=[early_stopping])

# Fine-tune the model
base_model.trainable = True
model.compile(optimizer=tf.keras.optimizers.Adam(1e-5), loss='sparse_categorical_crossentropy', metrics=['accuracy'])
history_finetune = model.fit(train_generator, epochs=4, steps_per_epoch=steps_per_epoch,
                             validation_data=val_generator, validation_steps=validation_steps, callbacks=[early_stopping])

# Evaluate the model
test_accuracy = model.evaluate(test_generator, steps=len(X_test) // batch_size)
print("Accuracy:", test_accuracy)


# Print epochs and train/validation accuracy and loss
def print_epoch_metrics(history, history_finetune=None):
    print("Epoch\tTrain Loss\tTrain Accuracy\tValidation Loss\tValidation Accuracy")
    for i in range(len(history.history['loss'])):
        print(f"{i+1}\t{history.history['loss'][i]:.4f}\t\t{history.history['accuracy'][i]:.4f}\t\t{history.history['val_loss'][i]:.4f}\t\t{history.history['val_accuracy'][i]:.4f}")
    if history_finetune:
        for i in range(len(history_finetune.history['loss'])):
            print(f"{i+1 + len(history.history['loss'])}\t{history_finetune.history['loss'][i]:.4f}\t\t{history_finetune.history['accuracy'][i]:.4f}\t\t{history_finetune.history['val_loss'][i]:.4f}\t\t{history_finetune.history['val_accuracy'][i]:.4f}")

print_epoch_metrics(history, history_finetune)

Downloading data from https://storage.googleapis.com/keras-applications/efficientnetb0_notop.h5
Epoch 1/8
Epoch 2/8
Epoch 3/8
Epoch 4/8
Epoch 5/8
Epoch 6/8
Epoch 7/8
Epoch 8/8
Epoch 1/4
Epoch 2/4
Epoch 3/4
Epoch 4/4
Accuracy: 0.4015
