# Visualization utilities

In [5]:
#@markdown To better demonstrate the Pose Landmarker API, we have created a set of visualization tools that will be used in this colab. These will draw the landmarks on a detect person, as well as the expected connections between those markers.

from mediapipe import solutions
from mediapipe.framework.formats import landmark_pb2
import numpy as np


# ----------------------------
# STEP 4: Draw landmarks on image
# ----------------------------
def draw_landmarks_on_image(rgb_image, detection_result):
    """
    rgb_image: NumPy array (RGB)
    detection_result: PoseLandmarkerResult from detector.detect()
    Returns: annotated_image (RGB NumPy array) ready for Pillow display
    """
    # Convert RGB -> BGR for drawing
    annotated_image = cv2.cvtColor(np.copy(rgb_image), cv2.COLOR_RGB2BGR)

    if detection_result.pose_landmarks:
        for pose_landmarks_list in detection_result.pose_landmarks:
            # Wrap the list of NormalizedLandmark into NormalizedLandmarkList proto
            pose_proto = landmark_pb2.NormalizedLandmarkList()
            pose_proto.landmark.extend([
                landmark_pb2.NormalizedLandmark(x=l.x, y=l.y, z=l.z)
                for l in pose_landmarks_list
            ])
            # Draw landmarks
            mp_drawing.draw_landmarks(
                annotated_image,
                pose_proto,
                mp_pose.POSE_CONNECTIONS,
                landmark_drawing_spec=mp_drawing.DrawingSpec(color=(0, 255, 0), thickness=2, circle_radius=2),
                connection_drawing_spec=mp_drawing.DrawingSpec(color=(0, 0, 255), thickness=2)
            )

    # Convert back to RGB for Pillow
    annotated_image = cv2.cvtColor(annotated_image, cv2.COLOR_BGR2RGB)
    return annotated_image

## Download test image

In [6]:
import urllib.request
import sys
import os
from PIL import Image

# ----------------------------
# 1. Download the image
# ----------------------------
url = "https://cdn.pixabay.com/photo/2019/03/12/20/39/girl-4051811_960_720.jpg"
image_path = "image.jpg"

if not os.path.exists(image_path):
    headers = {'User-Agent': 'Mozilla/5.0'}
    req = urllib.request.Request(url, headers=headers)
    with urllib.request.urlopen(req) as response, open(image_path, 'wb') as out_file:
        out_file.write(response.read())

# ----------------------------
# 2. Open the image with Pillow
# ----------------------------
img = Image.open(image_path)

# ----------------------------
# 3. Display the image
# ----------------------------
IN_COLAB = "google.colab" in sys.modules

if IN_COLAB:
    # If in Colab, use cv2_imshow to display inline
    import cv2
    import numpy as np
    from google.colab.patches import cv2_imshow

    img_cv = cv2.cvtColor(np.array(img), cv2.COLOR_RGB2BGR)
    cv2_imshow(img_cv)
else:
    # On local machine, open in default image viewer (separate window)
    img.show()  # fully interactive and closable window



Optionally, you can upload your own image. If you want to do so, uncomment and run the cell below.

In [3]:
# from google.colab import files
# uploaded = files.upload()

# for filename in uploaded:
#   content = uploaded[filename]
#   with open(filename, 'wb') as f:
#     f.write(content)

# if len(uploaded.keys()):
#   IMAGE_FILE = next(iter(uploaded))
#   print('Uploaded file:', IMAGE_FILE)

In [None]:
'''import sys
import os
from PIL import Image

# ----------------------------
# 1. Detect environment
# ----------------------------
IN_COLAB = "google.colab" in sys.modules

if IN_COLAB:
    # Colab file upload
    from google.colab import files
    uploaded = files.upload()

    IMAGE_FILE = None
    if len(uploaded.keys()):
        IMAGE_FILE = next(iter(uploaded))
        # Save file locally
        with open(IMAGE_FILE, 'wb') as f:
            f.write(uploaded[IMAGE_FILE])
        print('Uploaded file:', IMAGE_FILE)

else:
    # Local Python / VS Code
    # Ask user to provide a path or choose file interactively
    IMAGE_FILE = input("Enter path to image file: ").strip()
    if not os.path.exists(IMAGE_FILE):
        raise FileNotFoundError(f"File not found: {IMAGE_FILE}")
    print('Using local file:', IMAGE_FILE)

# ----------------------------
# 2. Read and display the image
# ----------------------------
# Open the image using Pillow
img = Image.open(IMAGE_FILE)

if IN_COLAB:
    # Convert Pillow image to OpenCV format for inline display in Colab
    import cv2
    import numpy as np
    img_cv = cv2.cvtColor(np.array(img), cv2.COLOR_RGB2BGR)
    from google.colab.patches import cv2_imshow
    cv2_imshow(img_cv)
else:
    # Local machine: open in a separate, closable window
    img.show()  # This opens the image in the default image viewer
'''

'# ----------------------------\n# 1. Detect environment\n# ----------------------------\nIN_COLAB = "google.colab" in sys.modules\n\nif IN_COLAB:\n    # Colab file upload\n    from google.colab import files\n    uploaded = files.upload()\n\n    IMAGE_FILE = None\n    if len(uploaded.keys()):\n        IMAGE_FILE = next(iter(uploaded))\n        # Save file locally\n        with open(IMAGE_FILE, \'wb\') as f:\n            f.write(uploaded[IMAGE_FILE])\n        print(\'Uploaded file:\', IMAGE_FILE)\n\nelse:\n    # Local Python / VS Code\n    # Ask user to provide a path or choose file interactively\n    IMAGE_FILE = input("Enter path to image file: ").strip()\n    if not os.path.exists(IMAGE_FILE):\n        raise FileNotFoundError(f"File not found: {IMAGE_FILE}")\n    print(\'Using local file:\', IMAGE_FILE)\n\n# ----------------------------\n# 2. Read and display the image\n# ----------------------------\n\nimg = cv2.imread(IMAGE_FILE)\n\nif IN_COLAB:\n    from google.colab.patches impor

## Running inference and visualizing the results
The final step is to run pose landmark detection on your selected image. This involves creating your PoseLandmarker object, loading your image, running detection, and finally, the optional step of displaying the image with visualizations.

Check out the [MediaPipe documentation](https://developers.google.com/mediapipe/solutions/vision/pose_landmarker/python) to learn more about configuration options that this solution supports.

In [7]:
# STEP 0: Imports
import mediapipe as mp
from mediapipe.tasks import python
from mediapipe.tasks.python import vision
import sys
import numpy as np
from PIL import Image
import cv2  # needed only for Colab inline display and BGR<->RGB conversion

# Detect environment
IN_COLAB = "google.colab" in sys.modules
if IN_COLAB:
    from google.colab.patches import cv2_imshow

# STEP 0.1: Drawing utils
mp_drawing = mp.solutions.drawing_utils
mp_pose = mp.solutions.pose
from mediapipe.framework.formats import landmark_pb2

# ----------------------------
# STEP 1: Create PoseLandmarker
# ----------------------------
model_path = '../models/pose_landmarker_heavy.task'
base_options = python.BaseOptions(model_asset_path=model_path)
options = vision.PoseLandmarkerOptions(
    base_options=base_options,
    output_segmentation_masks=True
)
detector = vision.PoseLandmarker.create_from_options(options)

# ----------------------------
# STEP 2: Load input image
# ----------------------------
image_path = "image.jpg"
image = mp.Image.create_from_file(image_path)
image_np = image.numpy_view()  # RGB NumPy array

# ----------------------------
# STEP 3: Detect pose landmarks
# ----------------------------
detection_result = detector.detect(image)

annotated_image = draw_landmarks_on_image(image_np, detection_result)

# ----------------------------
# STEP 5: Display annotated image
# ----------------------------
if IN_COLAB:
    cv2_imshow(cv2.cvtColor(annotated_image, cv2.COLOR_RGB2BGR))
else:
    pil_img = Image.fromarray(annotated_image)
    pil_img.show()  # fully interactive, closable window




I0000 00:00:1765968841.104998  162395 gl_context.cc:369] GL version: 2.1 (2.1 Metal - 90.5), renderer: Apple M1 Max
W0000 00:00:1765968841.181968  172436 inference_feedback_manager.cc:114] Feedback manager requires a model with a single signature inference. Disabling support for feedback tensors.
W0000 00:00:1765968841.222268  172443 inference_feedback_manager.cc:114] Feedback manager requires a model with a single signature inference. Disabling support for feedback tensors.


Visualize the pose segmentation mask.

In [6]:
 # ----------------------------
# STEP 6: Display segmentation mask (optional)
# ----------------------------
if detection_result.segmentation_masks:
    mask = detection_result.segmentation_masks[0].numpy_view()  # HxW array
    # Convert mask to 3-channel RGB
    mask_rgb = np.repeat(mask[:, :, np.newaxis], 3, axis=2) * 255
    mask_rgb = mask_rgb.astype(np.uint8)

    if IN_COLAB:
        cv2_imshow(mask_rgb)
    else:
        pil_mask = Image.fromarray(mask_rgb)
        pil_mask.show()  # separate, closable window

In [7]:
import csv

pose_landmarks_list = detection_result.pose_landmarks

with open("../data/pose_landmarks.csv", "w", newline="") as f:
    writer = csv.writer(f)
    writer.writerow(["pose_idx", "landmark_idx", "x", "y", "z"])
    for pose_idx, pose_landmarks in enumerate(pose_landmarks_list):
        for lm_idx, lm in enumerate(pose_landmarks):
            writer.writerow([pose_idx, lm_idx, lm.x, lm.y, lm.z])


# For Videos

In [2]:
# -------------------------------------------------------
# MediaPipe PoseLandmarker
# Export pose landmarks to CSV + annotated video (macOS)
# -------------------------------------------------------

import cv2
import mediapipe as mp
from mediapipe.tasks import python
from mediapipe.tasks.python import vision
import numpy as np
from mediapipe.framework.formats import landmark_pb2
import csv
import sys
import subprocess

# ----------------------------
# CONFIG
# ----------------------------
VIDEO_PATH = "../data/videos/video.mp4"
MODEL_PATH = "../models/pose_landmarker_heavy.task"
OUT_VIDEO = "../data/annotated_videos/annotated_video.mp4"
OUT_CSV = "../data/csv/pose_landmarks2.csv"

# ----------------------------
# STEP 1: PoseLandmarker
# ----------------------------
base_options = python.BaseOptions(model_asset_path=MODEL_PATH)
options = vision.PoseLandmarkerOptions(
    base_options=base_options,
    running_mode=vision.RunningMode.VIDEO,
    output_segmentation_masks=False
)
detector = vision.PoseLandmarker.create_from_options(options)

# ----------------------------
# STEP 2: Video I/O
# ----------------------------
cap = cv2.VideoCapture(VIDEO_PATH)
if not cap.isOpened():
    print("‚ùå Could not open video.")
    sys.exit(1)

width  = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
fps    = cap.get(cv2.CAP_PROP_FPS)
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

fourcc = cv2.VideoWriter_fourcc(*"mp4v")
writer = cv2.VideoWriter(OUT_VIDEO, fourcc, fps, (width, height))

# ----------------------------
# STEP 3: CSV writer
# ----------------------------
csv_file = open(OUT_CSV, "w", newline="")
csv_writer = csv.writer(csv_file)

csv_writer.writerow([
    "frame",
    "timestamp_ms",
    "landmark_id",
    "x_norm",
    "y_norm",
    "z_norm",
    "visibility",
    "x_px",
    "y_px"
])

print(f"‚ñ∂ Processing {total_frames} frames...")

# ----------------------------
# STEP 4: Frame loop
# ----------------------------
frame_idx = 0

while True:
    ret, frame = cap.read()
    if not ret:
        break

    # BGR ‚Üí RGB
    frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    mp_image = mp.Image(image_format=mp.ImageFormat.SRGB, data=frame_rgb)

    timestamp_ms = int((frame_idx / fps) * 1000)

    result = detector.detect_for_video(mp_image, timestamp_ms)

    annotated_rgb = np.copy(frame_rgb)

    if result.pose_landmarks:
        pose_landmarks = result.pose_landmarks[0]  # first person

        # ----- EXPORT LANDMARKS -----
        for lm_id, lm in enumerate(pose_landmarks):
            x_px = int(lm.x * width)
            y_px = int(lm.y * height)

            csv_writer.writerow([
                frame_idx,
                timestamp_ms,
                lm_id,
                lm.x,
                lm.y,
                lm.z,
                lm.visibility,
                x_px,
                y_px
            ])

        # ----- DRAW LANDMARKS -----
        pose_proto = landmark_pb2.NormalizedLandmarkList()
        pose_proto.landmark.extend([
            landmark_pb2.NormalizedLandmark(
                x=lm.x, y=lm.y, z=lm.z
            ) for lm in pose_landmarks
        ])

        mp.solutions.drawing_utils.draw_landmarks(
            annotated_rgb,
            pose_proto,
            mp.solutions.pose.POSE_CONNECTIONS,
            landmark_drawing_spec=mp.solutions.drawing_utils.DrawingSpec(
                color=(0, 255, 0), thickness=2, circle_radius=2
            ),
            connection_drawing_spec=mp.solutions.drawing_utils.DrawingSpec(
                color=(255, 0, 0), thickness=2
            )
        )

    # RGB ‚Üí BGR
    annotated_bgr = cv2.cvtColor(annotated_rgb, cv2.COLOR_RGB2BGR)
    writer.write(annotated_bgr)

    # Progress
    if frame_idx % 30 == 0:
        pct = 100 * frame_idx / total_frames
        print(f"\r‚è≥ {frame_idx}/{total_frames} ({pct:.1f}%)", end="")

    frame_idx += 1

# ----------------------------
# STEP 5: Cleanup
# ----------------------------
cap.release()
writer.release()
csv_file.close()

print("\n‚úÖ Done!")
print(f"üé¨ Video: {OUT_VIDEO}")
print(f"üìä CSV:   {OUT_CSV}")

# ----------------------------
# STEP 6: Open in QuickTime
# ----------------------------
try:
    subprocess.run(["open", OUT_VIDEO])
except Exception:
    pass


I0000 00:00:1766145571.903250  111825 gl_context.cc:369] GL version: 2.1 (2.1 Metal - 90.5), renderer: Apple M1 Max
W0000 00:00:1766145571.987698  121126 inference_feedback_manager.cc:114] Feedback manager requires a model with a single signature inference. Disabling support for feedback tensors.
W0000 00:00:1766145572.019666  121126 inference_feedback_manager.cc:114] Feedback manager requires a model with a single signature inference. Disabling support for feedback tensors.
W0000 00:00:1766145572.121951  121133 landmark_projection_calculator.cc:186] Using NORM_RECT without IMAGE_DIMENSIONS is only supported for the square ROI. Provide IMAGE_DIMENSIONS or use PROJECTION_MATRIX.


‚ñ∂ Processing 205 frames...
‚è≥ 180/205 (87.8%)
‚úÖ Done!
üé¨ Video: ../data/annotated_videos/annotated_video.mp4
üìä CSV:   ../data/csv/pose_landmarks2.csv


# use with a link 

In [None]:
# -------------------------------------------------------
# MediaPipe PoseLandmarker
# Process video directly from a YouTube / online link
# -------------------------------------------------------

import cv2
import mediapipe as mp
from mediapipe.tasks import python
from mediapipe.tasks.python import vision
import numpy as np
from mediapipe.framework.formats import landmark_pb2
import csv
import sys
import subprocess
import json

# ----------------------------
# CONFIG
# ----------------------------
VIDEO_URL = "https://www.youtube.com/watch?v=XXXXXX"
MODEL_PATH = "../models/pose_landmarker_heavy.task"
OUT_VIDEO = "annotated_video.mp4"
OUT_CSV = "pose_landmarks.csv"

# ----------------------------
# STEP 0: Resolve direct video stream URL
# ----------------------------
print("üîó Resolving video stream URL...")

proc = subprocess.run(
    [
        "yt-dlp",
        "-f", "best[ext=mp4]/best",
        "-g",
        "--no-playlist",
        VIDEO_URL
    ],
    capture_output=True,
    text=True,
    check=True
)

STREAM_URL = proc.stdout.strip().split("\n")[0]

if not STREAM_URL.startswith("http"):
    print("‚ùå Failed to resolve video stream.")
    sys.exit(1)

# ----------------------------
# STEP 1: PoseLandmarker
# ----------------------------
base_options = python.BaseOptions(model_asset_path=MODEL_PATH)
options = vision.PoseLandmarkerOptions(
    base_options=base_options,
    running_mode=vision.RunningMode.VIDEO,
    output_segmentation_masks=False
)
detector = vision.PoseLandmarker.create_from_options(options)

# ----------------------------
# STEP 2: Video I/O (stream URL)
# ----------------------------
cap = cv2.VideoCapture(STREAM_URL)
if not cap.isOpened():
    print("‚ùå Could not open video stream.")
    sys.exit(1)

width  = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
fps    = cap.get(cv2.CAP_PROP_FPS)

if fps <= 0:
    fps = 30.0  # fallback (important!)

fourcc = cv2.VideoWriter_fourcc(*"mp4v")
writer = cv2.VideoWriter(OUT_VIDEO, fourcc, fps, (width, height))

# ----------------------------
# STEP 3: CSV writer
# ----------------------------
csv_file = open(OUT_CSV, "w", newline="")
csv_writer = csv.writer(csv_file)

csv_writer.writerow([
    "frame",
    "timestamp_ms",
    "landmark_id",
    "x_norm",
    "y_norm",
    "z_norm",
    "visibility",
    "x_px",
    "y_px"
])

print("‚ñ∂ Processing stream...")

# ----------------------------
# STEP 4: Frame loop
# ----------------------------
frame_idx = 0

while True:
    ret, frame = cap.read()
    if not ret:
        break

    frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    mp_image = mp.Image(image_format=mp.ImageFormat.SRGB, data=frame_rgb)

    timestamp_ms = int((frame_idx / fps) * 1000)

    result = detector.detect_for_video(mp_image, timestamp_ms)

    annotated_rgb = np.copy(frame_rgb)

    if result.pose_landmarks:
        pose_landmarks = result.pose_landmarks[0]

        # ----- EXPORT LANDMARKS -----
        for lm_id, lm in enumerate(pose_landmarks):
            x_px = int(lm.x * width)
            y_px = int(lm.y * height)

            csv_writer.writerow([
                frame_idx,
                timestamp_ms,
                lm_id,
                lm.x,
                lm.y,
                lm.z,
                lm.visibility,
                x_px,
                y_px
            ])

        # ----- DRAW LANDMARKS -----
        pose_proto = landmark_pb2.NormalizedLandmarkList()
        pose_proto.landmark.extend([
            landmark_pb2.NormalizedLandmark(x=lm.x, y=lm.y, z=lm.z)
            for lm in pose_landmarks
        ])

        mp.solutions.drawing_utils.draw_landmarks(
            annotated_rgb,
            pose_proto,
            mp.solutions.pose.POSE_CONNECTIONS,
            landmark_drawing_spec=mp.solutions.drawing_utils.DrawingSpec(
                color=(0, 255, 0), thickness=2, circle_radius=2
            ),
            connection_drawing_spec=mp.solutions.drawing_utils.DrawingSpec(
                color=(255, 0, 0), thickness=2
            )
        )

    annotated_bgr = cv2.cvtColor(annotated_rgb, cv2.COLOR_RGB2BGR)
    writer.write(annotated_bgr)

    if frame_idx % 30 == 0:
        print(f"\r‚è≥ Frame {frame_idx}", end="")

    frame_idx += 1

# ----------------------------
# STEP 5: Cleanup
# ----------------------------
cap.release()
writer.release()
csv_file.close()

print("\n‚úÖ Done!")
print(f"üé¨ Video: {OUT_VIDEO}")
print(f"üìä CSV:   {OUT_CSV}")
