# Squat quality scoring with TensorFlow

This notebook extracts BlazePose landmarks from your local squat videos with MediaPipe, trains a small BiLSTM regressor on those landmark sequences, and predicts a quality score per clip.


## 1) Environment setup
- Installs TensorFlow + MediaPipe + OpenCV.
- Use your own data/squats_train and data/squats_test folders; no downloads required.


In [None]:
!python -m pip install --upgrade pip
!python -m pip install "tensorflow<2.17" tensorflow-io jupyter
!python -m pip install mediapipe opencv-python


## 2) Imports and configuration
- Adjust paths or scoring scale if needed.
- Scores are expected in data/squat_scores.csv.


In [1]:
import os
import random
import csv
from pathlib import Path

import cv2
import mediapipe as mp
import numpy as np
import tensorflow as tf

# --- Reproducibility -------------------------------------------------------
# Seed Python, NumPy and TensorFlow with the same value, in that order.
SEED = 42
for _seed_everything in (random.seed, np.random.seed, tf.random.set_seed):
    _seed_everything(SEED)

# --- Input / feature layout ------------------------------------------------
BATCH_SIZE = 4
NUM_FRAMES = 16      # frames sampled per clip
IMG_SIZE = 160       # NOTE(review): not referenced elsewhere in the visible notebook
NUM_LANDMARKS = 33   # BlazePose outputs 33 landmarks
LANDMARK_DIMS = 4    # x, y, z, visibility
SCORE_SCALE = 100.0  # labels are 0-100; model trains on 0-1 internally

# --- Paths (all relative to the working directory) -------------------------
DATA_ROOT = Path("data")
TRAIN_DIR = DATA_ROOT / "squats_train"
TEST_DIR = DATA_ROOT / "squats_test"
LABELS_PATH = DATA_ROOT / "squat_scores.csv"
MODEL_DIR = Path("checkpoints")

# File suffixes (compared lower-cased) that are treated as videos.
VIDEO_EXTS = (".mp4", ".mov", ".avi", ".mkv")

# Create the working folders up front so later cells can assume they exist.
for _required_dir in (MODEL_DIR, TRAIN_DIR, TEST_DIR):
    _required_dir.mkdir(parents=True, exist_ok=True)

print(tf.__version__)
print("Data root:", DATA_ROOT.resolve())
print("Model dir:", MODEL_DIR.resolve())


2.16.2
Data root: C:\Users\KarthikPC\vscode_projects\CS663_Project2_training\data
Model dir: C:\Users\KarthikPC\vscode_projects\CS663_Project2_training\checkpoints


## 3) Prepare local data and labels
- A CSV template is generated listing every video under data/squats_train and data/squats_test.
- Fill in the score column (0-100). At least two labeled train videos are required to run.


In [2]:
def list_videos(root: Path):
    """Return the video files found anywhere under ``root``, sorted by path.

    An entry qualifies when its lower-cased suffix is listed in VIDEO_EXTS and
    it is a regular file (directories with a video-like name are ignored).
    """
    found = []
    for candidate in root.rglob("*"):
        if candidate.suffix.lower() not in VIDEO_EXTS:
            continue
        if candidate.is_file():
            found.append(candidate)
    found.sort()
    return found


def ensure_label_file():
    """Create or refresh the label CSV at LABELS_PATH and return its rows.

    One row is written for every video found under TRAIN_DIR and TEST_DIR
    (paths are stored POSIX-style, relative to DATA_ROOT). Scores already
    present in the CSV are carried over by relative path.

    Rows whose video is not currently on disk are kept as long as they hold a
    non-empty score, so hand-entered labels are not silently erased when the
    data folder is temporarily missing or incomplete. Unlabeled rows for
    missing videos are dropped.

    Returns:
        list[dict]: the rows written, each with "relative_path" and "score"
        keys ("score" is "" when no label has been entered yet).
    """
    existing = {}
    if LABELS_PATH.exists():
        with LABELS_PATH.open("r", newline="", encoding="utf-8") as f:
            for row in csv.DictReader(f):
                # DictReader yields None for cells missing from a short row.
                rel = row.get("relative_path") or ""
                if rel:
                    existing[rel] = row.get("score") or ""

    rows = []
    on_disk = set()
    for p in list_videos(TRAIN_DIR) + list_videos(TEST_DIR):
        rel = p.relative_to(DATA_ROOT).as_posix()
        on_disk.add(rel)
        rows.append({"relative_path": rel, "score": existing.get(rel, "")})

    # Keep labels for videos that are not on disk right now instead of
    # discarding them on rewrite.
    preserved = [
        {"relative_path": rel, "score": score}
        for rel, score in existing.items()
        if rel not in on_disk and score.strip()
    ]
    rows.extend(preserved)

    LABELS_PATH.parent.mkdir(parents=True, exist_ok=True)
    with LABELS_PATH.open("w", newline="", encoding="utf-8") as f:
        writer = csv.DictWriter(f, fieldnames=["relative_path", "score"])
        writer.writeheader()
        writer.writerows(rows)

    if preserved:
        print(f"Kept {len(preserved)} labeled row(s) whose video was not found on disk.")
    print(f"Label file ready at {LABELS_PATH}. Fill in 'score' (0-{int(SCORE_SCALE)}) for each row.")
    return rows


# Generate/refresh the CSV template now; the returned rows are not needed here.
_ = ensure_label_file()


Label file ready at data\squat_scores.csv. Fill in 'score' (0-100) for each row.


### Label summary / sanity check


In [3]:
def load_labeled_samples():
    """Read LABELS_PATH and return the usable ``(video_path, score)`` pairs.

    A row is used only when its video exists under DATA_ROOT and its score
    parses to a finite number. Finite scores are clamped to
    ``[0, SCORE_SCALE]``. Rows with an empty score are skipped silently; rows
    with an unparsable or non-finite score (e.g. ``nan``, ``inf``) are skipped
    with a message, because clamping would otherwise turn ``nan`` into
    SCORE_SCALE. Paths that are listed but missing on disk are reported once
    at the end.

    Returns:
        list[tuple[str, float]]: absolute-or-relative path string (as built
        from DATA_ROOT) paired with the clamped score.
    """
    import math

    samples = []
    missing = []
    with LABELS_PATH.open("r", newline="", encoding="utf-8") as f:
        for row in csv.DictReader(f):
            # DictReader yields None for cells missing from a short row, so
            # fall back to "" before calling string methods.
            rel = row.get("relative_path") or ""
            score_str = (row.get("score") or "").strip()
            if not rel:
                continue

            full = DATA_ROOT / rel
            if not full.exists():
                missing.append(rel)
                continue

            if not score_str:
                continue

            try:
                score = float(score_str)
            except ValueError:
                print(f"Skipping {rel}: invalid score '{score_str}'")
                continue

            # float() accepts "nan"/"inf"; neither is a meaningful label.
            if not math.isfinite(score):
                print(f"Skipping {rel}: invalid score '{score_str}'")
                continue

            score = max(0.0, min(SCORE_SCALE, score))
            samples.append((str(full), score))

    if missing:
        print("Warning: paths not found on disk:", missing)

    print(f"Loaded {len(samples)} labeled samples.")
    return samples


# Load every usable (path, score) pair and stop early when there is too little
# labeled data to continue.
labeled_samples = load_labeled_samples()
if len(labeled_samples) < 2:
    raise ValueError("Add scores in the CSV (at least 2 labeled videos) before training.")


Loaded 132 labeled samples.


## 4) Build a TensorFlow input pipeline
- Uniformly sample frames, run BlazePose to get 33 landmarks per frame, normalize them, and emit the flattened keypoints plus a per-frame validity flag.
- Labels are normalized to 0-1 during training; final scores are rescaled to 0-100.


In [4]:
# Short alias for the MediaPipe pose solution module; Pose instances are
# created from it inside _extract_keypoints_np.
mp_pose = mp.solutions.pose


def _sample_frame_indices(total_frames: int, num_target: int) -> np.ndarray:
    if total_frames <= 0:
        return np.zeros((num_target,), dtype=np.int32)
    idxs = np.linspace(0, max(total_frames - 1, 0), num_target).astype(np.int32)
    return idxs


def _normalize_landmarks(landmarks: np.ndarray) -> np.ndarray:
    """Robust normalization.

    Rules:
    - Use visibility (4th channel) to decide which landmarks to trust.
    - Prefer hip midpoint + shoulder midpoint for center/scale when available.
    - Fallback to visible-point mean if those keypoints are missing.
    - Rotate using hip vector if available, otherwise shoulders, otherwise PCA of visible points.

    Returns normalized landmarks (same shape) or None if insufficient visible points.
    """
    # landmarks shape: (NUM_LANDMARKS, LANDMARK_DIMS)
    vis = landmarks[:, 3] >= 0.25
    if vis.sum() < 2:
        # Not enough points
        return None

    def valid_pair(i, j):
        try:
            return bool(vis[i] and vis[j])
        except Exception:
            return False

    LEFT_HIP, RIGHT_HIP = 23, 24
    LEFT_SHOULDER, RIGHT_SHOULDER = 11, 12

    if valid_pair(LEFT_HIP, RIGHT_HIP):
        left_hip, right_hip = landmarks[LEFT_HIP, :3], landmarks[RIGHT_HIP, :3]
        center_hip = (left_hip + right_hip) / 2.0
    else:
        visible_coords = landmarks[vis, :3]
        center_hip = visible_coords.mean(axis=0)

    if valid_pair(LEFT_SHOULDER, RIGHT_SHOULDER):
        left_sh, right_sh = landmarks[LEFT_SHOULDER, :3], landmarks[RIGHT_SHOULDER, :3]
        center_sh = (left_sh + right_sh) / 2.0
    elif valid_pair(LEFT_HIP, RIGHT_HIP):
        # crude fallback: estimate shoulders above hips
        center_sh = center_hip + np.array([0.0, 0.5, 0.0])
    else:
        center_sh = center_hip + np.array([0.0, 0.5, 0.0])

    torso = np.linalg.norm(center_sh[:2] - center_hip[:2])
    hip_dist = np.linalg.norm(landmarks[LEFT_HIP, :2] - landmarks[RIGHT_HIP, :2]) if valid_pair(LEFT_HIP, RIGHT_HIP) else torso
    scale = max(torso, hip_dist, 1e-3)

    landmarks[:, :3] = (landmarks[:, :3] - center_hip) / scale

    # compute rotation
    if valid_pair(LEFT_HIP, RIGHT_HIP):
        hip_vec = (landmarks[RIGHT_HIP, :2] - landmarks[LEFT_HIP, :2])
    elif valid_pair(LEFT_SHOULDER, RIGHT_SHOULDER):
        hip_vec = (landmarks[RIGHT_SHOULDER, :2] - landmarks[LEFT_SHOULDER, :2])
    else:
        visible_pts = landmarks[vis, :2]
        if visible_pts.shape[0] < 2:
            hip_vec = np.array([1.0, 0.0])
        else:
            pts_centered = visible_pts - visible_pts.mean(axis=0)
            u, s, vh = np.linalg.svd(pts_centered, full_matrices=False)
            hip_vec = vh[0]

    angle = np.arctan2(hip_vec[1], hip_vec[0] + 1e-6)
    cos_a, sin_a = np.cos(-angle), np.sin(-angle)
    rot = np.array([[cos_a, -sin_a], [sin_a, cos_a]], dtype=np.float32)
    landmarks[:, :2] = landmarks[:, :2] @ rot.T
    return landmarks


def _extract_keypoints_np(video_path: str) -> np.ndarray:
    """Extract normalized keypoints for NUM_FRAMES timesteps of one video.

    - Decodes the video, samples NUM_FRAMES frames uniformly and runs a local
      MediaPipe Pose instance on each sampled frame.
    - Frames without a usable pose are filled by linear interpolation over
      time (or zeros when no frame has a pose).
    - Appends a per-frame validity scalar (1.0 if the frame originally had a
      pose) as an extra feature.
    - Caches the result under MODEL_DIR / "keypoints_cache".

    The cache key covers the path, the file's size and modification time, and
    the feature layout constants, and a cached array is only used when its
    shape matches the current layout. Results for videos that yield no frames
    are not cached, so a transient read failure is retried on the next call.

    Returns:
        float32 array of shape (NUM_FRAMES, NUM_LANDMARKS * LANDMARK_DIMS + 1).
    """
    import hashlib

    expected_shape = (NUM_FRAMES, NUM_LANDMARKS * LANDMARK_DIMS + 1)

    cache_dir = MODEL_DIR / "keypoints_cache"
    cache_dir.mkdir(parents=True, exist_ok=True)

    # Tie the cache entry to the file contents (size + mtime) and to the
    # feature layout, so editing a video or changing NUM_FRAMES cannot return
    # stale features.
    try:
        file_stat = os.stat(video_path)
        file_sig = f"{file_stat.st_size}:{file_stat.st_mtime_ns}"
    except OSError:
        file_sig = "unavailable"
    cache_key = "|".join(
        [video_path, file_sig, str(NUM_FRAMES), str(NUM_LANDMARKS), str(LANDMARK_DIMS)]
    )
    h = hashlib.sha1(cache_key.encode("utf-8")).hexdigest()
    cache_file = cache_dir / f"{h}.npy"

    if cache_file.exists():
        try:
            cached = np.load(str(cache_file))
        except (OSError, ValueError, EOFError) as exc:
            # Best effort: a corrupt cache entry is recomputed below.
            print(f"Ignoring unreadable keypoint cache {cache_file.name}: {exc}")
        else:
            if cached.shape == expected_shape:
                return cached

    # NOTE(review): every decoded frame is held in memory even though only
    # NUM_FRAMES of them are used; fine for short clips, costly for long ones.
    cap = cv2.VideoCapture(video_path)
    frames = []
    try:
        while True:
            ret, frame = cap.read()
            if not ret:
                break
            frames.append(frame)
    finally:
        cap.release()

    num_frames = len(frames)
    if num_frames == 0:
        # Nothing decodable: return zeros with an all-invalid mask, but do not
        # cache it so the failure is not made permanent.
        print(f"Warning: no frames could be read from {video_path}; returning zeros.")
        return np.zeros(expected_shape, dtype=np.float32)

    keypoints = np.full((NUM_FRAMES, NUM_LANDMARKS, LANDMARK_DIMS), np.nan, dtype=np.float32)
    idxs = _sample_frame_indices(num_frames, NUM_FRAMES)
    valid_per_frame = np.zeros((NUM_FRAMES,), dtype=bool)

    # The context manager closes the Pose instance even if processing raises.
    with mp_pose.Pose(
        static_image_mode=False,
        model_complexity=1,
        enable_segmentation=False,
        smooth_landmarks=True,
    ) as mp_pose_local:
        for out_i, frame_idx in enumerate(idxs):
            frame = frames[int(frame_idx)]
            image_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            results = mp_pose_local.process(image_rgb)
            if not results.pose_landmarks:
                continue
            lm = results.pose_landmarks.landmark
            coords = np.array([[p.x, p.y, p.z, p.visibility] for p in lm], dtype=np.float32)
            norm = _normalize_landmarks(coords)
            if norm is not None:
                keypoints[out_i] = norm
                valid_per_frame[out_i] = True

    # Interpolate missing values per landmark and per dimension; a series with
    # no valid sample at all is zero-filled.
    idxs_time = np.arange(NUM_FRAMES)
    for li in range(NUM_LANDMARKS):
        for d in range(LANDMARK_DIMS):
            series = keypoints[:, li, d]
            good = ~np.isnan(series)
            if good.any():
                keypoints[:, li, d] = np.interp(idxs_time, idxs_time[good], series[good])
            else:
                keypoints[:, li, d] = 0.0

    frame_valid = valid_per_frame.astype(np.float32)
    keypoints_flat = keypoints.reshape((NUM_FRAMES, NUM_LANDMARKS * LANDMARK_DIMS))
    out = np.concatenate([keypoints_flat, frame_valid[:, None]], axis=1).astype(np.float32)

    try:
        np.save(str(cache_file), out)
    except OSError as exc:
        # Caching is an optimization only; report and carry on.
        print(f"Warning: could not write keypoint cache {cache_file.name}: {exc}")

    return out


def load_keypoints(path: tf.Tensor) -> tf.Tensor:
    """Wrap the NumPy keypoint extractor so it can be used inside ``tf.data``.

    The string tensor ``path`` is decoded as UTF-8 and handed to
    ``_extract_keypoints_np`` through ``tf.py_function``; the static shape
    (NUM_FRAMES, NUM_LANDMARKS * LANDMARK_DIMS + 1) is then attached to the
    float32 result.
    """
    feature_dims = NUM_LANDMARKS * LANDMARK_DIMS + 1

    def _decode_eager(path_tensor):
        video_path = path_tensor.numpy().decode("utf-8")
        return _extract_keypoints_np(video_path)

    features = tf.py_function(func=_decode_eager, inp=[path], Tout=tf.float32)
    # py_function loses static shape information, so restore it here.
    features.set_shape((NUM_FRAMES, feature_dims))
    return features


def preprocess(path: tf.Tensor, score: tf.Tensor) -> tuple[tf.Tensor, tf.Tensor]:
    """Turn one ``(path, score)`` element into ``(features, target)``.

    Features are the keypoint sequence returned by ``load_keypoints``; the
    target is the score cast to float32, divided by SCORE_SCALE and given a
    trailing axis of size 1.
    """
    features = load_keypoints(path)
    normalized = tf.cast(score, tf.float32) / SCORE_SCALE
    target = tf.expand_dims(normalized, axis=-1)
    return features, target


def build_tf_dataset(samples, training: bool):
    """Build a batched, prefetched ``tf.data.Dataset`` from ``(path, score)`` pairs.

    Args:
        samples: iterable of ``(video_path, score)`` pairs; must not be empty.
        training: when True the elements are shuffled (full-size buffer,
            seeded with SEED, reshuffled every epoch) before preprocessing.

    Returns:
        Dataset yielding ``(keypoints, score)`` batches of size BATCH_SIZE.

    Raises:
        ValueError: if ``samples`` is empty.
    """
    samples = list(samples)
    if not samples:
        # Previously surfaced as an opaque tuple-unpacking ValueError.
        raise ValueError("build_tf_dataset received an empty sample list.")

    paths = [path for path, _ in samples]
    scores = [score for _, score in samples]

    ds = tf.data.Dataset.from_tensor_slices((paths, scores))
    if training:
        ds = ds.shuffle(buffer_size=len(paths), seed=SEED, reshuffle_each_iteration=True)
    ds = ds.map(preprocess, num_parallel_calls=tf.data.AUTOTUNE)
    ds = ds.batch(BATCH_SIZE).prefetch(tf.data.AUTOTUNE)
    return ds


def _in_dir(path_str: str, root: Path) -> bool:
    try:
        Path(path_str).resolve().relative_to(root.resolve())
        return True
    except ValueError:
        return False


# Route each labeled clip to the train or test pool according to its folder.
train_samples = [s for s in labeled_samples if _in_dir(s[0], TRAIN_DIR)]
test_samples = [s for s in labeled_samples if _in_dir(s[0], TEST_DIR)]
if len(train_samples) < 2:
    raise ValueError("Need at least 2 labeled train videos in squats_train for a train/val split.")

# Shuffle with a dedicated RNG seeded from SEED so the train/val split is the
# same every time this cell runs. The global `random` state advances with each
# use, so re-running the cell with it would move validation clips into the
# training set of an already-trained model.
random.Random(SEED).shuffle(train_samples)

# 80/20 split that always leaves at least one clip on each side.
split = max(1, int(0.8 * len(train_samples)))
if split >= len(train_samples):
    split = len(train_samples) - 1

train_ds = build_tf_dataset(train_samples[:split], training=True)
val_ds = build_tf_dataset(train_samples[split:], training=False)
test_ds = build_tf_dataset(test_samples, training=False) if test_samples else None

print("Train batches:", len(train_ds))
print("Val batches:", len(val_ds))
print("Test batches:", len(test_ds) if test_ds is not None else 0)


Train batches: 20
Val batches: 5
Test batches: 8


## 5) Define a lightweight regression model
- BiLSTM + pooling over landmark sequences; single sigmoid output predicts normalized score.


In [5]:
# Per-frame feature width: flattened landmarks plus the validity flag.
FEATURE_DIMS = NUM_LANDMARKS * LANDMARK_DIMS + 1


def build_model() -> tf.keras.Model:
    """Assemble the score regressor.

    Input is a (NUM_FRAMES, FEATURE_DIMS) sequence. It passes through a
    bidirectional LSTM (128 units per direction, full sequence returned),
    average pooling over time, Dense(128, relu), Dropout(0.3), Dense(64, relu)
    and a single sigmoid unit that outputs the normalized 0-1 score.
    No masking layer is used; the per-frame validity flag is just one of the
    input features.
    """
    layers = tf.keras.layers

    sequence_in = tf.keras.Input(shape=(NUM_FRAMES, FEATURE_DIMS))
    encoded = layers.Bidirectional(layers.LSTM(128, return_sequences=True))(sequence_in)
    pooled = layers.GlobalAveragePooling1D()(encoded)
    hidden = layers.Dense(128, activation="relu")(pooled)
    hidden = layers.Dropout(0.3)(hidden)
    hidden = layers.Dense(64, activation="relu")(hidden)
    score_out = layers.Dense(1, activation="sigmoid")(hidden)  # normalized score 0-1
    return tf.keras.Model(sequence_in, score_out)


# Instantiate the regressor and compile it for training on normalized (0-1)
# targets: MSE is the optimized loss, MAE is tracked under the name "mae"
# (the training cell monitors the corresponding "val_mae").
model = build_model()
model.compile(
    optimizer=tf.keras.optimizers.Adam(learning_rate=1e-4),
    loss="mse",
    metrics=[tf.keras.metrics.MeanAbsoluteError(name="mae")],
)
model.summary()

## 6) Train
- Early stopping on validation MAE.


In [6]:
# Stop once validation MAE has not improved for 3 epochs and roll back to the
# best weights seen.
early_stopping = tf.keras.callbacks.EarlyStopping(
    monitor="val_mae",
    patience=3,
    restore_best_weights=True,
)
# Keep only the best model (by validation MAE) on disk.
best_checkpoint = tf.keras.callbacks.ModelCheckpoint(
    str(MODEL_DIR / "model.keras"),
    monitor="val_mae",
    save_best_only=True,
)
callbacks = [early_stopping, best_checkpoint]

history = model.fit(
    train_ds,
    validation_data=val_ds,
    epochs=40,
    callbacks=callbacks,
)

# Report the best validation MAE both on the 0-1 training scale and in
# original score units.
best_val_mae = min(history.history["val_mae"])
print("Best val MAE (normalized 0-1):", best_val_mae)
print("Best val MAE (score units):", best_val_mae * SCORE_SCALE)


Epoch 1/40
[1m20/20[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m16s[0m 369ms/step - loss: 0.1647 - mae: 0.3790 - val_loss: 0.1228 - val_mae: 0.3032
Epoch 2/40
[1m20/20[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m16s[0m 369ms/step - loss: 0.1647 - mae: 0.3790 - val_loss: 0.1228 - val_mae: 0.3032
Epoch 2/40
[1m20/20[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 11ms/step - loss: 0.1114 - mae: 0.2759 - val_loss: 0.1110 - val_mae: 0.2595
Epoch 3/40
[1m20/20[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 11ms/step - loss: 0.1114 - mae: 0.2759 - val_loss: 0.1110 - val_mae: 0.2595
Epoch 3/40
[1m20/20[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 7ms/step - loss: 0.0968 - mae: 0.2364 - val_loss: 0.1020 - val_mae: 0.2384
Epoch 4/40
[1m20/20[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 7ms/step - loss: 0.0968 - mae: 0.2364 - val_loss: 0.1020 - val_mae: 0.2384
Epoch 4/40
[1m20/20[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 8ms/step - loss: 

## 7) Evaluate and save artifacts


In [7]:
# Evaluate on the held-out test set when one exists, otherwise on validation.
eval_target = val_ds if test_ds is None else test_ds
eval_results = model.evaluate(eval_target, return_dict=True)
print(eval_results)
print(f"MAE in score units: {eval_results['mae'] * SCORE_SCALE:.2f}")

# Persist the trained model together with the scale needed to turn its 0-1
# output back into a score.
export_dir = MODEL_DIR / "squat_scorer.keras"
model.save(export_dir)
(MODEL_DIR / "score_scale.txt").write_text(str(SCORE_SCALE), encoding="utf-8")

print("Artifacts saved to", MODEL_DIR)


[1m8/8[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 107ms/step - loss: 0.0521 - mae: 0.1734  
[1m8/8[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 107ms/step - loss: 0.0521 - mae: 0.1734
{'loss': 0.05210424214601517, 'mae': 0.17343056201934814}
MAE in score units: 17.34
Artifacts saved to checkpoints
{'loss': 0.05210424214601517, 'mae': 0.17343056201934814}
MAE in score units: 17.34
Artifacts saved to checkpoints


## 8) Export TFLite for Android


In [8]:
from pathlib import Path

# MODEL_DIR comes from the configuration cell; it is not redefined here so the
# notebook keeps a single source of truth for paths.
saved_model_dir = MODEL_DIR / "squat_scorer_savedmodel"
tflite_path = MODEL_DIR / "squat_scorer.tflite"

# Export the in-memory trained model as a SavedModel first. No other cell in
# this notebook creates `saved_model_dir`, so without this step the conversion
# would fail on a fresh run or silently convert a stale export.
model.export(str(saved_model_dir))

converter = tf.lite.TFLiteConverter.from_saved_model(str(saved_model_dir))
converter.optimizations = [tf.lite.Optimize.DEFAULT]

# Key flags for TensorList + LSTM
converter.experimental_enable_resource_variables = True
converter._experimental_lower_tensor_list_ops = False
converter.target_spec.supported_ops = [
    tf.lite.OpsSet.TFLITE_BUILTINS,
    tf.lite.OpsSet.SELECT_TF_OPS,
]

tflite_model = converter.convert()
tflite_path.write_bytes(tflite_model)
print("Wrote", tflite_path)


Wrote checkpoints\squat_scorer.tflite


## 9) Single-sample inference helper


In [9]:
def predict_sample(video_path: str):
    """Score a single video with the global ``model``.

    Keypoints are extracted with ``_extract_keypoints_np``, given a leading
    batch axis of size 1, and passed to ``model.predict``. The normalized
    prediction is multiplied by SCORE_SCALE before being returned as a float.
    """
    feature_dims = NUM_LANDMARKS * LANDMARK_DIMS + 1
    # reshape (rather than adding an axis) also fails loudly on a wrong shape
    batch = _extract_keypoints_np(video_path).reshape(1, NUM_FRAMES, feature_dims)
    prediction = model.predict(batch, verbose=0)
    return float(prediction[0][0]) * SCORE_SCALE


# Smoke-test the helper on one known clip: the first training sample when
# there is one, otherwise the first labeled sample of any kind.
example_path = train_samples[0][0] if train_samples else labeled_samples[0][0]
pred_score = predict_sample(example_path)
print(f"Predicted score: {pred_score:.2f} (0-{int(SCORE_SCALE)}) on {example_path}")

Predicted score: 80.90 (0-100) on data\squats_train\v_BodyWeightSquats_g14_c02.avi
