In [None]:
import cv2

# Load the video
video_path = 'output.mp4'  # Replace with your video path
cap = cv2.VideoCapture(video_path)

if not cap.isOpened():
    print("Error: Could not open video.")
else:
    # Get FPS
    fps = cap.get(cv2.CAP_PROP_FPS)
    print(f"FPS: {fps}")

    # Get total number of frames
    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    print(f"Total Frames: {total_frames}")

    # Calculate video duration (in seconds)
    duration_seconds = total_frames / fps
    print(f"Duration: {duration_seconds} seconds")

    # Optionally, you can release the video capture object if needed
    cap.release()

Mediapipe observation : 
- Ran on few videos and realised that Mediapipe Pose landmarkers are not able to detect a person if the head is not shown in the frame. Likely because of the top down approach of pose detection.
- Pose landmarkers are trained for closeup cases, so it is difficult to detect a person from far away in the frame. 

In [None]:
!wget -O /video_output/fight_1.mp4 'https://dm0qx8t0i9gc9.cloudfront.net/watermarks/video/rWFRtEk4Ejrjb627h/videoblocks-2083z_wresathletmd005_halnigvqm5__d4ecc47fac438673c76503676f653e7d__P360.mp4'

In [None]:
!pip install ultralytics
!pip install opencv-python
!pip install ipywidgets

In [None]:
import os
from ultralytics import YOLO
import cv2 
import time # To estimate FPS if needed
from datetime import datetime  # Import datetime here
import json
import torch

MODEL_NAME = 'yolo11s-pose.pt'
VIDEO_SOURCE_DIR = 'video_data'
OUTPUT_DIR = 'pose_outputs'
RUN_NAME = datetime.now().strftime('%Y%m%d')
CONFIDENCE_THRESHOLD = 0.5

# Create output directory if it doesn't exist
os.makedirs(OUTPUT_DIR, exist_ok=True)
output_run_dir = os.path.join(OUTPUT_DIR, RUN_NAME)
os.makedirs(output_run_dir, exist_ok=True) # Ensure the run-specific directory exists

# Check if the video file exists
if not os.path.exists(VIDEO_SOURCE_DIR):
    print(f"Error: Video files path not found at {VIDEO_SOURCE_DIR}")
else:
    print(f"Loading YOLO model: {MODEL_NAME}")
    model = YOLO(MODEL_NAME)
    print("Model loaded successfully.")

    for filename in os.listdir(VIDEO_SOURCE_DIR):
        
        VIDEO_SOURCE_PATH = os.path.join(VIDEO_SOURCE_DIR, filename)

        cap = cv2.VideoCapture(VIDEO_SOURCE_PATH)
        if not cap.isOpened():
            print(f"Error: Could not open video source at {VIDEO_SOURCE_PATH}")
        else:
            frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
            frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
            fps = cap.get(cv2.CAP_PROP_FPS)
            # If FPS read is 0 or invalid, use a default
            if fps <= 0:
                print("Warning: Could not read FPS from video source, using default 25 FPS.")
                fps = 25
            cap.release() 

            # --- Define Video Writer ---
            output_video_filename = os.path.splitext(os.path.basename(VIDEO_SOURCE_PATH))[0] + '_annotated.mp4'
            output_json_filename = os.path.splitext(os.path.basename(VIDEO_SOURCE_PATH))[0] + '_keypoints.json'
            output_video_path = os.path.join(output_run_dir, output_video_filename)
            output_json_path = os.path.join(output_run_dir, output_json_filename)

            fourcc = cv2.VideoWriter_fourcc(*'mp4v')
            writer = cv2.VideoWriter(output_video_path, fourcc, fps, (frame_width, frame_height))

            if not writer.isOpened():
                print(f"Error: Could not open video writer for path {output_video_path}. Check codec support ('mp4v').")
            else:
                print(f"Starting pose estimation, output will be saved to: {output_video_path}")
                # --- Run Pose Estimation (WITHOUT internal save) ---
                results_generator = model(
                    source=VIDEO_SOURCE_PATH,
                    stream=True,
                    save=False, # <<< Important: Set save to False or defaults to avi
                    conf=CONFIDENCE_THRESHOLD,
                    task='pose',
                    show=False,
                    imgsz=(frame_height, frame_width) # Optional: Ensure model uses consistent size
                )

                # --- Process Results and Write Frames ---
                frame_data = []
                frame_count = 0
                start_time = time.time()
                print("Processing frames and writing MP4...")
                try:
                    for frame_number, result in enumerate(results_generator):
                        
                        annotated_frame = result.plot() # This returns a NumPy array (BGR format)

                        if annotated_frame.shape[1] != frame_width or annotated_frame.shape[0] != frame_height:
                            annotated_frame = cv2.resize(annotated_frame, (frame_width, frame_height))

                        # Write the frame to the video file
                        writer.write(annotated_frame)

                        frame_keypoints = []
                        for person in result.keypoints.xy:
                            keypoints = person.cpu().numpy().tolist()  # Convert tensor to list
                            frame_keypoints.append(keypoints)

                        frame_data.append({
                            "frame_number": frame_count,
                            "keypoints": frame_keypoints
                        })         

                        frame_count = frame_number + 1
                        if frame_count % 50 == 0:
                            elapsed_time = time.time() - start_time
                            current_fps = frame_count / elapsed_time if elapsed_time > 0 else 0
                            print(f"  Processed {frame_count} frames... (Current FPS: {current_fps:.2f})")

                except Exception as e:
                    print(f"\nAn error occurred during processing/writing: {e}")
                finally:
                    # Release the video writer
                    writer.release()
                    end_time = time.time()
                    print(f"\nFinished processing and writing video.")
                    print(f"Total frames processed: {frame_count}")
                    total_time = end_time - start_time
                    avg_fps = frame_count / total_time if total_time > 0 else 0
                    print(f"Total time: {total_time:.2f} seconds")
                    print(f"Average processing FPS: {avg_fps:.2f}")
                    print(f"✅ Output MP4 video saved successfully at: {output_video_path}")
                    with open(output_json_path, 'w') as json_file:
                     json.dump(frame_data, json_file, indent=4)


In [17]:
from ultralytics import YOLO

MODEL_NAME = 'yolo11s-pose.pt'

model = YOLO(MODEL_NAME)

result = model.track( source='video_data/', save=True)



errors for large sources or long-running streams and videos. See https://docs.ultralytics.com/modes/predict/ for help.

Example:
    results = model(source=..., stream=True)  # generator of Results objects
    for r in results:
        boxes = r.boxes  # Boxes object for bbox outputs
        masks = r.masks  # Masks object for segment masks outputs
        probs = r.probs  # Class probabilities for classification outputs

video 1/3 (frame 1/559) /home/shad/work/stick-figure-fight-generator/stick-figure-fight-generator/video_data/fight_1.mp4: 384x640 1 person, 25.4ms
video 1/3 (frame 2/559) /home/shad/work/stick-figure-fight-generator/stick-figure-fight-generator/video_data/fight_1.mp4: 384x640 1 person, 23.2ms
video 1/3 (frame 3/559) /home/shad/work/stick-figure-fight-generator/stick-figure-fight-generator/video_data/fight_1.mp4: 384x640 1 person, 12.0ms
video 1/3 (frame 4/559) /home/shad/work/stick-figure-fight-generator/stick-figure-fight-generator/video_data/fight_1.mp4: 384x640 1

In [29]:
import cv2
import numpy as np
import torch

# Assuming you have a list of frames, each containing keypoint data

# Function to draw keypoints on an image
def draw_keypoints(image, xy, xyn=None, conf=None):
    for point in xy:
        cv2.circle(image, (int(point[0]), int(point[1])), 5, (0, 0, 255), -1)
    return image

# Function to join frames into a GIF
def create_gif(frames, output_path='output.gif'):
    # Create a VideoWriter object to save the GIF
    height, width = frames[0].keypoints.data.cpu().shape[:2]
    fourcc = cv2.VideoWriter_fourcc(*'gif ')
    out = cv2.VideoWriter(output_path, fourcc, 1.0, (width, height))

    for frame in frames:
        img = np.zeros((height, width, 3), dtype=np.uint8)
        keypoints_data = frame.keypoints.data.cpu().permute(1, 2, 0).numpy() * 255
        if keypoints_data.shape != (height, width, frame.keypoints.data.cpu().shape[0]):
            raise ValueError("Keypoint data dimensions do not match image dimensions")
        
        img[:height, :width] = keypoints_data
        img = draw_keypoints(img.clone(), frame['xy'])
        out.write(img)

    out.release()

# Example usage
create_gif(result, 'keypoint_animation.gif')

[ WARN:0@6367.409] global cap.cpp:779 open VIDEOIO(CV_IMAGES): raised OpenCV exception:

OpenCV(4.11.0) /io/opencv/modules/videoio/src/cap_images.cpp:415: error: (-215:Assertion failed) !filename_pattern.empty() in function 'CvVideoWriter_Images'




ValueError: Keypoint data dimensions do not match image dimensions

In [None]:
import pandas as pd

data = []
# Assuming 'results' is from your prediction
for frame_id, result in enumerate(results):
    for person_id, person in enumerate(result.keypoints):
        keypoints = person.xy  # Get keypoints
        confidences = person.conf  # Get confidences
        for kp_id, (xy, conf) in enumerate(zip(keypoints, confidences)):
            label = labels[kp_id]  # Get the label from predefined labels list
            data.append([frame_id, person_id, label, *xy, conf])

# Convert the data into a pandas DataFrame
df = pd.DataFrame(data, columns=['frame_id', 'person_id', 'label', 'x', 'y', 'confidence'])
