In [18]:
import sklearn
import mne
import numpy as np
import glob
import os
import sys
import logging

from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.pipeline import Pipeline, make_pipeline
from sklearn.preprocessing import StandardScaler, FunctionTransformer
from sklearn.svm import SVC
from sklearn.metrics import accuracy_score
from sklearn.model_selection import GridSearchCV
from joblib import parallel_backend
import multiprocessing


In [2]:
def split_train_test_path_list(data_path, file_name_template, train_ratio, seed=None):
    """Split the files matching a glob pattern into train and test path lists.

    The matching paths are sorted first (so the starting order does not depend
    on the file system), shuffled, and then cut at ``train_ratio``.

    Parameters
    ----------
    data_path : str
        Directory that is searched.
    file_name_template : str
        Glob pattern, joined onto ``data_path``.
    train_ratio : float
        Fraction of the files (between 0 and 1) placed in the train list.
        The count is truncated towards zero.
    seed : int or None, optional
        When given, the shuffle uses its own generator seeded with this value,
        so the same seed always yields the same split. When ``None`` (default)
        the global NumPy random state is used, as before.

    Returns
    -------
    tuple[list[str], list[str]]
        ``(train_list, test_list)``.

    Raises
    ------
    ValueError
        If ``train_ratio`` is outside the range [0, 1].
    """
    if not 0.0 <= train_ratio <= 1.0:
        raise ValueError(f"train_ratio must be between 0 and 1, got {train_ratio!r}")

    file_list = sorted(glob.glob(os.path.join(data_path, file_name_template)))

    if seed is None:
        # Unseeded: keep relying on the global NumPy random state.
        np.random.shuffle(file_list)
    else:
        # Seeded: a private generator makes the split reproducible without
        # touching the global random state.
        np.random.default_rng(seed).shuffle(file_list)

    split_id = int(len(file_list) * train_ratio)
    return file_list[:split_id], file_list[split_id:]


In [3]:
def read_eeg_epochs(train_list, test_list):
    """Read the epoch files of both splits and merge each split into one object.

    Every path is read with ``mne.read_epochs(..., preload=True)`` while the MNE
    log level is temporarily set to ``"ERROR"``. All files of both splits are
    read first; only afterwards is each split concatenated.

    Parameters
    ----------
    train_list : iterable of str
        Paths of the epoch files forming the training split.
    test_list : iterable of str
        Paths of the epoch files forming the test split.

    Returns
    -------
    tuple
        ``(epochs_train, epochs_test)`` as returned by
        ``mne.concatenate_epochs`` for each split.
    """
    def _load_all(paths):
        # Read every file of one split, silencing MNE output below ERROR.
        loaded = []
        for path in paths:
            with mne.utils.use_log_level("ERROR"):
                loaded.append(mne.read_epochs(path, preload=True))
        return loaded

    train_parts = _load_all(train_list)
    test_parts = _load_all(test_list)

    merged_train = mne.concatenate_epochs(train_parts)
    merged_test = mne.concatenate_epochs(test_parts)
    return merged_train, merged_test

In [4]:
def get_X_and_Y_from_epochs(train_list, test_list, events, picks=None, t_min = -0.2, t_max = 0.5):
    """Load both splits and turn two event types into features and 0/1 labels.

    The epochs of ``events[0]`` are labelled ``0`` and those of ``events[1]``
    are labelled ``1``. Within each split the data of ``events[0]`` comes first,
    followed by the data of ``events[1]``.

    Parameters
    ----------
    train_list, test_list : iterable of str
        Epoch file paths of the two splits, forwarded to ``read_eeg_epochs``.
    events : sequence
        Two selectors used to index the epochs object; only the first two
        entries are used.
    picks : optional
        Forwarded to ``get_data(picks=...)``.
    t_min, t_max : float
        Forwarded to ``get_data(tmin=..., tmax=...)``.
        NOTE(review): presumably seconds relative to the event - confirm.

    Returns
    -------
    tuple
        ``(X_train, X_test, y_train, y_test)``.
    """
    def _features_and_labels(epochs):
        # Extract the data of both event types and stack them with their labels.
        first_event_data = epochs[events[0]].get_data(picks=picks, tmin=t_min, tmax=t_max)
        second_event_data = epochs[events[1]].get_data(picks=picks, tmin=t_min, tmax=t_max)

        first_event_labels = [0] * len(first_event_data)
        second_event_labels = [1] * len(second_event_data)

        features = np.concatenate((first_event_data, second_event_data), axis=0)
        labels = np.concatenate((first_event_labels, second_event_labels), axis=0)
        return features, labels

    epochs_train, epochs_test = read_eeg_epochs(train_list, test_list)

    X_train, y_train = _features_and_labels(epochs_train)
    X_test, y_test = _features_and_labels(epochs_test)

    return X_train, X_test, y_train, y_test

In [5]:
def train_and_test_model(X_train, X_test, y_train, y_test, pipeline, gridSerach = False):
    """Fit an estimator and print its accuracy on the test and train data.

    Parameters
    ----------
    X_train, y_train
        Data the estimator is fitted on; also scored afterwards.
    X_test, y_test
        Held-out data scored with ``accuracy_score``.
    pipeline
        Object exposing ``fit`` and ``predict``. It is fitted in place.
    gridSerach : bool, optional
        (Spelling kept for backward compatibility.) When true, ``pipeline`` is
        also expected to expose ``best_params_`` and ``best_score_``, which are
        printed after the scores.

    Returns
    -------
    None
        Results are only printed.
    """
    pipeline.fit(X_train, y_train)

    # Score the held-out data first, then the data the model was fitted on.
    test_score = accuracy_score(y_test, pipeline.predict(X_test))
    train_score = accuracy_score(y_train, pipeline.predict(X_train))

    print(f"test_score: {test_score:.4f}")
    print(f"train_score: {train_score:.4f}")

    if not gridSerach:
        return

    # Extra report available only on search objects.
    print(f"The best parameters: {pipeline.best_params_}")
    print(f"The best accuracy: {pipeline.best_score_:.4f}")

In [6]:
# Send log records both to a file and to the stream the notebook displays.
log_file = "training_log.txt"
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
    handlers=[
        # mode="a" appends to an existing log file (use "w" to overwrite it).
        # The encoding is fixed to UTF-8 because this notebook logs messages
        # with non-ASCII (Polish) characters, which the platform default
        # encoding may not be able to write.
        logging.FileHandler(log_file, mode="a", encoding="utf-8"),
        # Created without arguments, so it binds to sys.stderr as it is right
        # now, i.e. before sys.stdout/sys.stderr are replaced further down.
        logging.StreamHandler()
    ]
)

class StreamToLogger:
    """Minimal file-like object that forwards written text to a logger.

    Text is buffered until a newline arrives, so one printed line becomes
    exactly one log record even when it is written in several pieces (as
    ``print`` does for multiple arguments or a custom ``end``). Blank lines are
    dropped and surrounding whitespace is stripped from each logged line.

    Parameters
    ----------
    logger : logging.Logger
        Logger that receives the records.
    level : int
        Logging level used for every record.
    """

    def __init__(self, logger, level):
        self.logger = logger
        self.level = level
        # Text received so far that has not been terminated by a newline.
        self.line_buffer = ""

    def write(self, message):
        """Buffer ``message`` and log every line it completes.

        Returns the number of characters accepted, as text streams do.
        """
        self.line_buffer += message
        # Everything before the last newline is complete; the rest stays buffered.
        *complete_lines, self.line_buffer = self.line_buffer.split("\n")
        for line in complete_lines:
            self._emit(line)
        return len(message)

    def flush(self):
        """Log any buffered text that was never terminated by a newline."""
        pending, self.line_buffer = self.line_buffer, ""
        self._emit(pending)

    def isatty(self):
        """Report that this stream is not an interactive terminal."""
        return False

    def _emit(self, text):
        # Skip whitespace-only chunks so bare newlines do not create records.
        text = text.strip()
        if text:
            self.logger.log(self.level, text)

# From here on, everything written to stdout is logged at INFO level and
# everything written to stderr at ERROR level, both through the root logger.
_root_logger = logging.getLogger()
sys.stdout = StreamToLogger(_root_logger, logging.INFO)
sys.stderr = StreamToLogger(_root_logger, logging.ERROR)


In [17]:
# Directory holding the preprocessed epoch files. The EEG_DATA_DIR environment
# variable overrides the default so the notebook can run on another machine.
# The default is a raw string: in a normal string the backslashes of this
# Windows path form invalid escape sequences, which newer Python versions warn
# about.
dir_path = os.environ.get(
    "EEG_DATA_DIR",
    r'D:\studia\magisterka\dane EEG\BADANIE_POLITYCZNE_2022_eeg_bdfy\EEG_preprocessed',
)
file_name_template = "s*.bdf-epo.fif"
train_ratio = 0.8


def flatten_epochs(X):
    """Keep the first axis of ``X`` and collapse all remaining axes into one."""
    return X.reshape(X.shape[0], -1)


# A named function (rather than a lambda) gives the transformer a readable repr.
flatten_transformer = FunctionTransformer(flatten_epochs)

## MODEL 1: Logistic Regression on the 0-250 ms time range

In [21]:
# Flatten each epoch to a feature vector, standardize, then fit a logistic regression.
model_1 = Pipeline(
    steps=[
        ('reshape', flatten_transformer),
        ('scaler', StandardScaler()),
        ('logisticRegression', LogisticRegression(max_iter=10000)),
    ]
)

# Split the recordings file-wise, then extract the 0.0-0.25 window of both event types.
train_list, test_list = split_train_test_path_list(
    data_path=dir_path,
    file_name_template=file_name_template,
    train_ratio=train_ratio,
)
X_train, X_test, y_train, y_test = get_X_and_Y_from_epochs(
    train_list,
    test_list,
    events=["up", "inv"],
    t_min=0.0,
    t_max=0.25,
)

train_and_test_model(X_train, X_test, y_train, y_test, pipeline=model_1)

Not setting metadata
18705 matching events found
Applying baseline correction (mode: mean)
Not setting metadata
4631 matching events found
Applying baseline correction (mode: mean)
test_score: 0.7385
train_score: 0.8254


## MODEL 2: Support Vector Machine with grid search on the 0-250 ms time range


In [16]:
# Flatten each epoch to a feature vector, standardize, then fit a support vector classifier.
model_2 = Pipeline(steps=[('reshape', flatten_transformer), ('scaler', StandardScaler()), ('svc', SVC())])

# Only the linear kernel is searched. gamma is deliberately not part of the
# grid: SVC ignores it for kernel='linear', so sweeping it would only repeat
# identical fits. Add a second grid with e.g. kernel='rbf' to tune gamma.
param_grid = dict(
    svc__kernel=['linear'],
    svc__C=[0.1, 1.0],
)

logging.info("Rozpoczynam trenowanie modelu...")
train_list, test_list = split_train_test_path_list(dir_path, file_name_template, train_ratio)
X_train, X_test, y_train, y_test = get_X_and_Y_from_epochs(train_list, test_list, ["up", "inv"], t_min = 0.0, t_max = 0.25)
logging.info("Rozpoczęto GridSearchCV.")
# The 'loky' backend is used instead of 'multiprocessing': the pipeline holds a
# callable defined in this notebook, which the 'multiprocessing' backend cannot
# pickle when it ships the estimator to the worker processes.
with parallel_backend('loky'):
    grid_search_model_2 = GridSearchCV(model_2, param_grid, cv=3, scoring='accuracy', n_jobs = -1, verbose=3)
    train_and_test_model(X_train, X_test, y_train, y_test, grid_search_model_2, True)


logging.info(f"Najlepsze parametry: {grid_search_model_2.best_params_}")
logging.info(f"Najlepszy wynik cross-validation: {grid_search_model_2.best_score_}")
logging.info("Trenowanie zakończone.")

NameError: name 'flatten_transformer' is not defined