In [17]:
import numpy as np
import pandas as pd
import pywt
import mne
import scipy.stats
import warnings
warnings.filterwarnings('ignore')
from collections import Counter
from sklearn.ensemble import GradientBoostingClassifier

Get raw eeg data

In [31]:
file_name = 'sub-01/eeg/sub-01_task-run4_eeg.edf'
raw_data = mne.io.read_raw_edf(file_name)
raw_data.set_montage('standard_1020', match_case=False)
ica = mne.preprocessing.ICA(n_components=19, random_state=97, max_iter=800)
ica.fit(raw_data)

Extracting EDF parameters from /Users/tavish/PycharmProjects/eeg_music/ds002721-master/sub-01/eeg/sub-01_task-run4_eeg.edf...
EDF file detected
Setting channel info structure...
Creating raw.info structure...
Fitting ICA to data using 19 channels (please be patient, this may take a while)
Selecting by number: 19 components
Fitting ICA took 11.6s.


0,1
Method,fastica
Fit,44 iterations on raw data (518000 samples)
ICA components,19
Available PCA components,19
Channel types,eeg
ICA components marked for exclusion,—


Grab times and responses from our data

In [32]:
file = '/Users/tavish/PycharmProjects/eeg_music/ds002721-master/sub-01/eeg/sub-01_task-run4_events.tsv'
events = pd.read_csv(file, sep = '\t')
starts = events[events['trial_type'] == 788]
starts = starts.rename_axis('index1').reset_index()
responses = []
for i in range(starts.shape[0]):
    current = starts.iloc[i]['index1']
    df_new = events.drop(range(int(current)))[events['trial_type'] == 801].rename_axis('index1').reset_index().iloc[0, 0]
    answers = events.drop(range(int(df_new)))[(events['trial_type'] >= 900) & (events['trial_type'] <= 909)].iloc[0, 2]
    responses += [answers - 900]
start_times = starts['onset'].to_list()
print(start_times)
print(responses)

[5.31, 54.437, 103.196, 160.445, 209.091, 257.294, 319.239, 371.167, 422.713, 469.64]
[3, 1, 4, 7, 1, 1, 1, 9, 1, 1]


Grab EEG Data for this timestamp


In [164]:
eeg_data = []
for start_time in start_times:
    start = start_time
    end = (start_time + 12)
    front_1 = raw_data[3, start:end]
    #front_2 = raw_data[4, start:end]
    eeg_data.append(front_1[0][0])
eeg_data

[array([-9.72938974e-05, -9.44676215e-05, -9.63875811e-05, -9.81882884e-05,
        -9.61610020e-05, -9.65664593e-05, -9.79020833e-05, -9.80690363e-05,
        -9.91303804e-05, -9.98458932e-05, -9.83671666e-05, -9.93331090e-05]),
 array([-9.81167371e-05, -1.00346752e-04, -9.78186068e-05, -9.54216387e-05,
        -9.74131495e-05, -9.96073889e-05, -1.00966863e-04, -1.01706227e-04,
        -9.91661560e-05, -9.90469039e-05, -1.00239425e-04, -9.94523612e-05]),
 array([-9.91900064e-05, -9.85937457e-05, -9.84625684e-05, -1.00096323e-04,
        -9.87249231e-05, -9.70434678e-05, -9.71030939e-05, -9.45272476e-05,
        -9.37640339e-05, -9.28338672e-05, -9.29769697e-05, -9.68407392e-05]),
 array([-9.00791426e-05, -9.02103200e-05, -9.23687838e-05, -9.37282582e-05,
        -9.10570102e-05, -9.09973842e-05, -9.25834377e-05, -9.27742411e-05,
        -9.19275509e-05, -9.22853073e-05, -9.26430637e-05, -9.07350294e-05]),
 array([-8.64419523e-05, -8.88389203e-05, -8.89343221e-05, -8.73601938e-05,
    

Label our data

In [166]:
a = {'response': responses, 'eeg_data': eeg_data}
df = pd.DataFrame(data=a)
df.head(10)

Unnamed: 0,response,eeg_data
0,3,"[-9.729389735099334e-05, -9.446762153996393e-0..."
1,1,"[-9.811673714407784e-05, -0.000100346752235480..."
2,4,"[-9.919000643940545e-05, -9.8593745719779e-05,..."
3,7,"[-9.007914264351324e-05, -9.021032000183106e-0..."
4,1,"[-8.644195225379187e-05, -8.883892034669022e-0..."
5,1,"[-9.362093131809437e-05, -9.236878380687882e-0..."
6,1,"[-9.447954675435647e-05, -9.698384177678759e-0..."
7,9,"[-9.8391017075106e-05, -9.870107264931177e-05,..."
8,1,"[-9.247611073641158e-05, -9.072310422070981e-0..."
9,1,"[-8.291208879360327e-05, -7.883366547135833e-0..."


General helper code for retrieving features from our data

In [167]:
#this cell gives us the features of a particular eeg signal, which is how we analyze it

def calculate_entropy(list_values): #this measures the purity of our data, so higher values mean our data is less pure and so we are less likely to draw a good conclusion from it
    counter_values = Counter(list_values).most_common()
    probabilities = [elem[1]/len(list_values) for elem in counter_values]
    entropy=scipy.stats.entropy(probabilities)
    return entropy

def calculate_statistics(list_values): #this calculates a bunch of statistics for our data, including percentiles, median, mean, standard deviation, variance, and root mean squared
    n5 = np.nanpercentile(list_values, 5)
    n25 = np.nanpercentile(list_values, 25)
    n75 = np.nanpercentile(list_values, 75)
    n95 = np.nanpercentile(list_values, 95)
    median = np.nanpercentile(list_values, 50)
    mean = np.nanmean(list_values)
    std = np.nanstd(list_values)
    var = np.nanvar(list_values)
    rms = np.nanmean(np.sqrt(list_values**2))
    return [n5, n25, n75, n95, median, mean, std, var, rms]

def calculate_crossings(list_values): #this calculates crossing rates for signals, which is essentially just how quickly our signal changes according to certain metrics(for instance, zero_crossings is how often it changes to zero)
    zero_crossing_indices = np.nonzero(np.diff(np.array(list_values) > 0))[0]
    no_zero_crossings = len(zero_crossing_indices)
    mean_crossing_indices = np.nonzero(np.diff(np.array(list_values) > np.nanmean(list_values)))[0]
    no_mean_crossings = len(mean_crossing_indices)
    return [no_zero_crossings, no_mean_crossings]

def get_features(list_values): #this returns the features of our data, a list of the results from all three of our above functions
    entropy = calculate_entropy(list_values)
    crossings = calculate_crossings(list_values)
    statistics = calculate_statistics(list_values)
    return np.array([entropy] + crossings + statistics)

General code to generate train and test sets

Generate test and train data specifically for our eeg data

In [193]:
from sklearn.model_selection import train_test_split
test_size = 2
X_train, X_test, Y_train, Y_test = train_test_split(df.loc[:, 'eeg_data'], df.loc[:, 'response'],test_size=test_size)
X_train = X_train.to_list()
Y_train = Y_train.to_list()
X_test = X_test.to_list()
Y_test = Y_test.to_list()

Train classifier

In [196]:
#this cell trains our classifier on the training data and then tests it on the test data, returning the accuracy. We can mess with the parameters of the classifier as needed
cls = GradientBoostingClassifier(n_estimators=10000, random_state=3)
cls.fit(X_train, Y_train)

In [197]:
guesses = cls.predict(X_test)
def score(guesses, real):
    return sum((guesses-real)**2)/len(real)
print(score(guesses, Y_test)) #score metric is how far we are on average

32.0
