# Create metadata dataset
1.   Get the videos from Google Drive.
2.   Create a DataFrame from the videos in Google Drive, parsing each filename for the column values.
3.   Upload the dataset metadata to Hugging Face.



In [None]:
!pip install huggingface_hub datasets -q

import os
import pandas as pd
from datasets import Dataset
from huggingface_hub import login
from google.colab import drive

# Folder that is walked recursively for video files.
DRIVE_PATH = "/content/drive/MyDrive/poseEstRawVids"
# Name of the Hugging Face dataset repo the metadata is pushed to.
HF_DATASET_NAME = "pose-est_metadata"
# Hugging Face access token; left empty here — fill in locally and do not commit a real token.
HF_TOKEN = ""

# Mount Google Drive so that DRIVE_PATH is reachable under /content/drive.
drive.mount('/content/drive')

In [None]:
# Parse video files and create metadata dataframe
def parse_video_metadata(root, file):
    """Extract metadata from a video filename.

    The file stem is split on ``_`` and must yield exactly five fields, which
    are read in this order: exercise, view, correctness, participant,
    unique_id.

    Args:
        root: Directory that contains ``file``.
        file: File name including its extension.

    Returns:
        A dict with the parsed fields plus ``file_name``, ``drive_path`` and
        ``extension``, or ``None`` (after printing a "Skipping" message) when
        the stem does not have exactly five ``_``-separated fields.
    """
    name, ext = os.path.splitext(file)
    parts = name.split('_')

    # Only the unpacking can fail for a badly named file, so it is the only
    # guarded statement; unrelated errors are no longer silently swallowed.
    try:
        exercise, view, correctness, participant, unique_id = parts
    except ValueError as e:
        print(f"Skipping {file}: {str(e)}")
        return None

    # NOTE(review): the sample path used elsewhere in this notebook
    # ("bs_front_correct_5a1aa285_pnumjerico.mp4") looks like its last two
    # fields might be <unique_id>_<participant> — confirm the field order.
    return {
        "file_name": file,
        "exercise": exercise,
        "view": view,
        "correctness": correctness,
        "participant": participant,
        "unique_id": unique_id,
        "drive_path": os.path.join(root, file),
        "extension": ext,
    }

# Walk the Drive folder recursively and keep every file whose name parses.
metadata = []
for root, _, files in os.walk(DRIVE_PATH):
    # Files that do not follow the naming convention come back falsy and are dropped.
    parsed = (parse_video_metadata(root, file) for file in files)
    metadata.extend(meta for meta in parsed if meta)

In [None]:
# Create Hugging Face Dataset
# `metadata` is the list of per-video dicts collected by the directory walk.
df = pd.DataFrame(metadata)
dataset = Dataset.from_pandas(df)
# Preview the first rows in the notebook output.
df.head()

In [None]:
# Upload metadata to Hugging Face
# HF_TOKEN is left empty in this notebook; pass None in that case so login()
# falls back to its interactive prompt instead of receiving an empty token.
login(token=HF_TOKEN or None)
dataset.push_to_hub(HF_DATASET_NAME)

print("✅ Metadata dataset uploaded to Hugging Face Hub!")
print(f"📊 Dataset contains {len(df)} video entries")

# Load dataset
1.   Load the metadata DataFrame from Hugging Face.
2.   Mount the videos from Google Drive.

In [None]:
!pip install -U datasets

In [None]:
from datasets import load_dataset
from google.colab import drive
import pandas as pd

# Token placeholder; it is not passed to load_dataset below.
HF_TOKEN = ""
# NOTE(review): this loads "jrs-a/test"; the original trailing comment named
# "pose-est_metadata" — confirm which repo is intended.
dataset = load_dataset("jrs-a/test") #pose-est_metadata

In [None]:
drive.mount('/content/drive')

In [None]:
print(dataset)

Only run this the first time.

In [None]:
# sample
sample = dataset['train'][0]
print(f"Analyzing: {sample['exercise']} ({sample['correctness']})")

# If you need to access the video data/path:
video_path = sample.get('drive_path')  # or whatever key stores your video data
print(f"Video source: {video_path}")

# Process video (example using OpenCV)
!pip install opencv-python -q
import cv2

cap = cv2.VideoCapture(video_path)
frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
print(f"Video loaded with {frame_count} frames")

# Detect with OpenPose

In [None]:
# requirements
!apt-get update
!apt-get install -y pigz libgoogle-glog-dev libboost-all-dev libatlas-base-dev libopencv-dev
!pip install huggingface_hub numpy opencv-python

import subprocess
import os
from huggingface_hub import hf_hub_download

## get openpose bin from huggingface

In [None]:
# Hugging Face repo that hosts the prebuilt OpenPose archive.
repo_id = "jrs-a/openpose-colab"
# hf_hub_download returns the local path of the downloaded file.
tarball_path = hf_hub_download(repo_id=repo_id, filename="openpose_colab_build.tar.gz")

# Extract into the current working directory, decompressing with pigz.
# NOTE(review): the paths configured later in this notebook expect the archive
# to unpack to /content/openpose — confirm the working directory is /content.
!tar --use-compress-program=pigz -xvf "{tarball_path}"

## get openpose keypoints
1.   detect keypoints
2.   store keypoints to df


### extract json keypoints

In [None]:
def process_video_with_openpose(video_path, output_dir, model_dir, json_filename, video_filename):
    """
    Run the OpenPose binary on one video and write its keypoints as JSON.

    Args:
        video_path: Path of the input video.
        output_dir: Root output folder; "json" and "rendered" sub-folders are
            created inside it if missing.
        model_dir: Folder passed to OpenPose as --model_folder.
        json_filename: Name of the entry under <output_dir>/json that is
            passed to --write_json.
        video_filename: Kept for interface compatibility; not used because
            rendered video output (--write_video) is currently disabled.

    Returns:
        True when the OpenPose process exits with code 0, False otherwise
        (its stderr is printed in that case).
    """
    json_root = os.path.join(output_dir, "json")
    rendered_root = os.path.join(output_dir, "rendered")
    for folder in (json_root, rendered_root):
        os.makedirs(folder, exist_ok=True)

    # The binary location comes from the notebook-level OPENPOSE_BUILD_PATH.
    cmd = [
        f"{OPENPOSE_BUILD_PATH}/examples/openpose/openpose.bin",
        "--video", video_path,
        "--write_json", os.path.join(json_root, json_filename),
        "--display", "0",
        "--render_pose", "0",
        "--model_folder", model_dir,
        "--number_people_max", "1",  # keep only one person per frame
    ]

    print(f"Processing video: {video_path}")
    result = subprocess.run(cmd, capture_output=True, text=True)

    if result.returncode == 0:
        print(f"Successfully processed video. JSON output in: {json_root}")
        return True

    print(f"Error processing video: {result.stderr}")
    return False

In [None]:
# Root of the extracted OpenPose tree.
OPENPOSE_ROOT = "/content/openpose/openpose"
# Build folder; process_video_with_openpose reads this global to locate openpose.bin.
OPENPOSE_BUILD_PATH = f"{OPENPOSE_ROOT}/build"
# Model folder handed to OpenPose via --model_folder.
MODEL_DIR = f"{OPENPOSE_ROOT}/models"
# Root folder for the generated JSON keypoints.
OUTPUT_DIR = "/content/output"

In [None]:
# Run OpenPose over every video listed in the metadata dataset.
for item in dataset['train']:
    video_path = item['drive_path']

    # Output names are derived from the video's file stem.
    base_name = os.path.splitext(os.path.basename(video_path))[0]
    json_filename = f"{base_name}_keypoints"
    video_filename = f"{base_name}_rendered"

    print(f"Processing: {item['exercise']} ({item['correctness']}) - {item['file_name']}")

    success = process_video_with_openpose(
        video_path=video_path,
        output_dir=OUTPUT_DIR,
        model_dir=MODEL_DIR,
        json_filename=json_filename,
        video_filename=video_filename,
    )

    # One status line per video, followed by a separator.
    status = "✓ Successfully processed" if success else "✗ Failed to process"
    print(f"{status}: {item['file_name']}")
    print("-" * 50)

### json keypoints to time series

In [None]:
import json
import numpy as np
import os

def json_to_time_series(json_dir, num_keypoints=25):
    """
    Convert OpenPose JSON output to a time series array.

    Args:
        json_dir: Folder containing one ``*.json`` file per frame. Files are
            processed in lexicographic name order, so frame numbers in the
            names are assumed to be zero-padded.
        num_keypoints: Number of keypoints used for the all-zero placeholder
            of frames without a detected person (default 25).

    Returns:
        float32 array of shape (frames, keypoints, 3) where 3 =
        [x, y, confidence]. An empty folder yields shape
        (0, num_keypoints, 3).
    """
    json_files = sorted(
        os.path.join(json_dir, f) for f in os.listdir(json_dir)
        if f.endswith(".json")
    )

    keypoints_sequence = []

    for json_file in json_files:
        with open(json_file, encoding="utf-8") as f:
            data = json.load(f)

        people = data.get("people")
        flat = people[0]["pose_keypoints_2d"] if people else []

        if not flat:
            # No person detected - use zero padding. float32 keeps the dtype
            # of the result independent of whether any frame was empty.
            keypoints_sequence.append(
                np.zeros((num_keypoints, 3), dtype=np.float32)
            )
            continue

        # Keypoints of the first person, reshaped from flat [x, y, c, ...]
        # to (keypoints, 3).
        keypoints = np.array(flat, dtype=np.float32)
        keypoints_sequence.append(keypoints.reshape(-1, 3))

    if not keypoints_sequence:
        # Keep the documented 3-D layout even when there are no frames.
        return np.zeros((0, num_keypoints, 3), dtype=np.float32)

    return np.array(keypoints_sequence)

In [None]:
def build_dataframe(dataset, base_json_dir):
    """
    Combine per-video OpenPose JSON output with the dataset metadata.

    Args:
        dataset: Mapping with a 'train' split whose items are dicts that
            contain at least 'drive_path'.
        base_json_dir: Folder holding one '<video stem>_keypoints'
            sub-folder per processed video.

    Returns:
        pandas DataFrame with one row per keypoints folder: 'video_id',
        'time_series' (nested lists) and every metadata field of the
        matching dataset item (none when no item matches).
    """
    suffix = '_keypoints'

    # Map video stem -> metadata item so folders can be matched by name.
    video_metadata = {
        os.path.splitext(os.path.basename(item['drive_path']))[0]: item
        for item in dataset['train']
    }

    # Sorted so the row order does not depend on os.listdir's arbitrary order.
    video_dirs = sorted(
        d for d in os.listdir(base_json_dir)
        if d.endswith(suffix) and os.path.isdir(os.path.join(base_json_dir, d))
    )

    all_rows = []
    for video_dir in video_dirs:
        # Strip only the trailing suffix; str.replace would also remove
        # '_keypoints' occurring inside the video name.
        video_id = video_dir[:-len(suffix)]

        print(f"\nProcessing: {video_id}")
        metadata = video_metadata.get(video_id)
        if metadata is None:
            print(f"No metadata found for: {video_id}")
            metadata = {}

        time_series = json_to_time_series(os.path.join(base_json_dir, video_dir))
        # Nested lists are stored so the column holds plain Python data.
        if isinstance(time_series, np.ndarray):
            time_series = time_series.tolist()

        row = {
            'video_id': video_id,
            'time_series': time_series,
        }
        # Add all metadata fields to the row
        row.update(metadata)

        all_rows.append(row)
        print(f"Added to DataFrame: {video_id}")

    return pd.DataFrame(all_rows)

In [None]:
# Folder into which the OpenPose loop wrote one '<video>_keypoints' sub-folder per video.
json_dir = f"{OUTPUT_DIR}/json"
# One row per processed video: keypoint time series plus its metadata.
openpose_keypoints_df = build_dataframe(dataset, json_dir)

In [None]:
openpose_keypoints_df.head()

## store full dataset in huggingface
TODO: find out how to create a dataset for all pose-estimation models in one repo.

In [None]:
# Install required packages
!pip install huggingface_hub datasets -q

import os

from datasets import Dataset
from huggingface_hub import login

# Rebinds HF_DATASET_NAME (previously the metadata repo) to the keypoints dataset repo.
HF_DATASET_NAME = "exercises-keypoints-openpose"
# Hugging Face access token; left empty here — fill in locally and do not commit a real token.
HF_TOKEN = ""

In [None]:
# Create Hugging Face Dataset
# NOTE: this rebinds `df`, which earlier in the notebook held the metadata DataFrame.
df = pd.DataFrame(openpose_keypoints_df)
openpose_keypoints_dataset = Dataset.from_pandas(df)
# Preview the first rows in the notebook output.
df.head()

In [None]:
# Upload keypoints dataset to Hugging Face
# HF_TOKEN is left empty in this notebook; pass None in that case so login()
# falls back to its interactive prompt instead of receiving an empty token.
login(token=HF_TOKEN or None)
openpose_keypoints_dataset.push_to_hub(HF_DATASET_NAME)

# This cell uploads the keypoints dataset, so the message says so.
print("✅ Keypoints dataset uploaded to Hugging Face Hub!")
print(f"📊 Dataset contains {len(df)} video entries")

## preprocessing for rocket classifier

In [None]:
import numpy as np
from sklearn.preprocessing import StandardScaler

def preprocess_keypoints(sequence):
    """
    Preprocess keypoints for time series classification.

    Steps:
    1. Mark coordinates with confidence below 0.2 as missing.
    2. Interpolate missing coordinates linearly over time, per keypoint
       (extrapolating at the ends); a keypoint with a single valid frame
       keeps that value for all frames.
    3. Normalize each frame to the bounding box of its confident keypoints.
    4. Set coordinates that are still missing to 0.
    5. Compute velocities (frame-to-frame difference, last frame = 0).
    6. Flatten per frame as [x, y, vx, vy, confidence] for each keypoint.

    Args:
        sequence: Array-like of shape (frames, keypoints, 3) holding
            [x, y, confidence]. The input is not modified.

    Returns:
        2-D array of shape (frames, keypoints * 5).

    Raises:
        ValueError: If ``sequence`` does not have shape
            (frames, keypoints, 3).
    """
    from scipy import interpolate

    conf_threshold = 0.2

    # Work on a floating-point copy: the caller's data stays untouched and
    # list / integer input can hold NaN markers.
    sequence = np.array(sequence, copy=True)
    if not np.issubdtype(sequence.dtype, np.floating):
        sequence = sequence.astype(np.float64)
    if sequence.ndim != 3 or sequence.shape[2] != 3:
        raise ValueError(
            "sequence must have shape (frames, keypoints, 3), "
            f"got {sequence.shape}"
        )

    n_frames, n_keypoints, _ = sequence.shape
    frame_idx = np.arange(n_frames)

    # 1. Mark low-confidence points as missing.
    low_conf = sequence[:, :, 2] < conf_threshold
    sequence[low_conf, :2] = np.nan

    # 2. Interpolate missing values per keypoint and coordinate.
    for k in range(n_keypoints):
        for axis in (0, 1):
            values = sequence[:, k, axis]
            valid = ~np.isnan(values)
            n_valid = np.count_nonzero(valid)

            if n_valid > 1:  # Enough points to interpolate
                interp = interpolate.interp1d(frame_idx[valid], values[valid],
                                              bounds_error=False,
                                              fill_value="extrapolate")
                sequence[:, k, axis] = interp(frame_idx)
            elif n_valid == 1:
                # A single observation cannot be interpolated; hold it.
                sequence[:, k, axis] = values[valid][0]

    # 3. Normalize coordinates to each frame's bounding box.
    for frame in sequence:
        confident = frame[:, 2] > conf_threshold
        if not np.any(confident):
            continue

        min_x, min_y = np.min(frame[confident, :2], axis=0)
        max_x, max_y = np.max(frame[confident, :2], axis=0)
        width = max_x - min_x
        height = max_y - min_y

        # Degenerate boxes are left unnormalized to avoid dividing by zero.
        if width > 0 and height > 0:
            frame[:, 0] = (frame[:, 0] - min_x) / width
            frame[:, 1] = (frame[:, 1] - min_y) / height

    # 4. Keypoints that were never detected are still NaN; zero them so no
    #    NaN reaches the classifier features.
    positions = sequence[:, :, :2]
    positions[np.isnan(positions)] = 0.0

    # 5. Velocities (1st derivative), padded with zeros for the last frame.
    velocities = np.zeros_like(positions)
    velocities[:-1] = np.diff(positions, axis=0)

    # 6. Combine and flatten per frame: (frames, keypoints * 5).
    combined = np.concatenate([
        positions,            # Positions
        velocities,           # Velocities
        sequence[:, :, 2:3],  # Confidence
    ], axis=2)

    # Explicit width so zero-frame input reshapes cleanly.
    return combined.reshape(n_frames, n_keypoints * 5)

# Preprocess keypoints
# `keypoints_sequence` only exists as a local inside json_to_time_series, so
# it is defined here from the first processed video to keep this cell
# runnable on its own.
keypoints_sequence = np.array(
    openpose_keypoints_df.iloc[0]["time_series"], dtype=np.float32
)
processed_data = preprocess_keypoints(keypoints_sequence)
print("Processed data shape:", processed_data.shape)

# Detect with AlphaPose

## Install AlphaPose

In [None]:
!git clone https://github.com/MVIG-SJTU/AlphaPose.git
%cd AlphaPose

In [None]:
import os
os.environ['PATH'] = '/usr/local/cuda/bin:' + os.environ['PATH']
os.environ['LD_LIBRARY_PATH'] = '/usr/local/cuda/lib64:' + os.environ.get('LD_LIBRARY_PATH', '')

!pip install cython cython-bbox
!sudo apt-get install libyaml-dev

In [None]:
!python3 setup.py build develop --user

In [None]:
import gdown

# downloading model
url = 'https://drive.google.com/open?id=1kfyedqyn8exjbbNmYq8XGd2EooQjPtF9'
output_path = 'pretrained_models/fast_421_res152_256x192.pth'
gdown.download(url, output_path, quiet=False,fuzzy=True)

# downloading detector
!mkdir detector/yolo/data
url = 'https://drive.google.com/open?id=1D47msNOOiJKvPOXlnpyzdKA3k6E97NTC'
output_path = 'detector/yolo/data/yolov3-spp.weights'
gdown.download(url, output_path, quiet=False,fuzzy=True)

## get alphapose keypoints

### extract json keypoints

In [None]:
!pip3 install cython-bbox

In [None]:
%cd AlphaPose/

In [None]:
# Config file passed to demo_inference.py via --cfg.
cfg_file = "/content/AlphaPose/configs/coco/resnet/256x192_res152_lr1e-3_1x-duc.yaml"
# Checkpoint passed via --checkpoint; same file name as the gdown download target.
trained_model = '/content/AlphaPose/pretrained_models/fast_421_res152_256x192.pth'
# Single example video on the mounted Drive, passed via --video.
path_to_video = '/content/drive/MyDrive/poseEstRawVids/bs_front_correct_5a1aa285_pnumjerico.mp4'

In [None]:
!python scripts/demo_inference.py --cfg {cfg_file} --checkpoint {trained_model} --video {path_to_video} --format 'open' --outdir examples/res --detbatch 32 --debug

In [None]:
#test img
!python scripts/demo_inference.py --cfg {cfg_file} --checkpoint {trained_model} --image test.png --format 'open' --outdir examples/res --save_img --debug --min_box_area 100

### json keypoints to time series

In [None]:
import json
import pprint

# Read the AlphaPose result file; the context manager closes the handle
# that a bare open() call would leave dangling.
with open('/content/AlphaPose/examples/res/alphapose-results.json', 'rb') as results_file:
    data = json.load(results_file)
# Show only the top two nesting levels to keep the output short.
pprint.pprint(data, depth=2)

# Detect with RTM Pose

## Install RTMPose

In [None]:
!pip install rtmlib -i https://pypi.org/simple
!pip install numpy opencv-python opencv-contrib-python onnxruntime onnxruntime-gpu

In [None]:
import cv2

from rtmlib import BodyWithFeet, Body, draw_skeleton
from google.colab.patches import cv2_imshow

device = 'cuda'  # cpu, cuda, mps
backend = 'onnxruntime'  # opencv, onnxruntime, openvino
img = cv2.imread('test.png')
# cv2.imread returns None instead of raising when the file is missing or
# unreadable; fail here with a clear message rather than inside the model.
if img is None:
    raise FileNotFoundError("Could not read image: test.png")

openpose_skeleton = False  # True for openpose-style, False for mmpose-style

wholebody = BodyWithFeet(to_openpose=openpose_skeleton,
                  backend=backend, device=device)

# Run detection + pose estimation on the image.
keypoints, scores = wholebody(img)

# Draw keypoints whose score is at least kpt_thr and display the result.
img_show = draw_skeleton(img, keypoints, scores, kpt_thr=0.5)
cv2_imshow(img_show)

In [None]:
keypoints

In [None]:
print(keypoints.shape)

In [None]:
scores

# Detect with MediaPipe

In [None]:
!pip install -q mediapipe
!pip install numpy==2.3.0

!wget -O pose_landmarker.task -q https://storage.googleapis.com/mediapipe-models/pose_landmarker/pose_landmarker_heavy/float16/1/pose_landmarker_heavy.task

In [None]:
from mediapipe import solutions
from mediapipe.framework.formats import landmark_pb2
import numpy as np
import cv2 # Import cv2 for color conversion

def draw_landmarks_on_image(rgb_image, detection_result):
    """Draw every detected pose onto a BGR copy of ``rgb_image``.

    The input array is copied and converted with ``cv2.COLOR_RGB2BGR``
    before drawing, so the returned image is in BGR channel order and the
    input itself is not drawn on.

    Args:
        rgb_image: Image array in RGB channel order.
        detection_result: Object whose ``pose_landmarks`` is a sequence of
            per-pose landmark sequences (each landmark exposing x, y, z).

    Returns:
        The annotated BGR image.
    """
    poses = detection_result.pose_landmarks
    canvas = cv2.cvtColor(np.copy(rgb_image), cv2.COLOR_RGB2BGR)

    for pose in poses:
        # The drawing helper takes a NormalizedLandmarkList proto, so
        # rebuild one from the landmarks of this pose.
        proto_points = [
            landmark_pb2.NormalizedLandmark(x=point.x, y=point.y, z=point.z)
            for point in pose
        ]
        landmark_list = landmark_pb2.NormalizedLandmarkList()
        landmark_list.landmark.extend(proto_points)

        solutions.drawing_utils.draw_landmarks(
            canvas,
            landmark_list,
            solutions.pose.POSE_CONNECTIONS,
            solutions.drawing_styles.get_default_pose_landmarks_style())

    return canvas

In [None]:
# STEP 1: Import the necessary modules.
import mediapipe as mp
from mediapipe.tasks import python
from mediapipe.tasks.python import vision
from google.colab.patches import cv2_imshow

# STEP 2: Create an PoseLandmarker object.
base_options = python.BaseOptions(model_asset_path='pose_landmarker.task')
options = vision.PoseLandmarkerOptions(
    base_options=base_options,
    output_segmentation_masks=False)
detector = vision.PoseLandmarker.create_from_options(options)

# STEP 3: Load the input image.
image = mp.Image.create_from_file("test.png")

# STEP 4: Detect pose landmarks from the input image.
detection_result = detector.detect(image)

# STEP 5: Process the detection result. In this case, visualize it.
annotated_image = draw_landmarks_on_image(image.numpy_view(), detection_result)
# draw_landmarks_on_image already converts RGB -> BGR, which is the channel
# order cv2_imshow expects; converting a second time swapped red and blue.
cv2_imshow(annotated_image)

In [None]:
detection_result.pose_landmarks

In [None]:
import numpy as np
import json

# Step 1: Define Keypoint Mapping
# This mapping is based on common conventions.
# You might need to verify against official documentation if discrepancies arise.
# MediaPipe (33 keypoints) to OpenPose (25 keypoints)
# MediaPipe indices: https://google.github.io/mediapipe/solutions/pose#pose_landmarks_lite
# OpenPose indices: https://github.com/CMU-Perceptual-Computing-Lab/openpose/blob/master/doc/output.md#pose-output-format-body_25
# MediaPipe landmark index -> OpenPose keypoint index.
# None marks a MediaPipe landmark that has no counterpart in the target layout.
mediapipe_to_openpose_map = {
    0: 0,      # Nose
    1: None,   # Left eye inner
    2: 16,     # Left eye
    3: None,   # Left eye outer
    4: None,   # Right eye inner
    5: 15,     # Right eye
    6: None,   # Right eye outer
    7: 18,     # Left ear
    8: 17,     # Right ear
    9: None,   # Mouth left
    10: None,  # Mouth right
    11: 5,     # Left shoulder
    12: 2,     # Right shoulder
    13: 6,     # Left elbow
    14: 3,     # Right elbow
    15: 7,     # Left wrist
    16: 4,     # Right wrist
    17: None,  # Left pinky
    18: None,  # Left index
    19: None,  # Left thumb
    20: None,  # Right pinky
    21: None,  # Right index
    22: None,  # Right thumb
    23: 12,    # Left hip
    24: 9,     # Right hip
    25: 13,    # Left knee
    26: 10,    # Right knee
    27: 14,    # Left ankle
    28: 11,    # Right ankle
    29: 21,    # Left heel
    30: 24,    # Right heel
    31: 19,    # Left foot index (LBigToe)
    32: 22,    # Right foot index (RBigToe)
}


def convert_mediapipe_to_openpose(mediapipe_landmarks_list):
    """
    Re-index MediaPipe pose landmarks into a 25-keypoint OpenPose-style layout.

    Args:
        mediapipe_landmarks_list: Sequence with one entry per person; each
            entry is a sequence of landmarks exposing ``x``, ``y``,
            ``visibility`` and ``presence``.

    Returns:
        One dict per person: ``{"pose_keypoints_2d": [[x, y, confidence], ...]}``
        with 25 rows. x and y are copied unchanged from the landmark;
        confidence is ``visibility``, or ``presence`` when visibility is None.
        Target keypoints that no landmark maps to stay ``[0, 0, 0]``.
    """
    converted = []

    for person in mediapipe_landmarks_list:
        body25 = np.zeros((25, 3), dtype=np.float32)
        available = len(person)

        for source_idx, target_idx in mediapipe_to_openpose_map.items():
            # Skip unmapped landmarks and landmarks this person lacks.
            if target_idx is None or source_idx >= available:
                continue

            point = person[source_idx]
            score = point.visibility
            if score is None:
                score = point.presence
            body25[target_idx] = [point.x, point.y, score]

        converted.append({"pose_keypoints_2d": body25.tolist()})

    return converted

# Step 6: Test Conversion with sample data from cell 3aYy-sGQW1sm output
# Assuming 'detection_result' from cell qG7ziJQ4VFXj is available
# The guard lets this cell run without error when the detection cell has not been executed yet.
if 'detection_result' in locals() and hasattr(detection_result, 'pose_landmarks'):
    sample_mediapipe_landmarks = detection_result.pose_landmarks
    openpose_formatted_data = convert_mediapipe_to_openpose(sample_mediapipe_landmarks)

    # Print the converted data (formatted for readability)
    print("Converted OpenPose data:")
    print(json.dumps(openpose_formatted_data, indent=2))
else:
    # NOTE: openpose_formatted_data is not defined on this path.
    print("MediaPipe detection_result not found. Please run cell qG7ziJQ4VFXj first.")

In [None]:
openpose_formatted_data

# format for rocket classifier

In [None]:
def create_rocket_dataset(processed_data, labels, output_path, problem_name="PoseClassification"):
    """
    Write per-video feature arrays to a sktime .ts dataset.

    Args:
        processed_data: List of arrays shaped (time, features), one per
            video; a single array is treated as a one-video dataset.
        labels: One class label per video.
        output_path: Passed to sktime as ``path``.
        problem_name: Problem name written to the file header.

    NOTE(review): sktime may treat ``path`` as a directory and derive the
    file name from ``problem_name`` and ``fold`` — confirm the file ends up
    where the final message says.
    """
    import pandas as pd
    import numpy as np
    from sktime.datasets import write_dataframe_to_tsfile

    # A bare array is wrapped so the rest of the code always sees a list.
    if not isinstance(processed_data, list):
        processed_data = [processed_data]

    def _as_nested_row(series):
        # A 1-D series becomes a single feature column.
        if series.ndim == 1:
            series = series.reshape(-1, 1)
        # One pandas Series per feature dimension (nested DataFrame format).
        return {
            f"dim_{col}": pd.Series(series[:, col])
            for col in range(series.shape[1])
        }

    df = pd.DataFrame([_as_nested_row(video) for video in processed_data])

    # The series length is only specified when all videos share it.
    lengths = [len(video) for video in processed_data]
    equal_length = len(set(lengths)) == 1
    if equal_length:
        series_length = lengths[0]
    else:
        series_length = None

    write_dataframe_to_tsfile(
        data=df,
        path=output_path,
        problem_name=problem_name,
        class_label=np.unique(labels).tolist(),
        class_value_list=labels,
        comment="OpenPose keypoints time series",
        fold="_TRAIN",
        equal_length=equal_length,
        series_length=series_length,
    )
    print(f"Dataset saved to {output_path}")

In [None]:
# Wrap single video in a list to match expected format
# `processed_data` is the output of preprocess_keypoints for one video.
processed_data_list = [processed_data]

# Convert labels to NumPy array (one label per video)
labels = np.array(["exercise1"])  # Use actual labels for your videos

# Create dataset
# NOTE(review): the third argument is forwarded to sktime as `path`; confirm
# whether sktime treats it as a file name or as an output directory.
create_rocket_dataset(processed_data_list, labels, "/content/dataset/POSE_TRAIN.ts")