In [4]:
# Colab-only: mount Google Drive at /content/drive so the paths configured in the
# next cell (all under /content/drive/MyDrive) resolve.
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [5]:
# Set paths (edit these!)
# WEIGHTS_PATH: checkpoint passed to torch.load in the Grad-CAM cell.
# VIDEO_PATH:   input clip; the name of its parent folder is used as the target class.
# OUT_PATH:     where the Grad-CAM overlay video is written.
WEIGHTS_PATH = "/content/drive/MyDrive/slowfast_7class_state_dict.pth"
VIDEO_PATH   = "/content/drive/MyDrive/AnnotationsFinal/val/Drinking/clip_0009_1160s_cow_5.mp4"
OUT_PATH     = "/content/gradcam_overlay_drinking.mp4"

In [14]:
# ==== ONE-CELL: SlowFast Grad-CAM (robust, end-to-end) ====
# Uses your WEIGHTS_PATH, VIDEO_PATH, OUT_PATH defined earlier.

!pip -q install pytorchvideo opencv-python av >/dev/null

import os, cv2, numpy as np, torch, torch.nn as nn, torch.nn.functional as F
from typing import List
from IPython.display import Video, display
from pytorchvideo.models.hub import slowfast_r50

# ---------- sanity checks ----------
# The three path variables must already exist as notebook globals (they are set in an
# earlier cell); the two input files must exist on disk. OUT_PATH is only checked for
# being defined because it is an output.
for _name in ("WEIGHTS_PATH", "VIDEO_PATH", "OUT_PATH"):
    assert _name in globals(), f"Define {_name} before running."
for _label, _path in (("weights", WEIGHTS_PATH), ("video", VIDEO_PATH)):
    assert os.path.isfile(_path), f"Missing {_label}: {_path}"
del _name, _label, _path  # do not leave loop temporaries in the notebook namespace

# ---------- config ----------
# PATHWAY  - which SlowFast pathway the CAM is computed on and overlaid over.
# T_FAST   - number of frames sampled from the video for the fast pathway (preprocess_clip).
# SIZE     - frames are resized to SIZE x SIZE before entering the model.
# ALPHA    - temporal ratio between pathways; pack_slowfast scales time by 1/ALPHA for the slow input.
# FPS_OUT  - frame rate written into the overlay video.
# KIN_MEAN / KIN_STD - per-channel normalization constants applied in preprocess_clip and
#            undone in denorm. NOTE(review): presumably Kinetics statistics; confirm they
#            match what the checkpoint was trained with.
PATHWAY  = "fast"   # visualize fast pathway (use "slow" for slow pathway)
T_FAST   = 32
SIZE     = 224
ALPHA    = 4
FPS_OUT  = 24
KIN_MEAN = (0.45, 0.45, 0.45)
KIN_STD  = (0.225, 0.225, 0.225)

# ---------- utils ----------
def read_video_opencv(path: str):
    """Decode every frame of a video file into a single RGB array.

    Args:
        path: Path of the video file, handed to ``cv2.VideoCapture``.

    Returns:
        ``np.ndarray`` of shape [F, H, W, 3] (uint8, RGB channel order) holding all
        decoded frames in order. The whole video is kept in memory.

    Raises:
        RuntimeError: If the file cannot be opened or no frame could be decoded.
    """
    cap = cv2.VideoCapture(path)
    frames = []
    try:
        if not cap.isOpened():
            raise RuntimeError(f"Could not open video: {path}")
        while True:
            ok, bgr = cap.read()
            if not ok:
                break
            # OpenCV decodes to BGR; the rest of the pipeline works in RGB.
            frames.append(cv2.cvtColor(bgr, cv2.COLOR_BGR2RGB))
    finally:
        # Release the capture even when decoding or colour conversion raises.
        cap.release()
    if not frames:
        raise RuntimeError(f"No frames read from {path}")
    return np.stack(frames, 0)  # [F,H,W,3] uint8

def uniform_temporal_subsample(x: torch.Tensor, num: int):
    """Pick ``num`` evenly spaced entries along dimension 0 of ``x``.

    Indices are ``linspace(0, T-1, num)`` rounded to the nearest integer, so the
    first and last entries are always included; when ``num`` exceeds ``T`` entries
    are repeated.

    Args:
        x: Tensor whose first dimension is time (length ``T``).
        num: Number of entries to return.

    Returns:
        ``x`` itself (no copy) when ``T == num``; otherwise a new tensor with
        ``num`` entries along dimension 0.
    """
    T = x.shape[0]
    if T == num:
        return x
    # Build the indices on x's device: index_select requires index and source to
    # live on the same device, so a CPU index would fail for a CUDA tensor.
    idxs = torch.linspace(0, T - 1, num, device=x.device).round().long().clamp(0, T - 1)
    return x.index_select(0, idxs)

def preprocess_clip(frames_rgb: np.ndarray, T_fast=T_FAST, size=SIZE, mean=KIN_MEAN, std=KIN_STD):
    """Turn decoded RGB frames into a normalized fast-pathway clip tensor.

    Steps: sample ``T_fast`` frames uniformly over the whole video, scale to
    [0, 1], resize each frame to ``size`` x ``size`` (bilinear, aspect ratio is
    not preserved), then normalize each channel with ``mean`` / ``std``.

    Args:
        frames_rgb: Array of shape [F, H, W, 3] with values in 0..255.
        T_fast: Number of frames in the returned clip.
        size: Output height and width.
        mean: Per-channel mean (3 values) subtracted after scaling to [0, 1].
        std: Per-channel standard deviation (3 values) divided out.

    Returns:
        Float tensor of shape [1, 3, T_fast, size, size].
    """
    v = torch.from_numpy(frames_rgb)                     # [F,H,W,3]
    # Select the frames first: resizing works per frame, so the result is the same
    # as resizing everything and then sampling, but only T_fast frames are
    # converted to float and interpolated instead of the whole video.
    v = uniform_temporal_subsample(v, T_fast)            # [T,H,W,3]
    v = v.float() / 255.0
    v = v.permute(0, 3, 1, 2)                            # [T,3,H,W]
    v = F.interpolate(v, size=(size, size), mode="bilinear", align_corners=False)
    for c in range(3):
        v[:, c] = (v[:, c] - mean[c]) / std[c]
    v = v.permute(1, 0, 2, 3).unsqueeze(0)               # [1,3,T,H,W]
    return v

def pack_slowfast(fast_clip: torch.Tensor, alpha=ALPHA):
    """Build the two-pathway input list ``[slow, fast]`` from a fast-pathway clip.

    The slow clip is produced by trilinear interpolation that shrinks only the
    temporal axis by a factor of ``1 / alpha``; height and width are untouched.

    NOTE(review): interpolating along time blends neighbouring frames rather than
    picking every ``alpha``-th frame. Confirm this matches how the slow pathway
    was fed during training.

    Args:
        fast_clip: Tensor of shape [B, C, T, H, W].
        alpha: Temporal ratio between the fast and the slow pathway.

    Returns:
        ``[slow_clip, fast_clip]``; ``fast_clip`` is the same object that was passed in.
    """
    temporal_scale = 1.0 / alpha
    slow_clip = F.interpolate(
        fast_clip,
        scale_factor=(temporal_scale, 1.0, 1.0),
        mode="trilinear",
        align_corners=False,
        recompute_scale_factor=True,
    )
    return [slow_clip, fast_clip]

def denorm(frames: torch.Tensor, mean=KIN_MEAN, std=KIN_STD):
    """Undo the per-channel normalization and return displayable uint8 frames.

    Args:
        frames: Tensor of shape [3, T, H, W] (channel first), normalized with
            ``mean`` / ``std``.
        mean: Per-channel mean that was subtracted.
        std: Per-channel standard deviation that was divided out.

    Returns:
        ``np.ndarray`` of shape [T, H, W, 3], dtype uint8. Values are clamped to
        [0, 1] before scaling to 0..255 and are truncated, not rounded.
    """
    restored = frames.clone().cpu()  # work on a CPU copy; the input is left untouched
    for channel in range(3):
        sigma, mu = std[channel], mean[channel]
        restored[channel] = restored[channel] * sigma + mu
    scaled = restored.clamp(0, 1) * 255
    channels_last = scaled.byte().permute(1, 2, 3, 0)  # [T,H,W,3]
    return channels_last.numpy()

def write_video_rgb(path: str, frames_rgb: np.ndarray, fps=FPS_OUT):
    """Write RGB frames to a video file using OpenCV's ``mp4v`` codec.

    Args:
        path: Output file path. Its parent directory is created when missing.
        frames_rgb: Array of shape [T, H, W, 3], uint8, RGB channel order.
        fps: Frame rate stored in the output file.

    Raises:
        RuntimeError: If OpenCV cannot open a writer for ``path``.
    """
    out_dir = os.path.dirname(path)
    if out_dir:
        # dirname is "" for a bare filename, and os.makedirs("") raises.
        os.makedirs(out_dir, exist_ok=True)
    T, H, W, _ = frames_rgb.shape
    # VideoWriter takes the frame size as (width, height).
    vw = cv2.VideoWriter(path, cv2.VideoWriter_fourcc(*"mp4v"), fps, (W, H))
    try:
        if not vw.isOpened():
            # Without this check a failed writer silently produces no file.
            raise RuntimeError(f"Could not open video writer for {path}")
        for t in range(T):
            vw.write(cv2.cvtColor(frames_rgb[t], cv2.COLOR_RGB2BGR))
    finally:
        # Always release so the file is finalized and the handle is freed.
        vw.release()

# robust overlay that auto-aligns CAM to frames
def _align_cam_to_frames(cam: torch.Tensor, frames_rgb: np.ndarray) -> torch.Tensor:
    """
    cam: [T_cam, Hc, Wc] (0..1 tensor)
    frames_rgb: [T_f, Hf, Wf, 3] uint8
    returns cam_aligned: [T_f, Hf, Wf] (0..1)
    """
    T_f, H_f, W_f, _ = frames_rgb.shape
    T_c, H_c, W_c = cam.shape
    cam5 = cam[None, None].float()  # [1,1,T,H,W]
    cam5 = F.interpolate(cam5, size=(T_f, H_f, W_f), mode="trilinear", align_corners=False)
    cam_aligned = cam5[0,0]
    cam_aligned = (cam_aligned - cam_aligned.min()) / (cam_aligned.max() + 1e-6)
    return cam_aligned

def overlay_cam(frames_rgb: np.ndarray, cam: torch.Tensor, alpha: float = 0.45):
    """Blend a JET-coloured heat map of ``cam`` onto every frame.

    Args:
        frames_rgb: Array of shape [T, H, W, 3], uint8, RGB.
        cam: Tensor of shape [T_cam, Hc, Wc]; it is resized to the frames' shape
            by ``_align_cam_to_frames`` before use.
        alpha: Weight of the heat map. The frame keeps weight 1.0, so the blend
            is ``frame + alpha * heat`` (saturated by OpenCV), not a cross-fade.

    Returns:
        Array of shape [T, H, W, 3], uint8, RGB.
    """
    num_frames, _, _, _ = frames_rgb.shape
    # Quantize the whole aligned CAM once; each slice is one grey-scale heat map.
    heat_gray = (_align_cam_to_frames(cam, frames_rgb).cpu().numpy() * 255).astype(np.uint8)  # [T,H,W]
    blended = []
    for idx in range(num_frames):
        heat_bgr = cv2.applyColorMap(heat_gray[idx], cv2.COLORMAP_JET)   # colour map output is BGR
        heat_rgb = cv2.cvtColor(heat_bgr, cv2.COLOR_BGR2RGB)
        base = frames_rgb[idx].astype(np.uint8)
        blended.append(cv2.addWeighted(base, 1.0, heat_rgb, alpha, 0))
    return np.stack(blended, 0)

def find_module_by_name(model: nn.Module, module_path: str):
    """Resolve a dotted path such as ``"blocks.4.conv"`` to a sub-module.

    Each path component that consists only of digits is used as an integer index
    (``parent[int(part)]``); every other component is looked up with ``getattr``.

    Args:
        model: Root module the path starts from.
        module_path: Dot-separated path.

    Returns:
        The object found at the end of the path.
    """
    current = model
    for part in module_path.split("."):
        if part.isdigit():
            current = current[int(part)]
        else:
            current = getattr(current, part)
    return current

def set_module_by_name(model: nn.Module, module_path: str, new_module: nn.Module):
    """Replace the sub-module at a dotted path with ``new_module``.

    Path components made only of digits are treated as integer indices into the
    parent container; all others are attribute names.

    Args:
        model: Root module the path starts from.
        module_path: Dot-separated path of the module to replace.
        new_module: Module stored at that location.
    """
    parts = module_path.split(".")
    parent = model
    for p in parts[:-1]:
        parent = parent[int(p)] if p.isdigit() else getattr(parent, p)
    last = parts[-1]
    if last.isdigit():
        # Index with an int: nn.Sequential / nn.ModuleList reject a string index
        # with TypeError, so the digit string must be converted first.
        parent[int(last)] = new_module
    else:
        setattr(parent, last, new_module)

# robust conv layer picker (tries names; if none, picks a Conv3d that actually fires)
def pick_best_conv3d_layer(model: nn.Module, pathway_idx: int, inputs: List[torch.Tensor]) -> str:
    """Choose the name of a Conv3d layer to use as the Grad-CAM target.

    Name-based selection is tried first, restricted to Conv3d modules that belong
    to the requested pathway, i.e. whose dotted name has a component containing
    ``"multipathway"`` immediately followed by the component ``str(pathway_idx)``
    (e.g. ``...multipathway_blocks.1....``). Within the pathway the preference is:

    1. names containing ``"layer4"`` and ending in ``"conv3"``,
    2. names containing ``"layer4"``,
    3. any Conv3d of the pathway.

    In each tier the *last* module in ``named_modules()`` order is returned.
    If no name matches, one forward pass is run with hooks on every Conv3d and the
    last one that fired is returned (regardless of pathway).

    Args:
        model: Model to search.
        pathway_idx: Index of the pathway inside the multipathway containers.
        inputs: Model inputs, only used by the forward-pass fallback.

    Returns:
        Dotted module name of the selected Conv3d.

    Raises:
        RuntimeError: If the model contains no Conv3d module.
    """
    names = [n for n, m in model.named_modules() if isinstance(m, nn.Conv3d)]
    if not names:
        raise RuntimeError("No Conv3d layers found.")

    wanted = str(pathway_idx)

    def _in_pathway(name: str) -> bool:
        # Match the pathway index only where it directly follows a "multipathway*"
        # component. A plain ".{idx}." substring test also hits unrelated indices
        # (stage or residual-block numbers) and can select the other pathway.
        parts = name.split(".")
        return any("multipathway" in part and nxt == wanted
                   for part, nxt in zip(parts, parts[1:]))

    pathway_names = [n for n in names if _in_pathway(n)]
    tiers = (
        [n for n in pathway_names if "layer4" in n and n.endswith("conv3")],
        [n for n in pathway_names if "layer4" in n],
        pathway_names,
    )
    for tier in tiers:
        if tier:
            # named_modules() yields modules in registration order, so the last
            # entry is the deepest one; sorting the strings would misorder
            # indices >= 10 ("10" sorts before "2").
            return tier[-1]

    # Fallback: find out which Conv3d layers actually run during a forward pass.
    fired = []
    hooks = []

    def make_hook(name):
        def _h(m, i, o):
            fired.append(name)
        return _h

    for n, m in model.named_modules():
        if isinstance(m, nn.Conv3d):
            hooks.append(m.register_forward_hook(make_hook(n)))
    try:
        model_device = next(model.parameters()).device
        with torch.no_grad():
            _ = model([t.to(model_device) for t in inputs])
    finally:
        # Remove the probe hooks even if the forward pass raises.
        for h in hooks:
            h.remove()
    return fired[-1] if fired else names[-1]

# fixed Grad-CAM (einsum; outputs [T,H,W])
class SlowFastGradCAM:
    """Grad-CAM for a two-pathway (slow/fast) video model.

    A forward hook and a full backward hook on the target layer capture its
    activations and the gradient of the chosen class score with respect to them.
    Calling the instance returns a CAM resized to the input of the selected
    pathway.

    The hooks stay attached to the model until ``remove_hooks()`` is called (or
    the instance is used as a context manager), so release them when done if the
    model is reused.

    Args:
        model: Model called as ``model([slow, fast])``; it is switched to eval mode.
        target_layer_path: Dotted path of the layer to hook, resolved with
            ``find_module_by_name``.
        pathway: ``"slow"`` or ``"fast"``; selects which input's [T, H, W] the
            CAM is resized to.
        device: Device the inputs are moved to; defaults to the device of the
            model's first parameter.
    """

    def __init__(self, model, target_layer_path: str, pathway: str="fast", device=None):
        assert pathway in ("slow","fast")
        self.model = model.eval()
        self.device = device if device is not None else next(model.parameters()).device
        self.pathway = pathway
        self.target_layer = find_module_by_name(model, target_layer_path)
        self._acts = None
        self._grads = None
        self._handles = []
        self._register_hooks()

    def _register_hooks(self):
        """Attach the activation / gradient capture hooks to the target layer."""
        def fwd_hook(m, i, o):
            self._acts = o.detach()        # [B,C,T,H,W]
        def bwd_hook(m, gi, go):
            self._grads = go[0].detach()   # [B,C,T,H,W]
        self._handles = [
            self.target_layer.register_forward_hook(fwd_hook),
            self.target_layer.register_full_backward_hook(bwd_hook),
        ]

    def remove_hooks(self):
        """Detach the hooks from the target layer; safe to call more than once."""
        for handle in self._handles:
            handle.remove()
        self._handles = []

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        self.remove_hooks()
        return False

    @torch.no_grad()
    def _resize_cam(self, cam_thw: torch.Tensor, T_out: int, H_out: int, W_out: int) -> torch.Tensor:
        """Trilinearly resize a [T,H,W] CAM and min-max normalize it to [0, 1]."""
        cam5 = cam_thw[None, None]  # [1,1,T,H,W]
        cam5 = F.interpolate(cam5, size=(T_out, H_out, W_out), mode="trilinear", align_corners=False)
        cam = cam5.squeeze(0).squeeze(0)
        cam = cam.clamp(min=0)
        # Divide by the range that remains after shifting by the minimum; dividing
        # by the un-shifted maximum would not reach 1 whenever min > 0.
        cam = cam - cam.min()
        cam = cam / (cam.max() + 1e-6)
        return cam

    def __call__(self, inputs, target_class: int=None):
        """Compute the CAM for one clip.

        Args:
            inputs: ``[slow, fast]`` tensors, each [B, C, T, H, W]. Only batch
                element 0 is visualized.
            target_class: Class index to explain; when ``None`` the predicted
                (arg-max) class is used.

        Returns:
            ``(cam, logits)``: ``cam`` is a CPU tensor [T, H, W] in [0, 1] with
            the shape of the selected pathway's input; ``logits`` is the CPU
            logit vector of batch element 0.

        Raises:
            RuntimeError: If the hooks captured nothing, i.e. the target layer
                did not take part in the forward/backward pass.
        """
        # Forget anything captured by a previous call so a layer that no longer
        # fires cannot silently reuse stale tensors.
        self._acts = None
        self._grads = None
        inputs = [x.to(self.device) for x in inputs]
        logits = self.model(inputs)
        if logits.ndim == 1: logits = logits.unsqueeze(0)
        if target_class is None:
            target_class = int(torch.softmax(logits, dim=1).argmax(dim=1)[0].item())
        self.model.zero_grad(set_to_none=True)
        one_hot = torch.zeros_like(logits); one_hot[0, target_class] = 1.0
        # Single backward pass; the graph is not needed afterwards.
        logits.backward(gradient=one_hot)
        if self._acts is None or self._grads is None:
            raise RuntimeError(
                "Grad-CAM hooks captured no activations/gradients; "
                "check that the target layer is used in the forward pass."
            )
        acts  = self._acts[0]   # [C,T,H,W]
        grads = self._grads[0]  # [C,T,H,W]
        weights = grads.mean(dim=(1,2,3))                      # [C] channel importance
        cam_thw = torch.einsum("c,cthw->thw", weights, acts)   # [T,H,W]
        cam_thw = F.relu(cam_thw)
        idx = 1 if self.pathway == "fast" else 0
        _, _, T_out, H_out, W_out = inputs[idx].shape
        cam = self._resize_cam(cam_thw, T_out, H_out, W_out)
        return cam.cpu(), logits.detach().cpu()[0]

# ---------- build class list from VIDEO_PATH's parent folder ----------
# Expected layout: <classes_root>/<ClassName>/<video file>. Every sub-directory of
# classes_root is a class; indices follow the alphabetical order of the folder names.
# NOTE(review): this presumes the checkpoint was trained with the same alphabetical
# label mapping; verify against the training code.
video_dir = os.path.dirname(VIDEO_PATH)                                # .../<ClassName>
classes_root, target_class_name = os.path.split(video_dir)            # .../<ClassesRoot>, <ClassName>
classes = sorted(
    entry for entry in os.listdir(classes_root)
    if os.path.isdir(os.path.join(classes_root, entry))
)
class_to_idx = dict(zip(classes, range(len(classes))))
NUM_CLASSES = len(classes)
# None when the video's folder is not one of the class folders.
target_idx = class_to_idx.get(target_class_name)
print("[INFO] classes:", classes)
print(f"[INFO] target class: {target_class_name} -> idx {target_idx} | NUM_CLASSES={NUM_CLASSES}")

# ---------- build model (no head_act), replace head, load weights ----------
# Run on the GPU when one is available, otherwise on the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# pretrained=False: the weights come from WEIGHTS_PATH, which is loaded further down.
model = slowfast_r50(pretrained=False)   # compatible with your PyTorchVideo

# Find the last nn.Linear in named_modules() order (taken to be the classification
# head) and prepare a replacement with NUM_CLASSES outputs and the same input width.
head_name, head_mod = None, None
for n, m in model.named_modules():
    if isinstance(m, nn.Linear):
        # keep overwriting so the final values belong to the last Linear encountered
        head_name, head_mod = n, m
assert head_mod is not None, "Could not locate classification head."
new_head = nn.Linear(head_mod.in_features, NUM_CLASSES, bias=True)

# setter for dotted path
def set_by_name(model, dotted, newm):
    """Store ``newm`` at the location named by the dotted path ``dotted``.

    All-digit path components are used as integer indices into their parent;
    any other component is treated as an attribute name.

    Args:
        model: Root module the path starts from.
        dotted: Dot-separated path of the entry to replace.
        newm: Module to store there.
    """
    *ancestors, leaf = dotted.split(".")
    holder = model
    for step in ancestors:
        if step.isdigit():
            holder = holder[int(step)]
        else:
            holder = getattr(holder, step)
    if leaf.isdigit():
        holder[int(leaf)] = newm
    else:
        setattr(holder, leaf, newm)

# Swap in the NUM_CLASSES-way head, then move the model to the device in eval mode.
set_by_name(model, head_name, new_head)
model = model.to(device).eval()

# load weights (.pth/.pt; supports common wrappers)
# NOTE(review): torch.load unpickles the file, which can execute arbitrary code;
# only load checkpoints from a trusted source.
ckpt = torch.load(WEIGHTS_PATH, map_location=device)
if isinstance(ckpt, nn.Module):
    # a whole pickled model was saved: use its parameters
    ckpt = ckpt.state_dict()
for key in ("state_dict", "model_state_dict"):
    # unwrap training checkpoints that nest the weights under a well-known key
    if isinstance(ckpt, dict) and key in ckpt and isinstance(ckpt[key], dict):
        ckpt = ckpt[key]; break
# strip the "module." prefix that (Distributed)DataParallel adds to parameter names
ckpt = { (k.split("module.",1)[1] if k.startswith("module.") else k): v for k,v in ckpt.items() }
missing, unexpected = model.load_state_dict(ckpt, strict=False)
# strict=False never complains about names that do not line up. If not a single
# checkpoint entry matched, the model still has its random initial weights and any
# Grad-CAM computed from it would be meaningless, so stop here.
if len(ckpt) - len(unexpected) == 0:
    raise RuntimeError(
        f"No parameter in {WEIGHTS_PATH} matched the model; refusing to continue with random weights."
    )
print("[OK] weights loaded (strict=False)")
if missing:   print("  missing (sample):", missing[:6])
if unexpected: print("  unexpected (sample):", unexpected[:6])

# ---------- prep input ----------
raw    = read_video_opencv(VIDEO_PATH)                  # [F,H,W,3] uint8
clip   = preprocess_clip(raw, T_fast=T_FAST, size=SIZE) # [1,3,T,H,W]
inputs = pack_slowfast(clip, alpha=ALPHA)               # [slow, fast]

# ---------- pick a good conv layer & run Grad-CAM ----------
pathway_idx = 1 if PATHWAY=="fast" else 0
layer_path  = pick_best_conv3d_layer(model, pathway_idx, inputs)
print(f"[CAM] using layer: {layer_path}")

cam_engine = SlowFastGradCAM(model, layer_path, pathway=PATHWAY, device=device)
try:
    cam, logits = cam_engine(inputs, target_class=target_idx)
finally:
    # The engine registers hooks on the shared `model`. Detach them once the CAM is
    # computed so they do not pile up (and keep tensors alive) on later runs.
    while cam_engine._handles:
        cam_engine._handles.pop().remove()

# ---------- overlay & save ----------
# Overlay onto the same frames the selected pathway actually received.
frames_norm = inputs[1 if PATHWAY=="fast" else 0][0]  # [3,T,H,W]
frames_rgb  = denorm(frames_norm)                     # [T,H,W,3] uint8
overlay     = overlay_cam(frames_rgb, cam, alpha=0.45)
write_video_rgb(OUT_PATH, overlay, fps=FPS_OUT)

print("[DONE] Saved:", OUT_PATH)
display(Video(OUT_PATH, embed=True))


[INFO] classes: ['Drinking', 'Feeding & Lying', 'Feeding & Standing', 'Lying', 'Ruminating & Lying', 'Ruminating & Standing', 'Standing']
[INFO] target class: Drinking -> idx 0 | NUM_CLASSES=7
[OK] weights loaded (strict=False)
[CAM] using layer: blocks.4.multipathway_blocks.1.res_blocks.2.branch2.conv_c
[DONE] Saved: /content/gradcam_overlay_drinking.mp4


In [19]:
# --- Helper to run Grad-CAM on any video path (reuses your model & utils) ---

def gradcam_on_video(video_path, out_path, target_idx=None, repick_layer=False):
    """Run Grad-CAM on one video and save/display the overlay.

    Relies on notebook globals defined by the main Grad-CAM cell: ``model``,
    ``PATHWAY``, ``T_FAST``, ``SIZE``, ``ALPHA``, ``FPS_OUT`` and the helper
    functions. The chosen layer name is cached in the global ``LAYER_PATH``.

    Args:
        video_path: path to a single video (e.g., .../Drinking/clip.mp4)
        out_path:   where to save the overlay mp4
        target_idx: optional class index to visualize; if None, it is inferred
            from the video's parent folder name (alphabetical index among the
            sibling folders). If that lookup fails it stays None and the
            predicted class is visualized.
        repick_layer: if True, re-picks the best conv layer each call (slower, safer)

    Returns:
        None. The overlay is written to ``out_path`` and displayed inline.
    """
    assert os.path.isfile(video_path), f"Video not found: {video_path}"

    # 1) infer class index from folder (if not provided)
    if target_idx is None:
        vdir = os.path.dirname(video_path)
        classes_root = os.path.dirname(vdir)
        classes = sorted([d for d in os.listdir(classes_root) if os.path.isdir(os.path.join(classes_root, d))])
        cname = os.path.basename(vdir)
        c2i = {c:i for i,c in enumerate(classes)}
        target_idx = c2i.get(cname, None)
        print(f"[INFO] target class inferred: {cname} -> {target_idx}")

    # 2) preprocess & pack pathways
    raw    = read_video_opencv(video_path)
    clip   = preprocess_clip(raw, T_fast=T_FAST, size=SIZE)   # [1,3,T,H,W]
    inputs = pack_slowfast(clip, alpha=ALPHA)                 # [slow, fast]

    # 3) pick conv layer (reuse global LAYER_PATH if available, else pick now)
    global LAYER_PATH
    if repick_layer or 'LAYER_PATH' not in globals():
        pathway_idx = 1 if PATHWAY == "fast" else 0
        LAYER_PATH = pick_best_conv3d_layer(model, pathway_idx, inputs)
        print(f"[CAM] using layer: {LAYER_PATH}")
    else:
        print(f"[CAM] using cached layer: {LAYER_PATH}")

    # 4) run Grad-CAM
    cam_engine = SlowFastGradCAM(model, LAYER_PATH, pathway=PATHWAY, device=next(model.parameters()).device)
    try:
        cam, logits = cam_engine(inputs, target_class=target_idx)
    finally:
        # Every call builds a new engine that hooks the shared model; detach the
        # hooks so repeated calls do not accumulate them.
        while cam_engine._handles:
            cam_engine._handles.pop().remove()

    # 5) overlay & save
    frames_norm = inputs[1 if PATHWAY=="fast" else 0][0]   # [3,T,H,W]
    frames_rgb  = denorm(frames_norm)                      # [T,H,W,3]
    overlay     = overlay_cam(frames_rgb, cam, alpha=0.45)
    write_video_rgb(out_path, overlay, fps=FPS_OUT)

    # quick print of prediction
    pred_idx = int(torch.argmax(torch.softmax(logits, dim=0)).item())
    print(f"[DONE] Saved: {out_path} | target={target_idx} pred={pred_idx}")
    # Show the file that was just written (not the global OUT_PATH of the first run).
    display(Video(out_path, embed=True))



In [20]:
# Run Grad-CAM on a second clip. No target_idx is passed, so gradcam_on_video infers
# the class from the clip's parent folder name ("Feeding & Standing").
NEW_VIDEO_PATH = "/content/drive/MyDrive/AnnotationsFinal/val/Feeding & Standing/001_250319230000_6001_FF_00_59_row14_clip2_Feeding_cow_4.mp4"
NEW_OUT_PATH   = "/content/gradcam_overlay_feeding_standing.mp4"
gradcam_on_video(NEW_VIDEO_PATH, NEW_OUT_PATH)


[INFO] target class inferred: Feeding & Standing -> 2
[CAM] using cached layer: blocks.4.multipathway_blocks.1.res_blocks.2.branch2.conv_c
[DONE] Saved: /content/gradcam_overlay_feeding_standing.mp4 | target=2 pred=4
