In [1]:
import pickle
import numpy as np
import matplotlib.pyplot as plt
import tensorflow as tf
from tensorflow.keras.layers import Conv1D, BatchNormalization, MaxPooling1D, Flatten, Dense, Input, LSTM, Dropout
from tensorflow.keras.models import Model
from sklearn.model_selection import KFold, train_test_split
from sklearn.metrics import classification_report, confusion_matrix, f1_score
import seaborn as sns
from scipy.signal import welch
from scipy.stats import entropy, skew, kurtosis
from sklearn.metrics import roc_curve, auc
from sklearn.preprocessing import LabelBinarizer

2024-12-15 08:03:08.831224: E external/local_xla/xla/stream_executor/cuda/cuda_dnn.cc:9261] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
2024-12-15 08:03:08.831285: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:607] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
2024-12-15 08:03:08.833276: E external/local_xla/xla/stream_executor/cuda/cuda_blas.cc:1515] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered


In [2]:

def read_data(filename):
    with open(filename, 'rb') as f:
        data = pickle.load(f, encoding='latin1')
    return data

files = [f"s{n:02d}" for n in range(1, 33)]

labels, data = [], []

for file in files:
    filepath = f"/kaggle/input/deap-dataset/data_preprocessed_python/{file}.dat"
    d = read_data(filepath)
    labels.append(d['labels'])
    data.append(d['data'])

labels = np.array(labels)
data = np.array(data)

print("Labels shape: ", labels.shape)
print("Data shape: ", data.shape)

eeg_data = data[:, :, :32, -7680:]
print("EEG data shape: ", eeg_data.shape)

Labels shape:  (32, 40, 4)
Data shape:  (32, 40, 40, 8064)
EEG data shape:  (32, 40, 32, 7680)


In [3]:
def extract_psd_feature(data):
    psd_features = []
    for channel_data in data:
        f, psd = welch(channel_data, fs=128)
        psd_features.append(psd)
    return np.array(psd_features)

def extract_entropy_feature(data):
    entropy_features = []
    for channel_data in data:
        prob_dist, _ = np.histogram(channel_data, bins=128, density=True)
        prob_dist = prob_dist / np.sum(prob_dist)
        entropy_features.append(entropy(prob_dist))
    return np.array(entropy_features)

def extract_sd_feature(data):
    return np.std(data, axis=1)

def extract_mean_feature(data):
    return np.mean(data, axis=1)
def extract_variance_feature(data):
    return np.var(data, axis=1)

def extract_skewness_feature(data):
    return skew(data, axis=1)

def extract_kurtosis_feature(data):
    return kurtosis(data, axis=1)

def preprocess_data(eeg_data, labels, feature_type):
    data_features, data_labels, data_labels_valence = [], [], []
    
    for subject in range(eeg_data.shape[0]):
        for trial in range(eeg_data.shape[1]):
            trial_data = eeg_data[subject, trial, :, :]
            if feature_type == 'psd':
                features = extract_psd_feature(trial_data)
            elif feature_type == 'entropy':
                features = extract_entropy_feature(trial_data)
            elif feature_type == 'sd':
                features = extract_sd_feature(trial_data)
            elif feature_type == 'mean':
                features = extract_mean_feature(trial_data)
            elif feature_type == 'variance':
                features = extract_variance_feature(trial_data)
            elif feature_type == 'skewness':
                features = extract_skewness_feature(trial_data)
            elif feature_type == 'kurtosis':
                features = extract_kurtosis_feature(trial_data)
            else:
                raise ValueError("Unknown feature type")
            
            data_features.append(features)
            arousal_label = labels[subject, trial, 0]
            data_labels.append(1 if arousal_label > 5 else 0)
            valence_label = labels[subject, trial, 1]
            data_labels_valence.append(1 if valence_label > 5 else 0)
    
    return np.array(data_features), np.array(data_labels), np.array(data_labels_valence)

In [4]:
feature_types = ['psd', 'entropy', 'sd', 'mean', 'variance', 'skewness', 'kurtosis']

all_features = {}
for feature_type in feature_types:
    features, labels_processed, labels_processed_valence = preprocess_data(eeg_data, labels, feature_type)
    all_features[feature_type] = (features, labels_processed, labels_processed_valence)
    print(f"{feature_type.capitalize()} Features shape: ", features.shape)
    print(f"{feature_type.capitalize()} Arousal Labels shape: ", labels_processed.shape)
    print(f"{feature_type.capitalize()} Valence Labels shape: ", labels_processed_valence.shape)


Psd Features shape:  (1280, 32, 129)
Psd Arousal Labels shape:  (1280,)
Psd Valence Labels shape:  (1280,)
Entropy Features shape:  (1280, 32)
Entropy Arousal Labels shape:  (1280,)
Entropy Valence Labels shape:  (1280,)
Sd Features shape:  (1280, 32)
Sd Arousal Labels shape:  (1280,)
Sd Valence Labels shape:  (1280,)
Mean Features shape:  (1280, 32)
Mean Arousal Labels shape:  (1280,)
Mean Valence Labels shape:  (1280,)
Variance Features shape:  (1280, 32)
Variance Arousal Labels shape:  (1280,)
Variance Valence Labels shape:  (1280,)
Skewness Features shape:  (1280, 32)
Skewness Arousal Labels shape:  (1280,)
Skewness Valence Labels shape:  (1280,)
Kurtosis Features shape:  (1280, 32)
Kurtosis Arousal Labels shape:  (1280,)
Kurtosis Valence Labels shape:  (1280,)


In [19]:
# CNN + LSTM
def create_cnn_lstm_model(input_shape):
    # inputs = Input(shape=input_shape)
    
    # # 1st Conv Block
    # x = Conv1D(filters=128, kernel_size=3, strides=1, padding='same', activation='relu')(inputs)
    # x = BatchNormalization()(x)
    # x = MaxPooling1D(pool_size=2)(x)
    
    # # 2nd Conv Block
    # x = Conv1D(filters=128, kernel_size=3, strides=1, padding='same', activation='relu')(x)
    # x = BatchNormalization()(x)
    # x = MaxPooling1D(pool_size=2)(x)
    
    # # 3rd Conv Block 
    # x = Conv1D(filters=256, kernel_size=3, strides=1, padding='same', activation='relu')(x)
    # x = BatchNormalization()(x)
    # x = MaxPooling1D(pool_size=2)(x)
    
    # # LSTM
    # x = LSTM(256, return_sequences=False)(x)
    
    # x = Dense(units=512, activation='relu')(x)
    # x = Dropout(0.5)(x)  
    # x = Dense(units=256, activation='relu')(x)
    
    # outputs = Dense(units=1, activation='sigmoid')(x)
    inputs = Input(shape=input_shape)

    # LSTM Layer
    x1 = LSTM(input_shape[0])(inputs)  # Số units của LSTM bằng số timestep (input_shape[0])
    
    # Dense Layer 1
    x2 = Dense(input_shape[0])(x1)  # Dense layer không có activation
    
    # Dense Layer 2
    x3 = Dense(12)(x2)  # Dense layer với 12 units, không có activation
    
    # Output Layer
    outputs = Dense(1, activation='sigmoid')(x2)
        
    model = Model(inputs, outputs)
    return model

feature_types = ['psd']

In [17]:

X_all = []
y_all = []

for feature_type in feature_types:
    features, arousal_labels, _ = all_features[feature_type]
    
    for channel in range(features.shape[2]):
        channel_data = features[:, :, channel]  
        X_all.append(channel_data)
        y_all.append(arousal_labels)

X_all = np.concatenate(X_all, axis=0)  
y_all = np.concatenate(y_all, axis=0)  

print("Concatenated Features Shape: ", X_all.shape)
print("Concatenated Arousal Labels Shape: ", y_all.shape)


kf = KFold(n_splits=5, shuffle=True, random_state=42)

Concatenated Features Shape:  (165120, 32)
Concatenated Arousal Labels Shape:  (165120,)


In [26]:
import os
import wandb

# Đọc API key từ biến môi trường
api_key = os.environ.get("88aea2f639fada32b4501731dc07e972dc732128")
wandb.login(key=api_key)
wandb.init(project="Recognition Emotion", config={
               "learning_rate": 0.001,
               "epochs": 150,
               "batch_size": 32,
               "feature_type": "psd", # Sẽ được cập nhật sau
               "model": "CNN-LSTM"
           })



VBox(children=(Label(value='0.001 MB of 0.001 MB uploaded\r'), FloatProgress(value=1.0, max=1.0)))

0,1
fold_1/epoch,▁
fold_1/f1_score,▁
fold_1/train_accuracy,▁
fold_1/train_loss,▁
fold_1/val_accuracy,▁
fold_1/val_loss,▁
fold_2/epoch,▁
fold_2/f1_score,▁
fold_2/train_accuracy,▁
fold_2/train_loss,▁

0,1
fold_1/epoch,0.0
fold_1/f1_score,0.68643
fold_1/train_accuracy,0.56403
fold_1/train_loss,0.68231
fold_1/val_accuracy,0.57364
fold_1/val_loss,0.67579
fold_2/epoch,0.0
fold_2/f1_score,0.68231
fold_2/train_accuracy,0.56501
fold_2/train_loss,0.68152


In [None]:
config={
               "learning_rate": 0.001,
               "epochs": 150,
               "batch_size": 32,
               "feature_type": "psd", # Sẽ được cập nhật sau
               "model": "CNN-LSTM"
           }

In [27]:
f1_scores = []
def train_and_evaluate(model, X_train, y_train, X_test, y_test, fold):
    model.compile(optimizer='adam', loss='mse', metrics=['accuracy'])
    if fold == 0 : 
        model.summary()
    history = model.fit(X_train, y_train, epochs=config["epochs"], batch_size=32, validation_data=(X_test, y_test), verbose=1)
    # config["epochs"]
    for epoch in range(config["epochs"]):
        wandb.log({
            f"fold_{fold+1}/epoch": epoch,
            f"fold_{fold+1}/train_loss": history.history['loss'][epoch],
            f"fold_{fold+1}/train_accuracy": history.history['accuracy'][epoch],
            f"fold_{fold+1}/val_loss": history.history['val_loss'][epoch],
            f"fold_{fold+1}/val_accuracy": history.history['val_accuracy'][epoch]
        })
    y_pred = model.predict(X_test)
    print("y_ pred : ", y_pred, "  -------  y_test : ", y_test)
    y_pred = (y_pred > 0.5).astype(int)
    print("y_pred astype : ", y_pred)
    
    num_zeros = np.count_nonzero(y_pred == 0)
    num_ones = np.count_nonzero(y_pred == 1)

    # In ra kết quả
    print(f"Fold {fold + 1}:")
    print(f"  Số lượng dự đoán là 0: {num_zeros}")
    print(f"  Số lượng dự đoán là 1: {num_ones}")

    num_zeros_test = np.count_nonzero(y_test == 0)
    num_ones_test = np.count_nonzero(y_test == 1)
    
    print(f" test Số lượng dự đoán là 0: {num_zeros_test}")
    print(f" test Số lượng dự đoán là 1: {num_ones_test}")
    
    f1 = f1_score(y_test, y_pred)
    
    test_score = model.evaluate(X_test, y_test)

    wandb.log({f"fold_{fold+1}/f1_score": f1})
    f1_scores.append(f1)
    
    return test_score, f1


In [28]:
# from wandb.keras import WandbCallback

In [None]:
for fold, (train_idx, val_idx) in enumerate(kf.split(X_all)):
    print(f"Training fold {fold + 1}")
    
    X_train, X_val = X_all[train_idx], X_all[val_idx]
    y_train, y_val = y_all[train_idx], y_all[val_idx]
    
    X_train = X_train.reshape((X_train.shape[0], X_train.shape[1], 1))
    X_val = X_val.reshape((X_val.shape[0], X_val.shape[1], 1))
    
    # CNN + LSTM
    input_shape = (X_train.shape[1], 1)
    model = create_cnn_lstm_model(input_shape)
    
    test_score, f1 = train_and_evaluate(model, X_train, y_train, X_val, y_val, fold)
    
    print(f"Fold {fold + 1} Test Score: ", test_score)
    print(f"Fold {fold + 1} F1 Score: ", f1)

Training fold 1


Epoch 1/150
[1m4128/4128[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m17s[0m 4ms/step - accuracy: 0.5524 - loss: 0.2468 - val_accuracy: 0.5777 - val_loss: 0.2414
Epoch 2/150
[1m4128/4128[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m16s[0m 4ms/step - accuracy: 0.5800 - loss: 0.2407 - val_accuracy: 0.5863 - val_loss: 0.2376
Epoch 3/150
[1m4128/4128[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m16s[0m 4ms/step - accuracy: 0.5885 - loss: 0.2366 - val_accuracy: 0.5951 - val_loss: 0.2343
Epoch 4/150
[1m4128/4128[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m16s[0m 4ms/step - accuracy: 0.6012 - loss: 0.2332 - val_accuracy: 0.5941 - val_loss: 0.2343
Epoch 5/150
[1m4128/4128[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m16s[0m 4ms/step - accuracy: 0.6053 - loss: 0.2314 - val_accuracy: 0.6007 - val_loss: 0.2308
Epoch 6/150
[1m4128/4128[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m16s[0m 4ms/step - accuracy: 0.6087 - loss: 0.2295 - val_accuracy: 0.6003 - val_loss: 0.2314
Epoc

In [None]:
mean_f1_score = np.mean(f1_scores)
std_f1_score = np.std(f1_scores)

# Log the mean and std F1-score
wandb.log({
    "mean_f1_score": mean_f1_score,
    "std_f1_score": std_f1_score
})
wandb.finish()

print(f"Mean F1 Score: {mean_f1_score}")
print(f"Standard Deviation of F1 Score: {std_f1_score}")