In [None]:
!pip install torchsummary

In [1]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import DataLoader
from torch.utils.data import Dataset
import torchvision.datasets as datasets
import torchvision.transforms as transforms
from torchsummary import summary
import warnings
import os
import sys
from tqdm import tqdm
import cv2

In [2]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
device

device(type='cuda')

In [3]:
print( torch.cuda.is_available() )

True


In [4]:
RESOLUTION = 224

In [5]:
warnings.filterwarnings( "ignore", module = "matplotlib\..*" )



**DATA AUGMENTATION AND VISUALIZING FUNCTIONS**

In [6]:
import os
import cv2
from PIL import Image
import torch
from torch.utils.data import Dataset
from torchvision import transforms
from torchvision.transforms import functional as F
from tqdm import tqdm
from typing import Callable, Any

# Define the resolution
RESOLUTION = 224

# Custom data augmentation functions
def random_horizontal_flip(image: Image.Image) -> Image.Image:
    if torch.rand(1) < 0.5:
        image = F.hflip(image)
    return image

def random_rotation(image: Image.Image) -> Image.Image:
    angle = torch.randint(-10, 10, (1,))
    image = F.rotate(image, angle.item())
    return image

# Define the transformation pipeline including multiple data augmentations
transformer = transforms.Compose([
    random_horizontal_flip,
    random_rotation,
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.5], std=[0.5]),
    transforms.Resize((RESOLUTION, RESOLUTION))
])

In [7]:
import torch
import matplotlib.pyplot as plt
from torchvision import transforms

# Define a transform to resize images to 224x224
 

def plot_image_from_list(images, labels, count):
    plt.figure(figsize=(20, 20))  # Increase figure size for larger images
    for i in range(count[0] * count[1]):
        plt.subplot(count[0], count[1], i + 1)
        plt.xticks([])
        plt.yticks([])
        plt.grid(False)
        img = images[i].to("cpu").numpy()
        if img.shape[0] == 1:  # If the image is single-channel, assume it's grayscale
            img = img.squeeze()
            plt.imshow(img, cmap="gray")
        else:
            plt.imshow(img.transpose((1, 2, 0)))  # Convert from CHW to HWC format
        plt.xlabel(labels[i])
    plt.show()

def plot_randomly_from_dataset(dataset):
    images = []  # List to store selected images
    labels = []  # List to store corresponding labels

    for _ in range(25):
        index = torch.randint(0, len(dataset), (1,)).item()  # Randomly select an index
        image, label = dataset[index]  # Retrieve image and label by index
        image = resize_transform(image)  # Resize image to 224x224
        images.append(image)
        labels.append(dataset.label_id2str(label.item()))

    plot_image_from_list(images, labels, (5, 5))  # Plot the images in a 5x5 grid


**DATA PATHS**

In [9]:
DATA_SOURCE = {
    "Abuse": "/kaggle/input/crimeucfdataset/Anomaly_Dataset/Anomaly_Videos/Anomaly-Videos-Part-1/Abuse",
    "Arrest": "/kaggle/input/crimeucfdataset/Anomaly_Dataset/Anomaly_Videos/Anomaly-Videos-Part-1/Arrest",
    "Arson": "/kaggle/input/crimeucfdataset/Anomaly_Dataset/Anomaly_Videos/Anomaly-Videos-Part-1/Arson",
    "Assault": "/kaggle/input/crimeucfdataset/Anomaly_Dataset/Anomaly_Videos/Anomaly-Videos-Part-1/Assault",
    "Burglary": "/kaggle/input/crimeucfdataset/Anomaly_Dataset/Anomaly_Videos/Anomaly-Videos-Part-2/Burglary",
    "Explosion": "/kaggle/input/crimeucfdataset/Anomaly_Dataset/Anomaly_Videos/Anomaly-Videos-Part-2/Explosion",
    "Fighting": "/kaggle/input/crimeucfdataset/Anomaly_Dataset/Anomaly_Videos/Anomaly-Videos-Part-2/Fighting",
    "Normal": "/kaggle/input/crimeucfdataset/Anomaly_Dataset/Anomaly_Videos/Normal-Videos-Part-1"
}

**TRAIN-TEST SPLIT AND VIDEO FRAMES GENERATION WITH DATA AUGMENTATION**

In [9]:
import torch
from torch.utils.data import Dataset
import os
import cv2
from tqdm import tqdm
from PIL import Image

# Assume transformer and DATA_SOURCE are defined somewhere else in the code

class CrimeDataset(Dataset):
    def __init__(self, train=True, train_test_split=0.8, random_state=42):
        torch.manual_seed(random_state)  # Set the random seed for reproducibility
        
        self._data = []  # List to store the image data
        self._labels = []  # List to store the corresponding labels
        self._inclusion_probability = train_test_split if train else 1.0 - train_test_split  # Probability of including a file in the dataset
        self._frame_interval = 30  # Interval to skip frames when reading video
        
        print(f"Loading {'train' if train else 'test' } dataset...")
        for label, data_path in DATA_SOURCE.items():  # Iterate over the data sources
            print(f"Loading Label {label}...")
            for file in tqdm(os.listdir(data_path)):  # Iterate over the files in the data source directory
                if file.endswith(".mp4") and torch.rand(1).item() <= self._inclusion_probability:
                    path = os.path.join(data_path, file)
                    data, labels = self._parse_file(path, label)  # Parse the video file
                    self._data.extend(data)
                    self._labels.extend(labels)
        print(f"Finished loading {'train' if train else 'test' } dataset... Loaded {len(self._data)} images.")
    
    def _parse_file(self, path, label):
        if not os.path.exists(path):
            return [], []
        
        data = []  # List to store frames from the video
        labels = []  # List to store labels corresponding to the frames
        
        cap = cv2.VideoCapture(path)  # Open the video file
        
        success, image = cap.read()
        while success:
            try:
                if True:  # This block processes each frame
                    image = cv2.cvtColor(image, cv2.COLOR_BGR2YUV)  # Convert frame to YUV color space
                    Y, U, V = cv2.split(image)  # Split the frame into Y, U, and V components
                    image = transformer(Image.fromarray(Y))  # Apply the transformation to the Y component
                    data.append(image)  # Add the transformed frame to the data list
                    labels.append(label)  # Add the label to the labels list
            except Exception as e:
                print(f"Error loading file {path}: {e}")
            
            count = 0
            while success and count < self._frame_interval:
                success, image = cap.read()  # Read the next frame
                count += 1
        return data, labels
        
    def label_str2id(self, label):
        labels = DATA_SOURCE.keys()
        return list(labels).index(label)  # Convert string label to an ID

    def label_id2str(self, label):
        labels = DATA_SOURCE.keys()
        return list(labels)[label]  # Convert label ID back to a string
        
    def __len__(self):
        return len(self._labels)  # Return the total number of samples in the dataset

    def __getitem__(self, idx):
        data = self._data[idx]  # Get the data (image) at the specified index
        label = self._labels[idx]  # Get the label at the specified index
        return data, torch.tensor([self.label_str2id(label)])  # Return the data and label as a tensor


In [None]:
# Create the training dataset
train_dataset = CrimeDataset(True)  # Initialize with training data

# Create the testing dataset
test_dataset = CrimeDataset(False)  # Initialize with testing data


In [None]:
# Load the test dataset randomly
plot_randomly_from_dataset(test_dataset)

In [None]:
# Create a DataLoader for the training dataset
train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)

# Create a DataLoader for the testing dataset
test_loader = DataLoader(test_dataset, batch_size=64, shuffle=True)


In [8]:
# Define number of classes
num_classes = 8

**Convulation Network (Feature Extraction)**

In [9]:
import torch.nn as nn

class CrimeModelCNN(nn.Module):
    def __init__(self):
        super(CrimeModelCNN, self).__init__()
        # First convolutional layer: input channels=1, output channels=64, kernel size=3x3, padding=same
        self.conv1 = nn.Conv2d(1, 64, kernel_size=3, padding="same")
        # Leaky ReLU activation function with a negative slope of 0.1
        self.leaky_relu = nn.LeakyReLU(0.1)
        # Max pooling layer with kernel size=2x2 and stride=2
        self.max_pool1 = nn.MaxPool2d(2, 2)
        # Dropout layer with a probability of 0.25
        self.dropout1 = nn.Dropout(0.25)
        # Second convolutional layer: input channels=64, output channels=128, kernel size=3x3, padding=same
        self.conv2 = nn.Conv2d(64, 128, kernel_size=3, padding="same")
        # Max pooling layer with kernel size=2x2 and stride=2
        self.max_pool2 = nn.MaxPool2d(2, 2)
        # Dropout layer with a probability of 0.25
        self.dropout2 = nn.Dropout(0.25)
        # Third convolutional layer: input channels=128, output channels=256, kernel size=3x3, padding=same
        self.conv3 = nn.Conv2d(128, 256, kernel_size=3, padding="same")
        # Max pooling layer with kernel size=2x2 and stride=2
        self.max_pool3 = nn.MaxPool2d(2, 2)
        # Dropout layer with a probability of 0.4
        self.dropout3 = nn.Dropout(0.4)
        # Flatten layer to convert the 3D tensor to a 1D tensor
        self.flatten = nn.Flatten()
        # Fully connected layer: input size=28*28*256, output size=256
        self.fc1 = nn.Linear(28 * 28 * 256, 256)
        # Dropout layer with a probability of 0.5
        self.dropout4 = nn.Dropout(0.5)

    def forward(self, x):
        # Forward pass through the network
        x = self.conv1(x)
        x = self.leaky_relu(x)
        x = self.max_pool1(x)
        x = self.dropout1(x)
        x = self.conv2(x)
        x = self.leaky_relu(x)
        x = self.max_pool2(x)
        x = self.dropout2(x)
        x = self.conv3(x)
        x = self.leaky_relu(x)
        x = self.max_pool3(x)
        x = self.dropout3(x)
        x = self.flatten(x)
        x = self.fc1(x)
        x = self.leaky_relu(x)
        x = self.dropout4(x)
        return x


**LSTM Network**

In [10]:

class CrimeModelLSTM(nn.Module):
    def __init__(self):
        super(CrimeModelLSTM, self).__init__()
        # Define the first LSTM layer: input_size=1, hidden_size=8, batch_first=True, bidirectional=False
        self.lstm1 = nn.LSTM(1, 8, batch_first=True, bidirectional=False)
        # Define the second LSTM layer: input_size=8, hidden_size=8, batch_first=True, bidirectional=False
        self.lstm2 = nn.LSTM(8, 8, batch_first=True, bidirectional=False)
        # Define the fully connected layer: input_size=8, output_size=4
        self.fc = nn.Linear(8, 4)
        # Define the drpout layer with a probability of 0.2
        self.dropout = nn.Dropout(0.2)

    def forward(self, x):
        # Forward pass through the network
        x, _ = self.lstm1(x)
        x, _ = self.lstm2(x)
        x = x[:, -1, :]  # Select the last output of the LSTM sequence
        x = self.fc(x)
        x = self.dropout(x)
        return x


**Conv + LSTM Network**

In [11]:
class CrimeModel(nn.Module):
    def __init__(self):
        super(CrimeModel, self).__init__()
        self.cnn = CrimeModelCNN()
        self.lstm = CrimeModelLSTM()
        self.fc = nn.Linear(260, 8)  # Adjust the input size according to your concatenation axis

    def forward(self, x):
        x_cnn = x
        x_lstm = torch.reshape(x, (x.shape[0], RESOLUTION * RESOLUTION, 1))
        x_cnn = self.cnn(x_cnn)
        x_lstm = self.lstm(x_lstm)
        x_combined = torch.cat((x_cnn, x_lstm), dim=1)
        x = self.fc(x_combined)
#         return F.softmax(x, dim=1)
        return x

In [12]:
# Initialize the model
model = CrimeModel()

In [16]:
optimizer = optim.Adam(model.parameters(), lr=0.01)
criterion = nn.CrossEntropyLoss()

**MODEL LAYERS**

In [17]:
model = nn.DataParallel(model)
model.to(device)

DataParallel(
  (module): CrimeModel(
    (cnn): CrimeModelCNN(
      (conv1): Conv2d(1, 64, kernel_size=(3, 3), stride=(1, 1), padding=same)
      (leaky_relu): LeakyReLU(negative_slope=0.1)
      (max_pool1): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
      (dropout1): Dropout(p=0.25, inplace=False)
      (conv2): Conv2d(64, 128, kernel_size=(3, 3), stride=(1, 1), padding=same)
      (max_pool2): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
      (dropout2): Dropout(p=0.25, inplace=False)
      (conv3): Conv2d(128, 256, kernel_size=(3, 3), stride=(1, 1), padding=same)
      (max_pool3): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
      (dropout3): Dropout(p=0.4, inplace=False)
      (flatten): Flatten(start_dim=1, end_dim=-1)
      (fc1): Linear(in_features=200704, out_features=256, bias=True)
      (dropout4): Dropout(p=0.5, inplace=False)
    )
    (lstm): CrimeModelLSTM(
      (lstm1): LST

In [18]:
NUM_EPOCHS = 5

In [None]:
for g in optimizer.param_groups:
    g['lr'] = 0.001

**MODEL TRAINING**

In [None]:
import torch
import torch.nn as nn
import matplotlib.pyplot as plt
from sklearn.metrics import roc_auc_score, roc_curve

train_loss_history = []
train_accuracy_history = []
test_loss_history = []
test_accuracy_history = []
test_auc_roc_history = []

for epoch in range(NUM_EPOCHS):
    model.train()
    total_correct_train = 0
    total_train_loss = 0

    print(f"Epoch : {epoch + 1}...")

    for batch_idx, (data, label) in enumerate(train_loader):
        data = data.to(device)
        label = label.to(device)

        # Ensure label is a 1D tensor of class indices
        label = label.squeeze()

        preds = model(data)
        loss = criterion(preds, label)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        total_train_loss += loss.item()
        total_correct_train += (preds.argmax(dim=1) == label).sum().item()

        if batch_idx % 10 == 0:
            print(f"Loss: {loss.item()}, Batch: {batch_idx + 1}/{len(train_loader)}", end="\r")

    # Calculate average loss and accuracy for this epoch
    avg_train_loss = total_train_loss / len(train_loader)
    train_accuracy = total_correct_train / len(train_loader.dataset)

    train_loss_history.append(avg_train_loss)
    train_accuracy_history.append(train_accuracy)

    print(f"Train Loss: {avg_train_loss:.4f}, Train Accuracy: {train_accuracy:.4f}")

    # Save the model after each epoch
    torch.save(model.state_dict(), f"crime_model_epoch_{epoch + 1}.pth")

    # Testing
    model.eval()
    total_correct_test = 0
    total_test_loss = 0
    all_test_preds = []
    all_test_labels = []

    with torch.no_grad():
        for data, label in test_loader:
            data = data.to(device)
            label = label.to(device)

            # Ensure label is a 1D tensor of class indices
            label = label.squeeze()

            preds = model(data)
            loss = criterion(preds, label)

            total_test_loss += loss.item()
            total_correct_test += (preds.argmax(dim=1) == label).sum().item()

            # Normalize the predicted probabilities
            preds_normalized = nn.functional.softmax(preds, dim=1)
            all_test_preds.append(preds_normalized)
            all_test_labels.append(label)

    avg_test_loss = total_test_loss / len(test_loader)
    test_accuracy = total_correct_test / len(test_loader.dataset)

    test_loss_history.append(avg_test_loss)
    test_accuracy_history.append(test_accuracy)
    # Calculate and store AUC-ROC
    all_test_preds = torch.cat(all_test_preds, dim=0)
    all_test_labels = torch.cat(all_test_labels, dim=0)
    test_auc_roc = roc_auc_score(all_test_labels.cpu(), all_test_preds.cpu(), multi_class='ovr')
    test_auc_roc_history.append(test_auc_roc)

    print(f"Test Loss: {avg_test_loss:.4f}, Test Accuracy: {test_accuracy:.4f}, Test AUC-ROC: {test_auc_roc:.4f}")

# Plotting the accuracies
plt.figure(figsize=(10, 5))
plt.title("Training and Test Accuracy")
plt.plot(train_accuracy_history, label="Train Accuracy")
plt.plot(test_accuracy_history, label="Test Accuracy")
plt.xlabel("Epochs")
plt.ylabel("Accuracy")
plt.legend()
plt.show()

plt.figure(figsize=(10, 5))
plt.title("Training Loss")
plt.plot(train_loss_history, label="Train Loss")
plt.xlabel("Epochs")
plt.ylabel("Loss")
plt.legend()
plt.show()

plt.figure(figsize=(10, 5))
plt.title("Test Loss")
plt.plot(test_loss_history, label="Test Loss")
plt.xlabel("Epochs")
plt.ylabel("Loss")
plt.legend()
plt.show()

plt.figure(figsize=(10, 5))
plt.title("Test AUC-ROC")
plt.plot(test_auc_roc_history, label="Test AUC-ROC")
plt.xlabel("Epochs")
plt.ylabel("AUC-ROC")
plt.legend()
plt.show()

In [None]:
import torch
import torchvision.transforms as transforms
from PIL import Image
import matplotlib.pyplot as plt

def plot_randomly_from_dataset_model(__dataset, __model):
    images = []  # List to store selected images
    labels = []  # List to store corresponding labels and model predictions

    # Define the transformation to resize images to 224x224 and then convert to tensor
    resize_transform = transforms.Resize((224, 224))
    to_tensor_transform = transforms.ToTensor()

    for _ in range(25):
        index = torch.randint(0, len(__dataset), (1,)).item()  # Randomly select an index
        image, label = __dataset[index]  # Retrieve image and label by index

        # If the image is already a tensor, convert it back to PIL Image first
        if isinstance(image, torch.Tensor):
            image = transforms.ToPILImage()(image)

        # Apply the resize transformation
        image = resize_transform(image)
        image_tensor = to_tensor_transform(image)  # Convert resized image to tensor
        images.append(image_tensor)

        # Get the model prediction for the image
        with torch.no_grad():
            image_tensor = image_tensor.unsqueeze(0).to(device)  # Add batch dimension and move to device
            result = __model(image_tensor).squeeze().cpu().argmax()

        # Append the actual label and predicted label to the labels list
        labels.append(f"{__dataset.label_id2str(label.item())} -> {__dataset.label_id2str(result.item())}")

    plot_image_from_list(images, labels, (5, 5))  # Plot the images in a 5x5 grid

def plot_image_from_list(images, labels, count):
    plt.figure(figsize=(20, 20))  # Increase figure size for larger images
    for i in range(count[0] * count[1]):
        plt.subplot(count[0], count[1], i + 1)
        plt.xticks([])
        plt.yticks([])
        plt.grid(False)
        img = images[i].to("cpu").numpy()
        if img.shape[0] == 1:  # If the image is single-channel, assume it's grayscale
            img = img.squeeze()
            plt.imshow(img, cmap="gray")
        else:
            plt.imshow(img.transpose((1, 2, 0)))  # Convert from CHW to HWC format
        plt.xlabel(labels[i])
    plt.show()

**MODEL TESTING**

In [None]:

model.to("cpu")  # Move model to CPU before saving


In [None]:
import torch
import torchvision.transforms as transforms
from PIL import Image
import matplotlib.pyplot as plt

def predict_and_plot(image_path, model, label_id2str, device=None):
    """
    Loads an image, applies transformations, makes a prediction using the model, 
    and plots the image with the predicted label.

    Args:
        image_path (str): Path to the image file.
        model (torch.nn.Module): Trained PyTorch model.
        label_id2str (function): Function mapping label indices to class names.
        device (str, optional): Device to run inference on ('cpu' or 'cuda'). 
                                Defaults to auto-detect.
    """
    # Auto-detect device if not provided
    if device is None:
        device = 'cuda' if torch.cuda.is_available() else 'cpu'
    
    # Define the transformation pipeline
    transform = transforms.Compose([
        transforms.Resize((224, 224)),  # Resize to match model input size
        transforms.ToTensor(),          # Convert image to tensor
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])  # Standard ImageNet normalization
    ])
    
    try:
        # Load and preprocess the image
        image = Image.open(image_path).convert("RGB")  # Ensure it's RGB
    except Exception as e:
        print(f"Error loading image: {e}")
        return

    # Apply transformations and add a batch dimension
    image_tensor = transform(image).unsqueeze(0).to(device)  # Shape: [1, 3, 224, 224]

    # Model inference
    model.to(device)
    model.eval()
    with torch.no_grad():
        output = model(image_tensor).cpu().squeeze()
        predicted_label = output.argmax().item()  # Get class index

    # Convert label index to class name
    predicted_label_str = label_id2str(predicted_label)

    # Plot the image with prediction
    plt.figure(figsize=(6, 6))
    plt.xticks([])
    plt.yticks([])
    plt.grid(False)
    plt.imshow(image)
    plt.xlabel(f"Prediction: {predicted_label_str}", fontsize=14, color="blue", fontweight='bold')
    plt.show()


In [13]:
print(model)

CrimeModel(
  (cnn): CrimeModelCNN(
    (conv1): Conv2d(1, 64, kernel_size=(3, 3), stride=(1, 1), padding=same)
    (leaky_relu): LeakyReLU(negative_slope=0.1)
    (max_pool1): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (dropout1): Dropout(p=0.25, inplace=False)
    (conv2): Conv2d(64, 128, kernel_size=(3, 3), stride=(1, 1), padding=same)
    (max_pool2): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (dropout2): Dropout(p=0.25, inplace=False)
    (conv3): Conv2d(128, 256, kernel_size=(3, 3), stride=(1, 1), padding=same)
    (max_pool3): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (dropout3): Dropout(p=0.4, inplace=False)
    (flatten): Flatten(start_dim=1, end_dim=-1)
    (fc1): Linear(in_features=200704, out_features=256, bias=True)
    (dropout4): Dropout(p=0.5, inplace=False)
  )
  (lstm): CrimeModelLSTM(
    (lstm1): LSTM(1, 8, batch_first=True)
    (lstm2): LSTM(8, 8, batch_firs

In [18]:
import torch
import torchvision.transforms as transforms
from PIL import Image
import cv2

# Ensure the model is on the correct device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to("cuda")
model.eval()  # Set model to evaluation mode

# Define transformations to match training preprocessing
transform = transforms.Compose([
    transforms.Grayscale(num_output_channels=1),  # Convert to grayscale if needed
    transforms.Resize((224, 224)),  # Resize to match the model's input shape
    transforms.ToTensor(),  # Convert image to PyTorch tensor
    transforms.Normalize(mean=[0.5], std=[0.5])  # Normalize (adjust values as per training)
])

def test_single_image(image_path, model):
    """
    Function to test the model on a single image.
    
    Args:
        image_path (str): Path to the image file.
        model (torch.nn.Module): Trained model.

    Returns:
        Predicted class label.
    """
    # Load and preprocess image
    image = Image.open(image_path).convert("L")  # Convert to grayscale
    image = transform(image)  # Apply transformations
    image = image.unsqueeze(0).to(device)  # Add batch dimension and move to device

    # Make prediction
    with torch.no_grad():
        output = model(image)
        predicted_label = torch.argmax(output, dim=1).item()

    print(f"Predicted Label: {label_id2str(predicted_label)}")
    return predicted_label

# Function to test live feed from a camera
def test_live_feed(frame, model):
    """
    Function to test the model on a single frame from a live video feed.
    
    Args:
        frame (np.array): Frame from video feed.
        model (torch.nn.Module): Trained model.

    Returns:
        Predicted class label.
    """
    image = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)  # Convert frame to grayscale
    image = Image.fromarray(image)  # Convert NumPy array to PIL image
    image = transform(image)  # Apply preprocessing
    image = image.unsqueeze(0).to(device)  # Add batch dimension and move to device

    # Make prediction
    with torch.no_grad():
        output = model(image)
        predicted_label = torch.argmax(output, dim=1).item()

    return label_id2str(predicted_label)

# Example usage
test_single_image("gun.jpg", model)


Predicted Label: Arrest


1

In [21]:
import torch
import torchvision.transforms as transforms
from PIL import Image
import cv2

# Ensure the model is on the correct device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)
model.eval()  # Set model to evaluation mode

# Define transformations to match training preprocessing
transform = transforms.Compose([
    transforms.Grayscale(num_output_channels=1),  # Convert to grayscale if needed
    transforms.Resize((224, 224)),  # Resize to match the model's input shape
    transforms.ToTensor(),  # Convert image to PyTorch tensor
    transforms.Normalize(mean=[0.5], std=[0.5])  # Normalize (adjust values as per training)
])

def process_video(video_path, model):
    """
    Function to process a full video and make predictions on each frame.
    
    Args:
        video_path (str): Path to the video file.
        model (torch.nn.Module): Trained model.
    """
    cap = cv2.VideoCapture(video_path)  # Open the video file
    
    if not cap.isOpened():
        print("Error: Could not open video.")
        return
    
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break  # Exit loop if video ends
        
        image = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)  # Convert frame to grayscale
        image = Image.fromarray(image)  # Convert NumPy array to PIL image
        image = transform(image)  # Apply preprocessing
        image = image.unsqueeze(0).to(device)  # Add batch dimension and move to device

        # Make prediction
        with torch.no_grad():
            output = model(image)
            predicted_label = torch.argmax(output, dim=1).item()
        
        # Display prediction on the frame
        label_text = label_id2str(predicted_label)
        cv2.putText(frame, f"Prediction: {label_text}", (10, 50), 
                    cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2, cv2.LINE_AA)
        
        # Show the frame
        cv2.imshow("Video Classification", frame)
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break  # Press 'q' to exit
    
    cap.release()
    cv2.destroyAllWindows()

# Example usage
process_video("TEST2.mp4", model)


In [25]:
import torch
import torchvision.transforms as transforms
from PIL import Image
import cv2

# Ensure the model is on the correct device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)
model.eval()  # Set model to evaluation mode

# Define transformations to match training preprocessing
transform = transforms.Compose([
    transforms.Grayscale(num_output_channels=1),  # Convert to grayscale if needed
    transforms.Resize((224, 224)),  # Resize to match the model's input shape
    transforms.ToTensor(),  # Convert image to PyTorch tensor
    transforms.Normalize(mean=[0.5], std=[0.5])  # Normalize (adjust values as per training)
])

def label_id2str(label_id):
    label_map = {0: "Abuse", 1: "Arrest", 2: "Arson", 3: "Assault", 4: "Burglary", 5: "Explosion", 6: "Fighting", 7: "Normal"}
    return label_map.get(label_id, "Unknown")

def process_video(video_path, model):
    """
    Function to process a full video and make predictions on each frame.
    
    Args:
        video_path (str): Path to the video file.
        model (torch.nn.Module): Trained model.
    """
    cap = cv2.VideoCapture(video_path)  # Open the video file
    
    if not cap.isOpened():
        print("Error: Could not open video.")
        return
    
    metadata = []

    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break  # Exit loop if video ends
        
        image = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)  # Convert frame to grayscale
        image = Image.fromarray(image)  # Convert NumPy array to PIL image
        image = transform(image)  # Apply preprocessing
        image = image.unsqueeze(0).to(device)  # Add batch dimension and move to device

        # Make prediction
        with torch.no_grad():
            output = model(image)
            predicted_label = torch.argmax(output, dim=1).item()
        
        # Get the class name and print it to the backend
        label_text = label_id2str(predicted_label)
        print(f"{label_text}")  # Printing to the backend/log

        # Store label in metadata list
        metadata.append(label_text)
        
        # Display prediction on the frame
        cv2.putText(frame, f"Prediction: {label_text}", (10, 50), 
                    cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2, cv2.LINE_AA)
        
        # Show the frame
        cv2.imshow("Video Classification", frame)
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break  # Press 'q' to exit
    
    cap.release()
    cv2.destroyAllWindows()

    # Return metadata for potential use in frontend
    return metadata

# Example usage
metadata = process_video("TEST2.mp4", model)

# Now metadata contains the list of predicted labels per frame.
print("Metadata")
print(metadata)

Arrest
Arrest
Arrest
Arrest
Arrest
Arrest
Arrest
Arrest
Arrest
Arrest
Arrest
Arrest
Arrest
Arrest
Arrest
Arrest
Arrest
Arrest
Arrest
Arrest
Arrest
Arrest
Arrest
Arrest
Arrest
Arrest
Arrest
Arrest
Arrest
Metadata
['Arrest', 'Arrest', 'Arrest', 'Arrest', 'Arrest', 'Arrest', 'Arrest', 'Arrest', 'Arrest', 'Arrest', 'Arrest', 'Arrest', 'Arrest', 'Arrest', 'Arrest', 'Arrest', 'Arrest', 'Arrest', 'Arrest', 'Arrest', 'Arrest', 'Arrest', 'Arrest', 'Arrest', 'Arrest', 'Arrest', 'Arrest', 'Arrest', 'Arrest']


In [36]:
import torch
import torchvision.transforms as transforms
from PIL import Image
import cv2
import base64
import socketio
import time
# Ensure the model is on the correct device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)
model.eval()  # Set model to evaluation mode
sio = socketio.Client()
sio.connect('http://127.0.0.1:5000')

# Define transformations to match training preprocessing
transform = transforms.Compose([
    transforms.Grayscale(num_output_channels=1),  # Convert to grayscale if needed
    transforms.Resize((224, 224)),  # Resize to match the model's input shape
    transforms.ToTensor(),  # Convert image to PyTorch tensor
    transforms.Normalize(mean=[0.5], std=[0.5])  # Normalize (adjust values as per training)
])


def process_video(video_path, model):
    """
    Function to process a full video and make predictions on each frame.
    
    Args:
        video_path (str): Path to the video file.
        model (torch.nn.Module): Trained model.
    """
    cap = cv2.VideoCapture(video_path)  # Open the video file
    
    if not cap.isOpened():
        print("Error: Could not open video.")
        return
    
    class_list = []  # To store predicted classes
    threat_level_list = []  # To store corresponding threat levels
    frame_interval = 1.0 / 10
    last_time = time.time()

    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break  # Exit loop if video ends
        
        image = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)  # Convert frame to grayscale
        image = Image.fromarray(image)  # Convert NumPy array to PIL image
        image = transform(image)  # Apply preprocessing
        image = image.unsqueeze(0).to(device)  # Add batch dimension and move to device

        # Make prediction
        with torch.no_grad():
            output = model(image)
            predicted_label = torch.argmax(output, dim=1).item()
        
        # Get the class name
        label_text = label_id2str(predicted_label)
        
        # Get the corresponding threat level
        threat_level = get_threat_level(label_text)
        
        # Print the class and threat level to the backend
        alert = f" {label_text}" 
        threat = f" {threat_level}"
        sio.emit('Alert', {'data': alert})
        sio.emit('Threat', {'data': threat})
        
        # Store class and threat level in separate lists
        class_list.append(label_text)
        threat_level_list.append(threat_level)
        
        # Display prediction and threat level on the frame
        cv2.putText(frame, f"Prediction: {label_text}", (10, 50), 
                    cv2.FONT_HERSHEY_SIMPLEX, 1.5, (0, 255, 0), 2, cv2.LINE_AA)
        cv2.putText(frame, f"Threat level: {threat_level}", (10, 100), 
                    cv2.FONT_HERSHEY_SIMPLEX, 1.5, (0, 0, 255), 2, cv2.LINE_AA)
        
        # Show the frame
        
        frame = cv2.resize(frame, (640, 480))  # Resize the frame

        _, buffer = cv2.imencode('.jpg', frame)  # Convert frame to JPG format
        frame_bytes = buffer.tobytes()  # Convert the buffer to a byte object

        # Encode the byte object to base64
        frame_base64 = base64.b64encode(frame_bytes).decode('utf-8')  # Base64 encode the frame

        # Emit the base64 frame to the socket server
        sio.emit('video_frame_1', {'data': frame_base64})
        
    
    cap.release()
    cv2.destroyAllWindows()

    # Return both lists for potential use in frontend or further processing
    return class_list, threat_level_list

# Example usage
class_list, threat_level_list = process_video("final.mp4", model)

# Now class_list contains the predicted classes, and threat_level_list contains the corresponding threat levels.
print("Class List:")
print(class_list)
print("\nThreat Level List:")
print(threat_level_list)


KeyboardInterrupt: 

packet queue is empty, aborting


In [22]:
def label_id2str(label_id):
    label_map = {0: "Abuse", 1: "Arrest", 2: "Arson", 3: "Assault", 4: "Burglary", 5: "Explosion", 6: "Fighting", 7: "Normal"}
    return label_map.get(label_id, "Unknown")

def get_threat_level(label):
    threat_levels = {
        "Abuse": "Low",
        "Arrest": "Low",
        "Arson": "High",
        "Assault": "Medium",
        "Burglary": "Medium",
        "Explosion": "High",
        "Fighting": "Medium",
        "Normal": "No threat"
    }
    return threat_levels.get(label, "Unknown")

In [None]:
model.to("cuda")

In [20]:
model=CrimeModel()

In [13]:
import torch

def save_model(model, path="crime_model.pth"):
    """
    Saves the trained model while ensuring it is compatible for reloading.
    
    Args:
        model (torch.nn.Module): Trained model.
        path (str): Path to save the model.
    """
    if isinstance(model, torch.nn.DataParallel):
        model_state = model.module.state_dict()  # Remove 'module.' prefix
    else:
        model_state = model.state_dict()
    
    torch.save(model_state, path)
    print(f"Model saved to {path}")

def load_model(model, path="crime_model.pth"):
    """
    Loads the trained model from a file and ensures compatibility.
    
    Args:
        model (torch.nn.Module): Model architecture (must be defined before calling this).
        path (str): Path to the saved model.

    Returns:
        torch.nn.Module: Model with loaded weights.
    """
    state_dict = torch.load(path, map_location=torch.device("cuda" if torch.cuda.is_available() else "cpu"))

    # Fix issue with "module." prefix in keys when loading DataParallel-trained models
    new_state_dict = {k.replace("module.", ""): v for k, v in state_dict.items()}

    model.load_state_dict(new_state_dict)
    model.to(torch.device("cuda" if torch.cuda.is_available() else "cpu"))
    model.eval()
    print(f"Model loaded from {path}")
    
    return model

# Example usage:
#save_model(model, "crime_model_cpu.pth")
model = load_model(model, "Crime_model2 (1).pth")


Model loaded from Crime_model2 (1).pth
