In [1]:
# Request the TensorFlow 2.x runtime. Any exception raised by the magic is
# deliberately swallowed so that this cell never aborts the run.
try:
  # %tensorflow_version only exists in Colab.
  %tensorflow_version 2.x
except Exception:
    # Best effort only: keep whatever TensorFlow version is already installed.
    pass

TensorFlow 2.x selected.


In [2]:
from __future__ import absolute_import, division, print_function, unicode_literals

import tensorflow as tf

from tensorflow.keras.layers import Dense, Flatten, Conv2D, ConvLSTM2D, BatchNormalization, Dropout, MaxPooling2D, Bidirectional
from tensorflow.keras import Model
from tensorflow.keras.models import Sequential, load_model
from tensorflow.keras import optimizers
from tensorflow.keras.callbacks import ModelCheckpoint
from tensorflow.keras.utils import to_categorical
from sklearn.model_selection import train_test_split
import numpy as np
import pylab as plt
import matplotlib.pylab as plt 
import os
import time
from tabulate import tabulate
import pandas as pd
%matplotlib inline
print(tf.__version__)
print(tf.keras.__version__)

2.0.0
2.2.4-tf


In [3]:
%tensorflow_version 2.x
import tensorflow as tf
device_name = tf.test.gpu_device_name()
if device_name != '/device:GPU:0':
    raise SystemError('GPU device not found')
print('Found GPU at: {}'.format(device_name))

Found GPU at: /device:GPU:0


In [0]:
# Mount Google Drive under /content/drive; the dataset, model checkpoints and
# training histories used by the cells below are read from / written to it.
from google.colab import drive
drive.mount('/content/drive')

In [0]:
# Load the CoST recordings from Drive as a numeric array (the CSV header row is skipped).
# Column layout as relied upon by the cells below: column 0 is matched against the
# subject id, column 2 against the 1-based gesture id, a value of 1 in column 3 marks
# the first frame of a recording, and columns 4 onward hold the pressure channels.
data = np.loadtxt('/content/drive/My Drive/thesis/CoST.csv',delimiter=',',skiprows=1)

In [6]:
# Gesture class names; the position in this list (0-based) is the class label.
gestures_name = ["grab", "hit", "massage", "pat", "pinch", "poke", "press", "rub", "scratch", "slap", "squeeze", "stroke", "tap", "tickle"]

# Subject ids assigned to the training, test and validation splits.
subjects_train = [2, 3, 6, 7, 8, 9, 11, 12, 13, 14, 16, 17, 20, 21, 22, 24, 25, 27, 28, 29, 30]
subjects_test = [5, 10, 18, 23, 31]
subjects_val = [15, 19, 26,  1,  4]

# Largest pressure reading in the whole file (columns 4 onward), used later to
# rescale every channel. Computed in one vectorised call instead of a Python
# loop over every cell; `initial=0` keeps the original floor of 0 and makes an
# empty array yield 0 instead of raising.
max_pressure = data[:, 4:].max(initial=0)
print(max_pressure)

990.0


In [0]:
def subsampling_generator(frame_length, subject):
    """Cut every recording of one subject into fixed-length windows of frames.

    Reads the notebook-level ``data`` array, ``gestures_name`` list and
    ``max_pressure`` scalar. For each gesture class (in ``gestures_name``
    order) the subject's rows are split into recordings, a recording starting
    at every row whose column 3 equals 1. Each recording is then cut into
    consecutive windows of ``frame_length`` frames; a final, shorter window is
    padded with all-zero frames so that every window has exactly
    ``frame_length`` frames.

    Rows that precede the first start marker of a gesture belong to no
    recording and are skipped (previously they shifted the read position away
    from the computed recording borders, producing mis-sized windows or a
    negative padding size).

    Parameters
    ----------
    frame_length : int
        Number of frames per window; must be at least 1.
    subject : number
        Subject id, compared against column 0 of ``data``.

    Returns
    -------
    all_frames : list
        One 1-D array of rescaled pressures per frame, windows laid out back
        to back; its length is a multiple of ``frame_length``.
    y : list of int
        One 0-based gesture label per entry of ``all_frames``.

    Raises
    ------
    ValueError
        If ``frame_length`` is smaller than 1.
    """
    if frame_length < 1:
        raise ValueError("frame_length must be >= 1, got {}".format(frame_length))

    all_frames = []
    y = []

    # Rows recorded for the requested subject (column 0 holds the subject id).
    subject_rows = [row for row in data if row[0] == subject]

    for label in range(len(gestures_name)):
        # Column 2 holds the gesture id, which is 1-based in the file.
        gesture_rows = [row for row in subject_rows if row[2] == label + 1]

        # Rescale each channel by the maximum pressure so values lie in [0, 1].
        scaled = [row[4:] / max_pressure for row in gesture_rows]

        # A value of 1 in column 3 opens a new recording; a recording ends
        # where the next one starts, the last one at the end of the rows.
        starts = [i for i, row in enumerate(gesture_rows) if row[3] == 1]
        ends = starts[1:] + [len(gesture_rows)]

        emitted = 0
        for start, end in zip(starts, ends):
            for lo in range(start, end, frame_length):
                window = scaled[lo:min(lo + frame_length, end)]
                missing = frame_length - len(window)
                if missing:
                    # Zero-pad the trailing partial window; the channel count
                    # is taken from the data rather than hard-coded.
                    filler = np.zeros((missing, len(window[0])))
                    window = np.vstack((window, filler))
                all_frames.extend(window)
                emitted += len(window)

        # One label per emitted frame (real or padded).
        y.extend([label] * emitted)

    return all_frames, y

In [0]:
#reshape the dataset to the input shape that can be accepted by neural network
def create_dataset(subjects, frame_length):
    """Build network-ready inputs and one-hot labels for a group of subjects.

    Calls ``subsampling_generator`` for every subject, concatenates the
    resulting frames and reshapes them into windows of 8x8 single-channel
    images.

    Parameters
    ----------
    subjects : iterable
        Subject ids to include.
    frame_length : int
        Number of frames per window.

    Returns
    -------
    xdataset : numpy.ndarray
        Shape ``(n_windows, frame_length, 8, 8, 1)``.
    ydataset : numpy.ndarray
        One-hot labels over 14 classes, one row per window.
    """
    x_frames = []
    y_frames = []
    for subject in subjects:
        channels, labels = subsampling_generator(frame_length, subject)
        x_frames.extend(channels)
        y_frames.extend(labels)

    xdataset = np.array(x_frames).reshape(-1, frame_length, 8, 8, 1)

    # Labels arrive one per frame. Group them per window and keep the label of
    # the first frame; the previous code read the second frame, which raised
    # IndexError for frame_length == 1.
    window_labels = np.array(y_frames).reshape(-1, frame_length, 1)[:, 0]

    # One-hot encode over the 14 gesture classes.
    ydataset = np.array(to_categorical(window_labels, num_classes = 14))

    return xdataset, ydataset


# 1 LAYER CONVLSTM

In [0]:
# Grid search over window length and filter count for a single ConvLSTM2D layer.
frames = [50, 60, 70, 80, 90, 100]
filters = [64, 32, 16, 8]
results_dir = '/content/drive/My Drive/'
for frame_length in frames:
    # The windows depend only on frame_length, so build them once per length
    # instead of once per (frame_length, num_filter) pair.
    xtrain, ytrain = create_dataset(subjects_train, frame_length)
    xval, yval = create_dataset(subjects_val, frame_length)
    print(len(xtrain), len(ytrain), len(xval), len(yval))

    for num_filter in filters:
        print("Number of filters:", num_filter, "Number of Frame Length: ", frame_length)

        # Drop the Keras state left by the previous configuration so memory
        # does not accumulate over the 24 models trained by this cell.
        tf.keras.backend.clear_session()

        model = Sequential()
        model.add(ConvLSTM2D(filters=num_filter, kernel_size=(3, 3),
                     padding='same', recurrent_dropout=0.5, input_shape=(frame_length, 8, 8, 1)))
        model.add(MaxPooling2D(pool_size=(2, 2)))

        model.add(Flatten())
        model.add(Dropout(0.5, seed=1))
        model.add(Dense(14, activation='softmax'))
        optimizer = optimizers.Adam(lr=0.001, beta_1=0.9, beta_2=0.999, epsilon=None, decay=0.0, amsgrad=False)
        model.compile(optimizer=optimizer,
                    loss='categorical_crossentropy',
                    metrics=['accuracy'])

        # Drive is mounted once near the top of the notebook; it is no longer
        # re-imported and re-mounted on every iteration.

        # The filter count is part of the file names: without it every filter
        # setting overwrote the checkpoints and history of the previous one
        # for the same frame length.
        run_name = str(frame_length) + "_1convlstm" + str(num_filter)
        print("Start frame_length = ", frame_length)
        # Save a checkpoint every 5 epochs; {epoch:03d} is filled in by Keras.
        checkpoint = ModelCheckpoint(os.path.join(results_dir, run_name + "_{epoch:03d}.h5"), monitor='loss', verbose=1, mode='auto', period=5)
        history = model.fit(xtrain, ytrain, epochs=100, verbose=0, batch_size=64, callbacks=[checkpoint], validation_data = (xval, yval))

        # Store the per-epoch metrics of this run as CSV.
        hist_df = pd.DataFrame(history.history)
        hist_df.to_csv(os.path.join(results_dir, run_name + '_history.csv'))
print("Finish")

# 2 LAYER CONVLSTM

In [0]:
# Grid search over window length and filter count for two stacked ConvLSTM2D layers.
frames = [50, 60, 70, 80, 90, 100]
filters = [64, 32, 16, 8]
results_dir = '/content/drive/My Drive/'
for frame_length in frames:
    # The windows depend only on frame_length, so build them once per length
    # instead of once per (frame_length, num_filter) pair.
    xtrain, ytrain = create_dataset(subjects_train, frame_length)
    xval, yval = create_dataset(subjects_val, frame_length)
    print(len(xtrain), len(ytrain), len(xval), len(yval))

    for num_filter in filters:
        print("Number of filters:", num_filter, "Number of Frame Length: ", frame_length)

        # Drop the Keras state left by the previous configuration so memory
        # does not accumulate over the 24 models trained by this cell.
        tf.keras.backend.clear_session()

        model = Sequential()
        # The first layer returns the full sequence so the second one can consume it.
        model.add(ConvLSTM2D(filters=num_filter, kernel_size=(3, 3),
                     padding='same', return_sequences = True, recurrent_dropout=0.5, input_shape=(frame_length, 8, 8, 1)))
        # Only the first layer of a Sequential model needs input_shape.
        model.add(ConvLSTM2D(filters=num_filter, kernel_size=(3, 3),
                     padding='same', recurrent_dropout=0.5))
        model.add(MaxPooling2D(pool_size=(2, 2)))

        model.add(Flatten())
        model.add(Dropout(0.5, seed=1))
        model.add(Dense(14, activation='softmax'))
        optimizer = optimizers.Adam(lr=0.001, beta_1=0.9, beta_2=0.999, epsilon=None, decay=0.0, amsgrad=False)
        model.compile(optimizer=optimizer,
                    loss='categorical_crossentropy',
                    metrics=['accuracy'])

        # Drive is mounted once near the top of the notebook; it is no longer
        # re-imported and re-mounted on every iteration.

        # The filter count is part of the file names: without it every filter
        # setting overwrote the checkpoints and history of the previous one
        # for the same frame length.
        run_name = str(frame_length) + "_2convlstm" + str(num_filter)
        print("Start frame_length = ", frame_length)
        # Save a checkpoint every 5 epochs; {epoch:03d} is filled in by Keras.
        checkpoint = ModelCheckpoint(os.path.join(results_dir, run_name + "_{epoch:03d}.h5"), monitor='loss', verbose=1, mode='auto', period=5)
        history = model.fit(xtrain, ytrain, epochs=100, verbose=0, batch_size=64, callbacks=[checkpoint], validation_data = (xval, yval))

        # Store the per-epoch metrics of this run as CSV.
        hist_df = pd.DataFrame(history.history)
        hist_df.to_csv(os.path.join(results_dir, run_name + '_history.csv'))
print("Finish")

# 1 LAYER OF BICONVLSTM

In [0]:
# Grid search over window length and filter count for one bidirectional ConvLSTM2D layer.
frames = [50, 60, 70, 80, 90, 100]
filters = [64, 32, 16, 8]
results_dir = '/content/drive/My Drive/'
for frame_length in frames:
    # The windows depend only on frame_length, so build them once per length
    # instead of once per (frame_length, num_filter) pair.
    xtrain, ytrain = create_dataset(subjects_train, frame_length)
    xval, yval = create_dataset(subjects_val, frame_length)
    print(len(xtrain), len(ytrain), len(xval), len(yval))

    for num_filter in filters:
        print("Number of filters:", num_filter, "Number of Frame Length: ", frame_length)

        # Drop the Keras state left by the previous configuration so memory
        # does not accumulate over the 24 models trained by this cell.
        tf.keras.backend.clear_session()

        model = Sequential()
        model.add(Bidirectional(ConvLSTM2D(filters=num_filter, kernel_size=(3, 3),
                     padding='same', recurrent_dropout=0.5), input_shape=(frame_length, 8, 8, 1)))
        model.add(MaxPooling2D(pool_size=(2, 2)))

        model.add(Flatten())
        model.add(Dropout(0.5, seed=1))
        model.add(Dense(14, activation='softmax'))
        optimizer = optimizers.Adam(lr=0.001, beta_1=0.9, beta_2=0.999, epsilon=None, decay=0.0, amsgrad=False)
        model.compile(optimizer=optimizer,
                    loss='categorical_crossentropy',
                    metrics=['accuracy'])

        # The filter count is part of the file names: without it every filter
        # setting overwrote the checkpoints and history of the previous one
        # for the same frame length.
        run_name = str(frame_length) + "_1biconvlstm" + str(num_filter)
        print("Start frame_length = ", frame_length)
        # Save a checkpoint every 5 epochs; {epoch:03d} is filled in by Keras.
        checkpoint = ModelCheckpoint(os.path.join(results_dir, run_name + "_{epoch:03d}.h5"), monitor='loss', verbose=1, mode='auto', period=5)
        history = model.fit(xtrain, ytrain, epochs=100, verbose=0, batch_size=64, callbacks=[checkpoint], validation_data = (xval, yval))

        # Store the per-epoch metrics of this run as CSV.
        hist_df = pd.DataFrame(history.history)
        hist_df.to_csv(os.path.join(results_dir, run_name + '_history.csv'))
print("Finish")

# 2 LAYERS OF BICONVLSTM

In [0]:
# Grid search over window length and filter count for two stacked bidirectional ConvLSTM2D layers.
frames = [50, 60, 70, 80, 90, 100]
filters = [64, 32, 16, 8]
results_dir = '/content/drive/My Drive/'
for frame_length in frames:
    # The windows depend only on frame_length, so build them once per length
    # instead of once per (frame_length, num_filter) pair.
    xtrain, ytrain = create_dataset(subjects_train, frame_length)
    xval, yval = create_dataset(subjects_val, frame_length)
    print(len(xtrain), len(ytrain), len(xval), len(yval))

    for num_filter in filters:
        print("Number of filters:", num_filter, "Number of Frame Length: ", frame_length)

        # Drop the Keras state left by the previous configuration so memory
        # does not accumulate over the 24 models trained by this cell.
        tf.keras.backend.clear_session()

        model = Sequential()
        # The first layer returns the full sequence so the second one can consume it.
        model.add(Bidirectional(ConvLSTM2D(filters=num_filter, kernel_size=(3, 3),
                     padding='same', return_sequences = True, recurrent_dropout=0.5), input_shape=(frame_length, 8, 8, 1)))
        # Only the first layer of a Sequential model needs input_shape.
        model.add(Bidirectional(ConvLSTM2D(filters=num_filter, kernel_size=(3, 3),
                     padding='same', recurrent_dropout=0.5)))
        model.add(MaxPooling2D(pool_size=(2, 2)))

        model.add(Flatten())
        model.add(Dropout(0.5, seed=1))
        model.add(Dense(14, activation='softmax'))
        optimizer = optimizers.Adam(lr=0.001, beta_1=0.9, beta_2=0.999, epsilon=None, decay=0.0, amsgrad=False)
        model.compile(optimizer=optimizer,
                    loss='categorical_crossentropy',
                    metrics=['accuracy'])

        # The filter count is part of the file names: without it every filter
        # setting overwrote the checkpoints and history of the previous one
        # for the same frame length.
        run_name = str(frame_length) + "_2biconvlstm" + str(num_filter)
        print("Start frame_length = ", frame_length)
        # Save a checkpoint every 5 epochs; {epoch:03d} is filled in by Keras.
        checkpoint = ModelCheckpoint(os.path.join(results_dir, run_name + "_{epoch:03d}.h5"), monitor='loss', verbose=1, mode='auto', period=5)
        history = model.fit(xtrain, ytrain, epochs=100, verbose=0, batch_size=64, callbacks=[checkpoint], validation_data = (xval, yval))

        # Store the per-epoch metrics of this run as CSV.
        hist_df = pd.DataFrame(history.history)
        hist_df.to_csv(os.path.join(results_dir, run_name + '_history.csv'))
print("Finish")

# ACCURACY ON TEST SET

In [0]:
# Evaluate one saved model on the held-out test subjects and store
# (frame_length, accuracy, loss) as a one-row CSV.
frame_length = 100
# Same Drive folder the confusion-matrix cell loads this model from; the
# previous value was a local Windows path that does not exist on Colab.
results_dir = "/content/drive/My Drive/"
print("frame length", frame_length)
results = []
xtest, ytest = create_dataset(subjects_test, frame_length)
print(len(xtest), len(ytest))
doc_name = "100_2biconvlstm32_85.h5"
model = load_model(os.path.join(results_dir, doc_name))
# evaluate() returns [loss, accuracy] for the metrics the model was compiled with.
score = model.evaluate(xtest, ytest, verbose=0)
results.append((frame_length, score[1], score[0]))
doc_name = str(frame_length) +'results.csv'
np.savetxt(os.path.join(results_dir, doc_name), results, delimiter=',', fmt='%s')
print("Finish")

# CREATE CONFUSION MATRIX

In [0]:
def create_dataset_no_categorical(subjects, frame_length):
    """Build network-ready inputs and integer labels for a group of subjects.

    Same windowing as ``create_dataset``, but the labels are returned as
    integer class indices instead of one-hot rows.

    Parameters
    ----------
    subjects : iterable
        Subject ids to include.
    frame_length : int
        Number of frames per window.

    Returns
    -------
    xdataset : numpy.ndarray
        Shape ``(n_windows, frame_length, 8, 8, 1)``.
    ydataset : numpy.ndarray
        Integer labels of shape ``(n_windows, 1)``.
    """
    x_frames = []
    y_frames = []
    for subject in subjects:
        channels, labels = subsampling_generator(frame_length, subject)
        x_frames.extend(channels)
        y_frames.extend(labels)

    xdataset = np.array(x_frames).reshape(-1, frame_length, 8, 8, 1)

    # Labels arrive one per frame. Group them per window and keep the label of
    # the first frame; the previous code read the second frame, which raised
    # IndexError for frame_length == 1.
    ydataset = np.array(y_frames).reshape(-1, frame_length, 1)[:, 0]

    return xdataset, ydataset


The script below is adapted from:
https://scikit-learn.org/stable/auto_examples/model_selection/plot_confusion_matrix.html#sphx-glr-auto-examples-model-selection-plot-confusion-matrix-py


In [0]:
print(__doc__)

import numpy as np
import matplotlib.pyplot as plt

from sklearn import svm, datasets
from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix


def plot_confusion_matrix(y_true, y_pred,
                          normalize=False,
                          title=None,
                          cmap=plt.cm.Blues,
                          classes=None):
    """Print, save and plot the confusion matrix of a set of predictions.

    The matrix is printed, written to ``confmat.csv`` in the working
    directory, and drawn as an annotated heat map.

    Parameters
    ----------
    y_true, y_pred : array-like
        True and predicted class labels.
    normalize : bool, optional
        If True, each row is converted to percentages of that row's total.
        Rows with no true samples are shown as 0 instead of dividing by zero.
    title : str, optional
        Plot title; a default depending on ``normalize`` is used when empty.
    cmap : matplotlib colormap, optional
        Colormap of the heat map.
    classes : sequence of str, optional
        Class names used as tick labels. When given, labels are assumed to be
        the integers ``0 .. len(classes) - 1`` and the matrix always covers
        all of them, even classes absent from the data. When omitted, the
        matrix covers only the labels present and ticks stay numeric.

    Returns
    -------
    matplotlib.axes.Axes
        The axes holding the plot.
    """
    if not title:
        if normalize:
            title = 'Normalized confusion matrix'
        else:
            title = 'Confusion matrix, without normalization'

    # Compute confusion matrix; fix the label set when class names are given
    # so the matrix size always matches the tick labels.
    labels = None if classes is None else list(range(len(classes)))
    cm = confusion_matrix(y_true, y_pred, labels=labels)
    if normalize:
        row_totals = cm.sum(axis=1)[:, np.newaxis]
        # Guarded division: a class without true samples has a zero row total
        # and would otherwise fill its row with NaN.
        cm = np.divide(cm.astype('float'), row_totals,
                       out=np.zeros(cm.shape, dtype=float),
                       where=row_totals != 0)
        cm = cm * 100
        print("Normalized confusion matrix")
    else:
        print('Confusion matrix, without normalization')

    print(cm)

    np.savetxt("confmat.csv", cm, delimiter=',', fmt='%s')

    fig, ax = plt.subplots(figsize=(10, 10))
    im = ax.imshow(cm, interpolation='nearest', cmap=cmap)
    ax.figure.colorbar(im, ax=ax)
    # We want to show all ticks...
    ax.set(xticks=np.arange(cm.shape[1]),
           yticks=np.arange(cm.shape[0]),
           title=title,
           ylabel='True label',
           xlabel='Predicted label')
    # ... and label them with the class names when those were supplied.
    if classes is not None:
        ax.set(xticklabels=classes, yticklabels=classes)

    # Rotate the tick labels and set their alignment.
    plt.setp(ax.get_xticklabels(), rotation=45, ha="right",
             rotation_mode="anchor")

    # Loop over data dimensions and create text annotations; cells darker
    # than half the maximum get white text for contrast.
    fmt = '.2f' if normalize else 'd'
    thresh = cm.max() / 2.
    for i in range(cm.shape[0]):
        for j in range(cm.shape[1]):
            ax.text(j, i, format(cm[i, j], fmt),
                    ha="center", va="center",
                    color="white" if cm[i, j] > thresh else "black")
    fig.tight_layout()
    return ax

In [0]:
# Load the selected model, predict the test subjects and plot the
# row-normalized confusion matrix.
frame_length = 100
results_dir = "/content/drive/My Drive/"
doc_name = "100_2biconvlstm32_85.h5"
xtest, ytest = create_dataset_no_categorical(subjects_test, frame_length)
model = load_model(os.path.join(results_dir, doc_name))
# Sequential.predict_classes was removed in TensorFlow 2.6; taking the argmax
# over the softmax output gives the same class indices on every 2.x release.
ypred = np.argmax(model.predict(xtest), axis=-1)
# Labels come back with shape (n_windows, 1); flatten them to 1-D.
ytest = np.array(ytest).ravel()
ypred = np.array(ypred)
print(len(ytest), len(ypred))
np.set_printoptions(precision=2)


# Plot normalized confusion matrix
plot_confusion_matrix(ytest, ypred, normalize=True,
                      title='Normalized confusion matrix')