# Helper Functions

In [None]:
def get_classes(y, evts_dict):
    '''
    Map raw event codes to three different label encodings.

    Every entry of ``y`` is compared against the values of ``evts_dict``.
    For each matching entry the integer after the ':' in the key is used
    as event id and decides the labels:

    a) faces vs. cars: all faces = 1, all cars = 0
    b) intact faces vs. rest: intact faces = 1, everything else = 0
    c) four classes: intact car = 0, intact face = 1,
       scrambled car = 2, scrambled face = 3

    Entries whose event id falls in none of the handled ranges (or that
    have no matching dictionary value) keep the initial label 0.

    Returns the tuple (y_faces_cars, y_intact_faces_rest, y_four_classes),
    each a float array with one label per entry of ``y``.
    '''
    n_events = len(y)
    y_faces_cars = np.zeros(shape=(n_events))
    y_intact_faces_rest = np.zeros(shape=(n_events))
    y_four_classes = np.zeros(shape=(n_events))

    for idx, label in enumerate(y):
        for name, value in evts_dict.items():
            if value != label:
                continue
            event_id = int(name.split(':')[1])
            if event_id <= 40:
                # intact face
                y_faces_cars[idx] = 1
                y_four_classes[idx] = 1
                y_intact_faces_rest[idx] = 1
            elif event_id <= 80:
                # intact car
                y_faces_cars[idx] = 0
                y_four_classes[idx] = 0
            elif 101 <= event_id <= 140:
                # scrambled face
                y_faces_cars[idx] = 1
                y_four_classes[idx] = 3
            elif 141 <= event_id <= 180:
                # scrambled car
                y_faces_cars[idx] = 0
                y_four_classes[idx] = 2

    return y_faces_cars, y_intact_faces_rest, y_four_classes

In [None]:
def calculate_baseline_mean(time_points, array):
    '''
    Return the mean of the pre-stimulus part of ``array`` in percent.

    The baseline consists of the leading entries of ``array`` whose time
    point is negative (time_points[i] < 0). Their mean is multiplied by 100.

    Raises ValueError if there is no leading negative time point, because
    the baseline would be empty.

    >>> calculate_baseline_mean([-3, -2, -1, 0, 1, 2, 3], [0,0,0,0,0,0,0])
    0.0

    >>> calculate_baseline_mean([-3, -2, -1, 0, 1, 2, 3], [1,3,2,1,1,4,5])
    200.0
    '''
    # count the leading negative time points; the length check keeps the
    # loop inside the sequence when every time point is negative
    n_time_points = len(time_points)
    n_baseline = 0
    while n_baseline < n_time_points and time_points[n_baseline] < 0:
        n_baseline += 1

    if n_baseline == 0:
        raise ValueError('no time point before stimulus onset (time < 0) found')

    baseline_sum = 0
    for k in range(n_baseline):
        baseline_sum += array[k]
    return (baseline_sum / n_baseline) * 100

In [None]:
def create_baseline_control_group(time_points, array):
    '''
    Extract the control group as all points before the stimulus onset
    (time_points[i] < 0).

    ``array`` is converted to a numpy array and indexed as array[:, index],
    i.e. it must be two-dimensional with time along the second axis. For
    every leading negative time point the complete column is appended to
    the control group.

    Returns the tuple (control values as a flat list, index of the first
    time point that is not negative). If every time point is negative the
    index equals len(time_points).
    '''
    array = np.asarray(array)
    baseline_control = []
    n_time_points = len(time_points)
    index = 0
    # the length check prevents an IndexError when all time points are negative
    while index < n_time_points and time_points[index] < 0:
        baseline_control.extend(array[:, index])
        index += 1
    return baseline_control, index

In [None]:
def permutation_test(treatment, control, iterations):
    '''
    One-sided permutation test on the difference of two group means.

    The observed statistic is mean(treatment) - mean(control). In every
    iteration the pooled values are randomly re-assigned to a control and a
    treatment group of the original sizes and the statistic is recomputed.

    Returns the tuple (list of permuted statistics, observed statistic,
    p-value). The p-value is (1 + number of permuted statistics that are
    >= the observed one) / (iterations + 1).
    '''
    n_control = len(control)
    n_total = len(treatment) + n_control
    observed = np.average(treatment) - np.average(control)

    # pooled samples: control values first, then the treatment values
    pooled = [*control, *treatment]

    permuted_statistics = []
    for _ in range(iterations):
        # random re-assignment of the pooled values to the two groups
        order = np.random.permutation(n_total)
        shuffled_control = [pooled[k] for k in order[:n_control]]
        shuffled_treatment = [pooled[k] for k in order[n_control:]]
        permuted_statistics.append(np.mean(shuffled_treatment) - np.mean(shuffled_control))

    # the count starts at 1, matching the +1 in the denominator
    hits = 1 + sum(1 for statistic in permuted_statistics if statistic >= observed)
    p_value = hits / (iterations + 1)
    return (permuted_statistics, observed, p_value)

In [None]:
def p_value_over_time(start_index, control_group, array, perm_test_iter):
    '''
    Calculate a p-value for each time point of ``array`` with a permutation test.

    ``array`` is indexed as array[:, j], so it is expected to be
    two-dimensional with time along the second axis. For every time index
    j >= start_index the column array[:, j] is used as treatment group and
    tested against ``control_group`` with ``perm_test_iter`` permutations.

    The progress in percent is printed after every tested time point.

    Returns the list of p-values, one per tested time point.
    '''
    array = np.asarray(array)
    n_time_points = len(array[0])
    # number of time points that get tested; used for the progress output only
    n_to_test = n_time_points - start_index

    p_value_list = []
    for j in range(start_index, n_time_points):
        treatment = array[:, j]
        _, _, p = permutation_test(treatment, control_group, perm_test_iter)
        p_value_list.append(p)
        # progress relative to the function's own input, not to a notebook global
        print((j - start_index) / n_to_test * 100)
    return p_value_list

# -------------------------------------------------------------------------------------

# Imports and important variables

In [None]:
import osfclient
import mne
import mne_bids
import numpy as np
import ccs_eeg_utils
import ccs_eeg_semesterproject
from mne_bids import (BIDSPath,read_raw_bids)
from matplotlib import pyplot as plt
from importlib import reload 
from sklearn.pipeline import make_pipeline
from sklearn.linear_model import LogisticRegression

from helper_functions import *

%matplotlib qt
# Root folder of the local dataset and the per-subject sub-folder fragment;
# both are concatenated into the .fif file names in the decoding loops below.
path = "../local/bidsN170"
temp_path = "/ses-N170/eeg/"

# MNE log verbosity; INFO and WARNING are the levels switched between here
mne.set_log_level(verbose='INFO')

# 1 Decoding for all subjects (classify over time)

In [None]:
header = ['Subject_ID', 'Amplitude', 'Stimulus', 'Condition']
peak_data = []
score_list_if_rest = []
score_list_faces_cars = []
for s in range(1,41):
    # subject ids are zero-padded to three digits in the file names (1 -> '001')
    sub = str(s).zfill(3)
    # use the data without the channel positions to avoid a bug
    read_path = f"{path}/sub-{sub}{temp_path}sub-{sub}_cleaned_no_channel_positions.fif"
    raw = mne.io.read_raw_fif(read_path)

    epochs, evts_dict = load_epochs(raw)
    epochs.drop_bad()

    # X: data of the epochs, y: event code of every epoch (the classes to predict)
    X = epochs.get_data()
    y = epochs.events[:, 2]
    y_faces_cars, y_intact_faces_rest, y_four_classes = get_classes(y, evts_dict)

    # pipeline: scale -> vectorize -> logistic regression,
    # wrapped in a SlidingEstimator (one fit per time point)
    pipe = make_pipeline(
        mne.decoding.Scaler(epochs.info),
        mne.decoding.Vectorizer(),
        LogisticRegression(solver='lbfgs'),
    )
    timeDecode = mne.decoding.SlidingEstimator(pipe)

    # intact faces vs. rest, cross-validated with 10 splits to avoid overfitting
    scores_if_rest = mne.decoding.cross_val_multiscore(
        timeDecode, X, y_intact_faces_rest, cv=10, n_jobs=4)
    score_list_if_rest.append(scores_if_rest)

    # all faces vs. all cars, cross-validated with 10 splits to avoid overfitting
    scores_face_car = mne.decoding.cross_val_multiscore(
        timeDecode, X, y_faces_cars, cv=10, n_jobs=4)
    score_list_faces_cars.append(scores_face_car)

print('DONE')

## a) intact faces / rest

### save the data

In [None]:
#save_array('decoding_data/time_score_intact_faces_rest_logreg.npy', score_list_if_rest)

### load the data

In [None]:
# Load the time-resolved intact-faces/rest scores from disk instead of re-running
# the decoding loop (load_array_from_memory is defined outside this notebook section;
# presumably it reads what the commented-out save_array call wrote - verify).
score_list_if_rest = load_array_from_memory('decoding_data/time_score_intact_faces_rest_logreg.npy')

### plot the data

In [None]:
# average each of the 40 subjects over axis 0 (presumably the cross-validation
# splits), then average the resulting curves across subjects
mean_over_splits = [np.mean(score_list_if_rest[subject], axis=0) for subject in range(40)]
mean_over_splits_and_subjects = np.mean(mean_over_splits, axis=0)

# time axis with one entry per sample of the score curves
number_of_datapoints = len(score_list_if_rest[0][0])
x = get_timepoints(epoch_start=-0.1, epoch_end=1, number_of_datapoints=number_of_datapoints)

In [None]:
# Baseline accuracy in percent: mean of the grand-average curve over the
# time points before stimulus onset (time < 0); drawn as reference line below.
baseline_mean = calculate_baseline_mean(x, mean_over_splits_and_subjects)

In [None]:
figure, ax2 = plt.subplots()
title = ''

# vertical line marks time 0 (stimulus onset)
ax2.axvline(x=0, color='black')
# grand-average accuracy in percent and the constant baseline level
ax2.plot(x, mean_over_splits_and_subjects*100, label='mean accuracy')
ax2.plot(x, baseline_mean*np.ones(shape=(number_of_datapoints)), color='black')

ax2.set_title('mean accuracy over splits and subjects for (intact faces/rest)')
ax2.set_xlabel('time[ms]')
ax2.set_ylabel('accuracy[%]')
ax2.legend(loc=1)
#plt.savefig('./analysis_images/' + title)
#plt.close(figure)
plt.show()

### permutation test per time point

In [None]:
# Pool the pre-stimulus (time < 0) scores of all subjects as control group;
# i is the index of the first time point that is not before stimulus onset.
basline_control, i = create_baseline_control_group(x, mean_over_splits)

In [None]:
# Permutation test (2000 permutations) for every time point from index i on.
# The result has to be stored: the plotting cell below reads p_value_list.
p_value_list = p_value_over_time(i, basline_control, mean_over_splits, 2000)

In [None]:
figure, ax2 = plt.subplots()
title = ''

# p-values of the time points from index i on, drawn on a logarithmic y axis
ax2.plot(x[i:], p_value_list)
ax2.set_yscale('log')

ax2.set_title('p-value  for different time points')
ax2.set_xlabel('time[ms]')
ax2.set_ylabel('p-value')
#ax2.legend(loc=1)
#plt.savefig('./analysis_images/' + title)
#plt.close(figure)
plt.show()

In [None]:
#save_array('decoding_data/p_values_if_rest.npy', p_value_list)

In [None]:
# Load stored p-values for intact faces/rest; this replaces any p_value_list
# computed earlier in the session (presumably written by the commented-out
# save_array call above - verify).
p_value_list = load_array_from_memory('decoding_data/p_values_if_rest.npy')

## b) all faces / all cars

### save the data

In [None]:
#save_array('decoding_data/time_score_all_faces_all_cars_logreg.npy', score_list_faces_cars)

### load the data

In [None]:
# Load the time-resolved all-faces/all-cars scores from disk instead of re-running
# the decoding loop (presumably written by the commented-out save_array call - verify).
score_list_faces_cars = load_array_from_memory('decoding_data/time_score_all_faces_all_cars_logreg.npy')

### plot the data

In [None]:
# average each of the 40 subjects over axis 0 (presumably the cross-validation
# splits), then average the resulting curves across subjects
mean_over_splits = [np.mean(score_list_faces_cars[subject], axis=0) for subject in range(40)]
mean_over_splits_and_subjects = np.mean(mean_over_splits, axis=0)

# time axis with one entry per sample of the score curves
number_of_datapoints = len(score_list_faces_cars[0][0])
x = get_timepoints(epoch_start=-0.1, epoch_end=1, number_of_datapoints=number_of_datapoints)

In [None]:
# Baseline accuracy in percent: mean of the grand-average curve over the
# time points before stimulus onset (time < 0); drawn as reference line below.
baseline_mean = calculate_baseline_mean(x, mean_over_splits_and_subjects)

In [None]:
figure, ax2 = plt.subplots()
title = ''

# vertical line marks time 0 (stimulus onset)
ax2.axvline(x=0, color='black')
# grand-average accuracy in percent and the constant baseline level
ax2.plot(x, mean_over_splits_and_subjects*100, label='mean accuracy')
ax2.plot(x, baseline_mean*np.ones(shape=(number_of_datapoints)), color='black')

ax2.set_title('mean accuracy over splits and subjects for (all faces/all cars)')
ax2.set_xlabel('time[ms]')
ax2.set_ylabel('accuracy[%]')
ax2.legend(loc=1)
#plt.savefig('./analysis_images/' + title)
#plt.close(figure)
plt.show()

### permutation test per time point

In [None]:
# Pool the pre-stimulus (time < 0) scores of all subjects as control group;
# i is the index of the first time point that is not before stimulus onset.
basline_control, i = create_baseline_control_group(x, mean_over_splits)

In [None]:
# Permutation test (100 permutations) for every time point from index i on.
# NOTE(review): the intact-faces/rest section uses 2000 permutations - confirm
# whether the smaller number here is intended.
p_value_list = p_value_over_time(i, basline_control, mean_over_splits, 100)

In [None]:
figure, ax2 = plt.subplots()
title = ''

# p-values of the time points from index i on, drawn on a logarithmic y axis
ax2.plot(x[i:], p_value_list)
ax2.set_yscale('log')

ax2.set_title('p-value  for different time points')
ax2.set_xlabel('time[ms]')
ax2.set_ylabel('p-value')
#ax2.legend(loc=1)
#plt.savefig('./analysis_images/' + title)
#plt.close(figure)
plt.show()

In [None]:
#save_array('decoding_data/p_values_all_faces_all_cars.npy', p_value_list)

In [None]:
# Load stored p-values for all faces/all cars; this replaces any p_value_list
# computed earlier in the session (presumably written by the commented-out
# save_array call above - verify).
p_value_list = load_array_from_memory('decoding_data/p_values_all_faces_all_cars.npy')

# -------------------------------------------------------------------------------------

# 2 Decoding for all subjects

In [None]:
channel = 'P8'
header = ['Subject_ID', 'Amplitude', 'Stimulus', 'Condition']
peak_data = []
eeg_crit = 100e-6
score_list_if_rest = []
score_list_faces_cars = []
for s in range(1,41):
    print('Subject '+ str(s) + '############################################')
    # subject ids are zero-padded to three digits in the file names (1 -> '001')
    sub = str(s).zfill(3)
    # load the data without channel positions
    read_path = f"{path}/sub-{sub}{temp_path}sub-{sub}_cleaned_no_channel_positions.fif"
    raw = mne.io.read_raw_fif(read_path)

    #epochs = peak_to_peak(raw, eeg_crit)
    epochs, evts_dict = load_epochs(raw)
    epochs.drop_bad()

    # X: epoch data, y: event code of every epoch (stimuli values)
    X = epochs.get_data()
    y = epochs.events[:, 2]

    y_faces_cars, y_intact_faces_rest, y_four_classes = get_classes(y, evts_dict)

    # pipeline: scale -> vectorize -> logistic regression on the whole epoch
    clf = make_pipeline(
        mne.decoding.Scaler(epochs.info),
        mne.decoding.Vectorizer(),
        LogisticRegression(solver='lbfgs'),
    )

    # intact faces vs. rest
    # Adding channel locations seem to destroy this function...
    scores_if_rest = mne.decoding.cross_val_multiscore(
        clf, X, y_intact_faces_rest, cv=10, n_jobs=4)
    # mean score across the cross-validation splits, stored in percent
    score = np.mean(scores_if_rest, axis=0)
    score_list_if_rest.append(score*100)

    # all faces vs. all cars
    # Adding channel locations seem to destroy this function...
    scores_face_car = mne.decoding.cross_val_multiscore(
        clf, X, y_faces_cars, cv=10, n_jobs=4)
    # mean score across the cross-validation splits, stored in percent
    score = np.mean(scores_face_car, axis=0)
    score_list_faces_cars.append(score*100)
print('DONE')

## a) evaluation intact faces vs rest

### plot the data

In [None]:
# accuracies in ascending order
sorted_score_list = np.sort(score_list_if_rest)
# subject ids 1..40 in the same ascending-accuracy order; only the score is
# used as sort key, so subjects with equal scores keep their id order
subject_ids = [
    subject_id
    for _, subject_id in sorted(zip(score_list_if_rest, range(1, 41)), key=lambda pair: pair[0])
]

In [None]:
figure, ax2 = plt.subplots()

# accuracies from best to worst, one marker per subject
ax2.plot(sorted_score_list[::-1], marker='o',linewidth=0, label='accuracy')
# horizontal reference line at 50 %
ax2.plot(50*np.ones(shape=(len(sorted_score_list))), color='black', alpha=0.75)
# label every position with the id of the subject plotted there
plt.xticks(np.arange(40), subject_ids[::-1])

ax2.set_title('Accuracy with Logistic-Regression for classes intact-faces / rest')
ax2.set_xlabel('Subject IDs')
ax2.set_ylabel('Accuracy in %')
ax2.legend(loc=1)
#plt.savefig('./analysis_images/accuracy_logistic_regression_intact_faces_rest')
#plt.close(figure)
plt.show()

### save the data

In [None]:
#save_array('decoding_data/score_intact_faces_rest_logreg.npy', score_list_if_rest)

### load the data

In [None]:
# Load the stored per-subject intact-faces/rest accuracies; this replaces the
# score_list_if_rest filled by the decoding loop (presumably written by the
# commented-out save_array call above - verify).
score_list_if_rest = load_array_from_memory('decoding_data/score_intact_faces_rest_logreg.npy')

## b) evaluation all faces vs all cars

### plot the data

In [None]:
# accuracies in ascending order
sorted_score_list_faces_cars = np.sort(score_list_faces_cars)
# subject ids 1..40 in the same ascending-accuracy order; only the score is
# used as sort key, so subjects with equal scores keep their id order
subject_ids = [
    subject_id
    for _, subject_id in sorted(zip(score_list_faces_cars, range(1, 41)), key=lambda pair: pair[0])
]

In [None]:
figure, ax2 = plt.subplots()

# accuracies from best to worst, one marker per subject
ax2.plot(sorted_score_list_faces_cars[::-1], marker='o',linewidth=0, label='accuracy')
# horizontal reference line at 50 %
ax2.plot(50*np.ones(shape=(len(sorted_score_list_faces_cars))), color='black', alpha=0.75)
# label every position with the id of the subject plotted there
plt.xticks(np.arange(40), subject_ids[::-1])

ax2.set_title('Accuracy with Logistic-Regression for classes all-faces / all cars')
ax2.set_xlabel('Subject IDs')
ax2.set_ylabel('Accuracy in %')
ax2.legend(loc=1)
#plt.savefig('./analysis_images/accuracy_logistic_regression_all_faces_all_cars')
#plt.close(figure)
plt.show()

In [None]:
# Combined plot of both classifications, each sorted from best to worst accuracy.
figure = plt.figure()
ax2 = figure.add_subplot(111)
ax2.plot(sorted_score_list_faces_cars[::-1], marker='o',linewidth=0, label='accuracy all faces/all cars')
ax2.plot(sorted_score_list[::-1], marker='o',linewidth=0, label='accuracy intact faces/rest')
# horizontal reference lines at 50 % and 75 %
ax2.plot(np.ones(shape=(len(sorted_score_list_faces_cars)))*50, color='black', alpha=0.75)
ax2.plot(np.ones(shape=(len(sorted_score_list_faces_cars)))*75, color='black', alpha=0.75)
# Both series are sorted independently, so a tick is a rank (1 = best) and not
# a subject id; the same position can show two different subjects.
plt.xticks(np.arange(40), np.arange(start=1, stop=41))
ax2.set_title('Accuracy with Logistic-Regression')
ax2.set_xlabel('Rank (subjects sorted by accuracy, separately per classification)')
ax2.set_ylabel('Accuracy in %')
ax2.legend(loc=1)
#plt.savefig('./analysis_images/accuracy_logistic_regression_all_faces_all_cars')
#plt.close(figure)
plt.show()

### save the data

In [None]:
#save_array('decoding_data/score_all_faces_all_cars_logreg.npy', sorted_score_list_faces_cars)

### load the data

In [None]:
# Load the stored all-faces/all-cars accuracies into the *sorted* list.
# NOTE(review): the commented-out save_array call above stores the sorted scores,
# while the intact-faces/rest section stores the unsorted ones - confirm intent.
sorted_score_list_faces_cars = load_array_from_memory('decoding_data/score_all_faces_all_cars_logreg.npy')

# -------------------------------------------------------------------------------------

# 3 Doctests

In [None]:
def check_classes(array):
    '''
    Return the largest class label contained in ``array`` as an int.

    >>> check_classes(y_faces_cars)
    1
    
    >>> check_classes(y_intact_faces_rest)
    1
    
    >>> check_classes(y_four_classes)
    3
    '''
    highest_label = np.asarray(array).max()
    return int(highest_label)

In [None]:
def check_range_2D_array(to_check, min_value, max_value):
    '''
    Return True if no element of the nested sequence ``to_check`` is smaller
    than ``min_value`` or larger than ``max_value``, otherwise False.

    >>> check_range_2D_array(score_list_if_rest[0], 0, 100)
    True
    
    >>> check_range_2D_array(score_list_faces_cars[0], 0, 100)
    True
    '''
    out_of_range = (
        value < min_value or value > max_value
        for row in to_check
        for value in row
    )
    return not any(out_of_range)

In [None]:
# Run all doctests found in the docstrings of the current module and print
# every example (verbose output).
import doctest
doctest.testmod(verbose=True)