## Variable length sequence LSTM full script

In this script, the whole process of loading data, training and testing the LSTM is written.
The script is as generalized as it can get, since the variables of the model change based on the underlying dataset.
This has been done to allow for the addition of new actions and data without having to change the code.
Since it is a variable length sequence LSTM, ragged tensors are used.
The prediction logic uses the LSTM on fixed-length sequences of 8 frames. Since prediction runs in real time, it is not possible to predict on variable-length sequences, because one does not know when a sequence is complete. A challenge will be finding the sweet spot in terms of sequence length (for now it is 8 frames).

In [None]:
#importing needed libraries

import numpy as np
import tensorflow as tf
import cv2
import os
from matplotlib import pyplot as plt
import time
import mediapipe as mp
from sklearn.model_selection import train_test_split
from sklearn.metrics import multilabel_confusion_matrix, accuracy_score
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense, Dropout, InputLayer
from tensorflow.keras.callbacks import TensorBoard
from scipy.spatial.transform import Rotation as R

In [None]:
#setting up the mediapipe model

#drawing utilities: provide draw_landmarks and DrawingSpec, used further down to render the landmarks
mp_drawing = mp.solutions.drawing_utils
#holistic solution: provides the Holistic model and the connection sets (pose, face mesh, hands) used for drawing
mp_holistic = mp.solutions.holistic

#defining a few functions to detect, draw and extract keypoints

def mediapipe_detection(image, model):
    #runs the model on a BGR frame and returns the frame (converted to RGB and back to BGR) together with the prediction results
    rgb_frame = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

    #the frame is marked as read-only only for the duration of the model call
    rgb_frame.flags.writeable = False
    results = model.process(rgb_frame)
    rgb_frame.flags.writeable = True

    return cv2.cvtColor(rgb_frame, cv2.COLOR_RGB2BGR), results

def draw_landmarks(image, results, drawing_spec_circle, drawing_spec_line):
    #draws the pose, face and hand landmarks found in results on the picture (in place), all with the same drawing specs

    #each entry pairs an attribute of results with the mp_holistic connection set used to link its landmarks
    landmark_groups = (
        ('pose_landmarks', 'POSE_CONNECTIONS'),
        ('face_landmarks', 'FACEMESH_TESSELATION'),
        ('left_hand_landmarks', 'HAND_CONNECTIONS'),
        ('right_hand_landmarks', 'HAND_CONNECTIONS'),
    )
    for landmarks_attr, connections_attr in landmark_groups:
        mp_drawing.draw_landmarks(
            image,
            getattr(results, landmarks_attr),
            getattr(mp_holistic, connections_attr),
            landmark_drawing_spec=drawing_spec_circle,
            connection_drawing_spec=drawing_spec_line,
        )

def extract_keypoints(results):
    #flattens the pose, face and hand landmarks of a prediction into a single 1D array to be saved.
    #A part that was not detected is replaced by zeros of the same length, so the output size is always
    #33*4 + 468*3 + 21*3 + 21*3 = 1662 when every detected part has its full set of landmarks

    def flatten_part(landmark_list, attributes, landmark_count):
        #one row per landmark and one column per attribute, flattened; zeros when the part is missing
        if not landmark_list:
            return np.zeros(landmark_count * len(attributes))
        rows = [[getattr(point, name) for name in attributes] for point in landmark_list.landmark]
        return np.array(rows).flatten()

    xyz = ('x', 'y', 'z')
    parts = [
        flatten_part(results.pose_landmarks, xyz + ('visibility',), 33),
        flatten_part(results.face_landmarks, xyz, 468),
        flatten_part(results.left_hand_landmarks, xyz, 21),
        flatten_part(results.right_hand_landmarks, xyz, 21),
    ]
    return np.concatenate(parts)

def visualize_probabilities(res, actions, input_frame, colors):
    #draws, on a copy of input_frame, one coloured bar per action with the action name written over it, and returns the copy.
    #res: the probabilities, one per action and in the same order as actions
    #actions: the action names, indexed like res
    #input_frame: the picture to annotate (it is copied, not modified)
    #colors: the colours of the bars; they are reused cyclically when there are more actions than colours
    output_frame = input_frame.copy()
    for num, prob in enumerate(res):
        #bars are 30 pixels high and stacked every 40 pixels, starting 60 pixels from the top
        top = 60 + num*40
        #the modulo avoids an IndexError when new actions are added without adding new colours
        color = colors[num % len(colors)]
        #the bar length is proportional to the probability (200 pixels for a probability of 1)
        cv2.rectangle(output_frame, (0, top), (int(prob*200), top + 30), color, -1)
        cv2.putText(output_frame, actions[num], (0, top + 25), cv2.FONT_HERSHEY_SIMPLEX, 1, (255,255,255), 2, cv2.LINE_AA)
    return output_frame

In [None]:
#preparing the action categories and paths

DATA_PATH = os.path.join(os.getcwd(), "DATA")

#every sub-directory of DATA is one action. The names are sorted because os.listdir returns the entries in
#arbitrary order, and the number associated with an action must stay the same between runs (previously
#saved weights depend on it). Entries that are not directories (stray files) are ignored
actions = np.array(sorted(
    entry for entry in os.listdir(DATA_PATH)
    if os.path.isdir(os.path.join(DATA_PATH, entry))
))

#maps every action name to its class index
label_map = {label: num for num, label in enumerate(actions)}

labels, temp_points = [], []

In [None]:
#loading the data for training: every sequence is a folder of saved keypoint arrays (one file per frame).
#The sequences have different lengths, so they are packed in a ragged tensor of shape
#[number of sequences, (frames), keypoints]. The ragged tensor is built only once at the end, from the
#concatenated frames and the length of every sequence: no placeholder tensor and no repeated tf.concat are needed

def _numeric_aware_key(name):
    #sort key that orders purely numeric file/folder names by value (so that "2.npy" comes before "10.npy")
    #and every other name alphabetically after them.
    #NOTE(review): assumes the frames are saved as <frame number>.npy - confirm against the recording script
    stem = os.path.splitext(name)[0]
    if stem.isdecimal():
        return (0, int(stem), name)
    return (1, 0, name)

#labels is reset here so that running this cell twice does not duplicate the labels
sequences, labels = [], []

#for every action, loops and loads data for training
for action in actions:
    action_path = os.path.join(DATA_PATH, action)
    for num_sequence in sorted(os.listdir(action_path), key=_numeric_aware_key):
        sequence_path = os.path.join(action_path, num_sequence)
        #the frames must be read in temporal order, since the LSTM depends on it: os.listdir gives no such guarantee.
        #NOTE(review): allow_pickle=True can execute arbitrary code when loading untrusted files - keep DATA trusted
        temp_points = [
            np.load(os.path.join(sequence_path, point), allow_pickle = True)
            for point in sorted(os.listdir(sequence_path), key=_numeric_aware_key)
        ]
        #a folder without frames cannot be turned into a sequence, so it is skipped together with its label
        if not temp_points:
            continue
        sequences.append(np.asarray(temp_points, dtype=np.float64))
        labels.append(label_map[action])

if not sequences:
    raise ValueError("No sequences found in " + DATA_PATH)

#values holds all the frames one after the other, row_lengths says how many frames belong to each sequence
dataset = tf.RaggedTensor.from_row_lengths(
    values=np.concatenate(sequences, axis=0),
    row_lengths=[len(frames) for frames in sequences],
)

In [None]:
#preparing train and test sets. The sequences are loaded action by action, so simply slicing off the last 10%
#would give a test set made only of the last action(s): the split is therefore done on shuffled indices.
#In the future I should add data augmentation as well, but it will probably be not clean to do that with ragged tensors

#num_classes is given explicitly so that y always has one column per action, like the output layer of the model
y = to_categorical(labels, num_classes=len(actions)).astype(int)

#10% of the sequences go to the test set; the fixed seed keeps the split reproducible between runs
train_indices, test_indices = train_test_split(
    np.arange(int(dataset.shape[0])), test_size=0.1, shuffle=True, random_state=42
)
#number of training sequences (same value as before: 90% of the dataset, rounded down)
train_ind = len(train_indices)

#tf.gather picks the selected sequences out of the ragged tensor
X_train = tf.gather(dataset, train_indices)
X_test = tf.gather(dataset, test_indices)
y_train = y[train_indices]
y_test = y[test_indices]
print(y.shape)

In [None]:
#initializing the Keras model. Different models have been tested, for now with the dataset we have this has performed the best
model = Sequential()

#ragged input: sequences with any number of frames, every frame being a vector of 1662 keypoint values
model.add(InputLayer(input_shape=[None, 1662], ragged=True))

#recurrent part: the first two LSTMs pass the whole sequence on, the third one does not return sequences
model.add(LSTM(64, activation='tanh', dropout=0.2, return_sequences=True))
model.add(LSTM(128, activation='tanh', dropout=0.2, return_sequences=True))
model.add(LSTM(64, activation='tanh', dropout=0.2))

#fully connected part: every dense layer is followed by a dropout
model.add(Dense(64, activation='relu'))
model.add(Dropout(0.2))
model.add(Dense(32, activation='relu'))
model.add(Dropout(0.2))
model.add(Dense(10, activation='relu'))
model.add(Dropout(0.2))

#output: one probability per action
model.add(Dense(len(actions), activation='softmax'))

In [None]:
#defining compiler and callback to stop training at 97% accuracy
model.compile(optimizer = 'Adam', loss = 'categorical_crossentropy', metrics = ['categorical_accuracy'])

ACCURACY_THRESHOLD = 0.97

class myCallback(tf.keras.callbacks.Callback):
    #stops the training at the end of the first epoch whose training accuracy is above ACCURACY_THRESHOLD

    def on_epoch_end(self, epoch, logs=None):
        #logs: the metrics of the epoch. It may be None or lack the accuracy, in which case nothing is done
        #(comparing None with the threshold would raise a TypeError)
        accuracy = (logs or {}).get('categorical_accuracy')
        if accuracy is not None and accuracy > ACCURACY_THRESHOLD:
            print("\nReached %2.2f%% accuracy, so stopping training!!" %(ACCURACY_THRESHOLD*100))
            self.model.stop_training = True

In [None]:
callb = myCallback()

#training the model. 4000 epochs is only an upper bound: the callback stops the training once the
#accuracy threshold is reached. The callbacks argument is passed as a list, as documented by Keras
model.fit(X_train, y_train, epochs = 4000, callbacks = [callb])

In [None]:
#evaluating the model on the test set with one confusion matrix per class (false positives and negatives)
test_probabilities = model.predict(X_test)

#both the one-hot labels and the predicted probabilities are reduced to class indices
ytrue = y_test.argmax(axis = 1).tolist()
yhat = test_probabilities.argmax(axis = 1).tolist()

multilabel_confusion_matrix(ytrue, yhat)

In [None]:
#printing out the accuracy on the test set (ytrue and yhat are the true and predicted class indices of the test sequences)
accuracy_score(ytrue, yhat)

In [None]:
#saving the model (model.save stores the whole model in the .h5 file, not only the weights).
#The path is built with os.path.join so that it does not depend on the Windows separator, and the
#folder is created if it does not exist yet, since saving into a missing folder would fail
os.makedirs('model_weights', exist_ok=True)
model.save(os.path.join('model_weights', '5_actions_696(to_test).h5'))

In [None]:
#loading the weights of a previously saved model (the model defined above must have the same architecture).
#The path is built with os.path.join so that it does not depend on the Windows separator
model.load_weights(os.path.join('model_weights', '5_actions_741(to_test).h5'))

In [None]:
#testing in real-time

#number of most recent frames fed to the LSTM for every prediction
SEQUENCE_LENGTH = 10
#number of consecutive identical predictions needed before an action is added to the sentence
STABLE_PREDICTIONS = 10

colors = [(0,0,0), (255,255,255), (0,255,0), (255,0,0), (0,0,255), (125, 125, 125), (125, 0, 200)]
sequence = []
sentence = []
#starts empty: a placeholder value would count as a real prediction of the first action
predictions = []
threshold = 0.95

#choosing the camera to use
cap = cv2.VideoCapture(0)

#drawing specs for face dots
drawing_spec_circle = mp_drawing.DrawingSpec()
drawing_spec_circle.circle_radius = 1
drawing_spec_circle.thickness = 1
drawing_spec_circle.color = (0,0,255)

prev_frame_time = 0
new_frame_time = 0

#drawing specs for face connections
drawing_spec_line = mp_drawing.DrawingSpec()
drawing_spec_line.thickness = 1

#starting the loop. The try/finally releases the camera and closes the window even if an error occurs in the loop
try:
    with mp_holistic.Holistic(min_detection_confidence=0.8) as holistic:
        while cap.isOpened():

            ret, frame = cap.read()

            #ret tells whether a frame was actually read: without one there is nothing to process, so the loop ends
            if not ret:
                break

            #make detections
            image, results = mediapipe_detection(frame, holistic)

            #count and show fps (0 is shown in the unlikely case of two frames with the same timestamp)
            new_frame_time = time.time()
            elapsed_time = new_frame_time - prev_frame_time
            fps = str(int(1/elapsed_time)) if elapsed_time > 0 else "0"
            prev_frame_time = new_frame_time
            font = cv2.FONT_HERSHEY_SIMPLEX
            cv2.putText(image, fps, (550, 120), font, 3, (100, 255, 0), 3, cv2.LINE_AA)

            #drawing landmarks
            draw_landmarks(image, results, drawing_spec_circle, drawing_spec_line)

            #extracting keypoints and feeding them to the LSTM for prediction (only the most recent frames are kept)
            keypoints = extract_keypoints(results)
            sequence.append(keypoints)
            sequence = sequence[-SEQUENCE_LENGTH:]

            if len(sequence) == SEQUENCE_LENGTH:
                res = model.predict(np.expand_dims(sequence, axis=0))[0]
                predicted = int(np.argmax(res))
                print(actions[predicted])

                #only the most recent predictions are needed, so the list does not grow forever
                predictions.append(predicted)
                predictions = predictions[-STABLE_PREDICTIONS:]
                image = visualize_probabilities(res, actions, image, colors)

                #this would be the rendering logic if one would want to write the action. The action gets written only
                #if it has been predicted for STABLE_PREDICTIONS consecutive frames, to give an higher stability to the
                #model (np.unique(...)[0] would only give the smallest recent class, not tell whether they are all equal).
                #This logic is not used now, if one would like to implement that, just add 2 lines of code to display a
                #text box with the sentence in
                is_stable = len(predictions) == STABLE_PREDICTIONS and len(set(predictions)) == 1
                if is_stable and res[predicted] > threshold:
                    #the same action is not written twice in a row
                    if not sentence or actions[predicted] != sentence[-1]:
                        sentence.append(actions[predicted])

                #only the last 5 actions of the sentence are kept
                if len(sentence) > 5:
                    sentence = sentence[-5:]

            #showing image with predictions
            cv2.imshow('the feed', image)

            #close loop
            if cv2.waitKey(10) & 0XFF == ord('q'):
                break
finally:
    cap.release()
    cv2.destroyAllWindows()