# Crime Classification â€“ CS 9548 Project
**Goal:** Exploring Machine Learning Techniques for Image Classification

## Import Libraries

In [1]:
%pip install -r "requirements.txt"

Note: you may need to restart the kernel to use updated packages.


In [2]:
import kagglehub
import pathlib
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from PIL import Image
import zipfile
import os
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import classification_report, confusion_matrix


## Download and Extract Dataset

In [3]:
# Download latest version
import kagglehub
path = kagglehub.dataset_download("odins0n/ucf-crime-dataset")

print("Path to dataset files:", path)

Path to dataset files: C:\Users\admin\.cache\kagglehub\datasets\odins0n\ucf-crime-dataset\versions\1


## Create DataFrame of Images

In [4]:
import pathlib
import pandas as pd

# Get directories of train and test datasets
data_dir = pathlib.Path(path)
train_dir = data_dir / "Train"
test_dir = data_dir / "Test"

print("Data dir:", data_dir)
print(train_dir)
print(test_dir)

# Function to build dataframe
def build_image_df(root_dir):
    root_dir = pathlib.Path(root_dir)
    image_paths = list(root_dir.glob("*/*.png"))

    rows = []

    for p in image_paths:
        label = p.parent.name 
        rows.append({"image": str(p), "label": label})

    return pd.DataFrame(rows)


# Build train and test dataframes
train = build_image_df(train_dir)
test = build_image_df(test_dir)

print(train['label'].value_counts())
print(test['label'].value_counts())

Data dir: C:\Users\admin\.cache\kagglehub\datasets\odins0n\ucf-crime-dataset\versions\1
C:\Users\admin\.cache\kagglehub\datasets\odins0n\ucf-crime-dataset\versions\1\Train
C:\Users\admin\.cache\kagglehub\datasets\odins0n\ucf-crime-dataset\versions\1\Test
label
NormalVideos     947768
Stealing          44802
Robbery           41493
Burglary          39504
Arrest            26397
Shoplifting       24835
Fighting          24684
Arson             24421
RoadAccidents     23486
Abuse             19076
Explosion         18753
Vandalism         13626
Assault           10360
Shooting           7140
Name: count, dtype: int64
label
NormalVideos     64952
Burglary          7657
Shooting          7630
Shoplifting       7623
Explosion         6510
Arrest            3365
Arson             2793
RoadAccidents     2663
Assault           2657
Stealing          1984
Fighting          1231
Vandalism         1111
Robbery            835
Abuse              297
Name: count, dtype: int64


## Encode Label as Integer

In [5]:
from sklearn.preprocessing import LabelEncoder

# Create and fit label encoder on training labels
le = LabelEncoder()
train['label_idx'] = le.fit_transform(train['label'])

# Apply the same encoding to test labels
test['label_idx'] = le.transform(test['label'])

# Number of classes and mapping
num_classes = len(le.classes_)
print("Number of classes:", num_classes)
print("Class name vs label_idx:")
for index, cls in enumerate(le.classes_):
    print(index, cls)

Number of classes: 14
Class name vs label_idx:
0 Abuse
1 Arrest
2 Arson
3 Assault
4 Burglary
5 Explosion
6 Fighting
7 NormalVideos
8 RoadAccidents
9 Robbery
10 Shooting
11 Shoplifting
12 Stealing
13 Vandalism


We can see that the dataset has a large class imbalance. To mitigate this, we can do some under/oversampling.

## Under/Oversampling

In [6]:
import pandas as pd

TARGET = 50000
EXEMPT_CLASS = "Shooting"

def balance_dataset(df):
    balanced_parts = []

    for cls, group in df.groupby("label"):
        if cls == EXEMPT_CLASS:
            # Oversample shooting to 10k
            if len(group) < TARGET:
                group = group.sample(
                    n=TARGET,
                    replace=True,
                    random_state=42
                )
            balanced_parts.append(group)
        else:
            # Undersample everything else to 10k
            balanced_parts.append(group.sample(
                n=min(TARGET, len(group)),
                replace=False,
                random_state=42
            ))

    return pd.concat(balanced_parts).sample(frac=1, random_state=42).reset_index(drop=True)

train_balanced = balance_dataset(train)

In [7]:
print(train_balanced['label'].value_counts())

label
NormalVideos     50000
Shooting         50000
Stealing         44802
Robbery          41493
Burglary         39504
Arrest           26397
Shoplifting      24835
Fighting         24684
Arson            24421
RoadAccidents    23486
Abuse            19076
Explosion        18753
Vandalism        13626
Assault          10360
Name: count, dtype: int64


## Split Train Into Train and Val

In [8]:
from sklearn.model_selection import train_test_split

# 80% train 20% val
train, val = train_test_split(
    train_balanced, test_size=0.20, random_state=42, stratify=train_balanced['label_idx']
)

## Set up PyTorch

In [9]:
import torch
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
from PIL import Image

# Set constants for PyTorch
IMG_SIZE = (48, 48)
BATCH_SIZE = 32

## Data Augmentation

In [10]:
# Data augmentation to training dataset
train_transform = transforms.Compose([
    transforms.Resize(IMG_SIZE),
    transforms.RandomHorizontalFlip(),
    transforms.RandomRotation(10),
    transforms.RandomResizedCrop(IMG_SIZE, scale=(0.9, 1.1)),
    transforms.ColorJitter(contrast=0.1),
    transforms.ToTensor()
])

# Simple transforms to val/test dataset
eval_transform = transforms.Compose([
    transforms.Resize(IMG_SIZE),
    transforms.ToTensor(),
])

## Create Pipeline For PyTorch

In [11]:
class CrimeDataset(Dataset):
    def __init__(self, df, transform=None):
        self.df = df.reset_index(drop=True)
        self.transform = transform

    def __len__(self):
        return len(self.df)

    def __getitem__(self, idx):
        row = self.df.iloc[idx]
        img_path = row['image']
        label = int(row['label_idx'])

        # Load the image
        image = Image.open(img_path).convert("L")

        # Transform if needed (might not need for val/test)
        if self.transform:
            image = self.transform(image)

        return image, label

## Create DataLoaders

In [12]:
# First convert to CrimeDataset classes
train_dataset = CrimeDataset(train, train_transform)
val_dataset = CrimeDataset(val, eval_transform)
test_dataset = CrimeDataset(test, eval_transform)

# Transform to DataLoaders
train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False)

## Create CNN Model

In [13]:
import torch.nn as nn
import torch.nn.functional as F

class Net(nn.Module):
    def __init__(self, num_classes):
        super().__init__()
        # --- Block 1 ---
        self.conv1 = nn.Conv2d(1, 128, kernel_size=3, padding=1)
        self.pool1 = nn.MaxPool2d(2, 2)
        self.drop1 = nn.Dropout(0.4)    

        # --- Block 2 ---
        self.conv2 = nn.Conv2d(128, 256, kernel_size=3, padding=1)
        self.pool2 = nn.MaxPool2d(2, 2)
        self.drop2 = nn.Dropout(0.4)    

        # --- Block 3 ---
        self.conv3 = nn.Conv2d(256, 512, kernel_size=3, padding=1)
        self.pool3 = nn.MaxPool2d(2, 2)
        self.drop3 = nn.Dropout(0.4)   

        # --- Block 4 ---
        self.conv4 = nn.Conv2d(512, 512, kernel_size=3, padding=1)
        self.pool4 = nn.MaxPool2d(2, 2)
        self.drop4 = nn.Dropout(0.4)  

        self.flatten_dim = 512 * 3 * 3

        # FC layers
        self.fc1 = nn.Linear(self.flatten_dim, 512)
        self.drop_fc1 = nn.Dropout(0.4)
        self.fc2 = nn.Linear(512, 256)
        self.drop_fc2 = nn.Dropout(0.3)
        self.fc3 = nn.Linear(256, num_classes)

    def forward(self, x):
        # --- Block 1 ---
        x = F.relu(self.conv1(x))
        x = self.pool1(x)
        x = self.drop1(x)

        # --- Block 2 ---
        x = F.relu(self.conv2(x))
        x = self.pool2(x)
        x = self.drop2(x)

        # Block 3
        x = F.relu(self.conv3(x))
        x = self.pool3(x)
        x = self.drop3(x)

        # Block 4
        x = F.relu(self.conv4(x))
        x = self.pool4(x)
        x = self.drop4(x)

        # Flatten
        x = x.view(x.size(0), -1) 
        
        # FC layers
        x = F.relu(self.fc1(x))
        x = self.drop_fc1(x)
        x = F.relu(self.fc2(x))
        x = self.drop_fc2(x)
        x = self.fc3(x)          
        return x

device = torch.device("cpu")
print("Using device:", device)

model = Net(num_classes=num_classes).to(device)


Using device: cpu


## Training and Testing

In [14]:
import torch.optim as optim

def train(model, loader, optimizer, criterion, device):
    model.train()
    running_loss = 0.0
    correct = 0
    total = 0

    for images, labels in loader:
        images = images.to(device, non_blocking=True)
        labels = labels.to(device, non_blocking=True)

        optimizer.zero_grad()
        outputs = model(images)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        running_loss += loss.item() * images.size(0)
        _, preds = torch.max(outputs, 1)
        correct += (preds == labels).sum().item()
        total += labels.size(0)

    epoch_loss = running_loss / total
    epoch_acc = correct / total
    return epoch_loss, epoch_acc

In [15]:
def test(model, loader, criterion, device):
    model.eval()
    running_loss = 0.0
    correct = 0
    total = 0

    with torch.inference_mode():
        for images, labels in loader:
            images = images.to(device, non_blocking=True)
            labels = labels.to(device, non_blocking=True)

            outputs = model(images)
            loss = criterion(outputs, labels)

            running_loss += loss.item() * images.size(0)
            _, preds = torch.max(outputs, 1)
            correct += (preds == labels).sum().item()
            total += labels.size(0)

    epoch_loss = running_loss / total
    epoch_acc = correct / total
    return epoch_loss, epoch_acc

In [16]:
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=1e-3)
EPOCHS = 10

for epoch in range(1, EPOCHS + 1):
    train_loss, train_acc = train(model, train_loader, optimizer, criterion, device)
    val_loss, val_acc = test(model, val_loader, criterion, device)

    print(
        f"Epoch {epoch}/{EPOCHS} | "
        f"Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.4f} | "
        f"Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.4f}"
    )

Epoch 1/10 | Train Loss: 1.4724, Train Acc: 0.5359 | Val Loss: 0.9532, Val Acc: 0.7578
Epoch 2/10 | Train Loss: 0.9833, Train Acc: 0.6989 | Val Loss: 0.8549, Val Acc: 0.8033
Epoch 3/10 | Train Loss: 0.8225, Train Acc: 0.7506 | Val Loss: 0.6152, Val Acc: 0.8539
Epoch 4/10 | Train Loss: 0.7251, Train Acc: 0.7824 | Val Loss: 0.4581, Val Acc: 0.8957
Epoch 5/10 | Train Loss: 0.6630, Train Acc: 0.8020 | Val Loss: 0.3705, Val Acc: 0.9141
Epoch 6/10 | Train Loss: 0.6163, Train Acc: 0.8168 | Val Loss: 0.4444, Val Acc: 0.9023
Epoch 7/10 | Train Loss: 0.5782, Train Acc: 0.8294 | Val Loss: 0.3606, Val Acc: 0.9223
Epoch 8/10 | Train Loss: 0.5592, Train Acc: 0.8357 | Val Loss: 0.4629, Val Acc: 0.8975
Epoch 9/10 | Train Loss: 0.5319, Train Acc: 0.8438 | Val Loss: 0.3043, Val Acc: 0.9265
Epoch 10/10 | Train Loss: 0.5139, Train Acc: 0.8502 | Val Loss: 0.5879, Val Acc: 0.8486


In [17]:
test_loss, test_acc = test(model, test_loader, criterion, device)
print(f"Test Loss: {test_loss:.4f}, Test Acc: {test_acc:.4f}")

Test Loss: 2.2808, Test Acc: 0.4881
