---
title: "How to build a motion tracking pipeline: Using MediaPipe with optional masking"
authors: Wim Pouw, adapted by Šárka Kadavá
---

## Info documents

This Python notebook walks you through the procedure of taking videos of a single person as input and outputting the kinematic time series and, optionally, a masked video with the facial, hand, and arm kinematics overlaid.

The Masked-Piper tool is a simple but effective modification of Holistic Tracking from Google's MediaPipe. It serves as a lightweight, CPU-based tool for masking your video data while maintaining background information and preserving information about body kinematics.

## Additional information on the backbone of the tool (MediaPipe Holistic Tracking)
https://google.github.io/mediapipe/solutions/holistic.html

## Citation of mediapipe
citation: Lugaresi, C., Tang, J., Nash, H., McClanahan, C., Uboweja, E., Hays, M., ... & Grundmann, M. (2019). Mediapipe: A framework for building perception pipelines. arXiv preprint arXiv:1906.08172.

## Citation of masked piper
* citation: Owoyele, B., Trujillo, J., De Melo, G., & Pouw, W. (2022). Masked-Piper: Masking personal identities in visual recordings while preserving multimodal information. SoftwareX, 20, 101236. 
* Original Repo: https://github.com/WimPouw/TowardsMultimodalOpenScience


The first thing we **always** want to do when we open a notebook is to set up our environment. That means importing packages, setting up folder dependencies, etc.

In [None]:
import mediapipe as mp #mediapipe
import cv2 #opencv
import math #basic operations
import numpy as np #basic operations
import pandas as pd #data wrangling
import csv #csv saving
import os #some basic functions for inspecting folder structure etc.
import glob


curfolder = os.getcwd() #get the current working directory

inputfolder = curfolder + "/Input_Videos/" #set the path to the input videos
videofiles = glob.glob(inputfolder + "*.avi") #get all the .avi files in the input folder

outputf_mask = curfolder + "/Output_Videos/" #video with skeleton/mask
#create the output folder if it does not exist
if not os.path.exists(outputf_mask):
    os.makedirs(outputf_mask)

outtputf_ts = curfolder + "/Output_TimeSeries/" #time series output folder
#create the output folder if it does not exist
if not os.path.exists(outtputf_ts):
    os.makedirs(outtputf_ts)

#check videos to be processed
print("The following folder is set as the output folder where all the pose time series are stored")
print(os.path.abspath(outtputf_ts))
print("\n The following folder is set as the output folder for saving the masked videos ")
print(os.path.abspath(outputf_mask))
print("\n The following video(s) will be processed for masking: ")
print(videofiles)

The following folder is set as the output folder where all the pose time series are stored
c:\Users\Sarka Kadava\Documents\Github\MotionTrackingPipeline_Workshop_ZAS2025\01_MotionCapture\Mediapipe\Output_TimeSeries

 The following folder is set as the output folder for saving the masked videos 
c:\Users\Sarka Kadava\Documents\Github\MotionTrackingPipeline_Workshop_ZAS2025\01_MotionCapture\Mediapipe\Output_Videos

 The following video(s) will be processed for masking: 
['c:\\Users\\Sarka Kadava\\Documents\\Github\\MotionTrackingPipeline_Workshop_ZAS2025\\01_MotionCapture\\Mediapipe/Input_Videos\\0_1_trial_20_p0_verdrietig_combinatie_video_raw_cam2.avi', 'c:\\Users\\Sarka Kadava\\Documents\\Github\\MotionTrackingPipeline_Workshop_ZAS2025\\01_MotionCapture\\Mediapipe/Input_Videos\\0_1_trial_21_p0_koud_combinatie_video_raw_cam2.avi', 'c:\\Users\\Sarka Kadava\\Documents\\Github\\MotionTrackingPipeline_Workshop_ZAS2025\\01_MotionCapture\\Mediapipe/Input_Videos\\0_1_trial_22_p0_staan_combinat
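
The setup cell above builds paths by concatenating strings with "/", which is why the printed paths mix forward and backward slashes on Windows. The following is a minimal sketch of the same folder setup using `os.path.join` and `os.makedirs(..., exist_ok=True)`. The function name `prepare_folders` and the `video_ext` parameter are illustrative assumptions and not part of the original notebook.

```python
import glob
import os

def prepare_folders(base, video_ext="avi"):
    """Return (videofiles, mask_folder, ts_folder), creating the output folders if needed."""
    inputfolder = os.path.join(base, "Input_Videos")
    mask_folder = os.path.join(base, "Output_Videos")
    ts_folder = os.path.join(base, "Output_TimeSeries")
    for folder in (mask_folder, ts_folder):
        os.makedirs(folder, exist_ok=True)  # no error if the folder already exists
    # sorted() makes the processing order reproducible across operating systems
    videofiles = sorted(glob.glob(os.path.join(inputfolder, "*." + video_ext)))
    return videofiles, mask_folder, ts_folder
```

Calling `prepare_folders(os.getcwd())` would return the same three objects as the cell above. Note that the returned folders carry no trailing slash, so later file names would have to be built with `os.path.join(folder, filename)` rather than with `+`.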

Now we can start to prepare MediaPipe by loading in functions and setting the keypoints of interest.

Unless you are a developer and want to modify the code to utilize some special functions, this code can most likely remain unchanged.

In [4]:
#initialize modules and functions

#load in mediapipe modules
mp_holistic = mp.solutions.holistic
# Import drawing_utils and drawing_styles.
mp_drawing = mp.solutions.drawing_utils
mp_drawing_styles = mp.solutions.drawing_styles

##################FUNCTIONS AND OTHER VARIABLES
#landmarks 33x that are used by Mediapipe (Blazepose)
markersbody = ['NOSE', 'LEFT_EYE_INNER', 'LEFT_EYE', 'LEFT_EYE_OUTER', 'RIGHT_EYE_INNER', 'RIGHT_EYE', 'RIGHT_EYE_OUTER',
          'LEFT_EAR', 'RIGHT_EAR', 'MOUTH_LEFT', 'MOUTH_RIGHT', 'LEFT_SHOULDER', 'RIGHT_SHOULDER', 'LEFT_ELBOW', 
          'RIGHT_ELBOW', 'LEFT_WRIST', 'RIGHT_WRIST', 'LEFT_PINKY', 'RIGHT_PINKY', 'LEFT_INDEX', 'RIGHT_INDEX',
          'LEFT_THUMB', 'RIGHT_THUMB', 'LEFT_HIP', 'RIGHT_HIP', 'LEFT_KNEE', 'RIGHT_KNEE', 'LEFT_ANKLE', 'RIGHT_ANKLE',
          'LEFT_HEEL', 'RIGHT_HEEL', 'LEFT_FOOT_INDEX', 'RIGHT_FOOT_INDEX']

markershands = ['LEFT_WRIST', 'LEFT_THUMB_CMC', 'LEFT_THUMB_MCP', 'LEFT_THUMB_IP', 'LEFT_THUMB_TIP', 'LEFT_INDEX_FINGER_MCP',
              'LEFT_INDEX_FINGER_PIP', 'LEFT_INDEX_FINGER_DIP', 'LEFT_INDEX_FINGER_TIP', 'LEFT_MIDDLE_FINGER_MCP', 
               'LEFT_MIDDLE_FINGER_PIP', 'LEFT_MIDDLE_FINGER_DIP', 'LEFT_MIDDLE_FINGER_TIP', 'LEFT_RING_FINGER_MCP', 
               'LEFT_RING_FINGER_PIP', 'LEFT_RING_FINGER_DIP', 'LEFT_RING_FINGER_TIP', 'LEFT_PINKY_FINGER_MCP', 
               'LEFT_PINKY_FINGER_PIP', 'LEFT_PINKY_FINGER_DIP', 'LEFT_PINKY_FINGER_TIP',
              'RIGHT_WRIST', 'RIGHT_THUMB_CMC', 'RIGHT_THUMB_MCP', 'RIGHT_THUMB_IP', 'RIGHT_THUMB_TIP', 'RIGHT_INDEX_FINGER_MCP',
              'RIGHT_INDEX_FINGER_PIP', 'RIGHT_INDEX_FINGER_DIP', 'RIGHT_INDEX_FINGER_TIP', 'RIGHT_MIDDLE_FINGER_MCP', 
               'RIGHT_MIDDLE_FINGER_PIP', 'RIGHT_MIDDLE_FINGER_DIP', 'RIGHT_MIDDLE_FINGER_TIP', 'RIGHT_RING_FINGER_MCP', 
               'RIGHT_RING_FINGER_PIP', 'RIGHT_RING_FINGER_DIP', 'RIGHT_RING_FINGER_TIP', 'RIGHT_PINKY_FINGER_MCP', 
               'RIGHT_PINKY_FINGER_PIP', 'RIGHT_PINKY_FINGER_DIP', 'RIGHT_PINKY_FINGER_TIP']
facemarks = [str(x) for x in range(478)] #there are 478 points for the face mesh (see google holistic face mesh info for landmarks)

print("Note that we have the following number of pose keypoints for markers body")
print(len(markersbody))

print("\n Note that we have the following number of pose keypoints for markers hands")
print(len(markershands))

print("\n Note that we have the following number of pose keypoints for markers face")
print(len(facemarks ))

#set up the column names and objects for the time series data (add time as the first variable)
markerxyzbody = ['time']
markerxyzhands = ['time']
markerxyzface = ['time']

for mark in markersbody:
    for pos in ['X', 'Y', 'Z', 'visibility']: #for markers of the body you also have a visibility reliability score
        nm = pos + "_" + mark
        markerxyzbody.append(nm)
for mark in markershands:
    for pos in ['X', 'Y', 'Z']:
        nm = pos + "_" + mark
        markerxyzhands.append(nm)
for mark in facemarks:
    for pos in ['X', 'Y', 'Z']:
        nm = pos + "_" + mark
        markerxyzface.append(nm)

#check if there are numbers in a string
def num_there(s):
    return any(i.isdigit() for i in s)

#take some google classification object and convert it into a string
def makegoginto_str(gogobj):
    gogobj = str(gogobj).strip("[]")
    gogobj = gogobj.split("\n")
    return(gogobj[:-1]) #ignore last element as this has nothing

#turn the stringified position traces into a clean list of value strings (one entry per coordinate)
def listpostions(newsamplemarks):
    newsamplemarks = makegoginto_str(newsamplemarks)
    tracking_p = []
    for value in newsamplemarks:
        if num_there(value):
            stripped = value.split(':', 1)[1]
            stripped = stripped.strip() #remove spaces in the string if present
            tracking_p.append(stripped) #add to this list  
    return(tracking_p)

Note that we have the following number of pose keypoints for markers body
33

 Note that we have the following number of pose keypoints for markers hands
42

 Note that we have the following number of pose keypoints for markers face
478
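
The helper functions above parse the printed text form of the MediaPipe results. As far as that parsing logic goes, a frame in which only one landmark set is found (for example only the right hand) produces a row that is shorter than the header, so values can end up under the wrong column names. The following is a hedged sketch of an alternative that reads the `x`, `y`, `z` (and `visibility`) fields of each landmark directly and pads missing sets with NaN. The function name `landmarks_to_row` is hypothetical, and `SimpleNamespace` objects stand in for real MediaPipe results so that the example runs without a video.

```python
import math
from types import SimpleNamespace

def landmarks_to_row(landmark_list, n_points, with_visibility=False):
    """Flatten one landmark list into [x, y, z(, visibility), ...].

    Returns NaNs of the right length when landmark_list is None, so every
    row keeps the same number of columns.
    """
    per_point = 4 if with_visibility else 3
    if landmark_list is None:
        return [math.nan] * (n_points * per_point)
    row = []
    for lm in landmark_list.landmark:
        row.extend([lm.x, lm.y, lm.z])
        if with_visibility:
            row.append(lm.visibility)
    return row

# Stand-in for a detected right hand (21 points); real input would be results.right_hand_landmarks
fake_hand = SimpleNamespace(
    landmark=[SimpleNamespace(x=0.1, y=0.2, z=0.3, visibility=0.0)] * 21)

# Left hand missing, right hand present: the left-hand columns are padded with NaN
hands_row = landmarks_to_row(None, 21) + landmarks_to_row(fake_hand, 21)
print(len(hands_row))  # 126 = 42 hand keypoints x 3 coordinates
```

With this layout the right-hand values always stay under the `RIGHT_` column names, whether or not the left hand was detected in that frame.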


## Main procedure Masked-Piper
The following chunk of code loops through all the videos in the input folder. For each frame, it estimates the body pose, extracts the kinematic information, masks the body in a new frame that keeps the background, projects the kinematic information onto the mask, and stores the kinematic information for that frame in the time series .csv files for the hands, body, and face.

In [5]:
# do you want to apply masking?
masking = True

# We will now loop over all the videos that are present in the video file
for vidf in videofiles:
    print("We will now process video:")
    print(vidf)
    print("This is video number " + str(videofiles.index(vidf)) + " of " + str(len(videofiles)) + " videos in total")
    
    # Capture the video and check video settings
    videoname = os.path.basename(vidf)  # file name without the folder path, independent of the operating system's path separator
    #videoname2 = videoname.replace("mp4", "avi")
    videoloc = vidf
    capture = cv2.VideoCapture(videoloc)  # load the video capture
    frameWidth = capture.get(cv2.CAP_PROP_FRAME_WIDTH) #check frame width 1280 
    frameHeight = capture.get(cv2.CAP_PROP_FRAME_HEIGHT) #check frame height 720
    samplerate = capture.get(cv2.CAP_PROP_FPS)   #fps = frames per second 30
    #print(frameWidth, frameHeight, samplerate)

    # Create an empty video file to project the pose tracking on
    fourcc = cv2.VideoWriter_fourcc(*'XVID')  # XVID suits .avi output; for other video formats you could use e.g., *'mp4v' for .mp4
    #print(fourcc)
    out = cv2.VideoWriter(outputf_mask + videoname, fourcc, fps=samplerate, 
                          frameSize=(int(frameWidth), int(frameHeight)))

    # Initialize Mediapipe Holistic
    time = 0
    tsbody = [markerxyzbody]  # These are the time series objects starting with column names initialized above
    tshands = [markerxyzhands]
    tsface = [markerxyzface]
    
    # Adding real-world landmarks
    tsbody_world = [markerxyzbody]  # For world landmarks (3D coordinates)
    tsface_world = [markerxyzface]  # For normalized 3D face landmarks
    tshands_world = [markerxyzhands]  # For normalized 3D hand landmarks
    
    with mp_holistic.Holistic(
            static_image_mode=False, enable_segmentation=True, refine_face_landmarks=True) as holistic:
        while (True):
            ret, image = capture.read()  # read frame
            if ret == True:  # if there is a frame
                image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)  # ensure the image is in RGB format
                results = holistic.process(image)  # apply Mediapipe holistic processing
                
                h, w, c = image.shape
                if results.face_landmarks or results.pose_landmarks or results.left_hand_landmarks or results.right_hand_landmarks:
                    if masking == False:
                        original_image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)
                    elif masking == True:
                        # Masking logic here (if needed)
                        original_image = np.concatenate([image, np.full((h, w, 1), 255, dtype=np.uint8)], axis=-1)
                        mask_img = np.zeros_like(image, dtype=np.uint8) #set up basic mask image
                        mask_img[:, :] = (255,255,255) #set up basic mask image
                        segm_2class = 0.2 + 0.8 * results.segmentation_mask #set up a segmentation of the results of mediapipe
                        segm_2class = np.repeat(segm_2class[..., np.newaxis], 3, axis=2) #set up a segmentation of the results of mediapipe
                        annotated_image = mask_img * segm_2class * (1 - segm_2class) #take the basic mask image and make a sillhouette mask
                        # append Alpha channel to sillhouetted mask so that we can overlay it to the original image
                        mask = np.concatenate([annotated_image, np.full((h, w, 1), 255, dtype=np.uint8)], axis=-1)
                        # Zero background where we want to overlay
                        original_image[mask==0]=0 #for the original image we are going to set everything at zero for places where the mask has to go
                        original_image = cv2.cvtColor(original_image, cv2.COLOR_RGB2BGR)
                    
                    # Draw left hand, right hand, face, and body pose landmarks
                    mp_drawing.draw_landmarks(original_image, results.left_hand_landmarks, mp_holistic.HAND_CONNECTIONS)
                    mp_drawing.draw_landmarks(original_image, results.right_hand_landmarks, mp_holistic.HAND_CONNECTIONS)
                    mp_drawing.draw_landmarks(
                        original_image, results.face_landmarks, mp_holistic.FACEMESH_TESSELATION)
                    mp_drawing.draw_landmarks(
                        original_image, results.pose_landmarks, mp_holistic.POSE_CONNECTIONS)
                    
                    # Save pose landmarks (2D image-based)
                    samplebody = listpostions(results.pose_landmarks)
                    samplehands = listpostions([results.left_hand_landmarks, results.right_hand_landmarks])
                    sampleface = listpostions(results.face_landmarks)
                    samplebody.insert(0, time)
                    samplehands.insert(0, time)
                    sampleface.insert(0, time)
                    tsbody.append(samplebody)
                    tshands.append(samplehands)
                    tsface.append(sampleface)
                    
                    # Save pose world landmarks (3D coordinates in meters)
                    if results.pose_world_landmarks:
                        samplebody_world = listpostions(results.pose_world_landmarks)
                        samplebody_world.insert(0, time)
                        tsbody_world.append(samplebody_world)

                    # Save face and hand landmarks (as normalized 3D landmarks)
                    if results.face_landmarks:
                        sampleface_world = listpostions(results.face_landmarks)
                        sampleface_world.insert(0, time)
                        tsface_world.append(sampleface_world)

                    if results.left_hand_landmarks or results.right_hand_landmarks:
                        samplehands_world = listpostions([results.left_hand_landmarks, results.right_hand_landmarks])
                        samplehands_world.insert(0, time)
                        tshands_world.append(samplehands_world)

                else:
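                    # If no landmarks are detected, there is nothing to mask or draw: pass the frame through
                    # unchanged (note that a person who is present but undetected stays unmasked) and append NaNs
                    original_image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)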
                    samplebody = [np.nan for x in range(len(markerxyzbody)-1)]
                    samplehands = [np.nan for x in range(len(markerxyzhands)-1)]
                    sampleface = [np.nan for x in range(len(markerxyzface)-1)]
                    samplebody.insert(0, time)
                    samplehands.insert(0, time)
                    sampleface.insert(0, time)
                    tsbody.append(samplebody)
                    tshands.append(samplehands)
                    tsface.append(sampleface)

                    # Append NaNs for world coordinates as well
                    samplebody_world = [np.nan for x in range(len(markerxyzbody)-1)]
                    samplebody_world.insert(0, time)
                    tsbody_world.append(samplebody_world)

                    sampleface_world = [np.nan for x in range(len(markerxyzface)-1)]
                    sampleface_world.insert(0, time)
                    tsface_world.append(sampleface_world)

                    samplehands_world = [np.nan for x in range(len(markerxyzhands)-1)]
                    samplehands_world.insert(0, time)
                    tshands_world.append(samplehands_world)

                # Show the video as we process
                cv2.imshow("resizedimage", original_image)
                out.write(original_image)  # save the frame to the new masked video
                time += (1000 / samplerate)  # update the time variable for the next frame
            
            if cv2.waitKey(1) == 27:  # allow the use of ESCAPE to break the loop
                break
            if ret == False:  # if there are no more frames, break the loop
                break

    # Once done, de-initialize all processes
    out.release()
    capture.release()
    cv2.destroyAllWindows()

    # Save CSV data for body, face, and hands
    filebody = open(outtputf_ts + videoname + '_body.csv', 'w+', newline='')
    with filebody:
        write = csv.writer(filebody)
        write.writerows(tsbody)
    
    filehands = open(outtputf_ts + videoname + '_hands.csv', 'w+', newline='')
    with filehands:
        write = csv.writer(filehands)
        write.writerows(tshands)
    
    fileface = open(outtputf_ts + videoname + '_face.csv', 'w+', newline='')
    with fileface:
        write = csv.writer(fileface)
        write.writerows(tsface)

    # Save the "world" time series to CSV (only the body is in meters; the face and hand files repeat the normalized landmarks)
    filebody_world = open(outtputf_ts + videoname + '_body_world.csv', 'w+', newline='')
    with filebody_world:
        write = csv.writer(filebody_world)
        write.writerows(tsbody_world)

    fileface_world = open(outtputf_ts + videoname + '_face_world.csv', 'w+', newline='')
    with fileface_world:
        write = csv.writer(fileface_world)
        write.writerows(tsface_world)

    filehands_world = open(outtputf_ts + videoname + '_hands_world.csv', 'w+', newline='')
    with filehands_world:
        write = csv.writer(filehands_world)
        write.writerows(tshands_world)

print("Done with processing all videos; go look in your output folders!")

We will now process video:
c:\Users\Sarka Kadava\Documents\Github\MotionTrackingPipeline_Workshop_ZAS2025\01_MotionCapture\Mediapipe/Input_Videos\0_1_trial_20_p0_verdrietig_combinatie_video_raw_cam2.avi
This is video number 0 of 14 videos in total
We will now process video:
c:\Users\Sarka Kadava\Documents\Github\MotionTrackingPipeline_Workshop_ZAS2025\01_MotionCapture\Mediapipe/Input_Videos\0_1_trial_21_p0_koud_combinatie_video_raw_cam2.avi
This is video number 1 of 14 videos in total
We will now process video:
c:\Users\Sarka Kadava\Documents\Github\MotionTrackingPipeline_Workshop_ZAS2025\01_MotionCapture\Mediapipe/Input_Videos\0_1_trial_22_p0_staan_combinatie_video_raw_cam2.avi
This is video number 2 of 14 videos in total
We will now process video:
c:\Users\Sarka Kadava\Documents\Github\MotionTrackingPipeline_Workshop_ZAS2025\01_MotionCapture\Mediapipe/Input_Videos\0_1_trial_23_p0_stil_combinatie_video_raw_cam2.avi
This is video number 3 of 14 videos in total
We will now process video
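
The masking step in the loop above relies on `results.segmentation_mask`, which holds one value between 0 and 1 per pixel that indicates how confident the model is that the pixel belongs to the person. The following sketch shows the idea on a tiny synthetic frame. It is a simplification that uses an explicit threshold rather than the exact arithmetic of the loop; the function name `apply_silhouette_mask` and the default `threshold=0.5` are assumptions made for illustration.

```python
import numpy as np

def apply_silhouette_mask(image_rgb, segmentation, threshold=0.5):
    """Black out pixels that the segmentation assigns to the person; keep the background."""
    masked = image_rgb.copy()           # leave the input frame untouched
    person = segmentation > threshold   # boolean (h, w) array
    masked[person] = 0                  # zero all colour channels of the person pixels
    return masked

# Synthetic 4x4 RGB frame with a 2x2 "person" in the centre
frame = np.full((4, 4, 3), 200, dtype=np.uint8)
seg = np.zeros((4, 4), dtype=np.float32)
seg[1:3, 1:3] = 0.9
masked = apply_silhouette_mask(frame, seg)
print(masked[:, :, 0])  # centre 2x2 block is 0, the rest stays 200
```

A lower threshold masks more aggressively (fewer person pixels leak through at the edges of the silhouette), at the cost of also blacking out some background.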

## Quality check

One thing is really important: we must resist the temptation to treat code as a black box. Even code reproduced from the most-cited paper can go wrong at any step along the way.

This is even more crucial for a pipeline like MediaPipe, where we fit a body model onto a video without, for example, calibrating the cameras. All keypoints are estimates, which is especially tricky for the depth (z) dimension.

In [7]:
import glob
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D
from matplotlib.animation import FuncAnimation
from IPython.display import Video

# 3D tracking checking
outputcsv = glob.glob(outtputf_ts + '*.csv')
fileexample = outputcsv[1]

MT_tracking = pd.read_csv(fileexample)

# Create a figure and axis
fig = plt.figure(figsize=(10, 6))
ax = fig.add_subplot(111, projection='3d')

# Define the number of frames
num_frames = len(MT_tracking)

# Define the scatter plot
scatter = ax.scatter([], [], [], marker='o')

# Update function for animation
def update(frame):
    ax.clear()
    ax.set_xlabel('X Label')
    ax.set_ylabel('Z Label')
    ax.set_zlabel('Y Label')
    ax.set_title('3D Animation')
    
    # Set the limits of the axes based on the maximum values
    ax.set_xlim3d(0, 1)
    ax.set_ylim3d(20, 0)
    ax.set_zlim3d(0, 1) #we need to flip the vertical
    
    # Plot the data for the current frame
    frame_data = MT_tracking.iloc[frame]
    # anchor the pattern at the start of the column name: a plain like='X_' would also
    # match names such as 'Y_LEFT_INDEX_FINGER_MCP' in the hand files
    x_body = frame_data.filter(regex='^X_')
    y_body = frame_data.filter(regex='^Z_')  # depth goes on the plot's y-axis
    z_body = frame_data.filter(regex='^Y_')  # image height goes on the plot's z-axis
    # plot the keypoints of the loaded file in red (Y is negated to flip the vertical)
    scatter = ax.scatter(x_body, y_body, z_body*-1, color='red')

    return scatter

# Create the animation
ani = FuncAnimation(fig, update, frames=num_frames, interval=1000/60)

# Close the figure to prevent displaying the first frame
plt.close(fig)

# Save the animation as a video
ani.save('3D_animation.mp4', writer='ffmpeg')

# Display the animation
video_path = '3D_animation.mp4'
Video(video_path, embed=True, width=800)
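
Alongside the visual check, a quick numerical summary can flag problem columns before any analysis. The following is a minimal sketch, assuming a time series table with a `time` column as written by the pipeline above; the function name `tracking_summary` and the synthetic `demo` data are illustrative only.

```python
import numpy as np
import pandas as pd

def tracking_summary(df, time_col="time"):
    """Per-column fraction of missing samples and largest frame-to-frame jump."""
    coords = df.drop(columns=[time_col])
    return pd.DataFrame({
        "missing_fraction": coords.isna().mean(),
        "max_jump": coords.diff().abs().max(),
    })

# Synthetic example: one implausible jump and one dropped sample in X_NOSE
demo = pd.DataFrame({
    "time": [0.0, 40.0, 80.0, 120.0],
    "X_NOSE": [0.50, 0.51, 0.90, np.nan],
    "Y_NOSE": [0.20, 0.21, 0.22, 0.23],
})
print(tracking_summary(demo))
```

On real data this could be called as `tracking_summary(MT_tracking)`. Columns with a high missing fraction or an unusually large jump are good candidates for a closer look in the video.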



We can also check the masked video directly in the notebook.

In [8]:
import tempfile
import subprocess

# Create a temporary folder
temp_dir = tempfile.mkdtemp()

# Get the same video
videos = glob.glob(outputf_mask + '*.avi')
videoexample = videos[0]

mp4_path = os.path.join(temp_dir, "converted_rawtrackingvideo.mp4")  # Save MP4 in temp folder


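# Convert AVI to MP4 (H.264); the audio options only take effect if the input has an audio track,
# which videos written by OpenCV's VideoWriter do not
if not os.path.exists(mp4_path):
    subprocess.run([
        "ffmpeg", "-i", videoexample, 
        "-vcodec", "libx264", "-acodec", "aac", "-b:a", "192k",  # audio settings, used only when audio is present
        "-crf", "23", "-preset", "fast", mp4_path
    ])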

# Display the video inside Jupyter Notebook
Video(mp4_path, embed=True, width=400)
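
The conversion above assumes that `ffmpeg` is installed and on the system PATH; if it is not, `subprocess.run` fails before any video is produced. The following is a small sketch of a guard for that case. The helper names `ffmpeg_available` and `build_ffmpeg_command` are hypothetical, and this variant drops the audio options in favour of `-an` (no audio), on the assumption that the masked video has no sound.

```python
import shutil

def ffmpeg_available():
    """True if an ffmpeg executable can be found on the PATH."""
    return shutil.which("ffmpeg") is not None

def build_ffmpeg_command(src, dst):
    """Return the ffmpeg argument list for an H.264 MP4 conversion (video stream only)."""
    return ["ffmpeg", "-y", "-i", src, "-vcodec", "libx264",
            "-crf", "23", "-preset", "fast", "-an", dst]

print(build_ffmpeg_command("input.avi", "output.mp4"))
```

In the notebook this could be used as `if ffmpeg_available(): subprocess.run(build_ffmpeg_command(videoexample, mp4_path), check=True)`, where `check=True` makes a failed conversion raise an error instead of passing silently.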