# Violence Detection Iteration-1

## Constant information to set the data and engineering resources

In [157]:
# Path information to load videos and annotations
import os

DATA_PATH = os.path.join(r"C:\Users\margo\OneDrive\UOC\projects\thesis", 'data', 'kranok-nv')
ANNOTATIONS_PATH = "Annotations"
VIDEOS_PATH = "Videos"
INFO_PATH = "Info"

DATASET_TRAIN_PATH = os.path.join(DATA_PATH, INFO_PATH, "violence_detection_train.hdf5")
DATASET_VAL_PATH = os.path.join(DATA_PATH, INFO_PATH, "violence_detection_val.hdf5")
DATASET_TEST_PATH = os.path.join(DATA_PATH, INFO_PATH, "violence_detection_test.hdf5")

# Dataset Description

* Folder Videos is composed by a list of videos by name: [v1, v2, v3]
* Folder Annotations is composed by a list of jsons by name: [v1, v2, v3]

The name of each video corresponds to the name of each annotation

The annotation json has the following structure:

```json
{
  "Frame_0000096": {
    "numberOfPeople": 1,
    "pedestriansData": [
      [
        "640",
        "90",
        "710",
        "198",
        "Normal"
      ]
    ]
  }
}
```

# Load/Extract data

I will create a function to extract the data information per each video-annotation pair that will contain the following structure

```
input: video-name
output: VideoInformation

class Label(enum):
    NORMAL: "Normal"
    VIOLENT: "Violent"

class PersonInfo:
    ax: int
    ay: int
    bx: int
    by: int
    # action of the person
    label: LabelEnum

class VideoInfo:
    frame_name: str
    n_people: int
    people_info: list[PersonInfo]
    label: Label

# dictionary of paris by frame_name, frame_numpy_array
frame_info: Dict[str, np.ndarray] = dict()
``` 

In [70]:
from enum import Enum

# class syntax
class Label(Enum):
    VIOLENT = "Violent"
    NORMAL = "Normal"

In [71]:
class PersonInfo:
    def __init__(self, ax: int, ay: int, bx: int, by: int, label: str):
        self.ax: int = ax
        self.ay: int = ay
        self.bx: int = bx
        self.by: int = by
        self.label: Label = Label(label)
        
    def __str__(self):
        return f"A: ({self.ax}, {self.ay}), B: ({self.bx}, {self.by}), Label: {self.label.value}"
            
    def __repr__(self):
        return f"A: ({self.ax}, {self.ay}), B: ({self.bx}, {self.by}), Label: {self.label.value}"

In [97]:
class VideoInfo:
    def __init__(self, category: Label, frame_name: str, n_people: int, people_info: list[PersonInfo]) -> None:
        self.frame_name: str = frame_name
        self.n_people: int = n_people
        self.people_info: List[PersonInfo] = people_info
        self.category = category
        self.label: int = 1 if category == Label.VIOLENT else 0

    
    def __str__(self):
        return f"Category: {self.category.value}, Frame: {self.frame_name}, Persons: {self.n_people}\n{[ppl for ppl in self.people_info]}\n"

    def __repr__(self):
        return f"Category: {self.category.value}, Frame: {self.frame_name}, Persons: {self.n_people}\n{[ppl for ppl in self.people_info]}\n"

In [98]:
# test videoInfo class
p = PersonInfo(0, 0, 10, 10, "Violent")
lp = [p]
vi = VideoInfo(Label.NORMAL, "Frame_0001324", 1, lp)
print(vi)
vi = VideoInfo(Label.VIOLENT, "Frame_0001324", 1, lp)
print(vi)


Category: Normal, Frame: Frame_0001324, Persons: 1
[A: (0, 0), B: (10, 10), Label: Violent]

Category: Violent, Frame: Frame_0001324, Persons: 1
[A: (0, 0), B: (10, 10), Label: Violent]


In [90]:
from typing import List, Dict
import json
import cv2
import numpy as np

def load_video_data(name: str, category: Label, target_size: tuple = (224, 224)) -> (List[VideoInfo], Dict[str, np.ndarray]):
    annotation_path = os.path.join(DATA_PATH, ANNOTATIONS_PATH, f"{name}.json")
    video_path = os.path.join(DATA_PATH, VIDEOS_PATH, f"{name}.mp4")
    video_infos = []
    frame_infos = {}

    with open(annotation_path) as f:
        data_list = json.load(f)
    
    cap = cv2.VideoCapture(video_path)

    if not cap.isOpened():
        print(f"Error opening video: {video_path}")
        return video_infos, frame_infos
    
    for frame_name in data_list.keys():
        n_people = data_list[frame_name]["numberOfPeople"]
        pedestrians = data_list[frame_name]["pedestriansData"]
        people_info = []
        for ax, ay, bx, by, label in pedestrians:
            people_info.append(PersonInfo(ax, ay, bx, by, label))
        
        # Read the frame based on frame name convention: Frame_0000119
        frame_id = int(frame_name.split("_")[-1])
        ret, frame = cap.read(frame_id)  # Read frame by index
        
        # Normalize the frame
        frame = frame.astype(np.float32) / 255.0
        
        # Resize all frames to same dimension
        frame = cv2.resize(frame, dsize=target_size)

        # Convert to grayscale
        frame = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)

        # Include data in storage        
        frame_infos[frame_name] = frame
        video_infos.append(VideoInfo(category, frame_name, n_people, people_info))
    
    cap.release()
    return video_infos, frame_infos
        

# Test function
vi, fi = load_video_data("Normal_00001", Label.NORMAL)
print(vi) 
print(fi)

[Category: 0, Frame: Frame_0000096, Persons: 1
[A: (365, 35), B: (450, 162), Label: Normal]
, Category: 0, Frame: Frame_0000119, Persons: 1
[A: (356, 21), B: (429, 147), Label: Normal]
, Category: 0, Frame: Frame_0000142, Persons: 1
[A: (277, 22), B: (341, 145), Label: Normal]
, Category: 0, Frame: Frame_0000092, Persons: 1
[A: (368, 34), B: (454, 174), Label: Normal]
, Category: 0, Frame: Frame_0000055, Persons: 1
[A: (411, 68), B: (505, 216), Label: Normal]
, Category: 0, Frame: Frame_0000050, Persons: 1
[A: (428, 67), B: (516, 221), Label: Normal]
, Category: 0, Frame: Frame_0000037, Persons: 1
[A: (487, 86), B: (581, 265), Label: Normal]
, Category: 0, Frame: Frame_0000002, Persons: 1
[A: (733, 168), B: (840, 353), Label: Normal]
, Category: 0, Frame: Frame_0000079, Persons: 1
[A: (388, 42), B: (467, 183), Label: Normal]
, Category: 0, Frame: Frame_0000069, Persons: 1
[A: (389, 46), B: (492, 209), Label: Normal]
, Category: 0, Frame: Frame_0000071, Persons: 1
[A: (393, 50), B: (479

---

# Store Video Information

In [142]:
from typing import Tuple
import cv2
import os

# Define a dictionary to store frame data and category within HDF5 datasets
def create_hdf5_dataset(hdf5_file, name, shape: Tuple = (100, 224, 224, 1), dtype = np.float32):
    dataset = hdf5_file.create_dataset(name, shape, dtype=dtype)  # Assuming target size (224x224) and 3 channels (RGB)
    return dataset

In [163]:
# Function to process a single video and its annotation
def process_video(name, category, hdf5_file):    
    # Load Video Info and Frame Info
    video_infos, frame_infos = load_video_data(name, category)
    frames_dataset = create_hdf5_dataset(hdf5_file, f"{name}/frames", (len(frame_infos), 224, 224, 1))  # Create dataset for frames
    labels_dataset = create_hdf5_dataset(hdf5_file, f"{name}/labels", (len(frame_infos),), np.integer)  # Create dataset for labels

    # Iterate through processed frames
    frame_count = 0
    for frame_info, video_info in zip(frame_infos.items(), video_infos):
        frame_name, frame = frame_info
        frame = frame.reshape(224, 224, 1)  # Reshape to add a single channel for grayscale
        frame = frame.reshape(1, 224, 224, 1)  # Reshape again to add the first dimension for a single frame
        
        # Use indexing with dynamic dimension
        frames_dataset[frame_count] = frame
        labels_dataset[frame_count] = video_info.label
        frame_count += 1
    
    return frames_dataset, labels_dataset

In [164]:
def percentage_processed(l: Label, v_processed: int, n_processed: int, total_processed: int) -> float:
    if l == Label.VIOLENT:
        return v_processed / total_processed
    
    return n_processed / total_processed

def is_train_set(l: Label, v_processed: int, n_processed: int, total_processed: int) -> bool:
    return percentage_processed(l, v_processed, n_processed, total_processed) < 0.6

def is_val_set(l: Label, v_processed: int, n_processed: int, total_processed: int) -> bool:
    return percentage_processed(l, v_processed, n_processed, total_processed) < 0.8


In [165]:
import h5py

# Function to create HDF5 file (handles potential overwrite)
def create_hdf5_file(filepath):
    try:
        hdf5_file = h5py.File(filepath, "w")  # Try creating in write mode
        return hdf5_file
    except OSError:  # Handle potential overwrite error
        os.remove(filepath)  # Remove existing file
        print(f"Removed existing file: {filepath}")
        hdf5_file = h5py.File(filepath, "w")  # Retry creating
        return hdf5_file

In [191]:
import random

# Create empty HDF5 file for each data split (train, validation, test)
train_hdf5_file = create_hdf5_file(DATASET_TRAIN_PATH)
val_hdf5_file = create_hdf5_file(DATASET_VAL_PATH)
test_hdf5_file = create_hdf5_file(DATASET_TEST_PATH)
try:
    # Loop through videos and annotations, processing each and storing in appropriate HDF5 file
    videos = os.listdir(str(os.path.join(DATA_PATH, VIDEOS_PATH)))
    # remove this for final test
    videos = random.choices(videos, k=100)
    normal_processed = 0
    violent_processed = 0
    total = len(videos)
    for filename in videos:
        name, extension = os.path.splitext(filename)
        lab = Label.VIOLENT if  name.startswith(Label.VIOLENT.value) else Label.NORMAL
        # Decide on training/validation/testing set based on your split strategy
        if is_train_set(lab, violent_processed, normal_processed, total):  # Replace with your split logic
            process_video(name, lab, train_hdf5_file)
        elif is_val_set(lab, violent_processed, normal_processed, total):
            process_video(name, lab, val_hdf5_file)
        else:
            process_video(name, lab, test_hdf5_file)
        
        normal_processed += 1 * 1 if lab == Label.NORMAL else 0
        violent_processed += 1 * 1 if lab == Label.VIOLENT else 0
except Exception as e:
    print(e)

finally:
    # Close the HDF5 files after processing all videos
    train_hdf5_file.close()
    val_hdf5_file.close()
    test_hdf5_file.close()

print("Everything was ok")

Everything was ok


# Model-1: Simple CNN for Violence Detection
This is a basic CNN structure suitable for initial exploration. We'll break it down step-by-step:

## Input Layer:
Takes a single frame as input, assuming a shape of (224, 224, 1) (grayscale).

## Convolutional Layer:
Applies a set of filters (kernels) to the input frame to extract features.
Typical choices for the first layer could be:

* Number of filters: 32
* Kernel size: 3x3
* Activation function: ReLU (Rectified Linear Unit)

## Pooling Layer (Optional):
Reduces the dimensionality of the data extracted by the convolutional layer.
Options include MaxPooling or AveragePooling with a kernel size of 2x2 and a stride of 2.

## Flatten Layer:
Converts the output from the convolutional layers (usually a 3D array) into a 1D vector suitable for feeding into a fully-connected layer.

## Fully-Connected Layer:
Performs classification based on the extracted features.
I will use a single neuron with a sigmoid activation for binary classification (violence vs. non-violence).

In [198]:
# Execute this if you are not able to install tensorflow properly
!pip install tensorflow[and-cuda]





In [153]:
from tensorflow import keras

def get_basic_model():
    model = keras.Sequential([
      keras.layers.Conv2D(32, (3, 3), activation="relu", input_shape=(224, 224, 1)),  # Convolutional layer
      # Optional pooling layer (e.g., MaxPooling2D(pool_size=(2, 2), strides=(2, 2)))
      # Optional additional convolutional layer(s)
      keras.layers.Flatten(),  # Flatten layer
      keras.layers.Dense(1, activation="sigmoid")  # Fully-connected layer for binary classification
    ])
    
    return model


# Train the model

## Load the data

In [193]:
# Function to explore the datasets
def traverse_datasets(hdf_file):

    """Traverse all datasets across all groups in HDF5 file."""

    import h5py

    def h5py_dataset_iterator(g, prefix=''):
        for key in g.keys():
            item = g[key]
            path = '{}/{}'.format(prefix, key)
            if isinstance(item, h5py.Dataset): # test for dataset
                yield (path, item)
            elif isinstance(item, h5py.Group): # test for group (go down)
                yield from h5py_dataset_iterator(item, path)

    with h5py.File(hdf_file, 'r') as f:
        for (path, dset) in h5py_dataset_iterator(f):
            print(path, dset)

    return None

# Check content of dataset
print("Dataset Train")
traverse_datasets(DATASET_TRAIN_PATH)
print("Dataset Val")
traverse_datasets(DATASET_VAL_PATH)
print("Dataset Test")
traverse_datasets(DATASET_TEST_PATH)

Dataset Train
/Normal_00136/frames <HDF5 dataset "frames": shape (82, 224, 224, 1), type "<f4">
/Normal_00136/labels <HDF5 dataset "labels": shape (82,), type "<i4">
/Normal_00162/frames <HDF5 dataset "frames": shape (68, 224, 224, 1), type "<f4">
/Normal_00162/labels <HDF5 dataset "labels": shape (68,), type "<i4">
/Normal_00174/frames <HDF5 dataset "frames": shape (229, 224, 224, 1), type "<f4">
/Normal_00174/labels <HDF5 dataset "labels": shape (229,), type "<i4">
/Normal_00243/frames <HDF5 dataset "frames": shape (183, 224, 224, 1), type "<f4">
/Normal_00243/labels <HDF5 dataset "labels": shape (183,), type "<i4">
/Normal_00251/frames <HDF5 dataset "frames": shape (123, 224, 224, 1), type "<f4">
/Normal_00251/labels <HDF5 dataset "labels": shape (123,), type "<i4">
/Normal_00263/frames <HDF5 dataset "frames": shape (120, 224, 224, 1), type "<f4">
/Normal_00263/labels <HDF5 dataset "labels": shape (120,), type "<i4">
/Normal_00301/frames <HDF5 dataset "frames": shape (282, 224, 224,

In [194]:
import h5py

def load_data_from_hdf5(file_path):
  """Loads frames and labels from separate datasets within an HDF5 file.

  Args:
      file_path: The path to the HDF5 file containing the data.

  Returns:
      A tuple containing three NumPy arrays: frames, labels, video_names.
  """
  with h5py.File(file_path, "r") as hdf5_file:
    frames_list, labels_list, video_names = [], [], []
    for key in hdf5_file.keys():
      video_names.append(key)
      
      # Access frames and labels datasets
      frames_list.append(hdf5_file[f"{key}/frames"][:])
      labels_list.append(hdf5_file[f"{key}/labels"][:])
    
    # Concatenate frames and labels across all videos in the file
    frames = np.concatenate(frames_list, axis=0)
    labels = np.concatenate(labels_list, axis=0)

  return frames, labels, video_names

In [196]:
def train_model(model, X_train, y_train, X_val, y_val, epochs=10):
  """Trains a model on the provided data.

  Args:
      model: The Keras model to be trained.
      X_train: Training set frames.
      y_train: Training set labels.
      X_val: Validation set frames.
      y_val: Validation set labels.
      epochs: Number of training epochs (default 10).

  Returns:
      The trained model.
  """
  model.compile(loss="binary_crossentropy", optimizer="adam", metrics=["accuracy"])
  model.fit(X_train, y_train, epochs=epochs, validation_data=(X_val, y_val))

  return model

In [197]:
# Load data from HDF5
train_data = load_data_from_hdf5(DATASET_TRAIN_PATH)
val_data = load_data_from_hdf5(DATASET_VAL_PATH)
test_data = load_data_from_hdf5(DATASET_TEST_PATH)

# Unpack the data
X_train, y_train, train_video_names = train_data
X_val, y_val, val_video_names = val_data
X_test, y_test, test_video_names = test_data


# Train the model
model = get_basic_model()
trained_model = train_model(model, X_train, y_train, X_val, y_val, epochs=10)

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10
