# Prerequisites

In [6]:
from transformers import VideoMAEImageProcessor, VideoMAEForVideoClassification
import numpy as np
import torch
from ultralytics import YOLO
import matplotlib.pyplot as plt
from tqdm.notebook import tqdm

from boxmot import BoostTrack
from boxmot import StrongSort
import torch

from collections import defaultdict
import os
from pathlib import Path

import torchvision
from torchvision.transforms import Compose, Lambda, Normalize, CenterCrop, Resize, ToTensor
from torchvision.transforms._transforms_video import CenterCropVideo, NormalizeVideo
import cv2

# from pytorchvideo.transforms import ShortSideScale

# Detector

In [7]:
# Load the YOLO detector once at cell execution; object_detection() reuses this instance.
DETECTOR_WEIGHTS = "yolo11s.pt"
detector_model = YOLO(DETECTOR_WEIGHTS)

def object_detection(img):
    """Run the YOLO detector on one image and return the person detections.

    Args:
        img: Image to run inference on; it is handed to ``detector_model.predict``
            unchanged.

    Returns:
        A list with one ``[x1, y1, x2, y2, confidence, cls_id]`` entry per kept
        detection. Box corners are truncated to ``int``, ``confidence`` is a
        ``float`` and ``cls_id`` is always ``0``. The list is empty when nothing
        is detected.
    """
    person_class_id = 0  # only person detections are kept

    # Use the GPU when one is visible instead of failing on CPU-only machines.
    inference_device = "cuda" if torch.cuda.is_available() else "cpu"
    results = detector_model.predict(img, verbose=False, device=inference_device)
    if not results:
        return []

    # The last result is the one the original implementation used.
    boxes = results[-1].boxes

    # Move each tensor to the host once instead of once per box.
    corners = boxes.xyxy.detach().cpu().tolist()
    class_ids = boxes.cls.detach().cpu().tolist()
    scores = boxes.conf.detach().cpu().tolist()

    detections = []
    for (x1, y1, x2, y2), raw_cls, score in zip(corners, class_ids, scores):
        cls_id = int(raw_cls)
        if cls_id != person_class_id:
            continue
        detections.append([int(x1), int(y1), int(x2), int(y2), float(score), cls_id])
    return detections


# Tracker

In [54]:
# Preferred compute device: the string "cuda" when a GPU is visible, otherwise a torch CPU device.
# (torch.device("cuda") was tried previously and replaced by the plain string.)
device = "cuda" if torch.cuda.is_available() else torch.device("cpu")

# Alternative tracker kept for reference:
# tracker = BoostTrack(reid_weights='osnet_x0_25_msmt17.pt', device=device, half=False)

# NOTE(review): the tracker is pinned to the CPU regardless of `device` - confirm this is intended.
tracker = StrongSort(
    reid_weights=Path("osnet_x0_25_market1501.pt"),
    device="cpu",
    half=False,
    cmc=False,  # CMC is motion compensation
)

def visualize_tracking(tracker_results, img):
    """Draw every tracked box and its track id onto ``img``.

    Args:
        tracker_results: Iterable of rows whose first five entries are
            ``x1, y1, x2, y2, track_id``. Any further columns are ignored.
        img: Image to annotate. It is modified in place.

    Returns:
        The same ``img`` object, after annotation.
    """
    box_color = (255, 0, 0)
    for row in tracker_results:
        x1, y1, x2, y2, track_id = (int(value) for value in row[:5])
        cv2.rectangle(img, (x1, y1), (x2, y2), box_color, 2)
        # The id label sits just above the top-left corner of the box.
        cv2.putText(img, text=str(track_id), org=(x1, y1 - 5), color=box_color,
                    fontScale=0.2, fontFace=cv2.FONT_HERSHEY_DUPLEX)
    return img

def save_video_clip(track__buffers, out_path, track_id, fps=15):
    """Write the buffered frames of one track to an MP4 file.

    Args:
        track__buffers: Mapping from track id to a list of H x W x 3 frames.
            Frames are presumably RGB, since their channels are swapped before
            writing - TODO confirm against the caller.
        out_path: Destination file path. Its parent directory is created if missing.
        track_id: Key of the buffer to write.
        fps: Frame rate of the written clip (defaults to the previous fixed value of 15).

    Returns:
        None. Nothing is written when the buffer for ``track_id`` is empty.
    """
    frames = track__buffers[track_id]
    if len(frames) == 0:
        # Without a first frame there is no size to configure the writer with.
        return

    out_dir = os.path.dirname(out_path)
    if out_dir:
        # VideoWriter does not create missing directories itself.
        os.makedirs(out_dir, exist_ok=True)

    h, w, _ = frames[0].shape
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    out = cv2.VideoWriter(out_path, fourcc, fps, (w, h))
    try:
        for f in frames:
            # Same channel swap as before; the constant now names the direction used.
            out.write(cv2.cvtColor(f, cv2.COLOR_RGB2BGR))
    finally:
        out.release()

[32m2025-10-06 00:16:20.269[0m | [1mINFO    [0m | [36mboxmot.trackers.basetracker[0m:[36m__init__[0m:[36m56[0m - [1mBaseTracker initialization parameters:[0m
[32m2025-10-06 00:16:20.269[0m | [1mINFO    [0m | [36mboxmot.trackers.basetracker[0m:[36m__init__[0m:[36m57[0m - [1mdet_thresh: 0.3[0m
[32m2025-10-06 00:16:20.270[0m | [1mINFO    [0m | [36mboxmot.trackers.basetracker[0m:[36m__init__[0m:[36m58[0m - [1mmax_age: 30[0m
[32m2025-10-06 00:16:20.270[0m | [1mINFO    [0m | [36mboxmot.trackers.basetracker[0m:[36m__init__[0m:[36m59[0m - [1mmax_obs: 50[0m
[32m2025-10-06 00:16:20.271[0m | [1mINFO    [0m | [36mboxmot.trackers.basetracker[0m:[36m__init__[0m:[36m60[0m - [1mmin_hits: 3[0m
[32m2025-10-06 00:16:20.271[0m | [1mINFO    [0m | [36mboxmot.trackers.basetracker[0m:[36m__init__[0m:[36m61[0m - [1miou_threshold: 0.3[0m
[32m2025-10-06 00:16:20.271[0m | [1mINFO    [0m | [36mboxmot.trackers.basetracker[0m:[36m__init__

# Video MAE

In [38]:
# Load the VideoMAE preprocessor and classifier once; both come from the same checkpoint.
VIDEOMAE_CHECKPOINT = "MCG-NJU/videomae-base-finetuned-kinetics"

# Fall back to the CPU instead of crashing in `.to(device)` when no GPU is visible.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

processor = VideoMAEImageProcessor.from_pretrained(VIDEOMAE_CHECKPOINT, cache_dir="models")
video_model = VideoMAEForVideoClassification.from_pretrained(VIDEOMAE_CHECKPOINT, cache_dir="models").to(device)
video_model.eval()  # the model is only used for inference in this notebook

# Helper

In [None]:
# Clip preprocessing pipeline: divide by 255, center-crop to 256, then normalise
# each channel with mean 0.45 and std 0.225.
# NOTE(review): `transform` is not referenced by any other cell visible in this notebook.
_channel_mean = (0.45,) * 3
_channel_std = (0.225,) * 3

transform = Compose(
    [
        Lambda(lambda clip: clip / 255.0),
        CenterCropVideo(256),
        NormalizeVideo(_channel_mean, _channel_std),
    ]
)

In [49]:
def run_action_recognition(track_buffer):
    """Classify the action shown in a buffered clip with the VideoMAE model.

    Args:
        track_buffer: Non-empty sequence of equally sized frames, each an
            H x W x 3 array. The callers in this notebook pass 16 frames.

    Returns:
        A ``(predicted_label, confidence)`` tuple. ``predicted_label`` comes from
        the model's ``id2label`` mapping and ``confidence`` is the softmax
        probability of that label, in ``[0, 1]``.

    Raises:
        ValueError: If ``track_buffer`` is empty.
    """
    if len(track_buffer) == 0:
        raise ValueError("track_buffer must contain at least one frame")

    # (T, H, W, 3) -> (T, 3, H, W); the processor receives one array per frame.
    clip = np.stack(track_buffer).transpose(0, 3, 1, 2)
    video = list(clip)

    # Follow the model's own device so a later rebinding of the global `device`
    # cannot put the inputs and the weights on different devices.
    inputs = processor(video, return_tensors="pt").to(video_model.device)

    with torch.no_grad():
        logits = video_model(**inputs).logits

    # Raw logits are unbounded; softmax turns them into a real confidence score.
    probabilities = logits.softmax(dim=-1)
    predicted_class_idx = int(probabilities.argmax(dim=-1).item())
    predicted_label = video_model.config.id2label[predicted_class_idx]
    confidence = probabilities[0, predicted_class_idx].item()

    return predicted_label, confidence

# With Tracker Pipeline

In [60]:
# Open the input video and create a writer with the same frame size and frame rate.
path = "video1.mp4"
cap = cv2.VideoCapture(path)
if not cap.isOpened():
    # Fail loudly: an unopened capture would otherwise just produce an empty output file.
    raise OSError(f"Could not open input video: {path}")

frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
# cap.get() returns 0 when the property is unavailable; a 0 FPS writer is not usable.
fps = cap.get(cv2.CAP_PROP_FPS) or 30.0

output_path = "VideoMAE-wt-4.mp4"

h, w = frame_height, frame_width
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
out = cv2.VideoWriter(output_path, fourcc, fps, (w, h))

# Frame count reported by the container.
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

In [None]:
# Detect people, track them, and classify each track's action once enough frames are buffered.
crop_size = 256  # side length of the square crops buffered per track
clip_len = 16  # frames buffered per track before one action prediction is made
track__buffers = defaultdict(list)  # track id -> buffered RGB crops
track_labels = defaultdict(lambda: "-")  # track id -> latest action label for that track
save_dir = "clips"
frame_idx = 0
pred_class = "-"  # most recent prediction across all tracks
pbar = tqdm(total=total_frames, desc="Processing video", unit="frame")

try:
    while True:
        success, frame = cap.read()
        if not success:
            print("Video ended")
            break
        frame_idx += 1
        pbar.update(1)

        # Frames stay in OpenCV's native BGR order for detection, tracking, drawing and
        # writing; only the crops handed to the action model are converted to RGB.
        clean_frame = frame.copy()  # un-annotated pixels used as the crop source
        frame_h, frame_w = frame.shape[:2]

        detections = np.array(object_detection(frame))
        if detections.size == 0:
            out.write(frame)
            continue
        tracker_results = tracker.update(detections, frame)

        for x1, y1, x2, y2, track_id, conf, cls_id, frame_id in tracker_results:
            if x2 - x1 < 10 and y2 - y1 < 10:
                continue  # too small to be worth classifying

            # Clamp to the frame: a negative index would slice from the opposite edge.
            left, top = max(int(x1), 0), max(int(y1), 0)
            right, bottom = min(int(x2), frame_w), min(int(y2), frame_h)
            if right <= left or bottom <= top:
                continue  # empty or invalid crop

            track_id = int(track_id)
            crop = cv2.resize(clean_frame[top:bottom, left:right], (crop_size, crop_size))
            buffer = track__buffers[track_id]
            buffer.append(cv2.cvtColor(crop, cv2.COLOR_BGR2RGB))

            if len(buffer) >= clip_len:
                # To also save the clip:
                # save_video_clip(track__buffers, os.path.join(save_dir, f"id_{track_id}_frame_{frame_idx}.mp4"), track_id)
                pred_class, confidence = run_action_recognition(buffer)
                track_labels[track_id] = pred_class
                buffer.clear()

            cv2.rectangle(frame, (left, top), (right, bottom), (255, 0, 0), 2)
            cv2.putText(frame, text=str(track_id), org=(left, top - 5), color=(255, 0, 0),
                        fontScale=0.5, fontFace=cv2.FONT_HERSHEY_PLAIN, thickness=1)
            # Each track shows its own label rather than the last prediction of any track.
            cv2.putText(frame, text=str(track_labels[track_id]), org=(left + 30, top - 5), color=(255, 0, 0),
                        fontScale=0.5, fontFace=cv2.FONT_HERSHEY_PLAIN, thickness=1)

        # Exactly one output frame per input frame keeps the output in sync with the source.
        out.write(frame)
finally:
    # Release everything even if a frame fails, so the output file is finalised.
    pbar.close()
    cap.release()
    cv2.destroyAllWindows()
    out.release()

Processing video:   0%|          | 0/300 [00:00<?, ?frame/s]

Video ended


# Without Tracker

In [77]:
# Open the input video and create a writer with the same frame size and frame rate.
path = "car.mp4"
cap = cv2.VideoCapture(path)
if not cap.isOpened():
    # Fail loudly: an unopened capture would otherwise just produce an empty output file.
    raise OSError(f"Could not open input video: {path}")

frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
# cap.get() returns 0 when the property is unavailable; a 0 FPS writer is not usable.
fps = cap.get(cv2.CAP_PROP_FPS) or 30.0

output_path = "VideoMAE-2.mp4"

h, w = frame_height, frame_width
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
out = cv2.VideoWriter(output_path, fourcc, fps, (w, h))

# Frame count reported by the container.
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

In [78]:
# Classify the action of the whole frame every `clip_len` frames and overlay the latest label.
crop_size = 256  # side length of the square frames fed to the action model
clip_len = 16  # frames buffered before one action prediction is made
track__buffers = defaultdict(list)  # a single buffer under key 0 holds the whole-frame clip
save_dir = "clips"
frame_idx = 0
pred_class = "-"

pbar = tqdm(total=total_frames, desc="Processing video", unit="frame")
try:
    while True:
        success, frame = cap.read()
        if not success:
            print("Video ended")
            break
        frame_idx += 1
        pbar.update(1)

        # Buffer the clean frame before any text is drawn, so the overlay never reaches
        # the model. Only the buffered copy is converted to RGB; `frame` stays BGR for writing.
        rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        track__buffers[0].append(cv2.resize(rgb_frame, (crop_size, crop_size)))

        if len(track__buffers[0]) >= clip_len:
            pred_class, confidence = run_action_recognition(track__buffers[0])
            track__buffers[0].clear()

        # Draw the label once, after any update, and write exactly one frame per input frame.
        cv2.putText(frame, text=str(pred_class), org=(40, 40), color=(0, 0, 255), fontScale=2,
                    fontFace=cv2.FONT_HERSHEY_SIMPLEX, thickness=2)
        out.write(frame)
finally:
    # Release everything even if a frame fails, so the output file is finalised.
    pbar.close()
    cap.release()
    cv2.destroyAllWindows()
    out.release()

Processing video:   0%|          | 0/839 [00:00<?, ?frame/s]

Video ended
