In [None]:
%reset

In [None]:
from os import listdir
from os.path import exists
from importlib import reload
import numpy as np
import pandas as pd
import pyxdf
import mne
from utils import *
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis
from sklearn.model_selection import train_test_split, cross_val_score, LeaveOneOut, cross_val_predict
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
import time
import datetime
from datetime import datetime, timezone
import pickle
import plotly.express as px
from scipy import stats
from scipy.stats import t

print('Imports done...')

In [None]:
def load_epochs(path, cue_based):
    file_names = [f for f in listdir(path) if '_bad_annotations_raw.fif' in f]

    # Load file
    raw = load_raw_file(dirpath=path, file=file_names[0])

    events_from_annot, event_dict = mne.events_from_annotations(raw)

    if cue_based:
        # Select subset of event_dict with following markers:
        markers_of_interest = ['LTR-s', 'LTR-l','RTL-s', 'RTL-l', 'TTB-s', 'TTB-l', 'BTT-s', 'BTT-l']
        event_dict_of_interest = get_subset_of_dict(event_dict, markers_of_interest)

        epochs = mne.Epochs(raw, events_from_annot, event_id=event_dict_of_interest, tmin=0.0, tmax=7.0, baseline=None, reject_by_annotation=True, preload=True, picks=['eeg'], reject=dict(eeg=200e-6 ))
    else:
        # Looking at indication release (movement onset):
        trial_type = trial_type_markers
        period = ['i'] # 'i', 'c' .. indication, cue
        position = ['l', 'r', 't', 'b', 'c']
        state = ['1'] # 0,1 .. touch/release
        markers_of_interest = generate_markers_of_interest(trial_type, period, position, state)

        event_dict_of_interest = get_subset_of_dict(event_dict, markers_of_interest)

        epochs = mne.Epochs(raw, events_from_annot, event_id=event_dict_of_interest, tmin=-2.0, tmax=3.5, baseline=None, reject_by_annotation=True, preload=True, picks=['eeg'], reject=dict(eeg=200e-6 ))

    # Downsample to 10 Hz:
    epochs = epochs.copy().resample(10)

    return epochs, markers_of_interest

In [None]:
def get_peak_sample(epoch_type, windowed):
    # Get timepoint where the accuracy is max:
    idx_peak = df[(df['Type'] == epoch_type) & (df['5-point'] == windowed) & (df['Subject'] == 'Mean')]['Accuracy'].idxmax()
    tp_peak = df['Timepoint'][idx_peak]

    # Get the peak sample:
    if 'sfreq: 10.0' in df['epoch_info'][idx_peak]:
        fs = 10
    elif 'sfreq: 200.0' in df['epoch_info'][idx_peak]:
        fs = 200
    return int(round((tp_peak - df['t_min'][idx_peak]) * fs, 0))


In [None]:
#data_path = 'C:/Users/tumfart/Code/github/master-thesis/data/'
data_path = 'C:/Users/peter/Google Drive/measurements/eeg/'
subjects = ['A01', 'A02', 'A03', 'A04', 'A05', 'A06', 'A07' , 'A08', 'A09', 'A10']
# = 'A03'
paradigm = 'paradigm' # 'eye', 'paradigm'
plot = False
mne.set_log_level('WARNING')
trial_type_markers = ['LTR-s', 'LTR-l','RTL-s', 'RTL-l', 'TTB-s', 'TTB-l', 'BTT-s', 'BTT-l']
# Create path list for each subject:
paths = [str(data_path + subject + '/' + paradigm) for subject in subjects]

In [None]:
# Load classification results:
df = pd.read_csv('classification_df.csv', index_col=0)

In [None]:
# Set conditions:
cue_based = [True, False]
epoch_types = ['cue aligned 4 class direction (all)', 'cue aligned 4 class direction (short)', 'cue aligned 4 class direction (long)', 'movement onset aligned 4 class direction (all)', 'movement onset aligned 4 class direction (short)', 'movement onset aligned 4 class direction (long)']
windowed = [False, True]

In [None]:
mne.set_log_level('INFO')
start = time.time()
overall_conf = []
# Iterate over each epoch type and each window type:
for epoch_type in epoch_types:
    for window in windowed:
        peak_sample = get_peak_sample(epoch_type=epoch_type, windowed=window)

        # Calculate confusion matrix for each subject for the peak sample:
        start = time.time()
        conf_mat = []
        for subject, path in zip(subjects, paths):
            if 'cue' in epoch_type:
                epochs, markers_of_interest = load_epochs(path, cue_based=True)
            else:
                epochs, markers_of_interest = load_epochs(path, cue_based=False)
            print(f'Classifying subject {subject}')

            # Get condition:
            if 'all' in epoch_type:
                ups = [m for m in markers_of_interest if 'BT' in m]
                downs = [m for m in markers_of_interest if 'TT' in m]
                lefts = [m for m in markers_of_interest if 'RT' in m]
                rights = [m for m in markers_of_interest if 'LT' in m]
            elif 'long' in epoch_type:
                ups = [m for m in markers_of_interest if 'BTT-l' in m]
                downs = [m for m in markers_of_interest if 'TTB-l' in m]
                lefts = [m for m in markers_of_interest if 'RTL-l' in m]
                rights = [m for m in markers_of_interest if 'LTR-l' in m]
            elif 'short' in epoch_type:
                ups = [m for m in markers_of_interest if 'BTT-s' in m]
                downs = [m for m in markers_of_interest if 'TTB-s' in m]
                lefts = [m for m in markers_of_interest if 'RTL-s' in m]
                rights = [m for m in markers_of_interest if 'LTR-s' in m]

            epochs_up = epochs[ups]
            epochs_down = epochs[downs]
            epochs_right = epochs[rights]
            epochs_left = epochs[lefts]

            # Create data matrix X (epochs x channels x timepoints) and label vector y (epochs x 1):
            X = np.concatenate([epochs_up.get_data(), epochs_down.get_data(), epochs_right.get_data(), epochs_left.get_data()])
            y = np.concatenate([np.zeros(len(epochs_up)), np.ones(len(epochs_down)), 2*np.ones(len(epochs_right)), 3*np.ones(len(epochs_left))])

            clf = LinearDiscriminantAnalysis(solver='lsqr', shrinkage='auto')
            n_len = X.shape[2]

            # Calculate only for peak_smpl:
            x = X[:,:,peak_sample]

            y_pred = cross_val_predict(clf, x, y, cv=LeaveOneOut(), n_jobs=-1)
            conf_mat.append(confusion_matrix(y, y_pred))

        overall_conf.append(conf_mat)



mne.set_log_level('WARNING')
print(f'Finished epoching, took me {round(time.time() - start)} seconds...')

In [None]:
# Calculate average conf_mat:
cm_lst = []
cm_avg = np.zeros((overall_conf[0][0].shape))
for cm_subject in overall_conf:
    for cm in cm_subject:
        cm_avg += cm
    cm_lst.append(cm_avg/9)

In [None]:
for cm in cm_lst:
    ConfusionMatrixDisplay(confusion_matrix=cm).plot()

In [None]:
6.1e+02

In [None]:
# Get the epoch type
epoch_type = 'cue aligned 4 class direction (all)'

# Get timepoint where the accuracy is max:
idx_peak = df[(df['Type'] == epoch_type) & (df['5-point'] == False) & (df['Subject'] == 'Mean')]['Accuracy'].idxmax()
tp_peak = df['Timepoint'][idx_peak]

# Get the peak sample:
if 'sfreq: 10.0' in df['epoch_info'][idx_peak]:
    fs = 10
elif 'sfreq: 200.0' in df['epoch_info'][idx_peak]:
    fs = 200
peak_smpl = int(round((tp_peak - df['t_min'][idx_peak]) * fs, 0))


# Calculate confusion matrix for each subject for the peak sample:
start = time.time()
conf_mat = []
for subject, path in zip(subjects, paths):
    print(f'Classifying subject {subject}')
    file_names = [f for f in listdir(path) if 'epo.fif' in f]

    # Load file
    file_name = file_names[0]
    file = path + '/' + file_name
    epochs = mne.read_epochs(file, preload=True)

    # Get condition:
    ups = [m for m in markers_of_interest if 'BT' in m]
    downs = [m for m in markers_of_interest if 'TT' in m]
    lefts = [m for m in markers_of_interest if 'RT' in m]
    rights = [m for m in markers_of_interest if 'LT' in m]
    epochs_up = epochs[ups]
    epochs_down = epochs[downs]
    epochs_right = epochs[rights]
    epochs_left = epochs[lefts]

    # Create data matrix X (epochs x channels x timepoints) and label vector y (epochs x 1):
    X = np.concatenate([epochs_up.get_data(), epochs_down.get_data(), epochs_right.get_data(), epochs_left.get_data()])
    y = np.concatenate([np.zeros(len(epochs_up)), np.ones(len(epochs_down)), 2*np.ones(len(epochs_right)), 3*np.ones(len(epochs_left))])

    clf = LinearDiscriminantAnalysis(solver='lsqr', shrinkage='auto')
    n_len = X.shape[2]

    # Calculate only for peak_smpl:
    x = X[:,:,peak_smpl]

    y_pred = cross_val_predict(clf, x, y, cv=LeaveOneOut(), n_jobs=-1)
    conf_mat.append(confusion_matrix(y, y_pred))

print(f'Finished classification, took me {round(time.time() - start)} seconds...')


In [None]:
# Get grand averages for cue aligned vs. movement onset aligned

In [None]:
# Cue aligned:
mne.set_log_level('INFO')
# Iterate over each subject and extract the streams
start = time.time()

avg_long = []
avg_short = []
for subject, path in zip(subjects, paths):
    print(f'Reading last fif file for subject {subject}', end=' ')
    file_names = [f for f in listdir(path) if '_bad_annotations_raw.fif' in f]

    # Load file
    file_name = file_names[0]
    file = path + '/' + file_name
    raw = mne.io.read_raw(file, preload=True)

    events_from_annot, event_dict = mne.events_from_annotations(raw)


    # Select subset of event_dict with following markers:
    epoch_type = 'movement onset-aligned 4 class direction'
    markers_of_interest = ['LTR-s', 'LTR-l','RTL-s', 'RTL-l', 'TTB-s', 'TTB-l', 'BTT-s', 'BTT-l']

    # Looking at cue touch:
    trial_type = trial_type_markers
    period = ['i'] # 'i', 'c' .. indication, cue
    position = ['l', 'r', 't', 'b', 'c']
    state = ['1'] # 0,1 .. touch/release
    # markers_of_interest = generate_markers_of_interest(trial_type, period, position, state)

    event_dict_of_interest = get_subset_of_dict(event_dict, markers_of_interest)

    # TODO select event ID's of interest, hand over dict for event_id to make it easier to extract them:
    epochs = mne.Epochs(raw, events_from_annot, event_id=event_dict_of_interest, tmin=0.0, tmax=7.0, baseline=None, reject_by_annotation=True, preload=True, picks=['eeg'], reject=dict(eeg=200e-6 ))


    # Get condition:
    longs = [m for m in markers_of_interest if '-l' in m]
    shorts = [m for m in markers_of_interest if '-s' in m]
    epochs_long = epochs[longs]
    epochs_short = epochs[shorts]
    # Get long and short epochs data
    epochs_long = epochs_long.get_data().mean(axis=0)
    epochs_short = epochs_short.get_data().mean(axis=0)
    avg_long.append(epochs_long)
    avg_short.append(epochs_short)

    print()

mne.set_log_level('WARNING')
print(f'Finished epoching, took me {round(time.time() - start)} seconds...')

In [None]:
grand_avg_long = np.zeros(avg_long[0].shape)
grand_avg_short = np.zeros(avg_short[0].shape)
n_tp = avg_short[0].shape[1]
for long, short in zip(avg_long, avg_short):
    grand_avg_long += long
    grand_avg_short += short

grand_avg_long = grand_avg_long / len(subjects)
grand_avg_short = grand_avg_short / len(subjects)

In [None]:
# Bootstrapping for confidence interval:
n_chan, n_ts = avg_long[0].shape

uppers_long = np.zeros((n_chan, n_ts))
lowers_long = np.zeros((n_chan, n_ts))

uppers_short = np.zeros((n_chan, n_ts))
lowers_short = np.zeros((n_chan, n_ts))

confidence = .95
n_sample = 50
for chan in range(n_chan):
    print(chan, end='\r')
    for ts in range(n_ts):
        vals_short = []
        vals_long = []
        for subj in range(len(subjects)):
            vals_short.append(avg_short[subj][chan,ts])
            vals_long.append(avg_long[subj][chan,ts])

        m = np.array(vals_short).mean()
        s = np.array(vals_short).std()
        dof = len(vals_short)-1

        t_crit = np.abs(t.ppf((1-confidence)/2,dof))

        lowers_short[chan, ts], uppers_short[chan, ts] = (m-s*t_crit/np.sqrt(len(vals_short)), m+s*t_crit/np.sqrt(len(vals_short)))

        m = np.array(vals_long).mean()
        s = np.array(vals_long).std()
        dof = len(vals_long)-1

        t_crit = np.abs(t.ppf((1-confidence)/2,dof))

        lowers_long[chan, ts], uppers_long[chan, ts] = (m-s*t_crit/np.sqrt(len(vals_long)), m+s*t_crit/np.sqrt(len(vals_long)))
        # means_short = [np.random.choice(vals_short,size=len(vals_short),replace=True).mean() for i in range(n_sample)]
        # lowers_short[chan, ts], uppers_short[chan, ts] = np.percentile(means_short,[100*(1-confidence)/2,100*(1-(1-confidence)/2)])
        #
        #
        # vals_long.append(avg_long[subj][chan,ts])
        # means_long = [np.random.choice(vals_long,size=len(vals_long),replace=True).mean() for i in range(n_sample)]
        # lowers_long[chan, ts], uppers_long[chan, ts] = np.percentile(means_long,[100*(1-confidence)/2,100*(1-(1-confidence)/2)])

In [None]:
ch_name = 'Cz'
idx = [i for i, name in enumerate(epochs.ch_names) if name == ch_name][0]

x = np.arange(0,7+1/200,1/200)
plt.plot(x,grand_avg_long[idx,:]*1e6)
plt.fill_between(x, lowers_long[idx,:]*1e6, uppers_long[idx,:]*1e6, alpha=0.1)
plt.plot(x,grand_avg_short[idx,:]*1e6)
plt.fill_between(x, lowers_short[idx,:]*1e6, uppers_short[idx,:]*1e6, alpha=0.1)
plt.plot([2, 2], [-2.5, 2.5], color='black')
plt.legend(['Long', '95%-CI', 'Short', '95%-CI', 'Cue presentation'])
plt.xlabel('Time (s)')
plt.ylabel('Voltage (uV)')
plt.title(f'Distance (long vs. short) movement onset aligned on channel {ch_name}')

In [None]:
# Cue aligned:
mne.set_log_level('INFO')
# Iterate over each subject and extract the streams
start = time.time()

avg_long = []
avg_short = []
for subject, path in zip(subjects, paths):
    print(f'Reading last fif file for subject {subject}', end=' ')
    file_names = [f for f in listdir(path) if '_bad_annotations_raw.fif' in f]

    # Load file
    file_name = file_names[0]
    file = path + '/' + file_name
    raw = mne.io.read_raw(file, preload=True)

    events_from_annot, event_dict = mne.events_from_annotations(raw)


    # Select subset of event_dict with following markers:
    epoch_type = 'movement onset-aligned 4 class direction'
    markers_of_interest = ['LTR-s', 'LTR-l','RTL-s', 'RTL-l', 'TTB-s', 'TTB-l', 'BTT-s', 'BTT-l']

    # Looking at cue touch:
    trial_type = trial_type_markers
    period = ['i'] # 'i', 'c' .. indication, cue
    position = ['l', 'r', 't', 'b', 'c']
    state = ['1'] # 0,1 .. touch/release
    markers_of_interest = generate_markers_of_interest(trial_type, period, position, state)

    event_dict_of_interest = get_subset_of_dict(event_dict, markers_of_interest)

    # TODO select event ID's of interest, hand over dict for event_id to make it easier to extract them:
    epochs = mne.Epochs(raw, events_from_annot, event_id=event_dict_of_interest, tmin=-2.0, tmax=3.5, baseline=None, reject_by_annotation=True, preload=True, picks=['eeg'], reject=dict(eeg=200e-6 ))


    # Get condition:
    longs = [m for m in markers_of_interest if '-l' in m]
    shorts = [m for m in markers_of_interest if '-s' in m]
    epochs_long = epochs[longs]
    epochs_short = epochs[shorts]
    # Get long and short epochs data
    epochs_long = epochs_long.get_data().mean(axis=0)
    epochs_short = epochs_short.get_data().mean(axis=0)
    avg_long.append(epochs_long)
    avg_short.append(epochs_short)

    print()

mne.set_log_level('WARNING')
print(f'Finished epoching, took me {round(time.time() - start)} seconds...')

In [None]:
grand_avg_long = np.zeros(avg_long[0].shape)
grand_avg_short = np.zeros(avg_short[0].shape)
n_tp = avg_short[0].shape[1]
for long, short in zip(avg_long, avg_short):
    grand_avg_long += long
    grand_avg_short += short

grand_avg_long = grand_avg_long / len(subjects)
grand_avg_short = grand_avg_short / len(subjects)

In [None]:
# Bootstrapping for confidence interval:
n_chan, n_ts = avg_long[0].shape

uppers_long = np.zeros((n_chan, n_ts))
lowers_long = np.zeros((n_chan, n_ts))

uppers_short = np.zeros((n_chan, n_ts))
lowers_short = np.zeros((n_chan, n_ts))

confidence = .95
n_sample = 50
for chan in range(n_chan):
    print(chan, end='\r')
    for ts in range(n_ts):
        vals_short = []
        vals_long = []
        for subj in range(len(subjects)):
            vals_short.append(avg_short[subj][chan,ts])
            vals_long.append(avg_long[subj][chan,ts])

        m = np.array(vals_short).mean()
        s = np.array(vals_short).std()
        dof = len(vals_short)-1

        t_crit = np.abs(t.ppf((1-confidence)/2,dof))

        lowers_short[chan, ts], uppers_short[chan, ts] = (m-s*t_crit/np.sqrt(len(vals_short)), m+s*t_crit/np.sqrt(len(vals_short)))

        m = np.array(vals_long).mean()
        s = np.array(vals_long).std()
        dof = len(vals_long)-1

        t_crit = np.abs(t.ppf((1-confidence)/2,dof))

        lowers_long[chan, ts], uppers_long[chan, ts] = (m-s*t_crit/np.sqrt(len(vals_long)), m+s*t_crit/np.sqrt(len(vals_long)))
        # means_short = [np.random.choice(vals_short,size=len(vals_short),replace=True).mean() for i in range(n_sample)]
        # lowers_short[chan, ts], uppers_short[chan, ts] = np.percentile(means_short,[100*(1-confidence)/2,100*(1-(1-confidence)/2)])
        #
        #
        # vals_long.append(avg_long[subj][chan,ts])
        # means_long = [np.random.choice(vals_long,size=len(vals_long),replace=True).mean() for i in range(n_sample)]
        # lowers_long[chan, ts], uppers_long[chan, ts] = np.percentile(means_long,[100*(1-confidence)/2,100*(1-(1-confidence)/2)])

In [None]:
ch_name = 'C1'
idx = [i for i, name in enumerate(epochs.ch_names) if name == ch_name][0]

x = np.arange(-2.0,3.5+1/200,1/200)
plt.plot(x,grand_avg_long[idx,:]*1e6)
plt.fill_between(x, lowers_long[idx,:]*1e6, uppers_long[idx,:]*1e6, alpha=0.1)
plt.plot(x,grand_avg_short[idx,:]*1e6)
plt.fill_between(x, lowers_short[idx,:]*1e6, uppers_short[idx,:]*1e6, alpha=0.1)
plt.plot([0, 0], [-2.5, 2.5], color='black')
plt.legend(['Long', '95%-CI', 'Short', '95%-CI', 'Movement onset'])
plt.xlabel('Time (s)')
plt.ylabel('Voltage (uV)')
plt.title(f'Distance (long vs. short) movement onset aligned on channel {ch_name}')