In [132]:
import os
import numpy as np
import mne
import sklearn
import matplotlib.pyplot as plt

from AudioOnsetUtils import *

RAW_EOG_CHANNELS = [u'EXG1', u'EXG2', u'EXG3', u'EXG4']
MASTOID_CHANNELS = [u'EXG5', u'EXG6']

In [133]:
## This cell gets filtered data so we can open ICA for Participant PNUM

def get_participant_filtered_data(PNUM):
    data_raw_file = os.path.join('raw_data', 'P' + PNUM + '-raw.fif')
    raw = mne.io.read_raw_fif(data_raw_file)

    # remove mastoid channels if present 
    if MASTOID_CHANNELS[0] in raw.ch_names:
        mne.io.set_eeg_reference(raw.load_data(), MASTOID_CHANNELS, copy=False) # inplace
        raw.drop_channels(MASTOID_CHANNELS)

    # Drop bad channels - in place on raw
    for bad_channel in raw.info['bads']:
        raw.drop_channels(bad_channel)
        print("dropped: " + bad_channel)

    eeg_picks = mne.pick_types(raw.info, meg=False, eeg=True, eog=False, stim=False, exclude=[])

    # bandpass filter - keeping a frequency range between 0.5 (high pass filter) and 30 Hz (low pass filter)
    filtered_data = raw.load_data().filter(0.5, 30, picks=eeg_picks)
    
    return filtered_data

In [134]:
## This cell opens the ICA file for Participant PNUM

def get_ica_data_from_file(filtered_data, PNUM):
    ica = mne.preprocessing.read_ica('ica_data/P' + PNUM + '-ica.fif')

    # apply the transformation
    postica_data = ica.apply(filtered_data, exclude=ica.exclude)
    return postica_data

In [135]:
## Original code from https://github.com/sstober/openmiir-rl-2016/blob/master/openmiir/events.py#L21 and adapted 
KEYSTROKE_BASE_ID = 2000
TIME_INDEX = 0
ID_INDEX = 1

# returns two things, a True if Not Noise boolean and a tuple with three elements:
# 0- this is lyrical (0-2) <- YLABEL
# 1- start time (onsettime + cue duration if condition 1/2, else just onsettime)
# 2- end time (starttime + duration of song without cue)
def get_event_data(event_times_ids, events_index, music_version, postica_data):
    event_id = event_times_ids[events_index[0]][ID_INDEX]

    if event_id < 1000:
        """
        Event Ids < 1000 are trial labels
        with the last digit indicating the condition
                1 : 'perception',
                2 : 'cued imag',
                3 : 'imag fix cross',
                4 : 'imagination',
        and the remaining digits referring to the stimulus id.
        """
        stimulus_id, condition = decode_event_id(event_id)
        
        stimulus_info = get_vers_stim_dict(music_version)[stimulus_id]
        cue_length = stimulus_info['cue_length']
        song_length = stimulus_info['song_length']
        
        events_index[0] += 1
        
        #print(get_id_to_song_name(stimulus_id))
        
        # get time of audio onset for this stimulus
        while event_times_ids[events_index[0]][ID_INDEX] != 1000:
            #print("Expected an audio onset event but got {}".format(event_times_ids[events_index[0]]))
            events_index[0] += 1
            
        audio_onset_id = event_times_ids[events_index[0]][TIME_INDEX]
        audio_onset_time = postica_data.times[audio_onset_id]
        start_time = (audio_onset_time + cue_length) if condition in [1,2] else audio_onset_time
        end_time = start_time + song_length
        
        # move the pointer by 1 to prep next call to get_event_data
        events_index[0] += 1
        
        return True, (stimulus_info["audio_type"], start_time, end_time)
    else:
        # move the pointer by 1 to prep next call to get_event_data
        events_index[0] += 1
        
        return {
            1111: (False, ("noise")),
            KEYSTROKE_BASE_ID: (False, 'imagination failed'),
            KEYSTROKE_BASE_ID+1: (False, 'imagination okay')
        }[event_id]
    
    
# convert event id to stimulus (song num) and condition (1-4)
def decode_event_id(event_id):
    if event_id < 1000:
        stimulus_id = event_id // 10
        condition = event_id % 10
        return stimulus_id, condition
    else:
        return event_id

In [136]:
def get_event_data_list(event_times_ids, p):
    events_data_list = []
    events_index = [0]
    music_version = get_participant_vers(p)

    while len(events_data_list) < 50:
        is_audio, event_data = get_event_data(event_times_ids, events_index, music_version, postica_data)
        if is_audio:
            # ignore AudioType.NONLYRICAL_LYRICAL (songs that have lyrics but played w/o lyrics)
            if event_data[0] == AudioType.LYRICAL or event_data[0] == AudioType.NON_LYRICAL:
                events_data_list.append(event_data)
    
    return events_data_list

In [137]:
def get_channels_eeg_data(event_matrix, start_time, end_time, ch_names, postica_data):
    # for each channel
    # extract this individual event's eeg data btwn start and endtime

    for i in range(len(ch_names)):
        channel_index = 0
        sampling_freq = postica_data.info['sfreq']
        start_stop_seconds = np.array([11, 13])
        start_sample, stop_sample = (start_stop_seconds * sampling_freq).astype(int)
        
        # returns a tuple, see https://mne.tools/dev/auto_tutorials/raw/10_raw_overview.html
        ch_selection = postica_data[i, start_sample:stop_sample]
    
        # append eeg data list to eventMatrix
        event_matrix.append(ch_selection) 
        
        # print("CH_SELECTION")
        # print(ch_selection)

In [138]:
## This cell prepares X_list and Y_list (data and labels)

X_list = []
Y_list = []
monster_matrix = [] # list of matrices

part_list = ["01"]

for part in get_participant_list():
    # load ica data for participant
    filtered_data = get_participant_filtered_data(part)
    postica_data = get_ica_data_from_file(filtered_data, part)

    # from ica data, get stimuli events
    events = mne.find_events(postica_data)

    # creates tuples of (event_time, event_id)
    event_times_ids = [(events[:][i][0], events[:][i][2]) for i in range(len(events))]
    
    # get a list of event datas (enum, starttime, endtime)
    events_list = get_event_data_list(event_times_ids, part)
    
    # for each event in event datas
    for audio_type,start_time,end_time in events_list:
        event_matrix = [] # list of lists

        # get eeg data for all 62 channels, put into event_matrix
        eeg_ch_names = postica_data.ch_names[:62]
        get_channels_eeg_data(event_matrix, start_time, end_time, eeg_ch_names, postica_data)

        # call normalize on event_matrix
      
        # append event_matrix to monster_matrix
        monster_matrix.append(event_matrix)

#         Y_list.append(audio_type)=
        
        # 0 for non-lyrical, 1 for lyrical
        Y_list.append(1) if audio_type == AudioType.LYRICAL else Y_list.append(0)

Opening raw data file raw_data/P01-raw.fif...
Isotrak not found
    Read a total of 1 projection items:
        Average EEG reference (1 x 64)  idle
    Range : 0 ... 2478165 =      0.000 ...  4840.166 secs
Ready.
Removing projector <Projection | Average EEG reference, active : False, n_channels : 64>
dropped: P8
dropped: P10
dropped: T8
Reading 0 ... 2478165  =      0.000 ...  4840.166 secs...
Filtering raw data in 1 contiguous segment
Setting up band-pass filter from 0.5 - 30 Hz

FIR filter parameters
---------------------
Designing a one-pass, zero-phase, non-causal bandpass filter:
- Windowed time-domain design (firwin) method
- Hamming window with 0.0194 passband ripple and 53 dB stopband attenuation
- Lower passband edge: 0.50
- Lower transition bandwidth: 0.50 Hz (-6 dB cutoff frequency: 0.25 Hz)
- Upper passband edge: 30.00 Hz
- Upper transition bandwidth: 7.50 Hz (-6 dB cutoff frequency: 33.75 Hz)
- Filter length: 3381 samples (6.604 sec)

Reading ica_data/P01-ica.fif ...
Isot

In [131]:
# XList = pca on monstermatrix for feature vector (SOUMYA AND CHRIS)
# make final list of feature vectors

In [9]:
# split data using (training vs testing)
# test_size: what proportion of original data is used for test set
train_data, test_data, train_lbl, test_lbl = train_test_split(X_list, Y_list, test_size=0.20, random_state=0)

NameError: name 'train_test_split' is not defined

In [None]:
def calcAccuracy(predictions, y):
    # True positives, false positives, etc.
    TP_ = np.logicaland(predictions, y)
    FP = np.logical_and(predictions, np.logicalnot(y))
    TN = np.logical_and(np.logical_not(predictions), np.logicalnot(y))
    FN = np.logical_and(np.logicalnot(predictions), y)

    TP = sum(TP)
    FP = sum(FP)
    TN = sum(TN)
    FN = sum(FN_)
    
    accuracy = (TP + TN)/(TP + FP + TN + FN)
    print('Accuracy:{}'.format(accuracy))
    
    return accuracy

In [None]:
# perform classification using SVM

from sklearn import svm

clf = svm.LinearSVC()
clf.fit(train_data, train_lbl)
y_pred = clf.predict(test_data)

# compare predictions for accuracy
accuracy = calcAccuracy(y_pred, Y_list)

# we could compare accuracy linearSVC and just SVC