In [None]:
# Install and Imports

In [1]:
# Cell 1 — Install Dependencies
# Run this in your terminal or Jupyter cell (with ! or % as needed)
%pip install opencv-python mediapipe matplotlib yt-dlp

Note: you may need to restart the kernel to use updated packages.


In [3]:
# Cell 2 — Imports & Utilities (with local trim)
import cv2
import numpy as np
import re
import mediapipe as mp
from yt_dlp import YoutubeDL
import matplotlib.pyplot as plt

# Shared MediaPipe Pose estimator; the landmark and export helpers below reuse it
mp_pose = mp.solutions.pose
pose = mp_pose.Pose(min_detection_confidence=0.5, min_tracking_confidence=0.5)

def download_youtube_mp4(youtube_url, output_path, filename="dive_raw.mp4"):
    """
    Fetch a YouTube video (regular or shorts URL) with yt-dlp.

    The file is requested in mp4 format and written to
    ``<output_path>/<filename>``; that same path string is returned.
    """
    destination = f"{output_path}/{filename}"
    options = dict(format='mp4', outtmpl=destination, quiet=True)
    with YoutubeDL(options) as downloader:
        downloader.download([youtube_url])
    return destination

def trim_clip_local(input_path, start_s, end_s, output_path):
    """
    Trim the portion [start_s, end_s] (in seconds) from input_path and save it
    to output_path using OpenCV only (no ffmpeg).

    Parameters:
        input_path:  path of the source video.
        start_s:     start of the window in seconds (negative values are
                     clamped to the beginning of the video).
        end_s:       end of the window in seconds; reading simply stops early
                     if the video ends before this point.
        output_path: path of the clip to write (encoded with the 'mp4v' fourcc).

    Returns:
        output_path, unchanged.

    Raises:
        OSError: if the input cannot be opened, its frame rate cannot be
                 determined, or the output writer cannot be created.
    """
    cap = cv2.VideoCapture(input_path)
    out = None
    try:
        if not cap.isOpened():
            raise OSError(f"Could not open input video: {input_path}")

        fps = cap.get(cv2.CAP_PROP_FPS)
        # A zero/NaN frame rate would silently produce an empty clip, so fail loudly.
        if not (fps > 0):
            raise OSError(f"Could not determine frame rate of: {input_path}")

        w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        out = cv2.VideoWriter(output_path, fourcc, fps, (w, h))
        if not out.isOpened():
            raise OSError(f"Could not open output video for writing: {output_path}")

        start_frame = max(0, int(start_s * fps))
        end_frame   = int(end_s * fps)
        cap.set(cv2.CAP_PROP_POS_FRAMES, start_frame)
        for _ in range(start_frame, end_frame):
            ret, frame = cap.read()
            if not ret:
                break  # source ended before end_s
            out.write(frame)
    finally:
        # Always release the handles, even if reading or writing raised.
        cap.release()
        if out is not None:
            out.release()
    return output_path

def extract_frames(video_path):
    """Decode the video at video_path and return every frame, in order, as a list."""
    capture = cv2.VideoCapture(video_path)
    collected = []
    ok, image = capture.read()
    while ok:
        collected.append(image)
        ok, image = capture.read()
    capture.release()
    return collected


I0000 00:00:1745364443.437116 16831707 gl_context.cc:369] GL version: 2.1 (2.1 Metal - 89.3), renderer: Apple M2


In [9]:
# Cell 3 — Download Your Shorts Clip
raw_url = "https://youtube.com/shorts/cWi67iT1Mes?si=TDfU78iKeNOsKn6D"

# 1) Pull the video ID out of either a /shorts/ link or a watch?v= link
id_match = re.compile(r"(?:shorts/|watch\?v=)([\w-]+)").search(raw_url)
if id_match is None:
    raise ValueError(f"Could not parse video ID from {raw_url}")
video_id = id_match.group(1)

# 2) Rebuild a canonical watch URL and hand it to the downloader
watch_url = f"https://www.youtube.com/watch?v={video_id}"
print("Downloading from:", watch_url)
raw_path = download_youtube_mp4(
    watch_url,
    output_path=".",
    filename="dive_raw1.mp4",
)
print("Downloaded to:", raw_path)


Downloading from: https://www.youtube.com/watch?v=cWi67iT1Mes
Downloaded to: ./dive_raw1.mp4


In [None]:
# Cell 4 — Auto‑Detect & Trim the Three Dives
N_DIVES    = 3     # number of splash events to look for
PRE_S      = 1.0   # seconds kept before each detected splash
POST_S     = 2.0   # seconds kept after each detected splash
# Peaks must be at least one clip-length apart so that neighbouring frames of
# the same splash are not reported as separate dives and clips do not overlap.
MIN_GAP_S  = PRE_S + POST_S

frames_all = extract_frames(raw_path)
if len(frames_all) < 2:
    raise ValueError(f"Need at least two frames to detect splashes in {raw_path}")

# Compute frame-difference magnitudes.
# diffs[i] is the total absolute grayscale change between frame i and frame i+1.
# Each frame is converted to grayscale once and reused for the next comparison.
diffs = []
prev_gray = cv2.cvtColor(frames_all[0], cv2.COLOR_BGR2GRAY)
for frame in frames_all[1:]:
    gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
    diffs.append(np.sum(cv2.absdiff(prev_gray, gray)))
    prev_gray = gray

# Frame rate is needed both for the peak spacing and for converting to seconds
cap = cv2.VideoCapture(raw_path)
fps = cap.get(cv2.CAP_PROP_FPS)
cap.release()
if not (fps > 0):
    raise ValueError(f"Could not determine frame rate of {raw_path}")

# Pick the strongest peaks, greedily, skipping any candidate that lies within
# MIN_GAP_S of an already accepted peak. (Simply taking the top-N values tends
# to return several adjacent frames of one and the same splash.)
# NOTE(review): hard scene cuts also produce large frame differences and may be
# picked up here — verify the detected times against the video.
min_gap_frames = max(1, int(round(MIN_GAP_S * fps)))
peaks = []
for candidate in np.argsort(diffs)[::-1]:
    candidate = int(candidate)
    if all(abs(candidate - p) >= min_gap_frames for p in peaks):
        peaks.append(candidate)
        if len(peaks) == N_DIVES:
            break
peaks = sorted(peaks)
if len(peaks) < N_DIVES:
    print(f"Warning: only {len(peaks)} well-separated peak(s) found (wanted {N_DIVES}).")

# Convert to seconds. diffs[p] compares frames p and p+1, so the change is
# first visible in frame p+1.
entry_times = [(p + 1) / fps for p in peaks]
print("Detected splash times (s):", entry_times)

# Trim a small window around each splash
clip_paths = []
for idx, t in enumerate(entry_times, start=1):
    start, end = max(0, t - PRE_S), t + POST_S
    out_name = f"dive{idx}.mp4"
    trim_clip_local(raw_path, start, end, out_name)
    clip_paths.append(out_name)
    print(f" → Created {out_name}")

# `clip_paths` now holds one file name per detected splash,
# e.g. ["dive1.mp4","dive2.mp4","dive3.mp4"]


In [None]:
# Cell 5 — Core Analysis Functions (updated for LEFT_/RIGHT_ landmarks)
import math

def get_landmarks(frames):
    """
    Run the shared MediaPipe `pose` estimator on every frame.

    Returns a list with one entry per frame: None when no pose was detected,
    otherwise a dict mapping each landmark name (from mp_pose.PoseLandmark)
    to its (x, y) coordinates as reported by MediaPipe.
    """
    per_frame = []
    for image in frames:
        detection = pose.process(
            cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        ).pose_landmarks
        if detection:
            named_points = {}
            for index, point in enumerate(detection.landmark):
                named_points[mp_pose.PoseLandmark(index).name] = (point.x, point.y)
            per_frame.append(named_points)
        else:
            per_frame.append(None)
    return per_frame

def detect_takeoff_and_entry(frames, landmarks):
    """
    Estimate the takeoff and water-entry frame indices of a dive clip.

    Entry is the frame in which the largest frame-to-frame grayscale change
    becomes visible. Takeoff is the start of the step with the largest upward
    hip movement (average of left and right hip) *before* the entry frame.

    Parameters:
        frames:    list of BGR frames.
        landmarks: list (one entry per frame) of landmark dicts or None, as
                   produced by get_landmarks.

    Returns:
        (takeoff_idx, entry_idx) as plain ints; entry_idx is always >= 1.

    Raises:
        ValueError: if fewer than two frames or landmark entries are given.
    """
    if len(frames) < 2 or len(landmarks) < 2:
        raise ValueError(
            "Need at least two frames (and landmark entries) to detect takeoff and entry"
        )

    # 1) Splash via frame-difference (each frame converted to grayscale once)
    diffs = []
    prev_gray = cv2.cvtColor(frames[0], cv2.COLOR_BGR2GRAY)
    for frame in frames[1:]:
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        diffs.append(np.sum(cv2.absdiff(gray, prev_gray)))
        prev_gray = gray
    # diffs[k] compares frames k and k+1, so the change shows up in frame k+1.
    # Returning k+1 keeps the (entry_idx-1, entry_idx) pair used by
    # compute_metrics on the strongest difference and never yields index 0.
    entry_idx = int(np.argmax(diffs)) + 1

    # 2) Hip vertical velocity (average of left+right hips)
    y_vals = []
    for lm in landmarks:
        if lm and 'LEFT_HIP' in lm and 'RIGHT_HIP' in lm:
            y_vals.append((lm['LEFT_HIP'][1] + lm['RIGHT_HIP'][1]) / 2)
        else:
            y_vals.append(None)
    vels = []
    for i in range(1, len(y_vals)):
        if y_vals[i] is not None and y_vals[i-1] is not None:
            vels.append(y_vals[i-1] - y_vals[i])  # image y grows downward: upward = positive
        else:
            vels.append(0)

    # Takeoff has to precede entry; ignore any motion detected after the splash.
    takeoff_idx = int(np.argmax(vels[:entry_idx]))

    return takeoff_idx, entry_idx

def _midpoint(lm, left_key, right_key):
    """Return the midpoint of two named landmarks as a float array [x, y]."""
    return (np.asarray(lm[left_key], dtype=float) +
            np.asarray(lm[right_key], dtype=float)) / 2


def compute_metrics(frames, landmarks, to_idx, en_idx):
    """
    Compute entry angle, body straightness and splash area for one dive.

    Parameters:
        frames:    list of BGR frames of the clip.
        landmarks: per-frame landmark dicts (or None), as from get_landmarks.
        to_idx:    takeoff frame index (start of the flight window).
        en_idx:    entry frame index (end of the flight window, exclusive);
                   landmarks[en_idx] must not be None.

    Returns:
        dict with
          'entry_angle'  – angle in degrees between the shoulder→hip vector of
                           the entry frame and the image's downward vertical
                           (NaN if shoulders and hips coincide),
          'straightness' – max(0, 1 - 10 * mean distance of the ankle midpoint
                           from each flight frame's own shoulder–hip line);
                           0.0 when no flight frame has usable landmarks,
          'splash_area'  – number of pixels whose grayscale value changed by
                           more than 30 between the frame before entry and the
                           entry frame.

    Raises:
        ValueError: if there are no landmarks for the entry frame.
    """
    lm_e = landmarks[en_idx]
    if not lm_e:
        raise ValueError(f"No pose landmarks available for entry frame {en_idx}")

    # Average shoulders and hips at entry
    shoulder = _midpoint(lm_e, 'LEFT_SHOULDER', 'RIGHT_SHOULDER')
    hip      = _midpoint(lm_e, 'LEFT_HIP', 'RIGHT_HIP')

    # Entry angle: torso vector vs vertical
    # NOTE(review): the angle is measured against the downward direction, so a
    # head-first entry (hips above shoulders) yields values near 180° — confirm
    # this is the intended convention.
    vec = hip - shoulder
    vec_len = float(np.linalg.norm(vec))
    if vec_len > 0:
        vertical = np.array([0.0, 1.0])
        # Clip guards against rounding pushing the cosine just outside [-1, 1].
        cos_angle = float(np.clip(np.dot(vec / vec_len, vertical), -1.0, 1.0))
        entry_angle = math.degrees(math.acos(cos_angle))
    else:
        entry_angle = float('nan')

    # Straightness: average deviation of the ankle from the shoulder-hip line,
    # evaluated with the shoulder/hip positions of the SAME frame as the ankle.
    devs = []
    for lm in landmarks[to_idx:en_idx]:
        if not lm:
            continue
        sh = _midpoint(lm, 'LEFT_SHOULDER', 'RIGHT_SHOULDER')
        hp = _midpoint(lm, 'LEFT_HIP', 'RIGHT_HIP')
        ankle = _midpoint(lm, 'LEFT_ANKLE', 'RIGHT_ANKLE')
        torso = hp - sh
        torso_len = float(np.linalg.norm(torso))
        if torso_len == 0:
            continue  # degenerate torso: the line is undefined for this frame
        rel = ankle - sh
        # 2-D cross product magnitude / base length = point-to-line distance
        cross = torso[0] * rel[1] - torso[1] * rel[0]
        devs.append(abs(cross) / torso_len)
    straightness = max(0.0, 1 - float(np.mean(devs)) * 10) if devs else 0.0

    # Splash area: diff mask at entry. For en_idx == 0 there is no previous
    # frame, so compare with the following one instead of wrapping to frames[-1].
    if en_idx > 0:
        a_idx, b_idx = en_idx - 1, en_idx
    elif len(frames) > 1:
        a_idx, b_idx = 0, 1
    else:
        a_idx, b_idx = 0, 0
    g1 = cv2.cvtColor(frames[a_idx], cv2.COLOR_BGR2GRAY)
    g2 = cv2.cvtColor(frames[b_idx], cv2.COLOR_BGR2GRAY)
    diff = cv2.absdiff(g2, g1)
    _, thresh = cv2.threshold(diff, 30, 255, cv2.THRESH_BINARY)
    splash_area = int(np.count_nonzero(thresh))

    return {
        'entry_angle': entry_angle,
        'straightness': straightness,
        'splash_area': splash_area
    }

def score_dive(metrics, weights=(0.4, 0.3, 0.3), max_angle=30, max_splash=50000):
    """
    Combine the dive metrics into a single score between 0 and 10.

    Parameters:
        metrics:    dict with 'entry_angle', 'straightness' and 'splash_area'.
        weights:    (angle, straightness, splash) weights of the three parts.
        max_angle:  entry angle in degrees at or above which the angle part is 0.
        max_splash: splash area in pixels at or above which the splash part is
                    0. The pixel count depends on the video resolution, so
                    adjust this for clips that are not comparable in size.

    Returns:
        10 * (w1*angle_part + w2*straightness + w3*splash_part).

    Raises:
        ValueError: if max_angle or max_splash is not positive, or weights does
                    not contain exactly three values.
    """
    if max_angle <= 0 or max_splash <= 0:
        raise ValueError("max_angle and max_splash must be positive")
    a = max(0, 1 - metrics['entry_angle']/max_angle)
    b = metrics['straightness']
    c = max(0, 1 - metrics['splash_area']/max_splash)
    w1, w2, w3 = weights
    return 10 * (w1*a + w2*b + w3*c)


In [None]:
# Cell 6 — Analyze All Three Clips (with landmark‐existence checks)

def find_valid_frame(landmarks, idx):
    """
    Return the index nearest to `idx` whose landmarks entry is not None.

    The search moves outward from `idx` one step at a time; when a later and
    an earlier frame are equally close, the later one wins. An `idx` outside
    the list is first clamped to the nearest valid position, so the result is
    always a non-negative index into `landmarks`.

    Returns None if the list is empty or contains no non-None entry.
    """
    n = len(landmarks)
    if n == 0:
        return None
    # Clamp so that negative or too-large indices neither wrap nor raise.
    idx = min(max(int(idx), 0), n - 1)
    if landmarks[idx] is not None:
        return idx
    for d in range(1, n):
        for sign in (+1, -1):
            i = idx + sign*d
            if 0 <= i < n and landmarks[i] is not None:
                return i
    return None  # no valid frame found

import pprint

# Per-clip analysis results, keyed by clip path
results = {}

for path in clip_paths:
    clip_frames = extract_frames(path)
    clip_landmarks = get_landmarks(clip_frames)
    raw_takeoff, raw_entry = detect_takeoff_and_entry(clip_frames, clip_landmarks)

    # Snap both events to the nearest frame that actually has landmarks
    takeoff_frame = find_valid_frame(clip_landmarks, raw_takeoff)
    entry_frame = find_valid_frame(clip_landmarks, raw_entry)
    if takeoff_frame is None or entry_frame is None:
        print(f"Warning: no valid landmarks for {path}, skipping.")
        continue

    # Metrics and score are computed from the snapped indices
    clip_metrics = compute_metrics(clip_frames, clip_landmarks, takeoff_frame, entry_frame)
    results[path] = dict(
        takeoff_frame=takeoff_frame,
        entry_frame=entry_frame,
        metrics=clip_metrics,
        score=round(score_dive(clip_metrics), 2),
    )

pprint.pprint(results)


In [None]:
# Cell 7 — Presentable Table of Dive Results
import pandas as pd
from IPython.display import display

# One flat record per analysed clip, rounded for display
rows = [
    {
        'Clip': clip,
        'Takeoff Frame': data['takeoff_frame'],
        'Entry Frame': data['entry_frame'],
        'Entry Angle (°)': round(data['metrics']['entry_angle'], 2),
        'Straightness': round(data['metrics']['straightness'], 3),
        'Splash Area': data['metrics']['splash_area'],
        'Score': data['score'],
    }
    for clip, data in results.items()
]

# Render the records as a table
df = pd.DataFrame(rows)
display(df)


In [None]:
# Cell 8 — Visualize One Dive’s Velocity & Splash (corrected)

import matplotlib.pyplot as plt

# 1) Frames and pose landmarks of the first clip
fr = extract_frames(clip_paths[0])
lm = get_landmarks(fr)

# 2) Frame-to-frame vertical hip velocity (mean of both hips); 0 where either
#    frame of a pair has no usable hip landmarks
hip_keys = ('LEFT_HIP', 'RIGHT_HIP')
vels = []
for prev, cur in zip(lm, lm[1:]):
    hips_tracked = (
        prev and cur
        and all(k in prev for k in hip_keys)
        and all(k in cur for k in hip_keys)
    )
    if hips_tracked:
        y_prev = sum(prev[k][1] for k in hip_keys) / 2
        y_cur = sum(cur[k][1] for k in hip_keys) / 2
        vels.append(y_prev - y_cur)
    else:
        vels.append(0)

# 3) Velocity curve
fig_vel, ax_vel = plt.subplots(figsize=(8,3))
ax_vel.plot(vels)
ax_vel.set_title("Hip Vertical Velocity")
ax_vel.set_xlabel("Frame")
ax_vel.set_ylabel("Δy")
plt.show()

# 4) Binary mask of pixels that changed between the frame before entry and the entry frame
entry_idx = results[clip_paths[0]]['entry_frame']
gray_before, gray_entry = (
    cv2.cvtColor(fr[j], cv2.COLOR_BGR2GRAY) for j in (entry_idx - 1, entry_idx)
)
mask = cv2.threshold(cv2.absdiff(gray_entry, gray_before), 30, 255, cv2.THRESH_BINARY)[1]

fig_mask, ax_mask = plt.subplots(figsize=(4,4))
ax_mask.imshow(mask, cmap='gray')
ax_mask.set_title(f"Splash Mask (Frame {entry_idx})")
ax_mask.axis('off')
plt.show()


In [None]:
# Cell 9 — Presentable Recommendations (Markdown format)

from IPython.display import display, Markdown

def present_recommendations(results):
    """
    Display one Markdown block per analysed clip.

    Each block shows the clip name and score, followed by three bullet points
    (entry angle, body straightness, splash size). Every bullet is either a
    suggestion or praise, depending on fixed thresholds: 30° for the angle,
    0.5 for straightness and 50000 px for the splash.
    """
    for clip, data in results.items():
        metrics = data['metrics']
        angle = metrics['entry_angle']
        straight = metrics['straightness']
        splash = metrics['splash_area']
        score = data['score']

        # Entry angle feedback
        if angle > 30:
            angle_note = f"**Entry Angle**: {angle:.1f}° is off‑vertical; aim for under 10° for a pencil entry."
        else:
            angle_note = f"**Entry Angle**: {angle:.1f}° – excellent alignment; keep it up!"

        # Body straightness feedback
        if straight < 0.5:
            straight_note = f"**Body Straightness**: {straight:.2f} – focus on a tighter streamline: shoulders, hips, and ankles in one line."
        else:
            straight_note = f"**Body Straightness**: {straight:.2f} – great mid‑air alignment!"

        # Splash size feedback
        if splash > 50000:
            splash_note = f"**Splash Size**: {splash} px – work on reducing splash by keeping the body tight at entry."
        else:
            splash_note = f"**Splash Size**: {splash} px – good entry with minimal splash!"

        bullets = "\n".join(
            f"- {r}" for r in (angle_note, straight_note, splash_note)
        )
        header = (
            f"---\n"
            f"### {clip}  ‒  Score: **{score:.2f}**\n\n"
        )
        display(Markdown(header + bullets + "\n"))

# Render the recommendations for every clip stored in `results`
present_recommendations(results)


In [None]:
# Cell 10 — Export Annotated Videos with Overlays

import os
from cv2 import putText, rectangle, FONT_HERSHEY_SIMPLEX
import mediapipe as mp

# MediaPipe drawing helpers; export_annotated uses draw_landmarks from here
mp_drawing = mp.solutions.drawing_utils

def export_annotated(video_path, out_path, landmarks, takeoff_idx, entry_idx, score):
    """
    Write a copy of a clip with pose skeleton, event labels and score overlaid.

    Parameters:
        video_path:  clip to read.
        out_path:    annotated clip to write (encoded with the 'mp4v' fourcc).
        landmarks:   per-frame landmark entries (dict or None); a skeleton is
                     only drawn on frames whose entry is truthy. Frames beyond
                     the end of this list are written without a skeleton.
        takeoff_idx: frame index that gets the green "TAKEOFF" label.
        entry_idx:   frame index that gets the red "ENTRY" label.
        score:       number stamped in the lower right corner of every frame.

    Raises:
        OSError: if the input cannot be opened or the writer cannot be created.
    """
    cap = cv2.VideoCapture(video_path)
    out = None
    try:
        if not cap.isOpened():
            raise OSError(f"Could not open input video: {video_path}")

        fps = cap.get(cv2.CAP_PROP_FPS)
        w   = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        h   = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        out = cv2.VideoWriter(out_path, fourcc, fps, (w, h))
        if not out.isOpened():
            raise OSError(f"Could not open output video for writing: {out_path}")

        # Keep the score text on screen even for frames narrower than ~200 px
        score_origin = (max(10, w - 200), h - 20)

        frame_idx = 0
        while True:
            ret, frame = cap.read()
            if not ret:
                break

            # draw skeleton if we have landmarks (guard against a landmark list
            # that is shorter than the video)
            lm = landmarks[frame_idx] if frame_idx < len(landmarks) else None
            if lm:
                # The stored landmarks are plain (x, y) tuples, so the frame is
                # processed again to obtain the landmark object that
                # draw_landmarks is given.
                detection = pose.process(
                    cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                ).pose_landmarks
                if detection:
                    mp_drawing.draw_landmarks(
                        frame,
                        detection,
                        mp_pose.POSE_CONNECTIONS
                    )

            # highlight takeoff
            if frame_idx == takeoff_idx:
                rectangle(frame, (10,10), (160,50), (0,255,0), 2)
                putText(frame, "TAKEOFF", (15,40), FONT_HERSHEY_SIMPLEX, 1, (0,255,0), 2)
            # highlight entry
            if frame_idx == entry_idx:
                rectangle(frame, (10,60), (160,100), (0,0,255), 2)
                putText(frame, "ENTRY", (15,95), FONT_HERSHEY_SIMPLEX, 1, (0,0,255), 2)

            # stamp score
            putText(frame, f"Score: {score:.1f}", score_origin,
                    FONT_HERSHEY_SIMPLEX, 1, (255,255,255), 2)

            out.write(frame)
            frame_idx += 1
    finally:
        # Always release the handles, even if processing or writing raised.
        cap.release()
        if out is not None:
            out.release()
    print(f"Annotated video written to {out_path}")

# Export an annotated copy ("annotated_<clip>") of every analysed clip
for clip_name, analysis in results.items():
    clip_frames = extract_frames(clip_name)
    export_annotated(
        video_path=clip_name,
        out_path=f"annotated_{clip_name}",
        landmarks=get_landmarks(clip_frames),
        takeoff_idx=analysis['takeoff_frame'],
        entry_idx=analysis['entry_frame'],
        score=analysis['score'],
    )