# Applying Pose Estimation

In this script, we will apply Tensorflow's pose estimation model [MoveNet Thunder](https://www.tensorflow.org/hub/tutorials/movenet) to two short videos of two speeches in the German Bundestag. The videos of those speeches are stored in a GitHub repository under the following link: [github.com/orittmann/ssdl_body_language.git](https://github.com/orittmann/ssdl_body_language.git). We start by loading the content of this repository into our environment:

In [1]:
!git clone https://github.com/orittmann/ssdl_body_language.git

Cloning into 'ssdl_body_language'...
remote: Enumerating objects: 596, done.[K
remote: Counting objects: 100% (43/43), done.[K
remote: Compressing objects: 100% (33/33), done.[K
remote: Total 596 (delta 8), reused 39 (delta 7), pack-reused 553 (from 1)[K
Receiving objects: 100% (596/596), 222.38 MiB | 13.22 MiB/s, done.
Resolving deltas: 100% (12/12), done.
Updating files: 100% (564/564), done.


Once this code run succesfully, you should be able to find all files of the repository, including its folder structure, in this Colab notebook. The two videos that we would like to analyze are located at `ssdl_body_language/videos/speech1_gabriela_heinrichs.mp4` and `ssdl_body_language/videos/speech2_klaus_ernst.mp4`.

The repository also includes the file `ssdl_body_language/model/lite-model_movenet_singlepose_thunder_3.tflite`. This is the pre-trained pose estimation model that we will use to analyze the videos. You can find more details [here](https://www.tensorflow.org/hub/tutorials/movenet).

Now that we have our files in place, we make sure to load all necessary dependencies:

In [11]:
# import dependencies
import tensorflow as tf
import numpy as np
from matplotlib import pyplot as plt
import cv2

# to store resulting data aas json
import json

# to get filenames in directory
import os
import fnmatch

To start, we load and prepare the pose estimation model:

In [12]:
interpreter = tf.lite.Interpreter(model_path='ssdl_body_language/model/lite-model_movenet_singlepose_thunder_3.tflite')
interpreter.allocate_tensors()

Next, we set up the function `make_keypoint_detection`, which processes an input video frame-by-frame and performs keypoint detection using TensorFlow's MoveNet model. The function resizes each frame of the video to the required size, prepares it for the model, runs inference to get keypoints, stores the keypoints, and finally returns all the keypoints detected from the video.

In [13]:
def make_keypoint_detection(video_path):

    # "output_images" is an empty list that will store the keypoints detected
    # in each frame of the video.
    output_images = []

    # "cap" is a video capturing object that allows us to read the frames of
    # the video stored at video_path
    cap = cv2.VideoCapture(video_path)

    # We initialize a while-loop that that loops through all frames of the video
    # The loop is open as long as the video capture object "cap" is open.
    # That is, until all frames are analyzed.
    while cap.isOpened():

        # We read the current frame of the video
        # "ret" is boolean, indicating if the frame was read successfully
        # "frame" is the actual frame of the video
        ret, frame = cap.read()

        # We only proceed if the frame is read correctly (i.e., if ret is TRUE)
        if not ret:
            print("Stream end.")
            break

        # Image Preparation:
        # We create a copy of the current frame to avoid modyfing the original.
        img = frame.copy()

        # MoveNet Thunder requires a frame size of 256x256
        # For that reason, we resize the frame.
        # This includes padding since the original video is not square
        img = tf.image.resize_with_pad(np.expand_dims(img, axis=0), 256, 256)

        # We convert the resized image to a TensorFlow float32 tensor,
        # so that we can feed it into the model
        input_image = tf.cast(img, dtype=tf.float32)

        # Setting Up Model Input and Output:
        input_details = interpreter.get_input_details()
        output_details = interpreter.get_output_details()

        # Running inference:
        # - We set up the input tensor with the prepare input frame
        # - We run the model
        # - We retrieve the output tensor, which contains the keypoints
        interpreter.set_tensor(input_details[0]['index'], input_image.numpy())
        interpreter.invoke()
        keypoints_with_scores = interpreter.get_tensor(output_details[0]['index'])

        # Storing the Results:
        # We transform numpy array to a list (this makes it easier to store as
        # the output as a .json file later) and append it to "output_images"
        # for storage
        output_images.append(keypoints_with_scores.tolist())

    # Final Steps:
    # - We release the video capturing object
    # - We close any OpenCV windows
    # - We return the list of keypoints detected in each frame of the video
    cap.release()
    cv2.destroyAllWindows()

    return output_images

We will apply this function to two videos only. Nevertheless, we'll write the code in a way that allows us to efficiently apply it to as many videos as we want. To do so, we create a vector, `video_files`, that contains the filenames of all videos we would like to analyze.

In [14]:
video_files = fnmatch.filter(os.listdir("ssdl_body_language/videos"), "*.mp4")

print(video_files)

['speech2_klaus_ernst.mp4', 'speech1_gabriela_heinrich.mp4']


...and loop through this vector, applying our function to each video in `video_files`.

In [15]:
keypoints_result_tmp = make_keypoint_detection("ssdl_body_language/videos/speech1_gabriela_heinrich.mp4")


Stream end.


error: OpenCV(4.10.0) /io/opencv/modules/highgui/src/window.cpp:1295: error: (-2:Unspecified error) The function is not implemented. Rebuild the library with Windows, GTK+ 2.x or Cocoa support. If you are on Ubuntu or Debian, install libgtk2.0-dev and pkg-config, then re-run cmake or configure script in function 'cvDestroyAllWindows'


In [9]:
# loop over all videos
for i in np.arange(0, len(video_files)):
    # We start by retrieving the file path to the current video
    current_file = video_files[i]
    current_path = "ssdl_body_language/videos/" + current_file

    # We print a message indicating the start of inference for the current video
    print("Start inference for video " + str(i) + ": " + current_file)

    # Executing keypoint detection:
    # We call the "make_keypoint_detection" function with the path to the
    # current video and store the resulting keypoints in "keypoints_result_tmp"
    keypoints_result_tmp = make_keypoint_detection(current_path)

    # Store data:
    # We specify the filepath and file name of the output file
    # and store the output
    res_json_file = current_file.replace("mp4", "json")
    res_json_file_path = "ssdl_body_language/movenet_results/" + res_json_file

    with open(res_json_file_path, 'w') as fp:
        json.dump(keypoints_result_tmp, fp)

    # We delete the temporary keypoint results to free up memory
    del keypoints_result_tmp

    # Finally, we print a message indicating the end of inference for the
    # current video
    print("End inference for video " + str(i) + ": " + current_file)


Start inference for video 0: speech2_klaus_ernst.mp4
Stream end.


error: OpenCV(4.10.0) /io/opencv/modules/highgui/src/window.cpp:1295: error: (-2:Unspecified error) The function is not implemented. Rebuild the library with Windows, GTK+ 2.x or Cocoa support. If you are on Ubuntu or Debian, install libgtk2.0-dev and pkg-config, then re-run cmake or configure script in function 'cvDestroyAllWindows'


After r