In [1]:
## Upload dataset
from google.colab import files
upload = files.upload()

Saving A01E2Class.mat to A01E2Class (1).mat


KeyboardInterrupt: ignored

In [1]:
import tensorflow as tf
import numpy as np
import scipy.io
!pip install mne
import mne



In [0]:
# Define moduels

def load_data(train):
    if train == True:
        data = np.array([scipy.io.loadmat("./A01TClass1")["Class1"],
                       scipy.io.loadmat("./A01TClass2")["Class2"]])
        label = data[:, -1, 0, :] #(2 classes, 72 trials)
        data = data[:, :-1, :, :] #(2 classes, 22 channels, 700 timepoints, 72 trials)

        return data, label
      
    else:
        data = np.array(scipy.io.loadmat("./A01E2Class")["data"]) #(23, 700, 144)
        label = data[-1, 0, :] #(144 trials)
        data = data[:-1, :, :] #(22 channels, 700 timepoints, 144 trials)
        return data, label
      
def get_params(name, shape, n_filter):
    w = tf.get_variable(name=name + "w", shape=shape, initializer=tf.contrib.layers.xavier_initializer_conv2d(), dtype = tf.float32)
    b = tf.Variable(name=name + "b", initial_value=tf.constant(0.1, shape=[n_filter], dtype=tf.float32))
    return w, b

def conv(input, w, b):
    conv = tf.nn.conv2d(input=input, filter=w, strides=(1, 1, 1, 1), padding="VALID")
    return tf.nn.bias_add(conv, b)

def max_pool(input):
    return tf.nn.max_pool(value=input, ksize=(1, 1, 3, 1), strides=(1, 1, 3, 1), padding="VALID")
    
def fclayer(input, name, num_output):
    num_input = input.get_shape().as_list()[-1]
    w, b = get_params(name=name, shape=(num_input, num_output), n_filter=num_output)
    logit = tf.nn.bias_add(tf.matmul(input, w), b)
    return logit, tf.nn.sigmoid(logit)
  
def batchnorm(input):
  return tf.nn.batch_normalization(x=input, mean=0, variance=1, offset=None, scale=None, variance_epsilon=1e-8)

def dropout(input, keep_prob):
  return tf.nn.dropout(x=input, keep_prob=keep_prob)

In [0]:
## Assemble DeepConvNet

def convnet(input, keep = 0.5, reuse=None):
    with tf.variable_scope("convnet") as scope:
      if reuse:
        scope.reuse_variables()
        keep = 1.0 # For test session
        
      # 1st Temporal Convolution, Linear activation
      w1, b1 = get_params(name="conv1", shape=(1, 10, 1, 25), n_filter=25)
      conv1 = conv(input=input, w=w1, b=b1)
      
      # 2nd Spatial Convolution, ELU activation, Max pooling
      w2, b2 = get_params(name="conv2", shape=(22, 1, 25, 25), n_filter=25)
      conv2 = conv(input=conv1, w=w2, b=b2)
      conv2 = batchnorm(conv2)
      conv2 = tf.nn.elu(conv2)
      conv2 = max_pool(conv2)
      conv2 = dropout(conv2, keep)

      # 3rd Temporal Convolution, ELU activation, Max pooling
      w3, b3 = get_params(name="conv3", shape=(1, 10, 25, 50), n_filter=50)
      conv3 = conv(input=conv2, w=w3, b=b3)
      conv3 = batchnorm(conv3)
      conv3 = tf.nn.elu(conv3)
      conv3 = max_pool(conv3)
      conv3 = dropout(conv3, keep)

      # 4th Temporal Convolution, ELU activation
      w4, b4 = get_params(name="deep4", shape=(1, 10, 50, 100), n_filter=100)
      conv4 = conv(input=conv3, w=w4, b=b4)
      conv4 = batchnorm(conv4)
      conv4 = tf.nn.elu(conv4)
      conv4 = max_pool(conv4)
      conv4 = dropout(conv4, keep)

      # 5th Temporal Convolution, ELU activation
      w5, b5 = get_params(name="deep5", shape=(1, 10, 100, 200), n_filter=200)
      conv5 = conv(input=conv4, w=w5, b=b5)
      conv5 = batchnorm(conv5)
      conv5 = tf.nn.elu(conv5)
      conv5 = max_pool(conv5)
      conv5 = dropout(conv5, keep)

      batch = conv5.get_shape().as_list()[0]
      conv5 = tf.reshape(conv5, shape=(batch, -1))
      logit, output = fclayer(input=conv5, name="output", num_output=1)
      output = tf.argmax
#       output = tf.argmax(tf.sigmoid(logit), -1)
    return tf.squeeze(logit), output, w2, conv1

In [4]:
## Load dataset
train_data, train_label = load_data(train=True) #(2 classes, 22 channels, 700 timepoints, 72 trials), (2 classes, 72 trials)
train_label = train_label - 1 ##############3

print("Train data shape:", train_data.shape)
print("Train label shape:", train_label.shape)

Train data shape: (2, 22, 700, 72)
Train label shape: (2, 72)


In [0]:
## Placeholding
batch_size = 32
num_channel = 22
window_size = 512
rest_point = train_data.shape[-2] - window_size + 1
num_class = train_data.shape[0]
num_trials = train_data.shape[-1]
num_timepoints = train_data.shape[1]
total_epoch = 5
X = tf.placeholder(dtype=tf.float32, shape=(batch_size, num_channel, window_size, 1))
Y = tf.placeholder(dtype=tf.float32, shape=(batch_size))
# For test
X_test = tf.placeholder(dtype=tf.float32, shape=(rest_point , num_channel, window_size, 1))

In [0]:
## Calculate logit and loss
# Extract logit and output
logit, output, _, _ = convnet(input=X)
# For test
_, pred, w, conv_input = convnet(input=X_test, reuse=True)

# # One-hot encoding
# label = tf.one_hot(tf.cast(Y, tf.int64), depth=2)
# Calculate loss
loss = tf.reduce_mean(tf.nn.sigmoid_cross_entropy_with_logits(labels=Y, logits=logit))

In [0]:
## Call parameters and build an optimizer
# Call parameters
parameters = tf.get_collection(tf.GraphKeys.TRAINABLE_VARIABLES, scope="convnet")

# Build an Adam-optimizer
optimizer = tf.train.AdamOptimizer(learning_rate=1e-5).minimize(loss, var_list=parameters)

In [0]:
## Open Tensorflow session
sess = tf.Session()
# Initialize all variables
sess.run(tf.global_variables_initializer())

In [9]:
## Model training
print("Begin Training")

total_batch = int((rest_point * num_class * num_trials)/batch_size) # int(27216/32) = 850
for epoch in range(total_epoch):
  # Randomize the training dataset
  rand_idx = np.random.permutation(rest_point * num_class * num_trials)
  for batch in range(total_batch):
    batch_x = np.empty(shape=(batch_size, num_channel, window_size, 1))
    batch_y = np.empty(shape=batch_size)
    for pos in range(batch_size):
      position = np.unravel_index(indices=rand_idx[batch * batch_size + pos], dims=(num_class, rest_point, num_trials))
      batch_x[pos, :, :, :] = np.expand_dims(train_data[position[0], :, position[1]:position[1]+window_size, position[2]], axis=-1)
      batch_y[pos] = train_label[position[0], position[2]]
    
  # Feed dictionary
    _, loss_, output_ = sess.run([optimizer, loss, output], feed_dict={X:batch_x, Y:batch_y})
   
  print(batch_y) #####
  print(output_)
  print("%dth Epoch Training Loss: %f" %(epoch + 1, loss_))


Begin Training
[0. 1. 0. 0. 1. 0. 0. 0. 1. 1. 1. 1. 0. 1. 0. 1. 0. 1. 1. 0. 0. 0. 1. 0.
 0. 0. 0. 1. 0. 0. 0. 1.]
[[0.46312714]
 [0.3238025 ]
 [0.49918225]
 [0.75741595]
 [0.28158665]
 [0.7940784 ]
 [0.66617525]
 [0.5963476 ]
 [0.6430868 ]
 [0.7185918 ]
 [0.6454005 ]
 [0.29727548]
 [0.7970066 ]
 [0.3170322 ]
 [0.33914486]
 [0.39120576]
 [0.36655012]
 [0.85369337]
 [0.4804779 ]
 [0.7621475 ]
 [0.5107533 ]
 [0.52139986]
 [0.58759683]
 [0.46178007]
 [0.2084667 ]
 [0.6584383 ]
 [0.22862521]
 [0.5425312 ]
 [0.3098023 ]
 [0.6387328 ]
 [0.58469194]
 [0.46962637]]
1th Epoch Training Loss: 0.806807
[0. 0. 1. 0. 1. 1. 0. 1. 0. 1. 1. 1. 1. 0. 1. 0. 1. 0. 1. 1. 1. 0. 1. 1.
 1. 0. 0. 1. 0. 0. 1. 1.]
[[0.31113926]
 [0.29082894]
 [0.62426704]
 [0.85455877]
 [0.28456503]
 [0.4120317 ]
 [0.62218577]
 [0.41007864]
 [0.7463413 ]
 [0.19125758]
 [0.18485707]
 [0.4394604 ]
 [0.4265429 ]
 [0.7143759 ]
 [0.33601648]
 [0.3487909 ]
 [0.73380786]
 [0.7172403 ]
 [0.5419623 ]
 [0.2253439 ]
 [0.5902801 ]
 [0.593806

In [10]:
## Test
# Load test dataset
test_data, test_label = load_data(train=False) #(22 channels, 700 timepoints, 144 trials), #(144 trials)
test_label = test_label - 1 ##################
prediction = np.zeros(shape=(num_trials))
ground_truth = np.zeros(shape=(num_trials))
for trials in range(num_trials):
  batch_x_test = np.empty(shape=(rest_point, num_channel, window_size, 1))
  for i in range(rest_point):
    batch_x_test[i, :, :, :] = np.expand_dims(test_data[:, i:i+window_size, trials], axis=-1)
  pred_ = sess.run([pred], feed_dict={X_test:batch_x_test})
  ground_truth[trials] = test_label[trials]
  predict = np.squeeze(np.asarray(pred_)) # For activation pattern
  pred_ = np.argmax(np.bincount(np.squeeze(np.asarray(pred_))))
  prediction[trials] = pred_
    
# Calculate accuracy
from sklearn.metrics import accuracy_score
print("2 class classification accuracy: %f" % (accuracy_score(y_true=ground_truth, y_pred=prediction)))

2 class classification accuracy: 0.513889
