# Imports and initialization

In [1]:
import pandas as pd
import numpy as np
import os
import math
import tensorflow as tf
from tensorflow.keras.models import Sequential, Model, load_model
from tensorflow.keras.layers import Dense, Dropout, LSTM, BatchNormalization, Bidirectional
from sklearn.utils import shuffle
from sklearn.preprocessing import MinMaxScaler, StandardScaler
physical_devices = tf.config.list_physical_devices('GPU')
tf.config.experimental.set_memory_growth(physical_devices[0], enable=True)

Choose one of the two below cells depending on the environment.

In [2]:
# dir_path = './' #uncomment if in local environment

In [3]:
#uncomment in google colab environment
from google.colab import drive
drive.mount('/content/drive') #path to root directory in Drive
dir_path = './drive/MyDrive/BAKA/' #colab
df = pd.read_csv(dir_path + 'DataCollection' + '/' + '1' + '/' + '0.txt', header=None, sep=' ')
if len(df.columns) > 31:
  df = df.drop(columns=[31])

Mounted at /content/drive


In [4]:
files = os.listdir(dir_path + 'DataCollection')
files

['4', '7', '8', '0', '5', '2', '3', '6', '1']

# Dataset Split
Split samples into Test sets and Training sets <br>
Use 20% of available samples for Testing and the remaining for Training

In [5]:
x_train = [];
y_train = [];

x_test = [];
y_test = [];
for i in files:
    samples = os.listdir(dir_path + 'DataCollection' + '/' + i)
    # num_tests = int(len(samples)/5);
    num_tests = len(samples);
    shuffle(samples, random_state = 0)
    for k in range(0, num_tests):
        df = pd.read_csv(dir_path + 'DataCollection' + '/' + i + '/' + samples[k], header=None, sep=' ')
        df = df.drop(df.index[60:])
        if len(df.columns) > 31:
          df = df.drop(columns=[31])
        if df.isnull().values.any():
          print(i, " ", samples[k])
        x_test.append(df.to_numpy())
        y_test.append(int(i));
    
    # for k in range(num_tests, len(samples)):
    #     df = pd.read_csv(dir_path + 'DataCollection' + '/' + i + '/' + samples[k], header=None, sep=' ')
    #     df = df.drop(df.index[60:])
    #     if len(df.columns) > 31:
    #       df = df.drop(columns=[31])
    #     if df.isnull().values.any():
    #       print(i, " ", samples[k])
    #     x_train.append(df.to_numpy())
    #     y_train.append(int(i));
    
    # print(len(samples), ' ', num_tests, ' ', len(samples)- num_tests)
x_train = np.array(x_train)
y_train = np.array(y_train);

# x_test = np.array(x_test)
# y_test = np.array(y_test)
print("x_train.shape: ", x_train.shape)
print("y_train.shiape: ", y_train.shape)
# print("x_test.shape: ", x_test.shape)
# print("y_test.shape: ", y_test.shape)


KeyboardInterrupt: ignored

In [None]:
print("x_train.shape: ", x_train.shape)
print("y_train.shiape: ", y_train.shape)
# print("x_test.shape: ", x_test.shape)
# print("y_test.shape: ", y_test.shape)



### Scaling and feature labels
Apply min-max scaling technique and relable features corresponding to the description in bachelor thesis's text

In [None]:
def scale_data(data, min_max_scaler):
    for i in range(len(data)):
        data[i] = min_max_scaler.transform(data[i])
    return data

Form preprocessed datasets to corresponding shapes as well as shuffle samples

In [None]:
min_max_scaler = MinMaxScaler(feature_range=(0,1))

num_instances, num_time_steps, num_features = x_train.shape
x_train = np.reshape(x_train, newshape=(-1, num_features))
x_train = min_max_scaler.fit_transform(x_train)
x_train = np.reshape(x_train, newshape=(num_instances, num_time_steps, num_features))

x_train, y_train = shuffle(x_train, y_train, random_state=0)

# num_instances, num_time_steps, num_features = x_test.shape
# x_test = np.reshape(x_test, newshape=(-1, num_features))
# x_test = min_max_scaler.transform(x_test)
# x_test = np.reshape(x_test, newshape=(num_instances, num_time_steps, num_features))

# x_test, y_test = shuffle(x_test, y_test, random_state=0)


In [None]:
print("x_train.shape: ", x_train.shape)
print("y_train.shiape: ", y_train.shape)
# print("x_test.shape: ", x_test.shape)
# print("y_test.shape: ", y_test.shape)

# Model definition

Define Two-layered bidirectional LSTM<br>
Each layer consists of `Bidirectional(LSTM)` cell with addition of `Dropout()` and `BatchNormalization()` to minimize overfitting and decrease learning time

In [None]:
model = Sequential()
model.add(Bidirectional(LSTM(units=60, return_sequences=True ,dtype='float64'),input_shape=x_train.shape[1:],dtype='float64'))
model.add(BatchNormalization())
model.add(Dropout(0.6))

model.add(Bidirectional(LSTM(units=60 ,dtype='float64') ,dtype='float64'))
model.add(BatchNormalization())
model.add(Dropout(0.6))

model.add(Dense(len(files), activation='softmax',dtype='float64'))
model.summary()

# Training

Model training using 200 epochs <br>
Marking checkpoints(models with best `val_accuracy`) to `./Checkpoints` directory

In [None]:
opt = tf.keras.optimizers.Adam(lr=0.0001, decay=1e-5)

checkpoint_filepath = dir_path + 'Checkpoints/'
model_checkpoint_callback = tf.keras.callbacks.ModelCheckpoint(
    filepath=checkpoint_filepath,
    save_weights_only=True,
    monitor='val_accuracy',
    mode='max',
    save_best_only=True)

model.compile(
    loss='sparse_categorical_crossentropy',
    optimizer=opt,
    metrics=['accuracy'],
)


gestures = model.fit(x = x_train,
            y = y_train,
            epochs=200,
            # validation_split=0.1, #split 10% of the trainning set for the validation set,
            batch_size=24,
            callbacks=[model_checkpoint_callback],
            shuffle=True
         )

In [None]:
# print("Evaluate on test data")
# results = model.evaluate(x_test, y_test, batch_size=24)
# print("test loss, test acc:", results)
# model.save(dir_path + 'Models/gestures_bidir', save_format='tf')

In [None]:
min_max_scaler.data_min_

In [None]:
min_max_scaler.data_max_

In [None]:
model.input