# Set up MediaPipe and camera functions

The goal of this project is to make a simple PacMan game that can be played by using hand gestures, employing computer vision and machine learning for recognition and prediction. To that end, this notebook will serve to (1) collect the necessary data, and (2) make and test the machine learning model.

First, it is necessary to set up the camera that can accept a live video feed, and implement hand tracking to enable gesture recognition. I will use OpenCV to open and process the camera feed, and Google's MediaPipe to enable hand tracking.

In [1]:
# import all necessary libraries

import os
import numpy as np
import cv2
from matplotlib import pyplot as plt
import mediapipe as mp
from sklearn.model_selection import train_test_split
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense, Dropout, Conv2D, MaxPooling2D, Input, Flatten, GlobalAveragePooling2D
from tensorflow.keras.callbacks import TensorBoard
from tensorflow.keras.regularizers import l2
from sklearn.metrics import multilabel_confusion_matrix, accuracy_score

First of all, we need to set up MediaPipe. Since I wanted to experiment with different gesture-tracking setups, I decided to use the holistic module which can detect face, pose, and hands. I assumed the pose especially would be valuable, as my original plan was to use an LSTM model to track longer sequences where the hands' positions relative to the body would be important. In the end, I only needed the hands but decided to keep the holistic model in case I want to employ more poses or gestures in the future.

We also need to set up the drawing utilities that will enable us to overlay the keypoints that MediaPipe detects over the live video feed.

In [8]:
# holistic model
mp_holistic = mp.solutions.holistic

# drawing utilities
mp_drawing = mp.solutions.drawing_utils

In [13]:
def mp_detection(img, model):
    '''
    Process an image with the passed MediaPipe model and track keypoints specified by the model.
    
        Parameters:
            img (numpy.ndarray): an array representing an image
            model (MediaPipe Model): a custom MediaPipe class representing a model
            
        Returns:
            img (numpy.ndarray): an array representing the video frame equal to the input img
            results (NamedTuple): a NamedTuple containing the face, pose, and left and right hand landmarks detected 
                by MediaPipe's Holistic model
    '''
    
    # cv2 captures video in BGR - have to convert to RGB
    img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
    
    # stop writing to the image
    img.flags.writeable = False
    
    # make a prediction
    results = model.process(img)
    
    # start writing to the image again
    img.flags.writeable = True
    
    # convert the image back to BGR
    img = cv2.cvtColor(img, cv2.COLOR_RGB2BGR)
    
    return img, results

In [14]:
def draw_landmarks(img, results):
    '''
    Draw keypoints detected by MediaPipe on top of the passed image.
    
        Parameters:
            img (numpy.ndarray): an array representing an image
            results (NamedTuple): a NamedTuple containing the face, pose, and left and right hand landmarks detected 
                by MediaPipe's Holistic model
                
        Returns:
            None
    '''

    # draw left and right hand connections
    mp_drawing.draw_landmarks(img, results.left_hand_landmarks, mp_holistic.HAND_CONNECTIONS,
                             mp_drawing.DrawingSpec(color=(0, 128, 0), thickness=2, circle_radius=2),
                             mp_drawing.DrawingSpec(color=(0, 128, 0), thickness=2, circle_radius=2)
                             )
    mp_drawing.draw_landmarks(img, results.right_hand_landmarks, mp_holistic.HAND_CONNECTIONS,
                             mp_drawing.DrawingSpec(color=(255, 0, 0), thickness=2, circle_radius=2),
                             mp_drawing.DrawingSpec(color=(255, 0, 0), thickness=2, circle_radius=2)
                             )

Having prepared the MediaPipe helper functions, we can now access the webcam through OpenCV and make sure that everything is set up correctly.

In [12]:
# access webcam
capture = cv2.VideoCapture(0)

# access mediapipe holistic model
# set initial detection confidence and subsequent tracking confidence
with mp_holistic.Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5) as holistic:
    while capture.isOpened():

        # read the video
        ret, frame = capture.read()

        # stop if no frame could be read from the camera
        if not ret:
            break

        # make detections with the holistic model
        image, results = mp_detection(frame, holistic)
        
        # draw landmarks on the live feed
        draw_landmarks(image, results)

        # show the video in frame
        cv2.imshow('OpenCV Feed', image)

        # escape the video feed
        if cv2.waitKey(10) & 0xFF == ord('q'):
            break

    capture.release()
    cv2.destroyAllWindows()

In [7]:
print('Nr of keypoints in pose: ' + str(len(results.pose_landmarks.landmark)))
print('Nr of keypoints in face: ' + str(len(results.face_landmarks.landmark)))
print('Nr of keypoints in hand: ' + str(len(results.left_hand_landmarks.landmark)))

Nr of keypoints in pose: 33


AttributeError: 'NoneType' object has no attribute 'landmark'
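The AttributeError above appears because MediaPipe sets a landmark attribute to None when that body part was not detected in the last processed frame (here, the face). A minimal sketch of a None-safe count follows; `count_landmarks` and the `mock_results` object are hypothetical stand-ins for illustration, not part of MediaPipe.

```python
from types import SimpleNamespace


def count_landmarks(landmark_list):
    """Return the number of landmarks, or 0 if nothing was detected."""
    if landmark_list is None:
        return 0
    return len(landmark_list.landmark)


# hypothetical stand-in for the results object returned by the holistic model
mock_results = SimpleNamespace(
    pose_landmarks=SimpleNamespace(landmark=[object()] * 33),
    face_landmarks=None,  # face not detected in this frame
    left_hand_landmarks=SimpleNamespace(landmark=[object()] * 21),
)

for name in ('pose', 'face', 'left_hand'):
    landmarks = getattr(mock_results, name + '_landmarks')
    print('Nr of keypoints in {}: {}'.format(name, count_landmarks(landmarks)))
```

With the real results object, the same helper could be called on results.pose_landmarks, results.face_landmarks and results.left_hand_landmarks.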

# Collect outputs

Now that everything is set up, we can start collecting data for the gesture recognition model. For that, we first need to set up a function that extracts only the keypoints that we are interested in from the model.

In [15]:
def get_single_hand_keypoints(results):
    '''
    Get the keypoints that represent a hand detected by MediaPipe. Only read one hand at a time, and prioritise right hand.
    
        Parameters:
            results (NamedTuple): a NamedTuple containing the face, pose, and left and right hand landmarks detected 
                by MediaPipe's Holistic model
        
        Returns:
            keypoints (numpy.ndarray): an array of shape (21 keypoints, 3 dimensions) representing the keypoints in 
                the hand detected by MediaPipe's Holistic model
    '''
    
    left = results.left_hand_landmarks
    right = results.right_hand_landmarks

    # return an array full of zeros if no hands are in the frame at all
    # (same (21, 3) shape as a detected hand)
    if left is None and right is None:
        return np.zeros((21, 3))

    # get one hand's keypoints if the other hand is not in frame at all
    if left is None:
        hand = right
    elif right is None:
        hand = left

    # with both hands in frame, take the hand with more non-zero landmarks;
    # ties go to the right hand
    elif np.count_nonzero(right.landmark) >= np.count_nonzero(left.landmark):
        hand = right
    else:
        hand = left

    return np.array([[res.x, res.y, res.z] for res in hand.landmark])

Now we can make a folder to store the keypoint arrays and start data collection for each action we want to detect. Since we need to detect gestures to play PacMan, we need four different gestures representing directions right, left, up and down.

In [7]:
# path for exported arrays
PATH = os.path.join('data-single-hand')

# setting up collecting actions
actions = np.array(['right', 'left', 'up', 'down'])

# seq_nr videos to collect data from for each action
seq_nr = 500

# each video is seq_len frames long
seq_len = 1

In [183]:
# set up folders for each video
for action in actions:
    for seq in range(seq_nr*2):
        # exist_ok avoids an error when the folder already exists
        os.makedirs(os.path.join(PATH, action, str(seq)), exist_ok=True)

In [187]:
# collect the actual training videos for individual actions

# access webcam
capture = cv2.VideoCapture(0)

# access mediapipe holistic model
# set initial detection confidence and subsequent tracking confidence
with mp_holistic.Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5) as holistic:
    
    # loop through the 4 actions
    for action in actions:
        
        if action != actions[0]:
            # 5 second break before recording each action besides the first one
            cv2.waitKey(5000)
        
        # loop through the number of videos we want to collect for each action
        # on first go, use range(0, seq_nr) and record all actions done by the left hand,
        # and on second go, use range(seq_nr, seq_nr*2) and record all actions done by the right hand
        for sequence in range(seq_nr, seq_nr*2):

            # loop through video length
            # in the end I did not use an LSTM model so I only needed individual frames, 
            # making seq_len = 1
            for frame_nr in range(seq_len):

                # read the video
                ret, frame = capture.read()

                # make detections with the holistic model
                image, results = mp_detection(frame, holistic)

                # draw landmarks on the live feed
                draw_landmarks(image, results)

                # output text to frame
                cv2.putText(image, '{}: {}'.format(action, sequence), (15, 12),
                           cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 255), 1, cv2.LINE_AA)

                # show the video in frame
                cv2.imshow('OpenCV Feed', image)

                # extract keypoints for each collection sample
                keypoints = get_single_hand_keypoints(results)
                npy_path = os.path.join(PATH, action, str(sequence), str(frame_nr))
                np.save(npy_path, keypoints)

                # 'q' only breaks out of the innermost (frame) loop, not the whole recording
                if cv2.waitKey(10) & 0xFF == ord('q'):
                    break

    capture.release()
    cv2.destroyAllWindows()
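Note that the collection loop saves each array to a path without a file extension, while the preprocessing code later loads files ending in .npy. This works because np.save appends the .npy extension when the file name does not already have it. A small sketch, using a temporary folder and a dummy (21, 3) array as assumptions in place of the real data folder:

```python
import os
import tempfile

import numpy as np

with tempfile.TemporaryDirectory() as tmp_dir:
    # dummy keypoints with the same shape as one detected hand
    sample = np.zeros((21, 3))

    # path without an extension, as in the collection loop
    npy_path = os.path.join(tmp_dir, '0')
    np.save(npy_path, sample)

    # np.save added the .npy extension on its own
    saved_files = os.listdir(tmp_dir)
    restored = np.load(npy_path + '.npy')

print(saved_files)
print(restored.shape)
```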

# Preprocess data

Now we have all the data we need saved in a folder on the computer. We need to preprocess them in order to feed them to the recognition model.

In [9]:
# map the actions to numbers for labels
label_map = {label:num for num, label in enumerate(actions)}
label_map

{'right': 0, 'left': 1, 'up': 2, 'down': 3}

In [10]:
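# earlier variant that groups frames into per-video windows;
# superseded by the next cell, which loads single frames directly

# sequences = x, labels =y
sequences, labels = [], []

# loop through all the data we collected and save them to the sequences array,
# and save the corresponding label to the labels array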
for action in actions:
    for sequence in range(seq_nr*2):
        
        # get together all arrays for that particular video
        window = []
        
        # load and append all frames for a single video
        for frame_nr in range(seq_len):
            res = np.load(os.path.join(PATH, action, str(sequence), '{}.npy'.format(frame_nr)))
            window.append(res)
            
        # append each video to the sequences (x data)
        sequences.append(window)
        
        # append label for the given video to the labels (y data)
        labels.append(label_map[action])

In [69]:
# sequences = x, labels =y
sequences, labels = [], []

# loop through all the data we collected and save them to the sequences array,
# and save the corresponding label to the labels array
for action in actions:
    for sequence in range(seq_nr*2):
        # each sample is a single frame (seq_len = 1), stored as 0.npy
        res = np.load(os.path.join(PATH, action, str(sequence), '0.npy'))
        sequences.append(res)
        labels.append(label_map[action])

In [71]:
# it should have the shape of (x, y, z), where
# x is the number of collected videos (single frames), 
# y is the number of keypoints in each sample
# and z is the number of dimensions per each keypoint
np.array(sequences).shape

(4000, 21, 3)

In [72]:
# the labels should have a single dimension matching x from the shape of sequences
np.array(labels).shape

(4000,)

In [73]:
# get numpy array of video data and reshape it for input into the model
X = np.array(sequences).reshape(-1, 21, 3, 1)

# get one-hot encoding of label variables
y = to_categorical(labels).astype(int)
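To make the two preprocessing steps concrete, here is a minimal NumPy-only sketch on dummy data. The five labels and the all-zero keypoints are made up for illustration, and np.eye indexing is used as a stand-in for what to_categorical produces for integer labels.

```python
import numpy as np

# made-up labels for five samples, using the label_map numbering
demo_labels = [0, 1, 2, 3, 1]
num_classes = 4

# one-hot encoding: row i of the identity matrix encodes class i
one_hot = np.eye(num_classes, dtype=int)[demo_labels]

# dummy keypoints: 5 samples, 21 keypoints, 3 coordinates each
demo_keypoints = np.zeros((5, 21, 3))

# add a trailing channel axis so Conv2D sees each sample as a 21x3 "image"
X_demo = demo_keypoints.reshape(-1, 21, 3, 1)

print(one_hot)
print(X_demo.shape)
```

Taking np.argmax along axis 1 turns the one-hot rows back into the integer labels, which is what the evaluation section does later.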

In [74]:
# get train and test partitions
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.05)

In [75]:
# double check train and test shapes to make sure they match up
print('Data shape matching: {}'.format(X_train.shape[0] == y_train.shape[0] and X_test.shape[0] == y_test.shape[0]))

Data shape matching: True


# Model and training

Now that we have the data processed, it is time to define a machine learning model for multiclass classification (each frame belongs to exactly one of the four gestures). I experimented with several different architectures, including an LSTM model, before I decided to use single frames individually (originally, I wanted to record each gesture as a sequence of multiple frames). I ended up deciding on a convolutional neural network, which trained fairly quickly and offered good performance without overfitting on the training data.

In [93]:
# set up logging and callback
log_dir = os.path.join('logs')
tb_callback = TensorBoard(log_dir=log_dir)

In [108]:
def CNN_model(input_shape, output_shape):
    model = Sequential()
    model.add(Input(input_shape))
    model.add(Conv2D(32, (3, 3), activation='relu'))
    model.add(Dense(128, activation='relu'))
    model.add(Dropout(0.2))
    model.add(Dense(64, activation='relu'))
    model.add(Dropout(0.1))
    model.add(Dense(32, activation='relu'))
    model.add(Dense(16, activation='relu'))
    model.add(GlobalAveragePooling2D())
    model.add(Dense(output_shape, activation='softmax'))
    
    return model

The resulting model has 4 different components:

1. Convolutional layer
A convolutional layer passes learnable filters over its input to pick up local patterns, which is why it is commonly used for image and other spatial data. Since I am not flattening the array of keypoints but keeping its x, y and z components, I thought it would be a good choice.

2. Dropout layers
Dropout layers randomly set some layer activations to 0, helping the model learn multiple independent representations and reduce dependency on any single parameter. I added them to help avoid overfitting.

3. Pooling layer
I chose GlobalAveragePooling2D as my pooling layer. Since the data is fairly small, a simple pooling layer like this seemed appropriate. It also further acts as a regularisation layer, decreasing the dependency on one single parameter. Lastly, as the model will be used for live predictions, I chose it for its computational efficiency.

4. Dense (fully connected) layers
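Dense layers were introduced to perform classification on the features extracted by the previous layers. The hidden Dense layers sit between the convolution and the pooling layer, so Keras applies them along the last (channel) axis at every position; the final softmax Dense layer turns the pooled features into class probabilities.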
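As a rough NumPy illustration of the last two components: a Dense layer applied to a 4D tensor acts on the last axis only, and global average pooling then averages over the two spatial axes. The random inputs and weights below are placeholders, not the trained model's values.

```python
import numpy as np

rng = np.random.default_rng(0)

# placeholder for the Conv2D output of two samples: (batch, 19, 1, 32)
conv_out = rng.random((2, 19, 1, 32))

# a Dense layer with 128 units: one weight matrix shared by all positions
weights = rng.random((32, 128))
bias = np.zeros(128)
dense_out = np.maximum(conv_out @ weights + bias, 0)  # ReLU activation

# global average pooling: mean over the two spatial axes
pooled = dense_out.mean(axis=(1, 2))

print(dense_out.shape)  # spatial axes are kept by the Dense layer
print(pooled.shape)     # one feature vector per sample
```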

In [110]:
X.shape

(4000, 21, 3, 1)

In [111]:
model = CNN_model((X.shape[1], X.shape[2], X.shape[3]), y.shape[1])

In [112]:
model.summary()

Model: "sequential_10"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 conv2d_13 (Conv2D)          (None, 19, 1, 32)         320       
                                                                 
 dense_40 (Dense)            (None, 19, 1, 128)        4224      
                                                                 
 dropout_16 (Dropout)        (None, 19, 1, 128)        0         
                                                                 
 dense_41 (Dense)            (None, 19, 1, 64)         8256      
                                                                 
 dropout_17 (Dropout)        (None, 19, 1, 64)         0         
                                                                 
 dense_42 (Dense)            (None, 19, 1, 32)         2080      
                                                                 
 dense_43 (Dense)            (None, 19, 1, 16)       
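The shapes and parameter counts in the summary can be checked by hand. The sketch below assumes the Keras defaults for Conv2D (stride 1, 'valid' padding); the helper names are my own.

```python
def valid_conv_output(size, kernel):
    """Output length of a stride-1 convolution without padding."""
    return size - kernel + 1


def conv2d_params(kernel_h, kernel_w, in_channels, filters):
    """Weights per filter plus one bias per filter."""
    return (kernel_h * kernel_w * in_channels + 1) * filters


def dense_params(in_units, out_units):
    """One weight per input/output pair plus one bias per output."""
    return (in_units + 1) * out_units


# input is 21 keypoints x 3 coordinates x 1 channel, kernel is 3x3
conv_shape = (valid_conv_output(21, 3), valid_conv_output(3, 3))
conv_count = conv2d_params(3, 3, 1, 32)

# Dense layers act on the last axis: 32 -> 128 -> 64 -> 32
dense_counts = [dense_params(32, 128), dense_params(128, 64), dense_params(64, 32)]

print(conv_shape, conv_count)
print(dense_counts)
```

The 3x3 kernel spans all three coordinates of three neighbouring keypoints at once, which is why the second spatial dimension shrinks to 1.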

In [113]:
# categorical crossentropy loss necessary for a categorical classification model
model.compile(optimizer='Adam', loss='categorical_crossentropy', metrics=['categorical_accuracy'])

In [153]:
model.fit(X_train, y_train, epochs=100, callbacks=[tb_callback])

Epoch 1/30
...
Epoch 30/30


<keras.callbacks.History at 0x2428aa7f970>

Training for 100 epochs provides a consistently good result with high accuracy and low loss.

In [154]:
res = model.predict(X_test)



In [155]:
actions[np.argmax(res[1])]

'right'

In [156]:
actions[np.argmax(y_test[1])]

'right'

# Save/load weights

To use the model later, I save it to disk. Note that model.save stores the whole model (architecture and weights), not only the weights.

In [157]:
model.save('actions_model-single_hand.h5')

In [158]:
#model.load_weights('actions_model-single_hand.h5')

# Model evaluation

It is also important to evaluate the accuracy of the model on the test data. As the results below show, the trained model reaches an accuracy of 0.995 on the 200 held-out test samples.

In [160]:
yhat = model.predict(X_test)



In [161]:
# convert from one-hot encoding back into categorical
ytrue = np.argmax(y_test, axis=1).tolist()
yhat = np.argmax(yhat, axis=1).tolist()

In [162]:
# confusion matrix to see true and false positives/negatives
multilabel_confusion_matrix(ytrue, yhat)

array([[[149,   0],
        [  0,  51]],

       [[156,   0],
        [  0,  44]],

       [[145,   1],
        [  0,  54]],

       [[149,   0],
        [  1,  50]]], dtype=int64)
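Each 2x2 block returned by multilabel_confusion_matrix is a one-vs-rest matrix for one class, laid out as [[TN, FP], [FN, TP]]. The sketch below copies the numbers from the output above and derives per-class precision and recall from them; the variable names are my own.

```python
actions_list = ['right', 'left', 'up', 'down']

# values copied from the confusion matrix output above
confusion = [
    [[149, 0], [0, 51]],
    [[156, 0], [0, 44]],
    [[145, 1], [0, 54]],
    [[149, 0], [1, 50]],
]

metrics = {}
for action, ((tn, fp), (fn, tp)) in zip(actions_list, confusion):
    metrics[action] = {
        'precision': tp / (tp + fp),
        'recall': tp / (tp + fn),
    }
    print(action, metrics[action])

# every test sample is a true positive for at most one class
total = sum(sum(row) for row in confusion[0])
accuracy = sum(block[1][1] for block in confusion) / total
print(total, accuracy)
```

Read this way, the single error is a false positive for 'up' and a false negative for 'down'.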

In [163]:
# get accuracy
accuracy_score(ytrue, yhat)

0.995

# Real-time detection

The last thing to do is set up real-time detection and prediction. We will use a live camera feed loop similar to the one we used to collect the data in the beginning. We will also visualise the probabilities of the individual predictions for easier use and transparency.

In [16]:
colors = [(0,127,255), (0,255,0), (255,255,0), (255,0,127), (127,0,255)]
def prob_viz(res, actions, input_frame, colors):
    '''
    Visualise the probabilities of the individual categories for the predictions for a given frame.
        
        Parameters:
            res (numpy.ndarray): an array with the probability predictions for each category
            actions (array): an array containing the names of the individual categories. Must not be longer 
                than the colors array
            input_frame (numpy.ndarray): an ndarray representing an image for which we want to make a prediction
            colors (array): an array of tuple representations of colors to visualise individual categories. Needs at 
                least one color per action
                
        Returns:
            output_frame (numpy.ndarray): an ndarray with overlay of visualised probabilities for each action
    '''
    
    output_frame = input_frame.copy()
    
    for i in range(len(res[0])):
        cv2.rectangle(output_frame, (0,60+i*40), (int(res[0][i]*100), 90+i*40), colors[i], -1)
        cv2.putText(output_frame, actions[i], (0, 85+i*40), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2, cv2.LINE_AA)
        
    return output_frame

In [227]:
# access webcam
capture = cv2.VideoCapture(0)

predictions = []
frameNr = 0
history = []
threshold = 0.7

# access mediapipe holistic model
# set initial detection confidence and subsequent tracking confidence
with mp_holistic.Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5) as holistic:
    while capture.isOpened():

        # read the video
        ret, frame = capture.read()

        # stop if no frame could be read from the camera
        if not ret:
            break

        # make detections with the holistic model
        image, results = mp_detection(frame, holistic)
        
        # draw landmarks on the live feed
        draw_landmarks(image, results)
        
        keypoints = get_single_hand_keypoints(results)
        
        res = model.predict(keypoints.reshape(-1, 21, 3, 1), verbose=0)
        predictions.append(np.argmax(res))
        
        if res[0][np.argmax(res)] > threshold:
            if len(history) > 0:
                if actions[np.argmax(res)] != history[-1]:
                    history.append(actions[np.argmax(res)])
            else:
                history.append(actions[np.argmax(res)])

            if len(history) > 5:
                history = history[-5:]
                
        # visualise probabilities
        image = prob_viz(res, actions, image, colors)
        
        # show the video in frame
        cv2.imshow('OpenCV Feed', image)

        # escape the video feed
        if cv2.waitKey(10) & 0xFF == ord('q'):
            break

    capture.release()
    cv2.destroyAllWindows()
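The history logic in the loop above can be isolated and tried without a camera. Below is a minimal sketch under my own assumptions: `update_history` is a hypothetical helper, the probability rows are made up, and a deque with maxlen=5 replaces the manual history[-5:] slicing.

```python
from collections import deque


def update_history(history, probabilities, actions, threshold=0.7):
    """Append the predicted action if it is confident and differs from the last one."""
    best = max(range(len(probabilities)), key=lambda i: probabilities[i])
    if probabilities[best] > threshold:
        action = actions[best]
        if not history or history[-1] != action:
            history.append(action)
    return history


demo_actions = ['right', 'left', 'up', 'down']

# keeps only the five most recent distinct actions
history = deque(maxlen=5)

# made-up per-frame probabilities
frames = [
    [0.90, 0.05, 0.03, 0.02],  # confident 'right'
    [0.95, 0.02, 0.02, 0.01],  # 'right' again, not repeated in history
    [0.40, 0.30, 0.20, 0.10],  # below the threshold, ignored
    [0.10, 0.80, 0.05, 0.05],  # confident 'left'
]

for probabilities in frames:
    update_history(history, probabilities, demo_actions)

print(list(history))
```

In the game, the last entry of the history would be the direction to send to PacMan.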