In [1]:
import numpy as np 
import math
import os 
import sys 
import random
import pandas as pd
from tqdm import tqdm
import copy
import numpy as np
import scipy as sp
import scipy.signal
from matplotlib import pyplot as plt
from pathlib import Path
import bbcpy
module_path = os.path.abspath(os.path.join('../'))
if module_path not in sys.path:
    sys.path.append(module_path)
from src.data.srm_datamodule import SRMDatamodule

In [None]:
import numpy as np
import scipy as sp
import sklearn as sk
from sklearn.base import TransformerMixin, BaseEstimator

import bbcpy.functions.helpers as helpers
# from bbcpy.functions import ImportFunc
from bbcpy.datatypes.eeg import Data
from bbcpy.functions.artireject import averagevariance

from bbcpy.datatypes.srm_eeg import SRM_Data


class _GEVDsf(sk.base.BaseEstimator, sk.base.TransformerMixin):
    d = None
    W = None
    A = None

    def calcAB(self, x, y=None):
        return None, None

    def calcPatterns(self, x):  #
        """
        calculates spatial pattern for data x
        :param x: data
        :return: A spatial pattern
        """
        covX = x.cov(target='all', estimator=self.estimator)
        return covX @ self.W_allcmps @ np.linalg.pinv(self.W_allcmps.T @ covX @ self.W_allcmps)[:, self.selected_cmps]

    def scoring(self, d, Y):  # just use EVs
        return d

    def select(self, score, n_cmps):  # this is the general case, CSP is a special case.
        return np.flipud(np.argsort(score))[:n_cmps]

    def fit(self, x, y=None, n_cmps=None):
        '''Fit CSP'''
        if n_cmps is None:
            n_cmps = self.n_cmps
        if y is not None:
            x.y = y
        A, B = self.calcAB(x, y)
        d, W = sp.linalg.eigh(A, B)
        score = self.scoring(d, W.T @ x)
        if n_cmps == 'all':
            selected_cmps = np.arange(x.nCh)
        else:
            selected_cmps = self.select(score, n_cmps)
        self.d = d[selected_cmps]
        self.W = W[:, selected_cmps]
        self.selected_cmps = selected_cmps
        self.W_allcmps = W
        self.A = self.calcPatterns(x)
        return self

    def transform(self, x, y=None):
        """Apply CSP.

        Parameters
        ----------
        x : ndarray, shape (n_matrices, n_channels, n_times)
            Multi-channel time-series
        y : ndarray, shape (n_trials, 2)
            Marker positions and marker class.
        Returns
        -------
        out : ndarray, shape (n_matrices, n_csp, n_time)
            transformed
        """
        return self.W.T @ x


class PCA(_GEVDsf):
    def __init__(self, n_cmps='all', excllev=None, estimator='scm', scoring=helpers.evscoring_EV,
                 select=helpers.evselect_best):
        self.n_cmps = n_cmps
        self.excllev = excllev
        self.estimator = estimator
        self.scoring = scoring
        self.select = select

    def calcAB(self, x, y=None):
        if isinstance(x, Data):
            if self.excllev is not None:
                Sigma_trial = x.cov(
                    target='all')  # no estimator here because on single trials, this will fuck up excllev
                covtr = np.trace(np.linalg.pinv(x.cov(target='all', estimator=self.estimator)) @ Sigma_trial, axis1=1,
                                 axis2=2) / x.shape[1]
                sel_tr = covtr <= self.excllev
                covs = x[sel_tr].cov(target='all', estimator=self.estimator)
            else:
                covs = x.cov(target='all', estimator=self.estimator)
            return covs, None
        else:
            c1 = sk.covariance.OAS().fit(x).covariance_
            return c1, None


class CSP(_GEVDsf):

    def __init__(self, n_cmps=6, excllev=None, estimator='scm', scoring=helpers.evscoring_EV,
                 select=helpers.evselect_best_csp):
        self.n_cmps = n_cmps
        self.excllev = excllev
        self.estimator = estimator
        self.scoring = scoring
        self.select = select

    def calcAB(self, x, y=None):
        if isinstance(x, Data) or isinstance(x, SRM_Data):  # has method cov etc
            if self.excllev is not None:
                covs = averagevariance(x, self.excllev, self.estimator).cov(target='class', estimator=self.estimator)
            else:
                covs = x.cov(target='class', estimator=self.estimator)
            return covs[0], covs[0] + covs[1]
        else:  # I would delete the following but merged it for Gabriel. Do we want to write all functions also to work
            # with arbitrary arrays? I think we should consider only our own datatypes.
            classes = np.unique(y)
            c1 = sk.covariance.OAS().fit(x[y == classes[0]]).covariance_
            c2 = sk.covariance.OAS().fit(x[y == classes[1]]).covariance_
            return c1, c1 + c2
        # if self.excllev is not None:
        #     covs = averagevariance(x, self.excllev, self.estimator).cov(target='class', estimator=self.estimator)
        # else:
        #     covs = x.cov(target='class', estimator=self.estimator)
        # return covs[0], covs[0] + covs[1]

In [2]:
data_dir = "../data/SMR/raw/"
ival= "2s:8s:10ms"
bands= [8,13]
chans= ["*"]
classes= ["R", "L"]
test_subjects_sessions_dict= {"S1": [1]}
train_subjects_sessions_dict= {"S1": [1]}
vali_subjects_sessions_dict={"S1": [1]}
concatenate_subjects= True
train_val_split=None

srm_datamodule = SRMDatamodule(data_dir=data_dir,
                                            ival=ival,
                                            bands=bands,
                                            chans=chans,
                                            classes=classes,
                                            test_subjects_sessions_dict=test_subjects_sessions_dict,
                                            train_subjects_sessions_dict=train_subjects_sessions_dict,
                                            vali_subjects_sessions_dict=vali_subjects_sessions_dict,
                                            concatenate_subjects=concatenate_subjects,
                                            train_val_split=train_val_split)

data_train = srm_datamodule.load_data(train_subjects_sessions_dict,
                                           concatenate_subjects)

INFO:root:Collecting subject S1 sessions from:  ../data/SMR/raw/
INFO:root:Loading subject: S1 finalized (1 from 1)
INFO:root:Prepare to Load : ['Session_1'] sessions
INFO:root:Preprocessing data..
INFO:root:Session_1 loaded; has the shape: (155, 62, 600)
INFO:root:Loading sessions: Session_1 finalized (1 from 1)


In [3]:
X_train = data_train
y_train = data_train.y

In [12]:
from dask.distributed import Client, LocalCluster

cluster = LocalCluster()
client = Client(cluster)


INFO:distributed.scheduler:State start
INFO:distributed.scheduler:  Scheduler at:     tcp://127.0.0.1:57210
INFO:distributed.scheduler:  dashboard at:  http://127.0.0.1:8787/status
INFO:distributed.nanny:        Start Nanny at: 'tcp://127.0.0.1:57215'
INFO:distributed.nanny:        Start Nanny at: 'tcp://127.0.0.1:57213'
INFO:distributed.nanny:        Start Nanny at: 'tcp://127.0.0.1:57214'
INFO:distributed.nanny:        Start Nanny at: 'tcp://127.0.0.1:57216'
INFO:distributed.scheduler:Register worker <WorkerState 'tcp://127.0.0.1:57229', name: 0, status: init, memory: 0, processing: 0>
INFO:distributed.scheduler:Starting worker compute stream, tcp://127.0.0.1:57229
INFO:distributed.core:Starting established connection to tcp://127.0.0.1:57232
INFO:distributed.scheduler:Register worker <WorkerState 'tcp://127.0.0.1:57230', name: 1, status: init, memory: 0, processing: 0>
INFO:distributed.scheduler:Starting worker compute stream, tcp://127.0.0.1:57230
INFO:distributed.core:Starting est

In [4]:
import dask.array as da
# Convert your data to Dask arrays or dataframes
X_train_dask = da.from_array(X_train, chunks='auto')
y_train_dask = da.from_array(y_train, chunks='auto')

In [None]:
X_train_dask.dask

In [None]:
y_train_dask

In [5]:
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis as LDA
from sklearn.model_selection import KFold, ShuffleSplit, GridSearchCV
from sklearn.model_selection import cross_val_score

from bbcpy.pipeline import make_pipeline
import bbcpy.functions.helpers as helpers
from bbcpy.functions.base import ImportFunc
from bbcpy.functions.spatial import CSP, MBCSP
from bbcpy.functions.artireject import AverageVariance

from sklearn.pipeline import make_pipeline
from sklearn.model_selection import cross_val_score
from sklearn.ensemble import GradientBoostingClassifier
from dask_ml.decomposition import PCA
from dask_ml.wrappers import ParallelPostFit
from dask_ml.preprocessing import StandardScaler
from dask_ml.preprocessing import RobustScaler
from dask_ml.datasets import make_classification

In [14]:
cv = KFold()
var = ImportFunc(np.var, axis=2)

In [15]:
import dask_ml

In [None]:
x = X_train_dask
y = y_train_dask

In [None]:
import dask.array as da

x = da.ones((1000,1000), chunks=(100,100))
z = x.sum() # This uses Dask default local cluster
z

In [None]:
from sklearn.model_selection import cross_val_score
import joblib
# Fit the model using Dask
with joblib.parallel_backend('dask'):
    print(cross_val_score(csp_pipeline, X_train_dask, y_train_dask, cv=cv))

In [13]:
from dask_ml.wrappers import ParallelPostFit
import sklearn
from sklearn.model_selection import KFold
from sklearn.model_selection import cross_val_score
import joblib

param_grid = {"C": [0.001, 0.01, 0.1, 0.5, 1.0, 2.0, 5.0, 10.0],
              "kernel": ['rbf', 'poly', 'sigmoid'],
              "shrinking": [True, False]}


csp_pipeline = make_pipeline(CSP(scoring=helpers.evscoring_medvar,select=helpers.evselect_directorscut),
                             var,
                             np.log,
                             LDA())
cv = KFold()

with joblib.parallel_backend('dask'):
    # print(cross_val_score(csp_pipeline, X_train_dask, y_train_dask, cv=cv))
    print(cross_val_score(csp_pipeline, data_train, data_train.y, cv=cv))

INFO:distributed.scheduler:Receive client connection: Client-worker-100685a7-267f-11ee-84b8-00216a39830c
INFO:distributed.core:Starting established connection to tcp://127.0.0.1:57253


ValueError: 
All the 5 fits failed.
It is very likely that your model is misconfigured.
You can try to debug the error by setting error_score='raise'.

Below are more details about the failures:
--------------------------------------------------------------------------------
1 fits failed with the following error:
Traceback (most recent call last):
  File "C:\Users\alioo\Desktop\MA\bbcpy_AutoML\bbcpy_env\lib\site-packages\sklearn\model_selection\_validation.py", line 686, in _fit_and_score
    estimator.fit(X_train, y_train, **fit_params)
  File "C:\Users\alioo\Desktop\MA\bbcpy_AutoML\bbcpy_env\lib\site-packages\sklearn\pipeline.py", line 402, in fit
    Xt = self._fit(X, y, **fit_params_steps)
  File "C:\Users\alioo\Desktop\MA\bbcpy_AutoML\bbcpy_env\lib\site-packages\sklearn\pipeline.py", line 340, in _fit
    self._validate_steps()
  File "C:\Users\alioo\Desktop\MA\bbcpy_AutoML\bbcpy_env\lib\site-packages\sklearn\pipeline.py", line 231, in _validate_steps
    raise TypeError(
TypeError: All intermediate steps should be transformers and implement fit and transform or be the string 'passthrough' '<bbcpy.functions.base.ImportFunc object at 0x00000246249C6B60>' (type <class 'bbcpy.functions.base.ImportFunc'>) doesn't

--------------------------------------------------------------------------------
1 fits failed with the following error:
Traceback (most recent call last):
  File "C:\Users\alioo\Desktop\MA\bbcpy_AutoML\bbcpy_env\lib\site-packages\sklearn\model_selection\_validation.py", line 686, in _fit_and_score
    estimator.fit(X_train, y_train, **fit_params)
  File "C:\Users\alioo\Desktop\MA\bbcpy_AutoML\bbcpy_env\lib\site-packages\sklearn\pipeline.py", line 402, in fit
    Xt = self._fit(X, y, **fit_params_steps)
  File "C:\Users\alioo\Desktop\MA\bbcpy_AutoML\bbcpy_env\lib\site-packages\sklearn\pipeline.py", line 340, in _fit
    self._validate_steps()
  File "C:\Users\alioo\Desktop\MA\bbcpy_AutoML\bbcpy_env\lib\site-packages\sklearn\pipeline.py", line 231, in _validate_steps
    raise TypeError(
TypeError: All intermediate steps should be transformers and implement fit and transform or be the string 'passthrough' '<bbcpy.functions.base.ImportFunc object at 0x00000246249C6D70>' (type <class 'bbcpy.functions.base.ImportFunc'>) doesn't

--------------------------------------------------------------------------------
1 fits failed with the following error:
Traceback (most recent call last):
  File "C:\Users\alioo\Desktop\MA\bbcpy_AutoML\bbcpy_env\lib\site-packages\sklearn\model_selection\_validation.py", line 686, in _fit_and_score
    estimator.fit(X_train, y_train, **fit_params)
  File "C:\Users\alioo\Desktop\MA\bbcpy_AutoML\bbcpy_env\lib\site-packages\sklearn\pipeline.py", line 402, in fit
    Xt = self._fit(X, y, **fit_params_steps)
  File "C:\Users\alioo\Desktop\MA\bbcpy_AutoML\bbcpy_env\lib\site-packages\sklearn\pipeline.py", line 340, in _fit
    self._validate_steps()
  File "C:\Users\alioo\Desktop\MA\bbcpy_AutoML\bbcpy_env\lib\site-packages\sklearn\pipeline.py", line 231, in _validate_steps
    raise TypeError(
TypeError: All intermediate steps should be transformers and implement fit and transform or be the string 'passthrough' '<bbcpy.functions.base.ImportFunc object at 0x00000246249C6F20>' (type <class 'bbcpy.functions.base.ImportFunc'>) doesn't

--------------------------------------------------------------------------------
1 fits failed with the following error:
Traceback (most recent call last):
  File "C:\Users\alioo\Desktop\MA\bbcpy_AutoML\bbcpy_env\lib\site-packages\sklearn\model_selection\_validation.py", line 686, in _fit_and_score
    estimator.fit(X_train, y_train, **fit_params)
  File "C:\Users\alioo\Desktop\MA\bbcpy_AutoML\bbcpy_env\lib\site-packages\sklearn\pipeline.py", line 402, in fit
    Xt = self._fit(X, y, **fit_params_steps)
  File "C:\Users\alioo\Desktop\MA\bbcpy_AutoML\bbcpy_env\lib\site-packages\sklearn\pipeline.py", line 340, in _fit
    self._validate_steps()
  File "C:\Users\alioo\Desktop\MA\bbcpy_AutoML\bbcpy_env\lib\site-packages\sklearn\pipeline.py", line 231, in _validate_steps
    raise TypeError(
TypeError: All intermediate steps should be transformers and implement fit and transform or be the string 'passthrough' '<bbcpy.functions.base.ImportFunc object at 0x00000246249C70D0>' (type <class 'bbcpy.functions.base.ImportFunc'>) doesn't

--------------------------------------------------------------------------------
1 fits failed with the following error:
Traceback (most recent call last):
  File "C:\Users\alioo\Desktop\MA\bbcpy_AutoML\bbcpy_env\lib\site-packages\sklearn\model_selection\_validation.py", line 686, in _fit_and_score
    estimator.fit(X_train, y_train, **fit_params)
  File "C:\Users\alioo\Desktop\MA\bbcpy_AutoML\bbcpy_env\lib\site-packages\sklearn\pipeline.py", line 402, in fit
    Xt = self._fit(X, y, **fit_params_steps)
  File "C:\Users\alioo\Desktop\MA\bbcpy_AutoML\bbcpy_env\lib\site-packages\sklearn\pipeline.py", line 340, in _fit
    self._validate_steps()
  File "C:\Users\alioo\Desktop\MA\bbcpy_AutoML\bbcpy_env\lib\site-packages\sklearn\pipeline.py", line 231, in _validate_steps
    raise TypeError(
TypeError: All intermediate steps should be transformers and implement fit and transform or be the string 'passthrough' '<bbcpy.functions.base.ImportFunc object at 0x00000246249C7280>' (type <class 'bbcpy.functions.base.ImportFunc'>) doesn't


In [8]:
print(cross_val_score(csp_pipeline, X_train_dask, y_train_dask, cv=cv))



ValueError: 
All the 5 fits failed.
It is very likely that your model is misconfigured.
You can try to debug the error by setting error_score='raise'.

Below are more details about the failures:
--------------------------------------------------------------------------------
5 fits failed with the following error:
Traceback (most recent call last):
  File "C:\Users\alioo\Desktop\MA\bbcpy_AutoML\bbcpy_env\lib\site-packages\bbcpy\pipeline\pipeline.py", line 543, in _fit_transform_one
    Xt, yt = transformer.fit_transform(X, y, **fit_params)
  File "C:\Users\alioo\Desktop\MA\bbcpy_AutoML\bbcpy_env\lib\site-packages\sklearn\utils\_set_output.py", line 142, in wrapped
    data_to_wrap = f(self, X, *args, **kwargs)
  File "C:\Users\alioo\Desktop\MA\bbcpy_AutoML\bbcpy_env\lib\site-packages\sklearn\utils\_set_output.py", line 142, in wrapped
    data_to_wrap = f(self, X, *args, **kwargs)
  File "C:\Users\alioo\Desktop\MA\bbcpy_AutoML\bbcpy_env\lib\site-packages\sklearn\utils\_set_output.py", line 142, in wrapped
    data_to_wrap = f(self, X, *args, **kwargs)
  File "C:\Users\alioo\Desktop\MA\bbcpy_AutoML\bbcpy_env\lib\site-packages\sklearn\base.py", line 851, in fit_transform
    return self.fit(X, y, **fit_params).transform(X)
  File "C:\Users\alioo\Desktop\MA\bbcpy_AutoML\bbcpy_env\lib\site-packages\bbcpy\functions\spatial.py", line 43, in fit
    A, B = self.calcAB(x, y)
  File "C:\Users\alioo\Desktop\MA\bbcpy_AutoML\bbcpy_env\lib\site-packages\bbcpy\functions\spatial.py", line 120, in calcAB
    c1 = sk.covariance.OAS().fit(x[y == classes[0]]).covariance_
  File "C:\Users\alioo\Desktop\MA\bbcpy_AutoML\bbcpy_env\lib\site-packages\dask\array\core.py", line 1993, in __getitem__
    dsk, chunks = slice_array(out, self.name, self.chunks, index2, self.itemsize)
  File "C:\Users\alioo\Desktop\MA\bbcpy_AutoML\bbcpy_env\lib\site-packages\dask\array\slicing.py", line 176, in slice_array
    dsk_out, bd_out = slice_with_newaxes(out_name, in_name, blockdims, index, itemsize)
  File "C:\Users\alioo\Desktop\MA\bbcpy_AutoML\bbcpy_env\lib\site-packages\dask\array\slicing.py", line 198, in slice_with_newaxes
    dsk, blockdims2 = slice_wrap_lists(out_name, in_name, blockdims, index2, itemsize)
  File "C:\Users\alioo\Desktop\MA\bbcpy_AutoML\bbcpy_env\lib\site-packages\dask\array\slicing.py", line 254, in slice_wrap_lists
    return slice_slices_and_integers(out_name, in_name, blockdims, index)
  File "C:\Users\alioo\Desktop\MA\bbcpy_AutoML\bbcpy_env\lib\site-packages\dask\array\slicing.py", line 303, in slice_slices_and_integers
    raise ValueError(
ValueError: Arrays chunk sizes are unknown: (nan,)

A possible solution: https://docs.dask.org/en/latest/array-chunks.html#unknown-chunks
Summary: to compute chunks sizes, use

   x.compute_chunk_sizes()  # for Dask Array `x`
   ddf.to_dask_array(lengths=True)  # for Dask DataFrame `ddf`

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "C:\Users\alioo\Desktop\MA\bbcpy_AutoML\bbcpy_env\lib\site-packages\sklearn\model_selection\_validation.py", line 686, in _fit_and_score
    estimator.fit(X_train, y_train, **fit_params)
  File "C:\Users\alioo\Desktop\MA\bbcpy_AutoML\bbcpy_env\lib\site-packages\bbcpy\pipeline\pipeline.py", line 179, in fit
    Xt, yt = self._fit(X, y, **fit_params_steps)
  File "C:\Users\alioo\Desktop\MA\bbcpy_AutoML\bbcpy_env\lib\site-packages\bbcpy\pipeline\pipeline.py", line 146, in _fit
    X, y, fitted_transformer = fit_transform_one_cached(
  File "C:\Users\alioo\Desktop\MA\bbcpy_AutoML\bbcpy_env\lib\site-packages\joblib\memory.py", line 353, in __call__
    return self.func(*args, **kwargs)
  File "C:\Users\alioo\Desktop\MA\bbcpy_AutoML\bbcpy_env\lib\site-packages\bbcpy\pipeline\pipeline.py", line 545, in _fit_transform_one
    Xt = transformer.fit_transform(X, y, **fit_params)
  File "C:\Users\alioo\Desktop\MA\bbcpy_AutoML\bbcpy_env\lib\site-packages\sklearn\utils\_set_output.py", line 142, in wrapped
    data_to_wrap = f(self, X, *args, **kwargs)
  File "C:\Users\alioo\Desktop\MA\bbcpy_AutoML\bbcpy_env\lib\site-packages\sklearn\utils\_set_output.py", line 142, in wrapped
    data_to_wrap = f(self, X, *args, **kwargs)
  File "C:\Users\alioo\Desktop\MA\bbcpy_AutoML\bbcpy_env\lib\site-packages\sklearn\utils\_set_output.py", line 142, in wrapped
    data_to_wrap = f(self, X, *args, **kwargs)
  File "C:\Users\alioo\Desktop\MA\bbcpy_AutoML\bbcpy_env\lib\site-packages\sklearn\base.py", line 851, in fit_transform
    return self.fit(X, y, **fit_params).transform(X)
  File "C:\Users\alioo\Desktop\MA\bbcpy_AutoML\bbcpy_env\lib\site-packages\bbcpy\functions\spatial.py", line 43, in fit
    A, B = self.calcAB(x, y)
  File "C:\Users\alioo\Desktop\MA\bbcpy_AutoML\bbcpy_env\lib\site-packages\bbcpy\functions\spatial.py", line 120, in calcAB
    c1 = sk.covariance.OAS().fit(x[y == classes[0]]).covariance_
  File "C:\Users\alioo\Desktop\MA\bbcpy_AutoML\bbcpy_env\lib\site-packages\dask\array\core.py", line 1993, in __getitem__
    dsk, chunks = slice_array(out, self.name, self.chunks, index2, self.itemsize)
  File "C:\Users\alioo\Desktop\MA\bbcpy_AutoML\bbcpy_env\lib\site-packages\dask\array\slicing.py", line 176, in slice_array
    dsk_out, bd_out = slice_with_newaxes(out_name, in_name, blockdims, index, itemsize)
  File "C:\Users\alioo\Desktop\MA\bbcpy_AutoML\bbcpy_env\lib\site-packages\dask\array\slicing.py", line 198, in slice_with_newaxes
    dsk, blockdims2 = slice_wrap_lists(out_name, in_name, blockdims, index2, itemsize)
  File "C:\Users\alioo\Desktop\MA\bbcpy_AutoML\bbcpy_env\lib\site-packages\dask\array\slicing.py", line 254, in slice_wrap_lists
    return slice_slices_and_integers(out_name, in_name, blockdims, index)
  File "C:\Users\alioo\Desktop\MA\bbcpy_AutoML\bbcpy_env\lib\site-packages\dask\array\slicing.py", line 303, in slice_slices_and_integers
    raise ValueError(
ValueError: Arrays chunk sizes are unknown: (nan,)

A possible solution: https://docs.dask.org/en/latest/array-chunks.html#unknown-chunks
Summary: to compute chunks sizes, use

   x.compute_chunk_sizes()  # for Dask Array `x`
   ddf.to_dask_array(lengths=True)  # for Dask DataFrame `ddf`


In [None]:
with joblib.parallel_backend('dask'):
    sklearn.model_selection.cross_validate(csp_pipeline, x, y, cv=3)

In [None]:
csp_pipeline

In [None]:
from dask_ml.wrappers import ParallelPostFit
import sklearn

# Create your Scikit-learn estimator (e.g., RandomForestClassifier, SVC, etc.)
estimator = RandomForestClassifier()

# Wrap the estimator with Dask's ParallelPostFit
dask_estimator = ParallelPostFit(estimator)