In [1]:
# %% [markdown]
# # Sprint Video Pose (YOLOv8) — Single "Most Moving" Subject
# - Reports FPS & frame count
# - Real-time playback with ONE selected runner (most moving / highest conf)
# - Exports a chosen frame range as stamped images (frame#, timestamp)
# - Writes a CSV manifest

# If needed, uncomment:
# !pip -q install ultralytics opencv-python tqdm pillow

import os, csv, math
from pathlib import Path
from datetime import timedelta

import cv2
import numpy as np
from tqdm import tqdm
from ultralytics import YOLO

# ---------- USER SETTINGS ----------
VIDEO_PATH   = r"output_masked.mp4"   # ← change this
OUTPUT_DIR   = Path("./exported_frames_yolo_single")
MODEL_WEIGHTS = "yolov8x-pose.pt"
CONF_THR      = 0.25      # detection confidence threshold
IMGSZ         = 960       # inference size (reduce if CPU is slow)
CONF_GAMMA    = 0.5       # how much to weight detection conf in score (0=ignore conf)
HYSTERESIS    = 0.2       # keep previous subject if its score >= (1 - HYSTERESIS)*top_score
MIN_BBOX_AREA = 900       # skip tiny people (in pixels^2)
DO_ANNOTATE   = True      # draw skeleton on exported frames
# -----------------------------------

OUTPUT_DIR.mkdir(parents=True, exist_ok=True)

# Open video & report stats
cap = cv2.VideoCapture(VIDEO_PATH)
if not cap.isOpened():
    raise RuntimeError(f"Could not open video: {VIDEO_PATH}")

try:
    # `or 0` turns a missing/zero property into a plain 0 so the arithmetic
    # and the formatted prints below never see None.
    FPS = cap.get(cv2.CAP_PROP_FPS) or 0.0
    FRAME_COUNT = int(cap.get(cv2.CAP_PROP_FRAME_COUNT) or 0)
    WIDTH  = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH) or 0)
    HEIGHT = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT) or 0)
finally:
    # Only metadata is needed here; the playback and export cells open their
    # own capture, so release this one instead of leaking the handle.
    cap.release()

# Duration is undefined without a positive frame rate; report 0 in that case.
DURATION_SEC = (FRAME_COUNT / FPS) if FPS > 0 else 0.0

print(f"Video: {VIDEO_PATH}")
print(f"Resolution: {WIDTH} x {HEIGHT}")
print(f"FPS: {FPS:.3f}")
print(f"Total frames: {FRAME_COUNT}")
print(f"Duration: {DURATION_SEC:.2f} s")

# --- Utilities ---
def fmt_timecode(frame_idx: int, fps: float) -> str:
    """Format a frame index as an ``hh:mm:ss.mmm`` timecode.

    Args:
        frame_idx: Zero-based frame index. Negative values (e.g. a
            "position - 1" computed before any frame was read) are treated
            as frame 0.
        fps: Frames per second of the video.

    Returns:
        The timecode string. ``"00:00:00.000"`` is returned when ``fps`` is
        not a positive number (zero, negative or NaN) or ``frame_idx`` is
        not positive.
    """
    # `not (fps > 0)` also rejects NaN, which `fps <= 0` would let through
    # and which would then make round() raise.
    if not (fps > 0) or frame_idx <= 0:
        return "00:00:00.000"
    total_ms = int(round((frame_idx / fps) * 1000))
    h, rem = divmod(total_ms, 3600 * 1000)
    m, rem = divmod(rem, 60 * 1000)
    s, ms = divmod(rem, 1000)
    return f"{h:02d}:{m:02d}:{s:02d}.{ms:03d}"

def iou_xyxy(a, b):
    """Intersection-over-union of two corner-format boxes.

    Each box is a 4-item sequence ``[x1, y1, x2, y2]``. Widths and heights
    are floored at zero, so inverted or disjoint boxes contribute no area.
    A tiny epsilon in the denominator keeps two empty boxes from dividing
    by zero. Returns a value in 0..1.
    """
    def _extent(lo, hi):
        # Length of the span lo..hi, never negative.
        return max(0, hi - lo)

    (a_left, a_top, a_right, a_bottom) = a
    (b_left, b_top, b_right, b_bottom) = b

    # Overlap rectangle: the innermost pair of edges on each axis.
    overlap_w = _extent(max(a_left, b_left), min(a_right, b_right))
    overlap_h = _extent(max(a_top, b_top), min(a_bottom, b_bottom))
    overlap = overlap_w * overlap_h

    size_a = _extent(a_left, a_right) * _extent(a_top, a_bottom)
    size_b = _extent(b_left, b_right) * _extent(b_top, b_bottom)

    union = size_a + size_b - overlap + 1e-9
    return overlap / union

# COCO keypoint edges for drawing a simple skeleton
COCO_EDGES = [
    (5,7), (7,9),      # left shoulder->elbow->wrist
    (6,8), (8,10),     # right shoulder->elbow->wrist
    (5,6),             # shoulders
    (5,11), (6,12),    # shoulders->hips
    (11,12),           # hips
    (11,13), (13,15),  # left hip->knee->ankle
    (12,14), (14,16),  # right hip->knee->ankle
    (0,5), (0,6),      # nose->shoulders
    (0,1), (0,2), (1,3), (2,4)  # head
]

def draw_skeleton(image, kps_xy, kps_conf=None, kp_thr=0.2, color=(0,255,0)):
    """Draw a COCO-17 skeleton for one person onto ``image`` (in place).

    Args:
        image: Image passed to ``cv2.circle`` / ``cv2.line``; it is modified.
        kps_xy: Sequence of ``(x, y)`` keypoint coordinates, indexed by COCO
            keypoint id.
        kps_conf: Optional per-keypoint confidences aligned with ``kps_xy``.
            When given, keypoints below ``kp_thr`` are not drawn.
        kp_thr: Minimum keypoint confidence to draw.
        color: BGR colour used for both joints and limbs.

    Keypoints located exactly at ``(0, 0)`` are skipped as well, together
    with every edge touching them. NOTE(review): this relies on the pose
    backend parking undetected / low-visibility keypoints at the origin
    (Ultralytics appears to zero ``xy`` for such points) — confirm against
    the installed version. Without this check those points produce long
    lines to the top-left corner of the frame.
    """
    n_points = len(kps_xy)

    def _drawable(idx):
        # A keypoint is drawable when it exists, is confident enough and is
        # not the (0, 0) placeholder.
        if idx >= n_points:
            return False
        if kps_conf is not None and kps_conf[idx] < kp_thr:
            return False
        x, y = kps_xy[idx]
        return not (x == 0 and y == 0)

    # Points
    for idx in range(n_points):
        if not _drawable(idx):
            continue
        x, y = kps_xy[idx]
        cv2.circle(image, (int(x), int(y)), 3, color, -1, cv2.LINE_AA)

    # Lines: an edge is drawn only when both of its endpoints are drawable.
    for i, j in COCO_EDGES:
        if not (_drawable(i) and _drawable(j)):
            continue
        xi, yi = kps_xy[i]
        xj, yj = kps_xy[j]
        cv2.line(image, (int(xi), int(yi)), (int(xj), int(yj)), color, 2, cv2.LINE_AA)

# Load model
# Instantiated once from MODEL_WEIGHTS; the playback and export cells below
# both run inference through this same `pose_model` object.
pose_model = YOLO(MODEL_WEIGHTS)
print("YOLOv8-Pose model loaded.")


Video: output_masked.mp4
Resolution: 1920 x 1080
FPS: 30.000
Total frames: 371
Duration: 12.37 s
YOLOv8-Pose model loaded.


In [2]:
# %% [markdown]
# ## Real-time playback (ONE subject: "most moving" / weighted by confidence)
# Keys: 'q' quit, 'p' pause/resume

win_name = "YOLOv8 Pose — Single Subject"
cv2.namedWindow(win_name, cv2.WINDOW_NORMAL)

# Wait per frame so playback roughly follows the source frame rate. The floor
# of 1 ms matters because waitKey(0) would block until a key is pressed.
delay_ms = max(1, int(1000 / FPS)) if FPS > 0 else 1

prev_gray = None
prev_subject_box = None  # [x1,y1,x2,y2] of the subject chosen on the previous frame
quit_requested = False   # set when 'q' is pressed while paused

cap = cv2.VideoCapture(VIDEO_PATH)
try:
    if not cap.isOpened():
        raise RuntimeError(f"Could not open video: {VIDEO_PATH}")

    while not quit_requested:
        ok, frame = cap.read()
        if not ok:
            break

        # Optical flow against the previous frame; its magnitude is the
        # per-pixel motion cue. Positional args after `None` are, per the
        # OpenCV signature: pyr_scale, levels, winsize, iterations, poly_n,
        # poly_sigma, flags.
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        flow_mag = None
        if prev_gray is not None:
            flow = cv2.calcOpticalFlowFarneback(prev_gray, gray,
                                                None, 0.5, 3, 15, 3, 5, 1.1, 0)
            flow_mag, _ = cv2.cartToPolar(flow[..., 0], flow[..., 1])

        # Pose inference
        results = pose_model.predict(frame, conf=CONF_THR, imgsz=IMGSZ, verbose=False)
        r = results[0]

        best_idx = None
        best_score = -1.0
        best_box = None
        disp = frame  # shown as-is unless a subject is selected below

        if len(r.boxes) > 0 and r.keypoints is not None:
            boxes_xyxy = r.boxes.xyxy.cpu().numpy()
            confs      = r.boxes.conf.cpu().numpy().reshape(-1)
            kps_xy     = r.keypoints.xy.cpu().numpy()      # [N, 17, 2]
            kps_conf   = getattr(r.keypoints, "conf", None)
            kps_conf   = kps_conf.cpu().numpy() if kps_conf is not None else None

            # Find candidate matching previous subject (for hysteresis)
            prev_match_idx = None
            if prev_subject_box is not None:
                ious = [iou_xyxy(prev_subject_box, b) for b in boxes_xyxy]
                if len(ious) > 0 and max(ious) > 0.25:
                    prev_match_idx = int(np.argmax(ious))

            # Score each person: median flow magnitude inside the box,
            # weighted by detection confidence. -1.0 marks a rejected box.
            scores = []
            for box, conf in zip(boxes_xyxy, confs):
                x1, y1, x2, y2 = [int(v) for v in box]
                w, h = max(0, x2 - x1), max(0, y2 - y1)
                if w * h < MIN_BBOX_AREA:
                    scores.append(-1.0)
                    continue

                motion = 0.0
                if flow_mag is not None:
                    # Clip to the flow field itself. Slice ends are
                    # exclusive, so the upper bound is the full size (not
                    # size - 1), and it is floored at 0 so a negative value
                    # is never read as "count from the end".
                    fh, fw = flow_mag.shape[:2]
                    x1c, y1c = max(0, x1), max(0, y1)
                    x2c, y2c = max(0, min(fw, x2)), max(0, min(fh, y2))
                    roi = flow_mag[y1c:y2c, x1c:x2c]
                    if roi.size > 0:
                        motion = float(np.median(roi))

                scores.append(motion * (conf ** CONF_GAMMA))

            top_idx = int(np.argmax(scores))
            top_score = float(scores[top_idx])

            # A negative top score means every box was rejected as too
            # small; in that case no subject is selected for this frame.
            if top_score >= 0.0:
                best_idx, best_score = top_idx, top_score

                # Hysteresis: keep previous subject if close enough to the top
                if prev_match_idx is not None:
                    prev_score = float(scores[prev_match_idx])
                    if prev_score >= 0.0 and prev_score >= (1.0 - HYSTERESIS) * top_score:
                        best_idx, best_score = prev_match_idx, prev_score

                best_box = [int(v) for v in boxes_xyxy[best_idx]]

                # Draw ONLY the best subject
                disp = frame.copy()
                x1, y1, x2, y2 = best_box
                cv2.rectangle(disp, (x1, y1), (x2, y2), (0, 255, 255), 2)

                # skeleton
                kxy = kps_xy[best_idx]
                kc  = kps_conf[best_idx] if kps_conf is not None else None
                draw_skeleton(disp, kxy, kc, kp_thr=0.2, color=(0, 255, 0))

                # label
                label = f"Score: {best_score:.3f}"
                cv2.putText(disp, label, (x1, max(0, y1 - 10)),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0,255,255), 2, cv2.LINE_AA)

        # Timestamp overlay. POS_FRAMES points at the *next* frame after a
        # read, hence the -1.
        curr_frame_idx = int(cap.get(cv2.CAP_PROP_POS_FRAMES)) - 1
        timecode = fmt_timecode(curr_frame_idx, FPS)
        info = f"Frame: {curr_frame_idx}   Time: {timecode}"
        (tw, th), base = cv2.getTextSize(info, cv2.FONT_HERSHEY_SIMPLEX, 0.8, 2)
        cv2.rectangle(disp, (10, 10), (10 + tw + 20, 10 + th + 20), (0,0,0), -1)
        cv2.putText(disp, info, (20, 10 + th + 5),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.8, (255,255,255), 2, cv2.LINE_AA)

        cv2.imshow(win_name, disp)

        # Update for next iter (gray is a fresh array each frame, no copy needed)
        prev_gray = gray
        prev_subject_box = best_box

        key = cv2.waitKey(delay_ms) & 0xFF
        if key == ord('q'):
            break
        if key == ord('p'):
            # Paused: poll until 'p' resumes or 'q' quits. Quitting ends the
            # loop normally (same as 'q' during playback) instead of raising
            # SystemExit, which would abort the rest of the notebook.
            while True:
                k2 = cv2.waitKey(30) & 0xFF
                if k2 == ord('p'):
                    break
                if k2 == ord('q'):
                    quit_requested = True
                    break
finally:
    # Always free the capture and close the window, even on error/interrupt.
    cap.release()
    cv2.destroyAllWindows()


In [3]:
# %% [markdown]
# ## Export chosen frame range (only the selected subject is drawn)
# Set START_FRAME and END_FRAME (inclusive).

START_FRAME = 150
END_FRAME   = 160   # inclusive

START_FRAME = max(0, START_FRAME)
# FRAME_COUNT is 0 when no frame count could be read from the video; clamping
# against it would then reject every range, so only clamp when it is known.
if FRAME_COUNT > 0:
    END_FRAME = min(FRAME_COUNT - 1, END_FRAME)
if END_FRAME < START_FRAME:
    raise ValueError("END_FRAME must be >= START_FRAME")

cap = cv2.VideoCapture(VIDEO_PATH)
if not cap.isOpened():
    raise RuntimeError(f"Could not open video: {VIDEO_PATH}")

saved_records = []
prev_gray = None
prev_subject_box = None
need_seek = True  # True while the decoder is not known to sit on the next wanted frame

try:
    # Prime the optical flow with the frame just before the range, so the
    # first exported frame is scored by motion like every other frame
    # instead of seeing zero motion for all candidates.
    if DO_ANNOTATE and START_FRAME > 0:
        cap.set(cv2.CAP_PROP_POS_FRAMES, START_FRAME - 1)
        ok, warmup = cap.read()
        if ok:
            prev_gray = cv2.cvtColor(warmup, cv2.COLOR_BGR2GRAY)
            need_seek = False  # decoder now sits on START_FRAME

    for fidx in tqdm(range(START_FRAME, END_FRAME + 1), desc="Exporting frames"):
        # Seek only when needed (start of range, or after a failed read);
        # otherwise frames are read sequentially.
        if need_seek:
            cap.set(cv2.CAP_PROP_POS_FRAMES, fidx)
            need_seek = False

        ok, frame = cap.read()
        if not ok:
            # Do not retry with another read(): that would return a later
            # frame and stamp it with this index.
            print(f"[WARN] Could not read frame {fidx}, skipping.")
            need_seek = True
            prev_gray = None  # avoid computing flow across the gap
            continue

        out_img = frame.copy()
        best_box = None

        # Flow and pose inference are only needed when something is drawn.
        if DO_ANNOTATE:
            gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
            flow_mag = None
            if prev_gray is not None:
                # Positional args after `None`, per the OpenCV signature:
                # pyr_scale, levels, winsize, iterations, poly_n, poly_sigma, flags.
                flow = cv2.calcOpticalFlowFarneback(prev_gray, gray,
                                                    None, 0.5, 3, 15, 3, 5, 1.1, 0)
                flow_mag, _ = cv2.cartToPolar(flow[..., 0], flow[..., 1])
            prev_gray = gray

            results = pose_model.predict(frame, conf=CONF_THR, imgsz=IMGSZ, verbose=False)
            r = results[0]

            if len(r.boxes) > 0 and r.keypoints is not None:
                boxes_xyxy = r.boxes.xyxy.cpu().numpy()
                confs      = r.boxes.conf.cpu().numpy().reshape(-1)
                kps_xy     = r.keypoints.xy.cpu().numpy()
                kps_conf   = getattr(r.keypoints, "conf", None)
                kps_conf   = kps_conf.cpu().numpy() if kps_conf is not None else None

                # Detection overlapping the previously chosen subject, if any.
                prev_match_idx = None
                if prev_subject_box is not None:
                    ious = [iou_xyxy(prev_subject_box, b) for b in boxes_xyxy]
                    if len(ious) > 0 and max(ious) > 0.25:
                        prev_match_idx = int(np.argmax(ious))

                # Score = median flow magnitude in the box, weighted by
                # detection confidence. -1.0 marks a rejected (tiny) box.
                scores = []
                for box, conf in zip(boxes_xyxy, confs):
                    x1, y1, x2, y2 = [int(v) for v in box]
                    w, h = max(0, x2 - x1), max(0, y2 - y1)
                    if w * h < MIN_BBOX_AREA:
                        scores.append(-1.0)
                        continue

                    motion = 0.0
                    if flow_mag is not None:
                        # Slice ends are exclusive, so clip to the full flow
                        # size (not size - 1); floor at 0 so a negative
                        # bound is never read as "count from the end".
                        fh, fw = flow_mag.shape[:2]
                        x1c, y1c = max(0, x1), max(0, y1)
                        x2c, y2c = max(0, min(fw, x2)), max(0, min(fh, y2))
                        roi = flow_mag[y1c:y2c, x1c:x2c]
                        if roi.size > 0:
                            motion = float(np.median(roi))

                    scores.append(motion * (conf ** CONF_GAMMA))

                top_idx = int(np.argmax(scores))
                top_score = float(scores[top_idx])

                # A negative top score means every box was rejected as too
                # small, so nothing is drawn for this frame.
                if top_score >= 0.0:
                    best_idx = top_idx
                    # Hysteresis: stay on the previous subject while its
                    # score is close enough to the best one.
                    if prev_match_idx is not None:
                        prev_score = float(scores[prev_match_idx])
                        if prev_score >= 0.0 and prev_score >= (1.0 - HYSTERESIS) * top_score:
                            best_idx = prev_match_idx

                    best_box = [int(v) for v in boxes_xyxy[best_idx]]

                    # Draw only the best subject
                    x1, y1, x2, y2 = best_box
                    cv2.rectangle(out_img, (x1, y1), (x2, y2), (0, 255, 255), 2)
                    kxy = kps_xy[best_idx]
                    kc  = kps_conf[best_idx] if kps_conf is not None else None
                    draw_skeleton(out_img, kxy, kc, kp_thr=0.2, color=(0, 255, 0))

        # Tracking state is updated before writing so a failed write below
        # does not desynchronise it.
        prev_subject_box = best_box

        # Stamp frame number + timestamp
        timecode = fmt_timecode(fidx, FPS)
        label = f"Frame: {fidx}   Time: {timecode}"
        (tw, th), base = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.8, 2)
        cv2.rectangle(out_img, (10, 10), (10 + tw + 20, 10 + th + 20), (0,0,0), -1)
        cv2.putText(out_img, label, (20, 10 + th + 5),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.8, (255,255,255), 2, cv2.LINE_AA)

        out_path = OUTPUT_DIR / f"frame_{fidx:06d}.jpg"
        # imwrite reports failure through its return value rather than by
        # raising; keep failed frames out of the manifest.
        if not cv2.imwrite(str(out_path), out_img):
            print(f"[WARN] Could not write {out_path}, skipping.")
            continue

        saved_records.append({
            "frame": fidx,
            "time_seconds": (fidx / FPS) if FPS > 0 else 0.0,
            "timecode": timecode,
            "image_path": str(out_path.resolve())
        })
finally:
    # Release the capture even if inference or writing raised.
    cap.release()

print(f"Saved {len(saved_records)} frames to: {OUTPUT_DIR.resolve()}")


Exporting frames: 100%|██████████| 11/11 [00:05<00:00,  1.94it/s]

Saved 11 frames to: C:\Users\yenul\PycharmProjects\YOLO_speed_tracker\exported_frames_yolo_single





In [4]:
# %% [markdown]
# ## Write CSV

# Manifest lives next to the exported images; one row per saved frame, with
# columns in the same order as the keys of each `saved_records` entry.
manifest_columns = ["frame", "time_seconds", "timecode", "image_path"]
CSV_PATH = OUTPUT_DIR / "exported_frames_manifest.csv"

# newline="" lets the csv module control line endings itself.
with CSV_PATH.open("w", newline="", encoding="utf-8") as manifest_file:
    manifest = csv.DictWriter(manifest_file, fieldnames=manifest_columns)
    manifest.writeheader()
    for record in saved_records:
        manifest.writerow(record)

print(f"CSV written to: {CSV_PATH.resolve()}")


CSV written to: C:\Users\yenul\PycharmProjects\YOLO_speed_tracker\exported_frames_yolo_single\exported_frames_manifest.csv
