In [None]:
# Importing essential libraries

import tensorflow as tf
import keras
from keras.layers import *
from keras.models import Sequential, load_model
from tensorflow.keras.utils import to_categorical, plot_model
from sklearn.model_selection import train_test_split

import os
import cv2
import math
import numpy as np
import matplotlib.pyplot as plt
from collections import deque
import datetime as dt
import shutil
import random

In [None]:
import seaborn as sns
# Activate seaborn's default theme (called with no arguments) for the plots in this notebook.
sns.set_theme()



In [None]:
# Spatial size (in pixels) every video frame is resized to before it reaches the network.
VIDEO_FRAME_HEIGHT = 64
VIDEO_FRAME_WIDTH = 64

# Number of frames sampled from each video to form one input sequence.
FRAMES_IN_SEQUENCE = 16

# Root folder of the dataset; the loaders join it with each name in CLASS_LABELS.
DATASET_DIRECTORY = "./Dataset/Real Life Violence Dataset"

# Class sub-folder names; a class's position in this list is its integer label.
CLASS_LABELS = ["NonViolence", "Violence"]

# os.path.exists() returns a bool, not a path, so report it as an existence
# check instead of labelling it as the current working directory.
dataset_directory_exists = os.path.exists(DATASET_DIRECTORY)
# Legacy name kept so anything that still reads `current_directory` keeps working.
current_directory = dataset_directory_exists
print(f"Dataset directory found: {dataset_directory_exists}")

In [None]:
def extract_frames(video_path, sequence_length=FRAMES_IN_SEQUENCE, image_height=VIDEO_FRAME_HEIGHT, image_width=VIDEO_FRAME_WIDTH):
    """
    Samples evenly spaced frames from a video, resizes and normalizes them.

    Args:
        video_path: Path of the video file to read.
        sequence_length: Number of frames to sample from the video.
        image_height: Height (rows) of every returned frame.
        image_width: Width (columns) of every returned frame.

    Returns:
        A list of at most `sequence_length` arrays of shape
        (image_height, image_width, channels) with values scaled by 1/255.
        The list is shorter than `sequence_length` (possibly empty) when the
        video cannot be opened or a frame cannot be read. Frames keep the
        channel order OpenCV delivers; no colour conversion is applied.
    """
    frames_list = []
    video_reader = cv2.VideoCapture(video_path)

    # Release the capture even if seeking/reading/resizing raises.
    try:
        # Get the total number of frames in the video.
        total_frames = int(video_reader.get(cv2.CAP_PROP_FRAME_COUNT))

        # Distance between two sampled frames; at least 1 so short videos
        # are still read frame by frame.
        skip_frames_window = max(int(total_frames / sequence_length), 1)

        for frame_index in range(sequence_length):
            # Jump to the next sampling position.
            video_reader.set(cv2.CAP_PROP_POS_FRAMES, frame_index * skip_frames_window)

            success, frame = video_reader.read()

            # Stop at the first unreadable frame; callers detect this through
            # the shorter-than-requested result.
            if not success:
                break

            # cv2.resize expects the target size as (width, height).
            resized_frame = cv2.resize(frame, (image_width, image_height))

            # Scale pixel values from [0, 255] to [0, 1].
            normalized_frame = resized_frame / 255.0

            frames_list.append(normalized_frame)
    finally:
        video_reader.release()

    return frames_list

In [None]:
# Sanity-check the extractor on one known video. Only a summary is printed:
# echoing the returned list would flood the cell output with raw pixel values.
sample_frames = extract_frames(f"{DATASET_DIRECTORY}/NonViolence/NV_1.mp4")
print(f"Frames extracted: {len(sample_frames)}")
if sample_frames:
    print(f"Frame shape: {sample_frames[0].shape}")

In [None]:
def create_dataset():
    """
    Loads every usable video of every class into memory.

    Walks the sub-folder of DATASET_DIRECTORY belonging to each entry of
    CLASS_LABELS, extracts frames from each file and keeps only the videos
    that yielded exactly FRAMES_IN_SEQUENCE frames.

    Returns:
        A tuple (features, labels, video_files_paths): a NumPy array of the
        kept frame sequences, a NumPy array with the class index of each kept
        video, and a list with the path of each kept video.
    """
    sequences, targets, kept_paths = [], [], []

    for label, class_name in enumerate(CLASS_LABELS):
        print(f'Extracting Data of Class: {class_name}')

        folder = os.path.join(DATASET_DIRECTORY, class_name)
        for entry in os.listdir(folder):
            candidate_path = os.path.join(folder, entry)
            clip = extract_frames(candidate_path)

            # Skip videos that did not produce a complete sequence.
            if len(clip) != FRAMES_IN_SEQUENCE:
                continue

            sequences.append(clip)
            targets.append(label)
            kept_paths.append(candidate_path)

    return np.array(sequences), np.array(targets), kept_paths

In [None]:
# Create the dataset: decodes every video under DATASET_DIRECTORY and holds all
# kept frame sequences, their class indices and their file paths in memory.
features, labels, video_files_paths = create_dataset()

In [None]:
# Persist the extracted arrays in the working directory, one .npy file each.
for _npy_name, _array in {
    "features.npy": features,
    "labels.npy": labels,
    "video_files_paths.npy": video_files_paths,
}.items():
    np.save(_npy_name, _array)

In [None]:
# Read the previously saved arrays back, in the same order they were written.
features, labels, video_files_paths = (
    np.load(npy_name)
    for npy_name in ("features.npy", "labels.npy", "video_files_paths.npy")
)

In [None]:
# # convert labels into one-hot-encoded vectors
# one_hot_encoded_labels = to_categorical(labels)
# one_hot_encoded_labels

In [None]:
def get_files_and_labels():
    """
    Lists every file of every class folder together with its class index.

    Only paths are collected; no video is opened or decoded here.

    Returns:
        A tuple (video_files_paths, labels) of two equally long lists: the
        file paths and, at the same position, the index of the file's class
        in CLASS_LABELS.
    """
    collected_paths, collected_labels = [], []

    for label, class_name in enumerate(CLASS_LABELS):
        print(f'Getting file paths for class: {class_name}')
        folder = os.path.join(DATASET_DIRECTORY, class_name)

        class_paths = [os.path.join(folder, entry) for entry in os.listdir(folder)]
        collected_paths.extend(class_paths)
        collected_labels.extend([label] * len(class_paths))

    return collected_paths, collected_labels

# 1. Get the list of all file paths and their corresponding integer labels.
all_video_paths, all_labels = get_files_and_labels()

# 2. Convert the integer labels to one-hot encoded vectors.
#    num_classes is pinned to the number of class names so the vector width
#    always matches the model's output layer instead of being inferred from
#    whichever labels happen to be present.
one_hot_encoded_labels = to_categorical(all_labels, num_classes=len(CLASS_LABELS))

# 3. Split the file paths and their labels into training and test sets.
#    Stratifying on the integer labels keeps the class ratio the same in both
#    parts (this requires at least two videos per class).
features_train, features_test, labels_train, labels_test = train_test_split(
    all_video_paths,
    one_hot_encoded_labels,
    test_size=0.1,  # Using 10% for testing
    shuffle=True,
    stratify=all_labels,
    random_state=42
)

print(f"Total videos for training: {len(features_train)}")
print(f"Total videos for testing: {len(features_test)}")

In [None]:
from keras.applications.mobilenet_v2 import MobileNetV2

# ImageNet-pretrained MobileNetV2 without its classification head.
mobilenet = MobileNetV2(weights="imagenet", include_top=False)

# Fine-tune only the last 100 layers: mark the whole network trainable, then
# freeze every layer that comes before that tail.
mobilenet.trainable = True
for layer in mobilenet.layers[:-100]:
    layer.trainable = False

In [None]:

def construct_model():
    """
    Assembles the video classifier and prints its summary.

    The network applies the shared `mobilenet` backbone to each frame,
    flattens the per-frame output, feeds the sequence through a bidirectional
    LSTM and classifies with a stack of dense layers ending in a softmax over
    CLASS_LABELS.

    Returns:
        The assembled (not yet compiled) Sequential model.
    """
    model = Sequential()

    # One sample is a sequence of FRAMES_IN_SEQUENCE three-channel frames.
    clip_shape = (FRAMES_IN_SEQUENCE, VIDEO_FRAME_HEIGHT, VIDEO_FRAME_WIDTH, 3)
    model.add(Input(shape=clip_shape))

    # Per-frame feature extraction.
    model.add(TimeDistributed(mobilenet))
    model.add(Dropout(0.25))
    model.add(TimeDistributed(Flatten()))

    # Temporal modelling in both directions with explicitly built LSTMs.
    forward_lstm = LSTM(units=32)
    backward_lstm = LSTM(units=32, go_backwards=True)
    model.add(Bidirectional(forward_lstm, backward_layer=backward_lstm))

    # Classification head: shrinking dense layers, each followed by dropout.
    for width in (256, 128, 64, 32):
        model.add(Dense(width, activation='relu'))
        model.add(Dropout(0.25))

    model.add(Dense(len(CLASS_LABELS), activation='softmax'))

    model.summary()
    return model

In [None]:
# Constructing the Model
# construct_model() also prints the layer summary as a side effect.

my_model = construct_model()

In [None]:
from tensorflow.keras.utils import Sequence
import math

class VideoDataGenerator(Sequence):
    """
    Keras data generator that decodes videos batch by batch.

    Only file paths and labels are kept in memory; frames are extracted with
    `extract_frames` each time a batch is requested.

    Args:
        video_paths: Sequence of video file paths.
        labels: Sequence of labels aligned with `video_paths`.
        batch_size: Number of videos per batch.
        sequence_length: Number of frames sampled from each video.
        image_dims: (height, width) every frame is resized to.
        **kwargs: Forwarded unchanged to the base class constructor.

    Raises:
        ValueError: If `video_paths` and `labels` differ in length.
    """
    def __init__(self, video_paths, labels, batch_size, sequence_length, image_dims, **kwargs):
        # The base class constructor has to run so Keras can set up its own
        # bookkeeping for the dataset.
        super().__init__(**kwargs)

        if len(video_paths) != len(labels):
            raise ValueError(
                f"video_paths and labels must have the same length, "
                f"got {len(video_paths)} and {len(labels)}."
            )

        self.video_paths = video_paths
        self.labels = labels
        self.batch_size = batch_size
        self.sequence_length = sequence_length
        self.image_height, self.image_width = image_dims

    def __len__(self):
        # Denotes the number of batches per epoch (the last one may be smaller).
        return math.ceil(len(self.video_paths) / self.batch_size)

    def __getitem__(self, index):
        """
        Builds batch number `index`.

        Returns:
            A tuple (batch_features, batch_labels). `batch_features` is a
            float32 array of shape
            (videos_in_batch, sequence_length, image_height, image_width, 3).
        """
        import warnings

        start_index = index * self.batch_size
        end_index = (index + 1) * self.batch_size
        batch_paths = self.video_paths[start_index:end_index]
        batch_labels = self.labels[start_index:end_index]

        # Pre-allocated so the batch shape stays fixed even if a video fails.
        batch_features = np.zeros((len(batch_paths), self.sequence_length, self.image_height, self.image_width, 3), dtype=np.float32)

        for i, path in enumerate(batch_paths):
            frames = extract_frames(path, self.sequence_length, self.image_height, self.image_width)

            if not frames:
                # Nothing could be decoded: the sample stays all-zero, but say
                # so instead of silently training on a blank clip.
                warnings.warn(f"No frames could be read from '{path}'; using an all-zero sample.")
                continue

            if len(frames) < self.sequence_length:
                # Short video: repeat its last frame so the real content is
                # kept rather than replaced by an all-zero sample.
                frames = frames + [frames[-1]] * (self.sequence_length - len(frames))

            batch_features[i] = frames

        return batch_features, np.array(batch_labels)

In [None]:
from keras.callbacks import EarlyStopping, ReduceLROnPlateau

# Number of videos per batch.
BATCH_SIZE = 8

# Settings shared by the training and the validation generator.
generator_settings = dict(
    batch_size=BATCH_SIZE,
    sequence_length=FRAMES_IN_SEQUENCE,
    image_dims=(VIDEO_FRAME_HEIGHT, VIDEO_FRAME_WIDTH),
)

train_generator = VideoDataGenerator(video_paths=features_train, labels=labels_train, **generator_settings)

# The held-out split is what the model is validated on during training.
val_generator = VideoDataGenerator(video_paths=features_test, labels=labels_test, **generator_settings)

# Stop when validation accuracy stalls for 10 epochs (restoring the best weights),
# and halve the learning rate when validation loss stalls for 3 epochs.
early_stopping_callback = EarlyStopping(monitor='val_accuracy', patience=10, restore_best_weights=True)
reduce_lr = ReduceLROnPlateau(monitor='val_loss', factor=0.5, patience=3, min_lr=0.000001, verbose=1)

optimizer = tf.keras.optimizers.Adam(learning_rate=1e-4)
my_model.compile(loss='categorical_crossentropy', optimizer=optimizer, metrics=['accuracy'])

# Train from the generators so videos are decoded one batch at a time.
print("Starting model training with the custom data generator...")
MobBiLSTM_model_history = my_model.fit(
    train_generator,
    validation_data=val_generator,
    epochs=50,
    callbacks=[early_stopping_callback, reduce_lr],
)
print("Model training finished.")

In [None]:
# BATCH_SIZE = 4

# train_dataset = tf.data.Dataset.from_tensor_slices((features_train, labels_train))
# train_dataset = train_dataset.shuffle(buffer_size=len(features_train)).batch(BATCH_SIZE).prefetch(tf.data.AUTOTUNE)

# val_dataset = tf.data.Dataset.from_tensor_slices((features_test, labels_test))
# val_dataset = val_dataset.batch(BATCH_SIZE).prefetch(tf.data.AUTOTUNE)

In [None]:
# from keras.callbacks import EarlyStopping, ReduceLROnPlateau

# early_stopping_callback = EarlyStopping(monitor='val_accuracy', patience=10, restore_best_weights=True)
# reduce_lr = ReduceLROnPlateau(monitor='val_loss', factor=0.5, patience=3, min_lr=0.000001, verbose=1)

# optimizer = tf.keras.optimizers.Adam(learning_rate=1e-4)

# my_model.compile(loss='categorical_crossentropy', optimizer=optimizer, metrics=['accuracy'])


# # --- THE CORRECTED MODEL.FIT() CALL ---
# print("Starting model training with tf.data.Dataset...")
# MobBiLSTM_model_history = my_model.fit(
#     train_dataset,
#     epochs=50,
#     validation_data=val_dataset,
#     callbacks=[early_stopping_callback, reduce_lr]
# )
# print("Model training finished.")

In [None]:
# features_test is a list of video file paths, not frame tensors, so the model
# cannot consume it directly; evaluate through the generator that decodes the
# same held-out videos (it already yields the matching labels).
model_evaluation_history = my_model.evaluate(val_generator)

In [None]:
# Write the trained model to 'Model.h5' in the working directory.
my_model.save('Model.h5')

In [None]:
# Rebind my_model to the copy stored in 'Model.h5' and print its layer summary.
my_model = load_model('Model.h5')
my_model.summary()

In [None]:
def predict_video_class(video_file_path, SEQUENCE_LENGTH=16):
    """
    Classifies a single video with the global `my_model`.

    Frames are sampled, resized to VIDEO_FRAME_HEIGHT x VIDEO_FRAME_WIDTH and
    divided by 255, the same steps `extract_frames` applies, so keep the two
    in sync.

    Args:
        video_file_path: Path of the video to classify.
        SEQUENCE_LENGTH: Number of frames sampled from the video.

    Returns:
        A tuple (predicted_class_name, prediction_confidence), or None when
        the video cannot be opened, a frame cannot be read, or any other
        error occurs (a message is printed in each of these cases).
    """
    # Bound before the try block so the cleanup in `finally` never hits an
    # undefined name if opening the capture itself raises.
    video_reader = None
    try:
        video_reader = cv2.VideoCapture(video_file_path)
        if not video_reader.isOpened():
            print("Error: Unable to open video file.")
            return None

        video_frames_count = int(video_reader.get(cv2.CAP_PROP_FRAME_COUNT))
        frames_list = []

        # Distance between two sampled frames; at least 1 for short videos.
        skip_frames_window = max(int(video_frames_count / SEQUENCE_LENGTH), 1)

        for frame_counter in range(SEQUENCE_LENGTH):
            video_reader.set(cv2.CAP_PROP_POS_FRAMES, frame_counter * skip_frames_window)
            frame_found, frame = video_reader.read()
            if not frame_found:
                print("Error: Unable to read frame from video.")
                return None

            # cv2.resize expects the target size as (width, height).
            resized_frame = cv2.resize(frame, (VIDEO_FRAME_WIDTH, VIDEO_FRAME_HEIGHT))
            normalized_frame = resized_frame / 255
            frames_list.append(normalized_frame)

        # Add the batch axis the model expects, then take the single result.
        predicted_labels_probabilities = my_model.predict(np.expand_dims(frames_list, axis=0))[0]
        predicted_label_index = np.argmax(predicted_labels_probabilities)
        predicted_class_name = CLASS_LABELS[predicted_label_index]
        prediction_confidence = predicted_labels_probabilities[predicted_label_index]
        return (predicted_class_name, prediction_confidence)
    except Exception as e:
        # Best-effort helper: report the problem and signal failure with None.
        print(f"An error occurred: {str(e)}")
        return None
    finally:
        if video_reader is not None:
            video_reader.release()


In [None]:
# Pick a random video from a random class folder. It is drawn from the whole
# dataset, so it may be a video the model was trained on.
random_class = random.choice(CLASS_LABELS)
path = os.path.join(DATASET_DIRECTORY, random_class)
random_video = random.choice(os.listdir(path))

# Specifying video to be predicted
input_video_file_path = os.path.join(path, random_video)

# Perform Single Prediction on the Test Video.
prediction = predict_video_class(input_video_file_path, FRAMES_IN_SEQUENCE)

# Output
# predict_video_class returns None when the video cannot be processed, so
# check before unpacking instead of failing with a TypeError.
if prediction is None:
    print(f"Prediction failed for '{input_video_file_path}'.")
else:
    predicted_class_name, prediction_confidence = prediction
    print(f'Predicted Class: {predicted_class_name}')
    print(f'Confidence: {prediction_confidence}')

    print("Prediction is",predicted_class_name == random_class)

print(f"\nFor Referene: Choosen Video = {random_video}\nPath: \'{input_video_file_path}\'")