In [1]:
"""
Penn‑Action → fourM preprocessing helpers
========================================
Put this cell at the top of your notebook; execute it once.  It defines every
function you need **but does nothing on disk** until you call `run(...)` from
Cell 2.
"""

from pathlib import Path
import json
from typing import List, Optional

import numpy as np
import scipy.io
import torch
import torchvision.transforms.functional as TF
from huggingface_hub import snapshot_download
from PIL import Image
from skimage.metrics import structural_similarity as ssim
from tqdm import tqdm
from transformers import BlipForConditionalGeneration, BlipProcessor

# ------------------------ constants ------------------------
TARGET_RES = 256          # shorter side after resize for Cosmos encoder
SSIM_THRESHOLD = 0.985    # duplicate‑frame threshold (SSIM > thr ⇒ skip)
MAX_FRAMES = 6            # 1 reference + 5 distinct successors
COORD_QLEVELS = 8192      # discrete bins for x & y

# ------------------------ similarity ------------------------

def is_similar(img1: Image.Image, img2: Image.Image, thr: float = SSIM_THRESHOLD) -> bool:
    """Return True if two RGB PIL images are perceptually similar."""
    a = np.asarray(img1.convert("L"), dtype=np.float32)
    b = np.asarray(img2.convert("L"), dtype=np.float32)
    score, _ = ssim(a, b, full=True)
    return score > thr

# ------------------------ tokenisers ------------------------

def build_tokenisers(device: str):
    """Download (once) and load Cosmos + BLIP models."""
    ckpt_dir = Path("/tmp/cosmos_DI16x16")
    if not ckpt_dir.exists():
        snapshot_download("nvidia/Cosmos-0.1-Tokenizer-DI16x16", local_dir=str(ckpt_dir))
    from cosmos_tokenizer.image_lib import ImageTokenizer  # deferred import

    image_tok = ImageTokenizer(
        checkpoint_enc=str(ckpt_dir / "encoder.jit"),
        checkpoint_dec=str(ckpt_dir / "decoder.jit"),
    ).to(device)
    image_tok.eval()

    proc = BlipProcessor.from_pretrained("Salesforce/blip-image-captioning-base")
    blip = BlipForConditionalGeneration.from_pretrained("Salesforce/blip-image-captioning-base").to(device)
    blip.eval()

    return image_tok, proc, blip

# ------------------------ helpers ------------------------

def encode_rgb(img: Image.Image, image_tok, device: str) -> np.ndarray:
    ten = TF.to_tensor(img).unsqueeze(0).to(device) * 2 - 1
    with torch.no_grad():
        tok, _ = image_tok.encode(ten)
    return tok.squeeze(0).cpu().short().numpy()


def caption_image(img: Image.Image, proc, blip, device: str) -> str:
    with torch.no_grad():
        inputs = proc(images=img, return_tensors="pt").to(device)
        out = blip.generate(**inputs, max_length=30)
    return proc.decode(out[0], skip_special_tokens=True)


def quantise_coords(x: np.ndarray, y: np.ndarray, vis: np.ndarray) -> np.ndarray:
    """Return (3,13) uint16 array of x, y, visibility tokens."""
    q = COORD_QLEVELS - 1
    x_tok = np.clip(np.rint(x * q), 0, q).astype(np.uint16)
    y_tok = np.clip(np.rint(y * q), 0, q).astype(np.uint16)
    vis_tok = vis.astype(np.uint16)
    return np.stack([x_tok, y_tok, vis_tok])

# ------------------------ core routine ------------------------

def process_video(
    vid: str,
    frames_root: Path,
    labels_root: Path,
    out_root: Path,
    image_tok,
    proc,
    blip,
    device: str,
):
    """Tokenise one Penn‑Action clip."""
    frame_dir = frames_root / vid
    label_path = labels_root / f"{vid}.mat"
    if not frame_dir.exists() or not label_path.exists():
        print(f"⚠️  Skip {vid}: missing data")
        return

    mat = scipy.io.loadmat(label_path, squeeze_me=True, struct_as_record=False)
    x_all, y_all = mat["x"], mat["y"]
    vis_all = mat["visibility"].astype(bool)
    T = int(mat["nframes"])
    H0, W0, _ = mat["dimensions"]
    scale_x, scale_y = TARGET_RES / W0, TARGET_RES / H0

    frame_files: List[Path] = sorted(frame_dir.glob("*.jpg"))
    assert len(frame_files) == T, f"{vid}: frame/label mismatch"

    # select MAX_FRAMES distinct frames
    kept = [0]
    last = Image.open(frame_files[0]).convert("RGB")
    for j in range(1, T):
        if len(kept) == MAX_FRAMES:
            break
        cand = Image.open(frame_files[j]).convert("RGB")
        if not is_similar(last, cand):
            kept.append(j)
            last = cand

    for sub in ("tok_rgb", "tok_rgb_next", "coords", "captions"):
        (out_root / sub / vid).mkdir(parents=True, exist_ok=True)

    for idx in kept:
        stem = f"{idx+1:05d}"
        img = Image.open(frame_files[idx]).convert("RGB")
        img_res = TF.resize(img, TARGET_RES, interpolation=Image.BICUBIC)
        np.save(out_root / "tok_rgb" / vid / f"{stem}.npy", encode_rgb(img_res, image_tok, device))

        # next frame
        if idx + 1 < T:
            nxt = Image.open(frame_files[idx+1]).convert("RGB")
            nxt_res = TF.resize(nxt, TARGET_RES, interpolation=Image.BICUBIC)
            np.save(out_root / "tok_rgb_next" / vid / f"{stem}.npy", encode_rgb(nxt_res, image_tok, device))

        # coords
        x_n = (x_all[idx] * scale_x) / TARGET_RES
        y_n = (y_all[idx] * scale_y) / TARGET_RES
        np.save(out_root / "coords" / vid / f"{stem}.npy", quantise_coords(x_n, y_n, vis_all[idx]))

        # caption
        cap = caption_image(img_res, proc, blip, device)
        with open(out_root / "captions" / vid / f"{stem}.json", "w") as fp:
            json.dump({"video": vid, "frame": stem, "caption": cap}, fp, indent=2)

    print(f"✅ {vid}: {len(kept)} frames → {out_root}")

# ------------------------ convenience wrapper ------------------------

def run(
    frames_root: Path,
    labels_root: Path,
    output_root: Path,
    video: Optional[str] = None,
):
    """High‑level entry point you invoke from Cell 2."""
    assert frames_root.exists(), "frames_root path does not exist"
    assert labels_root.exists(), "labels_root path does not exist"

    device = "cuda" if torch.cuda.is_available() else "cpu"
    image_tok, proc, blip = build_tokenisers(device)

    vids = [video] if video else sorted([d.name for d in frames_root.iterdir() if d.is_dir()])
    for vid in tqdm(vids, desc="videos"):
        process_video(vid, frames_root, labels_root, output_root, image_tok, proc, blip, device)

In [None]:
"""Execute **after** Cell 1.
Edit the three paths and (optionally) `VIDEO_ID`, then run the cell to start
pre‑processing.
"""

# --- user‑editable paths ---------------------------------
FRAMES_ROOT = Path("../COM-304-FM/project/penn_action_raw/Penn_Action/frames")   # ← change me
LABELS_ROOT = Path("../COM-304-FM/project/penn_action_raw/Penn_Action/labels")   # ← change me
OUTPUT_ROOT = Path("../new/output/")                      # ok to leave as‑is
VIDEO_ID    = "0001"  # e.g. "0003" for a single clip
# ---------------------------------------------------------

run(FRAMES_ROOT, LABELS_ROOT, OUTPUT_ROOT, VIDEO_ID)

AssertionError: frames_root path does not exist