In [None]:
import os
import numpy as np
import cv2
import tensorflow as tf
from tensorflow.keras.models import Model
from tensorflow.keras.layers import (
    Input, TimeDistributed, Bidirectional, LSTM,
    Dense, Dropout, BatchNormalization
)
from tensorflow.keras.applications import MobileNetV2
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping
from tensorflow.keras.utils import to_categorical
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, confusion_matrix
import matplotlib.pyplot as plt
import seaborn as sns
from tqdm import tqdm

In [None]:
# Configuration: dataset locations, clip geometry and training hyperparameters.

# Directories holding the videos of each class.
VIOLENCE_PATH, NONVIOLENCE_PATH = "data/Violence", "data/NonViolence"

# Number of frames sampled from every video.
SEQUENCE_LENGTH = 16

# Frame geometry; 224x224 is the MobileNetV2 default input size.
IMG_HEIGHT = IMG_WIDTH = 224
CHANNELS = 3

# Training loop settings.
BATCH_SIZE, EPOCHS = 32, 50

### Data Loading and Preprocessing

In [None]:
def extract_frames(video_path):
    """Sample SEQUENCE_LENGTH evenly spaced frames from a video file.

    Every decoded frame is resized to (IMG_WIDTH, IMG_HEIGHT), converted from
    OpenCV's BGR channel order to RGB, and scaled to [0, 1] as float32.
    Frames that fail to decode are left as black (all-zero) frames.

    Args:
        video_path: Path of the video file handed to cv2.VideoCapture.

    Returns:
        A float32 array of shape
        (SEQUENCE_LENGTH, IMG_HEIGHT, IMG_WIDTH, CHANNELS), or None when the
        file cannot be opened, reports no frames, or yields no decodable
        frame at all.
    """
    cap = cv2.VideoCapture(video_path)
    try:
        if not cap.isOpened():
            return None

        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        if total_frames <= 0:
            # Nothing to sample; skipping is better than emitting a labelled
            # clip made only of black frames.
            return None

        indices = np.linspace(0, total_frames - 1, SEQUENCE_LENGTH, dtype=int)

        # Pre-filled with zeros so unread slots are already black frames.
        # float32 halves the memory of the default float64.
        frames = np.zeros(
            (SEQUENCE_LENGTH, IMG_HEIGHT, IMG_WIDTH, CHANNELS), dtype=np.float32
        )
        frames_read = 0

        for slot, idx in enumerate(indices):
            cap.set(cv2.CAP_PROP_POS_FRAMES, int(idx))
            ret, frame = cap.read()
            if not ret:
                continue  # keep the black placeholder for this slot

            resized_frame = cv2.resize(frame, (IMG_WIDTH, IMG_HEIGHT))
            # OpenCV decodes to BGR; reverse the channel axis to get RGB,
            # the order ImageNet-pretrained backbones were trained on.
            rgb_frame = resized_frame[..., ::-1]
            frames[slot] = rgb_frame.astype(np.float32) / 255.0
            frames_read += 1

        if frames_read == 0:
            return None
        return frames
    finally:
        # Release the capture handle on every exit path, including errors.
        cap.release()

def load_dataset(violence_path, nonviolence_path):
    """Load every video of both classes as fixed-length frame sequences.

    Videos are discovered by extension (case-insensitive) and processed in
    sorted filename order so that a seeded train/test split is reproducible.
    Videos for which extract_frames returns None, or a sequence whose length
    is not SEQUENCE_LENGTH, are skipped.

    Args:
        violence_path: Directory containing the violent videos (label 1).
        nonviolence_path: Directory containing the non-violent videos (label 0).

    Returns:
        A tuple (sequences, labels): a float32 array stacking one frame
        sequence per loaded video, and an integer array of matching labels.

    Raises:
        ValueError: If no video could be loaded from either directory.
    """
    video_extensions = ('.mp4', '.avi', '.mov')
    class_dirs = (
        ("Violence", violence_path, 1),
        ("NonViolence", nonviolence_path, 0),
    )
    sequences, labels = [], []

    for class_name, path, label in class_dirs:
        if not os.path.isdir(path):
            # Keep the best-effort behaviour, but make the gap visible: a
            # silently missing class would yield a one-class dataset.
            print(f"Warning: directory for class '{class_name}' not found: {path}")
            continue

        video_files = sorted(
            f for f in os.listdir(path) if f.lower().endswith(video_extensions)
        )
        for video in tqdm(video_files, desc=f"Loading {class_name}"):
            frames = extract_frames(os.path.join(path, video))
            if frames is not None and len(frames) == SEQUENCE_LENGTH:
                # float32 halves memory compared with float64 frames.
                sequences.append(frames.astype(np.float32, copy=False))
                labels.append(label)

    if not sequences:
        raise ValueError("No videos found. Please check dataset paths.")

    return np.array(sequences), np.array(labels)

### Model Architecture

In [None]:
def build_model(sequence_length, img_height, img_width, channels):
    """Build the MobileNetV2 + BiLSTM video classifier.

    A frozen ImageNet-pretrained MobileNetV2 is applied to every frame via
    TimeDistributed, its pooled features are summarised by a bidirectional
    LSTM, and a small dense head outputs a 2-way softmax.

    The model expects frames scaled to [0, 1]; they are rescaled to [-1, 1]
    inside the model, the range MobileNetV2's ImageNet weights were trained
    with.

    Args:
        sequence_length: Number of frames per input clip.
        img_height: Frame height in pixels.
        img_width: Frame width in pixels.
        channels: Number of colour channels per frame.

    Returns:
        An uncompiled Keras Model mapping
        (sequence_length, img_height, img_width, channels) clips to
        2-class probabilities.
    """
    frame_shape = (img_height, img_width, channels)
    input_shape = (sequence_length,) + frame_shape

    # CNN Feature Extractor (MobileNetV2), frozen so only the head trains.
    base_cnn = MobileNetV2(
        weights='imagenet',
        include_top=False,
        input_shape=frame_shape,
        pooling='avg'
    )
    base_cnn.trainable = False

    # Model Definition
    model_input = Input(shape=input_shape)

    # Map [0, 1] pixels to [-1, 1]: x * 2 - 1. Without this the frozen
    # backbone sees inputs outside the distribution it was trained on.
    scaled_input = tf.keras.layers.Rescaling(scale=2.0, offset=-1.0)(model_input)
    cnn_features = TimeDistributed(base_cnn)(scaled_input)

    # BiLSTM for Temporal Analysis
    bilstm_out = Bidirectional(LSTM(128))(cnn_features)

    # Dense Layers and Regularization
    x = Dense(64, activation='relu')(bilstm_out)
    x = BatchNormalization()(x)
    x = Dropout(0.5)(x)

    # Output Layer
    model_output = Dense(2, activation='softmax')(x)

    model = Model(inputs=model_input, outputs=model_output)
    return model

### Training and Evaluation

In [None]:
def train_model(model, X_train, y_train, X_val, y_val):
    """Fit `model` on the training split while validating on (X_val, y_val).

    Training uses BATCH_SIZE and EPOCHS from the configuration and stops
    early once `val_loss` has not improved for 5 epochs, restoring the best
    weights seen.

    Returns:
        Whatever `model.fit` returns (the training history).
    """
    early_stop = EarlyStopping(
        monitor='val_loss',
        patience=5,
        restore_best_weights=True,
    )
    fit_options = dict(
        validation_data=(X_val, y_val),
        batch_size=BATCH_SIZE,
        epochs=EPOCHS,
        callbacks=[early_stop],
        verbose=1,
    )
    return model.fit(X_train, y_train, **fit_options)

def evaluate_model(model, X_test, y_test):
    """Print a classification report and plot the confusion matrix.

    Args:
        model: Object whose `predict(X_test)` returns per-class scores with
            one row per sample.
        X_test: Test inputs passed straight to `model.predict`.
        y_test: One-hot encoded ground-truth labels (decoded with argmax
            along axis 1).
    """
    target_names = ["Non-Violence", "Violence"]
    # Fix the label set explicitly. Otherwise scikit-learn infers it from the
    # data, and a test split or prediction set that contains only one class
    # makes the report raise and the matrix shrink to 1x1.
    class_ids = list(range(len(target_names)))

    y_prob = model.predict(X_test)
    y_pred = np.argmax(y_prob, axis=1)
    y_true = np.argmax(y_test, axis=1)

    print("\nClassification Report:")
    print(classification_report(
        y_true, y_pred, labels=class_ids, target_names=target_names
    ))

    cm = confusion_matrix(y_true, y_pred, labels=class_ids)
    plt.figure(figsize=(8, 6))
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
                xticklabels=target_names, yticklabels=target_names)
    plt.title('Confusion Matrix')
    plt.ylabel('Actual Label')
    plt.xlabel('Predicted Label')
    plt.show()

def plot_training_history(history):
    """Plot training and validation curves for the recorded metrics.

    Only metrics actually present in `history.history` are drawn, so the
    function also works for models compiled with a subset of the metrics or
    trained without validation data.

    Args:
        history: Object exposing a `history` dict that maps metric names to
            per-epoch value lists (as returned by `model.fit`).
    """
    metrics = ['loss', 'accuracy', 'precision', 'recall', 'f1_score']
    recorded = history.history

    available = [metric for metric in metrics if metric in recorded]
    if not available:
        print("No known metrics found in training history; nothing to plot.")
        return

    # Two columns; with all five metrics this yields the usual 3x2 grid.
    n_cols = 2
    n_rows = (len(available) + n_cols - 1) // n_cols
    plt.figure(figsize=(14, 10))

    for i, metric in enumerate(available):
        plt.subplot(n_rows, n_cols, i + 1)
        plt.plot(recorded[metric], label=f'Train {metric.capitalize()}')

        val_key = f'val_{metric}'
        if val_key in recorded:  # absent when fit() ran without validation
            plt.plot(recorded[val_key], label=f'Validation {metric.capitalize()}')

        plt.title(f'{metric.capitalize()} Over Epochs')
        plt.xlabel('Epoch')
        plt.ylabel(metric.capitalize())
        plt.legend()

    plt.tight_layout()
    plt.show()

In [None]:
if __name__ == "__main__":
    # End-to-end run: load data, split, build, train, save, then report.

    # Load and prepare the dataset
    X, y = load_dataset(VIOLENCE_PATH, NONVIOLENCE_PATH)
    y_one_hot = to_categorical(y, num_classes=2)

    # Split the data: 80% for training + validation and 20% for testing
    X_train, X_test, y_train, y_test = train_test_split(
        X, y_one_hot, test_size=0.2, stratify=y_one_hot, random_state=42
    )
    # From the 80% training set, take 20% for validation (which is 16% of the full dataset)
    X_train, X_val, y_train, y_val = train_test_split(
        X_train, y_train, test_size=0.2, stratify=y_train, random_state=42
    )

    # Build the model
    model = build_model(SEQUENCE_LENGTH, IMG_HEIGHT, IMG_WIDTH, CHANNELS)

    # Compile the model
    optimizer = Adam(learning_rate=0.0001)
    # With one-hot targets and a 2-way softmax, Precision/Recall without
    # class_id pool both output columns and therefore just mirror accuracy.
    # class_id=1 reports them for the positive ("Violence") class instead.
    metrics = [
        'accuracy',
        tf.keras.metrics.Precision(name='precision', class_id=1),
        tf.keras.metrics.Recall(name='recall', class_id=1),
        tf.keras.metrics.F1Score(name='f1_score', average='macro')
    ]
    model.compile(
        optimizer=optimizer,
        loss='categorical_crossentropy',
        metrics=metrics
    )
    model.summary()

    # Train the model
    history = train_model(model, X_train, y_train, X_val, y_val)

    # Save the trained model before reporting, so that an error while
    # evaluating or plotting cannot discard the training run.
    os.makedirs("results", exist_ok=True)
    model.save("results/violence_detection_model.h5")
    print("\nModel saved to results/violence_detection_model.h5")

    # Evaluate the model
    evaluate_model(model, X_test, y_test)

    # Plot training metrics
    plot_training_history(history)

Loading Violence: 100%|██████████| 1000/1000 [06:55<00:00,  2.40it/s]
Loading Non-Violence: 100%|██████████| 1000/1000 [03:12<00:00,  5.19it/s]


Epoch 1/50
40/40 ━━━━━━━━━━━━━━━━━━━━ 0s 2s/step - accuracy: 0.6051 - loss: 0.6583 - precision: 0.6022 - recall: 0.7085