# Task
Adapt the "https://huggingface.co/OPear/videomae-large-finetuned-UCF-Crime" model for anomaly detection in CCTV footage, given that you only have examples of normal behavior and need to identify deviations from this normal behavior.

## Load the pre-trained model

### Subtask:
Load the `OPear/videomae-large-finetuned-UCF-Crime` model from Hugging Face.


In [12]:
from transformers import VivitForVideoClassification, AutoImageProcessor

model_name = "prathameshdalal/vivit-b-16x2-kinetics400-UCF-Crime"
image_processor = AutoImageProcessor.from_pretrained(model_name)
model = VivitForVideoClassification.from_pretrained(model_name)

Let's inspect the model's configuration and layers to understand its structure.

In [14]:
print("Model Configuration:")
print(model.config)

print("\nModel Layers:")
print(model)

Model Configuration:
VivitConfig {
  "architectures": [
    "VivitForVideoClassification"
  ],
  "attention_probs_dropout_prob": 0.0,
  "hidden_act": "gelu_fast",
  "hidden_dropout_prob": 0.0,
  "hidden_size": 768,
  "id2label": {
    "0": "Abuse",
    "1": "Arrest",
    "2": "Arson",
    "3": "Assault",
    "4": "Burglary",
    "5": "Explosion",
    "6": "Fighting",
    "7": "Normal",
    "8": "RoadAccidents",
    "9": "Robbery",
    "10": "Shooting",
    "11": "Shoplifting",
    "12": "Stealing",
    "13": "Vandalism"
  },
  "image_size": 224,
  "initializer_range": 0.02,
  "intermediate_size": 3072,
  "label2id": {
    "Abuse": 0,
    "Arrest": 1,
    "Arson": 2,
    "Assault": 3,
    "Burglary": 4,
    "Explosion": 5,
    "Fighting": 6,
    "Normal": 7,
    "RoadAccidents": 8,
    "Robbery": 9,
    "Shooting": 10,
    "Shoplifting": 11,
    "Stealing": 12,
    "Vandalism": 13
  },
  "layer_norm_eps": 1e-06,
  "model_type": "vivit",
  "num_attention_heads": 12,
  "num_channels": 3,


## Load and preprocess video


In [None]:
import cv2
import numpy as np

video_path = "/content/drive/MyDrive/Anomaly_Detection_in_Surveillance_Videos/Anomaly-Videos-Part-1/Abuse/Abuse001_x264.mp4"
target_size = (image_processor.size["shortest_edge"], image_processor.size["shortest_edge"])
num_frames_expected = model.config.num_frames

cap = cv2.VideoCapture(video_path)

if not cap.isOpened():
    print(f"Error: Could not open video {video_path}")
else:
    frames = []
    frame_count = 0
    while frame_count < num_frames_expected:
        ret, frame = cap.read()
        if not ret:
            break

        resized_frame = cv2.resize(frame, target_size)
        frames.append(resized_frame)
        frame_count += 1

    cap.release()

    if len(frames) < num_frames_expected:
        print(f"Warning: Video has only {len(frames)} frames, expected {num_frames_expected}. Padding with the last frame.")
        while len(frames) < num_frames_expected:
            frames.append(frames[-1])


    video_array = np.array(frames)

    print(f"Preprocessed video shape: {video_array.shape}")

Preprocessed video shape: (32, 224, 224, 3)


In [None]:
import torch

# The shape is currently (num_frames, height, width, num_channels)
# We need (batch_size, num_channels, num_frames, height, width)
video_tensor = torch.from_numpy(video_array).permute(3, 0, 1, 2)

# batch dimension
video_tensor = video_tensor.unsqueeze(0)

video_tensor = image_processor(list(torch.unbind(video_tensor.squeeze(0).permute(1, 2, 3, 0))), return_tensors="pt").pixel_values

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
video_tensor = video_tensor.to(device)

print(f"Video tensor shape after processing: {video_tensor.shape}")
print(f"Device: {device}")

Video tensor shape after processing: torch.Size([1, 32, 3, 224, 224])
Device: cpu


In [22]:
import torch

model.eval()

with torch.no_grad():
    outputs = model(video_tensor)

print("Model outputs obtained.")

Model outputs obtained.


In [24]:
import torch.nn.functional as F

probabilities = F.softmax(outputs.logits, dim=1)

predicted_class_index = torch.argmax(probabilities, dim=1).item()

predicted_class_label = model.config.id2label[predicted_class_index]

predicted_probability = probabilities[0][predicted_class_index].item()

print(f"Predicted class: {predicted_class_label}")
print(f"Predicted probability: {predicted_probability:.4f}")

Predicted class: Normal
Predicted probability: 0.4804


# Task
Split the preprocessed video data located at "/content/drive/MyDrive/Preprocessed_Surveillance_Videos" into training and testing sets, placing each class into its own folder within the respective training and testing directories.

In [None]:
import os
import re

preprocessed_output_dir = "/content/drive/MyDrive/Preprocessed_Surveillance_Videos"

preprocessed_video_dirs = [d for d in os.listdir(preprocessed_output_dir) if os.path.isdir(os.path.join(preprocessed_output_dir, d))]

video_class_mapping = {}
for video_dir in preprocessed_video_dirs:
    match = re.match(r"([a-zA-Z]+)", video_dir)
    if match:
        class_label = match.group(1)
        video_class_mapping[video_dir] = class_label
    else:
        video_class_mapping[video_dir] = "Unknown" # Handle cases that don't match the pattern

print("Examples of video to class mapping:")
for i, (video_dir, class_label) in enumerate(video_class_mapping.items()):
    if i >= 5:
        break
    print(f"- {video_dir}: {class_label}")

Examples of video to class mapping:
- Abuse005_x264: Abuse
- Abuse007_x264: Abuse
- Abuse002_x264: Abuse
- Abuse001_x264: Abuse
- Abuse003_x264: Abuse


## Define split ratios

In [1]:
train_ratio = 0.8
test_ratio = 0.2

## Create directory structure

In [28]:
import os

split_dataset_dir = "/content/drive/MyDrive/Split_Surveillance_Videos"
train_dir = os.path.join(split_dataset_dir, "train")
test_dir = os.path.join(split_dataset_dir, "test")

os.makedirs(train_dir, exist_ok=True)
os.makedirs(test_dir, exist_ok=True)

unique_classes = set(video_class_mapping.values())

for class_label in unique_classes:
    os.makedirs(os.path.join(train_dir, class_label), exist_ok=True)
    os.makedirs(os.path.join(test_dir, class_label), exist_ok=True)

print(f"Created directory structure in: {split_dataset_dir}")
print(f"Unique classes found: {list(unique_classes)}")

Created directory structure in: /content/drive/MyDrive/Split_Surveillance_Videos
Unique classes found: ['Assault', 'Shooting', 'Burglary', 'Arrest', 'Robbery', 'Vandalism', 'Shoplifting', 'Abuse', 'RoadAccidents', 'Explosion', 'Fighting', 'Stealing', 'Arson']


## Split and move data


In [None]:
import shutil
import random

# Group video by class
class_to_video_dirs = {}
for video_dir, class_label in video_class_mapping.items():
    if class_label not in class_to_video_dirs:
        class_to_video_dirs[class_label] = []
    class_to_video_dirs[class_label].append(video_dir)

# Split and move videos
for class_label, video_dirs in class_to_video_dirs.items():
    random.shuffle(video_dirs)

    num_train = int(len(video_dirs) * train_ratio)
    train_videos = video_dirs[:num_train]
    test_videos = video_dirs[num_train:]

    print(f"\nProcessing class: {class_label}")
    print(f"  Total videos: {len(video_dirs)}")
    print(f"  Training videos: {len(train_videos)}")
    print(f"  Testing videos: {len(test_videos)}")

    # training videos
    for video_dir in train_videos:
        src_path = os.path.join(preprocessed_output_dir, video_dir)
        dest_path = os.path.join(train_dir, class_label, video_dir)
        shutil.move(src_path, dest_path)

    # testing videos
    for video_dir in test_videos:
        src_path = os.path.join(preprocessed_output_dir, video_dir)
        dest_path = os.path.join(test_dir, class_label, video_dir)
        shutil.move(src_path, dest_path)

    print(f"  Moved {len(train_videos)} videos to {os.path.join(train_dir, class_label)}")
    print(f"  Moved {len(test_videos)} videos to {os.path.join(test_dir, class_label)}")



Processing class: Abuse
  Total videos: 50
  Training videos: 40
  Testing videos: 10
  Moved 40 videos to /content/drive/MyDrive/Split_Surveillance_Videos/train/Abuse
  Moved 10 videos to /content/drive/MyDrive/Split_Surveillance_Videos/test/Abuse

Processing class: Arrest
  Total videos: 50
  Training videos: 40
  Testing videos: 10
  Moved 40 videos to /content/drive/MyDrive/Split_Surveillance_Videos/train/Arrest
  Moved 10 videos to /content/drive/MyDrive/Split_Surveillance_Videos/test/Arrest

Processing class: Arson
  Total videos: 50
  Training videos: 40
  Testing videos: 10
  Moved 40 videos to /content/drive/MyDrive/Split_Surveillance_Videos/train/Arson
  Moved 10 videos to /content/drive/MyDrive/Split_Surveillance_Videos/test/Arson

Processing class: Assault
  Total videos: 50
  Training videos: 40
  Testing videos: 10
  Moved 40 videos to /content/drive/MyDrive/Split_Surveillance_Videos/train/Assault
  Moved 10 videos to /content/drive/MyDrive/Split_Surveillance_Videos/test

## Verify split


In [30]:
import os

split_dataset_dir = "/content/drive/MyDrive/Split_Surveillance_Videos"
train_dir = os.path.join(split_dataset_dir, "train")
test_dir = os.path.join(split_dataset_dir, "test")

print("\nTraining Set:")
for class_label in os.listdir(train_dir):
    class_train_dir = os.path.join(train_dir, class_label)
    if os.path.isdir(class_train_dir):
        video_dirs = [d for d in os.listdir(class_train_dir) if os.path.isdir(os.path.join(class_train_dir, d))]
        print(f"  Class '{class_label}': {len(video_dirs)} videos")

        # Optional: Verify frames in a few videos
        if video_dirs:
            sample_videos = random.sample(video_dirs, min(3, len(video_dirs)))
            for video_dir in sample_videos:
                video_path = os.path.join(class_train_dir, video_dir)
                frames = [f for f in os.listdir(video_path) if f.endswith(".jpg")]
                print(f"    - {video_dir}: {len(frames)} frames")


print("\nTesting Set:")
for class_label in os.listdir(test_dir):
    class_test_dir = os.path.join(test_dir, class_label)
    if os.path.isdir(class_test_dir):
        video_dirs = [d for d in os.listdir(class_test_dir) if os.path.isdir(os.path.join(class_test_dir, d))]
        print(f"  Class '{class_label}': {len(video_dirs)} videos")

        if video_dirs:
            sample_videos = random.sample(video_dirs, min(3, len(video_dirs)))
            for video_dir in sample_videos:
                video_path = os.path.join(class_test_dir, video_dir)
                frames = [f for f in os.listdir(video_path) if f.endswith(".jpg")]
                print(f"    - {video_dir}: {len(frames)} frames")

Verifying data split and movement:

Training Set:
  Class 'Assault': 40 videos
    - Assault033_x264: 256 frames
    - Assault035_x264: 180 frames
    - Assault036_x264: 183 frames
  Class 'Shooting': 40 videos
    - Shooting037_x264: 61 frames
    - Shooting054_x264: 147 frames
    - Shooting014_x264: 1282 frames
  Class 'Burglary': 80 videos
    - Burglary085_x264: 576 frames
    - Burglary070_x264: 160 frames
    - Burglary037_x264: 384 frames
  Class 'Arrest': 40 videos
    - Arrest043_x264: 2160 frames
    - Arrest016_x264: 2118 frames
    - Arrest031_x264: 370 frames
  Class 'Robbery': 120 videos
    - Robbery054_x264: 396 frames
    - Robbery058_x264: 352 frames
    - Robbery027_x264: 1080 frames
  Class 'Vandalism': 40 videos
    - Vandalism048_x264: 1437 frames
    - Vandalism019_x264: 410 frames
    - Vandalism018_x264: 1112 frames
  Class 'Shoplifting': 40 videos
    - Shoplifting014_x264: 13340 frames
    - Shoplifting043_x264: 1904 frames
    - Shoplifting037_x264: 278 fra