<a href="https://colab.research.google.com/github/megsdata/sEMG_SupervisedLearning/blob/main/23Jul10_update.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [19]:
import pandas as pd
import numpy as np
import pathlib
import os
import datetime
import matplotlib.pyplot as plt
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras.layers import Dropout
from tensorflow.keras.regularizers import l2
import weightwatcher as ww
import warnings
import keras_tuner as kt
warnings.simplefilter(action='ignore', category=RuntimeWarning)
np.set_printoptions(precision=4)
print(tf.version.VERSION)
%load_ext tensorboard

2.12.0
The tensorboard extension is already loaded. To reload it, use:
  %reload_ext tensorboard


In [None]:
!pip install WeightWatcher
!pip install keras-tuner --upgrade
!pip install -q -U keras-tuner

In [3]:
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(ww.__name__)
logger.setLevel(logging.INFO)
checkpoint_path = "logs/cp.ckpt"
checkpoint_dir = os.path.dirname(checkpoint_path)
cp_callback = tf.keras.callbacks.ModelCheckpoint(filepath=checkpoint_path,
                                                 save_weights_only=True,
                                                 verbose=1)

In [4]:
from google.colab import drive
drive.mount('/MSC_THESIS/')
%cd /MSC_THESIS/MyDrive/School - Ryerson Graduate/MSC_THESIS/23Jul09_Envelope_DL_Data
import pathlib
all_files =  sorted(str(p) for p in pathlib.Path('/MSC_THESIS/MyDrive/School - Ryerson Graduate/MSC_THESIS/23Jul09_Envelope_DL_Data/').glob("*.csv"))
#print(all_files)
df = pd.read_csv("/MSC_THESIS/MyDrive/School - Ryerson Graduate/MSC_THESIS/23Jul09_Envelope_DL_Data/_11_MATRIX.csv")
for i in range(len(all_files)-1):
  temp = pd.read_csv(all_files[i])
  df = pd.concat([df, temp], axis=0)
  #print(len(df.index))
df['ContractionDuration'] = df['Contraction End Time (s)'] - df['Contraction Start Time(s)']

Mounted at /MSC_THESIS/
/MSC_THESIS/MyDrive/School - Ryerson Graduate/MSC_THESIS/23Jul09_Envelope_DL_Data


In [5]:
def data_prep(my_state):
  #select features
  X = df[['Trial Type', 'ContractionDuration', 'Pulse Width', \
          'Contraction Start Time(s)', 'Contraction End Time (s)', 'ContractionNo', \
          'Sex', 'Age', 'Height', 'Weight', 'Previous Injury', 'Fibula Length', 'Shank Girth', \
          'Time(s)', 'EMG(mv)', 'MMG_x', 'MMG_y', 'MMG_z', 'ContractionDuration'
          ]]
  y = df['Pain Label']
  #create data split
  from sklearn.model_selection import train_test_split
  X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=my_state)
  X_train, X_val, y_train, y_val = train_test_split(X_train, y_train, test_size=0.2, random_state=my_state)
  print("Number of samples in the training set: ", len(X_train))
  print("Number of samples in the test set: ", len(X_test))
  print("Number of samples in the validation set: ", len(X_val))
  #scale the data
  from sklearn.preprocessing import StandardScaler
  scaler = StandardScaler()
  X_train_scaled = scaler.fit_transform(X_train)
  X_test_scaled = scaler.transform(X_test)
  X_val_scaled = scaler.transform(X_val)
  return X_train_scaled, X_test_scaled, X_val_scaled, y_train, y_test, y_val

In [6]:
def create_FCNN_model(my_state, num_feat):
  tf.random.set_seed(my_state)
  model = tf.keras.Sequential([
      tf.keras.layers.Conv1D(100, num_feat, activation='relu', name="convLayer", input_shape=(num_feat, 1)),
      tf.keras.layers.Dense(1024, activation='relu', name="relu1Layer"),
      #tf.keras.layers.Conv1D(100, num_feat, activation='relu', name="2ndconvLayer", input_shape=(None, 1, 100000)),
      tf.keras.layers.Dense(256, activation='relu', name="relu2Layer"),
      tf.keras.layers.Dense(256, activation='relu', name="relu3Layer"),
      tf.keras.layers.Dense(256, activation='relu', name="relu4Layer"),
      tf.keras.layers.Dense(256, activation='relu', name="relu5Layer"),
      tf.keras.layers.Dense(256, activation='relu', name="relu6Layer"),
      tf.keras.layers.Dense(256, activation='relu', name="relu7Layer"),
      tf.keras.layers.Dense(256, activation='relu', name="relu8Layer"),
      tf.keras.layers.Dense(256, activation='relu', name="relu9Layer"),
      tf.keras.layers.Dense(256, activation='relu', name="relu10Layer"),
      tf.keras.layers.Dense(1, activation='sigmoid', name="sigmoidLayer")
      #tf.keras.layers.Dense(1, activation='relu', name="sigmoidLayer")
  ])

  model.compile(
    loss=tf.keras.losses.binary_crossentropy,
    optimizer=tf.keras.optimizers.Adam(learning_rate=0.01),
    metrics=[
        tf.keras.metrics.BinaryAccuracy(name='accuracy'),
        tf.keras.metrics.Precision(name='precision'),
        tf.keras.metrics.Recall(name='recall'),
        tf.keras.metrics.BinaryCrossentropy(name='binary cross entropy'),
        tf.keras.metrics.BinaryIoU(name='binary IoU')
    ]
  )
  return model


def create_CNN_model(my_state, num_feat):
  tf.random.set_seed(my_state)
  model = tf.keras.Sequential([
      tf.keras.layers.Conv1D(100, num_feat, activation='relu', name="convLayer", input_shape=(num_feat, 1)),
      tf.keras.layers.Dense(1024, activation='relu', name="relu1Layer"),
      #tf.keras.layers.Conv1D(100, num_feat, activation='relu', name="2ndconvLayer", input_shape=(None, 1, 100000)),
      tf.keras.layers.Dense(256, activation='relu', name="relu2Layer"),
      tf.keras.layers.Dense(256, activation='relu', name="relu3Layer"),
      tf.keras.layers.Dense(256, activation='relu', name="relu4Layer"),
      tf.keras.layers.Dense(256, activation='relu', name="relu5Layer"),
      tf.keras.layers.Dense(256, activation='relu', name="relu6Layer"),
      tf.keras.layers.Dense(256, activation='relu', name="relu7Layer"),
      tf.keras.layers.Dense(256, activation='relu', name="relu8Layer"),
      tf.keras.layers.Dense(256, activation='relu', name="relu9Layer"),
      tf.keras.layers.Dense(256, activation='relu', name="relu10Layer"),
      tf.keras.layers.Dense(1, activation='sigmoid', name="sigmoidLayer")
      #tf.keras.layers.Dense(1, activation='relu', name="sigmoidLayer")
  ])

  model.compile(
    loss=tf.keras.losses.binary_crossentropy,
    optimizer=tf.keras.optimizers.Adam(learning_rate=0.01),
    metrics=[
        tf.keras.metrics.BinaryAccuracy(name='accuracy'),
        tf.keras.metrics.Precision(name='precision'),
        tf.keras.metrics.Recall(name='recall'),
        tf.keras.metrics.BinaryCrossentropy(name='binary cross entropy'),
        tf.keras.metrics.BinaryIoU(name='binary IoU')
    ]
  )
  return model

def create_LSTM_model(my_state, num_feat):
  tf.random.set_seed(my_state)
  model = tf.keras.Sequential([
      #tf.keras.layers.LSTM(128, input_shape=(num_feat, 1)),
      tf.keras.layers.LSTM(256, input_shape=(num_feat, 1)),
      tf.keras.layers.Dense(128, activation='relu', name="relu1Layer"),
      tf.keras.layers.Dense(256, activation='relu', name="relu2Layer"),
      tf.keras.layers.Dense(256, activation='relu', name="relu3Layer"),
      tf.keras.layers.Dense(1, activation='sigmoid', name="sigmoidLayer")
  ])
  model.compile(
    loss=tf.keras.losses.binary_crossentropy,
    optimizer=tf.keras.optimizers.Adam(learning_rate=0.01),
    metrics=[
        tf.keras.metrics.BinaryAccuracy(name='accuracy'),
        tf.keras.metrics.Precision(name='precision'),
        tf.keras.metrics.Recall(name='recall'),
        tf.keras.metrics.BinaryCrossentropy(name='binary cross entropy'),
        tf.keras.metrics.BinaryIoU(name='binary IoU')
    ]
  )
  return model

def create_regularized_LSTM_model(my_state, factor, rate, num_feat):
  tf.random.set_seed(my_state)
  model = tf.keras.Sequential([
      #Dropout(rate),
      tf.keras.layers.LSTM(256, input_shape=(num_feat, 1)),
      #Dropout(rate),
      #tf.keras.layers.LSTM(128, input_shape=(None, 1, 256)),
      Dropout(rate),
      tf.keras.layers.Dense(128, kernel_regularizer=l2(factor), activation='relu', name="relu1Layer"),
      Dropout(rate),
      tf.keras.layers.Dense(256, kernel_regularizer=l2(factor), activation='relu', name="relu2Layer"),
      Dropout(rate),
      tf.keras.layers.Dense(256, kernel_regularizer=l2(factor), activation='relu', name="relu3Layer"),
      Dropout(rate),
      tf.keras.layers.Dense(256, kernel_regularizer=l2(factor), activation='relu', name="relu4Layer"),
      tf.keras.layers.Dense(1, activation='sigmoid', name="sigmoidLayer")
      #tf.keras.layers.Dense(1, activation='relu', name="reluLayer")
  ])

  model.compile(
      loss=tf.keras.losses.binary_crossentropy,
      optimizer=tf.keras.optimizers.Adam(learning_rate=0.01),
      metrics=[
          tf.keras.metrics.BinaryAccuracy(name='accuracy'),
          tf.keras.metrics.Precision(name='precision'),
          tf.keras.metrics.Recall(name='recall'),
          tf.keras.metrics.BinaryCrossentropy(name='binary cross entropy'),
          tf.keras.metrics.BinaryIoU(name='binary IoU')
      ]
  )
  return model

def train_model(X_train_scaled, y_train, num_epochs):
  tensorboard_callback = tf.keras.callbacks.TensorBoard(log_dir=checkpoint_dir, histogram_freq=1)
  print("Training the neural network.. ")
  history = model.fit(X_train_scaled, y_train, batch_size=32, epochs=num_epochs, callbacks=[cp_callback])
  model.summary()
  return history

def visualize_performance(num_epochs, history):
  import matplotlib.pyplot as plt
  from matplotlib import rcParams
  rcParams['figure.figsize'] = (18, 8)
  rcParams['axes.spines.top'] = False
  rcParams['axes.spines.right'] = False
  plt.plot(
      np.arange(1, num_epochs+1),
      history.history['loss'], label='Loss'
  )
  plt.plot(
      np.arange(1, num_epochs+1),
      history.history['accuracy'], label='Accuracy'
  )
  plt.plot(
      np.arange(1, num_epochs+1),
      history.history['precision'], label='Precision'
  )
  plt.plot(
      np.arange(1, num_epochs+1),
      history.history['recall'], label='Recall'
  )
  plt.title('Evaluation metrics', size=20)
  plt.xlabel('Epoch', size=14)
  plt.legend();

def evaluate_performance(model, X_test_scaled, y_test):
  predictions = model.predict(X_test_scaled)
  prediction_classes = [
      1 if prob > 0.5 else 0 for prob in np.ravel(predictions)
  ]
  prediction_classes = [
      1 if prob > 0.5 else 0 for prob in np.ravel(predictions)
  ]
  from sklearn.metrics import accuracy_score, precision_score, recall_score

  print(f'Accuracy: {accuracy_score(y_test, prediction_classes):.4f}')
  print(f'Precision: {precision_score(y_test, prediction_classes):.4f}')
  print(f'Recall: {recall_score(y_test, prediction_classes):.4f}')

In [20]:
def transformer_encoder(inputs, head_size, num_heads, ff_dim, dropout=0):
    # Normalization and Attention
    x = layers.LayerNormalization(epsilon=1e-6)(inputs)
    x = layers.MultiHeadAttention(
        key_dim=head_size, num_heads=num_heads, dropout=dropout
    )(x, x)
    x = layers.Dropout(dropout)(x)
    res = x + inputs

    # Feed Forward Part
    x = layers.LayerNormalization(epsilon=1e-6)(res)
    x = layers.Conv1D(filters=ff_dim, kernel_size=1, activation="relu")(x)
    x = layers.Dropout(dropout)(x)
    x = layers.Conv1D(filters=inputs.shape[-1], kernel_size=1)(x)
    return x + res

def build_model(
    input_shape,
    head_size,
    num_heads,
    ff_dim,
    num_transformer_blocks,
    mlp_units,
    dropout=0,
    mlp_dropout=0,
):
    inputs = keras.Input(shape=input_shape)
    x = inputs
    for _ in range(num_transformer_blocks):
        x = transformer_encoder(x, head_size, num_heads, ff_dim, dropout)

    x = layers.GlobalAveragePooling1D(data_format="channels_first")(x)
    for dim in mlp_units:
        x = layers.Dense(dim, activation="relu")(x)
        x = layers.Dropout(mlp_dropout)(x)

    outputs = layers.Dense(n_classes, activation="softmax")(x)
    return keras.Model(inputs, outputs)

In [22]:
my_state = 22
num_epochs = 50
X_train_scaled, X_test_scaled, X_val_scaled, y_train, y_test, y_val = data_prep(my_state)
# Create a callback that saves the model's weights
checkpoint_path = "/MSC_THESIS/MyDrive/School - Ryerson Graduate/MSC_THESIS/logs/23Jul10_update/cp-{epoch:04d}.ckpt"
checkpoint_dir = os.path.dirname(checkpoint_path)
cp_callback = tf.keras.callbacks.ModelCheckpoint(filepath=checkpoint_path,
                                                 save_weights_only=True,
                                                 verbose=1)

Number of samples in the training set:  3240141
Number of samples in the test set:  1012545
Number of samples in the validation set:  810036


In [23]:
input_shape = X_train_scaled.shape[1:]

model = build_model(
    input_shape,
    head_size=256,
    num_heads=4,
    ff_dim=4,
    num_transformer_blocks=4,
    mlp_units=[128],
    mlp_dropout=0.4,
    dropout=0.25,
)

model.compile(
    loss="sparse_categorical_crossentropy",
    optimizer=keras.optimizers.Adam(learning_rate=1e-4),
    metrics=["sparse_categorical_accuracy"],
)
model.summary()

callbacks = [keras.callbacks.EarlyStopping(patience=10, restore_best_weights=True)]



IndexError: ignored

In [None]:
model.fit(
    x_train_scaled,
    y_train,
    validation_split=0.2,
    epochs=200,
    batch_size=64,
    callbacks=callbacks,
)

model.evaluate(X_test, y_test, verbose=1)

In [None]:
def model_builder(hp):
  model = keras.Sequential()
  model.add(keras.layers.Flatten(input_shape=(28, 28)))

  # Tune the number of units in the first Dense layer
  # Choose an optimal value between 32-512
  hp_units = hp.Int('units', min_value=32, max_value=512, step=32)
  model.add(keras.layers.Dense(units=hp_units, activation='relu'))
  model.add(keras.layers.Dense(10))

  # Tune the learning rate for the optimizer
  # Choose an optimal value from 0.01, 0.001, or 0.0001
  hp_learning_rate = hp.Choice('learning_rate', values=[1e-2, 1e-3, 1e-4])

  model.compile(optimizer=keras.optimizers.Adam(learning_rate=hp_learning_rate),
                loss=keras.losses.SparseCategoricalCrossentropy(from_logits=True),
                metrics=['accuracy'])

  return model

tuner = kt.Hyperband(model_builder,
                     objective='val_accuracy',
                     max_epochs=10,
                     factor=3,
                     directory='my_dir',
                     project_name='intro_to_kt')

In [21]:
my_state = 22
num_epochs = 50
X_train_scaled, X_test_scaled, X_val_scaled, y_train, y_test, y_val = data_prep(my_state)
# Create a callback that saves the model's weights
checkpoint_path = "/MSC_THESIS/MyDrive/School - Ryerson Graduate/MSC_THESIS/logs/23Jul10_update/cp-{epoch:04d}.ckpt"
checkpoint_dir = os.path.dirname(checkpoint_path)
cp_callback = tf.keras.callbacks.ModelCheckpoint(filepath=checkpoint_path,
                                                 save_weights_only=True,
                                                 verbose=1)
#model = create_regularized_LSTM_model(my_state, 1e-5, 0.3, 19) #model creation and compilation
#model.load_weights('/MSC_THESIS/MyDrive/School - Ryerson Graduate/MSC_THESIS/logs/23Jul09_Regularized_LSTM/cp-0097.ckpt')


Number of samples in the training set:  3240141
Number of samples in the test set:  1012545
Number of samples in the validation set:  810036


In [None]:
def create_model():
    input_layer = keras.Input(shape=(512, 1))

    x = layers.Conv1D(
        filters=32, kernel_size=3, strides=2, activation="relu", padding="same"
    )(input_layer)
    x = layers.BatchNormalization()(x)

    x = layers.Conv1D(
        filters=64, kernel_size=3, strides=2, activation="relu", padding="same"
    )(x)
    x = layers.BatchNormalization()(x)

    x = layers.Conv1D(
        filters=128, kernel_size=5, strides=2, activation="relu", padding="same"
    )(x)
    x = layers.BatchNormalization()(x)

    x = layers.Conv1D(
        filters=256, kernel_size=5, strides=2, activation="relu", padding="same"
    )(x)
    x = layers.BatchNormalization()(x)

    x = layers.Conv1D(
        filters=512, kernel_size=7, strides=2, activation="relu", padding="same"
    )(x)
    x = layers.BatchNormalization()(x)

    x = layers.Conv1D(
        filters=1024, kernel_size=7, strides=2, activation="relu", padding="same"
    )(x)
    x = layers.BatchNormalization()(x)

    x = layers.Dropout(0.2)(x)

    x = layers.Flatten()(x)

    x = layers.Dense(4096, activation="relu")(x)
    x = layers.Dropout(0.2)(x)

    x = layers.Dense(
        2048, activation="relu", kernel_regularizer=keras.regularizers.L2()
    )(x)
    x = layers.Dropout(0.2)(x)

    x = layers.Dense(
        1024, activation="relu", kernel_regularizer=keras.regularizers.L2()
    )(x)
    x = layers.Dropout(0.2)(x)
    x = layers.Dense(
        128, activation="relu", kernel_regularizer=keras.regularizers.L2()
    )(x)
    output_layer = layers.Dense(num_classes, activation="softmax")(x)

    return keras.Model(inputs=input_layer, outputs=output_layer)

In [17]:
import kerastuner as kt

tuner = kt.Hyperband(
    create_regularized_LSTM_model(my_state, 1e-5, 0.3, 19),
    objective='val_accuracy',
    max_epochs=30,
    hyperband_iterations=2)
tuner.to_tensor()
layer(tuner)

TypeError: ignored

In [None]:
#visualize_performance(num_epochs, history)
evaluate_performance(model, X_test_scaled, y_test)

In [None]:
best_model = tuner.get_best_models(1)[0]