# Data Preprocessing

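This notebook loads the precomputed per-video joint-angle CSVs (base and feature-engineered variants), min-max normalizes each video, slices it into fixed-length sliding windows, labels every window with the video's mean DTW degradation score, and saves the resulting arrays and metadata for model training.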

In [2]:
import sys
# Put the project root on the import path so the `utils.*` imports in the next cell resolve.
# NOTE(review): this is an absolute, Colab/Drive-specific path, and the recorded output of the
# setup cell reports a different project base path (.../pose-estimation-research) — confirm
# which location is authoritative.
sys.path.append('/content/drive/MyDrive/msc_data_analytics_thesis_project_pose_estimation')

In [3]:
# ✅ Environment Setup
# `setup_environment` is a project helper defined outside this notebook; judging by the recorded
# output it mounts Google Drive and reports the project base path — confirm in utils/setup.py.
# NOTE(review): `base_path` is not referenced by any later cell shown in this notebook.
from utils.setup import setup_environment
base_path = setup_environment(mount_gdrive=True)

# ✅ Imports
import os
import json
import numpy as np
import pandas as pd
from tqdm import tqdm
from typing import Tuple, List
import glob
from sklearn.preprocessing import MinMaxScaler

from utils.config import PROCESSED_DIR, ENGINEERED_DIR, EXERCISES

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
✅ Environment set. Project base path: /content/drive/MyDrive/pose-estimation-research


In [4]:
# Sliding-window parameters: each window holds WINDOW_SIZE consecutive CSV rows and a new
# window starts every STRIDE rows, so neighbouring windows overlap by WINDOW_SIZE - STRIDE rows.
WINDOW_SIZE = 30
STRIDE = 10
# NOTE(review): this rebinds EXERCISES, which the imports cell already pulled in from
# utils.config; the list below is the one the rest of the notebook uses. Confirm the two agree.
EXERCISES = ["squat", "lunges", "bench_press", "pull_ups"]

# === Joint Configs by Exercise ===
# For each exercise, the joint-angle names whose "<joint>_dtw_degradation" metrics are
# averaged into the per-video label by extract_degradation_score.
EXERCISE_JOINTS = {
    "squat": ["hip_angle_left", "knee_angle_left", "hip_angle_right", "knee_angle_right"],
    "bench_press": ["elbow_angle_left", "elbow_angle_right"],
    "pull_ups": ["elbow_angle_left", "elbow_angle_right", "shoulder_angle_left", "shoulder_angle_right"],
    "lunges": ["hip_angle_left", "knee_angle_left", "hip_angle_right", "knee_angle_right"],
}

In [5]:
def save_dataset(X, y, feature_names, save_dir, exercise, is_featured=False,
                 window_size=None, stride=None):
    """Persist one dataset variant (arrays plus a JSON metadata file) to ``save_dir``.

    Files written:
      * ``X.npy`` / ``X_featured.npy`` - the windowed feature array.
      * ``y.npy`` - the labels. Always written for the base variant; for the featured
        variant it is written only if it does not exist yet.
      * ``y_featured.npy`` - written only when the featured labels differ from the
        ``y.npy`` already on disk, so a mismatch is never silently hidden.
      * ``meta.json`` / ``meta_featured.json`` - exercise, variant, sample/feature counts,
        feature names and windowing parameters.

    Args:
        X: Array whose first axis is samples and last axis is features.
        y: Label array, one entry per sample.
        feature_names: Names of the feature columns.
        save_dir: Output directory (created if missing).
        exercise: Exercise name recorded in the metadata.
        is_featured: True for the feature-engineered variant, False for the base one.
        window_size: Window length to record in the metadata; defaults to the
            module-level ``WINDOW_SIZE``.
        stride: Window stride to record in the metadata; defaults to the
            module-level ``STRIDE``.
    """
    os.makedirs(save_dir, exist_ok=True)

    variant = "featured" if is_featured else "base"
    x_path = os.path.join(save_dir, "X_featured.npy" if is_featured else "X.npy")
    y_path = os.path.join(save_dir, "y.npy")  # y is shared by both variants

    np.save(x_path, X)

    y = np.asarray(y)
    if not is_featured or not os.path.exists(y_path):
        np.save(y_path, y)
    else:
        # The featured variant normally reuses the base labels. If they disagree (stale
        # file, or a video that only has one of the two CSV variants), keep y.npy intact
        # for the base data and store the featured labels separately.
        existing_y = np.load(y_path)
        if not np.array_equal(existing_y, y):
            np.save(os.path.join(save_dir, "y_featured.npy"), y)
            print(
                f"⚠️ Labels for featured variant of {exercise} differ from existing y.npy "
                f"({y.shape} vs {existing_y.shape}); saved y_featured.npy."
            )

    # Meta
    meta = {
        "exercise": exercise,
        "variant": variant,
        "n_samples": int(X.shape[0]),
        "n_features": int(X.shape[-1]),
        "feature_names": list(feature_names),
        # The globals are only looked up when the caller did not pass explicit values.
        "window_size": WINDOW_SIZE if window_size is None else window_size,
        "stride": STRIDE if stride is None else stride,
    }

    meta_fname = "meta_featured.json" if is_featured else "meta.json"
    with open(os.path.join(save_dir, meta_fname), "w", encoding="utf-8") as f:
        json.dump(meta, f, indent=4)

In [6]:
# === Load & Normalize CSV ===
def load_and_window_csv(csv_path: str, window_size: int, stride: int) -> Tuple[np.ndarray, List[str]]:
    """Load one angle CSV, min-max scale it, and cut it into sliding windows.

    The ``timestamp_ms`` column is dropped if present. Scaling is fitted on this file
    alone, so every video is normalized independently of the others.

    NOTE(review): the recorded run shows numpy nanmin/nanmax warnings while processing the
    featured bench_press CSVs, which suggests some columns are entirely NaN - confirm and
    decide whether such columns should be imputed or dropped before training.

    Args:
        csv_path: Path to the CSV file.
        window_size: Number of consecutive rows per window (must be positive).
        stride: Number of rows between consecutive window starts (must be positive).

    Returns:
        A tuple ``(windows, feature_names)`` where ``windows`` has shape
        ``(n_windows, window_size, n_features)`` - with ``n_windows == 0`` when the file
        has fewer than ``window_size`` rows - and ``feature_names`` lists the CSV columns
        in the same order as the last axis of ``windows``.

    Raises:
        ValueError: If ``window_size`` or ``stride`` is not positive.
    """
    if window_size <= 0 or stride <= 0:
        raise ValueError(
            f"window_size and stride must be positive (got {window_size}, {stride})"
        )

    df = pd.read_csv(csv_path)
    df = df.drop(columns=["timestamp_ms"], errors="ignore")
    feature_names = df.columns.tolist()

    normalized = MinMaxScaler().fit_transform(df)

    starts = range(0, len(normalized) - window_size + 1, stride)
    if len(starts) == 0:
        # Keep the 3-D layout even when the video is too short for a single window.
        return np.empty((0, window_size, normalized.shape[1])), feature_names

    windows = np.stack([normalized[start:start + window_size] for start in starts])
    return windows, feature_names

In [7]:
# === Extract Degradation Label from JSON ===
def extract_degradation_score(json_path: str, joints: List[str]) -> float:
    """Return the mean DTW degradation value over the given joints for one video.

    Reads ``data["metrics"]["<joint>_dtw_degradation"]["value"]`` for every joint in
    ``joints`` that has such an entry; joints without an entry are ignored, so the mean
    may be taken over fewer joints than requested.

    Args:
        json_path: Path to the video's metadata JSON file.
        joints: Joint-angle names to average over.

    Returns:
        The mean of the available values, or ``np.nan`` when the ``metrics`` section is
        missing or malformed, an entry lacks ``"value"``, no requested joint is present,
        or a value is not numeric.
    """
    with open(json_path, "r") as f:
        data = json.load(f)

    try:
        metrics = data["metrics"]
        scores = [
            metrics[f"{joint}_dtw_degradation"]["value"]
            for joint in joints if f"{joint}_dtw_degradation" in metrics
        ]
    except (KeyError, TypeError):
        # Missing "metrics"/"value" key, or the JSON is not shaped like nested dicts.
        return np.nan

    if not scores:
        # Avoid np.mean([]), which returns NaN but also emits a RuntimeWarning.
        return np.nan

    try:
        return float(np.mean(np.asarray(scores, dtype=float)))
    except (TypeError, ValueError):
        return np.nan

In [8]:
def process_video_folder(video_folder: str, exercise: str, use_featured_csv: bool = False) -> Tuple[np.ndarray, np.ndarray, List[str]]:
    """Turn one video folder into windowed features and per-window labels.

    The folder must contain an angle CSV (``*_angles.csv`` or, when ``use_featured_csv``
    is True, ``*_angles_featured.csv``) and a ``*_features_metadata.json`` file. If several
    files match a pattern, the alphabetically first one is used so the choice is
    reproducible.

    Args:
        video_folder: Directory holding the files of a single video.
        exercise: Exercise name; used to look up the joints in ``EXERCISE_JOINTS``.
        use_featured_csv: Select the feature-engineered CSV instead of the base one.

    Returns:
        ``(X, y, feature_names)`` where every window in ``X`` gets the same label (the
        video's degradation score). When the folder is skipped - missing files, unknown
        exercise, NaN score, or any processing error - the result is two empty arrays of
        shape ``(0,)`` and an empty list.
    """
    def _empty_result():
        # Fresh objects on every call so callers never share a mutable result.
        return np.empty((0,)), np.empty((0,)), []

    suffix = "_angles_featured.csv" if use_featured_csv else "_angles.csv"
    # glob returns matches in arbitrary order; sort so [0] is deterministic.
    angle_files = sorted(glob.glob(os.path.join(video_folder, f"*{suffix}")))
    json_files = sorted(glob.glob(os.path.join(video_folder, "*_features_metadata.json")))

    if not angle_files or not json_files:
        print(f"⚠️ Skipping {video_folder}: missing {suffix} or JSON.")
        return _empty_result()

    csv_path = angle_files[0]
    json_path = json_files[0]
    joints = EXERCISE_JOINTS.get(exercise)

    if not joints:
        print(f"⚠️ No joint config for exercise: {exercise}")
        return _empty_result()

    try:
        X, used_features = load_and_window_csv(csv_path, WINDOW_SIZE, STRIDE)
        y_val = extract_degradation_score(json_path, joints)
        if np.isnan(y_val):
            print(f"⚠️ Skipping {csv_path}: invalid score.")
            return _empty_result()

        # One label per window: the whole video shares a single degradation score.
        y = np.full((X.shape[0],), y_val)
        return X, y, used_features
    except Exception as e:
        # Best-effort batch processing: report the failing folder and carry on.
        print(f"❌ Error in {video_folder}: {e}")
        return _empty_result()

In [9]:
def build_dataset_from_processed(processed_dir: str, exercise: str) -> dict:
    """Build and save the base and featured datasets for one exercise.

    Every sub-directory of ``<processed_dir>/<exercise>`` is treated as one video and
    passed to ``process_video_folder`` twice (base CSV, then featured CSV). The per-video
    windows are concatenated and written to ``<ENGINEERED_DIR>/<exercise>`` via
    ``save_dataset``.

    Args:
        processed_dir: Root directory containing one sub-directory per exercise.
        exercise: Name of the exercise sub-directory to process.

    Returns:
        A dict ``{"X": (X, y), "X_featured": (X_featured, y)}``. A variant with no valid
        videos maps to two empty arrays of shape ``(0,)``.
    """
    exercise_dir = os.path.join(processed_dir, exercise)
    # Sorted so the sample order is reproducible across runs and file systems.
    video_folders = sorted(
        os.path.join(exercise_dir, entry)
        for entry in os.listdir(exercise_dir)
        if os.path.isdir(os.path.join(exercise_dir, entry))
    )

    datasets = {}

    for use_featured in [False, True]:
        X_all, y_all = [], []
        # Ordered list (not a set) so the saved names line up with X's last axis.
        feature_names: List[str] = []
        reference_features = None
        variant = "X_featured" if use_featured else "X"

        for folder_path in tqdm(video_folders, desc=f"🔄 Processing {exercise} | {variant}", unit="video"):
            X, y, used_features = process_video_folder(folder_path, exercise, use_featured_csv=use_featured)
            if X.shape[0] > 0:
                used_features = list(used_features)
                if reference_features is None:
                    reference_features = used_features
                elif used_features != reference_features:
                    # Concatenation assumes identical column layout in every video.
                    print(f"⚠️ Feature columns in {folder_path} differ from the first video ({variant}).")

                X_all.append(X)
                y_all.append(y)
                for name in used_features:
                    if name not in feature_names:
                        feature_names.append(name)

        if X_all:
            X_total = np.concatenate(X_all, axis=0)
            y_total = np.concatenate(y_all, axis=0)

            save_path = os.path.join(ENGINEERED_DIR, exercise)
            save_dataset(X_total, y_total, feature_names, save_path, exercise, is_featured=use_featured)

            print(f"✅ Saved {variant} for {exercise}: {X_total.shape}, y={y_total.shape}")
            datasets[variant] = (X_total, y_total)
        else:
            print(f"❌ No valid data for {exercise} | {variant}")
            datasets[variant] = (np.empty((0,)), np.empty((0,)))

    return datasets

In [10]:
def build_all_exercises_from_processed(processed_dir: str) -> dict:
    """Run the dataset build for every exercise listed in ``EXERCISES``.

    Returns a dict keyed by exercise name; each value is the per-variant dict produced
    by ``build_dataset_from_processed``, i.e. ``{"X": (X, y), "X_featured": (Xf, y)}``.
    """
    return {
        name: build_dataset_from_processed(processed_dir, name)
        for name in EXERCISES
    }

In [11]:
# Build and save every exercise's datasets. This re-reads all videos and takes several
# minutes per exercise according to the recorded progress bars.
datasets = build_all_exercises_from_processed(PROCESSED_DIR)

# Results are keyed by exercise, then by variant ("X" or "X_featured"), e.g.:
# X_squat, y_squat = datasets["squat"]["X"]

🔄 Processing squat | X: 100%|██████████| 112/112 [02:01<00:00,  1.08s/video]


✅ Saved X for squat: (1168, 30, 4), y=(1168,)


🔄 Processing squat | X_featured: 100%|██████████| 112/112 [01:06<00:00,  1.70video/s]


✅ Saved X_featured for squat: (1168, 30, 30), y=(1168,)


🔄 Processing lunges | X: 100%|██████████| 127/127 [02:12<00:00,  1.04s/video]


✅ Saved X for lunges: (2517, 30, 4), y=(2517,)


🔄 Processing lunges | X_featured: 100%|██████████| 127/127 [01:07<00:00,  1.88video/s]


✅ Saved X_featured for lunges: (2517, 30, 22), y=(2517,)


🔄 Processing bench_press | X:  15%|█▌        | 24/160 [00:26<02:16,  1.00s/video]

⚠️ Skipping /content/drive/MyDrive/msc_data_analytics_thesis_project_pose_estimation/data/processed/bench_press/v_BenchPress_g24_c03/v_BenchPress_g24_c03_angles.csv: invalid score.


🔄 Processing bench_press | X:  83%|████████▎ | 133/160 [02:21<00:27,  1.02s/video]

⚠️ Skipping /content/drive/MyDrive/msc_data_analytics_thesis_project_pose_estimation/data/processed/bench_press/v_BenchPress_g20_c06/v_BenchPress_g20_c06_angles.csv: invalid score.


🔄 Processing bench_press | X:  92%|█████████▏| 147/160 [02:35<00:12,  1.03video/s]

⚠️ Skipping /content/drive/MyDrive/msc_data_analytics_thesis_project_pose_estimation/data/processed/bench_press/v_BenchPress_g06_c03/v_BenchPress_g06_c03_angles.csv: invalid score.


🔄 Processing bench_press | X: 100%|██████████| 160/160 [02:48<00:00,  1.05s/video]


✅ Saved X for bench_press: (1342, 30, 2), y=(1342,)


  return xp.asarray(numpy.nanmin(X, axis=axis))
  return xp.asarray(numpy.nanmax(X, axis=axis))
🔄 Processing bench_press | X_featured:  15%|█▌        | 24/160 [00:12<01:14,  1.82video/s]

⚠️ Skipping /content/drive/MyDrive/msc_data_analytics_thesis_project_pose_estimation/data/processed/bench_press/v_BenchPress_g24_c03/v_BenchPress_g24_c03_angles_featured.csv: invalid score.


  return xp.asarray(numpy.nanmin(X, axis=axis))
  return xp.asarray(numpy.nanmax(X, axis=axis))
🔄 Processing bench_press | X_featured:  83%|████████▎ | 133/160 [01:10<00:14,  1.83video/s]

⚠️ Skipping /content/drive/MyDrive/msc_data_analytics_thesis_project_pose_estimation/data/processed/bench_press/v_BenchPress_g20_c06/v_BenchPress_g20_c06_angles_featured.csv: invalid score.


  return xp.asarray(numpy.nanmin(X, axis=axis))
  return xp.asarray(numpy.nanmax(X, axis=axis))
🔄 Processing bench_press | X_featured:  92%|█████████▏| 147/160 [01:17<00:07,  1.82video/s]

⚠️ Skipping /content/drive/MyDrive/msc_data_analytics_thesis_project_pose_estimation/data/processed/bench_press/v_BenchPress_g06_c03/v_BenchPress_g06_c03_angles_featured.csv: invalid score.


🔄 Processing bench_press | X_featured: 100%|██████████| 160/160 [01:25<00:00,  1.88video/s]


✅ Saved X_featured for bench_press: (1342, 30, 15), y=(1342,)


🔄 Processing pull_ups | X: 100%|██████████| 100/100 [01:42<00:00,  1.03s/video]


✅ Saved X for pull_ups: (1069, 30, 4), y=(1069,)


🔄 Processing pull_ups | X_featured: 100%|██████████| 100/100 [00:53<00:00,  1.87video/s]

✅ Saved X_featured for pull_ups: (1069, 30, 22), y=(1069,)



