In [None]:
from google.colab import files, drive
import os

# Mount Google Drive so the input video stored there is readable from this runtime.
drive.mount('/content/drive')
# Location of the source video inside the mounted Drive; passed to process() in a later cell.
video_path = '/content/drive/MyDrive/Colab/lift_video-demos/in/deadlift-demo2.mp4'

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
import cv2, mediapipe as mp, json, numpy as np, os, tempfile, subprocess
from tqdm import tqdm

mp_pose = mp.solutions.pose

def landmark_to_dict(lm):
    """Copy a landmark's coordinates and visibility into a plain dict.

    Args:
        lm: Any object exposing ``x``, ``y``, ``z`` and ``visibility`` attributes.

    Returns:
        A dict with the keys ``'x'``, ``'y'``, ``'z'`` and ``'visibility'``
        (in that order) holding the corresponding attribute values.
    """
    fields = ('x', 'y', 'z', 'visibility')
    return {name: getattr(lm, name) for name in fields}

def draw_landmarks(frame, landmarks):
    """Overlay landmark dots and a simple upper-body skeleton on ``frame`` in place.

    Args:
        frame: Image whose ``shape[:2]`` is ``(height, width)``; it is drawn on directly.
        landmarks: Sequence of dicts with ``'x'`` and ``'y'`` entries that are
            scaled by the frame width/height to obtain pixel positions.

    Every landmark gets a filled circle; afterwards a line is drawn for each
    index pair in the fixed connection table whose two indices both exist.
    """
    height, width = frame.shape[:2]

    def to_pixel(point):
        # Scale the stored coordinates to integer pixel positions.
        return int(point['x'] * width), int(point['y'] * height)

    for point in landmarks:
        cv2.circle(frame, to_pixel(point), 4, (0, 255, 255), -1)

    # Landmark index pairs to connect (arms, shoulder line, hip line, torso sides).
    connections = (
        (11, 13), (13, 15),
        (12, 14), (14, 16),
        (11, 12), (23, 24),
        (11, 23), (12, 24),
    )
    available = len(landmarks)
    for start, end in connections:
        if start >= available or end >= available:
            continue
        cv2.line(frame, to_pixel(landmarks[start]), to_pixel(landmarks[end]), (0, 200, 255), 2)

def wrist_midpoint(landmarks):
    """Return the midpoint between landmarks 15 and 16, or ``None`` if unavailable.

    Indices 15 and 16 are the left and right wrist in the MediaPipe Pose
    landmark numbering used by this notebook.

    Args:
        landmarks: Sequence of dicts carrying ``'x'`` and ``'y'`` entries.

    Returns:
        ``(x, y)`` averaged over the two landmarks, in the same units as the
        input, or ``None`` when the sequence is missing/too short or an entry
        lacks the expected keys.
    """
    try:
        left = landmarks[15]
        right = landmarks[16]
        return ((left['x'] + right['x']) / 2.0, (left['y'] + right['y']) / 2.0)
    except (IndexError, KeyError, TypeError):
        # Only "data not there" failures mean "no midpoint". A bare except
        # would also swallow KeyboardInterrupt/SystemExit and hide real bugs.
        return None

def smooth(values, k=5):
    """Apply a length-``k`` moving average to ``values``.

    Args:
        values: Sequence of numbers.
        k: Window length of the uniform averaging kernel.

    Returns:
        ``values`` itself (unchanged, same object) when it has fewer than ``k``
        elements; otherwise a new list of the same length produced by
        ``np.convolve(..., mode='same')``. Positions near either end are
        averaged against implicit zeros outside the sequence.
    """
    if len(values) >= k:
        window = np.ones(k) / k
        return np.convolve(np.array(values), window, mode='same').tolist()
    return values

def _vertical_velocities(wrist_positions, times, fps):
    """Finite-difference vertical velocity for each wrist sample.

    Returns a list aligned with ``wrist_positions``: ``None`` where the
    position is ``None``, ``0.0`` for the first sample at the start or after a
    gap, and otherwise the change in y divided by the time since the previous
    sample (falling back to one frame period if that time is not positive).
    """
    velocities = []
    prev = None
    prev_t = None
    for pos, t in zip(wrist_positions, times):
        if pos is None:
            # A missing detection breaks the chain; restart differencing afterwards.
            velocities.append(None)
            prev = None
            prev_t = None
            continue
        if prev is None:
            velocities.append(0.0)
        else:
            dt = t - prev_t if t - prev_t > 0 else (1.0 / fps)
            velocities.append((pos[1] - prev[1]) / dt)
        prev = pos
        prev_t = t
    return velocities


def _assemble_frames_with_ffmpeg(frames_dir, fps, out_video):
    """Encode the numbered PNG frames in ``frames_dir`` into ``out_video`` with ffmpeg."""
    assembled_cmd = [
        'ffmpeg', '-y',
        # Pass the frame rate unrounded: truncating e.g. 29.97 to 29 would
        # change the duration of the assembled video.
        '-framerate', str(fps),
        '-i', os.path.join(frames_dir, 'frame_%06d.png'),
        '-c:v', 'libx264', '-pix_fmt', 'yuv420p',
        out_video
    ]
    print("Assembling frames with ffmpeg...")
    subprocess.run(assembled_cmd, check=True)
    print("Assembled video:", out_video)


def process(video_path, out_json='out.json', out_video='out_annotated.mp4', model_complexity=1):
    """Run pose estimation over a video, write an annotated copy and a JSON report.

    For every frame the pose landmarks are detected, drawn onto the frame, and
    the wrist midpoint (in pixels) is recorded. Frames are written with
    ``cv2.VideoWriter`` when it can be opened; otherwise they are dumped as
    PNGs to a temporary directory and assembled with ffmpeg. After the video
    pass, a smoothed vertical wrist velocity is computed and everything is
    saved to ``out_json``.

    Args:
        video_path: Path of the input video.
        out_json: Destination of the JSON report (``{'fps': ..., 'frames': [...]}``).
        out_video: Destination of the annotated video.
        model_complexity: Forwarded to ``mp_pose.Pose``.

    Returns:
        The tuple ``(out_json, out_video)``.

    Raises:
        RuntimeError: If the input video cannot be opened.
        subprocess.CalledProcessError: If the ffmpeg fallback exits with an error.
    """
    import shutil  # local import: only needed to remove the fallback frame directory

    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        raise RuntimeError(f"Cannot open video: {video_path}")

    fps = cap.get(cv2.CAP_PROP_FPS) or 30.0
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    # Used only as a progress-bar hint; the container's reported count can be
    # zero or inaccurate, so the read loop below does not rely on it.
    frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT) or 0)

    # Try cv2.VideoWriter first.
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    writer = cv2.VideoWriter(out_video, fourcc, fps, (width, height))
    use_image_fallback = not writer.isOpened()

    tmpdir = None
    results_list = []
    wrist_positions = []
    times = []

    try:
        if use_image_fallback:
            print("cv2.VideoWriter failed; falling back to image-sequence + ffmpeg")
            tmpdir = tempfile.mkdtemp(prefix='frames_')
        else:
            print("cv2.VideoWriter opened successfully for", out_video)

        try:
            with mp_pose.Pose(static_image_mode=False, model_complexity=model_complexity,
                              min_detection_confidence=0.5, min_tracking_confidence=0.5) as pose, \
                    tqdm(total=frame_count if frame_count > 0 else None, desc='Processing') as progress:
                idx = 0
                # Read until the stream is exhausted rather than for a fixed
                # number of iterations, so no frames are dropped when the
                # reported frame count is missing or too small.
                while True:
                    ret, frame = cap.read()
                    if not ret:
                        break
                    img_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                    res = pose.process(img_rgb)

                    wrist_px = None
                    if res.pose_landmarks:
                        lms = [landmark_to_dict(lm) for lm in res.pose_landmarks.landmark]
                        results_list.append({'frame': idx, 'landmarks': lms})
                        draw_landmarks(frame, lms)
                        wm = wrist_midpoint(lms)
                        if wm is not None:
                            # Landmark coordinates are scaled by the frame size to get pixels.
                            wrist_px = (wm[0] * width, wm[1] * height)
                            cv2.circle(frame, (int(wrist_px[0]), int(wrist_px[1])), 6, (0, 120, 255), -1)
                    else:
                        results_list.append({'frame': idx, 'landmarks': None})

                    # One entry per frame keeps these lists index-aligned with results_list.
                    wrist_positions.append(wrist_px)
                    times.append(idx / fps)

                    # Write frame via chosen path.
                    if use_image_fallback:
                        cv2.imwrite(os.path.join(tmpdir, f"frame_{idx:06d}.png"), frame)
                    else:
                        writer.write(frame)

                    idx += 1
                    progress.update(1)
        finally:
            # Release handles even if processing fails part-way through.
            cap.release()
            writer.release()

        # If fallback used, assemble with ffmpeg.
        if use_image_fallback:
            _assemble_frames_with_ffmpeg(tmpdir, fps, out_video)
    finally:
        if tmpdir is not None:
            # The PNG dump is only an intermediate; do not leave it on disk.
            shutil.rmtree(tmpdir, ignore_errors=True)

    # Compute velocities.
    # NOTE(review): y is in image pixel coordinates; presumably positive values
    # mean downward motion on screen — confirm before interpreting the sign.
    velocities = _vertical_velocities(wrist_positions, times, fps)

    # Missing samples are treated as 0.0 for smoothing only; frames without a
    # wrist position still report None below.
    numeric_vs = [v if v is not None else 0.0 for v in velocities]
    smoothed = smooth(numeric_vs, k=5)

    for i, fr in enumerate(results_list):
        fr['time'] = i / fps
        pos = wrist_positions[i]
        fr['wrist_midpoint'] = None if pos is None else {'x': pos[0], 'y': pos[1]}
        fr['velocity_px_per_s'] = None if velocities[i] is None else float(smoothed[i])

    with open(out_json, 'w') as f:
        json.dump({'fps': fps, 'frames': results_list}, f, indent=2)

    print("Wrote:", out_json, out_video)
    return out_json, out_video

# Run the full pipeline on the Drive-hosted video; both outputs use relative
# paths, so they are written to the current working directory.
out_json, out_video = process(video_path, out_json='out.json', out_video='out_annotated.mp4')
# Echo the returned paths (process() also prints a "Wrote:" line with the same values).
print(out_json, out_video)

cv2.VideoWriter opened successfully for out_annotated.mp4


Processing: 100%|██████████| 340/340 [00:28<00:00, 12.01it/s]


Wrote: out.json out_annotated.mp4
out.json out_annotated.mp4


In [None]:
from google.colab import files
# Download the artifacts using the paths returned by process(), so this cell
# stays correct if the output names change or the working directory is not
# /content (the previous hard-coded absolute path assumed that it was).
files.download(out_json)
files.download(out_video)

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>