<a href="https://colab.research.google.com/github/preethishp/ExerciseClassificationAndFormCorrection/blob/main/Squat_Demo.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Runtime dependencies for the demo.
# %pip (rather than !pip) installs into the environment of the running kernel,
# so the packages are importable from this notebook.
%pip install numpy scipy pandas tqdm
%pip install torch torchvision torchaudio
%pip install ultralytics opencv-python-headless
%pip install scikit-learn matplotlib seaborn

In [None]:

import os, shutil
from pathlib import Path

from google.colab import drive

# Mount Google Drive; the trained checkpoint and the pose cache are read from it.
drive.mount('/content/drive')

# Locations on Drive.
SAVE_ROOT   = "/content/drive/MyDrive/fitness_form_ai"
SAVE_MODELS = SAVE_ROOT + "/models"
SAVE_CACHE  = SAVE_ROOT + "/pose_cache"

# Local working copies used by the inference cell further down.
CKPT_LOCAL  = "/content/best_tcn.pt"
CACHE_LOCAL = "/content/pose_seq_yolo"

os.makedirs(CACHE_LOCAL, exist_ok=True)
drive_cache = Path(SAVE_CACHE)
local_cache = Path(CACHE_LOCAL)

# 1) Model checkpoint (required: copy2 raises if it is missing on Drive).
shutil.copy2(Path(SAVE_MODELS) / "best_tcn.pt", CKPT_LOCAL)

# 2) Label metadata (optional: copied only when present).
labels_on_drive = drive_cache / "labels.json"
if labels_on_drive.exists():
    shutil.copy2(labels_on_drive, local_cache / "labels.json")

# 3) Every cached .npy file, counted as it is copied.
restored = 0
for npy in drive_cache.glob("*.npy"):
    shutil.copy2(npy, local_cache / npy.name)
    restored += 1

print(f"Restored model → {CKPT_LOCAL}")
print(f"Restored {restored} .npy files → {CACHE_LOCAL}")

Mounted at /content/drive
✅ Restored model → /content/best_tcn.pt
✅ Restored 630 .npy files → /content/pose_seq_yolo


In [None]:
# NOTE: these are the same install commands as the first cell of the notebook;
# re-running them is harmless when the packages are already present.
# %pip (rather than !pip) installs into the environment of the running kernel.
%pip install numpy scipy pandas tqdm
%pip install torch torchvision torchaudio
%pip install ultralytics opencv-python-headless
%pip install scikit-learn matplotlib seaborn

Collecting ultralytics
  Downloading ultralytics-8.3.186-py3-none-any.whl.metadata (37 kB)
Collecting ultralytics-thop>=2.0.0 (from ultralytics)
  Downloading ultralytics_thop-2.0.16-py3-none-any.whl.metadata (14 kB)
Downloading ultralytics-8.3.186-py3-none-any.whl (1.1 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.1/1.1 MB[0m [31m14.6 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading ultralytics_thop-2.0.16-py3-none-any.whl (28 kB)
Installing collected packages: ultralytics-thop, ultralytics
Successfully installed ultralytics-8.3.186 ultralytics-thop-2.0.16


In [None]:
%%writefile infer_baseline.py
#!/usr/bin/env python3
# infer_baseline.py
import argparse, json, re
from pathlib import Path
import numpy as np
import torch
import torch.nn as nn

# ------------------- TCN (baseline) -------------------
class Chomp1d(nn.Module):
    """Trim the last ``c`` entries along the third axis of the input.

    When ``c`` is not positive the input tensor is returned unchanged (the
    very same object, no copy). Otherwise a contiguous copy of the trimmed
    slice is returned.
    """

    def __init__(self, c):
        super().__init__()
        self.c = c  # number of trailing steps to remove

    def forward(self, x):
        if self.c > 0:
            trimmed = x[:, :, :-self.c]
            return trimmed.contiguous()
        return x

class TemporalBlock(nn.Module):
    """Residual block made of two dilated 1-D convolution stages.

    Each stage is ``Conv1d -> ReLU -> Dropout -> Chomp1d(padding)``; both
    convolutions share the same kernel size, dilation and padding. The input
    is added back to the stage output through ``self.down``: a 1x1 ``Conv1d``
    when the channel counts differ, otherwise an identity.

    Args:
        n_in: input channel count.
        n_out: output channel count (used by both convolutions).
        k: convolution kernel size.
        dilation: dilation of both convolutions.
        padding: padding given to both convolutions and number of trailing
            steps removed by each ``Chomp1d``.
        dropout: dropout probability applied after each ReLU.
    """

    def __init__(self, n_in, n_out, k, dilation, padding, dropout=0.0):
        super().__init__()

        def stage(c_in):
            # Layer order is kept exactly so the Sequential indices (and thus
            # the state_dict keys "seq.0", "seq.4", ...) stay the same.
            return [
                nn.Conv1d(c_in, n_out, k, padding=padding, dilation=dilation),
                nn.ReLU(),
                nn.Dropout(dropout),
                Chomp1d(padding),
            ]

        self.seq = nn.Sequential(*stage(n_in), *stage(n_out))
        if n_in != n_out:
            # Channel counts differ: project the input so it can be added.
            self.down = nn.Conv1d(n_in, n_out, 1)
        else:
            self.down = nn.Identity()

    def forward(self, x):
        transformed = self.seq(x)
        shortcut = self.down(x)
        return transformed + shortcut

class TCNClassifier(nn.Module):
    """Stack of ``TemporalBlock``s followed by average pooling and a linear head.

    Block ``i`` uses dilation ``2**i`` and padding ``(k - 1) * 2**i``.

    Args:
        feats: number of input features per time step.
        channels: output channel count of each temporal block, in order.
        k: kernel size shared by all blocks.
        dropout: dropout probability passed to every block.
        num_classes: size of the output logit vector.
    """

    def __init__(self, feats=34, channels=(64,128,256), k=7, dropout=0.0, num_classes=10):
        super().__init__()
        blocks = []
        width_in = feats
        for level, width_out in enumerate(channels):
            dilation = 2 ** level
            padding = (k - 1) * dilation
            blocks.append(TemporalBlock(width_in, width_out, k, dilation, padding, dropout))
            width_in = width_out
        self.tcn = nn.Sequential(*blocks)
        self.pool = nn.AdaptiveAvgPool1d(1)
        self.fc = nn.Linear(width_in, num_classes)

    def forward(self, x):
        # Swap axes 1 and 2 before the convolutions; the input is expected
        # as (batch, time, feats).
        channels_first = x.transpose(1, 2)
        pooled = self.pool(self.tcn(channels_first)).squeeze(-1)
        return self.fc(pooled)

# ------------------- Seq utilities --------------------
def pad_or_center_trim(x: np.ndarray, T: int) -> np.ndarray:
    """Return ``x`` with exactly ``T`` rows.

    * ``len(x) == T``: ``x`` itself is returned.
    * ``len(x) > T``: the centred slice of ``T`` rows is returned.
    * ``len(x) < T``: zero rows (same dtype, ``x.shape[1]`` columns) are
      appended at the end.
    """
    n_rows = len(x)
    missing = T - n_rows
    if missing == 0:
        return x
    if missing < 0:
        first = (n_rows - T) // 2
        return x[first:first + T]
    filler = np.zeros((missing, x.shape[1]), dtype=x.dtype)
    return np.concatenate([x, filler], axis=0)

# ---------- YOLOv8 pose seq ----------
def extract_pose_sequence_yolo_batched(
    video_path: Path, every_n=3, batch_size=32, imgsz=448, conf=0.25, device="cpu"
):
    """
    Optional. Requires: pip install ultralytics opencv-python-headless
    Produces normalized 17x2 sequences → (T,34)

    Every ``every_n``-th frame of the video is kept, downscaled so its longer
    side is at most ``imgsz``, and run through ``yolov8n-pose.pt`` in batches
    of ``batch_size`` (one detection per frame at most). Frames without a
    detection are skipped, so ``T`` can be smaller than the number of sampled
    frames. Each pose is translated so the midpoint of keypoints 11/12
    (hips) is the origin and divided by the distance from that point to the
    midpoint of keypoints 5/6 (shoulders); a zero distance falls back to 1.0.

    Args:
        video_path: path of the video file to read.
        every_n: keep one frame out of every ``every_n``.
        batch_size: number of frames per ``predict`` call.
        imgsz: downscale target and inference image size.
        conf: detection confidence threshold passed to ``predict``.
        device: ``"cpu"`` forces CPU; any other value uses CUDA device 0 when
            CUDA is available.

    Returns:
        float32 array of shape ``(T, 34)``; ``(0, 34)`` when no frame could be
        read or no pose was detected.
    """
    import cv2
    from ultralytics import YOLO

    LEFT_HIP, RIGHT_HIP = 11, 12
    LEFT_SHO, RIGHT_SHO = 5, 6

    def _pre(frame, target=448):
        # Downscale only: frames already within the target size are untouched.
        h, w = frame.shape[:2]
        if max(h, w) > target:
            s = target / max(h, w)
            frame = cv2.resize(frame, (int(w*s), int(h*s)), interpolation=cv2.INTER_AREA)
        return frame

    # Same GPU decision for model placement and for predict().
    use_gpu = torch.cuda.is_available() and device != "cpu"

    pose_model = YOLO("yolov8n-pose.pt")
    if use_gpu:
        pose_model.to(0)
        try:
            pose_model.model.half()
        except Exception as exc:
            # Half precision is a best-effort speed-up; keep going in full
            # precision, but say why (and let KeyboardInterrupt through).
            print(f"[warn] could not switch pose model to half precision: {exc}")

    cap = cv2.VideoCapture(str(video_path))
    frames = []
    fidx = 0
    try:
        while cap.isOpened():
            ok, frame = cap.read()
            if not ok:
                break
            if fidx % every_n == 0:
                frames.append(_pre(frame, imgsz))
            fidx += 1
    finally:
        # Release the capture even if reading or resizing raised.
        cap.release()

    if not frames:
        return np.zeros((0,34), dtype=np.float32)

    seq = []
    for i in range(0, len(frames), batch_size):
        batch = frames[i:i+batch_size]
        results = pose_model.predict(
            batch, imgsz=imgsz, device=0 if use_gpu else device,
            conf=conf, max_det=1, verbose=False
        )
        for r in results:
            # No person found in this frame: drop it from the sequence.
            if not hasattr(r, "keypoints") or r.keypoints is None or len(r.keypoints) == 0:
                continue
            xy = r.keypoints.xy[0].float().cpu().numpy()
            origin = (xy[LEFT_HIP] + xy[RIGHT_HIP]) / 2.0
            shoulder_mid = (xy[LEFT_SHO] + xy[RIGHT_SHO]) / 2.0
            # "or 1.0" avoids dividing by zero when both midpoints coincide.
            scale = np.linalg.norm(shoulder_mid - origin) or 1.0
            xy_norm = (xy - origin) / scale
            seq.append(xy_norm.reshape(-1))
    if not seq:
        return np.zeros((0,34), dtype=np.float32)
    return np.asarray(seq, dtype=np.float32)

# ----------------- Angles + feedback -------------------
# Row indices into the (17, 2) keypoint array used by the angle helpers below,
# listed as left/right pairs.
LSHO, RSHO = 5, 6
LELB, RELB = 7, 8
LWR, RWR = 9, 10
LHIP, RHIP = 11, 12
LKNEE, RKNEE = 13, 14
LANK, RANK = 15, 16

def angle_deg(a, b, c):
    """Angle at vertex ``b`` between the rays ``b->a`` and ``b->c``, in degrees.

    Returns 180.0 when either ray has zero length (the angle is undefined
    there). The cosine is clipped to [-1, 1] before ``arccos`` to absorb
    rounding error.
    """
    to_a = a - b
    to_c = c - b
    norm_product = (np.linalg.norm(to_a)*np.linalg.norm(to_c))
    if norm_product == 0:
        return 180.0
    cosine = np.dot(to_a, to_c) / norm_product
    cosine = np.clip(cosine, -1.0, 1.0)
    return float(np.degrees(np.arccos(cosine)))

def angles_from_seq(seq_xy: np.ndarray):
    """Summarise knee and elbow angles over a pose sequence.

    Every frame is reshaped to ``(17, 2)`` and the angle at each knee
    (hip-knee-ankle) and elbow (shoulder-elbow-wrist) is measured with
    ``angle_deg``.

    Returns:
        ``{"knee_r": {...}, "knee_l": {...}, "elbow_r": {...}, "elbow_l": {...}}``
        where each inner dict holds ``min``, ``max``, ``p10`` and ``p90`` (10th
        and 90th percentile) as Python floats. An empty sequence is not
        supported: the NumPy reductions reject empty input.
    """
    # joint name -> (first point, vertex, last point) keypoint indices
    triplets = {
        "knee_r": (RHIP, RKNEE, RANK),
        "knee_l": (LHIP, LKNEE, LANK),
        "elbow_r": (RSHO, RELB, RWR),
        "elbow_l": (LSHO, LELB, LWR),
    }
    series = {joint: [] for joint in triplets}
    for frame_idx in range(len(seq_xy)):
        pts = seq_xy[frame_idx].reshape(17, 2)
        for joint, (first, vertex, last) in triplets.items():
            series[joint].append(angle_deg(pts[first], pts[vertex], pts[last]))

    def summarise(values):
        return {
            "min": float(np.min(values)),
            "max": float(np.max(values)),
            "p10": float(np.percentile(values, 10)),
            "p90": float(np.percentile(values, 90)),
        }

    return {joint: summarise(values) for joint, values in series.items()}

def feedback_from_stats(label: str, stats: dict):
    """Turn angle statistics into a list of coaching cues for an exercise.

    The exercise is recognised by case-insensitive substring matches on
    ``label`` (``None`` is treated as an empty label). ``stats`` is only read
    for the joints that a matching rule needs. When no rule adds a cue, a
    single generic message is returned.

    Args:
        label: predicted exercise name.
        stats: mapping as produced by ``angles_from_seq``.

    Returns:
        Non-empty list of feedback strings.
    """
    name = (label or "").lower()

    def mentions(*keywords):
        return any(word in name for word in keywords)

    def lowest_knee():
        return min(stats["knee_l"]["min"], stats["knee_r"]["min"])

    cues = []

    # Squat / hinge-ish cues
    if mentions("squat", "leg extension", "hip thrust"):
        depth = lowest_knee()
        if depth > 100:
            cues.append("Depth shallow — aim near ~90° knee flexion.")
        elif depth < 60:
            cues.append("Very deep — ensure control and comfort.")
        else:
            cues.append("Good depth.")

    if mentions("deadlift", "romanian"):
        if lowest_knee() < 70:
            cues.append("Knees bending a lot — hinge more from hips.")

    # Push/press elbow flare
    if mentions("push", "press", "bench"):
        widest_elbow = max(stats["elbow_l"]["p90"], stats["elbow_r"]["p90"])
        if widest_elbow > 130:
            cues.append("Elbows flaring — target ~45° to protect shoulders.")
        else:
            cues.append("Elbow tracking looks reasonable.")

    # Curl ROM
    if mentions("curl"):
        tightest_elbow = min(stats["elbow_l"]["min"], stats["elbow_r"]["min"])
        if tightest_elbow > 80:
            cues.append("Limited elbow flexion — curl through fuller ROM.")

    if mentions("plank"):
        cues.append("Keep straight line head→heels; avoid hip sag.")

    if len(cues) == 0:
        cues.append("Form looks okay; review depth and control.")
    return cues

# -------------- Sliding-window (no TTA) ----------------
def logits_on_seq(model, seq, device):
    """Run ``model`` on one sequence and return its output as a NumPy array.

    ``seq`` is converted to a float32 tensor on ``device`` and given a leading
    batch axis of size 1; that axis is squeezed away again from the model
    output. Everything runs with gradient tracking disabled.
    """
    with torch.no_grad():
        batch = torch.tensor(seq, dtype=torch.float32, device=device)
        batch = batch.unsqueeze(0)
        scores = model(batch)
        return scores.squeeze(0).cpu().numpy()

def vote_over_windows(model, seq_full, device, seq_len=90, hop=30):
    """Average the model's logits over sliding windows of a sequence.

    Sequences of at most ``seq_len`` frames are zero-padded to ``seq_len``
    and scored once. Longer sequences are scored on full windows starting
    every ``hop`` frames. When that grid would leave trailing frames
    unscored, one extra window aligned with the end of the sequence is added
    so every frame contributes to the vote.

    Args:
        model: callable scoring a ``(1, seq_len, feats)`` batch.
        seq_full: pose sequence, one row per frame.
        device: torch device the window tensors are created on.
        seq_len: window length in frames.
        hop: stride between window starts; must be positive for sequences
            longer than ``seq_len``.

    Returns:
        NumPy array of mean logits.

    Raises:
        ValueError: if ``hop`` is not positive and windowing is needed.
    """
    T = len(seq_full)
    if T <= seq_len:
        return logits_on_seq(model, pad_or_center_trim(seq_full, seq_len), device)
    if hop <= 0:
        raise ValueError(f"hop must be a positive integer, got {hop}")

    last_start = T - seq_len
    starts = list(range(0, last_start + 1, hop))
    if starts[-1] != last_start:
        # Cover the tail that the hop grid would otherwise skip.
        starts.append(last_start)

    logits_sum = None
    for start in starts:
        # Every start is <= T - seq_len, so each window is already full length.
        window_logits = logits_on_seq(model, seq_full[start:start + seq_len], device)
        logits_sum = window_logits if logits_sum is None else (logits_sum + window_logits)
    return logits_sum / len(starts)

# ---------------------- Labels helper -------------------
def load_labels(cache_dir: Path, fallback_n=None):
    """Resolve the list of class labels for a cache directory.

    Sources are tried in this order:

    1. ``<cache_dir>/labels.json`` (read as UTF-8): either an object with a
       non-empty ``"labels"`` entry, or a non-empty top-level list.
    2. ``fallback_n`` generated names ``class_00``, ``class_01``, ...
    3. Names generated from the numeric prefixes of ``<id>__*.npy`` files in
       ``cache_dir`` (``max(id) + 1`` names).

    Args:
        cache_dir: directory that may hold ``labels.json`` and ``.npy`` files.
        fallback_n: number of generated labels to return when the JSON file
            gives none; ``None`` moves on to the file-prefix scan.

    Returns:
        List of label strings; empty when no source yields any.
    """
    cache_dir = Path(cache_dir)
    labels_json = cache_dir / "labels.json"
    if labels_json.exists():
        # JSON text is UTF-8; do not depend on the platform's locale encoding.
        meta = json.loads(labels_json.read_text(encoding="utf-8"))
        if isinstance(meta, dict):
            labs = meta.get("labels", [])
        elif isinstance(meta, list):
            # Tolerate a file that stores the label list directly.
            labs = meta
        else:
            labs = []
        if labs:
            return labs
    # fallback generate
    if fallback_n is not None:
        return [f"class_{i:02d}" for i in range(fallback_n)]
    # infer from file prefixes if present
    ids = []
    for f in cache_dir.glob("*.npy"):
        m = re.match(r"^(\d+?)__.+?\.npy$", f.name)
        if m:
            ids.append(int(m.group(1)))
    return [f"class_{i:02d}" for i in range(max(ids)+1)] if ids else []

# --------------------------- Main -----------------------
def main():
    """Command-line entry point: classify one pose sequence and print feedback.

    Steps:
      1. Parse arguments and load the checkpoint. A dict with ``state_dict``
         and ``hparams`` is rebuilt from its hyper-parameters; anything else
         is treated as a raw state_dict for the default architecture.
      2. Obtain the pose sequence from ``--npy`` (takes precedence) or by
         running pose extraction on ``--video``.
      3. Average logits over sliding windows and print the top classes.
      4. Compute joint-angle statistics and print the resulting cues.

    Exits through ``SystemExit`` with a message when an input is missing,
    empty, or has the wrong shape, or when no class labels can be resolved.
    """
    ap = argparse.ArgumentParser("Baseline TCN inference + feedback (no TTA, no focal)")
    ap.add_argument("--checkpoint", type=Path, required=True, help="baseline TCN .pt or raw state_dict")
    ap.add_argument("--cache_dir", type=Path, required=True, help="dir with labels.json from training")
    ap.add_argument("--npy", type=Path, default=None, help="pre-extracted pose sequence (.npy)")
    ap.add_argument("--video", type=Path, default=None, help="optional video path (requires ultralytics+opencv)")
    ap.add_argument("--seq_len", type=int, default=90)
    ap.add_argument("--hop", type=int, default=30)
    ap.add_argument("--device", type=str, default="cuda" if torch.cuda.is_available() else "cpu")

    ap.add_argument("--every_n", type=int, default=3)
    ap.add_argument("--imgsz", type=int, default=448)
    ap.add_argument("--conf", type=float, default=0.25)
    ap.add_argument("--batch_size", type=int, default=32)
    args = ap.parse_args()

    print("[infer] args:", vars(args))

    # Load model
    obj = torch.load(args.checkpoint, map_location=args.device)
    if isinstance(obj, dict) and "state_dict" in obj and "hparams" in obj:
        hp = obj["hparams"]
        channels = tuple(hp.get("channels", (64,128,256)))
        kernel   = int(hp.get("kernel", 7))
        num_classes = int(hp.get("num_classes", 22))
        labels = obj.get("labels") or load_labels(args.cache_dir, fallback_n=num_classes)
        model = TCNClassifier(feats=34, channels=channels, k=kernel, dropout=0.0, num_classes=len(labels)).to(args.device)
        model.load_state_dict(obj["state_dict"], strict=True)
    else:
        # raw state_dict: use baseline defaults (adjust if your training used different ones)
        labels = load_labels(args.cache_dir)
        if not labels:
            # Without labels the classifier head would be built with zero classes.
            raise SystemExit(
                f"No class labels found in {args.cache_dir}; a raw state_dict needs "
                "labels.json (or cached <id>__*.npy files) to size the classifier."
            )
        model = TCNClassifier(feats=34, channels=(64,128,256), k=7, dropout=0.0, num_classes=len(labels)).to(args.device)
        load_result = model.load_state_dict(obj, strict=False)
        print("[warn] Loaded raw state_dict with default channels=(64,128,256), kernel=7; ensure these match training.")
        # strict=False hides key mismatches; surface them so a partially
        # (or not at all) initialised model does not go unnoticed.
        if load_result.missing_keys or load_result.unexpected_keys:
            print(
                "[warn] state_dict keys did not match the model: "
                f"missing={list(load_result.missing_keys)} "
                f"unexpected={list(load_result.unexpected_keys)}"
            )
    model.eval()

    # Get sequence
    if args.npy:
        if not args.npy.exists():
            raise SystemExit(f"npy not found: {args.npy}")
        seq_full = np.load(args.npy)
    elif args.video:
        if not args.video.exists():
            raise SystemExit(f"video not found: {args.video}")
        print("[info] extracting pose from video (YOLOv8n-pose)...")
        seq_full = extract_pose_sequence_yolo_batched(
            args.video, every_n=args.every_n, batch_size=args.batch_size,
            imgsz=args.imgsz, conf=args.conf, device=args.device
        )
    else:
        raise SystemExit("Provide either --npy (recommended) or --video.")

    if len(seq_full)==0:
        raise SystemExit("Empty sequence. Check video visibility/pose extraction or npy path.")
    # The model is built with feats=34 and the angle code reshapes to (17, 2).
    if seq_full.ndim != 2 or seq_full.shape[1] != 34:
        raise SystemExit(f"Expected a pose sequence of shape (T, 34), got {tuple(seq_full.shape)}.")

    # Predict (sliding-window vote; no TTA)
    logits = vote_over_windows(model, seq_full, args.device, seq_len=args.seq_len, hop=args.hop)
    probs = torch.softmax(torch.tensor(logits), dim=-1).numpy()
    pred_id = int(np.argmax(probs))
    pred_label = labels[pred_id] if 0 <= pred_id < len(labels) else str(pred_id)

    print("\n=== Inference (baseline TCN) ===")
    print(f"Predicted class: {pred_label} (id={pred_id})")
    topk = min(5, len(labels))
    order = np.argsort(-probs)[:topk]
    for i in order:
        print(f"  {labels[i]:22s} p={probs[i]:.3f}")

    # Feedback (angles on center-trimmed window). Short clips are used as-is
    # instead of being zero-padded: angle_deg reports 180.0 for all-zero
    # frames, which would distort the max/p90 statistics and the cues.
    if len(seq_full) > args.seq_len:
        seq = pad_or_center_trim(seq_full, args.seq_len)
    else:
        seq = seq_full
    stats = angles_from_seq(seq)
    cues = feedback_from_stats(pred_label, stats)
    print("\n=== Angle stats ===")
    print(json.dumps(stats, indent=2))
    print("\n=== Feedback ===")
    for c in cues: print(" -", c)

if __name__ == "__main__":
    main()

Writing infer_baseline.py


In [None]:
import os

# Sanity-check that the restored artefacts are in place before running.
print("Model exists:", os.path.exists("/content/best_tcn.pt"))
print("Cache exists:", os.path.exists("/content/pose_seq_yolo"))
print("Numpy files:", len([p for p in os.listdir('/content/pose_seq_yolo') if p.endswith('.npy')]))

# To run on a cached pose sequence instead of a video, replace the --video line with:
#   --npy "/content/pose_seq_yolo/00__barbell biceps curl_1.npy" \
# Keep this note outside the command: a '#' inside a backslash-continued shell
# line comments out every argument that follows it (here --seq_len / --hop).
!python -u infer_baseline.py \
  --checkpoint /content/best_tcn.pt \
  --cache_dir /content/pose_seq_yolo \
  --video "/content/squat_1.mp4" \
  --seq_len 90 --hop 30

Model exists: True
Cache exists: True
Numpy files: 630
[infer] args: {'checkpoint': PosixPath('/content/best_tcn.pt'), 'cache_dir': PosixPath('/content/pose_seq_yolo'), 'npy': None, 'video': PosixPath('/content/squat_1.mp4'), 'seq_len': 90, 'hop': 30, 'device': 'cpu', 'every_n': 3, 'imgsz': 448, 'conf': 0.25, 'batch_size': 32}
[info] extracting pose from video (YOLOv8n-pose)...

=== Inference (baseline TCN) ===
Predicted class: squat (id=18)
  squat                  p=0.722
  pull Up                p=0.110
  lateral raise          p=0.035
  lat pulldown           p=0.024
  decline bench press    p=0.018

=== Angle stats ===
{
  "knee_r": {
    "min": 23.464935302734375,
    "max": 179.8863525390625,
    "p10": 63.20109558105469,
    "p90": 179.21744384765626
  },
  "knee_l": {
    "min": 5.136981010437012,
    "max": 179.94764709472656,
    "p10": 39.00425338745117,
    "p90": 178.73249053955078
  },
  "elbow_r": {
    "min": 33.58757019042969,
    "max": 59.58382797241211,
    "p10": 