In [None]:
# Mount Google Drive at /content/drive so the cells below can read the data
# files stored under "MyDrive". This only works inside Google Colab.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
import pandas as pd
import numpy as np

In [None]:
# print(sorted(os.listdir("/content/drive/MyDrive/Thesis Project/Example_P300/data"))[11:19])

# Define Bandpass filter

In [None]:
import numpy as np
from scipy.signal import butter,filtfilt
from scipy import pi
from scipy.fftpack import fft

# Filter requirements (module-level settings passed to butter_bandpass_filter).
fs = 240       # sample rate, Hz
cutoff = 0.5     # Hz; passed to butter_bandpass_filter but never read there (the pass band comes from lowcut/highcut). NOTE(review): the original note said "slightly higher than actual 1.2 Hz", yet 0.5 < 1.2 -- confirm intent
nyq = 0.5 * fs  # Nyquist frequency in Hz (half the sample rate)
order = 2       # order handed to scipy.signal.butter; author's rationale: a sine wave can be approximated as a quadratic

def butter_bandpass_filter(data, cutoff, fs, order, lowcut=0.1, highcut=60):
    """Apply a zero-phase Butterworth band-pass filter to ``data``.

    The band edges are normalised by the Nyquist frequency derived from the
    ``fs`` argument, so the function gives the intended pass band for any
    sample rate instead of relying on a module-level ``nyq`` value.

    Parameters
    ----------
    data : array_like
        Signal to filter. Filtering runs along the last axis (the default
        axis of ``scipy.signal.filtfilt``).
    cutoff : float
        Unused. Kept in the signature so existing calls keep working.
    fs : float
        Sample rate of ``data`` in Hz.
    order : int
        Order passed to ``scipy.signal.butter``.
    lowcut : float, optional
        Lower pass-band edge in Hz (default 0.1).
    highcut : float, optional
        Upper pass-band edge in Hz (default 60).

    Returns
    -------
    numpy.ndarray
        Filtered signal with the same shape as ``data``.

    Raises
    ------
    ValueError
        Raised by ``scipy.signal.butter`` when the normalised band edges do
        not satisfy ``0 < low < high < 1`` (e.g. ``highcut >= fs / 2``).
    """
    nyquist = 0.5 * fs
    band = [lowcut / nyquist, highcut / nyquist]
    # Get the filter coefficients
    b, a = butter(order, band, btype='bandpass', analog=False)
    # filtfilt runs the filter forward and backward, which cancels the phase shift.
    return filtfilt(b, a, data)

In [None]:
def apply_filter(eeg_train):
  """Band-pass filter every channel of every trial in ``eeg_train``.

  Each trial is transposed, each row of the transposed trial is passed
  through ``butter_bandpass_filter`` (using the module-level ``cutoff``,
  ``fs`` and ``order``), and the result is transposed back.

  Assumes each trial is laid out as (samples, channels) -- TODO confirm
  against the caller.

  Returns a numpy array stacking the filtered trials in their input order.
  """
  filtered_trials = []
  for trial_idx in range(len(eeg_train)):
    by_channel = eeg_train[trial_idx].T
    filtered_channels = [
        butter_bandpass_filter(by_channel[ch_idx], cutoff, fs, order)
        for ch_idx in range(len(by_channel))
    ]
    # Transpose back to the layout the trial came in with.
    filtered_trials.append(np.array(filtered_channels).T)
  return np.array(filtered_trials)

In [None]:
# Preview a previously saved testing-label array as a table with the four
# column names given below.
# NOTE(review): allow_pickle=True lets np.load unpickle arbitrary Python
# objects, which can execute code -- only load files from a trusted source.
pd.DataFrame(np.load('/content/drive/MyDrive/Thesis Project/BCI_IIb_Testing_Label.npy', allow_pickle = True), columns = ['epoch', 'row/column', 'true_character', 'P300_prob'])

Unnamed: 0,epoch,row/column,true_character,P300_prob
0,1,6,W,
1,1,12,W,
2,1,8,W,
3,1,3,W,
4,1,4,W,
...,...,...,...,...
5575,15,7,E,
5576,15,8,E,
5577,15,4,E,
5578,15,9,E,


# Training File Loading and Preprocessing

In [None]:
from scipy.io import loadmat
import os


# Epoching parameters for the training files.
# change the directory here
DATA_DIR = "/content/drive/MyDrive/Thesis Project/Example_P300/data"
N_TRAIN_FILES = 11        # the first 11 files (in sorted order) are used for training
SAMPLES_PER_FLASH = 24    # each intensification is assumed to mark 24 consecutive samples
EPOCH_LEN = 240           # samples kept from each intensification onset onwards
N_CHANNELS = 64           # channel count expected in mat['signal']

# Per-file results are collected in lists and concatenated once after the loop.
signal_chunks = []
intensification_chunks = []
label_chunks = []

for filename in sorted(os.listdir(DATA_DIR))[:N_TRAIN_FILES]:
  mat = loadmat(DATA_DIR + "/" + filename)

  # Samples with a non-zero StimulusCode are treated as part of an intensification.
  StimCode = mat.get('StimulusCode')
  stim_index = np.where(StimCode != 0)[0]

  # First sample of every intensification (one row of 24 indices per flash).
  # reshape raises ValueError if the count is not a multiple of SAMPLES_PER_FLASH.
  onsets = stim_index.reshape(-1, SAMPLES_PER_FLASH)[:, 0]

  # Row/column code and target label of each intensification, as (n, 1) columns.
  intensification = StimCode[onsets].reshape(-1, 1)
  labels = mat.get('StimulusType')[onsets].reshape(-1, 1)

  # Band-pass filter the whole recording, one channel at a time, before epoching.
  signal = np.array([
      butter_bandpass_filter(channel, cutoff, fs, order)
      for channel in mat.get('signal').T
  ]).T

  # Extend each onset to a window of EPOCH_LEN samples to cover the delayed
  # response: broadcasting gives an (n, EPOCH_LEN) integer index, so indexing
  # yields (n, EPOCH_LEN, channels) directly. A window running past the end of
  # the recording raises IndexError.
  window_index = onsets[:, None] + np.arange(EPOCH_LEN)
  signal = signal[window_index]

  # normalize the signal with one mean/std taken over all epochs and channels of this file
  signal = (signal - signal.mean()) / signal.std()

  signal_chunks.append(signal)
  intensification_chunks.append(intensification)
  label_chunks.append(labels)

# update training data
# The empty float seeds keep the results float64 and correctly shaped even
# when no file was processed.
signal_train = np.concatenate([np.empty((0, EPOCH_LEN, N_CHANNELS))] + signal_chunks)
intensification_train = np.concatenate([np.empty((0, 1))] + intensification_chunks)
label_train = np.concatenate([np.empty((0, 1))] + label_chunks)
label_0_train = label_train[label_train[:, 0] == 0]
label_1_train = label_train[label_train[:, 0] == 1]

# Release the per-file copies; the concatenated arrays hold the data now.
del signal_chunks, intensification_chunks, label_chunks


In [None]:
# Report the shape of each accumulated training array, one per line.
for train_array in (signal_train, intensification_train, label_0_train, label_1_train):
  print(train_array.shape)

(7560, 240, 64)
(7560, 1)
(6300, 1)
(1260, 1)


In [None]:
# Add a leading singleton axis in front of the existing three axes of signal_train.
BCI_IIb_training_Data = signal_train.reshape((1,) + signal_train.shape)

In [None]:
# Lay the (n, 1) label column out as a single row of shape (1, n).
BCI_IIb_label = label_train.flatten().reshape((1, len(label_train)))

# Training File Saving

In [None]:
# Change the directory here
# np.save('/content/drive/MyDrive/Thesis Project/Example_P300/Processed Data/BCI_IIb_training_data', BCI_IIb_training_Data)
# np.save('/content/drive/MyDrive/Thesis Project/Example_P300/Processed Data/BCI_IIb_training_label', BCI_IIb_label)

# Testing File Loading and Preprocessing

In [None]:
# Change the testing character here
# This string fills the 'true_character' column of the testing label table
# built below, which assigns 180 consecutive rows to every character
# (31 characters here).
testing_character = 'FOODMOOTHAMPIECAKETUNAZYGOT4567'

In [None]:
# loadmat("/content/drive/MyDrive/Thesis Project/Example_P300/data/AAS012R01.mat")

# Epoching parameters for the testing files.
# change the directory here
DATA_DIR = "/content/drive/MyDrive/Thesis Project/Example_P300/data"
TEST_FILES = slice(11, 19)   # files 12..19 (in sorted order) are used for testing
SAMPLES_PER_FLASH = 24       # each intensification is assumed to mark 24 consecutive samples
EPOCH_LEN = 240              # samples kept from each intensification onset onwards
N_CHANNELS = 64              # channel count expected in mat['signal']
FLASHES_PER_EPOCH = 12       # label rows that share one epoch number
EPOCHS_PER_CHARACTER = 15    # epoch numbers run 1..15 for every character

# Per-file results are collected in lists and concatenated once after the loop.
signal_chunks = []
intensification_chunks = []

for filename in sorted(os.listdir(DATA_DIR))[TEST_FILES]:
  mat = loadmat(DATA_DIR + "/" + filename)

  # Samples with a non-zero StimulusCode are treated as part of an intensification.
  # StimulusType (the target label) is deliberately not read for the testing files.
  StimCode = mat.get('StimulusCode')
  stim_index = np.where(StimCode != 0)[0]

  # First sample of every intensification (one row of 24 indices per flash).
  # reshape raises ValueError if the count is not a multiple of SAMPLES_PER_FLASH.
  onsets = stim_index.reshape(-1, SAMPLES_PER_FLASH)[:, 0]

  # Row/column code of each intensification, as an (n, 1) column.
  intensification = StimCode[onsets].reshape(-1, 1)

  # Band-pass filter the whole recording, one channel at a time, before epoching.
  signal = np.array([
      butter_bandpass_filter(channel, cutoff, fs, order)
      for channel in mat.get('signal').T
  ]).T

  # Extend each onset to a window of EPOCH_LEN samples to cover the delayed
  # response: broadcasting gives an (n, EPOCH_LEN) integer index, so indexing
  # yields (n, EPOCH_LEN, channels) directly. A window running past the end of
  # the recording raises IndexError.
  window_index = onsets[:, None] + np.arange(EPOCH_LEN)
  signal = signal[window_index]

  # normalize the signal with one mean/std taken over all epochs and channels of this file
  signal = (signal - signal.mean()) / signal.std()

  signal_chunks.append(signal)
  intensification_chunks.append(intensification)

# update testing data
# The empty float seeds keep the results float64 and correctly shaped even
# when no file was processed.
signal_testing = np.concatenate([np.empty((0, EPOCH_LEN, N_CHANNELS))] + signal_chunks)
intensification_testing = np.concatenate([np.empty((0, 1))] + intensification_chunks)
del signal_chunks, intensification_chunks

print(signal_testing.shape)
print(intensification_testing.shape)

# Build the label table: every character owns EPOCHS_PER_CHARACTER epochs of
# FLASHES_PER_EPOCH rows each (180 rows per character).
# An alternative character string noted by the author: 'WORDTESTBCIRENEEBLOWFOGHAVEWINE'
rows_per_character = FLASHES_PER_EPOCH * EPOCHS_PER_CHARACTER
epoch = np.tile(
    np.repeat(np.arange(1, EPOCHS_PER_CHARACTER + 1), FLASHES_PER_EPOCH),
    len(testing_character),
)
character_list = np.repeat(list(testing_character), rows_per_character)
character_index_list = np.repeat(np.arange(len(testing_character)), rows_per_character)

# 'P300_prob' is listed in columns but has no data, so it is created empty (NaN).
# pandas raises ValueError here if the column lengths disagree with the
# number of extracted intensifications.
testing_label = pd.DataFrame(
    {
        'epoch': epoch,
        'row/column': intensification_testing.astype(int).flatten(),
        'true_character': character_list,
        'true_character_index': character_index_list,
    },
    columns=['epoch', 'row/column', 'true_character', 'true_character_index', 'P300_prob'],
)
# testing_data['signal'] = signal_testing
# testing_data

(5580, 240, 64)
(5580, 1)


In [None]:
# Display the assembled testing label table; the P300_prob column holds no values at this point.
testing_label

Unnamed: 0,epoch,row/column,true_character,true_character_index,P300_prob
0,1,6,F,0,
1,1,12,F,0,
2,1,8,F,0,
3,1,3,F,0,
4,1,4,F,0,
...,...,...,...,...,...
5575,15,7,7,30,
5576,15,8,7,30,
5577,15,4,7,30,
5578,15,9,7,30,


# Testing File Saving

In [None]:
# Change the directory here
# np.save('/content/drive/MyDrive/Thesis Project/Example_P300/Processed Data/BCI_IIb_testing_data', signal_testing)
# np.save('/content/drive/MyDrive/Thesis Project/Example_P300/Processed Data/BCI_IIb_testing_label', testing_label)