In [1]:
!pip install mne




In [2]:
from pathlib import Path
import numpy as np
import pandas as pd
import mne

def load_task_edf_to_tensor(folder_path: str,
                            info_csv_path: str,
                            resample_to: int | None = None):
    """
    Load EEG during mental arithmetic task (files *_2.edf) into a 3D tensor.

    Returns
    -------
    X : np.ndarray
        Shape (n_subjects, n_channels, n_times)
    y : np.ndarray
        Shape (n_subjects,), Count quality (0 = bad, 1 = good)
    subjects : list[str]
        Subject IDs like 'Subject00', in same order as X and y
    """
    folder = Path(folder_path)

    if not folder.is_dir():
        raise NotADirectoryError(f"{folder_path} is not a valid directory")

    # Read subject info with labels
    info_df = pd.read_csv(info_csv_path)
    # Make sure the column names match exactly:
    # 'Subject' and 'Count quality'
    label_map = dict(zip(info_df["Subject"], info_df["Count quality"]))

    X_list = []
    y_list = []
    subjects = []

    # Only task recordings: *_2.edf
    for file_path in sorted(folder.glob("Subject*_2.edf")):
        subj = file_path.stem.split("_")[0]  # "Subject29_2" -> "Subject29"

        if subj not in label_map:
            print(f"Warning: {subj} not in subject-info, skipping.")
            continue

        print(f"Loading {file_path.name} for {subj} ...")
        raw = mne.io.read_raw_edf(file_path, preload=True, verbose=False)

        # Optional: resample to a common sampling rate (e.g. 128 Hz)
        if resample_to is not None:
            raw.resample(resample_to)

        # (n_channels, n_times)
        data = raw.get_data()

        X_list.append(data)
        y_list.append(int(label_map[subj]))
        subjects.append(subj)

    if not X_list:
        raise ValueError("No *_2.edf files loaded. Check folder path and file pattern.")

    # Make all recordings the same length (time axis)
    n_channels = X_list[0].shape[0]
    lengths = [d.shape[1] for d in X_list]
    min_len = min(lengths)  # truncate to shortest recording

    X = np.stack([d[:, :min_len] for d in X_list], axis=0)  # (N, C, T)
    y = np.array(y_list, dtype=int)

    print("Final tensor shape (N, C, T):", X.shape)
    print("Labels shape:", y.shape)

    return X, y, subjects


In [3]:
# Kaggle input locations: the EDF recordings and the subject-info table.
source_folder = "/kaggle/input/ahmadi-dataset"
info_csv = "/kaggle/input/ahmadi-dataset/subject-info.csv"

# Load every task recording, resampled to 128 Hz (use None to skip resampling).
X, y, subjects = load_task_edf_to_tensor(
    source_folder,
    info_csv,
    resample_to=128,
)

# Quick sanity check of what came back from the loader.
print("Subjects:", subjects)
print("X.shape:", X.shape)
print("y:", y)


Loading Subject00_2.edf for Subject00 ...
Loading Subject01_2.edf for Subject01 ...
Loading Subject02_2.edf for Subject02 ...
Loading Subject03_2.edf for Subject03 ...
Loading Subject04_2.edf for Subject04 ...
Loading Subject05_2.edf for Subject05 ...
Loading Subject06_2.edf for Subject06 ...
Loading Subject07_2.edf for Subject07 ...
Loading Subject08_2.edf for Subject08 ...
Loading Subject09_2.edf for Subject09 ...
Loading Subject10_2.edf for Subject10 ...
Loading Subject11_2.edf for Subject11 ...
Loading Subject12_2.edf for Subject12 ...
Loading Subject13_2.edf for Subject13 ...
Loading Subject14_2.edf for Subject14 ...
Loading Subject15_2.edf for Subject15 ...
Loading Subject16_2.edf for Subject16 ...
Loading Subject17_2.edf for Subject17 ...
Loading Subject18_2.edf for Subject18 ...
Loading Subject19_2.edf for Subject19 ...
Loading Subject20_2.edf for Subject20 ...
Loading Subject21_2.edf for Subject21 ...
Loading Subject22_2.edf for Subject22 ...
Loading Subject23_2.edf for Subjec

In [4]:
# Unpack the three axes of the loaded tensor.
N, C, T = X.shape

# Flatten each subject's channel-by-time grid into a single feature row,
# giving the "big matrix of all data".
X_matrix = np.reshape(X, (N, C * T))

print("X_matrix shape:", X_matrix.shape)   # (N, C*T)
print("y shape:", y.shape)                 # (N,)


X_matrix shape: (36, 166656)
y shape: (36,)


In [5]:
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report

# Hold out 20% of the subjects, stratified by label, with a fixed seed.
X_train, X_test, y_train, y_test = train_test_split(
    X_matrix, y, test_size=0.2, random_state=42, stratify=y
)

# Baseline random forest on the flattened features.
clf = RandomForestClassifier(
    n_estimators=200,
    max_depth=None,
    random_state=42,
    n_jobs=-1
)
clf.fit(X_train, y_train)

y_pred = clf.predict(X_test)
# When a class is never predicted its precision is undefined; report it as 0
# explicitly instead of triggering an UndefinedMetricWarning for each average.
print(classification_report(y_test, y_pred, zero_division=0))


              precision    recall  f1-score   support

           0       0.00      0.00      0.00         2
           1       0.75      1.00      0.86         6

    accuracy                           0.75         8
   macro avg       0.38      0.50      0.43         8
weighted avg       0.56      0.75      0.64         8



  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))


In [6]:
import numpy as np
import pandas as pd

from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score

from sklearn.neighbors import KNeighborsClassifier
from sklearn.ensemble import RandomForestClassifier
from sklearn.tree import DecisionTreeClassifier

from xgboost import XGBClassifier


In [7]:
# Inputs: X_matrix with shape (N, features) and y with shape (N,).
# Stratifying on y keeps the class ratio similar in the train and test parts.
split_options = dict(test_size=0.2, random_state=42, stratify=y)
X_train, X_test, y_train, y_test = train_test_split(X_matrix, y, **split_options)


In [8]:
# Candidate classifiers for the benchmark, keyed by display name.
# Insertion order determines the order in which they are trained.
models = {
    "KNN": KNeighborsClassifier(n_neighbors=3),
    "RandomForest": RandomForestClassifier(
        n_estimators=300, max_depth=None, random_state=42, n_jobs=-1
    ),
    "DecisionTree": DecisionTreeClassifier(max_depth=None, random_state=42),
    "XGBoost": XGBClassifier(
        objective="binary:logistic",
        eval_metric="logloss",
        tree_method="hist",  # good default on CPUs
        n_estimators=300,
        learning_rate=0.05,
        max_depth=4,
        subsample=0.8,
        colsample_bytree=0.8,
        n_jobs=-1,
    ),
}


In [9]:
# One record per model; turned into a DataFrame once the loop is done.
results = []

for name, model in models.items():
    print(f"Training {name}...")
    model.fit(X_train, y_train)

    y_pred = model.predict(X_test)

    # Score the held-out predictions; undefined ratios are reported as 0.
    acc, prec, rec, f1 = (
        accuracy_score(y_test, y_pred),
        precision_score(y_test, y_pred, zero_division=0),
        recall_score(y_test, y_pred, zero_division=0),
        f1_score(y_test, y_pred, zero_division=0),
    )

    results.append(
        {"model": name, "accuracy": acc, "precision": prec, "recall": rec, "f1": f1}
    )

# Tabulate the scores, indexed by model name and ranked best-F1 first.
results_df = (
    pd.DataFrame(results)
    .set_index("model")
    .sort_values(by="f1", ascending=False)
)

print("\nBenchmark results:")
print(results_df.round(3))


Training KNN...
Training RandomForest...




Training DecisionTree...
Training XGBoost...

Benchmark results:
              accuracy  precision  recall     f1
model                                           
DecisionTree     0.875      1.000   0.833  0.909
KNN              0.750      0.750   1.000  0.857
RandomForest     0.750      0.750   1.000  0.857
XGBoost          0.625      0.714   0.833  0.769
