## CSP Type:
1, CSSP: Common Spatial Spectral Pattern

2, SWCSP: Spectrally Weighted Common Spatial Patterns

3, ISSPL: Iterative Spatio Spectral Patterns Learning

4, FBCSP: Filter Bank Common Spatial Patterns

5, SCSSP: Seperable Common Spatio-spectral patterns


In [None]:
%matplotlib tk 
import mne
from mne.decoding import CSP
from mne.preprocessing import ICA
import numpy as np
import matplotlib.pyplot as plt

In [None]:
plot_enable =1

In [15]:
epoch_file = '../preproc/Physionet_ssp_ica_epo.fif'
epochs = mne.read_epochs(epoch_file)
epochs = epochs.apply_baseline()
epochs.equalize_event_counts()

Reading ../preproc/Physionet_ssp_ica_epo.fif ...
    Read a total of 2 projection items:
        EOG-eeg--0.200-0.200-PCA-01 (1 x 27) active
        EOG-eeg--0.200-0.200-PCA-02 (1 x 27) active
    Found the data of interest:
        t =   -2000.00 ...    4000.00 ms
        0 CTF compensation matrices available
Not setting metadata
Not setting metadata
90 matching events found
No baseline correction applied
Created an SSP operator (subspace dimension = 2)
2 projection items activated
Applying baseline correction (mode: mean)
Dropped 2 epochs: 72, 85


(<EpochsFIF |  88 events (all good), -2 - 4 sec, baseline -2 – 0 sec, ~17.5 MB, data loaded,
  'T1': 44
  'T2': 44>,
 array([72, 85]))

In [None]:

epochs.events.shape

In [16]:
epochs_data = epochs.get_data()
# labels = epochs.events[epochs.events[:,-1]!=3]
labels = epochs.events[:,-1]
labels = labels-2
# labels = epochs.events[:,-1]-2
print(f"Data Size: {epochs_data.shape}")

# train data 
epochs_train = epochs.copy().crop(tmin=1, tmax=2)
epochs_train_data = epochs_train.get_data()
print(f"Train Size: {epochs_train_data.shape}")


Data Size: (88, 27, 961)
Train Size: (88, 27, 161)


In [None]:
## Task  Analysis
if plot_enable==1:
    task = T2
    title = str(task)[11:11+2] +' - '+ epoch_file.split('/')[-1]
    # task.plot_topomap();
    # task.plot_white(); # Noise cov required
    # task.plot_field(); # requires  surf maps
    # task.plot_sensors();
    # task.plot_topo();
    # task.plot_joint(times=[0.0, 0.2, 0.3]);#,picks=['C4','C2','C6','C1','C3','C5']);
    # task.plot_image(titles=f'{title} Image',show_names='all');
    # task.plot(proj= True, titles = '{task} - Projs - True',spatial_colors=True);
    # task.plot(proj= False, titles = '{task} -  Projs - False',spatial_colors=True);
    # task.plot(proj= 'reconstruct', titles = '{task} -  Projs - reconstruct',spatial_colors=True);
    # task.plot_topomap();
    # task.plot(gfp= "only"); # population standard deviation of the signal across channels
    ## Compare regions
    # mne.channels.combine_channels({task}, roi_dict, method='mean')
    ## Compare conditions
    evoked = dict(T1 = list(epochs['T1'].iter_evoked()), T2 = list(epochs['T2'].iter_evoked()))
    mne.viz.plot_compare_evokeds(evoked, combine='mean');
    # task_t0 = mne.combine_evoked([task, T0], weights=[1,-1])
    # task_t0.plot_joint();

In [None]:
# Evoked data
# T0 = epochs['T0'].average() # Shape = chan x timepnts
T1 = epochs['T1'].average()
T2 = epochs['T2'].average()
print(T1.data.shape)
print(T2.data.shape)

### Classification

In [None]:
from sklearn.model_selection import ShuffleSplit
from sklearn.pipeline import Pipeline
from sklearn.model_selection import cross_val_score

from sklearn.discriminant_analysis import LinearDiscriminantAnalysis as LDA
from sklearn.tree import DecisionTreeClassifier as DTC
from sklearn.svm import SVC
from sklearn.kernel_approximation import RBFSampler

In [None]:
cv = ShuffleSplit(10, test_size = 0.2, random_state=1)
cv_split = cv.split(epochs_train_data, labels)


In [None]:
lda = LDA()
svc = SVC()
dtc = DTC()


In [17]:
csp = CSP(n_components=4, reg='ledoit_wolf', log=True, transform_into= 'average_power', #  reg='ledoit_wolf', log=None, norm_trace=False, rank='full',cov_est = 'epoch')#
            cov_est = 'concat',rank='full', norm_trace= True)
rbf = RBFSampler(gamma=1, random_state=1)
clf = SVC()
pip = Pipeline([('CSP', csp),('CLF', clf)])
scores = cross_val_score(pip, epochs_train_data, labels, cv = cv, verbose=False)
scores

Computing rank from data with rank='full'
    MAG: rank 27 from info
Reducing data rank from 27 -> 27
Estimating covariance using LEDOIT_WOLF
Done.
Computing rank from data with rank='full'
    MAG: rank 27 from info
Reducing data rank from 27 -> 27
Estimating covariance using LEDOIT_WOLF
Done.
Computing rank from data with rank='full'
    MAG: rank 27 from info
Reducing data rank from 27 -> 27
Estimating covariance using LEDOIT_WOLF
Done.
Computing rank from data with rank='full'
    MAG: rank 27 from info
Reducing data rank from 27 -> 27
Estimating covariance using LEDOIT_WOLF
Done.
Computing rank from data with rank='full'
    MAG: rank 27 from info
Reducing data rank from 27 -> 27
Estimating covariance using LEDOIT_WOLF
Done.
Computing rank from data with rank='full'
    MAG: rank 27 from info
Reducing data rank from 27 -> 27
Estimating covariance using LEDOIT_WOLF
Done.
Computing rank from data with rank='full'
    MAG: rank 27 from info
Reducing data rank from 27 -> 27
Estimating

array([0.94444444, 0.94444444, 1.        , 1.        , 1.        ,
       0.94444444, 1.        , 1.        , 0.94444444, 1.        ])

In [18]:
class_balance = np.mean(labels == labels[0])
class_balance = max(class_balance, 1. - class_balance)
print("Classification accuracy: %f / Chance level: %f" % (np.mean(scores),
                                                          class_balance))

Classification accuracy: 0.977778 / Chance level: 0.500000


In [None]:
csp_data = csp.fit_transform(epochs_train_data, labels)
csp.plot_patterns(epochs.info, title='Patterns', size = 1);
csp.plot_filters(epochs.info, title='Filters');
print(csp.filters_.shape)
print(csp.patterns_.shape)

In [None]:
sfreq = epochs.info['sfreq']
w_length = int(sfreq * 0.05)   # running classifier: window length
w_step = int(sfreq * 0.01)  # running classifier: window step size
w_start = np.arange(0, epochs_data.shape[2] - w_length, w_step)

scores_windows = []

for train_idx, test_idx in cv_split:
    y_train, y_test = labels[train_idx], labels[test_idx]

    X_train = csp.fit_transform(epochs_train_data[train_idx], y_train)
    X_test = csp.transform(epochs_train_data[test_idx])

    # fit classifier
    clf.fit(X_train, y_train)

    # running classifier: test classifier on sliding window
    score_this_window = []
    for n in w_start:
        X_test = csp.transform(epochs_data[test_idx][:, :, n:(n + w_length)])
        score_this_window.append(clf.score(X_test, y_test))
    scores_windows.append(score_this_window)
w_times = (w_start + w_length / 2.) / sfreq + epochs.tmin

plt.scatter(w_times,np.mean(scores_windows,0))

In [None]:
np.mean(scores_windows,0)

In [None]:
sfreq = epochs.info['sfreq']
w_length = int(sfreq * 0.05)   # running classifier: window length
w_step = int(sfreq * 0.01)  # running classifier: window step size
w_start = np.arange(0, epochs_data.shape[2] - w_length, w_step)
w_start

In [None]:
csp = CSP(n_components=6, reg='ledoit_wolf', log=None, transform_into= 'average_power', #  reg='ledoit_wolf', log=None, norm_trace=False, rank='full',cov_est = 'epoch')#
            cov_est = 'epoch',rank='full', norm_trace= True)
csp_data = csp.fit_transform(epochs_data , labels);
csp_data.shape

In [None]:
srcs = csp.transform(epochs_data)
evoked.data = np.mean(srcs,axis=0)
evoked.times = np.arange(evoked.data.shape[0])

In [None]:
evoked.plot_topomap(times=[0, 1, 2, 3, 4]);

In [None]:
evoked = epochs.average()
evoked.data = csp.filters_.T
evoked.times = np.arange(evoked.data.shape[0])
evoked.plot_topomap();