# Step 0: MediaPipe Installation, User Configuration, and Mounting Your Drive
**When using this program, be sure to run all the cells in this step from top to bottom once**

In [1]:
# Install MediaPipe into the notebook runtime (required by the import cell below)
# You may be prompted to restart the session after installation (Runtime > Restart the session)
# NOTE: the version is not pinned; the run recorded in the output below resolved to mediapipe 0.10.21
!pip install mediapipe

Collecting mediapipe
  Downloading mediapipe-0.10.21-cp311-cp311-manylinux_2_28_x86_64.whl.metadata (9.7 kB)
Collecting numpy<2 (from mediapipe)
  Downloading numpy-1.26.4-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (61 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m61.0/61.0 kB[0m [31m3.4 MB/s[0m eta [36m0:00:00[0m
Collecting protobuf<5,>=4.25.3 (from mediapipe)
  Downloading protobuf-4.25.8-cp37-abi3-manylinux2014_x86_64.whl.metadata (541 bytes)
Collecting sounddevice>=0.4.4 (from mediapipe)
  Downloading sounddevice-0.5.2-py3-none-any.whl.metadata (1.6 kB)
Downloading mediapipe-0.10.21-cp311-cp311-manylinux_2_28_x86_64.whl (35.6 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m35.6/35.6 MB[0m [31m21.6 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading numpy-1.26.4-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (18.3 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m18.3/18.3 MB[0m [31m23.

In [1]:
# Import libraries
import copy
import cv2
import numpy as np
import mediapipe as mp
import plotly.graph_objects as go
import plotly.colors as pcolors
from tqdm.notebook import tqdm

**Replace the following with your own file paths and desired parameters**

In [5]:
# --- File paths ---
# Input videos for pose estimation, one per camera (cam1 is processed first, then cam2)
pose_video_cam1_path = '/content/drive/MyDrive/3D-Gesture/Proj-ISGS-Private/videos/cam1_pose.mp4'
pose_video_cam2_path = '/content/drive/MyDrive/3D-Gesture/Proj-ISGS-Private/videos/cam2_pose.mp4'

# Output paths for the annotated result videos (landmarks drawn on each frame).
# These are relative paths, so the files are written to the current working directory.
output_video_cam1_path = 'pose1.mp4'
output_video_cam2_path = 'pose2.mp4'

# .npz file holding the camera calibration. Step 2 reads the keys
# cam_matrix1, dist_coeffs1, cam_matrix2, dist_coeffs2, rotation and translation from it.
extrinsics_path = '/content/drive/MyDrive/3D-Gesture/Proj-ISGS-Private/ex.npz'

# .npz file the triangulated 3D keypoints are written to in Step 2 and read back from in Step 3
keypoints_3d_path = '/content/drive/MyDrive/3D-Gesture/Proj-ISGS-Private/kpts3D.npz'


# --- MediaPipe Settings ---
# Set to True to also store, draw, and triangulate the face / hand landmarks
# (pose landmarks are always processed)
detect_face = False
detect_hands = False
min_detection_confidence = 0.1  # Passed to Holistic as both min_detection_confidence and min_tracking_confidence


# Number of landmarks for each body part.
# Used to size the placeholder array of -1s when a body part is not detected in a frame.
num_pose_landmarks = 33
num_hand_landmarks = 21
num_face_landmarks = 478

# Pairs of pose landmark indices that are joined by a line when drawing the 3D skeleton in Step 3
pose_connections = [
    (0, 1), (1, 2), (2, 3), (3, 7), (0, 4), (4, 5), (5, 6), (6, 8), (9, 10),  # Face
    (11, 13), (13, 15), (15, 17), (15, 19), (15, 21), (17, 19),               # Left arm
    (12, 14), (14, 16), (16, 18), (16, 20), (16, 22), (18, 20),               # Right arm
    (11, 12), (12, 24), (23, 24), (11, 23),                                   # Torso
    (23, 25), (25, 27), (27, 29), (27, 31), (29, 31),                         # Left leg
    (24, 26), (26, 28), (28, 30), (28, 32), (30, 32)                          # Right leg
]

In [3]:
# Mount Google Drive at /content/drive so the /content/drive/MyDrive/... paths
# set in the configuration cell can be read and written
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


# Step 1: 2D Pose Estimation from Videos
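This step processes the videos from the two cameras with MediaPipe Holistic to detect 2D pose landmarks (plus face and hand landmarks when they are enabled in Step 0). The annotated result videos are saved to the output paths set in Step 0.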

For more details on pose estimation with MediaPipe, please see below:  
https://github.com/google-ai-edge/mediapipe/blob/master/docs/solutions/holistic.md

In [6]:
# MediaPipe solution modules: the Holistic model plus the helpers used to draw the overlay
mp_holistic = mp.solutions.holistic
mp_drawing = mp.solutions.drawing_utils
mp_drawing_styles = mp.solutions.drawing_styles

# Per-camera input and output video paths (position 0 = camera 1, position 1 = camera 2)
input_video_paths = [pose_video_cam1_path, pose_video_cam2_path]
output_video_paths = [output_video_cam1_path, output_video_cam2_path]

# Detected landmarks, stored as one independent (initially empty) list of frames per camera
pose_landmarks_per_cam = [[] for _ in input_video_paths]
face_landmarks_per_cam = [[] for _ in input_video_paths]
left_hand_landmarks_per_cam = [[] for _ in input_video_paths]
right_hand_landmarks_per_cam = [[] for _ in input_video_paths]


def extract_landmarks(landmarks_data, frame_width, frame_height, landmark_type):
    """Convert MediaPipe landmark data to a NumPy array of pixel coordinates.

    Args:
        landmarks_data: Object exposing a `.landmark` iterable whose items have
            `x` and `y` attributes, or a falsy value when nothing was detected.
        frame_width: Factor applied to every landmark's `x`.
        frame_height: Factor applied to every landmark's `y`.
        landmark_type: 'hand', 'pose', or anything else (treated as 'face');
            only used to size the placeholder when nothing was detected.

    Returns:
        float32 array with one `[x * frame_width, y * frame_height]` row per
        landmark. If `landmarks_data` is falsy, an array of shape
        (num_landmarks, 2) filled with -1 instead.
    """
    if not landmarks_data:
        # No detection: build a placeholder sized for the requested body part
        placeholder_rows = (
            num_hand_landmarks if landmark_type == 'hand'
            else num_pose_landmarks if landmark_type == 'pose'
            else num_face_landmarks  # 'face'
        )
        return np.full((placeholder_rows, 2), -1, dtype=np.float32)

    # Scale each landmark's x/y by the frame size
    pixel_rows = []
    for landmark in landmarks_data.landmark:
        pixel_rows.append([landmark.x * frame_width, landmark.y * frame_height])
    return np.array(pixel_rows, dtype=np.float32)


# Iterate through each video for processing.
# A fresh Holistic model is created for every video: with static_image_mode=False
# the model treats its inputs as one continuous video stream (landmark tracking and
# smoothing across frames), so sharing a single instance would let the last frames
# of camera 1 influence the first frames of camera 2. The `with` block also closes
# the model once the video is done.
for cam_idx, (input_path, output_path) in enumerate(zip(input_video_paths, output_video_paths)):

    cap = cv2.VideoCapture(input_path)
    if not cap.isOpened():
        raise RuntimeError(f"Cannot open video file: {input_path}")

    # Get video properties for the output writer
    fps = cap.get(cv2.CAP_PROP_FPS)
    frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    writer = cv2.VideoWriter(output_path, fourcc, fps, (frame_width, frame_height))

    pbar = tqdm(total=int(cap.get(cv2.CAP_PROP_FRAME_COUNT)), desc=f'Processing Camera {cam_idx+1}')

    try:
        with mp_holistic.Holistic(
            static_image_mode=False,
            model_complexity=2,
            smooth_landmarks=True,
            refine_face_landmarks=True,
            min_detection_confidence=min_detection_confidence,
            min_tracking_confidence=min_detection_confidence
        ) as holistic_model:

            while cap.isOpened():
                ret, frame = cap.read()
                if not ret:
                    print(f'Finished processing video {cam_idx+1} or encountered an error.')
                    break

                pbar.update(1)

                # MediaPipe expects RGB input. cvtColor returns a new array, so the
                # read-only flag (which lets MediaPipe take the image by reference)
                # has to be set on the converted image, not on the BGR frame.
                frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                frame_rgb.flags.writeable = False

                # Run MediaPipe Holistic model
                results = holistic_model.process(frame_rgb)

                # Extract and store landmarks for the current frame
                pose_landmarks_per_cam[cam_idx].append(extract_landmarks(
                    results.pose_landmarks, frame_width, frame_height, 'pose'))
                if detect_face:
                    face_landmarks_per_cam[cam_idx].append(extract_landmarks(
                        results.face_landmarks, frame_width, frame_height, 'face'))
                if detect_hands:
                    left_hand_landmarks_per_cam[cam_idx].append(extract_landmarks(
                        results.left_hand_landmarks, frame_width, frame_height, 'hand'))
                    right_hand_landmarks_per_cam[cam_idx].append(extract_landmarks(
                        results.right_hand_landmarks, frame_width, frame_height, 'hand'))

                # Draw the landmarks on the original BGR frame (still writeable)
                mp_drawing.draw_landmarks(
                    frame, results.pose_landmarks, mp_holistic.POSE_CONNECTIONS,
                    landmark_drawing_spec=mp_drawing_styles.get_default_pose_landmarks_style()
                )
                if detect_face:
                    mp_drawing.draw_landmarks(
                        frame, results.face_landmarks, mp_holistic.FACEMESH_CONTOURS,
                        landmark_drawing_spec=None,
                        connection_drawing_spec=mp_drawing_styles.get_default_face_mesh_contours_style()
                    )
                if detect_hands:
                    # Hands use the hand-specific styles rather than the pose landmark style
                    for hand_landmarks in (results.right_hand_landmarks, results.left_hand_landmarks):
                        mp_drawing.draw_landmarks(
                            frame, hand_landmarks, mp_holistic.HAND_CONNECTIONS,
                            landmark_drawing_spec=mp_drawing_styles.get_default_hand_landmarks_style(),
                            connection_drawing_spec=mp_drawing_styles.get_default_hand_connections_style()
                        )

                # Write the frame to the output video
                writer.write(frame)
    finally:
        # Release resources even if processing fails part-way through a video
        cap.release()
        pbar.close()
        writer.release()

Processing Camera 1:   0%|          | 0/879 [00:00<?, ?it/s]

Finished processing video 1 or encountered an error.


Processing Camera 2:   0%|          | 0/879 [00:00<?, ?it/s]

Finished processing video 2 or encountered an error.


# Step 2: 3D Triangulation and Data Storage
This step reconstructs the 3D positions of the keypoints using the 2D detections from both cameras and the pre-calculated camera calibration parameters (the camera matrices plus the rotation and translation between the two cameras). The final 3D keypoints are then saved.

In [7]:
# Read camera calibration parameters.
# The archive is opened as a context manager so its file handle is closed
# as soon as the arrays have been read.
with np.load(extrinsics_path) as params:
    cam_matrix1 = params['cam_matrix1']
    dist_coeffs1 = params['dist_coeffs1']
    cam_matrix2 = params['cam_matrix2']
    dist_coeffs2 = params['dist_coeffs2']
    rotation2 = params['rotation']
    translation2 = params['translation']

# NOTE(review): dist_coeffs1 / dist_coeffs2 are loaded but nothing in this step applies them.
# If the input videos are not already undistorted, the 2D landmarks should be undistorted
# before triangulation - confirm.

# Camera 1 is at the origin
projection_matrix1 = cam_matrix1 @ np.hstack((np.eye(3), np.zeros((3, 1))))
# Camera 2's projection matrix is derived from its rotation and translation.
# The translation is forced into a (3, 1) column so that it stacks next to the
# rotation matrix even when it was saved as a flat 3-element vector.
projection_matrix2 = cam_matrix2 @ np.hstack((rotation2, translation2.reshape(3, 1)))


def triangulate_keypoints(keypoints_per_cam):
    """Triangulate 3D keypoints from the 2D keypoints of two cameras.

    Frames in which every coordinate of either view is negative (the -1
    "not detected" placeholder) are dropped before triangulation. The 2D points
    are used as given, and the module-level `projection_matrix1` /
    `projection_matrix2` are used as the projection matrices.

    Args:
        keypoints_per_cam: Two sequences (camera 1, camera 2), each holding one
            (num_keypoints, 2) array per frame. Frame i of camera 1 is paired
            with frame i of camera 2.

    Returns:
        Array of shape (num_valid_frames, num_keypoints, 3). When no frame is
        valid the array is empty but keeps three dimensions
        ((0, num_keypoints, 3), or (0, 0, 3) if there were no frames at all),
        so callers can still use `len()` and `.shape`.
    """
    frames_cam1, frames_cam2 = keypoints_per_cam[0], keypoints_per_cam[1]

    # Frames are paired by index, so only the frames present in both views can be used
    num_frames = min(len(frames_cam1), len(frames_cam2))
    if len(frames_cam1) != len(frames_cam2):
        print(f'Warning: frame counts differ between cameras '
              f'({len(frames_cam1)} vs {len(frames_cam2)}); using the first {num_frames} frames.')
    if num_frames == 0:
        return np.empty((0, 0, 3))

    keypoints_cam1 = np.asarray(frames_cam1[:num_frames])
    keypoints_cam2 = np.asarray(frames_cam2[:num_frames])
    num_keypoints = keypoints_cam1.shape[1]

    # Identify and remove frames with missing detections
    # A detection is missing if all keypoint coordinates are negative (-1 placeholder)
    is_missing_in_cam1 = (keypoints_cam1 < 0).all(axis=(1, 2))
    is_missing_in_cam2 = (keypoints_cam2 < 0).all(axis=(1, 2))
    valid_frames_mask = ~(is_missing_in_cam1 | is_missing_in_cam2)

    # Keep only the valid frames
    valid_keypoints_cam1 = keypoints_cam1[valid_frames_mask]
    valid_keypoints_cam2 = keypoints_cam2[valid_frames_mask]

    num_valid_frames = len(valid_keypoints_cam1)
    if num_valid_frames == 0:
        return np.empty((0, num_keypoints, 3))

    # Reshape for triangulation: (num_keypoints * num_frames, 2) -> (2, num_keypoints * num_frames)
    points2d_cam1 = valid_keypoints_cam1.reshape(-1, 2).T
    points2d_cam2 = valid_keypoints_cam2.reshape(-1, 2).T

    # Perform triangulation to get 4D homogeneous coordinates
    points_4d_hom = cv2.triangulatePoints(projection_matrix1, projection_matrix2, points2d_cam1, points2d_cam2)

    # Convert to 3D coordinates by dividing by the 4th coordinate
    points_3d = points_4d_hom[:3, :] / points_4d_hom[3, :]

    # Reshape back to (num_frames, num_keypoints, 3)
    return points_3d.T.reshape(num_valid_frames, -1, 3)


# Calculate 3D keypoints for each body part (None for parts whose detection is disabled)
pose_3d = triangulate_keypoints(pose_landmarks_per_cam)
face_3d = triangulate_keypoints(face_landmarks_per_cam) if detect_face else None
lhand_3d = triangulate_keypoints(left_hand_landmarks_per_cam) if detect_hands else None
rhand_3d = triangulate_keypoints(right_hand_landmarks_per_cam) if detect_hands else None

# Save the 3D keypoints to a .npz file.
# With the NumPy < 2 release installed alongside MediaPipe, np.savez stores every keyword
# argument as an array under that name, so passing `allow_pickle=True` here would add a
# stray 'allow_pickle' entry to the archive instead of configuring anything. It is
# therefore not passed; object arrays are still pickled by default.
# The None placeholders are stored as object arrays, so reading 'face_3d', 'lhand_3d'
# or 'rhand_3d' back requires np.load(..., allow_pickle=True); 'pose_3d' does not.
np.savez(keypoints_3d_path, pose_3d=pose_3d, face_3d=face_3d, lhand_3d=lhand_3d, rhand_3d=rhand_3d)

print(f'3D keypoints saved to {keypoints_3d_path}')
print(f'Processed {len(pose_3d)} valid frames.')

3D keypoints saved to /content/drive/MyDrive/3D-Gesture/Proj-ISGS-Private/kpts3D.npz
Processed 879 valid frames.


# Step 3: Visualize 3D Trajectories and Heatmaps
This final step loads the 3D keypoints and creates two types of visualizations: a trajectory plot showing the movement of the wrists over time, and a 3D voxel heatmap showing the spatial density of wrist positions.

In [8]:
# User configuration
# Width of the skeleton lines and size of the skeleton markers; the wrist trajectories are drawn 5x wider
line_width = 1.5
# How many divisions are made for each axis. Larger values give higher resolution, but also require more calculations
grid_resolution = 8  # Number of bins per axis for the heatmaps

In [9]:
# Load the saved 3D keypoints.
# Only the numeric 'pose_3d' entry is read, which does not need unpickling, so the
# archive is opened with NumPy's default allow_pickle=False. The `with` block closes
# the file once the array has been read.
with np.load(keypoints_3d_path) as data:
    pose_3d = data['pose_3d']

# Fail early with a clear message when there is nothing to plot
if pose_3d.ndim != 3 or pose_3d.shape[0] == 0:
    raise ValueError(f'No valid 3D pose frames found in {keypoints_3d_path}')

# Extract trajectories of the left and right wrists (landmarks 15 and 16)
left_wrist_3d = pose_3d[:, 15, :]
right_wrist_3d = pose_3d[:, 16, :]

# Calculate the average pose over all frames for a static skeleton reference
average_pose_landmarks = np.mean(pose_3d, axis=0)

# Build the Plotly Scatter3d traces for the static average skeleton.
# For each axis, every connection contributes [start, end, None]; the None breaks
# the line so that separate bones are not joined to each other.
bone_x, bone_y, bone_z = (
    [
        coord
        for start_idx, end_idx in pose_connections
        for coord in (
            average_pose_landmarks[start_idx][axis],
            average_pose_landmarks[end_idx][axis],
            None,
        )
    ]
    for axis in range(3)
)

# Bones: one line trace containing all connections
skeleton_lines = go.Scatter3d(
    x=bone_x,
    y=bone_y,
    z=bone_z,
    mode='lines',
    line={'color': 'black', 'width': line_width},
    name='Average Pose',
)

# Joints: one marker per averaged landmark
skeleton_points = go.Scatter3d(
    x=average_pose_landmarks[:, 0],
    y=average_pose_landmarks[:, 1],
    z=average_pose_landmarks[:, 2],
    mode='markers',
    marker={'color': 'black', 'size': line_width},
    showlegend=False,
)

# Combine all points to find the scene boundaries
all_plot_points = np.vstack([left_wrist_3d, right_wrist_3d, average_pose_landmarks])
min_bound = all_plot_points.min(axis=0)
max_bound = all_plot_points.max(axis=0)
scene_center = (min_bound + max_bound) / 2
max_range = (max_bound - min_bound).max() * 1.1  # Add 10% margin

# Define a cubic plot range for consistent aspect ratio
half_side = max_range / 2
plot_range = [
    [scene_center[0] - half_side, scene_center[0] + half_side],
    [scene_center[1] - half_side, scene_center[1] + half_side],
    [scene_center[2] - half_side, scene_center[2] + half_side]
]

# Set camera position to view the skeleton from the front.
# Plotly's camera `eye` is expressed in normalised scene coordinates relative to the
# centre of the scene, not in data coordinates, so it must not be derived from
# scene_center / max_range (that would make the zoom depend on the data's units).
# The eye sits on the -z side of the scene, slightly offset towards -y.
camera_pos = {
    'x': 0.0,
    'y': -0.8,
    'z': -2.4
}

def setup_figure_layout(fig, title):
    """Apply the shared 3D scene layout to `fig`.

    Sets the figure title, the axis titles and ranges (from the module-level
    `plot_range`), the camera (module-level `camera_pos` as eye, -y as the up
    direction) and a cubic aspect ratio. The figure is updated in place and
    nothing is returned.
    """
    scene_axes = {
        'xaxis': dict(title='X Axis', range=plot_range[0]),
        'yaxis': dict(title='Y Axis', range=plot_range[1]),
        'zaxis': dict(title='Z Axis', range=plot_range[2]),
    }
    scene_camera = dict(eye=camera_pos, up=dict(x=0, y=-1, z=0))

    fig.update_layout(
        title=title,
        scene=dict(
            camera=scene_camera,
            aspectmode='cube',  # Enforce cubic aspect ratio
            **scene_axes,
        ),
    )

This code visualizes the 3D movement of the left and right wrists over time:
* A static skeleton to provide a reference for the body's average position.
* A red (blue) line that traces the path (trajectory) of the right (left) wrist.

Each line gets progressively darker from start to end, which indicates the direction and progression of the movement.

In [10]:
# Plot 1: Wrist Trajectories
trajectory_fig = go.Figure()

# Static average skeleton as a body reference
trajectory_fig.add_traces([skeleton_lines, skeleton_points])

# Values from 0 (first frame) to 1 (last frame), used to colour each trajectory over time
time_colors = np.linspace(0, 1, len(left_wrist_3d))

# Right wrist first (Reds), then left wrist (Blues)
for wrist_path, wrist_colorscale in ((right_wrist_3d, 'Reds'), (left_wrist_3d, 'Blues')):
    trajectory_fig.add_trace(go.Scatter3d(
        x=wrist_path[:, 0],
        y=wrist_path[:, 1],
        z=wrist_path[:, 2],
        mode='lines',
        showlegend=False,
        line={'color': time_colors, 'colorscale': wrist_colorscale, 'width': line_width*5},
    ))

# Apply the common layout settings
setup_figure_layout(trajectory_fig, "Wrist Trajectories (Color Mapped by Time)")
trajectory_fig.show()

This code creates a 3D heatmap showing where the hands spent the most time.
It works by dividing the 3D space into a grid of voxels and counting how often the wrists appeared in each voxel:
* Colored voxels represent the areas occupied by the hands. The more opaque a voxel is, the more time the hand spent in that location. The right (left) wrist is red (blue).
* The static skeleton is the same as in the trajectory plot above.

In [11]:
# Plot 2: 3D Heatmap

def create_voxel_mesh(position, size, color):
    """Create a Mesh3d object representing a single voxel (box).

    Args:
        position: Centre of the voxel as (x, y, z).
        size: Edge length of the voxel along each axis as (x, y, z).
        color: Colour passed to the mesh.

    Returns:
        A `go.Mesh3d` trace built from the voxel's eight corner points.
    """
    def axis_span(axis):
        # The two corner coordinates of the voxel along one axis
        half_extent = size[axis]/2
        return np.linspace(position[axis] - half_extent, position[axis] + half_extent, 2)

    corner_x, corner_y, corner_z = np.meshgrid(axis_span(0), axis_span(1), axis_span(2))
    return go.Mesh3d(
        x=corner_x.flatten(),
        y=corner_y.flatten(),
        z=corner_z.flatten(),
        alphahull=1,
        flatshading=True,
        color=color,
    )


def get_histogram_traces(data_points, grid_res, color_rgb):
    """Compute a 3D histogram and return one voxel trace per occupied bin.

    Args:
        data_points: The 3D points to bin, one point per row.
        grid_res: Number of bins per axis, passed to `np.histogramdd`.
        color_rgb: 'R,G,B' string inserted into each voxel's rgba() colour.

    Returns:
        List of voxel traces from `create_voxel_mesh`, one for each bin with a
        count above zero. The alpha of each voxel is 0.8 * count / max count.
        An empty list if there are no points.
    """
    if len(data_points) == 0:
        return []

    counts, edges = np.histogramdd(sample=data_points, bins=grid_res)
    peak_count = counts.max()
    if peak_count == 0:
        return []

    bin_centers = [(axis_edges[:-1] + axis_edges[1:]) / 2 for axis_edges in edges]
    bin_size = [axis_edges[1] - axis_edges[0] for axis_edges in edges]

    # Visit only the occupied bins (same row-major order as iterating over all bins)
    voxel_traces = []
    for ix, iy, iz in np.argwhere(counts > 0):
        alpha = 0.8 * counts[ix, iy, iz] / peak_count
        bin_center = [bin_centers[0][ix], bin_centers[1][iy], bin_centers[2][iz]]
        voxel_traces.append(
            create_voxel_mesh(bin_center, bin_size, f'rgba({color_rgb}, {alpha})')
        )

    return voxel_traces


histogram_fig = go.Figure()

# Static average skeleton as a body reference
histogram_fig.add_traces([skeleton_lines, skeleton_points])

# Voxel histograms for both hands: right wrist first (orange-red), then left wrist (blue)
print(f"--- Creating 3D histograms (Resolution: {grid_resolution}^3) ---")
for wrist_points, wrist_rgb in ((right_wrist_3d, '255,100,0'), (left_wrist_3d, '0,100,255')):
    histogram_fig.add_traces(get_histogram_traces(wrist_points, grid_resolution, wrist_rgb))

# Apply the common layout settings
setup_figure_layout(histogram_fig, "3D Heatmap of Wrist Positions")
histogram_fig.show()

--- Creating 3D histograms (Resolution: 8^3) ---
