In [7]:
# %load_ext autoreload
# %autoreload 2

import numpy as np
import pickle

from gpu import set_gpu
import os
import copy as cp
from sklearn.utils import shuffle

import tensorflow as tf
from tensorflow.keras.layers import Dense, Flatten, Conv2D, BatchNormalization
from tensorflow.keras import Model
from sklearn.preprocessing import MinMaxScaler

import process_data as prd
set_gpu()


Num GPUs Available:  1


In [2]:
sub_type = 'TR'
with open('train_data_raw_'  + sub_type + '.p', 'rb') as f:
    raw, params,feat,feat_sq = pickle.load(f)

In [4]:
## Training 
sub = 1
train_grp = 2
n_train = 'fullallmix4'
train_scale = 5
cv_type = 'manual'
scaler_load = False
feat_type = 'feat'

ind = (params[:,0] == sub) & (params[:,3] == train_grp)

x_train, x_test, x_valid, p_train, p_test, p_valid = prd.train_data_split(raw,params,sub,sub_type,dt=cv_type,load=True,train_grp=train_grp)

emg_scale = np.ones((np.size(x_train,1),1))
for i in range(np.size(x_train,1)):
    emg_scale[i] = 5/np.max(np.abs(x_train[:,i,:]))
x_train = x_train*emg_scale
x_valid = x_valid*emg_scale

x_train_noise, x_train_clean, y_train_clean = prd.add_noise(x_train, p_train, sub, n_train, train_scale)
x_valid_noise, x_valid_clean, y_valid_clean = prd.add_noise(x_train, p_train, sub, n_train, train_scale)

x_train_noise[x_train_noise > 5] = 5
x_train_noise[x_train_noise < -5] = -5
x_train_clean[x_train_clean > 5] = 5
x_train_clean[x_train_clean < -5] = -5

x_valid_noise[x_valid_noise > 5] = 5
x_valid_noise[x_valid_noise < -5] = -5
x_valid_clean[x_valid_clean > 5] = 5
x_valid_clean[x_valid_clean < -5] = -5

# shuffle data to make even batches
x_train_noise, x_train_clean, y_train_clean = shuffle(x_train_noise, x_train_clean, y_train_clean, random_state = 0)

scaler = MinMaxScaler(feature_range=(0,1))

# Extract features
x_train_noise_cnn, scaler = prd.extract_scale(x_train_noise,scaler,scaler_load,ft=feat_type,emg_scale=emg_scale) 
x_train_clean_cnn, _ = prd.extract_scale(x_train_clean,scaler,ft=feat_type,emg_scale=emg_scale)
x_valid_noise_cnn, _ = prd.extract_scale(x_valid_noise,scaler,ft=feat_type,emg_scale=emg_scale)
x_valid_clean_cnn, _ = prd.extract_scale(x_valid_clean,scaler,ft=feat_type,emg_scale=emg_scale)

# reshape data for nonconvolutional network
x_train_noise_mlp = x_train_noise_cnn.reshape(x_train_noise_cnn.shape[0],-1)
x_valid_noise_mlp = x_valid_noise_cnn.reshape(x_valid_noise_cnn.shape[0],-1)

Loading training data: traindata_manual/TR1_traindata_2.p


In [29]:
class MLPenc(Model):
  def __init__(self, latent_dim=4, name='enc'):
    super(MLPenc, self).__init__(name=name)
    self.dense1 = Dense(24, activation='relu')
    self.bn1 = BatchNormalization()
    self.dense2 = Dense(12, activation='relu')
    self.bn2 = BatchNormalization()
    self.dense3 = Dense(8, activation='relu')
    self.bn3 = BatchNormalization()
    self.latent = Dense(latent_dim, activity_regularizer=tf.keras.regularizers.l1(10e-5))
    self.bn4 = BatchNormalization()

  def call(self, x):
    x = self.dense1(x)
    x = self.bn1(x)
    x = self.dense2(x)
    x = self.bn2(x)
    x = self.dense3(x)
    x = self.bn3(x)
    x = self.latent(x)
    return self.bn4(x)

class CLF(Model):
  def __init__(self, n_class=7, name='clf'):
    super(CLF, self).__init__(name=name)
    self.dense1 = Dense(n_class, activation='softmax')

  def call(self, x):
    return self.dense1(x)
  
class MLP(Model):
  def __init__(self):
    super(MLP, self).__init__()
    self.enc = MLPenc()
    self.clf = CLF()
  
  def call(self, x):
    x = self.enc(x)
    return self.clf(x)

class CNNenc(Model):
  def __init__(self, latent_dim=4, name='enc'):
    super(CNNenc, self).__init__(name=name)
    self.conv1 = Conv2D(32,(3,2), activation='relu', strides=1, padding="same")
    self.bn1 = BatchNormalization()
    self.conv2 = Conv2D(32,3, activation='relu', strides=2, padding="same")
    self.bn2 = BatchNormalization()
    self.flatten = Flatten()
    self.dense1 = Dense(16, activation='relu')
    self.bn3 = BatchNormalization()
    self.latent = Dense(latent_dim, activity_regularizer=tf.keras.regularizers.l1(10e-5))
    self.bn4 = BatchNormalization()

  def call(self, x):
    x = self.conv1(x)
    x = self.bn1(x)
    x = self.conv2(x)
    x = self.bn2(x)
    x = self.flatten(x)
    x = self.dense1(x)
    x = self.bn3(x)
    x = self.latent(x)
    return self.bn4(x)
  
class CNN(Model):
  def __init__(self):
    super(CNN, self).__init__()
    self.enc = CNNenc()
    self.clf = CLF()
  
  def call(self, x):
    x = self.enc(x)
    return self.clf(x)

In [30]:
mlp = MLP()
cnn = CNN()
loss_fn = tf.keras.losses.CategoricalCrossentropy(from_logits=True)
optimizer = tf.keras.optimizers.Adam()

train_loss = tf.keras.metrics.Mean(name='train_loss')
train_accuracy = tf.keras.metrics.CategoricalAccuracy(name='train_accuracy')

test_loss = tf.keras.metrics.Mean(name='test_loss')
test_accuracy = tf.keras.metrics.CategoricalAccuracy(name='test_accuracy')

@tf.function
def train_step(x, y, mod='cnn'):
  with tf.GradientTape() as tape:
    # training=True is only needed if there are layers with different
    # behavior during training versus inference (e.g. Dropout).
    if mod == 'cnn':
      y_out = cnn(x)
    elif mod == 'mlp':
      y_out = mlp(x)
    loss = loss_fn(y, y_out)
  if mod == 'cnn':
    gradients = tape.gradient(loss, cnn.trainable_variables)
    optimizer.apply_gradients(zip(gradients, cnn.trainable_variables))
  elif mod == 'mlp':
    gradients = tape.gradient(loss, mlp.trainable_variables)
    optimizer.apply_gradients(zip(gradients, mlp.trainable_variables))

  train_loss(loss)
  train_accuracy(y, y_out)

@tf.function
def test_step(x, y, mod='cnn'):
  # training=False is only needed if there are layers with different
  # behavior during training versus inference (e.g. Dropout).
  if mod == 'cnn':
    y_out = cnn(x)
  elif mod == 'mlp':
    y_out = mlp(x)
  t_loss = loss_fn(y, y_out)

  test_loss(t_loss)
  test_accuracy(y, y_out)

In [26]:
trainmlp_ds = tf.data.Dataset.from_tensor_slices((x_train_noise_mlp, y_train_clean)).batch(128)
testmlp_ds = tf.data.Dataset.from_tensor_slices((x_valid_noise_mlp, y_valid_clean)).batch(128)
traincnn_ds = tf.data.Dataset.from_tensor_slices((x_train_noise_cnn, y_train_clean)).batch(128)
testcnn_ds = tf.data.Dataset.from_tensor_slices((x_valid_noise_cnn, y_valid_clean)).batch(128)

In [31]:
EPOCHS = 30

for epoch in range(EPOCHS):
  # Reset the metrics at the start of the next epoch
  train_loss.reset_states()
  train_accuracy.reset_states()
  test_loss.reset_states()
  test_accuracy.reset_states()

  for x, y in trainmlp_ds:
    train_step(x, y, 'mlp')

  for x_test, y_test in testmlp_ds:
    test_step(x_test, y_test, 'mlp')

  print(
    f'Epoch {epoch + 1}, '
    f'Loss: {train_loss.result()}, '
    f'Accuracy: {train_accuracy.result() * 100}, '
    f'Test Loss: {test_loss.result()}, '
    f'Test Accuracy: {test_accuracy.result() * 100}'
  )



To change all layers to have dtype float64 by default, call `tf.keras.backend.set_floatx('float64')`. To change just this layer, pass dtype='float64' to the layer constructor. If you are the author of this layer, you can disable autocasting by passing autocast=False to the base Layer constructor.

Epoch 1, Loss: 1.9407752752304077, Accuracy: 17.341270446777344, Test Loss: 1.915333867073059, Test Accuracy: 23.265872955322266
Epoch 2, Loss: 1.8639380931854248, Accuracy: 26.789682388305664, Test Loss: 1.8328561782836914, Test Accuracy: 31.0
Epoch 3, Loss: 1.8124122619628906, Accuracy: 33.488094329833984, Test Loss: 1.7894322872161865, Test Accuracy: 36.94047546386719
Epoch 4, Loss: 1.7797069549560547, Accuracy: 37.58333206176758, Test Loss: 1.7637282609939575, Test Accuracy: 39.234127044677734
Epoch 5, Loss: 1.7565616369247437, Accuracy: 39.78174591064453, Test Loss: 1.7426151037216187, Test Accuracy: 41.599205017089844
Epoch 6, Loss: 1.74114990234375, Accuracy: 41.36507797241211, Test 

In [32]:
mlp_enc = mlp.get_layer(name='enc')
mlp_weights = mlp_enc.get_weights()

In [None]:
for epoch in range(EPOCHS):
  # Reset the metrics at the start of the next epoch
  train_loss.reset_states()
  train_accuracy.reset_states()
  test_loss.reset_states()
  test_accuracy.reset_states()

  for x, y in traincnn_ds:
    train_step(x, y, 'cnn')

  for x_test, y_test in testcnn_ds:
    test_step(x_test, y_test, 'cnn')

  print(
    f'Epoch {epoch + 1}, '
    f'Loss: {train_loss.result()}, '
    f'Accuracy: {train_accuracy.result() * 100}, '
    f'Test Loss: {test_loss.result()}, '
    f'Test Accuracy: {test_accuracy.result() * 100}'
  )