# Violence Detection in Videos

This notebook uses the trained MobileNetV2 model to detect violence in videos. It processes videos frame by frame and outputs a video with labeled predictions.


Run the notebooks in this order: first `extract_frames_from_video.ipynb`, then `mobilenetv2-model.ipynb` (which trains and saves the model), and finally this notebook, `predict_video.ipynb`.

## Import Required Libraries

In [1]:
import cv2
import numpy as np
import matplotlib.pyplot as plt
import os
import time
from keras.models import load_model
from collections import deque
from tqdm import tqdm

# Check OpenCV version
print(f"OpenCV version: {cv2.__version__}")

OpenCV version: 4.6.0


In [None]:
# Model hyperparameters (matching mobilenetv2_gru_model.py)
# NOTE(review): neither constant is referenced by the code visible in this
# notebook; process_video resizes frames to 128x128 and predicts one frame at
# a time. Confirm whether these values are still needed here.
IMG_SIZE = 224  # presumably the square input edge length in pixels -- TODO confirm
SEQUENCE_LENGTH = 16  # presumably the number of frames per input sequence -- TODO confirm

## Setup Directory Structure

In [2]:
# Create necessary directories
def ensure_dir(directory):
    """Create a directory (including missing parents) if it doesn't exist.

    Args:
        directory (str): Path of the directory that must exist afterwards.

    Raises:
        FileExistsError: If ``directory`` exists but is not a directory.
    """
    # Record the state up front only to choose the message; the creation
    # itself uses exist_ok=True so there is no check-then-create race.
    already_present = os.path.isdir(directory)
    os.makedirs(directory, exist_ok=True)
    if already_present:
        print(f"Directory exists: {directory}")
    else:
        print(f"Created directory: {directory}")

# Make sure both working folders are present before anything reads or writes them
for working_dir in ('./input', './output'):
    ensure_dir(working_dir)

Directory exists: ./input
Directory exists: ./output


## Load the Trained Model

In [3]:
def load_violence_detection_model(model_path='./model/model.h5'):
    """Load the saved violence detection model from disk.

    Args:
        model_path (str): Location of the saved model file.

    Returns:
        The object returned by ``load_model``, or None when no file exists
        at ``model_path`` (an error message is printed in that case).
    """
    if os.path.exists(model_path):
        print(f"Loading model from {model_path}...")
        loaded_model = load_model(model_path)
        print("Model loaded successfully!")
        return loaded_model

    # Nothing on disk: tell the user how to produce the model file
    print(f"Error: Model file not found at {model_path}")
    print("Please run mobilenetv2-model.ipynb first to train the model.")
    return None

# Load the model once at notebook level. process_video reads this global and
# retries the load itself if the value is None.
model = load_violence_detection_model()

Loading model from ./model/model.h5...
Model loaded successfully!


## Video Processing Function

In [4]:
def process_video(video_path, output_path=None, display=True,
                  frame_size=128, smoothing_window=32, threshold=0.5):
    """Process a video for violence detection and write an annotated copy.

    Every frame is converted to RGB, resized, scaled to [0, 1] and passed to
    the global ``model``. Predictions are averaged over a sliding window and
    the resulting label is drawn on the frame written to the output video.

    Args:
        video_path (str): Path to input video.
        output_path (str, optional): Path to save output video. If None, the
            file is written to ``./output/processed_<input name>``.
        display (bool): Whether to show every 5th annotated frame in an
            OpenCV window during processing (press 'q' to stop early).
        frame_size (int): Edge length, in pixels, of the square image fed to
            the model. Defaults to 128, the value this notebook has always used.
        smoothing_window (int): Number of most recent predictions averaged
            before thresholding.
        threshold (float): Averaged score above which a frame is labeled
            "Violence".

    Returns:
        str: Path to output video, or None if the input file is missing, the
        model cannot be loaded, or the video/writer cannot be opened.
    """
    if not os.path.exists(video_path):
        print(f"Error: Video file not found at {video_path}")
        return None

    # If no output path is provided, create one in the output directory
    if output_path is None:
        video_name = os.path.basename(video_path)
        output_path = os.path.join('./output', f"processed_{video_name}")

    print(f"Processing video: {video_path}")
    print(f"Output will be saved to: {output_path}")

    # Load the model if not already loaded
    global model
    if model is None:
        model = load_violence_detection_model()
        if model is None:
            return None

    # Open video capture and bail out early if the container/codec is unreadable
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        print(f"Error opening video file: {video_path}")
        cap.release()
        return None

    writer = None
    pbar = None
    frame_count = 0
    start_time = time.time()

    try:
        # Get video properties
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        fps = cap.get(cv2.CAP_PROP_FPS)
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

        print(f"Video properties: {width}x{height}, {fps} FPS, {total_frames} frames")

        # Some containers report 0 (or NaN) FPS; a writer opened with such a
        # rate is unusable, so fall back to a conventional default.
        if not fps > 0:
            fps = 30.0

        # Make sure the destination folder exists (matters for custom output paths)
        output_dir = os.path.dirname(output_path)
        if output_dir:
            os.makedirs(output_dir, exist_ok=True)

        # Initialize video writer
        fourcc = cv2.VideoWriter_fourcc(*'mp4v')  # Use mp4v codec
        writer = cv2.VideoWriter(output_path, fourcc, fps, (width, height))
        if not writer.isOpened():
            print(f"Error: Could not open video writer for {output_path}")
            return None

        # Sliding window of recent predictions used to smooth the label
        recent_preds = deque(maxlen=smoothing_window)

        # The reported frame count can be missing; tqdm then runs without a total
        pbar = tqdm(total=total_frames if total_frames > 0 else None)

        while True:
            # Read frame; a failed read means end of stream
            ret, frame = cap.read()
            if not ret:
                break

            # Clone frame for output so annotations never reach the model input
            output_frame = frame.copy()

            # Preprocess frame for model input: BGR -> RGB, resize, scale to [0, 1]
            rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            resized_frame = cv2.resize(rgb_frame, (frame_size, frame_size)).astype("float32")
            normalized_frame = resized_frame / 255.0

            # Make prediction on a batch of one frame
            preds = model.predict(np.expand_dims(normalized_frame, axis=0), verbose=0)[0]
            recent_preds.append(preds)

            # Average predictions over the window
            results = np.array(recent_preds).mean(axis=0)

            # NOTE(review): assumes output index 0 is the violence score -- confirm
            # against the training notebook.
            is_violence = results[0] > threshold
            confidence = results[0] if is_violence else 1 - results[0]

            # Red for violence, green for non-violence (BGR order)
            text_color = (0, 0, 255) if is_violence else (0, 255, 0)

            label_text = "Violence" if is_violence else "Non-Violence"
            text = f"{label_text}: {confidence:.2f}"

            # Add text to frame
            cv2.putText(output_frame, text, (30, 50), cv2.FONT_HERSHEY_SIMPLEX,
                        1.0, text_color, 3, cv2.LINE_AA)

            # Add a visual indicator (colored bar across the top of the frame)
            cv2.rectangle(output_frame, (0, 0), (width, 20), text_color, -1)

            # Write frame to output video
            writer.write(output_frame)

            # Show every 5th frame only, to keep display overhead low
            if display and frame_count % 5 == 0:
                cv2.imshow('Violence Detection', output_frame)

                # Break if 'q' is pressed
                if cv2.waitKey(1) & 0xFF == ord('q'):
                    break

            # Update progress bar
            pbar.update(1)
            frame_count += 1
    finally:
        # Always release handles, even if prediction or writing raised
        cap.release()
        if writer is not None:
            writer.release()
        if display:
            cv2.destroyAllWindows()
        if pbar is not None:
            pbar.close()

    # Print processing stats
    elapsed_time = time.time() - start_time
    average_fps = frame_count / elapsed_time if elapsed_time > 0 else 0.0
    print(f"Processed {frame_count} frames in {elapsed_time:.2f} seconds")
    print(f"Average FPS: {average_fps:.2f}")
    print(f"Output saved to: {output_path}")

    return output_path

## Process Sample Videos

In [5]:
# List available videos in input directory
def list_input_videos():
    """List all video files in the input directory.

    Returns:
        list[str]: File names (not full paths) in ``./input`` whose extension
        is .mp4, .avi, .mov or .mkv (case-insensitive), sorted by name so an
        index into the list means the same thing on every run. Empty when
        the directory is missing.
    """
    input_dir = './input'
    if not os.path.isdir(input_dir):
        print("Input directory does not exist.")
        return []

    video_extensions = ('.mp4', '.avi', '.mov', '.mkv')

    # os.listdir order is arbitrary, so sort; skip sub-directories that
    # merely happen to carry a video-like name.
    return sorted(
        entry for entry in os.listdir(input_dir)
        if entry.lower().endswith(video_extensions)
        and os.path.isfile(os.path.join(input_dir, entry))
    )

# Gather the candidate inputs and print them as a numbered menu
input_videos = list_input_videos()
if not input_videos:
    print("No videos found in input directory.")
    print("Please place some video files in the './input' directory.")
else:
    print("Available videos in input directory:")
    for i, video in enumerate(input_videos):
        print(f"{i+1}. {video}")

Available videos in input directory:
1. Suntukan (street fight).mp4


In [6]:
# Run the detector on one of the listed input videos.
# video_index selects the entry (0 = first video in the list); edit it to
# process a different one.
video_index = 0

if not input_videos or video_index >= len(input_videos):
    print("No video available to process or invalid index.")
    print("Please place videos in the './input' directory.")
else:
    video_name = input_videos[video_index]
    video_path = os.path.join('./input', video_name)
    process_video(video_path)

Processing video: ./input\Suntukan (street fight).mp4
Output will be saved to: ./output\processed_Suntukan (street fight).mp4
Video properties: 640x360, 30.0 FPS, 2833 frames


100%|█████████▉| 2831/2833 [03:18<00:00, 14.24it/s]

Processed 2831 frames in 198.80 seconds
Average FPS: 14.24
Output saved to: ./output\processed_Suntukan (street fight).mp4





## Process Sample Videos from Training Set

In [7]:
# Process a sample from the training videos (if available)
def process_sample_from_dataset():
    """Process one sample video from each class of the training dataset.

    Looks in ``./video/Violence`` and ``./video/NonViolence``. For each
    folder that exists and contains at least one .mp4 file, the first file
    in sorted name order is passed to ``process_video``. Folders that are
    missing or contain no .mp4 files are skipped silently.
    """
    class_folders = (
        ('./video/Violence', 'violence'),
        ('./video/NonViolence', 'non-violence'),
    )

    for class_dir, label in class_folders:
        if not os.path.isdir(class_dir):
            continue

        # Sort so the chosen sample does not depend on os.listdir ordering
        candidates = sorted(
            f for f in os.listdir(class_dir) if f.lower().endswith('.mp4')
        )
        if not candidates:
            continue

        sample_video = os.path.join(class_dir, candidates[0])
        print(f"Processing sample {label} video: {sample_video}")
        process_video(sample_video)

# Process samples from the dataset (comment out the following line to skip this step)
process_sample_from_dataset()

Processing sample violence video: ./video/Violence\V_1.mp4
Processing video: ./video/Violence\V_1.mp4
Output will be saved to: ./output\processed_V_1.mp4
Video properties: 1920x1080, 15.416159218092405 FPS, 103 frames


100%|██████████| 103/103 [00:09<00:00, 11.16it/s]


Processed 103 frames in 9.23 seconds
Average FPS: 11.16
Output saved to: ./output\processed_V_1.mp4
Processing sample non-violence video: ./video/NonViolence\NV_1.mp4
Processing video: ./video/NonViolence\NV_1.mp4
Output will be saved to: ./output\processed_NV_1.mp4
Video properties: 1920x1080, 24.900460409512974 FPS, 72 frames


 92%|█████████▏| 66/72 [00:05<00:00, 11.29it/s]

Processed 66 frames in 5.85 seconds
Average FPS: 11.29
Output saved to: ./output\processed_NV_1.mp4





## Test on Custom Video

In [8]:
# Function to process any video specified by path
def process_custom_video(video_path):
    """Process a video file specified by path.

    Args:
        video_path (str): Path to the video to analyze.

    Returns:
        Whatever ``process_video`` returns for this path (the output video
        path on success), or None if ``video_path`` does not exist.
    """
    if not os.path.exists(video_path):
        print(f"Error: Video file not found at {video_path}")
        return None

    # Hand the result back so callers can locate the annotated file
    return process_video(video_path)

# Example usage:
#process_custom_video('/path/to/your/video.mp4')

## Display Processed Video

In [9]:
def play_video(video_path, delay=25):
    """Play a video file in an OpenCV window (press 'q' to stop).

    Frames are shown with ``cv2.imshow``, so playback happens in a separate
    desktop window rather than inline in the notebook output.

    Args:
        video_path (str): Path to video file
        delay (int): Delay between frames in milliseconds, passed to
            ``cv2.waitKey``
    """
    if not os.path.exists(video_path):
        print(f"Error: Video file not found at {video_path}")
        return

    # Open video
    cap = cv2.VideoCapture(video_path)

    # Check if opened successfully
    if not cap.isOpened():
        print(f"Error opening video file: {video_path}")
        cap.release()
        return

    try:
        # Read until video is completed
        while cap.isOpened():
            # Capture frame-by-frame
            ret, frame = cap.read()
            if not ret:
                break

            # Display frame
            cv2.imshow('Video', frame)

            # Press Q on keyboard to exit
            if cv2.waitKey(delay) & 0xFF == ord('q'):
                break
    finally:
        # Release resources even if display or decoding raised
        cap.release()
        cv2.destroyAllWindows()

# List processed videos
def list_processed_videos():
    """List all processed videos in the output directory.

    Returns:
        list[str]: File names (not full paths) in ``./output`` whose
        extension is .mp4, .avi, .mov or .mkv (case-insensitive), sorted by
        name so an index into the list means the same thing on every run.
        Empty when the directory is missing.
    """
    output_dir = './output'
    if not os.path.isdir(output_dir):
        print("Output directory does not exist.")
        return []

    video_extensions = ('.mp4', '.avi', '.mov', '.mkv')

    # os.listdir order is arbitrary, so sort; skip sub-directories that
    # merely happen to carry a video-like name.
    return sorted(
        entry for entry in os.listdir(output_dir)
        if entry.lower().endswith(video_extensions)
        and os.path.isfile(os.path.join(output_dir, entry))
    )

# Gather what has been written to the output directory and print a numbered menu
processed_videos = list_processed_videos()
if not processed_videos:
    print("No processed videos found in output directory.")
else:
    print("Available processed videos:")
    for i, video in enumerate(processed_videos):
        print(f"{i+1}. {video}")

# Play a processed video (uncomment and set the correct index)
# video_index = 0  # Play the first video in the list
# if processed_videos and video_index < len(processed_videos):
#     video_name = processed_videos[video_index]
#     video_path = os.path.join('./output', video_name)
#     play_video(video_path)
# else:
#     print("No processed video available to play or invalid index.")

Available processed videos:
1. processed_NV_1.mp4
2. processed_Suntukan (street fight).mp4
3. processed_V_1.mp4
