In [14]:
# Imports
import os
import zipfile
import shutil
from sklearn.model_selection import train_test_split
import torch
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
import cv2
import numpy as np

In [None]:
# Extract all zip files in a directory

dataset_addr = "E:\\Datasets\\UCF_crime_from_dropbox"

for file in os.listdir(dataset_addr):
    if file.endswith(".zip"):
        with zipfile.ZipFile(os.path.join(dataset_addr, file), 'r') as zip_ref:
            zip_ref.extractall(dataset_addr)
            print(f"Extracted {file}")


Extracted Anomaly-Videos-Part-1.zip
Extracted Anomaly-Videos-Part-2.zip
Extracted Anomaly-Videos-Part-3.zip
Extracted Anomaly-Videos-Part-4.zip
Extracted Testing_Normal_Videos.zip
Extracted UCF_Crimes-Train-Test-Split.zip
Extracted Normal_Videos_for_Event_Recognition.zip
Extracted Training-Normal-Videos-Part-1.zip


In [None]:
# Data Split

# Define paths
dataset_path = dataset_addr
output_path = 'E:\\Datasets\\train_test_ucf_crime'
os.makedirs(output_path, exist_ok=True)

# Create directories for train, val, and test splits
splits = ['train', 'val', 'test']
for split in splits:
    split_path = os.path.join(output_path, split)
    os.makedirs(split_path, exist_ok=True)

# List all videos and their class labels
videos = []
labels = []
for part in ['Anomaly-Videos-Part-1', 'Anomaly-Videos-Part-2', 'Anomaly-Videos-Part-3', 'Anomaly-Videos-Part-4', 'Training-Normal-Videos-Part-1_']:
    part_path = os.path.join(dataset_path, part)
    for class_name in os.listdir(part_path):
        class_path = os.path.join(part_path, class_name)
        if os.path.isdir(class_path):
            for video in os.listdir(class_path):
                video_path = os.path.join(class_path, video)
                if os.path.isfile(video_path):
                    videos.append(video_path)
                    labels.append(class_name)

# Split the dataset
train_videos, test_videos, train_labels, test_labels = train_test_split(videos, labels, test_size=0.2, stratify=labels, random_state=42)
train_videos, val_videos, train_labels, val_labels = train_test_split(train_videos, train_labels, test_size=0.2, stratify=train_labels, random_state=42)

# Function to move files to the respective directories
def move_files(file_paths, labels, split):
    for file_path, label in zip(file_paths, labels):
        split_path = os.path.join(output_path, split, label)
        os.makedirs(split_path, exist_ok=True)
        shutil.copy(file_path, split_path)

# Move files to train, val, and test directories
move_files(train_videos, train_labels, 'train')
move_files(val_videos, val_labels, 'val')
move_files(test_videos, test_labels, 'test')

print("Dataset split completed.")

Dataset split completed.


In [None]:
# Generate annotation files

# Map each class folder to an integer label
CLASS_LABELS = {
    "Abuse": 0,
    "Arrest": 1,
    "Arson": 2,
    "Assault": 3,
    "Burglary": 4,
    "Explosion": 5,
    "Fighting": 6,
    "RoadAccidents": 7,
    "Robbery": 8,
    "Shooting": 9,
    "Shoplifting": 10,
    "Stealing": 11,
    "Vandalism": 12,
    "Normal": 13
}

def generate_annotations(data_dir, split_name):
    """
    Generates annotation files for a given data directory and split (train, val, test).

    Args:
        data_dir (str): Path to the base directory containing train, val, or test folders.
        split_name (str): The split to process (train, val, or test).
    """
    split_path = os.path.join(data_dir, split_name)
    output_file = f"{split_name}.txt"

    with open(output_file, "w") as f:
        for class_name, label in CLASS_LABELS.items():
            class_folder = os.path.join(split_path, class_name)
            if not os.path.exists(class_folder):
                print(f"Warning: {class_folder} does not exist. Skipping.")
                continue

            for video in os.listdir(class_folder):
                if video.endswith(".mp4"):
                    video_path = os.path.join(split_name, class_name, video)  # Relative path
                    f.write(f"{video_path} {label}\n")

    print(f"{split_name}.txt annotation file created successfully.")

# Paths
data_dir = "E:\\Datasets\\train_test_ucf_crime"  # path to the train_test_ucf_crime directory
output_dir = "E:\\Datasets\\train_test_ucf_crime"  # path to the train_test_ucf_crime directory

# Generate annotation files for train, val, and test splits
generate_annotations(data_dir, os.path.join(output_dir, "train"))
generate_annotations(data_dir, os.path.join(output_dir, "val"))
generate_annotations(data_dir, os.path.join(output_dir, "test"))


E:\Datasets\train_test_ucf_crime\train.txt annotation file created successfully.
E:\Datasets\train_test_ucf_crime\val.txt annotation file created successfully.
E:\Datasets\train_test_ucf_crime\test.txt annotation file created successfully.


Step 1: Prepare the dataset
First, create a custom dataset class to load and preprocess the UCF Crime dataset.

In [None]:
# Dataset class for loading UCF Crime dataset

class UCFCrimeDataset(Dataset):
    def __init__(self, annotations_file, video_dir, transform=None, num_frames=32, frame_size=(256, 256)):
        self.annotations = self.load_annotations(annotations_file)
        self.video_dir = video_dir
        self.transform = transform
        self.num_frames = num_frames
        self.frame_size = frame_size

    def load_annotations(self, annotations_file):
        with open(annotations_file, 'r') as f:
            lines = f.readlines()
        annotations = [line.strip().split() for line in lines]
        return annotations

    def preprocess_video(self, video_path):
        cap = cv2.VideoCapture(video_path)
        frames = []
        while len(frames) < self.num_frames:
            ret, frame = cap.read()
            if not ret:
                break
            frame = cv2.resize(frame, self.frame_size)
            frame = frame / 255.0  # Normalize to [0, 1]
            frames.append(frame)
        cap.release()
        
        # If the video has fewer frames than num_frames, pad with zeros
        while len(frames) < self.num_frames:
            frames.append(np.zeros((self.frame_size[0], self.frame_size[1], 3)))
        
        frames = np.array(frames)
        if self.transform:
            frames = self.transform(frames)
        return frames

    def __len__(self):
        return len(self.annotations)

    def __getitem__(self, idx):
        video_name, label = self.annotations[idx]
        video_path = os.path.join(self.video_dir, video_name)
        video_frames = self.preprocess_video(video_path)
        label = int(label)
        return video_frames, label


In [16]:
# Paths 

path_to_train_videos = "E:\\Datasets\\train_test_ucf_crime\\train"
path_to_val_videos = "E:\\Datasets\\train_test_ucf_crime\\val"
path_to_train_videos = "E:\\Datasets\\train_test_ucf_crime\\test"
path_to_train_ann = "E:\\Datasets\\train_test_ucf_crime\\train.txt"
path_to_val_ann = "E:\\Datasets\\train_test_ucf_crime\\val.txt"
path_to_test_ann = "E:\\Datasets\\train_test_ucf_crime\\test.txt"

In [22]:
# Create datasets and loaders
from torchvision.transforms import Compose, Lambda

transform = Compose([
    UniformTemporalSubsample(32),
    Lambda(lambda x: x / 255.0),
    NormalizeVideo(mean=[0.45, 0.45, 0.45], std=[0.225, 0.225, 0.225]),
    ShortSideScale(size=256),
    CenterCropVideo(256),
    PackPathway()
])

train_dataset = UCFCrimeDataset(path_to_train_ann, path_to_train_videos, transform=transform)
val_dataset = UCFCrimeDataset(path_to_val_ann, path_to_val_videos, transform=transform)
train_loader = DataLoader(train_dataset, batch_size=8, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=8, shuffle=False)

NameError: name 'UniformTemporalSubsample' is not defined

Step 2: Modify the model
Load the pretrained SlowFast model and modify the final layers to match the number of classes in your dataset.

In [18]:
# Choose the `slowfast_r50` model 
model = torch.hub.load('facebookresearch/pytorchvideo', 'slowfast_r50', pretrained=True)

Using cache found in C:\Users\kruth/.cache\torch\hub\facebookresearch_pytorchvideo_main


In [6]:
print(model)

Net(
  (blocks): ModuleList(
    (0): MultiPathWayWithFuse(
      (multipathway_blocks): ModuleList(
        (0): ResNetBasicStem(
          (conv): Conv3d(3, 64, kernel_size=(1, 7, 7), stride=(1, 2, 2), padding=(0, 3, 3), bias=False)
          (norm): BatchNorm3d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
          (activation): ReLU()
          (pool): MaxPool3d(kernel_size=(1, 3, 3), stride=(1, 2, 2), padding=[0, 1, 1], dilation=1, ceil_mode=False)
        )
        (1): ResNetBasicStem(
          (conv): Conv3d(3, 8, kernel_size=(5, 7, 7), stride=(1, 2, 2), padding=(2, 3, 3), bias=False)
          (norm): BatchNorm3d(8, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
          (activation): ReLU()
          (pool): MaxPool3d(kernel_size=(1, 3, 3), stride=(1, 2, 2), padding=[0, 1, 1], dilation=1, ceil_mode=False)
        )
      )
      (multipathway_fusion): FuseFastToSlow(
        (conv_fast_to_slow): Conv3d(8, 16, kernel_size=(7, 1, 1), st

In [19]:
import torch.nn as nn
# Modify the final layer to match the number of classes in UCF Crime
num_classes = 14  # Number of classes in UCF Crime dataset
model.blocks[6].proj = nn.Linear(model.blocks[6].proj.in_features, num_classes)

In [20]:
torch.cuda.is_available()

False

In [21]:
import torch.optim as optim

# Define loss function and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=1e-4)

# Training loop
num_epochs = 10
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model.to(device)

for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0
    for inputs, labels in train_loader:
        inputs = [i.to(device) for i in inputs]
        labels = labels.to(device)
        
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        
        running_loss += loss.item()
    
    print(f'Epoch {epoch+1}/{num_epochs}, Loss: {running_loss/len(train_loader)}')

    # Validation
    model.eval()
    val_loss = 0.0
    correct = 0
    total = 0
    with torch.no_grad():
        for inputs, labels in val_loader:
            inputs = [i.to(device) for i in inputs]
            labels = labels.to(device)
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            val_loss += loss.item()
            _, predicted = torch.max(outputs, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
    
    print(f'Validation Loss: {val_loss/len(val_loader)}, Accuracy: {100 * correct / total}%')

ValueError: pic should be 2/3 dimensional. Got 4 dimensions.

From Here: trying out the example usage of slowfast usign torch

In [29]:
from typing import Dict
import json
import urllib
from torchvision.transforms import Compose, Lambda
from torchvision.transforms._transforms_video import (
    CenterCropVideo,
    NormalizeVideo,
)
from pytorchvideo.data.encoded_video import EncodedVideo
from pytorchvideo.transforms import (
    ApplyTransformToKey,
    ShortSideScale,
    UniformTemporalSubsample,
    UniformCropVideo
) 

In [16]:
# Set to GPU or CPU
device = "cpu"
model = model.eval()
model = model.to(device)

In [17]:
json_url = "https://dl.fbaipublicfiles.com/pyslowfast/dataset/class_names/kinetics_classnames.json"
json_filename = "kinetics_classnames.json"
try: urllib.URLopener().retrieve(json_url, json_filename)
except: urllib.request.urlretrieve(json_url, json_filename)

In [18]:
with open(json_filename, "r") as f:
    kinetics_classnames = json.load(f)

# Create an id to label name mapping
kinetics_id_to_classname = {}
for k, v in kinetics_classnames.items():
    kinetics_id_to_classname[v] = str(k).replace('"', "")

In [19]:
side_size = 256
mean = [0.45, 0.45, 0.45]
std = [0.225, 0.225, 0.225]
crop_size = 256
num_frames = 32
sampling_rate = 2
frames_per_second = 30
slowfast_alpha = 4
num_clips = 10
num_crops = 3

class PackPathway(torch.nn.Module):
    """
    Transform for converting video frames as a list of tensors. 
    """
    def __init__(self):
        super().__init__()
        
    def forward(self, frames: torch.Tensor):
        fast_pathway = frames
        # Perform temporal sampling from the fast pathway.
        slow_pathway = torch.index_select(
            frames,
            1,
            torch.linspace(
                0, frames.shape[1] - 1, frames.shape[1] // slowfast_alpha
            ).long(),
        )
        frame_list = [slow_pathway, fast_pathway]
        return frame_list

transform =  ApplyTransformToKey(
    key="video",
    transform=Compose(
        [
            UniformTemporalSubsample(num_frames),
            Lambda(lambda x: x/255.0),
            NormalizeVideo(mean, std),
            ShortSideScale(
                size=side_size
            ),
            CenterCropVideo(crop_size),
            PackPathway()
        ]
    ),
)

# The duration of the input clip is also specific to the model.
clip_duration = (num_frames * sampling_rate)/frames_per_second

In [20]:
url_link = "https://dl.fbaipublicfiles.com/pytorchvideo/projects/archery.mp4"
video_path = 'archery.mp4'
try: urllib.URLopener().retrieve(url_link, video_path)
except: urllib.request.urlretrieve(url_link, video_path)

In [21]:
# Select the duration of the clip to load by specifying the start and end duration
# The start_sec should correspond to where the action occurs in the video
start_sec = 0
end_sec = start_sec + clip_duration

# Initialize an EncodedVideo helper class and load the video
video = EncodedVideo.from_path(video_path)

# Load the desired clip
video_data = video.get_clip(start_sec=start_sec, end_sec=end_sec)

# Apply a transform to normalize the video input
video_data = transform(video_data)

# Move the inputs to the desired device
inputs = video_data["video"]
inputs = [i.to(device)[None, ...] for i in inputs]

In [22]:
# Pass the input clip through the model
preds = model(inputs)

# Get the predicted classes
post_act = torch.nn.Softmax(dim=1)
preds = post_act(preds)
pred_classes = preds.topk(k=5).indices[0]

# Map the predicted classes to the label names
pred_class_names = [kinetics_id_to_classname[int(i)] for i in pred_classes]
print("Top 5 predicted labels: %s" % ", ".join(pred_class_names))

Top 5 predicted labels: archery, throwing axe, playing paintball, disc golfing, riding or walking with horse
