In [None]:
import json
from pathlib import Path

import cv2
import mediapipe as mp

mp_pose = mp.solutions.pose


def normalized_to_pixel(coord, image_shape):
    """Scale a normalized point to integer pixel coordinates.

    Args:
        coord: Object exposing ``x`` and ``y`` attributes; ``x`` is multiplied
            by the image width and ``y`` by the image height.
        image_shape: Sequence whose first two entries are ``(height, width)``;
            any further entries are ignored.

    Returns:
        Tuple ``(px, py)`` where each product has been truncated with ``int()``.
    """
    height, width = image_shape[0:2]
    px = int(coord.x * width)
    py = int(coord.y * height)
    return px, py


def keypoints_to_dict(landmarks, image_shape):
    """Build a mapping from landmark name to pixel position and visibility.

    Every member of ``mp_pose.PoseLandmark`` is visited; the member's integer
    value is used as the index into ``landmarks`` and its name becomes the
    dictionary key.

    Args:
        landmarks: Indexable collection of landmark objects with ``x`` and
            ``y`` attributes (and optionally ``visibility``).
        image_shape: Shape of the source image, passed through to
            ``normalized_to_pixel``.

    Returns:
        Dict of ``{name: {"x": int, "y": int, "visibility": value-or-None}}``.
        ``visibility`` is ``None`` when the landmark has no such attribute.
    """
    named_points = {}
    for member in mp_pose.PoseLandmark:
        point = landmarks[member.value]
        px, py = normalized_to_pixel(point, image_shape)
        named_points[member.name] = {
            "x": px,
            "y": py,
            "visibility": getattr(point, "visibility", None),
        }
    return named_points


def get_project_dirs():
    """Locate (and create if needed) the video and keypoint data folders.

    The project root is the parent of the directory containing this file when
    ``__file__`` is defined. Otherwise it is the current working directory,
    stepping up one level when that directory is named ``src``.

    Returns:
        Tuple ``(video_dir, keypoint_dir)`` of ``Path`` objects pointing at
        ``<root>/data/video`` and ``<root>/data/keypoint``. Both directories
        exist when the function returns.
    """
    if "__file__" not in globals():
        # No __file__ available: fall back to the working directory.
        root = Path.cwd()
        root = root.parent if root.name == "src" else root
    else:
        root = Path(__file__).resolve().parent.parent

    video_dir, keypoint_dir = (root / "data" / leaf for leaf in ("video", "keypoint"))
    for folder in (video_dir, keypoint_dir):
        folder.mkdir(parents=True, exist_ok=True)
    return video_dir, keypoint_dir


# Resolve the shared input/output directories (created on demand).
video_folder, keypoint_folder = get_project_dirs()

# Every .mp4 directly inside the video folder, in sorted order.
video_files = sorted(video_folder.glob("*.mp4"))

# For each video: run pose estimation frame by frame and write one JSON file
# named after the video into the keypoint folder.
for video_file in video_files:
    base_name = video_file.stem
    output_path = keypoint_folder / f"{base_name}.json"

    print(f"Processing {video_file.name}...")

    cap = cv2.VideoCapture(str(video_file))
    if not cap.isOpened():
        print(f"Error opening video file {video_file.name}")
        continue

    # frame_count advances for every decoded frame, including frames where no
    # pose is detected, so the "frame" values in the output may have gaps.
    frame_count = 0
    all_keypoints = []

    # try/finally guarantees the capture handle is released even when frame
    # conversion or pose estimation raises part-way through a video.
    try:
        # A fresh Pose instance per video, closed by the context manager.
        with mp_pose.Pose(
            min_detection_confidence=0.5,
            min_tracking_confidence=0.5,
            model_complexity=1,
        ) as pose:
            while True:
                ret, frame = cap.read()
                if not ret:
                    # No more frames could be read from this capture.
                    break

                # Convert the BGR image to RGB.
                image_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

                # Process the image and find poses
                results = pose.process(image_rgb)

                if results.pose_landmarks:
                    kp = keypoints_to_dict(
                        results.pose_landmarks.landmark, frame.shape
                    )
                    all_keypoints.append({"frame": frame_count, "keypoints": kp})

                frame_count += 1
    finally:
        cap.release()

    # Save to JSON (a list of {"frame": int, "keypoints": {...}} records).
    with open(output_path, "w") as f:
        json.dump(all_keypoints, f, indent=2)

    print(f"Saved keypoints to {output_path}")

print("All videos processed.")


In [None]:
import json

import matplotlib.pyplot as plt
import mediapipe as mp
from matplotlib.animation import FuncAnimation, PillowWriter

# Settings
# Settings
_, keypoint_folder = get_project_dirs()

# Find a JSON file
json_files = sorted(keypoint_folder.glob("*.json"))

if not json_files:
    print("No keypoint JSON files found in", keypoint_folder)
else:
    # Process the first file found for demonstration
    target_file = json_files[0]
    json_path = target_file
    print(f"Visualizing {target_file.name}...")

    with open(json_path, "r") as f:
        data = json.load(f)

    if not data:
        print("JSON file is empty.")
    else:
        # Setup MediaPipe constants
        mp_pose = mp.solutions.pose
        # Convert connections set to list for consistent indexing
        connections = list(mp_pose.POSE_CONNECTIONS)
        # Resolve each connection's landmark names once here instead of on
        # every animation frame; the mapping does not change between frames.
        bone_names = [
            (mp_pose.PoseLandmark(start_idx).name, mp_pose.PoseLandmark(end_idx).name)
            for start_idx, end_idx in connections
        ]

        # Setup Plot
        fig, ax = plt.subplots(figsize=(6, 6))

        # try/finally so the figure is closed even if building or saving the
        # animation raises.
        try:
            # Determine plot limits based on all frames
            all_x = [kp["x"] for frame in data for kp in frame["keypoints"].values()]
            all_y = [kp["y"] for frame in data for kp in frame["keypoints"].values()]

            if all_x and all_y:
                # Add some padding
                pad = 50
                ax.set_xlim(min(all_x) - pad, max(all_x) + pad)
                # Invert Y for image coords
                ax.set_ylim(max(all_y) + pad, min(all_y) - pad)
                ax.set_aspect("equal")
                ax.set_title(f"Keypoint Visualization: {target_file.name}")

            # Initialize plot elements
            # Scatter for joints
            scat = ax.scatter([], [], s=20, c="red")
            # Lines for bones, one artist per entry in `connections`
            lines = [ax.plot([], [], "b-", lw=2)[0] for _ in connections]

            def update(frame_idx):
                """Redraw joints and bones for the record at ``data[frame_idx]``.

                Args:
                    frame_idx: Index into ``data`` (not the video frame number
                        stored under the record's ``"frame"`` key).

                Returns:
                    List of the artists that were updated: the scatter plot
                    followed by every bone line.
                """
                keypoints = data[frame_idx]["keypoints"]

                # Update joints
                scat.set_offsets([(pt["x"], pt["y"]) for pt in keypoints.values()])

                # Update bones
                for line, (start_name, end_name) in zip(lines, bone_names):
                    if start_name in keypoints and end_name in keypoints:
                        start_pt = keypoints[start_name]
                        end_pt = keypoints[end_name]
                        line.set_data(
                            [start_pt["x"], end_pt["x"]], [start_pt["y"], end_pt["y"]]
                        )
                    else:
                        # Hide the bone when either endpoint is missing.
                        line.set_data([], [])

                return [scat] + lines

            # Create Animation (50 ms interval matches the 20 fps writer below)
            ani = FuncAnimation(fig, update, frames=len(data), interval=50, blit=True)

            # Save GIF
            output_path = keypoint_folder / f"{target_file.stem}.gif"
            print(f"Saving GIF to {output_path}...")
            ani.save(str(output_path), writer=PillowWriter(fps=20))
            print("Done.")
        finally:
            plt.close(fig)
