In [None]:
import os
import numpy as np
import pandas as pd
import tensorflow as tf
import re 
import datetime

from tensorflow import keras
from tensorflow.keras import layers, regularizers, optimizers
from tensorflow.keras.preprocessing import sequence
from keras.models import Sequential
from keras.layers import Activation, Dense, Dropout, LSTM, Softmax, Bidirectional, Conv1D, Conv2D, MaxPooling2D, Flatten, Reshape, LayerNormalization, BatchNormalization
from keras.callbacks import EarlyStopping
from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix

import matplotlib.pyplot as plt
import seaborn as sns

In [None]:
print(f"{tf.__version__=}")
print(f"{np.__version__=}")
print("nvidia-smi")
!nvidia-smi
print("nvcc version")
!nvcc --version
print("nvinfer version")
!dpkg -l | grep nvinfer
print("TensorRT version")
!dpkg -l | grep TensorRT

# Load & Prepare Data

In [None]:
GESTRUES = ["STATIC", "SLIDE_UP", "SLIDE_DOWN", "SLIDE_LEFT", "SLIDE_RIGHT", "RELEASE", "GRASP", "HIGHLIGHT", "ON_YES", "OFF_NO", "NONE",]
NUM_CLASSES = len(GESTRUES)

In [None]:
def load_data(filepath='/home/ubuntu/FYP-ROS/rosbag/bag/info.txt'):
    global GESTRUES
    dataset = {}
    with open(filepath, 'r') as f:
        for line in f:
            line = line.strip()
            for gesture in GESTRUES:
                if line.startswith(f'{gesture}:'):
                    key = gesture
                    dataset[key] = []
                    break
            else:
                if line != '':
                    dataset[key].append(line.split()[-1])
    return dataset

DATA_BUF_LEN = 100
def get_data_by_id(data_id, augment):
    data_root = "/home/ubuntu/FYP-ROS/rosbag/data"

    x_raw = pd.read_csv(f"{data_root}/data_clean_augment/{data_id}_data{augment}.csv")
    x_raw = x_raw.drop([
        "timestamp", 
        "imu0_to_imu1_rotation_x", "imu0_to_imu1_rotation_y", "imu0_to_imu1_rotation_z", "imu0_to_imu1_rotation_w",
        "imu0_to_imu2_rotation_x", "imu0_to_imu2_rotation_y", "imu0_to_imu2_rotation_z", "imu0_to_imu2_rotation_w"], axis=1)

    x_raw = x_raw.to_numpy()
    if x_raw.shape[0] < DATA_BUF_LEN:
        last_row = np.repeat([x_raw[-1]], repeats=DATA_BUF_LEN-x_raw.shape[0], axis=0)
        x_raw = np.vstack([x_raw, last_row])
    else:
        x_raw = x_raw[:DATA_BUF_LEN]

    y_label = pd.read_csv(f"{data_root}/label_clean_augment/{data_id}_label{augment}.csv")['label']
    return x_raw, y_label

X = []
y = []
metadata = load_data()
for gesture in GESTRUES[:7]:
    print(f"{gesture}: {len(metadata[gesture])}")
    for data_id in metadata[gesture]:
        x_raw, y_label = get_data_by_id(data_id, augment="")
        X.append(x_raw)
        y.append(y_label)

        x_jitter, y_label = get_data_by_id(data_id, augment="_jitter")
        X.append(x_jitter)
        y.append(y_label)
        
        x_time_warp, y_label = get_data_by_id(data_id, augment="_time_warp")
        X.append(x_time_warp)
        y.append(y_label)

        x_time_warp2, y_label = get_data_by_id(data_id, augment="_time_warp2")
        X.append(x_time_warp2)
        y.append(y_label)

X = np.array(X)
y = tf.keras.utils.to_categorical(y, num_classes=NUM_CLASSES)
FEATURES_DIM = X.shape[2]

# print dimension
print(f"{X.shape=}")
print(f"{y.shape=}")

print(f"{X[1]=}")
print(f"{y[100]=}")

In [None]:
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.1, random_state=42)

In [None]:
# plot
y_sum = np.sum(y, axis=0)
plt.figure(figsize=(10, 5))
plt.bar(GESTRUES, y_sum)
plt.title("Number of samples per gesture")
plt.xlabel("Gesture")
plt.xticks(fontsize=7)
plt.ylabel("Number of samples")
for i, v in enumerate(y_sum):
    plt.text(i, v+1, str(v), ha='center', fontsize=10)

y_test_sum = np.sum(y_test, axis=0)
plt.bar(GESTRUES, y_test_sum)
for i, v in enumerate(y_test_sum):
    plt.text(i, v+1, str(v), ha='center', fontsize=10)

plt.show()

# Build Model

In [None]:
kernal_reg = 5e-3
bias_reg = 1e-4

inputs = keras.Input((DATA_BUF_LEN, FEATURES_DIM))
x1 = Reshape(target_shape=(DATA_BUF_LEN, FEATURES_DIM ,1))(inputs)
x1 = Conv2D(32, (3, 3), activation='relu', kernel_regularizer=regularizers.l2(kernal_reg), bias_regularizer=regularizers.l2(bias_reg), name='Conv2DL1')(x1)
x1 = MaxPooling2D(pool_size=(2, 2), name='MaxPoolL1')(x1)
x1 = LayerNormalization(name='LayerNormL1')(x1)
x1 = Flatten(name='FlattenL1')(x1)

x2 = Bidirectional(LSTM(units=64, kernel_regularizer=regularizers.L2(kernal_reg), bias_regularizer=regularizers.L2(bias_reg), return_sequences=True), name='BiLSTM1')(inputs)
x2 = LayerNormalization(name='LayerNormR1')(x2)
x2 = Bidirectional(LSTM(units=64, kernel_regularizer=regularizers.L2(kernal_reg), bias_regularizer=regularizers.L2(bias_reg), return_sequences=True), name='BiLSTM2')(x2)
x2 = LayerNormalization(name='LayerNormR2')(x2)
x2 = Flatten(name='FlattenR1')(x2)

x = keras.layers.concatenate([x1, x2], axis=-1)
x = LayerNormalization(name='LayerNormC1')(x)
output = Dense(NUM_CLASSES,activation='softmax', kernel_regularizer=regularizers.L2(kernal_reg), bias_regularizer=regularizers.L2(bias_reg), name='Dense3')(x)

model = keras.Model(inputs=inputs, outputs=output, name="mnist_model")

model.compile(loss='categorical_crossentropy',
              optimizer=optimizers.Adam(learning_rate=0.001),
              metrics=['accuracy'])
print(model.summary())

In [None]:
# model.load_weights("/home/ubuntu/FYP-ROS/weights/model_lstm_weights-2023_2_24-14_51-acc0.96.h5")
# model = keras.models.load_model("/home/ubuntu/FYP-ROS/weights/model_lstm-2023_2_25-14_38-acc0.94")

# Training

In [None]:
ACCURACY_THRESHOLD = 0.99
class accuryThreasholdCallback(tf.keras.callbacks.Callback): 
    def on_epoch_end(self, epoch, logs={}): 
        if(logs.get('accuracy') > ACCURACY_THRESHOLD and logs.get('val_accuracy') > ACCURACY_THRESHOLD):   
            print("\nReached %2.2f%% accuracy, so stopping training!!" %(ACCURACY_THRESHOLD*100))   
            self.model.stop_training = True

accuracy_threashold_monitor = accuryThreasholdCallback()
early_stopping_monitor = EarlyStopping(patience=3)
history = model.fit(X_train, y_train, validation_data=(X_test, y_test), epochs=50, batch_size=32, callbacks=[accuracy_threashold_monitor])

In [None]:
print(history.history.keys())

fig, ax = plt.subplots(1, 2, figsize=(15, 5))
ax[0].plot(history.history['accuracy'])
ax[0].plot(history.history['val_accuracy'])
ax[0].set_ylabel('accuracy')
ax[0].set_xlabel('epoch')
ax[0].legend(['train', 'val'], loc='upper right')

ax[1].plot(history.history['loss'])
ax[1].plot(history.history['val_loss'])
ax[1].set_ylabel('loss')
ax[1].set_xlabel('epoch')
ax[1].legend(['train', 'val'], loc='upper right')
fig.suptitle('Model training history')
plt.show()

# Evaluate

In [None]:
scores = model.evaluate(X_test, y_test, verbose=0)
print(f"{scores=}")

In [None]:
y_pred = model.predict(X_test)

# print(f"{np.argmax(y_pred, axis=1)=}")
# print(f"{np.argmax(y_test, axis=1)=}")

error_data_index = []
for i, (yp, yt) in enumerate(zip(np.argmax(y_pred, axis=1), np.argmax(y_test, axis=1))):
    if yp != yt:
        print(f"index: {i}, truth: {GESTRUES[yt]}, predicted: {GESTRUES[yp]}")
        error_data_index.append(i)

In [None]:
y_test_not_onehot = np.argmax(y_test, axis=1)
y_pred_not_onehot = np.argmax(y_pred, axis=1)
plt.figure(figsize=(10, 10))
matrix_confusion = confusion_matrix(y_pred_not_onehot, y_test_not_onehot)
sns.heatmap(matrix_confusion, square=True, annot=False, cmap='Blues', fmt='d')

for i in range(matrix_confusion.shape[0]):
    for j in range(matrix_confusion.shape[1]):
        plt.text(j+0.5, i+0.5, f'{matrix_confusion[i, j]}/{np.sum(matrix_confusion[i, :])}', 
                 horizontalalignment='center', verticalalignment='center', fontsize=7)

plt.xlabel('ground truth')
plt.ylabel('predictions')
plt.xticks(np.arange(0.5, matrix_confusion.shape[0]+0.5), GESTRUES, rotation=45, fontsize=7)
plt.yticks(np.arange(0.5, matrix_confusion.shape[0]+0.5), GESTRUES, rotation=45, fontsize=7)
plt.show()

In [None]:
print(f"total error data size: {len(error_data_index)}")
index = error_data_index[1]

true_label = np.argmax(y_test[index])
predicted_class = np.argmax(y_pred[index])

print(f"{GESTRUES[true_label]=}")
print(f"{GESTRUES[predicted_class]=}")

fig, axs = plt.subplots(3, 6, figsize=(20, 10))

raw_data = X_test[index]
acc_axes = axs[:, :3].ravel()
vel_axes = axs[:, 3:].ravel()
acc_data = np.concatenate([raw_data[:,1:4], raw_data[:,11:14], raw_data[:,21:24]], axis=1)
vel_data = np.concatenate([raw_data[:,4:7], raw_data[:,14:17], raw_data[:,24:27]], axis=1)
acc_titles = [f'Imu{i}_acc_{xyz}' for i in range(3) for xyz in ['x', 'y', 'z']]
vel_titles = [f'Imu{i}_vel_{xyz}' for i in range(3) for xyz in ['x', 'y', 'z']]

for ax, data, title in zip(acc_axes, acc_data.T, acc_titles):
    ax.plot(data)
    ax.set_title(title)
    ax.set_ylim([-1.5 * 9.8, 1.5 * 9.8])
    
for ax, data, title in zip(vel_axes, vel_data.T, vel_titles):
    ax.plot(data)
    ax.set_title(title)
    ax.set_ylim([-5, 5])

fig.suptitle(f'labeled: {GESTRUES[true_label]}, predicted: {GESTRUES[predicted_class]}')
plt.plot()

# Save model

In [None]:
t = datetime.datetime.now()
t_str = f"{t.year}_{t.month}_{t.day}-{t.hour}_{t.minute}"
acc_str = f"{scores[1]:.2f}" 
# model.save_weights(f'/home/ubuntu/FYP-ROS/weights/model_cnn_weights-{t_str}-acc{acc_str}.h5')
# model.save(f'/home/ubuntu/FYP-ROS/weights/model_cnn-{t_str}-acc{acc_str}.h5')

# Explaination 

In [None]:
# !pip install shap
# import shap

In [None]:
# explainer = shap.Explainer(model)
# shap_values = explainer(X_train.transpose(0, 2, 1))
# shap.summary_plot(shap_values, X_train.transpose(0, 2, 1))