# PoseRAC: Front Raises Skeleton Extraction
This notebook extracts skeleton data (33 landmarks and 5 average joint angles) as required by the [PoseRAC](https://github.com/MiracleDance/PoseRAC) paper: *Pose Saliency Transformer for Repetitive Action Counting*.

## 1. Setup Environment
Mount Google Drive, clone the PoseRAC repository, install the Python dependencies, and download the MediaPipe pose-landmarker model.

In [None]:
# @title 0. Mount Google Drive
# Colab-only step: mounts the user's Google Drive at /content/drive so that
# Drive-hosted files can be addressed as ordinary filesystem paths.
from google.colab import drive
drive.mount('/content/drive')


In [None]:
# @title 1. Environment Setup
!git clone https://github.com/MiracleDance/PoseRAC.git
!pip install mediapipe yt-dlp opencv-python numpy pandas matplotlib tqdm
!mkdir -p models
# Download MediaPipe model if not exists
import os
import requests

model_url = "https://storage.googleapis.com/mediapipe-models/pose_landmarker/pose_landmarker_heavy/float16/1/pose_landmarker_heavy.task"
model_path = "models/pose_landmarker.task"
if not os.path.exists(model_path):
    print(f"Downloading {model_path}...")
    r = requests.get(model_url)
    with open(model_path, 'wb') as f:
        f.write(r.content)
    print("Download complete.")


In [None]:
# @title 1.1 RepCount_pose Dataset Configuration
import pandas as pd
import os

# Set path to the RepCount_pose folder on your Google Drive
repcount_pose_path = "/content/drive/MyDrive/RepCount_pose" # @param {type:"string"}
# Sub-folder searched for the per-split annotation files ("<split>.csv").
annotations_dir = os.path.join(repcount_pose_path, "annotation")
# Sub-folder searched for pre-extracted per-video ".npy" pose files.
extracted_poses_dir = os.path.join(repcount_pose_path, "test_poses_5_ave")

# Load annotations and filter for 'front_raise'
def get_front_raise_records(split="test"):
    """Return the annotation rows of one split whose 'type' mentions 'front_raise'.

    Reads ``<annotations_dir>/<split>.csv``. The match on the 'type' column is
    case-insensitive and rows with a missing type are excluded. When the CSV
    does not exist a message is printed and an empty DataFrame is returned.
    """
    annotation_csv = os.path.join(annotations_dir, f"{split}.csv")
    if os.path.exists(annotation_csv):
        records = pd.read_csv(annotation_csv)
        # Filter for 'front_raise' or similar category
        is_front_raise = records['type'].str.contains('front_raise', case=False, na=False)
        selected = records[is_front_raise]
        print(f"Found {len(selected)} 'front_raise' records in {split} set.")
        return selected

    print(f"Annotation file not found: {annotation_csv}")
    return pd.DataFrame()

# Example: Get test set front raise videos
# The resulting frame is kept as a notebook-level variable and reused by later cells.
front_raise_test_df = get_front_raise_records("test")
if not front_raise_test_df.empty:
    # Preview only the first rows rather than dumping the whole table.
    display(front_raise_test_df.head())


In [None]:
# @title 1.2 Load Pre-extracted PoseRAC Data (.npy)
import numpy as np

def load_repcount_poserac_data(video_id, dataset_dir=extracted_poses_dir):
    """Load the pre-extracted pose array for one video, if it exists.

    The file ``<dataset_dir>/<video_id>.npy`` is tried first. If it is missing
    and ``video_id`` ends in a known video extension (for example
    ``clip.mp4``), the extension is dropped and ``<dataset_dir>/clip.npy`` is
    tried as well, so an id taken verbatim from a file-name column still
    resolves.

    Args:
        video_id: Identifier of the video (converted to ``str``).
        dataset_dir: Directory containing the ``.npy`` files.

    Returns:
        The array returned by ``np.load``, or ``None`` when no matching file
        is found (a message is printed in that case).
    """
    video_id = str(video_id)
    npy_path = os.path.join(dataset_dir, f"{video_id}.npy")

    if not os.path.exists(npy_path):
        # Fallback: only strip suffixes that are clearly video containers, so
        # ids that merely contain a dot are never truncated.
        stem, ext = os.path.splitext(video_id)
        if ext.lower() in ('.mp4', '.avi', '.mov', '.mkv'):
            stem_path = os.path.join(dataset_dir, f"{stem}.npy")
            if os.path.exists(stem_path):
                npy_path = stem_path

    if not os.path.exists(npy_path):
        print(f"Data for video {video_id} not found at {npy_path}")
        return None

    data = np.load(npy_path)
    print(f"Loaded {data.shape} data for {video_id}")
    # This notebook expects shape (num_frames, 104): 33*3 landmark values + 5 angles.
    return data

# Test loading the first front raise video data
if not front_raise_test_df.empty:
    # NOTE(review): the 'name' value is passed through unchanged as the video id —
    # confirm whether the CSV stores it with or without a file extension.
    first_video_id = front_raise_test_df.iloc[0]['name']
    test_data = load_repcount_poserac_data(first_video_id)


In [None]:
# @title 2. Imports and PoseRAC Angle Logic
import cv2
import mediapipe as mp
import numpy as np
import json
import subprocess
import copy
from google.colab import files
from IPython.display import HTML, display
from base64 import b64encode
import yt_dlp
from tqdm.notebook import tqdm

def calculate_angle(a, b, c):
    """Return the 3D angle in degrees at vertex ``b`` between points ``a`` and ``c``.

    Each argument is a mapping providing 'x', 'y' and 'z'. When either arm
    (a-b or c-b) has zero length the angle is undefined and 0.0 is returned.
    """
    def _xyz(point):
        return np.array([point['x'], point['y'], point['z']])

    tip_a = _xyz(a)
    vertex = _xyz(b)
    tip_c = _xyz(c)

    arm_a = tip_a - vertex
    arm_c = tip_c - vertex

    length_a = np.linalg.norm(arm_a)
    length_c = np.linalg.norm(arm_c)
    if length_a == 0 or length_c == 0:
        return 0.0

    # Clamp so floating-point round-off cannot push the cosine outside arccos' domain.
    cos_theta = np.clip(np.dot(arm_a, arm_c) / (length_a * length_c), -1.0, 1.0)
    return np.degrees(np.arccos(cos_theta))

def get_poserac_angles(landmarks):
    """Compute five left/right-averaged joint angles from one frame of landmarks.

    Args:
        landmarks: Sequence of at least 33 landmark mappings (MediaPipe pose
            indexing), each usable by ``calculate_angle``.

    Returns:
        ``[elbow, shoulder, hip, knee, ankle]`` in degrees, each the mean of the
        left-side and right-side angle. If ``landmarks`` is empty/None or has
        fewer than 33 entries, five zeros are returned.
    """
    if not landmarks or len(landmarks) < 33:
        return [0.0] * 5

    # (left, right) index triplets into the MediaPipe landmark list; the middle
    # index of each triplet is the joint at which the angle is measured.
    joint_triplets = (
        ((11, 13, 15), (12, 14, 16)),  # 1. Elbow (Shoulder-Elbow-Wrist)
        ((23, 11, 13), (24, 12, 14)),  # 2. Shoulder (Hip-Shoulder-Elbow)
        ((11, 23, 25), (12, 24, 26)),  # 3. Hip (Shoulder-Hip-Knee)
        ((23, 25, 27), (24, 26, 28)),  # 4. Knee (Hip-Knee-Ankle)
        ((25, 27, 31), (26, 28, 32)),  # 5. Ankle (Knee-Ankle-Foot Index)
    )

    averaged_angles = []
    for left_ids, right_ids in joint_triplets:
        left_angle = calculate_angle(*(landmarks[i] for i in left_ids))
        right_angle = calculate_angle(*(landmarks[i] for i in right_ids))
        averaged_angles.append((left_angle + right_angle) / 2.0)
    return averaged_angles


In [None]:
# @title 3. Extraction Logic and Front Raise Processing
def process_video_poserac(video_path, start_frame=0, end_frame=None, use_gpu=False,
                          model_asset_path='models/pose_landmarker.task'):
    """Run the MediaPipe pose landmarker over a frame range and collect per-frame data.

    Args:
        video_path: Path of the video file to read with OpenCV.
        start_frame: First frame index to process (clamped to >= 0).
        end_frame: Exclusive end frame index. ``None`` or a value <= 0 means
            "until the end of the video". It is clamped to the reported frame
            count when the container reports one.
        use_gpu: Select the GPU delegate instead of the CPU delegate.
        model_asset_path: Path of the pose-landmarker ``.task`` model file.

    Returns:
        A list with one dict per frame in which a full set of 33 landmarks was
        detected. Each dict has the keys 'frame', 'timestamp' (milliseconds),
        'landmarks' (33 dicts with x/y/z/visibility) and 'poserac_angles'
        (the 5 values from ``get_poserac_angles``). Frames without a detection
        are skipped, so frame indices may be non-contiguous. An empty list is
        returned (and a message printed) when the video cannot be opened.
    """
    BaseOptions = mp.tasks.BaseOptions
    PoseLandmarker = mp.tasks.vision.PoseLandmarker
    PoseLandmarkerOptions = mp.tasks.vision.PoseLandmarkerOptions
    VisionRunningMode = mp.tasks.vision.RunningMode

    options = PoseLandmarkerOptions(
        base_options=BaseOptions(
            model_asset_path=model_asset_path,
            delegate=BaseOptions.Delegate.GPU if use_gpu else BaseOptions.Delegate.CPU
        ),
        running_mode=VisionRunningMode.VIDEO
    )

    poserac_data = []
    with PoseLandmarker.create_from_options(options) as landmarker:
        cap = cv2.VideoCapture(video_path)
        pbar = None
        # try/finally guarantees the capture handle and progress bar are
        # released even if decoding or detection raises part-way through.
        try:
            if not cap.isOpened():
                print(f"Could not open video: {video_path}")
                return poserac_data

            fps = cap.get(cv2.CAP_PROP_FPS)
            if fps <= 0:
                fps = 30.0  # fall back to a nominal rate when the container reports none

            # Some containers/backends report 0 here; treat that as "unknown"
            # rather than as an empty video, and simply read until EOF.
            total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

            start_frame = max(0, int(start_frame))
            if end_frame is None or end_frame <= 0:
                end_frame = total_frames if total_frames > 0 else None
            elif total_frames > 0:
                end_frame = min(total_frames, int(end_frame))
            else:
                end_frame = int(end_frame)

            cap.set(cv2.CAP_PROP_POS_FRAMES, start_frame)
            frame_count = start_frame

            # A reversed range must not produce a negative progress total.
            progress_total = None if end_frame is None else max(0, end_frame - start_frame)
            pbar = tqdm(total=progress_total, desc="Processing PoseRAC Skeleton", leave=False)

            while end_frame is None or frame_count < end_frame:
                ret, frame = cap.read()
                if not ret:
                    break

                # Timestamps are derived from the frame index so they increase
                # with every processed frame of this landmarker instance.
                timestamp_ms = int((frame_count / fps) * 1000)
                rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                mp_image = mp.Image(image_format=mp.ImageFormat.SRGB, data=rgb_frame)

                detection_result = landmarker.detect_for_video(mp_image, timestamp_ms)

                frame_landmarks = []
                if detection_result.pose_landmarks:
                    # Only the first detected pose is used.
                    for landmark in detection_result.pose_landmarks[0]:
                        frame_landmarks.append({
                            'x': landmark.x,
                            'y': landmark.y,
                            'z': landmark.z,
                            'visibility': landmark.visibility
                        })

                if len(frame_landmarks) == 33:
                    angles = get_poserac_angles(frame_landmarks)
                    # Landmarks are stored with visibility; the 5 angles are kept alongside.
                    poserac_data.append({
                        'frame': frame_count,
                        'timestamp': timestamp_ms,
                        'landmarks': frame_landmarks,
                        'poserac_angles': angles
                    })

                frame_count += 1
                pbar.update(1)
        finally:
            if pbar is not None:
                pbar.close()
            cap.release()
    return poserac_data

def save_poserac_data(data, output_file):
    """Write ``data`` to ``output_file`` as 4-space-indented JSON and report the path."""
    with open(output_file, 'w') as json_handle:
        json.dump(data, json_handle, indent=4)
    print(f"PoseRAC data saved to {output_file}")


In [None]:
# @title 4. Choose Input Source
input_source = "RepCount_pose Dataset" # @param ["Upload Video (Local)", "YouTube URL", "Google Drive", "RepCount_pose Dataset"]
# For RepCount_pose, specify the video name/ID (e.g., 'front_raise_001')
repcount_video_id = "front_raise_001" # @param {type:"string"}
youtube_url = "https://www.youtube.com/watch?v=-t7fuZ0KhDA" # @param {type:"string"}
drive_file_path = "" # @param {type:"string"}
frame_ranges_str = "0-500" # @param {type:"string"}
use_gpu = True # @param {type:"boolean"}

# `source_path` stays empty unless a video file that actually exists was
# resolved, so later cells can rely on a simple truthiness check.
source_path = ""
input_filename = "front_raises.mp4"
is_repcount = False

if input_source == "RepCount_pose Dataset":
    raw_videos_dir = os.path.join(repcount_pose_path, "original_data", "videos")
    # Probe the known container extensions in priority order; keep the first hit.
    for ext in ['.mp4', '.avi', '.MP4', '.mov']:
        candidate_path = os.path.join(raw_videos_dir, f"{repcount_video_id}{ext}")
        if os.path.exists(candidate_path):
            source_path = candidate_path
            break

    if source_path:
        print(f"Using RepCount video: {source_path}")
    else:
        print(f"Warning: Video file for {repcount_video_id} not found at {raw_videos_dir}. Extraction will require raw video.")

    is_repcount = True
    input_filename = f"{repcount_video_id}.mp4"

elif input_source == "Upload Video (Local)":
    from google.colab import files
    uploaded = files.upload()
    if uploaded:
        # Only the first uploaded file is used.
        source_path = next(iter(uploaded))
        input_filename = source_path
        print(f"Uploaded {source_path}")
    else:
        print("No file was uploaded.")

elif input_source == "YouTube URL":
    if youtube_url:
        print("Downloading YouTube video...")
        download_target = 'input_video.mp4'
        # 'overwrites' forces a fresh download; otherwise a file left over from
        # a previous URL would be silently reused under the same name.
        ydl_opts = {
            'format': 'best[ext=mp4]/best',
            'outtmpl': download_target,
            'noplaylist': True,
            'overwrites': True,
        }
        try:
            with yt_dlp.YoutubeDL(ydl_opts) as ydl:
                ydl.download([youtube_url])
            if os.path.exists(download_target):
                source_path = download_target
                input_filename = 'front_raises.mp4'
            else:
                print(f"Download finished but {download_target} was not created.")
        except Exception as e:
            print(f"Error downloading YouTube video: {e}")
    else:
        print("No YouTube URL provided.")

elif input_source == "Google Drive":
    if drive_file_path and os.path.exists(drive_file_path):
        source_path = drive_file_path
        input_filename = os.path.basename(drive_file_path)
    else:
        print(f"Drive file not found: {drive_file_path}")

if source_path:
    print(f"Ready to process: {source_path}")


In [None]:
# @title 5. Run Extraction & Save Data
output_json = "poserac_front_raises.json"
all_poserac_data = []

if input_source == "RepCount_pose Dataset":
    # Prefer the pre-extracted .npy data; fall back to extracting from the raw video.
    npy_data = load_repcount_poserac_data(repcount_video_id)
    # The slicing below needs a 2-D array with at least 104 columns
    # (33*3 landmark values followed by 5 angles); anything else would yield
    # empty angle lists and break the plotting cell.
    npy_usable = npy_data is not None and npy_data.ndim == 2 and npy_data.shape[1] >= 104

    if npy_usable:
        print(f"Using pre-extracted data for {repcount_video_id}. No extraction needed.")
        # Convert npy (frames, 104) back to our list of dicts format for visualization consistency
        for i, row in enumerate(npy_data):
            # landmarks: row[0:99] (33*3), angles: row[99:104] (5)
            all_poserac_data.append({
                'frame': i,
                'poserac_angles': row[99:104].tolist()
            })
    else:
        if npy_data is None:
            fallback_reason = "Data not found"
        else:
            fallback_reason = f"Pre-extracted data has unexpected shape {npy_data.shape}"

        if source_path and os.path.exists(source_path):
            print(f"{fallback_reason}, running extraction on {source_path}...")
            all_poserac_data = process_video_poserac(source_path, use_gpu=use_gpu)
        else:
            print(f"{fallback_reason} and no raw video is available; nothing to process.")

elif not source_path or not os.path.exists(source_path):
    print("No input video available. Please run Cell 4 and select a valid source.")

else:
    # Handle other sources with potential frame ranges ("start-end[,start-end...]").
    frame_ranges = []
    try:
        for range_spec in frame_ranges_str.split(','):
            start, end = map(int, range_spec.strip().split('-'))
            frame_ranges.append((start, end))
    except ValueError:
        # Covers non-numeric parts as well as specs without exactly one '-'.
        print(f"Could not parse frame ranges '{frame_ranges_str}'; using default 0-1000.")
        frame_ranges = [(0, 1000)]

    for start, end in frame_ranges:
        print(f"Processing range {start}-{end}...")
        range_data = process_video_poserac(source_path, start_frame=start, end_frame=end, use_gpu=use_gpu)
        all_poserac_data.extend(range_data)

if all_poserac_data:
    save_poserac_data(all_poserac_data, output_json)
    files.download(output_json)
else:
    print("No PoseRAC data was produced; nothing saved.")


In [None]:
# @title 6. Visualize Results (Front Raises Analysis)
import matplotlib.pyplot as plt

if not all_poserac_data:
    print("No data processed yet. Please run Cell 5.")
else:
    # Unpack the per-frame records into parallel series; angle slot 0 is the
    # elbow and slot 1 the shoulder.
    frames, elbow_angles, shoulder_angles = [], [], []
    for record in all_poserac_data:
        frames.append(record['frame'])
        elbow_angles.append(record['poserac_angles'][0])
        shoulder_angles.append(record['poserac_angles'][1])

    plt.figure(figsize=(15, 6))
    for series, label, color in (
        (shoulder_angles, 'Shoulder Angle (Hip-Shoulder-Elbow)', 'blue'),
        (elbow_angles, 'Elbow Angle (Shoulder-Elbow-Wrist)', 'green'),
    ):
        plt.plot(frames, series, label=label, color=color)
    plt.xlabel('Frame')
    plt.ylabel('Angle (Degrees)')
    plt.title('PoseRAC Angles: Front Raises Analysis')
    plt.legend()
    plt.grid(True)
    plt.show()

    # Frames whose shoulder angle exceeds 120 degrees are counted as the top of the raise.
    peak_frames = [frames[i] for i, angle in enumerate(shoulder_angles) if angle > 120]
    print(f"Identified {len(peak_frames)} potential frames at peak contraction.")


In [None]:
# @title 7. Integrating with PoseRAC Model
# This cell explains how to use the extracted data with the cloned PoseRAC repo.
# PoseRAC expects a specific folder structure and salient pose annotations.
import os
import sys

print("--- Integration with PoseRAC ---")
print("1. Locate 'PoseRAC' folder created in Cell 1.")
print("2. Use the 'pre_train_angles.py' script if you have many videos.")
print("3. For front raises specifically, ensure your 'poserac_front_raises.json' contains sequences corresponding to single repetitions or clear sets.")
print("4. The 5 angles (Elbow, Shoulder, Hip, Knee, Ankle) are now calculated and ready.")

# Make the cloned repo importable. Appending to sys.path never fails by itself,
# so the presence of the checkout is verified explicitly, and the entry is
# added at most once so re-running the cell does not pile up duplicates.
poserac_repo_dir = 'PoseRAC'
if os.path.isdir(poserac_repo_dir):
    if poserac_repo_dir not in sys.path:
        sys.path.append(poserac_repo_dir)
    # If there are specific utilities, we could import them here.
    print("PoseRAC path added to sys.path.")
else:
    print(f"Could not find PoseRAC utilities: directory '{poserac_repo_dir}' not found. Run Cell 1 first.")
