In [22]:
import os
import json
import numpy as np
import mne
import warnings
from sklearn.preprocessing import StandardScaler
from sklearn.svm import SVC
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, confusion_matrix

In [None]:
# Ignore RuntimeWarning
# NOTE(review): this filter is global for the session and is not limited to MNE;
# NumPy also reports divide-by-zero / invalid-value problems as RuntimeWarning,
# so numerical issues in the feature extraction below are silenced as well.
warnings.filterwarnings('ignore', category=RuntimeWarning)

# Enable CUDA
# NOTE(review): presumably requires a CUDA-capable GPU and MNE's optional CUDA
# dependencies — confirm how init_cuda behaves on machines without them.
mne.utils.set_config('MNE_USE_CUDA', 'true')
mne.cuda.init_cuda(verbose=False)  # Set to True for debugging

In [None]:
def load_eeg_data(file_path):
    """
    Read an EEGLAB recording and return its samples.

    Args:
        file_path (str): Path of the EEGLAB file to read

    Returns:
        Whatever ``get_data()`` returns for the recording, or None when
        reading fails for any reason (the error is printed, not raised).
    """
    signal = None
    try:
        recording = mne.io.read_raw_eeglab(file_path)
        signal = recording.get_data()
    except Exception as err:
        # Best-effort loader: report the problem and let the caller skip this file.
        print(f"Error loading {file_path}: {err}")
    return signal

class EEGRFDataset:
    """
    Dataset handler for EEG data to be used with Random Forest models.

    File names, labels and split types come from ``data_info`` (or from
    ``labels.json`` inside ``data_dir``). Each recording is read with
    ``load_eeg_data``, reduced to ten summary features per channel and
    standardised with a ``StandardScaler`` shared across ``load_data`` calls.
    """

    # Slices of the rFFT magnitude spectrum, in the order the band features are emitted.
    # NOTE(review): these are FFT *bin indices*, not frequencies in Hz, so the band
    # names only hold for a particular sampling rate / recording length — confirm.
    _BAND_SLICES = (
        slice(None, 5),   # Delta
        slice(5, 12),     # Theta
        slice(12, 30),    # Alpha
        slice(30, 80),    # Beta
        slice(80, None),  # Gamma
    )

    @staticmethod
    def _encode_label(label):
        """Map a label string to its class index: 'A' -> 0, 'C' -> 1, anything else -> 2."""
        if label == 'A':
            return 0
        if label == 'C':
            return 1
        return 2

    def __init__(self, data_dir, data_info=None, scaler=None):
        """
        Initialize the dataset handler.

        Args:
            data_dir (str): Directory containing the EEG data files
            data_info (list): List of dictionaries containing file information
                             (if None, it is loaded from labels.json in data_dir).
                             Each entry needs 'file_name' and 'label'; 'type' is
                             needed when load_data is called with a data_type.
            scaler (StandardScaler): Pre-fitted scaler for feature normalization
                                    (if None, a new one is created and fitted by
                                    the first load_data call)
        """
        self.data_dir = data_dir

        # Load data info if not provided
        if data_info is None:
            with open(os.path.join(data_dir, 'labels.json'), 'r') as f:
                self.data_info = json.load(f)
        else:
            self.data_info = data_info

        self.scaler = scaler

        # Generate data and labels lists
        self.data = [d['file_name'] for d in self.data_info]
        self.labels = [self._encode_label(d['label']) for d in self.data_info]

    def load_data(self, data_type=None):
        """
        Load and preprocess the EEG data.

        Files that fail to load are skipped. The first call on an instance
        without a scaler fits the scaler on the data it loads; every later call
        only applies that scaler, so load the training split first.

        Args:
            data_type (str, optional): Type of data to load (e.g., 'train', 'test_cross', 'test_within')
                                      If None, all data will be loaded.

        Returns:
            tuple: (X, y) where X is the scaled feature matrix and y contains the labels

        Raises:
            ValueError: If no file could be loaded for the requested data_type
        """
        X = []
        y = []

        # Filter by data type if specified
        if data_type is not None:
            filtered_info = [d for d in self.data_info if d['type'] == data_type]
        else:
            filtered_info = self.data_info

        print(f"Loading {len(filtered_info)} EEG samples for {data_type}...")

        # Load each EEG file
        for item in filtered_info:
            file_path = os.path.join(self.data_dir, item['file_name'])

            # load_eeg_data returns None on failure; such files are skipped
            eeg_data = load_eeg_data(file_path)
            if eeg_data is None:
                continue

            X.append(self._preprocess_eeg(eeg_data))
            y.append(self._encode_label(item['label']))

        if not X:
            raise ValueError(f"No valid data loaded for {data_type}. Please check file paths and data format.")

        # Convert lists to numpy arrays
        X = np.array(X)
        y = np.array(y)

        # Create and fit scaler if not provided
        if self.scaler is None:
            self.scaler = StandardScaler()
            X = self.scaler.fit_transform(X)
        else:
            X = self.scaler.transform(X)

        return X, y

    def _preprocess_eeg(self, eeg_data):
        """
        Turn one recording into a flat feature vector.

        Per channel, ten values are emitted: mean, standard deviation, maximum,
        minimum and inter-quartile range, followed by the share of the summed
        rFFT magnitude that falls into each of the five band slices.

        Args:
            eeg_data (numpy.ndarray): 2-D EEG data, (channels, samples) or
                                      (samples, channels)

        Returns:
            numpy.ndarray: 1-D array of length 10 * number of channels
        """
        # Assuming eeg_data shape is (channels, samples) or (samples, channels)
        # If data is (samples, channels), transpose it (the longer axis is taken as samples)
        if eeg_data.shape[0] > eeg_data.shape[1]:
            eeg_data = eeg_data.T

        features = []

        for channel_data in eeg_data:
            # Time domain features
            q75, q25 = np.percentile(channel_data, [75, 25])
            features.extend([
                np.mean(channel_data),
                np.std(channel_data),
                np.max(channel_data),
                np.min(channel_data),
                q75 - q25
            ])

            # Frequency domain features: band sums normalised by the whole spectrum
            spectrum = np.abs(np.fft.rfft(channel_data))
            total = np.sum(spectrum)
            band_sums = [np.sum(spectrum[band]) for band in self._BAND_SLICES]

            if total > 0:
                features.extend(band_sum / total for band_sum in band_sums)
            else:
                # A flat / all-zero channel has an empty spectrum; dividing would
                # give 0/0 = NaN and poison the scaler and the classifiers.
                features.extend([0.0] * len(band_sums))

        return np.array(features)

In [None]:
class EEGRF:
    """
    Random Forest model for EEG classification.

    Thin wrapper around scikit-learn's RandomForestClassifier. Hyperparameters
    are fixed at construction time; this class performs no hyperparameter search.
    """

    def __init__(self, n_estimators=100, max_depth=None, class_weight=None):
        """
        Initialize the Random Forest model.

        Args:
            n_estimators (int): Number of trees in the forest
            max_depth (int or None): The maximum depth of the trees
            class_weight (dict or 'balanced'): Weights associated with classes in the form {class_label: weight}
        """
        self.model = RandomForestClassifier(
            n_estimators=n_estimators,
            max_depth=max_depth,
            class_weight=class_weight,
            random_state=42  # fixed seed so repeated runs build the same forest
        )

    def train(self, X_train, y_train):
        """
        Train the Random Forest model and print its accuracy on the training data.

        Args:
            X_train (numpy.ndarray): Training features
            y_train (numpy.ndarray): Training labels

        Returns:
            EEGRF: Self for method chaining
        """
        print("Training Random Forest model...")
        self.model.fit(X_train, y_train)

        train_pred = self.model.predict(X_train)
        train_acc = accuracy_score(y_train, train_pred)
        print(f"Training accuracy: {train_acc:.4f}")

        return self

    def predict(self, X):
        """
        Make predictions with the trained model.

        Args:
            X (numpy.ndarray): Features

        Returns:
            numpy.ndarray: Predictions
        """
        return self.model.predict(X)

    def evaluate(self, X_test, y_test):
        """
        Evaluate the model on test data.

        Args:
            X_test (numpy.ndarray): Test features
            y_test (numpy.ndarray): Test labels

        Returns:
            dict: 'accuracy', 'precision', 'recall', 'f1' (support-weighted
                  averages) and 'confusion_matrix' (rows = true class,
                  columns = predicted class)
        """
        y_pred = self.predict(X_test)

        # Pin the confusion-matrix axes to every class seen in training plus any
        # label present in y_test. Without `labels` the matrix is sized from the
        # labels that happen to occur in this split, so a split missing a class
        # would yield a smaller matrix with shifted row/column meaning.
        labels = np.union1d(self.model.classes_, y_test)

        return {
            'accuracy': accuracy_score(y_test, y_pred),
            'precision': precision_score(y_test, y_pred, average='weighted'),
            'recall': recall_score(y_test, y_pred, average='weighted'),
            'f1': f1_score(y_test, y_pred, average='weighted'),
            'confusion_matrix': confusion_matrix(y_test, y_pred, labels=labels)
        }


In [23]:

class EEGSVM:
    """
    Support Vector Machine model for EEG classification.

    Thin wrapper around scikit-learn's SVC. Hyperparameters are fixed at
    construction time; this class performs no hyperparameter search.
    """

    def __init__(self, C=1.0, kernel='rbf', class_weight=None):
        """
        Initialize the SVM model.

        Args:
            C (float): Regularization parameter
            kernel (str): Kernel type to be used in the algorithm
            class_weight (dict or 'balanced'): Weights associated with classes
        """
        # Initialize the SVC with given hyperparameters
        self.model = SVC(
            C=C,
            kernel=kernel,
            class_weight=class_weight,
            probability=True,
            random_state=42  # fixed seed for reproducible fits
        )

    def train(self, X_train, y_train):
        """
        Train the SVM model and print its accuracy on the training data.

        Args:
            X_train (numpy.ndarray): Training features
            y_train (numpy.ndarray): Training labels

        Returns:
            EEGSVM: Self for method chaining
        """
        print("Training SVM model...")
        self.model.fit(X_train, y_train)

        train_pred = self.model.predict(X_train)
        train_acc = accuracy_score(y_train, train_pred)
        print(f"Training accuracy: {train_acc:.4f}")

        return self

    def predict(self, X):
        """
        Make predictions with the trained model.

        Args:
            X (numpy.ndarray): Features

        Returns:
            numpy.ndarray: Predictions
        """
        return self.model.predict(X)

    def evaluate(self, X_test, y_test):
        """
        Evaluate the model on test data.

        Args:
            X_test (numpy.ndarray): Test features
            y_test (numpy.ndarray): Test labels

        Returns:
            dict: 'accuracy', 'precision', 'recall', 'f1' (support-weighted
                  averages) and 'confusion_matrix' (rows = true class,
                  columns = predicted class)
        """
        y_pred = self.predict(X_test)

        # Pin the confusion-matrix axes to every class seen in training plus any
        # label present in y_test. Without `labels` the matrix is sized from the
        # labels that happen to occur in this split, so a split missing a class
        # would yield a smaller matrix with shifted row/column meaning.
        labels = np.union1d(self.model.classes_, y_test)

        return {
            'accuracy': accuracy_score(y_test, y_pred),
            'precision': precision_score(y_test, y_pred, average='weighted'),
            'recall': recall_score(y_test, y_pred, average='weighted'),
            'f1': f1_score(y_test, y_pred, average='weighted'),
            'confusion_matrix': confusion_matrix(y_test, y_pred, labels=labels)
        }


In [24]:
class EEGGradientBoost:
    """
    Gradient Boosting model for EEG classification.

    Thin wrapper around scikit-learn's GradientBoostingClassifier.
    Hyperparameters are fixed at construction time; this class performs no
    hyperparameter search.
    """

    def __init__(self, n_estimators=100, learning_rate=0.1, max_depth=3, subsample=1.0):
        """
        Initialize the Gradient Boosting model.

        Args:
            n_estimators (int): Number of boosting stages
            learning_rate (float): Learning rate shrinks the contribution of each tree
            max_depth (int): Maximum depth of the individual regression estimators
            subsample (float): The fraction of samples to be used for fitting the individual base learners
        """
        # Initialize the GradientBoostingClassifier with given hyperparameters
        self.model = GradientBoostingClassifier(
            n_estimators=n_estimators,
            learning_rate=learning_rate,
            max_depth=max_depth,
            subsample=subsample,
            random_state=42  # fixed seed for reproducible fits
        )

    def train(self, X_train, y_train):
        """
        Train the Gradient Boosting model and print its accuracy on the training data.

        Args:
            X_train (numpy.ndarray): Training features
            y_train (numpy.ndarray): Training labels

        Returns:
            EEGGradientBoost: Self for method chaining
        """
        print("Training Gradient Boosting model...")
        self.model.fit(X_train, y_train)

        train_pred = self.model.predict(X_train)
        train_acc = accuracy_score(y_train, train_pred)
        print(f"Training accuracy: {train_acc:.4f}")

        return self

    def predict(self, X):
        """
        Make predictions with the trained model.

        Args:
            X (numpy.ndarray): Features

        Returns:
            numpy.ndarray: Predictions
        """
        return self.model.predict(X)

    def evaluate(self, X_test, y_test):
        """
        Evaluate the model on test data.

        Args:
            X_test (numpy.ndarray): Test features
            y_test (numpy.ndarray): Test labels

        Returns:
            dict: 'accuracy', 'precision', 'recall', 'f1' (support-weighted
                  averages) and 'confusion_matrix' (rows = true class,
                  columns = predicted class)
        """
        y_pred = self.predict(X_test)

        # Pin the confusion-matrix axes to every class seen in training plus any
        # label present in y_test. Without `labels` the matrix is sized from the
        # labels that happen to occur in this split, so a split missing a class
        # would yield a smaller matrix with shifted row/column meaning.
        labels = np.union1d(self.model.classes_, y_test)

        return {
            'accuracy': accuracy_score(y_test, y_pred),
            'precision': precision_score(y_test, y_pred, average='weighted'),
            'recall': recall_score(y_test, y_pred, average='weighted'),
            'f1': f1_score(y_test, y_pred, average='weighted'),
            'confusion_matrix': confusion_matrix(y_test, y_pred, labels=labels)
        }

In [None]:
def print_evaluation_results(results, data_type, class_names=('A', 'C', 'F')):
    """
    Print detailed evaluation results in a formatted way.

    Prints the overall metrics and the confusion matrix, then precision,
    recall and F1 for each class derived from the confusion matrix.

    Args:
        results (dict): Needs 'accuracy', 'precision', 'recall', 'f1' and
                        'confusion_matrix' (rows = true class, columns =
                        predicted class; array or nested list)
        data_type (str): Title used in the results header
        class_names (sequence of str): Display name for confusion-matrix
                        index 0, 1, 2, ... Defaults to the notebook's three
                        classes.

    Raises:
        IndexError: If the confusion matrix has fewer rows or columns than
                    there are class names
    """
    print(f"\n===== {data_type} Results =====")
    print(f"Accuracy: {results['accuracy']:.4f}")
    print(f"Precision: {results['precision']:.4f}")
    print(f"Recall: {results['recall']:.4f}")
    print(f"F1 Score: {results['f1']:.4f}")
    print("\nConfusion Matrix:")
    print(results['confusion_matrix'])

    # asarray so that a nested list can be indexed as cm[i, j]
    cm = np.asarray(results['confusion_matrix'])

    # Fail before printing partial per-class output when the matrix cannot
    # cover every named class (e.g. it was built from a split missing a class).
    if cm.ndim != 2 or min(cm.shape) < len(class_names):
        raise IndexError(
            f"Confusion matrix of shape {cm.shape} does not cover the "
            f"{len(class_names)} classes {list(class_names)}"
        )

    print("\nPer-class metrics:")
    for i, cls in enumerate(class_names):
        # True positives: diagonal element
        tp = cm[i, i]
        # False positives: rest of column i (predicted as i, actually something else)
        fp = np.sum(cm[:, i]) - tp
        # False negatives: rest of row i (actually i, predicted as something else)
        fn = np.sum(cm[i, :]) - tp

        # A class that is never predicted / never present scores 0 instead of dividing by zero
        precision = tp / (tp + fp) if (tp + fp) > 0 else 0
        recall = tp / (tp + fn) if (tp + fn) > 0 else 0
        f1 = 2 * precision * recall / (precision + recall) if (precision + recall) > 0 else 0

        print(f"Class {cls}:")
        print(f"  Precision: {precision:.4f}")
        print(f"  Recall: {recall:.4f}")
        print(f"  F1 Score: {f1:.4f}")


In [13]:
# Directory holding the EEG files and labels.json (a relative path)
data_dir = "model-data"

# Create dataset handler
# No data_info is passed, so the file list is read from labels.json inside data_dir;
# no scaler is passed either, so the first load_data call will fit one.
dataset = EEGRFDataset(data_dir)
dataset

<__main__.EEGRFDataset at 0x2b18322a7d0>

In [14]:
# Load training data
# `dataset` was created without a scaler, so this call also fits the StandardScaler
# that every later load_data call reuses — run it before loading any test split.
X_train, y_train = dataset.load_data(data_type='train')
print(f"Loaded training data: {X_train.shape} features, {len(y_train)} labels")


Loading 3219 EEG samples for train...
Loaded training data: (3219, 190) features, 3219 labels


In [20]:
# Initialize and train the model
# Depends on `dataset`, `X_train` and `y_train` from the earlier cells.
rf_model = EEGRF(n_estimators=100, max_depth=None, class_weight='balanced')
rf_model.train(X_train, y_train)

# Evaluate on test_cross data
# The scaler fitted on the training split is applied here (load_data only transforms
# once a scaler exists).
print("\nEvaluating on test_cross data (different subjects)...")
X_test_cross, y_test_cross = dataset.load_data(data_type='test_cross')
print(f"Loaded test_cross data: {X_test_cross.shape} features, {len(y_test_cross)} labels")

# NOTE(review): load_data raises ValueError when a split yields no samples, so the
# else branch below is not reachable as written.
if len(y_test_cross) > 0:
    results_cross = rf_model.evaluate(X_test_cross, y_test_cross)
    print_evaluation_results(results_cross, "Test Cross-Subject")
else:
    print("No test_cross data available.")

# Evaluate on test_within data
print("\nEvaluating on test_within data (same subjects)...")
X_test_within, y_test_within = dataset.load_data(data_type='test_within')
print(f"Loaded test_within data: {X_test_within.shape} features, {len(y_test_within)} labels")

# NOTE(review): same as above — an empty split raises inside load_data first.
if len(y_test_within) > 0:
    results_within = rf_model.evaluate(X_test_within, y_test_within)
    print_evaluation_results(results_within, "Test Within-Subject")
else:
    print("No test_within data available.")

# Calculate and print overall metrics (combined test sets)
# Rows of both test splits are stacked, cross-subject first, and scored as one set.
if len(y_test_cross) > 0 and len(y_test_within) > 0:
    print("\nEvaluating on combined test data...")
    X_test_combined = np.vstack([X_test_cross, X_test_within])
    y_test_combined = np.concatenate([y_test_cross, y_test_within])
    results_combined = rf_model.evaluate(X_test_combined, y_test_combined)
    print_evaluation_results(results_combined, "Combined Test")

print("\nRandom Forest evaluation complete.")

Training Random Forest model...
Training accuracy: 1.0000

Evaluating on test_cross data (different subjects)...
Loading 873 EEG samples for test_cross...
Loaded test_cross data: (873, 190) features, 873 labels

===== Test Cross-Subject Results =====
Accuracy: 0.4845
Precision: 0.5833
Recall: 0.4845
F1 Score: 0.4556

Confusion Matrix:
[[206 110   3]
 [123 180   4]
 [179  31  37]]

Per-class metrics:
Class A:
  Precision: 0.4055
  Recall: 0.6458
  F1 Score: 0.4982
Class C:
  Precision: 0.5607
  Recall: 0.5863
  F1 Score: 0.5732
Class F:
  Precision: 0.8409
  Recall: 0.1498
  F1 Score: 0.2543

Evaluating on test_within data (same subjects)...
Loading 344 EEG samples for test_within...
Loaded test_within data: (344, 190) features, 344 labels

===== Test Within-Subject Results =====
Accuracy: 0.7471
Precision: 0.7856
Recall: 0.7471
F1 Score: 0.7325

Confusion Matrix:
[[131  15   0]
 [ 26  99   1]
 [ 37   8  27]]

Per-class metrics:
Class A:
  Precision: 0.6753
  Recall: 0.8973
  F1 Score: 

In [25]:
# Initialize and train the SVM model
# Depends on `dataset`, `X_train` and `y_train` from the earlier cells.
svm_model = EEGSVM(C=1.0, kernel='rbf', class_weight='balanced')
svm_model.train(X_train, y_train)

# Evaluate on test_cross data
# NOTE(review): this repeats the 'test_cross' load done in the Random Forest cell —
# the files are read and featurised again and the same globals are overwritten.
print("\nEvaluating SVM on test_cross data (different subjects)...")
X_test_cross, y_test_cross = dataset.load_data(data_type='test_cross')
print(f"Loaded test_cross data: {X_test_cross.shape} features, {len(y_test_cross)} labels")

# NOTE(review): load_data raises ValueError when a split yields no samples, so the
# else branch below is not reachable as written.
if len(y_test_cross) > 0:
    results_cross = svm_model.evaluate(X_test_cross, y_test_cross)
    print_evaluation_results(results_cross, "SVM Test Cross-Subject")
else:
    print("No test_cross data available for SVM.")

# Evaluate on test_within data
# NOTE(review): likewise repeats the 'test_within' load from the Random Forest cell.
print("\nEvaluating SVM on test_within data (same subjects)...")
X_test_within, y_test_within = dataset.load_data(data_type='test_within')
print(f"Loaded test_within data: {X_test_within.shape} features, {len(y_test_within)} labels")

if len(y_test_within) > 0:
    results_within = svm_model.evaluate(X_test_within, y_test_within)
    print_evaluation_results(results_within, "SVM Test Within-Subject")
else:
    print("No test_within data available for SVM.")

# Combined evaluation
# Rows of both test splits are stacked, cross-subject first, and scored as one set.
if len(y_test_cross) > 0 and len(y_test_within) > 0:
    print("\nEvaluating SVM on combined test data...")
    X_test_combined = np.vstack([X_test_cross, X_test_within])
    y_test_combined = np.concatenate([y_test_cross, y_test_within])
    results_combined = svm_model.evaluate(X_test_combined, y_test_combined)
    print_evaluation_results(results_combined, "SVM Combined Test")

print("\nSVM evaluation complete.")


Training SVM model...
Training accuracy: 0.8136

Evaluating SVM on test_cross data (different subjects)...
Loading 873 EEG samples for test_cross...
Loaded test_cross data: (873, 190) features, 873 labels

===== SVM Test Cross-Subject Results =====
Accuracy: 0.5074
Precision: 0.5140
Recall: 0.5074
F1 Score: 0.5054

Confusion Matrix:
[[175 108  36]
 [ 89 173  45]
 [122  30  95]]

Per-class metrics:
Class A:
  Precision: 0.4534
  Recall: 0.5486
  F1 Score: 0.4965
Class C:
  Precision: 0.5563
  Recall: 0.5635
  F1 Score: 0.5599
Class F:
  Precision: 0.5398
  Recall: 0.3846
  F1 Score: 0.4492

Evaluating SVM on test_within data (same subjects)...
Loading 344 EEG samples for test_within...
Loaded test_within data: (344, 190) features, 344 labels

===== SVM Test Within-Subject Results =====
Accuracy: 0.7267
Precision: 0.7398
Recall: 0.7267
F1 Score: 0.7310

Confusion Matrix:
[[103  13  30]
 [ 17  98  11]
 [ 17   6  49]]

Per-class metrics:
Class A:
  Precision: 0.7518
  Recall: 0.7055
  F1 S

In [26]:

# Initialize and train the Gradient Boosting model
# Depends on `dataset`, `X_train` and `y_train` from the earlier cells.
gb_model = EEGGradientBoost(
    n_estimators=100,
    learning_rate=0.1,
    max_depth=3,
    subsample=1.0
)
gb_model.train(X_train, y_train)

# Evaluate on test_cross data
# NOTE(review): this repeats the 'test_cross' load done in the earlier evaluation
# cells — the files are read and featurised again and the same globals are overwritten.
print("\nEvaluating Gradient Boosting on test_cross data (different subjects)...")
X_test_cross, y_test_cross = dataset.load_data(data_type='test_cross')
print(f"Loaded test_cross data: {X_test_cross.shape} features, {len(y_test_cross)} labels")

# NOTE(review): load_data raises ValueError when a split yields no samples, so the
# else branch below is not reachable as written.
if len(y_test_cross) > 0:
    results_cross = gb_model.evaluate(X_test_cross, y_test_cross)
    print_evaluation_results(results_cross, "GB Test Cross-Subject")
else:
    print("No test_cross data available for Gradient Boosting.")

# Evaluate on test_within data
# NOTE(review): likewise repeats the 'test_within' load from the earlier cells.
print("\nEvaluating Gradient Boosting on test_within data (same subjects)...")
X_test_within, y_test_within = dataset.load_data(data_type='test_within')
print(f"Loaded test_within data: {X_test_within.shape} features, {len(y_test_within)} labels")

if len(y_test_within) > 0:
    results_within = gb_model.evaluate(X_test_within, y_test_within)
    print_evaluation_results(results_within, "GB Test Within-Subject")
else:
    print("No test_within data available for Gradient Boosting.")

# Combined evaluation
# Rows of both test splits are stacked, cross-subject first, and scored as one set.
if len(y_test_cross) > 0 and len(y_test_within) > 0:
    print("\nEvaluating Gradient Boosting on combined test data...")
    X_test_combined = np.vstack([X_test_cross, X_test_within])
    y_test_combined = np.concatenate([y_test_cross, y_test_within])
    results_combined = gb_model.evaluate(X_test_combined, y_test_combined)
    print_evaluation_results(results_combined, "GB Combined Test")

print("\nGradient Boosting evaluation complete.")

Training Gradient Boosting model...
Training accuracy: 0.8975

Evaluating Gradient Boosting on test_cross data (different subjects)...
Loading 873 EEG samples for test_cross...
Loaded test_cross data: (873, 190) features, 873 labels

===== GB Test Cross-Subject Results =====
Accuracy: 0.4914
Precision: 0.5133
Recall: 0.4914
F1 Score: 0.4733

Confusion Matrix:
[[183 119  17]
 [ 93 193  21]
 [155  39  53]]

Per-class metrics:
Class A:
  Precision: 0.4246
  Recall: 0.5737
  F1 Score: 0.4880
Class C:
  Precision: 0.5499
  Recall: 0.6287
  F1 Score: 0.5866
Class F:
  Precision: 0.5824
  Recall: 0.2146
  F1 Score: 0.3136

Evaluating Gradient Boosting on test_within data (same subjects)...
Loading 344 EEG samples for test_within...
Loaded test_within data: (344, 190) features, 344 labels

===== GB Test Within-Subject Results =====
Accuracy: 0.7762
Precision: 0.7762
Recall: 0.7762
F1 Score: 0.7713

Confusion Matrix:
[[125  10  11]
 [ 18 104   4]
 [ 26   8  38]]

Per-class metrics:
Class A:
  P