# Data Engineering by Perynn

In [18]:
# import os, cv2, json
# import numpy as np
# import mediapipe as mp

# mp_pose = mp.solutions.pose

# # Normalise keypoints to ignore height/position
# def normalize_keypoints(pts):
#     left_hip, right_hip = pts[23], pts[24]
#     center = (left_hip + right_hip) / 2
#     pts -= center
#     left_shoulder, right_shoulder = pts[11], pts[12]
#     scale = np.linalg.norm(left_shoulder - right_shoulder) + 1e-6
#     pts /= scale
#     return pts[:, :2]  # only x, y

# # Process a single video and save JSON
# def process_video(video_path, output_path, label):
#     cap = cv2.VideoCapture(video_path)
#     pose = mp_pose.Pose()
#     sequence = []

#     while True:
#         ret, frame = cap.read()
#         if not ret:
#             break

#         rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
#         result = pose.process(rgb)

#         if result.pose_landmarks:
#             pts = np.array([[l.x, l.y, l.z] for l in result.pose_landmarks.landmark])
#             pts_norm = normalize_keypoints(pts)
#             sequence.append(pts_norm.flatten().tolist())

#     cap.release()
#     pose.close()

#     with open(output_path, "w") as f:
#         json.dump({"label": label, "frames": sequence}, f, indent=2)
#     print(f"✅ Saved {output_path} ({len(sequence)} frames)")

# # Main loop — process all videos
# INPUT_DIR = "dataset_videos"
# OUTPUT_DIR = "pose_keypoints"
# os.makedirs(OUTPUT_DIR, exist_ok=True)

# for label in os.listdir(INPUT_DIR):
#     folder = os.path.join(INPUT_DIR, label)
#     if not os.path.isdir(folder): continue

#     for filename in os.listdir(folder):
#         if not filename.endswith(".mp4"): continue

#         video_path = os.path.join(folder, filename)
#         out_path = os.path.join(OUTPUT_DIR, f"{label}_{filename.replace('.mp4','.json')}")
#         process_video(video_path, out_path, label)


✅ Saved pose_keypoints\air_guitar_15153663-uhd_3840_2160_60fps.json (650 frames)
✅ Saved pose_keypoints\air_guitar_5592447-hd_1920_1080_24fps.json (230 frames)
✅ Saved pose_keypoints\air_guitar_6197064-uhd_3840_2160_25fps (1).json (419 frames)
✅ Saved pose_keypoints\air_guitar_6197064-uhd_3840_2160_25fps.json (419 frames)
✅ Saved pose_keypoints\air_guitar_7320716-uhd_3840_2160_25fps.json (167 frames)
✅ Saved pose_keypoints\air_guitar_8513869-uhd_3840_2160_25fps.json (290 frames)
✅ Saved pose_keypoints\air_guitar_9056754-uhd_3840_2160_25fps.json (192 frames)
✅ Saved pose_keypoints\air_guitar_9057178-uhd_3840_2160_25fps.json (266 frames)
✅ Saved pose_keypoints\air_guitar_9057179-uhd_3840_2160_25fps.json (242 frames)
✅ Saved pose_keypoints\air_guitar_9057675-uhd_3840_2160_25fps.json (159 frames)
✅ Saved pose_keypoints\air_guitar_WhatsApp Video 2025-09-29 at 18.09.07_625a26e9.json (130 frames)
✅ Saved pose_keypoints\boxing_punches_4108054-uhd_3840_2160_25fps.json (485 frames)
✅ Saved pose_

In [78]:
import os, cv2, json, math, random
import numpy as np
import mediapipe as mp

# ===================== CONFIG =====================
INPUT_DIR   = "dataset_videos"     # dataset/<label>/*.mp4
OUTPUT_DIR  = "pose_keypoints_aug" # where JSON windows will be written
WINDOW      = 48                   # frames per sample
STRIDE      = 24                   # hop between windows
AUG_PER_WINDOW = 3                 # how many augmented variants to create per window
USE_LABEL_AWARE_NUDGE = True       # helpful for phone_call vs playing_games
RANDOM_SEED = 42
# ==================================================

random.seed(RANDOM_SEED)
np.random.seed(RANDOM_SEED)

mp_pose = mp.solutions.pose

# ---- Left/Right joint pairs (Mediapipe 33) ----
LR_PAIRS = [
    (11,12), (13,14), (15,16),  # shoulders, elbows, wrists
    (23,24), (25,26), (27,28),  # hips, knees, ankles
    (5,6), (7,8), (9,10),       # eyes, ears, mouth corners (if used)
]
LEFT_EAR, RIGHT_EAR = 7, 8
LEFT_WRIST, RIGHT_WRIST = 15, 16
LEFT_SHOULDER, RIGHT_SHOULDER = 11, 12

# -------------- Normalization helpers --------------
def normalize_keypoints(pts33x3):
    """
    pts33x3: (33,3) in image-normalized coords from Mediapipe (x,y,z in [0..1] approx)
    Center at hip midpoint, scale by shoulder width. Returns (33,2) (x,y only)
    """
    pts = pts33x3.copy()
    left_hip, right_hip = pts[23], pts[24]
    center = (left_hip + right_hip) / 2.0
    pts -= center
    left_sh, right_sh = pts[LEFT_SHOULDER], pts[RIGHT_SHOULDER]
    scale = np.linalg.norm(left_sh[:2] - right_sh[:2]) + 1e-6
    pts[:, :2] /= scale
    return pts[:, :2]  # (33,2)

# -------------- Augmentations --------------
def jitter_pose(seq, sigma=0.01):
    # seq: (T,33,2)
    return seq + np.random.normal(0, sigma, size=seq.shape)

def scale_pose(seq, smin=0.95, smax=1.05):
    s = random.uniform(smin, smax)
    center = seq.mean(axis=1, keepdims=True)  # (T,1,2)
    return (seq - center) * s + center

def rotate_pose(seq, deg_range=(-8, 8)):
    theta = math.radians(random.uniform(*deg_range))
    R = np.array([[math.cos(theta), -math.sin(theta)],
                  [math.sin(theta),  math.cos(theta)]], dtype=np.float32)
    center = seq.mean(axis=1, keepdims=True)
    return (seq - center) @ R.T + center

def drop_joints(seq, p=0.05):
    # randomly set some joints to NaN (occlusion). Your dataloader should handle NaNs (e.g., zero-fill).
    mask = (np.random.rand(*seq.shape[:2]) < p)  # (T,33)
    seq_occl = seq.copy()
    seq_occl[mask] = np.nan
    return seq_occl

def flip_pose(seq):
    # Horizontal flip in normalized space: x -> -x (after our centering/scaling, symmetric around 0)
    seq_f = seq.copy()
    seq_f[...,0] *= -1.0
    # swap left/right joints
    for l, r in LR_PAIRS:
        seq_f[:, [l, r], :] = seq_f[:, [r, l], :]
    return seq_f

def time_warp(seq, min_scale=0.9, max_scale=1.1, target_len=None):
    # resample temporally to simulate speed changes
    T = seq.shape[0]
    scale = random.uniform(min_scale, max_scale)
    new_T = max(8, int(T * scale))
    idx = np.linspace(0, T-1, new_T)
    # linear interpolate per joint & coord
    seq_warp = np.empty((new_T, seq.shape[1], seq.shape[2]), dtype=np.float32)
    for j in range(seq.shape[1]):
        for d in range(seq.shape[2]):
            seq_warp[:, j, d] = np.interp(idx, np.arange(T), seq[:, j, d])
    if target_len:
        seq_warp = time_crop_or_pad(seq_warp, target_len)
    return seq_warp

def time_crop_or_pad(seq, target_len):
    T = seq.shape[0]
    if T > target_len:
        start = np.random.randint(0, T - target_len + 1)
        return seq[start:start+target_len]
    elif T < target_len:
        pad = target_len - T
        left = pad // 2
        right = pad - left
        left_pad = np.repeat(seq[:1], left, axis=0)
        right_pad = np.repeat(seq[-1:], right, axis=0)
        return np.concatenate([left_pad, seq, right_pad], axis=0)
    return seq

# ---- Label-aware nudges to teach discriminative cues ----
def nudge_towards_phone_call(seq, strength=0.04):
    # bring one wrist toward nearest ear each frame
    seq2 = seq.copy()
    ear_side = random.choice([(LEFT_EAR, LEFT_WRIST), (RIGHT_EAR, RIGHT_WRIST)])
    ear_idx, wrist_idx = ear_side
    vec = seq2[:, ear_idx, :] - seq2[:, wrist_idx, :]
    seq2[:, wrist_idx, :] += strength * vec
    return seq2

def nudge_towards_playing_game(seq, strength=0.04):
    # bring both wrists toward torso midline between shoulders
    seq2 = seq.copy()
    mid = 0.5*(seq2[:, LEFT_SHOULDER, :] + seq2[:, RIGHT_SHOULDER, :])  # (T,2)
    for w in (LEFT_WRIST, RIGHT_WRIST):
        vec = mid - seq2[:, w, :]
        seq2[:, w, :] += strength * vec
    return seq2

def apply_random_aug(seq, label=None):
    s = seq.copy()
    # random geometric/noise
    if random.random() < 0.9: s = jitter_pose(s, sigma=0.01)
    if random.random() < 0.5: s = scale_pose(s, 0.95, 1.05)
    if random.random() < 0.5: s = rotate_pose(s, (-8, 8))
    if random.random() < 0.4: s = drop_joints(s, p=0.05)
    if random.random() < 0.5: s = flip_pose(s)
    if random.random() < 0.5: s = time_warp(s, 0.9, 1.1, target_len=s.shape[0])

    # label-aware cue shaping (optional but helpful)
    if USE_LABEL_AWARE_NUDGE and label is not None:
        if label.lower() in ["talking_phone", "talking-phone", "phone_call", "phone-call"]:
            if random.random() < 0.7:
                s = nudge_towards_phone_call(s)
        if label.lower() in ["playing_games", "playing-games", "gaming", "play_game", "play-game"]:
            if random.random() < 0.7:
                s = nudge_towards_playing_game(s)
    return s

# -------------- Core extraction --------------
def extract_sequence_from_video(video_path):
    cap = cv2.VideoCapture(video_path)
    pose = mp_pose.Pose()
    frames = []

    while True:
        ok, frame = cap.read()
        if not ok: break
        rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        res = pose.process(rgb)
        if res.pose_landmarks:
            pts = np.array([[l.x, l.y, l.z] for l in res.pose_landmarks.landmark], dtype=np.float32) # (33,3)
            pts_norm = normalize_keypoints(pts) # (33,2)
            frames.append(pts_norm)
        else:
            # if no detection, append NaNs to keep timing consistent
            frames.append(np.full((33,2), np.nan, dtype=np.float32))

    cap.release()
    pose.close()
    if len(frames) == 0: return None
    return np.stack(frames, axis=0)  # (T,33,2)

def generate_windows(seq, window=48, stride=24):
    T = seq.shape[0]
    if T < 2: return []
    windows = []
    for start in range(0, max(1, T - window + 1), stride):
        chunk = seq[start:start+window]
        if chunk.shape[0] < window:
            chunk = time_crop_or_pad(chunk, window)
        windows.append(chunk)
    return windows

def save_window_json(window_arr, out_path, label):
    # window_arr: (W,33,2)
    payload = {
        "label": label,
        "frames": window_arr.reshape(window_arr.shape[0], -1).tolist()  # flatten joints per frame
    }
    with open(out_path, "w") as f:
        json.dump(payload, f, indent=2)

# -------------- Main --------------
os.makedirs(OUTPUT_DIR, exist_ok=True)

for label in os.listdir(INPUT_DIR):
    in_label_dir = os.path.join(INPUT_DIR, label)
    if not os.path.isdir(in_label_dir): continue

    out_label_dir = os.path.join(OUTPUT_DIR, label)
    os.makedirs(out_label_dir, exist_ok=True)

    for filename in os.listdir(in_label_dir):
        if not filename.lower().endswith(".mp4"): continue
        video_path = os.path.join(in_label_dir, filename)
        seq = extract_sequence_from_video(video_path)
        if seq is None:
            print(f"⚠️ No frames for {video_path}")
            continue

        windows = generate_windows(seq, WINDOW, STRIDE)
        base = os.path.splitext(filename)[0]

        for i, w in enumerate(windows):
            # Save original window
            out0 = os.path.join(out_label_dir, f"{base}_win{i:03d}_orig.json")
            save_window_json(w, out0, label)

            # Save augmented variants
            for k in range(AUG_PER_WINDOW):
                w_aug = apply_random_aug(w, label=label)
                outk = os.path.join(out_label_dir, f"{base}_win{i:03d}_aug{k+1}.json")
                save_window_json(w_aug, outk, label)

        print(f"✅ {label}/{filename}: {len(windows)} windows x (1+{AUG_PER_WINDOW}) saved -> {out_label_dir}")


In [None]:
# import mediapipe as mp
# mp_pose = mp.solutions.pose
# print("✅ Mediapipe is working!")


# Machine Learning by Yee Jing

In [74]:
# ===============================================
# AI Charades — End-to-End: JSON -> NPY -> Train
# ===============================================

import os, re, json, math, random, pathlib, shutil
from collections import Counter, defaultdict

import numpy as np
np.set_printoptions(suppress=True)

import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader

# -----------------------------
# Config (edit as you like)
# -----------------------------
SRC_DIR    = "pose_keypoints_aug"        # JSONs (possibly in subfolders)
OUT_DIR    = "dataset/sequences"         # where .npy clips will go: OUT_DIR/<label>/*.npy
SPLIT_DIR  = "splits"
MODEL_DIR  = "models"

WINDOW     = 48       # frames per clip
STRIDE     = 24       # hop between windows
TEST_RATIO = 0.15     # 15% test
VAL_RATIO  = 0.15     # 15% val
BATCH_TRAIN= 64
BATCH_EVAL = 128
EPOCHS     = 100
LR         = 1e-3
WD         = 1e-3
SEED       = 42

# Keep labels simple: letters, digits, _, -
LABEL_REGEX = re.compile(r"^[A-Za-z0-9_-]+$")

# If you want to hard-limit to known classes, put them here; leave as None to auto-discover
KNOWN_CLASSES = None  # e.g., ["talking_phone", "playing_games", "air_guitar", "boxing_punches"]

# -----------------------------
# Repro
# -----------------------------
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)

# -----------------------------
# Utils
# -----------------------------
def ensure_dir(d):
    os.makedirs(d, exist_ok=True)

def slug(s: str) -> str:
    return re.sub(r'[^A-Za-z0-9_-]+', '_', s.strip())

def is_valid_label(s: str) -> bool:
    return bool(s) and bool(LABEL_REGEX.match(s))

def get_label_from_json(path: str, fname: str) -> str:
    """Prefer 'label' inside JSON; else parent folder; else filename-derived."""
    try:
        with open(path, "r", encoding="utf-8") as f:
            data = json.load(f)
        if isinstance(data, dict) and "label" in data and data["label"]:
            return str(data["label"])
    except Exception:
        pass

    parent = pathlib.Path(path).parent.name
    if parent and parent != pathlib.Path(SRC_DIR).name:
        return parent

    base = os.path.splitext(fname)[0]
    if "-" in base:
        base = base.split("-")[0]
    parts = base.split("_")
    if parts and parts[-1].isdigit():
        base = "_".join(parts[:-1])
    return base

def load_frames_json(path: str) -> np.ndarray:
    with open(path, "r", encoding="utf-8") as f:
        data = json.load(f)
    frames = data.get("frames", data)  # supports dict or plain list
    arr = np.array(frames, dtype=np.float32)  # (T, F)
    if arr.ndim != 2:
        raise ValueError(f"{path} expected (T,F), got {arr.shape}")
    arr = np.nan_to_num(arr, nan=0.0, posinf=0.0, neginf=0.0)
    return arr

def time_crop_or_pad(seq: np.ndarray, target_len: int) -> np.ndarray:
    T = len(seq)
    if T > target_len:
        start = np.random.randint(0, T - target_len + 1)
        return seq[start:start+target_len]
    elif T < target_len:
        pad = target_len - T
        left = pad // 2
        right = pad - left
        left_pad = np.repeat(seq[:1], left, axis=0)
        right_pad = np.repeat(seq[-1:], right, axis=0)
        return np.concatenate([left_pad, seq, right_pad], axis=0)
    return seq

# -----------------------------
# 1) Discover JSONs + labels
# -----------------------------
ensure_dir(OUT_DIR)
ensure_dir(SPLIT_DIR)
ensure_dir(MODEL_DIR)

json_paths = []
for root, _, files in os.walk(SRC_DIR):
    for fn in files:
        if fn.lower().endswith(".json"):
            json_paths.append(os.path.join(root, fn))

if not json_paths:
    raise SystemExit(f"❌ No JSON files found under {SRC_DIR}. Check path/structure.")

print(f"Found {len(json_paths)} JSON files. Example:\n  {json_paths[0]}")

raw_labels = [ get_label_from_json(p, os.path.basename(p)) for p in json_paths ]
label_counter = Counter(raw_labels)
print("Raw label counts (before filtering):")
for k,v in sorted(label_counter.items(), key=lambda kv: (-kv[1], kv[0])):
    print(f"  {k!r}: {v}")

# Allowed labels
if KNOWN_CLASSES is not None:
    allowed_labels = sorted([c for c in KNOWN_CLASSES if is_valid_label(c)])
else:
    allowed_labels = sorted({ slug(l) for l in raw_labels if is_valid_label(slug(l)) })

print("\nAllowed classes:", allowed_labels)
if not allowed_labels:
    raise SystemExit("❌ No allowed classes inferred. Set KNOWN_CLASSES or fix your labels.")

# Prepare class folders freshly (optional: clean only unknown/junk)
for d in os.listdir(OUT_DIR):
    p = os.path.join(OUT_DIR, d)
    if os.path.isdir(p) and d not in allowed_labels:
        shutil.rmtree(p)  # remove junk class dirs
        print("Removed junk dir:", p)
for lbl in allowed_labels:
    ensure_dir(os.path.join(OUT_DIR, lbl))

# -----------------------------
# 2) JSON -> fixed-length NPY
# -----------------------------
meta = []      # list of (path, label)
bad_files = 0
per_class_written = Counter()

for src_path in json_paths:
    fname = os.path.basename(src_path)
    raw_lbl = get_label_from_json(src_path, fname)
    lbl = slug(raw_lbl)
    if lbl not in allowed_labels:
        # skip labels with spaces or unknown names like 'WhatsApp Video 2025'
        continue

    try:
        X = load_frames_json(src_path)  # (T, F)
    except Exception as e:
        bad_files += 1
        print(f"⚠️ Failed to load {src_path}: {e}")
        continue

    T = len(X)
    if T == 0:
        print(f"⚠️ Empty frames in {src_path}; skipping.")
        continue

    # Sliding windows
    i = 0
    base = pathlib.Path(fname).stem
    out_cls_dir = os.path.join(OUT_DIR, lbl)

    wrote_any = False
    while i + WINDOW <= T:
        clip = X[i:i+WINDOW]
        out_path = os.path.join(out_cls_dir, f"{base}_t{i:05d}.npy")
        np.save(out_path, clip.astype(np.float32))
        meta.append((out_path, lbl))
        per_class_written[lbl] += 1
        wrote_any = True
        i += STRIDE

    # If the sequence was shorter than WINDOW or remainder didn’t fit, write one padded window
    if not wrote_any:
        clip = time_crop_or_pad(X, WINDOW)  # (WINDOW, F)
        out_path = os.path.join(out_cls_dir, f"{base}_t00000.npy")
        np.save(out_path, clip.astype(np.float32))
        meta.append((out_path, lbl))
        per_class_written[lbl] += 1

print(f"\n✅ NPY windows saved: {len(meta)} (bad json files: {bad_files})")
print("Per-class written counts:")
for k in allowed_labels:
    print(f"  {k}: {per_class_written[k]}")

# -----------------------------
# 3) Train/Val/Test Splits (TAB-delimited)
# -----------------------------
random.shuffle(meta)
n = len(meta)
n_train = int((1.0 - VAL_RATIO - TEST_RATIO) * n)
n_val   = int(VAL_RATIO * n)
splits = {
    "train.txt": meta[:n_train],
    "val.txt":   meta[n_train:n_train+n_val],
    "test.txt":  meta[n_train+n_val:]
}
for name, items in splits.items():
    with open(os.path.join(SPLIT_DIR, name), "w", encoding="utf-8") as f:
        for p,l in items:
            f.write(f"{p}\t{l}\n")
    print(f"{name} -> {len(items)} lines")

# -----------------------------
# 4) Dataset & Loaders
# -----------------------------
class SeqSet(Dataset):
    def __init__(self, list_file):
        self.items = []
        with open(list_file, "r", encoding="utf-8") as f:
            for ln in f:
                ln = ln.rstrip("\n")
                if not ln: continue
                path, label = ln.split("\t")
                self.items.append((path, label))
        if not self.items:
            raise RuntimeError(f"No items in {list_file}.")
        # classes from file
        self.classes = sorted({ l for _, l in self.items })
        self.cls2id  = {c:i for i,c in enumerate(self.classes)}

    def __len__(self): return len(self.items)

    def __getitem__(self, i):
        path, label = self.items[i]
        if not os.path.exists(path):
            raise FileNotFoundError(f"Missing file: {path}")
        x = np.load(path).astype(np.float32)   # (T, F)
        x = torch.from_numpy(x).permute(1, 0)  # -> (F, T)
        y = torch.tensor(self.cls2id[label], dtype=torch.long)
        return x, y

train_txt = os.path.join(SPLIT_DIR, "train.txt")
val_txt   = os.path.join(SPLIT_DIR, "val.txt")
test_txt  = os.path.join(SPLIT_DIR, "test.txt")

train_ds = SeqSet(train_txt)
val_ds   = SeqSet(val_txt)
test_ds  = SeqSet(test_txt)

CLASSES  = train_ds.classes
CLS2ID   = train_ds.cls2id
print("\nCLASSES:", CLASSES)

in_channels = train_ds[0][0].shape[0]   # e.g., 66 (33 joints × 2)
print("in_channels:", in_channels)

# Dataloaders (num_workers=0 is safe on Windows)
train_dl = DataLoader(train_ds, batch_size=BATCH_TRAIN, shuffle=True, num_workers=0, drop_last=False)
val_dl   = DataLoader(val_ds,   batch_size=BATCH_EVAL,  shuffle=False, num_workers=0, drop_last=False)
test_dl  = DataLoader(test_ds,  batch_size=BATCH_EVAL,  shuffle=False, num_workers=0, drop_last=False)


Found 2941 JSON files. Example:
  pose_keypoints_aug\WhatsApp Video 2025-09-29 at 18.09.07_64087d0b.json
Raw label counts (before filtering):
  'talking_phone': 912
  'boxing_punches': 880
  'playing_games': 664
  'air_guitar': 484
  'WhatsApp Video 2025': 1

Allowed classes: ['WhatsApp_Video_2025', 'air_guitar', 'boxing_punches', 'playing_games', 'talking_phone']

✅ NPY windows saved: 2944 (bad json files: 0)
Per-class written counts:
  WhatsApp_Video_2025: 4
  air_guitar: 484
  boxing_punches: 880
  playing_games: 664
  talking_phone: 912
train.txt -> 2060 lines
val.txt -> 441 lines
test.txt -> 443 lines

CLASSES: ['WhatsApp_Video_2025', 'air_guitar', 'boxing_punches', 'playing_games', 'talking_phone']
in_channels: 66


## Train the Model

In [75]:

# -----------------------------
# 5) Model
# -----------------------------
class TCN(nn.Module):
    def __init__(self, in_ch, n_classes):
        super().__init__()
        self.net = nn.Sequential(
            nn.Conv1d(in_ch, 128, 5, padding=2),
            nn.ReLU(),
            nn.Conv1d(128, 128, 5, padding=2),
            nn.ReLU(),
            nn.AdaptiveAvgPool1d(1),
        )
        self.fc = nn.Linear(128, n_classes)
    def forward(self, x):  # x: (B,F,T)
        z = self.net(x).squeeze(-1)
        return self.fc(z)

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = TCN(in_channels, len(CLASSES)).to(device)

# Class weights for imbalance
counts = Counter(lbl for _, lbl in train_ds.items)
w = torch.tensor([1.0/max(1, counts[c]) for c in CLASSES], dtype=torch.float32)
w = (w / w.mean()).to(device)

crit = nn.CrossEntropyLoss(weight=w)
opt  = torch.optim.AdamW(model.parameters(), lr=LR, weight_decay=WD)
save_path = os.path.join(MODEL_DIR, "ai_charades_tcn.pt")

# -----------------------------
# 6) Training & Evaluation
# -----------------------------
def evaluate(loader):
    model.eval(); total=0; correct=0; loss_sum=0.0
    with torch.no_grad():
        for x,y in loader:
            x,y = x.to(device), y.to(device)
            logits = model(x)
            loss = crit(logits, y)
            loss_sum += loss.item()*y.size(0)
            total += y.numel()
            correct += (logits.argmax(1)==y).sum().item()
    return (loss_sum/total) if total>0 else 0.0, (correct/total) if total>0 else 0.0

best_val = 0.0
for epoch in range(1, EPOCHS+1):
    model.train(); n=0; loss_sum=0.0; correct=0
    for x,y in train_dl:
        x,y = x.to(device), y.to(device)
        opt.zero_grad()
        logits = model(x)
        loss = crit(logits, y)
        loss.backward(); opt.step()
        n += y.size(0)
        loss_sum += loss.item()*y.size(0)
        correct += (logits.argmax(1)==y).sum().item()
    tr_loss = loss_sum / max(1,n)
    tr_acc  = correct / max(1,n)

    val_loss, val_acc = evaluate(val_dl)
    print(f"epoch {epoch:02d} | train {tr_acc:.2%} loss {tr_loss:.3f} | val {val_acc:.2%} loss {val_loss:.3f}")

    if val_acc > best_val:
        best_val = val_acc
        torch.save({"state_dict": model.state_dict(),
                    "classes": CLASSES,
                    "in_channels": in_channels}, save_path)
        print("  ↳ saved best:", save_path)

# Load best and evaluate on test
ckpt = torch.load(save_path, map_location=device)
model.load_state_dict(ckpt["state_dict"])
print("\nBEST VAL ACC:", best_val)

test_loss, test_acc = evaluate(test_dl)
print("TEST ACC    :", f"{test_acc:.2%}", "| loss", f"{test_loss:.3f}")

# Confusion matrix (quick)
def confusion(loader):
    cm = np.zeros((len(CLASSES), len(CLASSES)), dtype=int)
    model.eval()
    with torch.no_grad():
        for x,y in loader:
            x = x.to(device)
            pred = model(x).argmax(1).cpu().numpy()
            y = y.numpy()
            for t,p in zip(y,pred):
                cm[t,p] += 1
    return cm

cm = confusion(test_dl)
print("\nConfusion Matrix (rows=true, cols=pred):")
print("Classes:", CLASSES)
print(cm)


epoch 01 | train 51.12% loss 1.171 | val 64.17% loss 0.929
  ↳ saved best: models\ai_charades_tcn.pt
epoch 02 | train 69.08% loss 0.911 | val 70.07% loss 0.618
  ↳ saved best: models\ai_charades_tcn.pt
epoch 03 | train 75.19% loss 0.678 | val 72.56% loss 0.501
  ↳ saved best: models\ai_charades_tcn.pt
epoch 04 | train 75.49% loss 0.614 | val 83.67% loss 0.405
  ↳ saved best: models\ai_charades_tcn.pt
epoch 05 | train 83.20% loss 0.471 | val 86.39% loss 0.324
  ↳ saved best: models\ai_charades_tcn.pt
epoch 06 | train 86.80% loss 0.363 | val 87.76% loss 0.257
  ↳ saved best: models\ai_charades_tcn.pt
epoch 07 | train 88.54% loss 0.313 | val 88.89% loss 0.233
  ↳ saved best: models\ai_charades_tcn.pt
epoch 08 | train 91.36% loss 0.252 | val 88.44% loss 0.242
epoch 09 | train 91.12% loss 0.238 | val 93.20% loss 0.149
  ↳ saved best: models\ai_charades_tcn.pt
epoch 10 | train 92.48% loss 0.201 | val 93.88% loss 0.164
  ↳ saved best: models\ai_charades_tcn.pt
epoch 11 | train 92.52% loss 0.1

Temporal Convolutional Network (TCN) style classifier — essentially a 1D CNN over time for action recognition.

Family: 1D CNN / Temporal Convolutional Network (shallow).

Purpose: sequence classification (pose-based action recognition).

Why it works: Convolutions learn local motion patterns, pooling summarizes the sequence, and FC layer maps to labels.

## Test on my own video

In [77]:
# run_infer_own_video_tcn.py
import os, cv2, json, glob
import numpy as np
import torch
import torch.nn as nn
import mediapipe as mp

# ===================== CONFIG =====================
INPUT_VID_DIR   = "yj_own_vid"         # folder with your test videos
INFER_JSON_DIR  = "infer_pose_json"    # keep inference JSONs separate
WINDOWS_ROOT    = "infer_windows"      # windows per-video subfolders

CKPT_PATH       = os.path.join("models", "ai_charades_tcn.pt")  # <-- baseline TCN checkpoint

WINDOW, STRIDE  = 48, 24
USE_NORMALIZE   = True   # True if training used hip-center + shoulder-width scale
PRINT_WINDOW_PRED = False

DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# ==================================================

mp_pose = mp.solutions.pose

# ----------------- Pose Extraction ----------------
def normalize_keypoints(pts33x3):
    """
    pts: (33,3) with x,y in [0,1] from MediaPipe.
    Center at hip midpoint, scale by shoulder width; return (33,2).
    """
    xy = pts33x3[:, :2].astype(np.float32)
    center = (xy[23] + xy[24]) / 2.0
    xy = xy - center
    scale = np.linalg.norm(xy[11] - xy[12]) + 1e-6
    xy = xy / scale
    return xy

def extract_pose_to_json(video_path, out_json_path):
    os.makedirs(os.path.dirname(out_json_path), exist_ok=True)
    cap = cv2.VideoCapture(video_path)
    pose = mp_pose.Pose(static_image_mode=False, model_complexity=1,
                        enable_segmentation=False,
                        min_detection_confidence=0.5,
                        min_tracking_confidence=0.5)
    sequence = []
    last_xy = np.zeros((33,2), dtype=np.float32)

    while True:
        ret, frame = cap.read()
        if not ret: break
        rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        res = pose.process(rgb)

        if res.pose_landmarks and len(res.pose_landmarks.landmark) == 33:
            pts = np.array([[l.x, l.y, getattr(l, "z", 0.0)]
                            for l in res.pose_landmarks.landmark], dtype=np.float32)
            xy = normalize_keypoints(pts) if USE_NORMALIZE else pts[:, :2]
            last_xy = xy.astype(np.float32)

        # keep timing continuity (repeat last when not detected)
        sequence.append(last_xy.flatten().tolist())

    cap.release(); pose.close()

    arr = np.nan_to_num(np.array(sequence, dtype=np.float32), nan=0.0, posinf=0.0, neginf=0.0)
    with open(out_json_path, "w") as f:
        json.dump({"label": None, "frames": arr.tolist()}, f)
    print(f"✅ Pose saved: {out_json_path} ({len(arr)} frames)")

# ----------------- Windowing ----------------------
def make_windows_for_json(in_json_path, out_dir, window=WINDOW, stride=STRIDE):
    os.makedirs(out_dir, exist_ok=True)
    with open(in_json_path, "r") as f:
        data = json.load(f)
    frames = data.get("frames", data)
    X = np.array(frames, dtype=np.float32)  # (T, F=66)
    T = len(X)
    if T == 0:
        print(f"⚠️  No frames in {in_json_path}")
        return []

    if T < window:
        pad = np.repeat(X[-1][None,:], window-T, axis=0)
        X = np.concatenate([X, pad], axis=0)
        T = len(X)

    paths = []
    i = 0
    base = os.path.splitext(os.path.basename(in_json_path))[0]
    while i + window <= T:
        clip = X[i:i+window]
        out_path = os.path.join(out_dir, f"{base}_t{i:05d}.npy")
        np.save(out_path, clip.astype(np.float32))
        paths.append(out_path)
        i += stride
    print(f"🧩 Windows: {len(paths)} -> {out_dir}")
    return paths

# ----------------- Your Baseline TCN ----------------
class TCN(nn.Module):
    def __init__(self, in_ch, n_classes):
        super().__init__()
        self.net = nn.Sequential(
            nn.Conv1d(in_ch, 128, kernel_size=5, padding=2),
            nn.ReLU(),
            nn.Conv1d(128, 128, kernel_size=5, padding=2),
            nn.ReLU(),
            nn.AdaptiveAvgPool1d(1),
        )
        self.fc = nn.Linear(128, n_classes)
    def forward(self, x):  # x: (B,F,T)
        z = self.net(x).squeeze(-1)
        return self.fc(z)

def load_model_and_classes(ckpt_path=CKPT_PATH):
    ckpt = torch.load(ckpt_path, map_location=DEVICE)
    classes = ckpt["classes"]
    in_ch = int(ckpt.get("in_channels", 66))  # your baseline saved this
    model = TCN(in_ch, len(classes)).to(DEVICE)
    model.load_state_dict(ckpt["state_dict"])
    model.eval()
    return model, classes, in_ch

# ----------------- Prediction ---------------------
def load_clip_tensor(npy_path):
    # Baseline: NO velocity; just (F,T)
    x = torch.from_numpy(np.load(npy_path).astype(np.float32)).permute(1,0)  # (F,T)
    return x.unsqueeze(0).to(DEVICE)  # (1,F,T)

def predict_video_from_windows(win_dir, model, classes):
    npys = sorted(glob.glob(os.path.join(win_dir, "*.npy")))
    if not npys:
        return None, None, None
    probs_sum = torch.zeros(len(classes), device=DEVICE)
    per_window = []
    with torch.no_grad():
        for p in npys:
            x = load_clip_tensor(p)
            logits = model(x)
            probs = torch.softmax(logits, dim=1).squeeze(0)  # (C,)
            probs_sum += probs
            if PRINT_WINDOW_PRED:
                wid = os.path.basename(p)
                top_prob, top_idx = torch.max(probs, dim=0)
                per_window.append((wid, classes[int(top_idx)], float(top_prob)))
    probs_mean = probs_sum / len(npys)
    topk_prob, topk_idx = torch.topk(probs_mean, k=min(3, len(classes)))
    topk = [(classes[int(i)], float(p)) for p,i in zip(topk_prob.tolist(), topk_idx.tolist())]
    pred_label, pred_conf = topk[0]
    return (pred_label, pred_conf, topk), per_window

# ===================== MAIN =======================
if __name__ == "__main__":
    os.makedirs(INFER_JSON_DIR, exist_ok=True)
    os.makedirs(WINDOWS_ROOT, exist_ok=True)

    # 0) Load model + classes
    model, CLASSES, in_ch = load_model_and_classes(CKPT_PATH)
    print("Classes:", CLASSES)
    print("Checkpoint in_channels:", in_ch)

    # 1) Extract pose JSONs for each video in yj_own_vid/
    videos = [f for f in os.listdir(INPUT_VID_DIR)
              if f.lower().endswith((".mp4", ".mov", ".mkv", ".avi"))]
    if not videos:
        print(f"❌ No videos found in {INPUT_VID_DIR}")
        raise SystemExit

    json_paths = []
    for vid in videos:
        in_path  = os.path.join(INPUT_VID_DIR, vid)
        out_json = os.path.join(INFER_JSON_DIR, os.path.splitext(vid)[0] + ".json")
        extract_pose_to_json(in_path, out_json)
        json_paths.append(out_json)

    # 2) Make windows per video
    win_dirs = []
    for jp in json_paths:
        base = os.path.splitext(os.path.basename(jp))[0]
        out_dir = os.path.join(WINDOWS_ROOT, base)
        make_windows_for_json(jp, out_dir, WINDOW, STRIDE)
        win_dirs.append(out_dir)

    # 3) Predict per video (baseline: no velocity)
    for vd, jp in zip(win_dirs, json_paths):
        (label, conf, top3), per_window = predict_video_from_windows(vd, model, CLASSES)
        video_name = os.path.splitext(os.path.basename(jp))[0]
        if label is None:
            print(f"{video_name}: (no windows)")
        else:
            print(f"🎬 {video_name}  →  {label}  (conf {conf:.2f})  | top3 = {top3}")
            if PRINT_WINDOW_PRED and per_window:
                print("  Per-window top1:")
                for wid, lab, p in per_window:
                    print(f"   - {wid}: {lab} ({p:.2f})")


Classes: ['WhatsApp_Video_2025', 'air_guitar', 'boxing_punches', 'playing_games', 'talking_phone']
Checkpoint in_channels: 66
✅ Pose saved: infer_pose_json\WhatsApp Video 2025-09-29 at 18.12.54_a2cbd305.json (299 frames)
🧩 Windows: 11 -> infer_windows\WhatsApp Video 2025-09-29 at 18.12.54_a2cbd305
🎬 WhatsApp Video 2025-09-29 at 18.12.54_a2cbd305  →  talking_phone  (conf 1.00)  | top3 = [('talking_phone', 0.9988024234771729), ('playing_games', 0.0010144879342988133), ('air_guitar', 0.00015900166181381792)]
