Go to Runtime > Change runtime type > Hardware accelerator
and choose GPU to make the model run faster.

# Mounting Gdrive and loading modules (Change depending on your configuration)

In [6]:
from datetime import datetime, timedelta
import numpy as np
# Mounting gdrive
from google.colab import drive
drive.mount('/gdrive')

# Installing missing modules
!pip install mne

# Importing coleeg functions (reload function is used to successfully update any modification in coleeg.py file)
import sys
from importlib import reload
sys.path.append("/gdrive/MyDrive/coleeg/ver1/")
import coleeg
from coleeg import *

# disable line breaking when printing array objects
np.set_printoptions(linewidth=np.nan)


# creating folder for results
import os.path
from os import path
if path.exists('/gdrive/MyDrive/results') == False:
  os.mkdir('/gdrive/MyDrive/results')

Drive already mounted at /gdrive; to attempt to forcibly remount, call drive.mount("/gdrive", force_remount=True).


In [22]:
# Reload coleeg in the running kernel (run this cell whenever coleeg.py is changed)
reload(coleeg)
# Repeat the star import so the notebook-level names are rebound to the reloaded definitions
from coleeg import *

# Show info about coleeg

In [27]:
# Print the version string exposed by the imported coleeg module
print(coleeg.__version__)

1.0


In [None]:
# coleeg_info is provided by the coleeg star import; presumably it prints a description of the toolbox (see coleeg.py)
coleeg_info()

# Obtaining data files (Change depending on your configuration.)

In [24]:
#==================== Options =======================
dataset = 'physionet'  # one of: 'physionet', 'bcicomptIV2a'
#=====================================================

# Hand the selected dataset to coleeg's unzip_dataset (presumably extracts the
# raw data files; see coleeg.py) and report the elapsed wall-clock time.
start_time = time.time()
unzip_dataset(dataset=dataset)
elapsed = timedelta(seconds=round(time.time() - start_time))
print(f'Time = {elapsed}')

Data directory already exists.
Time = 0:00:00


# Loading data

In [26]:
#====================== Options  ==============================
# Determining dataset
dataset = 'physionet' # Select 'physionet' or 'bcicomptIV2a'

resample_freq = 100 # resample frequency in Hz (Set to None to deactivate)

Bands = None
#Bands = [(0.5, 8.0), (8.0, 13.0), (13.0, 40.0)] # Uncomment this line to use filters

notch_freqs = None # frequency (or list of frequencies) to which a notch filter is applied

# Time window passed to the loader as tmin / (tmax - 1/Fs); presumably seconds relative to the event -- confirm in coleeg.py
tmin = 0.0
tmax = 2.0

# select tasks for Physionet dataset
# Tasks = np.array([[3,7,11],[5,9,13]]) # Real movement
Tasks=np.array([[4,8,12],[6,10,14]]) # Imagined movement (comment this line and uncomment the one above for real movement)
#=============================================================================================
# Native sample frequency of each supported dataset (Hz)
Sample_Freqs = {'physionet':160, 'bcicomptIV2a':250}

# Fail early on an unsupported dataset name instead of printing a message and
# leaving data_x/data_y/data_index undefined for the following cells.
if dataset not in Sample_Freqs:
  raise ValueError('Wrong dataset')

# Effective sample frequency: the resample frequency when resampling is active, the native one otherwise
Fs = resample_freq if resample_freq is not None else Sample_Freqs[dataset]

# Delete variables left over from a previous run so their memory can be released
# before loading again (names that do not exist yet are simply skipped).
for stale_name in ('data_input', 'data_x', 'data_x2D', 'data_y', 'data_yc',
                   'train_x', 'train_x2D', 'train_y', 'train_yc',
                   'test_x', 'test_x2D', 'test_y', 'test_yc'):
  globals().pop(stale_name, None)

start_time = time.time()

# loading data
# Strings are compared with '==': 'is' tests object identity, which is not
# guaranteed for equal strings and triggers a SyntaxWarning on Python 3.8+.
# tmax is reduced by one sample period (1/Fs) before being passed to the loader.
if dataset == 'bcicomptIV2a':
  import warnings
  warnings.filterwarnings("ignore", category=DeprecationWarning) # silence DeprecationWarning messages
  data_x, data_y, data_index = get_data_bcicomptIV2a(resample_freq=resample_freq, Bands=Bands, tmin=tmin, tmax=(tmax-1/Fs), Baseline=None)
elif dataset == 'physionet':
  data_x, data_y, data_index = get_data_physionet(resample_freq=resample_freq, Tasks=Tasks, Bands=Bands,  tmin=tmin, tmax=(tmax-1/Fs), Baseline=None, notch_freqs=notch_freqs)

print(f'\nTime = {timedelta(seconds=round(time.time() - start_time))}')

Excluded Subjects are [ 88  89  92 100 104]
Subject: 109
Time = 0:01:13


In [28]:
# Balance the data down to the least-represented class. Balancing is done for each subject.
# NOTE: the cell rebinds data_x/data_y/data_index, so re-running it operates on the already balanced data.
data_x, data_y, data_index = balance(data_x, data_y, data_index)

In [29]:
# Normalize data (mean => 0, standard deviation => 1).
# The return value is discarded, so normalize() presumably modifies data_x in place -- TODO confirm in coleeg.py
normalize(data_x)

In [30]:
# Generate the categorical form of the labels (presumably one-hot; to_categorical comes from the coleeg star import).
# data_yc must be regenerated whenever data_y changes.
data_yc = to_categorical(data_y)

# Processing data to investigate trimming (the end) and cutting (the beginning) of time samples

In [None]:
#=============options================
# Use the value None (not the string 'None') to leave the data untouched.
Process = None # select 'trim', 'cut' or None
trim_time= 2 # number of seconds to keep from the start (later samples are trimmed)
cut_time= 3 # number of seconds to cut from the beginning (must be shorter than the loaded time window)
#========================================

# The second axis of data_x is indexed in time samples (seconds * Fs).
# int(round(...)) keeps the slice bound a valid integer for fractional durations.
# NOTE: data_x is rebound here, so running the cell twice applies the operation twice.
if Process == 'trim':
  data_x = data_x[:,0:int(round(trim_time*Fs)),:,:]
elif Process == 'cut':
  data_x = data_x[:,int(round(cut_time*Fs)):,:,:]
elif Process is not None:
  # raise needs an exception instance; raising a bare string is itself a TypeError
  raise ValueError('Wrong process')

# Model Evaluation

In [31]:
#====================== Options ===========================
# model_dict: dictionary for models and list of epochs for training (more than one epoch can be tested for each model)
# All models
# model_dict = {'Basic':[60], 'CNN1D':[60], 'EEGNet':[60], 'ShallowConvNet':[80],
#               'DeepConvNet':[50], 'CNN2D':[20], 'CNN3D':[20], 'TimeDist':[40]}
# model_dict = {'Basic':[20,30,40], 'CNN1D':[20,30]} #  single/multiple models and epochs can be selected
model_dict = {'CNN1D':[20]}
splits = 5 # number of splits for test, testing ratio = 1/splits
# splits = data_index.shape[0] # uncomment for per-subject evaluations (align_to_subject should be True)
kfold = False # set to True to enable validating all folds (i.e. splits)
align_to_subject = False # (True/False) do best to align split boundary to subjects (necessary for cross validation)
shuffle = True # shuffle data before splitting, has no effect if align_to_subject is true
batch_norm = True # Enable/Disable Batch normalization in the models: 'EEGNet', 'ShallowConvNet', 'DeepConvNet'
show_summary = False # Show summary of the model (has no effect if kfold is true)
verbose = True # Show validation progress (has no effect if kfold is true)
#===========================================================

# Validate the split count once, before any result file is created.
if splits > data_index.shape[0] and align_to_subject:
  raise ValueError('Number of splits should be less than number of subjects.')
elif splits > data_x.shape[0]:
  raise ValueError('Number of splits should be less than number of trials.')

# With kfold every split is evaluated, so model summary and training progress are silenced.
# Computed once into separate names so the option variables above keep the values the user set.
summary_flag = False if kfold else show_summary
verbosity = 0 if kfold else int(verbose)
Iterations = splits if kfold else 1

if align_to_subject: # distributing subjects on splits in a fair way
  N = data_index.shape[0]
  sub_split = np.ones(splits)*np.floor(N/splits)
  sub_split[0:(N % splits)] += 1 # the first (N % splits) splits take one extra subject
  # builtin int replaces the np.int alias, which was removed in NumPy 1.24
  sub_split_index = np.append(0, np.cumsum(sub_split)).astype(int)

for model_type in model_dict:
  # The input representation depends only on the model type, so it is prepared once per model.
  if model_type in ['CNN3D', 'TimeDist']: # these models require 2D mapped data
    pos_map = get_pos_map(dataset) # positions for 2D map conversion
    data_input = make_into_2d(data_x, pos_map)
  else:
    data_input = data_x

  for epochs in model_dict[model_type]:
    print(f'Number of epochs = {epochs}')
    # One result file per (dataset, model, epochs, timestamp); 'with' closes it even if training raises.
    now = datetime.now().strftime("%Y%m%d_%H%M%S")
    with open(f'/gdrive/MyDrive/results/{dataset}_{model_type}_{epochs}_{now}.txt', "w") as file_h:
      accuracy = np.zeros(Iterations)
      start_time = time.time()
      for index in range(Iterations):
        # Splitting data
        if align_to_subject:
          sub_test_index = np.arange(sub_split_index[index], sub_split_index[index+1])
          # data_input (not data_x) is split so that CNN3D/TimeDist receive the 2D mapped data
          train_x, test_x = split_subjects(data_input, data_index, sub_test_index)
          train_y, test_y = split_subjects(data_y, data_index, sub_test_index)
        else:
          # data and labels must be split with the same shuffle setting
          train_x, test_x = split_data(data_input, Splits=splits, split_index=index, shuffle=shuffle)
          train_y, test_y = split_data(data_y, Splits=splits, split_index=index, shuffle=shuffle)

        # Generating categorical data
        train_yc = to_categorical(train_y)
        test_yc = to_categorical(test_y)

        # Building and validating model (simple validation).
        # model, test_x and test_y stay in the notebook namespace for the Prediction cell.
        model = build_model(train_x, train_yc, model_type=model_type, show_summary=summary_flag, batch_norm=batch_norm)
        accuracy[index] = validate_model(model, train_x, train_yc, test_x, test_yc, epochs=epochs, batch_size=64, verbose=verbosity)

        if align_to_subject:
          print(f'Accuracy = {accuracy[index]*100:0.1f}% for subject(s) {data_index[sub_test_index,1]}')
          file_h.write(f'{accuracy[index]:0.5f}, {data_index[sub_test_index,1]}\n ')
        else:
          print('Accuracy for split No. '+ str(index + 1)+ ' = ' + f'{accuracy[index]*100:0.1f}%')
          file_h.write(f'{accuracy[index]:0.5f}, ')

      if Iterations > 1:
        if align_to_subject:
          # average weighted by the number of subjects in each split
          print('Average accuracy = ' + f'{(accuracy * sub_split).sum()/N*100:0.1f}%')
          file_h.write(f'Average accuracy = {(accuracy * sub_split).sum()/N*100:0.1f}%\n')
        else:
          print('Average accuracy = ' + f'{accuracy.mean()*100:0.1f}%' )
          file_h.write(f'Average accuracy: {accuracy.mean()*100:0.1f}\n')
      total_time = timedelta(seconds=round(time.time() - start_time))
      print('Total time = ' + f'{total_time}')

      file_h.write(f'Total time = {total_time}\n')
      # Writing general information to the file
      tasks_str = str(Tasks).replace('\n', '')
      file_h.write(f'Bands: {Bands}, Resample Freq.: {resample_freq}. Notch freq: {notch_freqs}\n Tmin:{tmin}, Tmax: {tmax}\n Tasks: {tasks_str}\n')

# Play a sound to signal that the run has finished (Colab only: the snippet is evaluated as JavaScript through google.colab.output)
from google.colab import output
output.eval_js('new Audio("https://upload.wikimedia.org/wikipedia/commons/4/42/Bird_singing.ogg").play()')

Number of epochs = 20
Epoch 1/20
Epoch 2/20
Epoch 3/20
Epoch 4/20
Epoch 5/20
Epoch 6/20
Epoch 7/20
Epoch 8/20
Epoch 9/20
Epoch 10/20
Epoch 11/20
Epoch 12/20
Epoch 13/20
Epoch 14/20
Epoch 15/20
Epoch 16/20
Epoch 17/20
Epoch 18/20
Epoch 19/20
Epoch 20/20
Accuracy for split No. 1 = 55.8%
Total time = 0:00:47


# Prediction

In [None]:
# Prediction for each class.
# Uses model, test_x and test_y left in the namespace by the last iteration of the
# Model Evaluation cell, so that cell must be run first.
prediction_list = predict_model(model,test_x,test_y)

Testing accuracy = 56.6%
Prediction accuracy for class 0 = 67.1%
Prediction accuracy for class 1 = 68.2%
Prediction accuracy for class 2 = 64.1%
Prediction accuracy for class 3 = 45.0%
Prediction accuracy for class 4 = 39.4%


# Removing classes

In [None]:
# Class labels to remove from the data; None leaves the data untouched
classes = None
# classes = [0,3,4] # Uncomment and put the classes to be removed

if classes is not None:
  # data_x, data_y and data_index are rebound together so they stay consistent with each other
  data_y, data_x, data_index  = remove_classes(data_y, classes, data_x, data_index = data_index)
  # regenerate the categorical labels from the reduced label array
  data_yc = to_categorical(data_y)

# Visualization (video array)

In [32]:
# Generating 2D mapped data for visualization (same get_pos_map / make_into_2d calls the
# Model Evaluation cell uses for the 'CNN3D' and 'TimeDist' models)
pos_map = get_pos_map(dataset) # positions for 2D map conversion
data_x2D = make_into_2d(data_x,pos_map)

In [35]:
# Generating video array for random samples.
# Presumably Class and Band select which label and which frequency band to show, and
# Rows x Cols sets the grid of sample videos -- confirm against video_array in coleeg.py
video_array(data_x2D, data_y, Class=1, Band=0, Rows=2, Cols=3)

samples = [4614 5930 9708 9257  337 1918]
