# 1. Keypoints using MP Holistic

In [1]:
import cv2
import numpy as np
import os
from matplotlib import pyplot as plt
import time
import mediapipe as mp


mp_holistic = mp.solutions.holistic # Holistic model
mp_drawing = mp.solutions.drawing_utils # Drawing utilities


def mediapipe_detection(img, mp_model):
    """Run ``mp_model`` on a BGR frame and hand back a BGR frame plus the results.

    Args:
        img: Frame in BGR channel order (as produced by ``cv2.VideoCapture.read``).
        mp_model: Object exposing ``process(rgb_image)``; callers in this
            notebook pass a ``mp_holistic.Holistic`` instance.

    Returns:
        Tuple ``(bgr_image, results)`` where ``bgr_image`` is the frame after a
        BGR -> RGB -> BGR round trip and ``results`` is whatever
        ``mp_model.process`` returned.
    """
    rgb_frame = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)

    # The RGB copy is flagged read-only only for the duration of the model call.
    rgb_frame.flags.writeable = False
    results = mp_model.process(rgb_frame)
    rgb_frame.flags.writeable = True

    bgr_frame = cv2.cvtColor(rgb_frame, cv2.COLOR_RGB2BGR)
    return bgr_frame, results


def draw_landmarks(img, results):
    """Draw hand, face and pose landmarks from ``results`` onto ``img`` in place.

    Args:
        img: Image to draw on; it is modified by the ``mp_drawing`` calls.
        results: Object with ``left_hand_landmarks``, ``right_hand_landmarks``,
            ``face_landmarks`` and ``pose_landmarks`` attributes (the value
            returned by ``mediapipe_detection``).
    """
    # Hands and pose use the default drawing style.
    mp_drawing.draw_landmarks(img, results.left_hand_landmarks, mp_holistic.HAND_CONNECTIONS)
    mp_drawing.draw_landmarks(img, results.right_hand_landmarks, mp_holistic.HAND_CONNECTIONS)

    # The face mesh gets its own colours so it differs from hands/pose.
    # NOTE(review): positional order is presumably (landmark spec, connection spec) - confirm.
    # Colour channels are 8-bit, so the maximum valid value is 255 (was 256).
    mp_drawing.draw_landmarks(
        img, results.face_landmarks, mp_holistic.FACEMESH_TESSELATION,
        mp_drawing.DrawingSpec(color=(80, 110, 10), thickness=1, circle_radius=1),
        mp_drawing.DrawingSpec(color=(80, 255, 121), thickness=1, circle_radius=1),
    )

    mp_drawing.draw_landmarks(img, results.pose_landmarks, mp_holistic.POSE_CONNECTIONS)


def extract_keypoints(results):
    """Flatten all landmarks in ``results`` into one fixed-length 1-D vector.

    Layout of the returned array (1662 values in total):
        * pose:       33 landmarks x (x, y, z, visibility) = 132
        * face:       468 landmarks x (x, y, z)            = 1404
        * left hand:  21 landmarks x (x, y, z)             = 63
        * right hand: 21 landmarks x (x, y, z)             = 63

    Any landmark group that is missing (falsy) is replaced by zeros of the
    matching length so the vector size never changes.
    """
    def _flat(group, n_points, fields):
        # Missing detection -> zero padding of the expected size.
        if not group:
            return np.zeros(n_points * len(fields))
        rows = [[getattr(lm, name) for name in fields] for lm in group.landmark]
        return np.array(rows).flatten()

    xyz = ('x', 'y', 'z')
    parts = [
        _flat(results.pose_landmarks, 33, xyz + ('visibility',)),
        _flat(results.face_landmarks, 468, xyz),
        _flat(results.left_hand_landmarks, 21, xyz),
        _flat(results.right_hand_landmarks, 21, xyz),
    ]
    return np.concatenate(parts)

# 2. Setup Folders for Collection

In [2]:
# Root folder where the per-frame keypoint arrays (.npy) are stored
DATA_PATH = 'data'

# Labels of the actions (signs) to be recognised
actions = np.array([
    'hello',
    'thanks',
    'iloveyou',
    'help',
    'please',
    'father',
    'mother',
])

# Number of recorded videos (sequences) per action
no_sequences = 30

# Number of frames that make up one video
sequence_length = 30

In [None]:
# Create one folder per action and, inside it, one folder per sequence
# (no_sequences in total). Each sequence folder later receives one .npy file
# per frame.
for action in actions:
    # To append after already-recorded sequences instead of starting at 0, use:
    # dirmax = np.max(np.array(os.listdir(os.path.join(DATA_PATH, action))).astype(int))
    dirmax = -1
    for sequence in range(1, no_sequences + 1):
        # exist_ok=True tolerates re-running the cell, while real failures
        # (e.g. permission errors) are no longer silently swallowed.
        os.makedirs(os.path.join(DATA_PATH, action, str(dirmax + sequence)), exist_ok=True)

# 3. Collect Keypoint Values for Training and Testing

In [None]:
cap = cv2.VideoCapture(0)   # Open camera 0 for live capture
breakk = 0                  # Set to 1 to unwind all three nested loops at once

try:
    # Set mediapipe model
    with mp_holistic.Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5) as holistic:

        # Loop through actions
        for action in actions:
            # Loop through videos
            for sequence in range(no_sequences):
                # Loop through video length (sequence_length frames)
                for frame_num in range(sequence_length):

                    # Read feed
                    ret, frame = cap.read()
                    if not ret:
                        # No frame available (camera missing/disconnected): stop
                        # cleanly instead of crashing inside cv2.cvtColor.
                        print('Could not read a frame from the camera; stopping collection.')
                        breakk = 1
                        break

                    # Extract landmarks
                    img, results = mediapipe_detection(frame, holistic)

                    # Draw landmarks
                    draw_landmarks(img, results)

                    # The first frame of every video also shows a start banner
                    if frame_num == 0:
                        cv2.putText(img, 'STARTING COLLECTION', (120,200),
                                   cv2.FONT_HERSHEY_SIMPLEX, 1, (0,255, 0), 4, cv2.LINE_AA)
                    cv2.putText(img, 'Collecting frames for {} Video Number {}'.format(action, sequence), (15,12),
                               cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 255), 1, cv2.LINE_AA)
                    # Show to screen
                    cv2.imshow('OpenCV Feed', img)
                    if frame_num == 0:
                        # Pause 2 s so the user can get into position
                        cv2.waitKey(2000)

                    # Export keypoints as .npy
                    keypoints = extract_keypoints(results)
                    # Raise the "+ 0" offset to write into later sequence folders
                    # (e.g. when adding takes after existing ones).
                    npy_path = os.path.join(DATA_PATH, action, str(sequence + 0), str(frame_num))
                    np.save(npy_path, keypoints)

                    # Break with "Esc"
                    if cv2.waitKey(10) & 0xFF == 27:
                        breakk = 1
                        break

                if breakk == 1:
                    break

            if breakk == 1:
                break
finally:
    # Always free the camera and close the preview window, even on errors
    cap.release()
    cv2.destroyAllWindows()

# 3.1. Data Augmentation by Horizontal Mirroring (Flipping the X Coordinate)

In [None]:
def mirr_kps(keypoints_vector):
    """Return a horizontally mirrored copy of a flattened keypoint vector.

    The vector is expected in the layout produced by ``extract_keypoints``:
    the first 33*4 values are (x, y, z, visibility) rows, everything after
    that is (x, y, z) rows. Every non-zero x is replaced by ``1 - x``; x values
    that are exactly 0 (the zero padding used for missing detections) and all
    other columns are left unchanged.

    The input is not modified; a new array is returned. (The previous
    implementation wrote through reshape views into the caller's array.)

    NOTE(review): only x is flipped - the left-hand/right-hand slots are not
    swapped. Confirm that this is the intended augmentation.

    Args:
        keypoints_vector: 1-D array-like whose length minus 132 is a multiple of 3.

    Returns:
        1-D ``numpy.ndarray`` of the same length with mirrored x coordinates.
    """
    pose_len = 33 * 4

    # np.array() makes a fresh contiguous copy, so the reshapes below are
    # views into the copy and never touch the caller's data.
    mirrored = np.array(keypoints_vector)
    blocks = (
        mirrored[:pose_len].reshape(-1, 4),   # (x, y, z, visibility) rows
        mirrored[pose_len:].reshape(-1, 3),   # (x, y, z) rows
    )

    for block in blocks:
        x = block[:, 0]
        detected = x != 0                     # skip zero padding
        x[detected] = 1 - x[detected]

    return mirrored

In [None]:
# Mirror every recorded sequence and store the result as a new sequence folder
# inside the same action folder.
# NOTE: running this cell twice also mirrors the already-mirrored folders.
for action in actions:
    action_dir = os.path.join(DATA_PATH, action)
    # Snapshot of the existing sequence folders, taken before new ones are added
    existing = np.array(os.listdir(action_dir)).astype(int)
    dirmax = np.max(existing)
    # A single offset is used for both creating the folder and saving into it.
    # Previously the folder was created at sequence+dirmax+1 but files were
    # saved to sequence+30, which only agree when dirmax == 29.
    offset = dirmax + 1
    for sequence in existing:
        mirrored_dir = os.path.join(action_dir, str(sequence + offset))
        os.makedirs(mirrored_dir, exist_ok=True)
        for frame_num in range(sequence_length):
            kps = np.load(os.path.join(action_dir, str(sequence), "{}.npy".format(frame_num)))
            mirr = mirr_kps(kps)
            npy_path = os.path.join(mirrored_dir, str(frame_num))
            np.save(npy_path, mirr)

# 4. Preprocess Data and Create Labels and Features

In [3]:
from sklearn.model_selection import train_test_split
from tensorflow.keras.utils import to_categorical

# Map each action name to its integer class index
label_mapa = dict((label, num) for num, label in enumerate(actions))

sequences, labels = [], []
for action in actions:
    action_dir = os.path.join(DATA_PATH, action)
    for sequence in np.array(os.listdir(action_dir)).astype(int):
        sequence_dir = os.path.join(action_dir, str(sequence))
        # One "window" = the keypoint vectors of all frames of this sequence
        window = [
            np.load(os.path.join(sequence_dir, "{}.npy".format(frame_num)))
            for frame_num in range(sequence_length)
        ]
        sequences.append(window)
        labels.append(label_mapa[action])

# Features: one window per sequence; labels: one-hot encoded integers
X = np.array(sequences)
y = to_categorical(labels).astype(int)

# Hold out 10% of the sequences for testing
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.1)

# 5. Build and Train LSTM Neural Network

In [4]:
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense
from tensorflow.keras.callbacks import TensorBoard
from tensorflow.keras.optimizers import Adam

In [None]:
# Training logs for TensorBoard are written under the 'Logs' folder
log_dir = 'Logs'
tb_callback = TensorBoard(log_dir=log_dir)

In [5]:
# Stacked-LSTM classifier: input is a (30 frames, 1662 keypoint values)
# sequence, output is one softmax probability per action.
model = Sequential()
# The first LSTM layers return full sequences because their output feeds the next LSTM
model.add(LSTM(64, return_sequences=True, activation='relu', input_shape=(30,1662)))
model.add(LSTM(128, return_sequences=True, activation='relu'))
# The last LSTM layer returns only its final output since it feeds the dense layers
model.add(LSTM(64, return_sequences=False, activation='relu'))
# Dense layers narrow down to one output unit per class (action)
model.add(Dense(64, activation='relu'))
model.add(Dense(32, activation='relu'))
model.add(Dense(actions.shape[0], activation='softmax'))

# `learning_rate` replaces the deprecated `lr` alias, which newer Keras
# releases reject. Other Adam settings keep their defaults; the rate can be tuned.
optimizer = Adam(learning_rate=5e-6)
model.compile(optimizer=optimizer, loss='categorical_crossentropy', metrics=['categorical_accuracy'])

In [None]:
# del model

In [69]:
# Train on the training split for up to 150 epochs, logging through tb_callback
model.fit(X_train, y_train, epochs=150, callbacks=[tb_callback])

Epoch 1/150
Epoch 2/150
Epoch 3/150
Epoch 4/150
Epoch 5/150
Epoch 6/150
Epoch 7/150
Epoch 8/150
Epoch 9/150
Epoch 10/150
Epoch 11/150
Epoch 12/150
Epoch 13/150
Epoch 14/150
Epoch 15/150
Epoch 16/150
Epoch 17/150
Epoch 18/150
Epoch 19/150
Epoch 20/150
Epoch 21/150
Epoch 22/150
Epoch 23/150
Epoch 24/150
Epoch 25/150
Epoch 26/150
Epoch 27/150
Epoch 28/150
Epoch 29/150
Epoch 30/150
Epoch 31/150
Epoch 32/150
Epoch 33/150
Epoch 34/150
Epoch 35/150
Epoch 36/150
Epoch 37/150
Epoch 38/150
Epoch 39/150
Epoch 40/150
Epoch 41/150
Epoch 42/150
Epoch 43/150
Epoch 44/150
Epoch 45/150
Epoch 46/150
Epoch 47/150
Epoch 48/150
Epoch 49/150
Epoch 50/150
Epoch 51/150
Epoch 52/150
Epoch 53/150
Epoch 54/150
Epoch 55/150
Epoch 56/150
Epoch 57/150
Epoch 58/150
Epoch 59/150
Epoch 60/150
Epoch 61/150
Epoch 62/150
Epoch 63/150
Epoch 64/150
Epoch 65/150
Epoch 66/150
Epoch 67/150
Epoch 68/150
Epoch 69/150
Epoch 70/150
Epoch 71/150
Epoch 72/150
Epoch 73/150
Epoch 74/150
Epoch 75/150
Epoch 76/150
Epoch 77/150
Epoch 78

<tensorflow.python.keras.callbacks.History at 0x161c1217730>

In [17]:
# Print the layer-by-layer overview (output shapes and parameter counts)
model.summary()

Model: "sequential"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
lstm (LSTM)                  (None, 30, 64)            442112    
_________________________________________________________________
lstm_1 (LSTM)                (None, 30, 128)           98816     
_________________________________________________________________
lstm_2 (LSTM)                (None, 64)                49408     
_________________________________________________________________
dense (Dense)                (None, 64)                4160      
_________________________________________________________________
dense_1 (Dense)              (None, 32)                2080      
_________________________________________________________________
dense_2 (Dense)              (None, 7)                 231       
Total params: 596,807
Trainable params: 596,807
Non-trainable params: 0
__________________________________________________

In [None]:
%tensorboard --log_dir logs

# 6. Save / Load Weights

In [None]:
# Persist the trained model to 'actions_model.h5'
model.save('actions_model.h5')

In [6]:
# Restore weights from 'actions_model.h5' into the already-built `model`
# (presumably the model must have the same architecture as when it was saved - confirm)
model.load_weights('actions_model.h5')

# 7. Evaluation using Confusion Matrix and Accuracy

In [7]:
from sklearn.metrics import multilabel_confusion_matrix, accuracy_score



# Convert one-hot labels and predicted probabilities to class indices.
# Swap in X_train / y_train to evaluate on the training split instead.
ytrue = np.argmax(y_test, axis=1).tolist()
yhat = np.argmax(model.predict(X_test), axis=1).tolist()

# One 2x2 confusion matrix per class
multilabel_confusion_matrix(ytrue, yhat)

array([[[35,  1],
        [ 0,  6]],

       [[38,  1],
        [ 0,  3]],

       [[34,  0],
        [ 0,  8]],

       [[31,  0],
        [ 0, 11]],

       [[37,  0],
        [ 0,  5]],

       [[37,  0],
        [ 1,  4]],

       [[38,  0],
        [ 1,  3]]], dtype=int64)

In [8]:
# Fraction of evaluated sequences whose predicted class equals the true class
accuracy_score(ytrue, yhat)

0.9523809523809523

# 8. Test in Real Time

In [16]:
from scipy import stats

# One colour tuple per action (7 entries, matching the 7 labels in `actions`);
# passed to prob_viz, which hands them to the cv2 drawing calls
colors = [(255, 0, 0), (0, 255, 0), (0, 0, 255), (255, 255, 0), (255, 0, 255), (0, 255, 255), (128, 128, 128)]
# Visualize the confidence of each action as a coloured bar: full at 100%, empty at 0%
def prob_viz(res, actions, input_frame, colors):
    """Return a copy of ``input_frame`` with one probability bar per action.

    Args:
        res: Iterable of probabilities, one per action (same order as ``actions``).
        actions: Sequence of action labels; ``actions[i]`` is drawn on bar ``i``.
        input_frame: Image to annotate; it is copied, not modified.
        colors: Non-empty sequence of colour tuples. If there are fewer colours
            than probabilities the palette is cycled instead of raising
            ``IndexError``.

    Returns:
        The annotated copy of ``input_frame``.
    """
    output_frame = input_frame.copy()
    for num, prob in enumerate(res):
        top = 60 + num * 40
        bottom = 90 + num * 40
        color = colors[num % len(colors)]
        # Outline marks the 100% extent (120 px wide)
        cv2.rectangle(output_frame, (0, top), (120, bottom), color, 1)
        # Filled part is proportional to the probability
        cv2.rectangle(output_frame, (0, top), (int(prob * 120), bottom), color, -1)
        cv2.putText(output_frame, actions[num], (0, 85 + num * 40), cv2.FONT_HERSHEY_SIMPLEX, 1, (255,255,255), 2, cv2.LINE_AA)

    return output_frame

In [17]:
# Detection variables
sequence = []        # Rolling buffer with the keypoints of the last 30 frames
prediction = ''      # Label currently shown in the banner
predictions = []     # Most recent predicted class indices (at most 10 are kept)
# We can use a high threshold since our network's accuracy is high
threshold = 0.8

cap = cv2.VideoCapture(0)
try:
    # Set mediapipe model
    with mp_holistic.Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5) as holistic:
        while cap.isOpened():

            # Read feed
            ret, frame = cap.read()
            if not ret:
                # No frame available: leave the loop instead of crashing in cvtColor
                break

            # Make detections
            img, results = mediapipe_detection(frame, holistic)

            # Draw landmarks
            draw_landmarks(img, results)

            # Prediction logic
            keypoints = extract_keypoints(results)
            sequence.append(keypoints)
            # Keep only the last 30 frames, the input length the model expects
            sequence = sequence[-30:]

            if len(sequence) == 30:     # Wait until the buffer holds 30 frames
                res = model.predict(np.expand_dims(sequence, axis=0))[0]
                best = np.argmax(res)
                predictions.append(best)
                # Only the recent history is needed; avoid unbounded growth
                predictions = predictions[-10:]

                # Accept the label only if the recent predictions all agree and
                # the model is confident. (Previously this compared the smallest
                # label in the window, np.unique(...)[0], which did not check
                # agreement.)
                if len(set(predictions)) == 1 and res[best] > threshold:
                    prediction = actions[best]

                # Visualize probabilities as bar fill level
                img = prob_viz(res, actions, img, colors)

            # Show predicted action in a banner spanning the frame width
            cv2.rectangle(img, (0,0), (img.shape[1], 40), (245, 117, 16), -1)
            textsize = cv2.getTextSize(prediction, cv2.FONT_HERSHEY_SIMPLEX, 1, 2)[0]
            # Built-in int() replaces np.int, which current NumPy no longer provides
            cv2.putText(img, prediction, (int((img.shape[1] - textsize[0]) / 2), 30),
                        cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2, cv2.LINE_AA)

            # Show to screen
            cv2.imshow('OpenCV Feed', img)

            # Break gracefully with "Esc"
            if cv2.waitKey(10) & 0xFF == 27:
                break
finally:
    # Always free the camera and close the preview window, even on errors
    cap.release()
    cv2.destroyAllWindows()

In [38]:
# Manual cleanup: release the camera and close any OpenCV windows
# (useful if a capture cell above was interrupted)
cap.release()
cv2.destroyAllWindows()