# Neural Network Testing Framework
This notebook is a general framework for testing neural networks on the EEG data set. The initial iteration uses a simple convolutional neural network (CNN), but any other PyTorch network can be implemented and assigned to the `net` variable. Many of the early cells only set up a Google Colaboratory notebook for Google Drive access and GPU use, and can be skipped when not needed.

---



## Initialization
Google Drive access, PyTorch, etc.

In [0]:
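# Kill every process in the runtime to reset the Colab VM; skip this cell unless a clean restart is needed.
!kill -9 -1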

This section authorizes access to the user's Google Drive.

In [0]:
!apt-get install -y -qq software-properties-common python-software-properties module-init-tools
!add-apt-repository -y ppa:alessandro-strada/ppa 2>&1 > /dev/null
!apt-get update -qq 2>&1 > /dev/null
!apt-get -y install -qq google-drive-ocamlfuse fuse
from google.colab import auth
auth.authenticate_user()
from oauth2client.client import GoogleCredentials
creds = GoogleCredentials.get_application_default()
import getpass
!google-drive-ocamlfuse -headless -id={creds.client_id} -secret={creds.client_secret} < /dev/null 2>&1 | grep URL
vcode = getpass.getpass()
!echo {vcode} | google-drive-ocamlfuse -headless -id={creds.client_id} -secret={creds.client_secret}

This section creates a local `drive` directory and mounts the user's Google Drive on it.

In [46]:
!mkdir -p drive
!google-drive-ocamlfuse drive

fuse: mountpoint is not empty
fuse: if you are sure this is safe, use the 'nonempty' mount option


This section installs PyTorch 0.3.0.post4, choosing the CUDA 8.0 build when a GPU is present and the CPU build otherwise.

In [0]:
# http://pytorch.org/
from os import path
from wheel.pep425tags import get_abbr_impl, get_impl_ver, get_abi_tag
platform = '{}{}-{}'.format(get_abbr_impl(), get_impl_ver(), get_abi_tag())

accelerator = 'cu80' if path.exists('/opt/bin/nvidia-smi') else 'cpu'

!pip install -q http://download.pytorch.org/whl/{accelerator}/torch-0.3.0.post4-{platform}-linux_x86_64.whl torchvision

## Imports

Imports can be added as necessary.

In [0]:
import numpy as np
import h5py
from sklearn.model_selection import train_test_split
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torchvision
import torchvision.transforms as transforms
import torch.cuda
from torch.utils.data import Dataset
from torch.autograd import Variable

## Classes

### EEGDataset
This class subclasses torch.utils.data.Dataset so that it can be used with torch.utils.data.DataLoader.

In [0]:
class EEGDataset(Dataset):
  """EEG dataset."""
  
  def __init__(self, x, y, transform=None):
    """
    Args:
      x (numpy array): Input data of shape 
                       num_trials x num_electrodes x num_time_bins.
      y (numpy array): Output data of shape num_trials x 1.
      transform (callable, optional): Optional transform to be applied.
    """
    self.x = x
    self.y = y
    self.transform = transform
    
  def __len__(self):
    return len(self.x)
  
  def __getitem__(self, idx):
    x_sample = torch.from_numpy(self.x[idx])
    y_sample = torch.IntTensor([int(self.y[idx])])
    
    if self.transform:
      # Assumption: the transform applies to the input sample only.
      x_sample = self.transform(x_sample)
    
    return x_sample, y_sample
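
A `DataLoader` only needs `__len__` and `__getitem__` from a map-style dataset such as `EEGDataset`. The plain-Python sketch below illustrates that contract; `ToyDataset` and `iterate_batches` are hypothetical names introduced here, and the batching is a simplified stand-in that omits shuffling, tensor collation, and worker processes.

```python
class ToyDataset:
    """Hypothetical stand-in exposing the same two methods as EEGDataset."""

    def __init__(self, x, y):
        self.x = x
        self.y = y

    def __len__(self):
        return len(self.x)

    def __getitem__(self, idx):
        return self.x[idx], self.y[idx]


def iterate_batches(dataset, batch_size):
    """Yield (inputs, labels) lists in order, keeping the last partial batch."""
    for start in range(0, len(dataset), batch_size):
        stop = min(start + batch_size, len(dataset))
        samples = [dataset[i] for i in range(start, stop)]
        inputs = [sample[0] for sample in samples]
        labels = [sample[1] for sample in samples]
        yield inputs, labels


# Made-up data: five one-value "trials" with labels in 0..3.
toy = ToyDataset(x=[[0.1], [0.2], [0.3], [0.4], [0.5]], y=[0, 1, 2, 3, 0])
batches = list(iterate_batches(toy, batch_size=2))
print(len(batches))   # 3
print(batches[-1])    # ([[0.5]], [0])
```

The real loader additionally stacks the per-sample tensors into one batch tensor, which is why `__getitem__` in `EEGDataset` returns tensors rather than NumPy arrays.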

### EEGMinimalContainer
This class holds the training EEGDataset and one test EEGDataset per subject. It arranges the data in (N, C, H, W) format, with C = 1, H indexing the electrodes, and W indexing the time bins.

In [0]:
class EEGMinimalContainer():
  """EEG container for training and testing datasets."""
  
  def __init__(self, data_dir, train_subject=None, test_subject=None, 
               remove_eog_channels=True, seed=42):
    """
    Args:
      data_dir (string): Path to all A0iT_slice.mat files for i in [1, 9].
      train_subject (int): Subject to train on. If None, train on all.
      test_subject (int): Subject to test on. If None, test on all except for
                          train_subject. Only used if train_subject is not None.
      remove_eog_channels (bool): If True, keep only the first 22 channels.
      seed (int): Seed for NumPy's random number generator.
    """
    self.X_train = None
    self.y_train = None
    self.X_test = None
    self.y_test = None
    self.train_dataset = None
    self.test_dataset = None
    np.random.seed(seed)
    
    if train_subject is None:
      # Step 1: Append all of the input and output data together
      X = None
      y = None
      end = np.empty(9)
      for i in np.arange(9):
        A0iT = h5py.File(data_dir + ('/A0%dT_slice.mat' % (i+1)), 'r')
        X_temp = np.copy(A0iT['image'])
        y_temp = np.copy(A0iT['type'])
        y_temp = y_temp[0,0:X_temp.shape[0]:1]
        y_temp = np.asarray(y_temp, dtype=np.int32)
        X = X_temp if X is None else np.append(X, X_temp, axis=0)
        y = y_temp if y is None else np.append(y, y_temp, axis=0)
        end[i] = X_temp.shape[0] if i == 0 else X_temp.shape[0] + end[i-1]
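      X = np.expand_dims(X, axis=1)  # add a singleton channel axis: (N, 1, H, W)
      y -= 769  # shift the event codes so the class labels start at 0
      # Step 2: Remove the EOG channels (keep the first 22 rows)
      if remove_eog_channels:
        X = X[:, :, 0:22, :]
      # Step 3: Remove trials that contain NaN values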
      remove_list = []
      for i in range(len(X)):
        if np.isnan(X[i]).any():
          remove_list.append(i)
      for trial_row in remove_list:
        end[end > trial_row] -= 1
      X = np.delete(X, remove_list, axis=0)
      y = np.delete(y, remove_list, axis=0)
      # Step 4: Generate a train/test split (50 test trials per subject)
      remove_list = []
      self.X_test = {}
      self.y_test = {}
      self.test_dataset = {}
      sloc = 0
      for i, eloc in enumerate(end, 1):
        t_list = np.random.choice(np.arange(sloc, eloc), 50, replace=False)
        t_list = t_list.astype(int)
        self.X_test[str(i)] = X[t_list, :, :, :]
        self.y_test[str(i)] = y[t_list]
        self.test_dataset[str(i)] = EEGDataset(X[t_list, :, :, :], y[t_list])
        remove_list = remove_list + t_list.tolist()
        sloc = eloc
      self.X_train = np.delete(X, remove_list, axis=0)
      self.y_train = np.delete(y, remove_list, axis=0)
      self.train_dataset = EEGDataset(self.X_train, self.y_train)
      
      print('EEGContainer X_train: ' + str(self.X_train.shape))
      print('EEGContainer y_train: ' + str(self.y_train.shape))
      for i in range(1, 10):
        print(('EEGContainer X_test%d: ' %i) + str(self.X_test[str(i)].shape))
        print(('EEGContainer y_test%d: ' %i) + str(self.y_test[str(i)].shape))
    
    else:
      pass #FIXME
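
Step 4 of the constructor holds out 50 trials per subject, drawn without replacement from that subject's row range, where the ranges come from the cumulative trial counts stored in `end`. A minimal sketch of that index bookkeeping follows. It assumes the standard-library `random` module in place of `np.random`, so it will not reproduce the notebook's actual split, and the function name `holdout_per_subject` and the small trial counts are hypothetical.

```python
import random


def holdout_per_subject(end, n_test, seed=42):
    """Return (test_indices_by_subject, train_indices).

    `end` holds cumulative trial counts, so subject k owns rows
    end[k-2] .. end[k-1] - 1, with an implicit 0 before the first entry.
    """
    rng = random.Random(seed)
    test = {}
    held_out = set()
    start = 0
    for subject, stop in enumerate(end, 1):
        picked = rng.sample(range(start, stop), n_test)  # without replacement
        test[str(subject)] = sorted(picked)
        held_out.update(picked)
        start = stop
    # Everything that was not held out stays in the training set.
    train = [i for i in range(end[-1]) if i not in held_out]
    return test, train


# Hypothetical boundaries: three subjects with 6, 5 and 7 trials.
test_idx, train_idx = holdout_per_subject(end=[6, 11, 18], n_test=2)
print({k: len(v) for k, v in test_idx.items()})  # {'1': 2, '2': 2, '3': 2}
print(len(train_idx))                            # 12
```

Because the test indices are drawn inside each subject's own range, every per-subject test set contains trials from that subject only.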

### Fully-Connected Neural Network

#### FNN1

Extremely vanilla FNN: five linear layers applied to the flattened 1 x 22 x 1000 input.

In [0]:
class FNN1(nn.Module):
  def __init__(self):
    super(FNN1, self).__init__()
    self.input_size = 1 * 22 * 1000
    self.fc1 = nn.Linear(self.input_size, 1000)
    self.fc2 = nn.Linear(1000, 500)
    self.fc3 = nn.Linear(500, 120)
    self.fc4 = nn.Linear(120, 80)
    self.fc5 = nn.Linear(80, 4)
    
  def forward(self, x):
    x = x.view(-1, self.input_size)
    x = F.relu(self.fc1(x))
    x = F.relu(self.fc2(x))
    x = F.relu(self.fc3(x))
    x = F.relu(self.fc4(x))
    x = self.fc5(x)  # no ReLU here: nn.CrossEntropyLoss expects raw class scores
    return x

### Convolutional Neural Network

#### CNN1
Extremely vanilla CNN. Note that the 3x3 convolutions treat the electrode and time axes alike, so they do not specifically exploit the temporal structure of the signal.

conv+relu - conv+relu - pool - fc+relu - fc+relu - fc

In [0]:
class CNN1(nn.Module):
  def __init__(self):
    super(CNN1, self).__init__()
    self.conv1 = nn.Conv2d(1, 4, 3, stride=1, padding=1)
    self.conv2 = nn.Conv2d(4, 8, 3, stride=1, padding=1)
    self.pool = nn.MaxPool2d(2, 2)
    self.fc1 = nn.Linear(8 * 11 * 500, 120)
    self.fc2 = nn.Linear(120, 80)
    self.fc3 = nn.Linear(80, 4)
  
  def forward(self, x):
    # x is 1 x 22 x 1000
    x = self.pool(F.relu(self.conv2(F.relu(self.conv1(x)))))
    # x is 8 x 11 x  500
    x = x.view(-1, 8 * 11 * 500)
    x = F.relu(self.fc1(x))
    x = F.relu(self.fc2(x))
    x = self.fc3(x)
    return x    
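
The `8 * 11 * 500` input size of `fc1` follows from the layer parameters. As a quick check, the sketch below applies the usual output-size formula, floor((n + 2p - k) / s) + 1, along each axis, assuming a dilation of 1; the helper name `conv_out` is introduced here for illustration.

```python
def conv_out(size, kernel, stride=1, padding=0):
    """Output length along one axis for a convolution or pooling window."""
    return (size + 2 * padding - kernel) // stride + 1


h, w = 22, 1000                        # electrodes x time bins
for _ in range(2):                     # conv1 and conv2: 3x3, stride 1, padding 1
    h, w = conv_out(h, 3, 1, 1), conv_out(w, 3, 1, 1)
h, w = conv_out(h, 2, 2), conv_out(w, 2, 2)    # 2x2 max pool with stride 2
flat = 8 * h * w                       # conv2 produces 8 channels
print(h, w, flat)                      # 11 500 44000
```

The padded 3x3 convolutions preserve the 22 x 1000 grid, so only the pooling layer halves each axis.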

## Setup

In [0]:
data_dir = 'drive/ee239as/project_datasets'
batch_size = 17
num_epochs = 20
learning_rate = 1e-3

use_cuda = True

In [54]:
EEGset = EEGMinimalContainer(data_dir)

train_loader = torch.utils.data.DataLoader(EEGset.train_dataset,
                                           batch_size=batch_size,
                                           shuffle=True)

test_loader = {}
for i in range(1, 10):
  test_loader[str(i)] = torch.utils.data.DataLoader(EEGset.test_dataset[str(i)],
                                                    batch_size=10,
                                                    shuffle=False)

EEGContainer X_train: (2108, 1, 22, 1000)
EEGContainer y_train: (2108,)
EEGContainer X_test1: (50, 1, 22, 1000)
EEGContainer y_test1: (50,)
EEGContainer X_test2: (50, 1, 22, 1000)
EEGContainer y_test2: (50,)
EEGContainer X_test3: (50, 1, 22, 1000)
EEGContainer y_test3: (50,)
EEGContainer X_test4: (50, 1, 22, 1000)
EEGContainer y_test4: (50,)
EEGContainer X_test5: (50, 1, 22, 1000)
EEGContainer y_test5: (50,)
EEGContainer X_test6: (50, 1, 22, 1000)
EEGContainer y_test6: (50,)
EEGContainer X_test7: (50, 1, 22, 1000)
EEGContainer y_test7: (50,)
EEGContainer X_test8: (50, 1, 22, 1000)
EEGContainer y_test8: (50,)
EEGContainer X_test9: (50, 1, 22, 1000)
EEGContainer y_test9: (50,)
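
The shapes printed above determine the number of batches per pass. A `DataLoader` with its default `drop_last=False` yields ceil(N / batch_size) batches, so a quick plain-Python check (the helper name `num_batches` is introduced here for illustration) gives the 124 steps per epoch shown in the training log and 5 batches per test subject.

```python
def num_batches(num_samples, batch_size):
    """Number of batches when the last, possibly smaller, batch is kept."""
    return (num_samples + batch_size - 1) // batch_size


print(num_batches(2108, 17))   # 124 training steps per epoch
print(2108 % 17)               # 0, so every training batch is full
print(num_batches(50, 10))     # 5 batches per test subject
```

If the batch size did not divide the training set evenly, floor division (`N // batch_size`) would undercount the batches by one.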


In [0]:
net = CNN1()

if use_cuda and torch.cuda.is_available():
  net.cuda()

In [0]:
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(net.parameters(), lr=learning_rate)

## Training

In [57]:
loss_arr = np.empty(num_epochs * len(train_loader))  # one entry per training batch
training_acc_arr = np.empty(num_epochs)
testing_acc_arr = np.empty((9, num_epochs))

j = 0
for epoch in range(num_epochs):
  for i, (signals, labels) in enumerate(train_loader):
    
    signals = signals.type(torch.FloatTensor)
    signals = Variable(signals)
    labels = labels.type(torch.LongTensor)
    labels = Variable(torch.squeeze(labels))
    
    if use_cuda and torch.cuda.is_available():
      signals = signals.cuda()
      labels = labels.cuda()
    
    optimizer.zero_grad()
    outputs = net(signals)
    
    loss = criterion(outputs, labels)
    loss.backward()
    optimizer.step()
    
    loss_arr[j] = loss.data[0]
    j += 1
    
    if (i+1) % 31 == 0:
      # len(train_loader) is the number of batches per epoch
      print('Epoch [%d/%d], Step [%d/%d], Loss: %.4f' 
            % (epoch+1, num_epochs, i+1, len(train_loader), 
               loss.data[0]))
  
  # Training accuracy
  total = 0
  correct = 0
  for signals, labels in train_loader:
    signals = signals.type(torch.FloatTensor)
    signals = Variable(signals)
    labels = torch.squeeze(labels.type(torch.LongTensor))
    if use_cuda and torch.cuda.is_available():
      signals = signals.cuda()
      labels = labels.cuda()
    outputs = net(signals)
    _, predicted = torch.max(outputs.data, 1)
    total += labels.size(0)
    correct += (predicted == labels).sum()
  training_acc_arr[epoch] = (correct/total)
  print ('Training Accuracy: %.5f' % training_acc_arr[epoch])
  
  
  # Testing accuracy
  for subject in range(9):
    total = 0
    correct = 0
    for signals, labels in test_loader[str(subject+1)]:
      signals = signals.type(torch.FloatTensor)
      signals = Variable(signals)
      labels = torch.squeeze(labels.type(torch.LongTensor))
      if use_cuda and torch.cuda.is_available():
        signals = signals.cuda()
        labels = labels.cuda()
      outputs = net(signals)
      _, predicted = torch.max(outputs.data, 1)
      total += labels.size(0)
      correct += (predicted == labels).sum()
    testing_acc_arr[subject, epoch] = (correct/total)
  print ('Testing Accuracy: ' + str(testing_acc_arr[:, epoch]))
  print ('Testing Accuracy Average: %.5f' % np.average(testing_acc_arr[:, epoch]))

Epoch [1/20], Step [31/124], Loss: 1.5090
Epoch [1/20], Step [62/124], Loss: 1.3263
Epoch [1/20], Step [93/124], Loss: 1.3644
Epoch [1/20], Step [124/124], Loss: 1.4613
Training Accuracy: 0.46395
Testing Accuracy: [0.26 0.3  0.36 0.28 0.28 0.3  0.32 0.22 0.26]
Testing Accuracy Average: 0.28667
Epoch [2/20], Step [31/124], Loss: 1.1848
Epoch [2/20], Step [62/124], Loss: 1.3234
Epoch [2/20], Step [93/124], Loss: 1.2599
Epoch [2/20], Step [124/124], Loss: 1.2096
Training Accuracy: 0.60104
Testing Accuracy: [0.28 0.28 0.24 0.32 0.36 0.2  0.38 0.38 0.26]
Testing Accuracy Average: 0.30000
Epoch [3/20], Step [31/124], Loss: 0.6771
Epoch [3/20], Step [62/124], Loss: 0.5387
Epoch [3/20], Step [93/124], Loss: 0.6532
Epoch [3/20], Step [124/124], Loss: 0.7100
Training Accuracy: 0.89374
Testing Accuracy: [0.38 0.32 0.34 0.3  0.32 0.28 0.5  0.36 0.4 ]
Testing Accuracy Average: 0.35556
Epoch [4/20], Step [31/124], Loss: 0.3586
Epoch [4/20], Step [62/124], Loss: 0.2464
Epoch [4/20], Step [93/124], Lo

Epoch [12/20], Step [124/124], Loss: 0.0151
Training Accuracy: 0.99953
Testing Accuracy: [0.4  0.32 0.24 0.36 0.44 0.32 0.44 0.42 0.44]
Testing Accuracy Average: 0.37556
Epoch [13/20], Step [31/124], Loss: 0.0050
Epoch [13/20], Step [62/124], Loss: 0.0005
Epoch [13/20], Step [93/124], Loss: 0.0005
Epoch [13/20], Step [124/124], Loss: 0.0013
Training Accuracy: 1.00000
Testing Accuracy: [0.44 0.38 0.28 0.36 0.38 0.4  0.44 0.48 0.38]
Testing Accuracy Average: 0.39333
Epoch [14/20], Step [31/124], Loss: 0.0006
Epoch [14/20], Step [62/124], Loss: 0.0005
Epoch [14/20], Step [93/124], Loss: 0.0002
Epoch [14/20], Step [124/124], Loss: 0.0002
Training Accuracy: 1.00000
Testing Accuracy: [0.46 0.38 0.28 0.4  0.4  0.38 0.44 0.48 0.44]
Testing Accuracy Average: 0.40667
Epoch [15/20], Step [31/124], Loss: 0.0005
Epoch [15/20], Step [62/124], Loss: 0.0005
Epoch [15/20], Step [93/124], Loss: 0.0003
Epoch [15/20], Step [124/124], Loss: 0.0000
Training Accuracy: 1.00000
Testing Accuracy: [0.48 0.38 0.2
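
The accuracy values in the log above are the fraction of trials whose highest-scoring class equals the label, which is what `torch.max(outputs.data, 1)` followed by the comparison with `labels` computes. A minimal plain-Python sketch of the same calculation, using made-up scores and a hypothetical helper name `accuracy`:

```python
def accuracy(scores, labels):
    """Fraction of rows whose arg-max class index equals the label."""
    correct = 0
    for row, label in zip(scores, labels):
        predicted = max(range(len(row)), key=row.__getitem__)
        correct += int(predicted == label)
    return correct / len(labels)


# Made-up scores for four trials over four classes.
scores = [[0.1, 2.0, -1.0, 0.3],
          [1.5, 0.2, 0.1, 0.0],
          [0.0, 0.1, 0.2, 3.0],
          [0.9, 0.1, 0.0, 0.2]]
print(accuracy(scores, [1, 0, 2, 0]))   # 0.75
```

With 50 test trials per subject, each per-subject accuracy is a multiple of 0.02, which is why the logged values move in coarse steps.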

### Print MATLAB Format

In [0]:
# Print Loss
print('Loss = [', end='')
for lossval in np.nditer(loss_arr):
  print('%.5f, ' % lossval, end='')
print('];')

# Print Training Accuracy
print('Training_Accuracy = [', end='')
for acc in np.nditer(training_acc_arr):
  print('%.5f, ' % acc, end='')
print('];')

# Print Testing Accuracy
print('Testing_Accuracy = [', end='')
for subject in range(9):
  for acc in np.nditer(testing_acc_arr[subject, :]):
    print('%.5f, ' % acc, end='')
  print('; ', end='')
print('];')
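
The loops above print every value followed by `', '`, which leaves a trailing separator before each `;` and `]`. As a possible alternative, the sketch below builds the same kind of MATLAB assignment with `str.join`, so separators appear only between values. The helper name `matlab_literal` is hypothetical, and the numbers are made up rather than taken from this notebook's results.

```python
def matlab_literal(name, rows, fmt='%.5f'):
    """Format a list of rows as a MATLAB matrix assignment string."""
    body = '; '.join(', '.join(fmt % value for value in row) for row in rows)
    return '%s = [%s];' % (name, body)


# Made-up values, not results from this notebook.
print(matlab_literal('Training_Accuracy', [[0.5, 0.75]]))
# Training_Accuracy = [0.50000, 0.75000];
print(matlab_literal('Testing_Accuracy', [[0.25, 0.5], [0.75, 1.0]]))
# Testing_Accuracy = [0.25000, 0.50000; 0.75000, 1.00000];
```

For the arrays in this notebook, `loss_arr` and `training_acc_arr` would each be passed as a single row, and `testing_acc_arr` as one row per subject.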