<img src='http://imgur.com/1ZcRyrc.png' style='float: left; margin: 20px; height: 55px'>


# DSI-SG-42 Capstone Project:
### Silent Echoes: From Hand Waves to Written Phrases

# 4. Extract Keypoints With MediaPipe

In this notebook, we extract keypoints (landmark coordinates) from each video using the MediaPipe library and save them, frame by frame (60 frames per video), into a dedicated folder for that video. Storing the keypoints on disk makes compiling the dataset for modeling in the next notebook more efficient.

In [1]:
# import libraries
import os
import shutil
import logging
import cv2
import time
import mediapipe as mp
import pandas as pd
import numpy as np

# Configure logging to write to a file
log_dir = '../log_files'
os.makedirs(log_dir, exist_ok=True)  # create the log folder if it does not exist

log_file = os.path.join(log_dir, 'extract_keypoints.log')

# Setup Logger
logger = logging.getLogger()  # Get the root logger
for handler in logger.handlers[:]:  # Iterate over a copy so that removal does not skip handlers
    logger.removeHandler(handler)


logging.basicConfig(
    level=logging.INFO,
    format = '%(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler(log_file),
        logging.StreamHandler()  
    ]
)

# set display settings
%matplotlib inline
pd.set_option('display.width', 100000)
pd.set_option('display.max_colwidth', None)
pd.set_option('display.max_columns', None)


We intend to build three models of varying complexity. The first and most complex model takes in landmark values from the face, pose, left hand, and right hand.

In [2]:
# These are the actions that we intend to train and predict

# the first 3 words that we will be modeling upon
first_three = ['please', 'sorry', 'hello']

## 4.1 Extracting keypoints with OpenCV and MediaPipe


We will extract the keypoints from each video file and save them as NumPy array files (`.npy`) in the folders created for them. We also display the landmarks superimposed on the video to see how well MediaPipe tracks the signer's movement.

We will build three models:
- the first model uses face, pose, left hand, and right hand landmarks and is referred to as the `Comprehensive Model`
- the second model uses pose, left hand, and right hand landmarks and is referred to as the `PH Model`
- the third model uses only left hand and right hand landmarks and is referred to as the `Hands Model`
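
To make the difference in complexity concrete, the short sketch below computes how many values each model receives per frame. It relies on the landmark counts used by the extraction code in this notebook (468 face, 33 pose, and 21 per hand); the variable names are illustrative and not part of the original notebook.

```python
# Values contributed per frame by each landmark group.
FACE = 468 * 3  # x, y, z for each face landmark
POSE = 33 * 4   # x, y, z, visibility for each pose landmark
HAND = 21 * 3   # x, y, z for each hand landmark (one hand)

# Per-frame feature length of each model.
sizes = {
    "full": FACE + POSE + 2 * HAND,
    "ph": POSE + 2 * HAND,
    "hands": 2 * HAND,
}
print(sizes)  # {'full': 1662, 'ph': 258, 'hands': 126}
```

The `Hands Model` therefore works with roughly 13 times fewer values per frame than the `Comprehensive Model`.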


Before extracting keypoints with MediaPipe and OpenCV, we assign two MediaPipe modules to short names so that we can track body positions and draw them.
- `mp.solutions.holistic` is MediaPipe's pre-trained holistic model, which combines face, pose, and hand landmark detection. It is what tracks the signer's hand movements in each video.
- `mp.solutions.drawing_utils` provides helpers for drawing landmarks and their connections on images and video frames. We use it to visualize how the landmarks follow the signer's movements.

In [3]:
# Assign and initiate MediaPipe tools

mp_holistic = mp.solutions.holistic # Holistic model
mp_drawing = mp.solutions.drawing_utils # Drawing utilities

OpenCV reads video frames in Blue-Green-Red (BGR) channel order, but MediaPipe expects Red-Green-Blue (RGB) input. The function below converts each frame from BGR to RGB, lets the model process it, and then converts the frame back to BGR so that OpenCV can display it correctly.
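
The channel swap itself is just a reversal of the last axis of the image array. The minimal sketch below uses plain NumPy slicing on a made-up two-pixel image to show the round trip from OpenCV's BGR order to RGB and back. For 3-channel frames this gives the same channel order as `cv2.cvtColor` with `cv2.COLOR_BGR2RGB`, which is what the notebook uses in practice.

```python
import numpy as np

# A 1x2 "image" in OpenCV's BGR order: one pure-blue pixel and one pure-red pixel.
bgr = np.array([[[255, 0, 0], [0, 0, 255]]], dtype=np.uint8)

# Reversing the last axis swaps the blue and red channels (BGR -> RGB).
rgb = bgr[..., ::-1]

# Reversing again restores the original BGR order.
restored = rgb[..., ::-1]

print(rgb[0, 0].tolist())  # [0, 0, 255]: the blue pixel expressed as (R, G, B)
print(rgb[0, 1].tolist())  # [255, 0, 0]: the red pixel expressed as (R, G, B)
```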

In [4]:
# to perform pose estimation

def mediapipe_detection(image, model):
    image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB) # Color conversion BGR 2 RGB
    image.flags.writeable = False # Image is no longer writeable
    results = model.process(image) # make predictions
    image.flags.writeable = True # Image is now writeable
    image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR) # Color conversion RGB 2 BGR
    return image, results

We would like to observe how the model tracks body movement, so we create a function that draws the landmarks for the face, body pose, and both hands.

Because the three models use different landmark groups, the function takes a `model_type` argument (`'full'`, `'ph'`, or `'hands'`) and draws only the landmarks that the chosen model uses. The same argument is passed later when extracting the datapoints.
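
One way to keep the `model_type` handling in a single place is a lookup table that maps each model type to its landmark groups and rejects unknown values. This is only a sketch of an alternative design: `GROUPS_BY_MODEL` and `groups_for` are hypothetical names, and the notebook itself uses explicit `if`/`elif` branches.

```python
# Hypothetical lookup table: which landmark groups each model type uses.
GROUPS_BY_MODEL = {
    "full": ("face", "pose", "left_hand", "right_hand"),
    "ph": ("pose", "left_hand", "right_hand"),
    "hands": ("left_hand", "right_hand"),
}


def groups_for(model_type: str) -> tuple:
    """Return the landmark groups for a model type, or raise on an unknown type."""
    try:
        return GROUPS_BY_MODEL[model_type]
    except KeyError:
        raise ValueError('Please choose "full", "ph", or "hands".') from None


print(groups_for("ph"))  # ('pose', 'left_hand', 'right_hand')
```

Raising an error for an unknown `model_type` stops the run immediately, whereas printing a message would let the caller continue with no landmarks selected.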

In [5]:
# visualize the drawing of landmarks and connections on the body
def draw_landmarks(image, results, model_type):

    '''
    Draw the landmarks used by the chosen model_type ('full', 'ph', or 'hands') onto the image.

    Each call to mp_drawing.draw_landmarks takes five arguments:
    1. image: the frame to draw on
    2. results.<part>_landmarks: the detected landmarks for that body part
    3. mp_holistic.<PART>_CONNECTIONS / FACEMESH_CONTOURS: which landmarks of that body part are joined by lines
    4. mp_drawing.DrawingSpec: the style of the landmark points
    5. mp_drawing.DrawingSpec: the style of the connecting lines between adjacent landmarks
    '''

    # comprehensive model
    if model_type == 'full':
        # Draw face connections
        mp_drawing.draw_landmarks(image, results.face_landmarks, mp_holistic.FACEMESH_CONTOURS, 
                                mp_drawing.DrawingSpec(color=(80,110,10), thickness=1, circle_radius=1), 
                                mp_drawing.DrawingSpec(color=(80,255,121), thickness=1, circle_radius=1)  # 8-bit channel values range from 0 to 255
                                ) 
        # Draw pose connections
        mp_drawing.draw_landmarks(image, results.pose_landmarks, mp_holistic.POSE_CONNECTIONS,
                                mp_drawing.DrawingSpec(color=(80,22,10), thickness=2, circle_radius=4), 
                                mp_drawing.DrawingSpec(color=(80,44,121), thickness=2, circle_radius=2)
                                ) 
        # Draw left hand connections
        mp_drawing.draw_landmarks(image, results.left_hand_landmarks, mp_holistic.HAND_CONNECTIONS, 
                                mp_drawing.DrawingSpec(color=(121,22,76), thickness=2, circle_radius=4), 
                                mp_drawing.DrawingSpec(color=(121,44,250), thickness=2, circle_radius=2)
                                ) 
        # Draw right hand connections  
        mp_drawing.draw_landmarks(image, results.right_hand_landmarks, mp_holistic.HAND_CONNECTIONS, 
                                mp_drawing.DrawingSpec(color=(245,117,66), thickness=2, circle_radius=4), 
                                mp_drawing.DrawingSpec(color=(245,66,230), thickness=2, circle_radius=2)
                                ) 
        
    # PH Model
    elif model_type == 'ph':
        
        # Draw pose connections
        mp_drawing.draw_landmarks(image, results.pose_landmarks, mp_holistic.POSE_CONNECTIONS,
                                mp_drawing.DrawingSpec(color=(80,22,10), thickness=2, circle_radius=4), 
                                mp_drawing.DrawingSpec(color=(80,44,121), thickness=2, circle_radius=2)
                                ) 
        # Draw left hand connections
        mp_drawing.draw_landmarks(image, results.left_hand_landmarks, mp_holistic.HAND_CONNECTIONS, 
                                mp_drawing.DrawingSpec(color=(121,22,76), thickness=2, circle_radius=4), 
                                mp_drawing.DrawingSpec(color=(121,44,250), thickness=2, circle_radius=2)
                                ) 
        # Draw right hand connections  
        mp_drawing.draw_landmarks(image, results.right_hand_landmarks, mp_holistic.HAND_CONNECTIONS, 
                                mp_drawing.DrawingSpec(color=(245,117,66), thickness=2, circle_radius=4), 
                                mp_drawing.DrawingSpec(color=(245,66,230), thickness=2, circle_radius=2)
                                ) 

    # Hands Model 
    elif model_type == 'hands':
        
        # Draw left hand connections
        mp_drawing.draw_landmarks(image, results.left_hand_landmarks, mp_holistic.HAND_CONNECTIONS, 
                                mp_drawing.DrawingSpec(color=(121,22,76), thickness=2, circle_radius=4), 
                                mp_drawing.DrawingSpec(color=(121,44,250), thickness=2, circle_radius=2)
                                ) 
        # Draw right hand connections  
        mp_drawing.draw_landmarks(image, results.right_hand_landmarks, mp_holistic.HAND_CONNECTIONS, 
                                mp_drawing.DrawingSpec(color=(245,117,66), thickness=2, circle_radius=4), 
                                mp_drawing.DrawingSpec(color=(245,66,230), thickness=2, circle_radius=2)
                                ) 
    
    # if none selected print the following
    else:
        print('Please choose "full", "ph", or "hands".')

As with drawing, the extraction function takes a `model_type` argument that determines which landmark groups are extracted.

The landmarks of each group are extracted and flattened into a one-dimensional array. When a body part is not visible to the camera and no landmarks are detected, we fill that group with zeros so that the array shape stays consistent across frames.
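
The flatten-or-zeros pattern can be tried without running MediaPipe. The sketch below uses `SimpleNamespace` objects as stand-ins for MediaPipe's landmark results (an assumption made purely for illustration), and `flatten_landmarks` is a hypothetical helper rather than a function from the notebook.

```python
from types import SimpleNamespace

import numpy as np


def flatten_landmarks(landmark_list, n_points, fields=("x", "y", "z")):
    """Return a 1-D array of length n_points * len(fields); all zeros if nothing was detected."""
    if landmark_list is None:
        return np.zeros(n_points * len(fields))
    return np.array(
        [[getattr(point, f) for f in fields] for point in landmark_list.landmark]
    ).flatten()


# Stand-in for a detected hand with 21 landmarks (values are made up).
fake_hand = SimpleNamespace(
    landmark=[SimpleNamespace(x=0.1 * i, y=0.5, z=-0.01) for i in range(21)]
)

detected = flatten_landmarks(fake_hand, n_points=21)
missing = flatten_landmarks(None, n_points=21)

print(detected.shape, missing.shape)  # (63,) (63,)
```

Both calls return an array of the same length, which is what allows frames with and without a visible hand to be stacked together later.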

In [6]:
# to extract the landmark values, x,y,z, visibility(pose) of the video
def extract_keypoints(results, model_type):
    '''
    Extract the landmark values for the groups used by the specified model_type ('full', 'ph', or 'hands').
    All landmarks have x, y, z values; 'pose' landmarks have an additional 'visibility' value.

    x: horizontal coordinate, normalized by the frame width (0 at the left edge, increasing to the right)
    y: vertical coordinate, normalized by the frame height (0 at the top edge, increasing downwards)
    z: relative depth of the landmark; a greater value indicates a point further from the camera
    visibility: confidence that the landmark is present in the frame and accurately detected

    The origin (0, 0) is at the top-left of the frame.

    If a landmark group is not detected, it is filled with zeros so that the output always has the same shape.
    The length of each group is the number of landmarks multiplied by the number of values per landmark
    (pose: 33*4, face: 468*3, each hand: 21*3).
    '''

    # Comprehensive Model
    if model_type == 'full':
        pose = np.array([[res.x, res.y, res.z, res.visibility] for res in results.pose_landmarks.landmark]).flatten() if results.pose_landmarks else np.zeros(33*4)
        face = np.array([[res.x, res.y, res.z] for res in results.face_landmarks.landmark]).flatten() if results.face_landmarks else np.zeros(468*3)
        lh = np.array([[res.x, res.y, res.z] for res in results.left_hand_landmarks.landmark]).flatten() if results.left_hand_landmarks else np.zeros(21*3)
        rh = np.array([[res.x, res.y, res.z] for res in results.right_hand_landmarks.landmark]).flatten() if results.right_hand_landmarks else np.zeros(21*3)
        return np.concatenate([face, pose, lh, rh])
    
    # PH Model
    elif model_type == 'ph':

        pose = np.array([[res.x, res.y, res.z, res.visibility] for res in results.pose_landmarks.landmark]).flatten() if results.pose_landmarks else np.zeros(33*4)
        lh = np.array([[res.x, res.y, res.z] for res in results.left_hand_landmarks.landmark]).flatten() if results.left_hand_landmarks else np.zeros(21*3)
        rh = np.array([[res.x, res.y, res.z] for res in results.right_hand_landmarks.landmark]).flatten() if results.right_hand_landmarks else np.zeros(21*3)
        return np.concatenate([pose, lh, rh])
    
    # Hands Model
    elif model_type == 'hands':
        
        lh = np.array([[res.x, res.y, res.z] for res in results.left_hand_landmarks.landmark]).flatten() if results.left_hand_landmarks else np.zeros(21*3)
        rh = np.array([[res.x, res.y, res.z] for res in results.right_hand_landmarks.landmark]).flatten() if results.right_hand_landmarks else np.zeros(21*3)
        return np.concatenate([lh, rh])
    
    # if none chosen, raise an error instead of silently returning None
    else:
        raise ValueError('Please choose "full", "ph", or "hands".')
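
MediaPipe reports `x` and `y` normalized by the frame width and height, so they must be scaled to obtain pixel positions. The sketch below shows that conversion. `to_pixel` is a hypothetical helper and the 640x480 frame size is only an example.

```python
def to_pixel(x_norm: float, y_norm: float, width: int, height: int) -> tuple:
    """Convert normalized landmark coordinates to integer pixel coordinates."""
    # Clamp into the frame, since landmarks near the border can fall slightly outside [0, 1].
    px = max(0, min(int(x_norm * width), width - 1))
    py = max(0, min(int(y_norm * height), height - 1))
    return px, py


print(to_pixel(0.5, 0.25, 640, 480))  # (320, 120)
print(to_pixel(1.0, 1.0, 640, 480))   # (639, 479)
```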

We will create a function that loops through each word (action) in the respective folder. When calling it, we specify which video dataset to use (`train_data`, `test_data`, or `val_data`), the name of the folder in which the keypoint files will be saved, and the model type.


In [17]:
# custom function to loop through each word and save extracted keypoints as numpy arrays
def process_dataset_words(test_train_val_data: str, extracted_keypoints_folder: str, model_type: str):
    '''
    Loop through the videos of each word class ('please', 'sorry', 'hello') in the chosen
    train/test/val folder and save the keypoints of each frame as a .npy file.
    The last argument, model_type, determines which landmarks are extracted.

    This function calls 'draw_landmarks' and 'extract_keypoints', which were defined earlier.
    While the landmarks are being extracted, each frame is displayed so that we can inspect
    what MediaPipe detects in each video.
    '''

    # iterate through the first three words
    for word in first_three:
        # get the path to the folder containing videos for the current word
        videos_path = os.path.join('../videos', test_train_val_data, 'output', word)
        videos = os.listdir(videos_path)  # Get the list of videos for the current word

        # Check if there are any videos to process
        if videos:
            # iterate through each video
            for count, vid in enumerate(videos):
                # create the path for storing extracted keypoints
                video_folder_path = os.path.join('../data', extracted_keypoints_folder, word, str(count))

                # check if the directory exists, if not create one
                if not os.path.exists(video_folder_path):
                    os.makedirs(video_folder_path, exist_ok=True)
                    # print(f"Creating directory: {video_folder_path}")  # debug
                # else:                                                  # debug
                #     print(f"Directory already exists: {video_folder_path}")  # debug

                # construct video path    
                VIDEO_PATH = os.path.join(videos_path, vid)

                # read the video path
                cap = cv2.VideoCapture(VIDEO_PATH)
                frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
                max_frames = min(frame_count, 60)

                # initialize MediaPipe Holistic Model
                with mp_holistic.Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5) as holistic:
                    frames_processed = 0

                    # process each frame in the video
                    while frames_processed < max_frames:
                        ret, frame = cap.read()
                        if not ret:
                            break

                        # perform detection using MediaPipe
                        image, results = mediapipe_detection(frame, holistic)
                        draw_landmarks(image, results, model_type)
                        keypoints = extract_keypoints(results, model_type)

                        # create npy file and write keypoint value inside
                        npy_path = os.path.join(video_folder_path, str(frames_processed) + '.npy')
                        np.save(npy_path, keypoints)

                        # display the image with landmarks
                        cv2.imshow(test_train_val_data, image)

                        # break gracefully
                        if cv2.waitKey(10) & 0xFF == ord('q'):
                            break
                        frames_processed += 1

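                    # Pad remaining frames with zeros so that every video has 60 frames
                    # (this relies on at least one frame having been read, so that `keypoints` exists)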
                    while frames_processed < 60:
                        npy_path = os.path.join(video_folder_path, str(frames_processed) + '.npy')
                        keypoints_pad = np.zeros_like(keypoints)
                        np.save(npy_path, keypoints_pad)
                        frames_processed += 1
                        
                # Release video capture object and close all OpenCV windows
                cap.release()
                cv2.destroyAllWindows()
        else:
            print(f"No videos found for {word}. No directory created.")

    return 'All Videos Processed'
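
The truncate-or-pad step in the loop above can also be expressed on a whole sequence at once. The sketch below assumes that a video's keypoints are already stacked in an array of shape `(n_frames, n_features)`; `pad_or_truncate` is a hypothetical helper, not a function from the notebook.

```python
import numpy as np


def pad_or_truncate(sequence: np.ndarray, target_len: int = 60) -> np.ndarray:
    """Return exactly target_len frames: cut long videos, zero-pad short ones."""
    n_frames, n_features = sequence.shape
    if n_frames >= target_len:
        return sequence[:target_len]
    padding = np.zeros((target_len - n_frames, n_features), dtype=sequence.dtype)
    return np.concatenate([sequence, padding], axis=0)


short_video = pad_or_truncate(np.ones((45, 126)))  # 45 frames -> padded with 15 zero frames
long_video = pad_or_truncate(np.ones((90, 126)))   # 90 frames -> cut to the first 60

print(short_video.shape, long_video.shape)  # (60, 126) (60, 126)
```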



We collect the landmark values in NumPy arrays. The function below creates a new folder (such as `training_full_keypoints`) inside the `data` folder, with one subfolder per word and, within it, one numbered folder per video file. The keypoints of each frame are saved there as `.npy` files.
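
With this layout (`<keypoints folder>/<word>/<video index>/<frame index>.npy`), the frames of one video can be read back in order and stacked into a single array. The sketch below demonstrates this on synthetic data in a temporary folder. `load_video_keypoints` is a hypothetical helper, and the way the next notebook actually loads the files may differ.

```python
import os
import tempfile

import numpy as np


def load_video_keypoints(video_folder: str, n_frames: int = 60) -> np.ndarray:
    """Stack the per-frame files '0.npy' ... '<n_frames - 1>.npy' into one array."""
    frames = [np.load(os.path.join(video_folder, f"{i}.npy")) for i in range(n_frames)]
    return np.stack(frames)


# Build a fake video folder with 60 frames of 126 values each (the hands-only size).
with tempfile.TemporaryDirectory() as tmp:
    video_folder = os.path.join(tmp, "training_hands_keypoints", "hello", "0")
    os.makedirs(video_folder)
    for i in range(60):
        np.save(os.path.join(video_folder, f"{i}.npy"), np.full(126, i, dtype=float))
    video = load_video_keypoints(video_folder)

print(video.shape)  # (60, 126)
```

Building the file names from the frame index, rather than relying on `os.listdir`, keeps the frames in numeric order (a plain string sort would place `10.npy` before `2.npy`).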

In [13]:
# a function to create a folder for each video to store landmark values for each word action

def make_numpy_directory(test_train_val_data: str, extracted_keypoints_folder:str):
    
    '''
    This function creates a folder for each word class in the data folder,
    with one numbered subfolder per video, so that the numpy arrays can be stored.
    '''


    # create directory and ensure that directory is present
    os.makedirs('../data/' + extracted_keypoints_folder + '/', exist_ok=True) 

    # Loop for each signed word in the list of words
    for word in first_three:
        count = 0  # Start numbering video folders from 0

        # Iterate each video file in the signed word folder
        video_directory = '../videos/' + test_train_val_data + '/' + word + '/cropped_videos/'
        videos = os.listdir(video_directory)
        if videos:  # Check if there are any videos to process
            for vid in videos:
                # Create a new folder to store the numpy array values
                numpy_folder_path = os.path.join('../data', extracted_keypoints_folder, word, str(count))
                os.makedirs(numpy_folder_path, exist_ok=True)
                # print(f"Creating or verifying directory: {numpy_folder_path}")  # Debug: log directory creation
                count += 1  # Increment the counter for the next video
        else:
            print(f"No videos found in {video_directory}")  # Inform if no videos found


## 4.2 Extraction of Face, Pose, Left, and Right Hand Keypoints


This will be the most comprehensive dataset of all the models as it will contain information on the face, pose, and hands.



In [19]:
# create folders to store the data from the extraction process

# create folders for train,test, and val data
make_numpy_directory('train_data', 'training_full_keypoints')
make_numpy_directory('test_data', 'testing_full_keypoints')
make_numpy_directory('val_data', 'val_full_keypoints')

In [20]:
%%time

# extract datapoints for the comprehensive model 

# training model
process_dataset_words('train_data', 'training_full_keypoints', 'full')

# testing model
process_dataset_words('test_data', 'testing_full_keypoints', 'full')

# val model
process_dataset_words('val_data', 'val_full_keypoints', 'full')

CPU times: total: 1h 8min 20s
Wall time: 58min 10s


'All Videos Processed'

## 4.3 Extraction of Pose, Left, and Right Hand Keypoints


We will now reduce the model's complexity by removing the facial landmarks and extracting datapoints only from the pose, left hand, and right hand.

In [21]:
# create folders to store the data from the extraction process

# create folders for train, test, and val data
make_numpy_directory('train_data', 'training_ph_keypoints')
make_numpy_directory('test_data', 'testing_ph_keypoints')
make_numpy_directory('val_data', 'val_ph_keypoints')

In [22]:
%%time

# extract datapoints for the model that only has datapoints from the pose, left and right hands

# training model
process_dataset_words('train_data', 'training_ph_keypoints', 'ph')

# testing model
process_dataset_words('test_data', 'testing_ph_keypoints', 'ph')

# val model
process_dataset_words('val_data', 'val_ph_keypoints', 'ph')

CPU times: total: 1h 2min 33s
Wall time: 53min 3s


'All Videos Processed'

## 4.4 Extraction of Left and Right Hand Keypoints


The last model we will build is the simplest: it contains only the datapoints from both hands.

In [23]:
# create folders to store the data from the extraction process

# create folders for train, test, and val data
make_numpy_directory('train_data', 'training_hands_keypoints')
make_numpy_directory('test_data', 'testing_hands_keypoints')
make_numpy_directory('val_data', 'val_hands_keypoints')

In [24]:
%%time

# extract datapoints for the model that only has hand landmarks

# training model
process_dataset_words('train_data', 'training_hands_keypoints', 'hands')

# testing model
process_dataset_words('test_data', 'testing_hands_keypoints', 'hands')

# val model
process_dataset_words('val_data', 'val_hands_keypoints', 'hands')

CPU times: total: 1h 2min 18s
Wall time: 52min 52s


'All Videos Processed'