In [13]:
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
from sklearn.model_selection import *
from sktime_neuro.datasets.matching_pennies_util import *
from sktime_neuro.transformations.multivariate_detrender import ColumnDetrender
from sktime_neuro.transformations.preprocessing.PREP import do_prep, build_prep_params
from sktime_neuro.transformations.series_to_panel.eeg_epoching import epoch
from sktime_neuro.utils.mne_processing import create_annotation
from sktime.classification.kernel_based import RocketClassifier
from sktime_neuro.datasets.matching_pennies_util import PENNIES_ONSET_LABELS, load_pickles
from sktime_neuro.transformations.series.filterforseries import FilterforSeries
from sklearn.metrics import *
from mne.epochs import Epochs
from typing import *

In [14]:
def detrend(data, n_jobs=1) -> np.ndarray:
    """Detrend ``data`` with a ``ColumnDetrender``.

    Parameters
    ----------
    data
        Array handed unchanged to the ``ColumnDetrender`` constructor.
    n_jobs : int, default 1
        Forwarded to ``ColumnDetrender.detrend`` as its ``n_jobs`` keyword.

    Returns
    -------
    Whatever ``ColumnDetrender.detrend`` returns for ``data``.
    """
    return ColumnDetrender(data).detrend(n_jobs=n_jobs)

In [15]:
def load_subject_and_detrend(subjectId: int) -> Tuple[np.ndarray, np.ndarray, float]:
    """Load one subject's recording and detrend it.

    Delegates the loading to ``load_subject`` and then runs ``detrend`` on the
    returned data with ``n_jobs=-1``.

    Parameters
    ----------
    subjectId : int
        Subject identifier forwarded to ``load_subject`` (and from there to
        ``get_raw``).

    Returns
    -------
    tuple
        ``(annotations, detrended, sf)``: the annotations and sampling
        frequency from ``load_subject`` plus the detrended data.
    """
    onsets, samples, sampling_rate = load_subject(subjectId)
    return onsets, detrend(samples, -1), sampling_rate

def load_subject(sid):
    """Load one subject's recording without any preprocessing.

    Parameters
    ----------
    sid
        Subject identifier forwarded to ``get_raw``.

    Returns
    -------
    tuple
        ``(annotations, data, sf)`` where ``annotations`` comes from
        ``create_annotation``, ``data`` is the transposed result of
        ``get_data()`` and ``sf`` is the recording's ``info["sfreq"]``.
    """
    recording = get_raw(sid)
    sampling_rate = recording.info["sfreq"]
    onsets = create_annotation(recording)
    # get_data() is transposed before being returned; presumably this yields
    # (n_times, n_channels) for an MNE Raw object — TODO confirm.
    samples = recording.get_data().transpose()
    return onsets, samples, sampling_rate

def load_subject_detrend_filter(subjectId: int, lf: float, hf: float, resampleTo = None) -> Tuple[np.ndarray, np.ndarray, float]:
    """Load one subject's recording, detrend it and filter it.

    Parameters
    ----------
    subjectId : int
        Subject identifier forwarded to ``get_raw``.
    lf : float
        Lower cut-off frequency forwarded to ``applyfilter``.
    hf : float
        Upper cut-off frequency forwarded to ``applyfilter``.
    resampleTo : float, optional
        Rate forwarded to ``applyfilter`` as ``rsto``. ``None`` (default)
        means "use the recording's own sampling frequency".

    Returns
    -------
    tuple
        ``(annotations, filtered, sf)`` where ``sf`` is ``resampleTo`` when it
        was given and the recording's ``info["sfreq"]`` otherwise.
    """
    raw = get_raw(subjectId)
    native_sf = raw.info["sfreq"]
    annotations = create_annotation(raw)
    detrended = detrend(raw.get_data().transpose(), -1)

    # applyfilter's positional order is (detrended, lf, hf, sf, rsto). The
    # previous call left out `sf`, so it was one argument short and raised a
    # TypeError before any filtering happened.
    filtered = applyfilter(detrended, lf, hf, native_sf, resampleTo)

    # Report the rate the caller asked for, falling back to the native rate.
    # NOTE(review): this assumes the filter step really delivers data at
    # `resampleTo`; confirm against FilterforSeries before relying on it.
    effective_sf = native_sf if resampleTo is None else resampleTo
    return annotations, filtered, effective_sf

def load_all_subjects_raw() -> List[Tuple[np.ndarray, np.ndarray, float]]:
    """Load and detrend subjects 5 through 11.

    Returns
    -------
    list
        One ``load_subject_and_detrend`` result per subject id in
        ``range(5, 12)``, in ascending id order.
    """
    return [load_subject_and_detrend(subject_id) for subject_id in range(5, 12)]

def load_all_subjects_filter(lf: float, hf: float, rsto = None) -> List[Tuple[np.ndarray, np.ndarray, float]]:
    """Load, detrend and filter subjects 5 through 11.

    Parameters
    ----------
    lf : float
        Lower cut-off forwarded to ``load_subject_detrend_filter``.
    hf : float
        Upper cut-off forwarded to ``load_subject_detrend_filter``.
    rsto : optional
        Forwarded as the ``resampleTo`` argument of
        ``load_subject_detrend_filter``.

    Returns
    -------
    list
        One ``load_subject_detrend_filter`` result per subject id in
        ``range(5, 12)``, in ascending id order.
    """
    return [
        load_subject_detrend_filter(subject_id, lf, hf, rsto)
        for subject_id in range(5, 12)
    ]

def applyfilter(detrended, lf, hf, sf, rsto=None):
    """Run ``FilterforSeries`` over ``detrended`` and return the result.

    Parameters
    ----------
    detrended
        Data passed to ``FilterforSeries.fit_transform``.
    lf
        Passed to ``FilterforSeries`` as ``l_freq``.
    hf
        Passed to ``FilterforSeries`` as ``h_freq``.
    sf
        Sampling frequency used when ``rsto`` is ``None``.
    rsto : optional
        Rate passed to ``FilterforSeries`` as ``sfreq``. Defaults to ``None``,
        in which case ``sf`` is used. The default lets callers omit the
        argument; the body always treated ``None`` as a valid value.

    Returns
    -------
    The output of ``FilterforSeries(...).fit_transform(detrended)``.
    """
    rate = sf if rsto is None else rsto
    # NOTE(review): `sfreq` receives the requested rate, not `sf`. If
    # FilterforSeries interprets `sfreq` as the rate of the *input* data, a
    # differing `rsto` would mis-scale the cut-offs — confirm its semantics.
    return FilterforSeries(l_freq=lf, h_freq=hf, sfreq=rate).fit_transform(detrended)

In [16]:
def epoch_subject(subjectData: Tuple[np.ndarray, np.ndarray, float]) -> Tuple[np.ndarray, np.ndarray]:
    """Epoch one subject's recording around the matching-pennies onsets.

    Parameters
    ----------
    subjectData : tuple
        ``(annotations, data, sfreq)`` as produced by the ``load_subject*``
        helpers.

    Returns
    -------
    tuple
        The ``(epochs, annotations)`` pair returned by ``epoch``.
    """
    onsets, signal, rate = subjectData[0], subjectData[1], subjectData[2]
    # [-0.5, 0.5] is the epoch window handed to `epoch`; presumably seconds
    # relative to each onset — TODO confirm against `epoch`'s documentation.
    segments, labels = epoch(signal, onsets, PENNIES_ONSET_LABELS, [-0.5, 0.5], sfreq=rate)
    return segments, labels

def epoch_set(dataset: List[Tuple[np.ndarray, np.ndarray, float]]) -> List[Tuple[np.ndarray, np.ndarray]]:
    """Apply ``epoch_subject`` to every entry of ``dataset``.

    Parameters
    ----------
    dataset : list
        Per-subject ``(annotations, data, sfreq)`` tuples.

    Returns
    -------
    list
        The ``epoch_subject`` result for each entry, in input order.
    """
    return [epoch_subject(subject) for subject in dataset]


The search_str was "/home/patchouli/programming/foss/sktime-neuro/cache/eeg_matchingpennies/sub-08/**/eeg/sub-08*eeg.json"
  return read_raw_bids(bids_path=bidspath, verbose=False)


In [18]:
# Load the cached, already-detrended recordings.
# If the cache has not been built yet, regenerate the data instead with:
#allData = load_all_subjects_raw()
# NOTE(review): load_pickles presumably unpickles local files; unpickling can
# execute arbitrary code, so only point it at caches you created yourself.
allData = load_pickles("detrended")
# Optional: filter every recording (0.5-50) before epoching. Entries are
# presumably (annotations, data, sfreq) tuples, so each one is rebuilt rather
# than assigned into:
#allData = [(d[0], applyfilter(d[1], 0.5, 50, d[2], None), d[2]) for d in allData]

In [19]:
# Epoch every subject's recording; epoch_set returns one
# (epochs, annotations) pair per entry of allData, in the same order.
epochedSet = epoch_set(allData)

Perform a `train_test_split` and classify on the first subject's data.

In [None]:
# Hold-out split on the first subject: epochedSet[0][0] holds the epochs,
# epochedSet[0][1] the matching labels.
# test_size=0.25 spells out sklearn's default; with 300 epochs (assumed —
# confirm with len(epochedSet[0][0])) that is a 225/75 split of epochs.
# The seed is fixed so a Restart & Run All reproduces the same split.
SPLIT_SEED = 42
x_train, x_test, y_train, y_test = train_test_split(
    epochedSet[0][0],
    epochedSet[0][1],
    test_size=0.25,
    random_state=SPLIT_SEED,
)

In [None]:
# Fit ROCKET on the training epochs and report hold-out metrics.
# - random_state pins the randomly generated kernels, so the report (and the
#   cross-validation that reuses `rocket` further down) is reproducible.
# - n_jobs=-1 uses every available core instead of a hard-coded 16, which is
#   machine-specific and may exceed the cores present on other hosts.
rocket = RocketClassifier(n_jobs=-1, random_state=42)
rocket.fit(x_train, y_train)
pred = rocket.predict(x_test)
print(classification_report(y_test, pred))

Run a 5-fold cross-validation on every subject's dataset.

In [None]:
# Score the classifier with 5-fold cross-validation, one subject at a time.
# Each entry of epochedSet is an (epochs, labels) pair; the five fold scores
# are printed as they arrive and collected in scorevec, one row per subject.
scorevec = []
for subject_epochs, subject_labels in epochedSet:
    scores = cross_val_score(rocket, subject_epochs, subject_labels, cv=5)
    scorevec.append(scores)
    print(scores)

In [None]:
# Write the collected fold scores to latestresults.csv in the current working
# directory (one row per entry of scorevec, 8 decimal places). An existing
# file with that name is overwritten.
np.savetxt("latestresults.csv", scorevec, delimiter=', ', fmt='%12.8f')