#### Import datasets

In [1]:
import mne
from mne.channels import make_standard_montage
from mne.datasets import eegbci
from mne.io import concatenate_raws, read_raw_edf

In [2]:
runs = [6, 10, 14]  # motor imagery: hands vs feet

source_fnames = eegbci.load_data(1, runs)
source = concatenate_raws([read_raw_edf(f, preload=True) for f in source_fnames])
eegbci.standardize(source)  # set channel names
montage = make_standard_montage("standard_1005")
source.set_montage(montage)
source.annotations.rename(dict(T0="rest", T1="hands", T2="feet"))
source.set_eeg_reference(projection=True)

Extracting EDF parameters from C:\Users\zeyad\mne_data\MNE-eegbci-data\files\eegmmidb\1.0.0\S001\S001R06.edf...
EDF file detected
Setting channel info structure...
Creating raw.info structure...
Reading 0 ... 19999  =      0.000 ...   124.994 secs...
Extracting EDF parameters from C:\Users\zeyad\mne_data\MNE-eegbci-data\files\eegmmidb\1.0.0\S001\S001R10.edf...
EDF file detected
Setting channel info structure...
Creating raw.info structure...
Reading 0 ... 19999  =      0.000 ...   124.994 secs...
Extracting EDF parameters from C:\Users\zeyad\mne_data\MNE-eegbci-data\files\eegmmidb\1.0.0\S001\S001R14.edf...
EDF file detected
Setting channel info structure...
Creating raw.info structure...
Reading 0 ... 19999  =      0.000 ...   124.994 secs...
EEG channel type selected for re-referencing
Adding average EEG reference projection.
1 projection items deactivated
Average reference projection was added, but has not been applied yet. Use the apply_proj method to apply it.


0,1
Measurement date,"August 12, 2009 16:15:00 GMT"
Experimenter,Unknown
Participant,X

0,1
Digitized points,67 points
Good channels,64 EEG
Bad channels,
EOG channels,Not available
ECG channels,Not available

0,1
Sampling frequency,160.00 Hz
Highpass,0.00 Hz
Lowpass,80.00 Hz
Projections,Average EEG reference : off
Filenames,S001R06.edf<br>S001R10.edf<br>S001R14.edf
Duration,00:06:15 (HH:MM:SS)


#### Preprocess source data

In [3]:
# print channel names
print(source.ch_names)

['FC5', 'FC3', 'FC1', 'FCz', 'FC2', 'FC4', 'FC6', 'C5', 'C3', 'C1', 'Cz', 'C2', 'C4', 'C6', 'CP5', 'CP3', 'CP1', 'CPz', 'CP2', 'CP4', 'CP6', 'Fp1', 'Fpz', 'Fp2', 'AF7', 'AF3', 'AFz', 'AF4', 'AF8', 'F7', 'F5', 'F3', 'F1', 'Fz', 'F2', 'F4', 'F6', 'F8', 'FT7', 'FT8', 'T7', 'T8', 'T9', 'T10', 'TP7', 'TP8', 'P7', 'P5', 'P3', 'P1', 'Pz', 'P2', 'P4', 'P6', 'P8', 'PO7', 'PO3', 'POz', 'PO4', 'PO8', 'O1', 'Oz', 'O2', 'Iz']


In [4]:
from scripts.raw_preprocessing import Preprocessing

source_preprocessing = Preprocessing(source)
source_preprocessing.preprocess_raw()
source_epochs = source_preprocessing.segment_into_epochs(['rest', 'hands', 'feet'], channels=["C3", "C4", "P3", "P4", "T7", "T8", "P7", "P8"])

Filtering raw data in 3 contiguous segments
Setting up band-pass filter from 8 - 35 Hz

FIR filter parameters
---------------------
Designing a one-pass, zero-phase, non-causal bandpass filter:
- Windowed time-domain design (firwin) method
- Hamming window with 0.0194 passband ripple and 53 dB stopband attenuation
- Lower passband edge: 8.00
- Lower transition bandwidth: 2.00 Hz (-6 dB cutoff frequency: 7.00 Hz)
- Upper passband edge: 35.00 Hz
- Upper transition bandwidth: 8.75 Hz (-6 dB cutoff frequency: 39.38 Hz)
- Filter length: 265 samples (1.656 s)



[Parallel(n_jobs=1)]: Done  17 tasks      | elapsed:    0.0s


EEG channel type selected for re-referencing
Applying average reference.
Applying a custom ('EEG',) reference.
Removing existing average EEG reference projection.
Fitting ICA to data using 64 channels (please be patient, this may take a while)
Selecting by number: 15 components
Computing Extended Infomax ICA
Fitting ICA took 22.1s.
NOTE: pick_channels() is a legacy function. New code should use inst.pick(...).


  ica_labels = label_components(self.processed_raw, ica, method='iclabel')


Applying ICA to Raw instance
    Transforming to ICA space (15 components)
    Zeroing out 8 ICA components
    Projecting back using 64 PCA components
Used Annotations descriptions: ['feet', 'hands', 'rest']
Ignoring annotation durations and creating fixed-duration epochs around annotation onsets.
Not setting metadata
90 matching events found
No baseline correction applied
0 projection items activated
Using data from preloaded Raw for 90 events and 801 original time points ...
3 bad epochs dropped


In [62]:
# epochs preprocessing pipeline
from scripts.epochs_preprocessing import EventsEncoder, EventsEqualizer, Cropper, EpochsSegmenter
from sklearn.pipeline import Pipeline

pipeline = Pipeline([
    ('events_encoder', EventsEncoder(counter_class="rest")),
    ('events_equalizer', EventsEqualizer()),
    ('cropper', Cropper()),
    ('segmenter', EpochsSegmenter())
])

source_epochs_p = pipeline.fit_transform(source_epochs.copy())

Epochs after event encoding [not feet vs feet] <Epochs |  87 events (all good), -1 – 4 s, baseline off, ~4.3 MB, data loaded,
 'not feet': 63
 'feet': 24>
Cropped Epochs to:  0.5 s --  3.5 s


In [63]:
print(source_epochs.events[:,-1])
print(source_epochs_p.events[:,-1])

[1 3 2 3 2 3 1 3 2 3 1 3 1 3 2 3 2 3 1 3 1 3 2 3 2 3 1 3 1 2 3 1 3 1 3 2 3
 1 3 2 3 2 3 1 3 1 3 2 3 2 3 1 3 1 3 2 3 1 1 3 2 3 1 3 2 3 2 3 1 3 2 3 1 3
 2 3 1 3 1 3 2 3 1 3 2 3 1]
[1 0 0 0 0 0 1 0 0 0 1 0 1 0 0 0 0 0 1 0 1 0 0 0 0 0 1 0 1 0 0 1 0 1 0 0 0
 1 0 0 0 0 0 1 0 1 0 0 0 0 0 1 0 1 0 0 0 1 1 0 0 0 1 0 0 0 0 0 1 0 0 0 1 0
 0 0 1 0 1 0 0 0 1 0 0 0 1]


#### Simulate real time data

##### Prepare calibration data

In [None]:
# calibration data
target_fnames = eegbci.load_data(2, [6])
target = concatenate_raws([read_raw_edf(f, preload=True) for f in target_fnames])
eegbci.standardize(target)
target.set_montage(montage)
target.annotations.rename(dict(T0="rest", T1="hands", T2="feet"))
target.set_eeg_reference(projection=True)

# preprocessing
target_preprocessing = Preprocessing(target)
target_preprocessing.preprocess_raw()
target_epochs = target_preprocessing.segment_into_epochs(['hands', 'feet'], channels=["C3", "C4", "P3", "P4", "T7", "T8", "P7", "P8"])

##### Label alignment & TS mapping

In [None]:
from scripts.label_alignment import LabelAlignment
import numpy as np

source_events = source_epochs.events[:, -1]
la = LabelAlignment(target_epochs, concat=True)
source_aligned, source_events = la.fit_transform(source_epochs.get_data(), source_events)

# append target data to source data
# source_aligned = np.concatenate([source_aligned, target_epochs.get_data()])
# source_events = np.concatenate([source_events, target_epochs.events[:, -1]])

In [None]:
# from scripts.ts_feature_extraction import tangent_space_mapping

# source_ts = tangent_space_mapping(source_aligned)

##### Train model

In [None]:
# rt data
rt_fnames = eegbci.load_data(2, [10, 14])
rt = concatenate_raws([read_raw_edf(f, preload=True) for f in rt_fnames])
eegbci.standardize(rt)
rt.set_montage(montage)
rt.annotations.rename(dict(T0="rest", T1="hands", T2="feet"))
rt.set_eeg_reference(projection=True)

# preprocessing
rt_preprocessing = Preprocessing(rt)
rt_preprocessing.preprocess_raw()
rt_epochs = rt_preprocessing.segment_into_epochs(['hands', 'feet'], channels=["C3", "C4", "P3", "P4", "T7", "T8", "P7", "P8"])

In [None]:
from sklearn.svm import SVC
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis as LDA
from sklearn.metrics import classification_report

from scripts.ts_feature_extraction import TangentSpaceMapping

X = source_aligned
y = source_events

from sklearn.model_selection import train_test_split

# Split the data into training and testing sets
# X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
X_train, y_train = X, y
X_test = rt_epochs.get_data()
y_test = rt_epochs.events[:, -1]

# Create and fit the classifier
lda =  LDA()
clf = make_pipeline(TangentSpaceMapping(), lda)
clf.fit(X_train, y_train)

# Predict on the test set
y_pred = clf.predict(X_test)

# Print classification report
print(classification_report(y_test, y_pred))

from sklearn.metrics import confusion_matrix
import seaborn as sns
import matplotlib.pyplot as plt

# Compute and plot the confusion matrix
cm = confusion_matrix(y_test, y_pred)
sns.heatmap(cm, annot=True)
plt.xlabel("Predicted")
plt.ylabel("True")
plt.title("Confusion Matrix")
plt.show()