In [None]:
import os
os.chdir(os.path.join(os.getcwd(), '..'))
os.getcwd()

In [None]:
from src.utils import get_dataset
from src.utils import select_data

import scipy.io as sio
from scipy.signal import resample
import numpy as np
import matplotlib.pyplot as plt
from sklearn.metrics import confusion_matrix

from keras.utils import to_categorical

from keras.preprocessing import sequence

from keras.models import Sequential
from keras.models import Model

from keras.layers import Input
from keras.layers import LSTM
from keras.layers import Dense
from keras.layers import Flatten
from keras.layers import Bidirectional
from keras.layers import Dropout
from keras.layers import Conv1D
from keras.layers import MaxPooling1D
from keras.layers import UpSampling1D
from keras.layers import Concatenate

from keras.optimizers import Adam

from keras.callbacks import EarlyStopping
from keras.callbacks import TensorBoard
from keras.callbacks import ModelCheckpoint

In [None]:
DATA_PATH = os.path.join(os.getcwd(), 'data')
os.listdir(DATA_PATH)

In [None]:
depth_path, _,inertial_path, skeleton_path,rgb_path = get_dataset(DATA_PATH)

In [None]:
activities = list(range(1,28))

In [None]:
def select_subject(d_path, subject):
    select_statement = '_s{}_'.format(subject)
    subjects = []
    for i in d_path:
        if select_statement in i:
            subjects.append(i)
    return subjects

In [None]:
def get_action_number(single_path):
    return int(single_path.split('/')[-1].split('_')[0][1:])
def get_subject_number(single_path):
    return int(single_path.split('/')[-1].split('_')[1][1:])
def get_trial_number(single_path):
    return int(single_path.split('/')[-1].split('_')[2][1:])


In [None]:
X_train = []
Y_train = []
X_test = []
Y_test = []

resample_len = 180

for path in skeleton_path:
    if get_subject_number(path) in [1,3,5,7]:
        X_train.append(path)
        Y_train.append(get_action_number(path))
    else:
        X_test.append(path)
        Y_test.append(get_action_number(path))

X_train = [sio.loadmat(x)['d_skel'] for x in X_train]
X_test = [sio.loadmat(x)['d_skel'] for x in X_test]



In [None]:
temp = []
for i in range(len(X_train)):
    temp.append(X_train[i].shape[-1])

In [None]:
np.array(temp).mean()

In [None]:
plt.hist(np.array(temp)), 

In [None]:
# train on subject 1,3,5,7
# test on subject 2,4,6,8
X_train = []
Y_train = []
X_test = []
Y_test = []

resample_len = 180

for path in skeleton_path:
    if get_subject_number(path) in [1,3,5,7]:
        X_train.append(path)
        Y_train.append(get_action_number(path))
    else:
        X_test.append(path)
        Y_test.append(get_action_number(path))

# X_train = [pad_len_inertial(sio.loadmat(x)['d_iner']) for x in X_train]
# X_test = [pad_len_inertial(sio.loadmat(x)['d_iner']) for x in X_test]

X_train = [np.reshape(resample(sio.loadmat(x)['d_skel'], resample_len, axis = -1), (60,180)) for x in X_train]
X_test = [np.reshape(resample(sio.loadmat(x)['d_skel'], resample_len, axis = -1), (60,180)) for x in X_test]


X_train = np.array(X_train)
X_test = np.array(X_test)

X_train = np.swapaxes(X_train, 1,2)
X_test = np.swapaxes(X_test, 1,2)

Y_train = to_categorical(np.array(Y_train) - 1)
Y_test = to_categorical(np.array(Y_test) - 1)


# X_train[:,:,3:] = X_train[:,:,3:]/ max(X_train[:,:,3:].max(), abs(X_train[:,:,3:].min()))
# X_train[:,:,:3] = X_train[:,:,:3]/ max(X_train[:,:,:3].max(), abs(X_train[:,:,:3].min()))

# X_test[:,:,3:] = X_test[:,:,3:]/ max(X_test[:,:,3:].max(), abs(X_test[:,:,3:].min()))
# X_test[:,:,:3] = X_test[:,:,:3]/ max(X_test[:,:,:3].max(), abs(X_test[:,:,:3].min()))

X_train.shape, Y_train.shape, X_test.shape, Y_test.shape

In [None]:
def simple_LSTM():
    np.random.seed(99)
    model = Sequential(name = 'simple_LSTM')
    model.add(LSTM(512, input_shape=(None, 60), recurrent_dropout=0.5))
    model.add(Dense(len(activities), activation='softmax'))
    model.compile(loss='categorical_crossentropy', optimizer=Adam(lr=1e-3), metrics=['accuracy'])
    print(model.summary())
    return model

In [None]:
model = simple_LSTM()
# model = bidirectional_LSTM()
# model = conv_LSTM()
# model = UNet_LSTM()

In [None]:
LOG_DIR = os.path.join(os.getcwd(), 'logs')
tb = TensorBoard(LOG_DIR)

weights_dir = 'weights/' + model.name + \
            '-{epoch:02d}-{loss:.2f}.hdf5'
chkpt = ModelCheckpoint(filepath=weights_dir, monitor='loss', save_best_only=True, save_weights_only=True, mode='auto', period=1)

early_stop = EarlyStopping(monitor='val_acc', min_delta=0, patience=10)

In [None]:
history = model.fit(X_train, Y_train, epochs=20, batch_size=3, validation_data = (X_test, Y_test), callbacks=[tb, chkpt])

# Evaluation

In [None]:
train_acc = history.history['acc']
val_acc = history.history['val_acc']
plt.plot(train_acc, 'C0')
plt.plot(val_acc, 'C1')
plt.show()

In [None]:
train_acc = history.history['loss']
val_acc = history.history['val_loss']
plt.plot(train_acc, 'C0')
plt.plot(val_acc, 'C1')
plt.show()

In [None]:
model.evaluate(X_train, Y_train)

In [None]:
model.evaluate(X_test, Y_test)

In [None]:
pred_1 = np.argmax(model.predict(X_test), axis = -1)

In [None]:
class_labels = ['swipe to the left',
               'swipe to the right',
               'wave',
               'front clap',
               'throw',
               'cross arms',
               'basketball shoot',
               'draw x',
               'draw circle (CW)',
               'draw circle (CCW)',
               'draw triangle',
               'bowling',
               'boxing',
               'baseball swing',
               'tennis swing',
               'arm curl',
               'tennis serve',
               'two hand push',
               'knock door',
               'catch',
               'pick and throw',
               'jogging',
               'walking',
               'sit to stand',
               'stand to sit',
               'forward lunge',
               'squat']