# Cognitive Load Estimation Using Muse EEG

The paper focuses on EEG recordings for classifying baseline vs cognitive load states, primarily using raw EEG data from specific channels.
The raw data at each time stamp dn consists of dn = {pi, tp9, af7, af8, tp10}. So this would be columns:

- RAW_TP9
- RAW_AF7
- RAW_AF8
- RAW_TP10

Additionally, the paper emphasizes:
- Theta (4-7.5Hz) and Alpha (8-13Hz) frequency bands as important for identifying cognitive load.

Therefore, in addition to the raw data, we consider extracting features from the following columns:
Theta: Theta_TP9, Theta_AF7, Theta_AF8, Theta_TP10
Alpha: Alpha_TP9, Alpha_AF7, Alpha_AF8, Alpha_TP10

## Import Necessary Libraries

In [1]:
import pandas as pd
import os
import numpy as np
from scipy.signal import butter, lfilter, iirnotch

## Data Loading and Preprocessing
Using the pilot data set first for training.

In [2]:
#define paths and init empty lists to store data
base_path = 'pilot'
participants = range(11)
columns_to_keep = [
    'Delta_TP9', 'Delta_AF7', 'Delta_AF8', 'Delta_TP10',
    'Theta_TP9', 'Theta_AF7', 'Theta_AF8', 'Theta_TP10',
    'Alpha_TP9', 'Alpha_AF7', 'Alpha_AF8', 'Alpha_TP10',
    'Beta_TP9', 'Beta_AF7', 'Beta_AF8', 'Beta_TP10',
    'Gamma_TP9', 'Gamma_AF7', 'Gamma_AF8', 'Gamma_TP10'
]
sampling_rate = 256  # Hz, from CogWear data

In [3]:
# Filters
# To remove 50Hz powerline noise
def notch_filter(data, freq=50.0, Q=30.0, fs=256.0):
    w0 = freq / (fs / 2) # Normalize
    b, a = iirnotch(w0, Q)
    return lfilter(b, a, data)

# Band-pass filter function (1-50 Hz)
def bandpass_filter(data, lowcut=1.0, highcut=50.0, fs=256.0, order=5):
    nyquist = 0.5 * fs
    low = lowcut / nyquist
    high = highcut / nyquist
    b, a = butter(order, [low, high], btype='band')
    return lfilter(b, a, data)

In [4]:
# Filtering
def apply_filters(eeg_data):
    filtered_data = pd.DataFrame()
    for column in eeg_data.columns:
        eeg_filtered = notch_filter(eeg_data[column], freq=50.0, fs=sampling_rate)
        eeg_filtered = bandpass_filter(eeg_filtered, lowcut=1.0, highcut=50.0,  fs=sampling_rate)
        filtered_data[column] = eeg_filtered
    return filtered_data

In [5]:
# load and Process Data
def load_and_filter_data(participant_id, state):
    file_path = os.path.join(base_path, str(participant_id), state, 'muse_eeg.csv')
    eeg_data = pd.read_csv(file_path)
    # Select columns to keep
    eeg_data = eeg_data[columns_to_keep]
    
    # Check for missing values and drop them
    missing_values_before = eeg_data.isnull().sum().sum()
    eeg_data.dropna(inplace=True)
    missing_values_after = eeg_data.isnull().sum().sum()
    
    # Apply filters to each channel in the EEG data
    eeg_data_filtered = apply_filters(eeg_data)
    return eeg_data_filtered

In [6]:
# Divide the data into 1 second epochs as mentioned in the paper
# Assign labels, baseline (0) & cognitive_load (1)
# Discard values that do not create a consistent epoch size of 256
def create_epochs_and_labels(eeg_data, label, segment_duration=1):
    num_samples_per_epoch = int(sampling_rate * segment_duration)
    total_samples = len(eeg_data)
    
    truncated_length = total_samples - (total_samples % num_samples_per_epoch)

    data_epochs = []
    labels = []

    for i in range(truncated_length // num_samples_per_epoch):
        start = i * num_samples_per_epoch
        end = start + num_samples_per_epoch
        data_epochs.append(eeg_data.iloc[start:end].to_numpy())
        labels.append(label)
        
    return np.array(data_epochs), np.array(labels)

## Feature Extraction
Based on paper "Wearable EEG-Based Cognitive Load Classification by Personalized and
Generalized Model Using Brain Asymmetry"

In [7]:
def extract_features(filtered_data):
    """Extract features from the filtered data using PSD values."""
    features = {}

    # Calculate mean power for each frequency band across the specified channels
    features['Delta'] = filtered_data[['Delta_TP9', 'Delta_AF7', 'Delta_AF8', 'Delta_TP10']].mean(axis=1)
    features['Theta'] = filtered_data[['Theta_TP9', 'Theta_AF7', 'Theta_AF8', 'Theta_TP10']].mean(axis=1)
    features['Alpha'] = filtered_data[['Alpha_TP9', 'Alpha_AF7', 'Alpha_AF8', 'Alpha_TP10']].mean(axis=1)
    features['Beta'] = filtered_data[['Beta_TP9', 'Beta_AF7', 'Beta_AF8', 'Beta_TP10']].mean(axis=1)
    features['Gamma'] = filtered_data[['Gamma_TP9', 'Gamma_AF7', 'Gamma_AF8', 'Gamma_TP10']].mean(axis=1)

    # Calculate ratios
    features['Alpha_Beta_Ratio'] = features['Alpha'] / features['Beta']
    features['Theta_Beta_Ratio'] = features['Theta'] / features['Beta']
    features['Theta_Alpha_Beta_Ratio'] = (features['Theta'] + features['Alpha']) / features['Beta']
    features['Theta_Alpha_Alpha_Beta_Ratio'] = (features['Theta'] + features['Alpha']) / (features['Beta'] + features['Alpha'])
    features['Theta_Alpha_Ratio'] = features['Theta'] / features['Alpha']
    features['Alpha_Theta_Ratio'] = features['Alpha'] / features['Theta']

    # Log-transform alpha power for AF8 and AF7
    features['Log_Alpha_AF7'] = np.log(np.where(filtered_data['Alpha_AF7'] > 0, filtered_data['Alpha_AF7'], 1e-10))
    features['Log_Alpha_AF8'] = np.log(np.where(filtered_data['Alpha_AF8'] > 0, filtered_data['Alpha_AF8'], 1e-10))
    features['Asymmetry_Score'] = features['Log_Alpha_AF7'] - features['Log_Alpha_AF8']

    return pd.DataFrame(features)


In [8]:
# Main processing loop
muse_data = []  # To store all epochs and labels
X_features = []  # To store all extracted features
y_labels = []    # To store all corresponding labels


# Loop through all participants and process their data
for participant in participants:
    # Load and filter baseline and cognitive load data
    baseline_data_filtered = load_and_filter_data(participant, 'baseline')
    cognitive_load_data_filtered = load_and_filter_data(participant, 'cognitive_load')

    # Extract features from the entire filtered data (before chunking)
    baseline_features = extract_features(baseline_data_filtered)
    cognitive_load_features = extract_features(cognitive_load_data_filtered)

    # Create epochs and assign labels for baseline and cognitive load data
    baseline_data_epochs, baseline_labels = create_epochs_and_labels(baseline_features, label=0)
    cognitive_load_data_epochs, cognitive_load_labels = create_epochs_and_labels(cognitive_load_features, label=1)

    # Append both baseline and cognitive load data epochs for the participant
    muse_data.append((baseline_data_epochs, baseline_labels))
    muse_data.append((cognitive_load_data_epochs, cognitive_load_labels))

# Check for empty epochs before concatenation
X_list = [epoch for epoch, _ in muse_data if epoch.size > 0]
y_list = [label for _, label in muse_data]

# Ensure we have the same number of dimensions and valid data
if X_list:
    X = np.concatenate(X_list)
    y = np.concatenate(y_list)
else:
    X = np.array([])  # Handle the case where no data is present
    y = np.array([])

print("Preprocessing complete.")
print(f"Shape of input data (X): {X.shape}")
print(f"Shape of labels (y): {y.shape}")

  eeg_data = pd.read_csv(file_path)


Preprocessing complete.
Shape of input data (X): (4179, 256, 14)
Shape of labels (y): (4179,)


In [12]:
from sklearn.svm import SVC
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import StratifiedKFold

In [13]:
# Flatten the time dimension
X_flat = X.reshape(X.shape[0], -1)  # Shape: (4179, 3584)

# Now X_flat and y are ready for classification

In [14]:
# Use Stratified K-Fold for balanced class distribution
kf = StratifiedKFold(n_splits=8, shuffle=True)

In [None]:
# Initialize classifiers with default parameters
svm = SVC(kernel='linear')  # You can change to 'rbf' if desired
rf = RandomForestClassifier()

# For storing results
svm_accuracies = []
rf_accuracies = []

# Loop over the folds for SVM
for train_index, test_index in kf.split(X_flat, y):
    X_train, X_test = X_flat[train_index], X_flat[test_index]
    y_train, y_test = y[train_index], y[test_index]

    # Train SVM
    svm.fit(X_train, y_train)
    svm_accuracy = svm.score(X_test, y_test)
    svm_accuracies.append(svm_accuracy)

# Loop over the folds for Random Forest
for train_index, test_index in kf.split(X_flat, y):
    X_train, X_test = X_flat[train_index], X_flat[test_index]
    y_train, y_test = y[train_index], y[test_index]

    # Train Random Forest
    rf.fit(X_train, y_train)
    rf_accuracy = rf.score(X_test, y_test)
    rf_accuracies.append(rf_accuracy)

# Calculate mean accuracies
mean_svm_accuracy = sum(svm_accuracies) / len(svm_accuracies)
mean_rf_accuracy = sum(rf_accuracies) / len(rf_accuracies)

print(f"Mean SVM Accuracy: {mean_svm_accuracy:.2f}")
print(f"Mean Random Forest Accuracy: {mean_rf_accuracy:.2f}")