# Action Detection using mobilenet

This project use mobilenet as the core model.

What you need to do to run this is to run the:
1. import cell
2. Video_Play function
3. Constant cell
4. save_model() and load_model()
5. load_model("model")
6. predict_frame()
7. predict_webcam()
8. Inference cell

In [None]:
# This is the entry point for the package. It's called by the installer and runs the packages
%%bash
pip install tensorflow
pip install keras
pip install opencv-python
pip install scikit-learn
pip install matplotlib
pip install seaborn
pip install colorama

In [4]:
import os
import shutil
import cv2
import math
import random
import numpy as np
import datetime as dt
import tensorflow
import keras
from collections import deque
import matplotlib.pyplot as plt
# plt.style.use("seaborn")

%matplotlib inline

from sklearn.model_selection import train_test_split

from keras.layers import *
from keras.models import Sequential
from tensorflow.keras.utils import to_categorical
from keras.callbacks import EarlyStopping
from tensorflow.keras.utils import plot_model

2024-02-23 15:55:50.331332: I external/local_tsl/tsl/cuda/cudart_stub.cc:31] Could not find cuda drivers on your machine, GPU will not be used.
2024-02-23 15:55:50.517750: E external/local_xla/xla/stream_executor/cuda/cuda_dnn.cc:9261] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
2024-02-23 15:55:50.517849: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:607] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
2024-02-23 15:55:50.547633: E external/local_xla/xla/stream_executor/cuda/cuda_blas.cc:1515] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
2024-02-23 15:55:50.617093: I external/local_tsl/tsl/cuda/cudart_stub.cc:31] Could not find cuda drivers on your machine, GPU will not be used.
2024-02-23 15:55:50.618513: I tensorflow/core/platform/cpu_feature_guard.cc:1

In [None]:
def Video_Play(filepath):
    """
     Plays a video file and waits for user input. This is a blocking function so it should be called from a thread
     
     Args:
     	 filepath: Path to the video
    """
    cap = cv2.VideoCapture(filepath)

    # Open video file if the video file is open
    if (cap.isOpened == False):
        print("Error opening video file")
    
    # Read the current cap from the cap stream and display it.
    while (cap.isOpened()):
        ret, frame = cap.read()
        # if ret is True cv2. imshow frame frame
        if ret == True:
            cv2.imshow('Frame', frame)

            # Wait for a key to be pressed.
            if cv2.waitKey(25) & 0xFF == ord('q'):
                break
        else:
            break
    cap.release()
    cv2.destroyAllWindows()

In [None]:
# Retrieve and list all the video files present in the Classes Directory. Randomly select a video file
# Classes Directories
NonViolnceVideos_Dir = "real-life-violence-situations-dataset/Real Life Violence Dataset/NonViolence/"
ViolnceVideos_Dir = "real-life-violence-situations-dataset/Real Life Violence Dataset/Violence/"

# Retrieve the list of all the video files present in the Class Directory.
NonViolence_files_names_list = os.listdir(NonViolnceVideos_Dir)
Violence_files_names_list = os.listdir(ViolnceVideos_Dir)

# Randomly select a video file from the Classes Directory.
Random_NonViolence_Video = random.choice(NonViolence_files_names_list)
Random_Violence_Video = random.choice(Violence_files_names_list)

In [None]:
Video_Play(f"{NonViolnceVideos_Dir}/{Random_NonViolence_Video}")

In [6]:
# Constant

IMAGE_HEIGHT, IMAGE_WIDTH = 64,64
SEQUENCE_LENGTH = 16

DATASET_DIR = "real-life-violence-situations-dataset/Real Life Violence Dataset/"
CLASSES_LIST = ["NonViolence", "Violence"]

In [None]:
def frames_extraction(video_path):
    """
    Extracts frames from Video File and returns them as a list. The list is sorted by frame number and the frames are normalized to the width and height
    
    Args:
        video_path: Path to the Video File
    
    Returns: 
        List of frames in the Video File ( Image format ) as a list of NumPy arrays ( grayscale
    """

    frames_list = []

    # Read the Video File
    video_reader = cv2.VideoCapture(video_path)

    # Get the total number of frames in the video.
    video_frames_count = int(video_reader.get(cv2.CAP_PROP_FRAME_COUNT))

    # Calculate the the interval after which frames will be added to the list.
    skip_frames_window = max(int(video_frames_count/SEQUENCE_LENGTH), 1)

    # Iterate through the Video Frames.
    # Reads the next frame from the video.
    for frame_counter in range(SEQUENCE_LENGTH):

        # Set the current frame position of the video.
        video_reader.set(cv2.CAP_PROP_POS_FRAMES, frame_counter * skip_frames_window)

        # Reading the frame from the video.
        success, frame = video_reader.read()

        # If success is true break the loop until the next call to this function is successful.
        if not success:
            break

        # Resize the Frame to fixed height and width.
        resized_frame = cv2.resize(frame, (IMAGE_HEIGHT, IMAGE_WIDTH))

        # Normalize the resized frame
        normalized_frame = resized_frame / 255

        # Append the normalized frame into the frames list
        frames_list.append(normalized_frame)


    video_reader.release()

    return frames_list

In [None]:
def create_dataset():
    """
    Creates and returns a dataset of repectivities. This is a function that takes as input a list of video files and extracts the frames of each video file.
    
    
    Returns: 
        A list of features and a list of labels for each video file that was extracted and the path to the video
    """

    features = []
    labels = []
    video_files_paths = []

    # Iterating through all the classes.
    # Extract the data of the data of a specific class.
    for class_index, class_name in enumerate(CLASSES_LIST):

        print(f'Extracting Data of Class: {class_name}')

        # Get the list of video files present in the specific class name directory.
        files_list = os.listdir(os.path.join(DATASET_DIR, class_name))

        # Iterate through all the files present in the files list.
        # Extract the frames of the video files from the files_list and add them to the features and labels.
        for file_name in files_list:

            # Get the complete video path.
            video_file_path = os.path.join(DATASET_DIR, class_name, file_name)

            # Extract the frames of the video file.
            frames = frames_extraction(video_file_path)

            # Check if the extracted frames are equal to the SEQUENCE_LENGTH specified.
            # So ignore the vides having frames less than the SEQUENCE_LENGTH.
            # Append the data to the features labels labels and video files paths.
            if len(frames) == SEQUENCE_LENGTH:

                # Append the data to their repective lists.
                features.append(frames)
                labels.append(class_index)
                video_files_paths.append(video_file_path)

    features = np.asarray(features)
    labels = np.array(labels)

    return features, labels, video_files_paths

In [None]:
# Creates and saves a dataset. This is a convenience function for use with scipy. ndimage. catalyst_util
features, labels, video_file_paths = create_dataset()

np.save("features.npy", features)
np.save("labels.npy", labels)
np.save("video_files_paths.npy", video_file_paths)

In [None]:
# Load data from disk and return features labels and video_files_paths. This is called by a function that is in charge of loading the data
features, labels, video_files_paths = np.load("features.npy") , np.load("labels.npy") , np.load("video_files_paths.npy")

In [None]:
# convert labels into one-hot-encoded vectors
one_hot_encoded_labels = to_categorical(labels)

# Split the Data into Train ( 90% ) and Test Set ( 10% ).
features_train, features_test, labels_train, labels_test = train_test_split(features, one_hot_encoded_labels, test_size = 0.1,
                                                                            shuffle = True, random_state = 42)

print(features_train.shape,labels_train.shape )
print(features_test.shape, labels_test.shape)

In [None]:
# This is a hack to make it easier to use Mobilenet's built - in version
from keras.applications.mobilenet_v2 import MobileNetV2

mobilenet = MobileNetV2( include_top=False , weights="imagenet")

#Fine-Tuning to make the last 40 layer trainable
mobilenet.trainable=True

for layer in mobilenet.layers[:-40]:
  layer.trainable=False

mobilenet.summary()

In [None]:
def create_model():
    """
    Creates and returns a Sequential model that can be used to train the model. This is the first step in the training process.
    
    
    Returns: 
        A : class : ` neural. Model ` instance that can be used to train the model. Note that the model is created in the following way
    """

    model = Sequential()

    ########################################################################################################################

    #Specifying Input to match features shape
    model.add(Input(shape = (SEQUENCE_LENGTH, IMAGE_HEIGHT, IMAGE_WIDTH, 3)))

    # Passing mobilenet in the TimeDistributed layer to handle the sequence
    model.add(TimeDistributed(mobilenet))

    model.add(Dropout(0.25))

    model.add(TimeDistributed(Flatten()))


    lstm_fw = LSTM(units=32)
    lstm_bw = LSTM(units=32, go_backwards = True)

    model.add(Bidirectional(lstm_fw, backward_layer = lstm_bw))

    model.add(Dropout(0.25))

    model.add(Dense(256,activation='relu'))
    model.add(Dropout(0.25))

    model.add(Dense(128,activation='relu'))
    model.add(Dropout(0.25))

    model.add(Dense(64,activation='relu'))
    model.add(Dropout(0.25))

    model.add(Dense(32,activation='relu'))
    model.add(Dropout(0.25))


    model.add(Dense(len(CLASSES_LIST), activation = 'softmax'))

    ########################################################################################################################
    early_stopping_callback = EarlyStopping(monitor= 'val_accuracy', patience = 10, restore_best_weights=True)
    reduce_lr = tensorflow.keras.callbacks.ReduceLROnPlateau(monitor='val_loss', factor=0.6, patience=5,
                                                             min_lr=0.00005, verbose=1)


    model.compile(loss = 'categorical_crossentropy', optimizer = 'sgd', metrics=["accuracy"])

    model.fit(x=features_train, y=labels_train, epochs = 50, batch_size=8, shuffle=True, validation_split=0.2, callbacks=[early_stopping_callback, reduce_lr])
    model.summary()

    return model

In [21]:
from tensorflow.keras.models import model_from_json

def save_model(model, model_name):
    """
     Save model to disk. This will save the model's JSON and HDF5 file as well as the weights.
     
     Args:
     	 model: The model to save. Must have a : py : class : ` ~gensim. models. BaseNeuralModel ` class.
     	 model_name: The name of the model to save
    """
    model_json = model.to_json()
    with open(model_name + ".json", "w") as json_file:
        json_file.write(model_json)
    # serialize weights to HDF5
    model.save_weights(model_name + ".h5")
    print("Saved model to disk")

def load_model(model_name):
    """
     Loads a Keras model from disk. This is a convenience function for loading a Keras model from disk.
     
     Args:
     	 model_name: The name of the model to load.
     
     Returns: 
     	 A Keras model that was loaded from disk or None if the model couldn't be loaded ( in which case an error is logged
    """
    with open(model_name + ".json", "r") as json_file:
        loaded_model_json = json_file.read()
    loaded_model = model_from_json(loaded_model_json)

    loaded_model.load_weights("model.h5")
    print("Loaded model from disk")

    return loaded_model

In [None]:
# Constructing the Model
MoBiLSTM_model = create_model()

# Plot the structure of the contructed LRCN model.
# plot_model(MoBiLSTM_model, to_file = 'MobBiLSTM_model_structure_plot.png', show_shapes = True, show_layer_names = True)

save_model(MoBiLSTM_model, "model")

In [22]:
MoBiLSTM_model = load_model("model")

Loaded model from disk


In [None]:
# This function computes the metagenomic consensus matrix for the predicting features. In this case we have a confusion matrix that is based on the accuracy_score function
labels_predict = MoBiLSTM_model.predict(features_test)

# Decoding the data to use in Metrics
labels_predict = np.argmax(labels_predict , axis=1)
labels_test_normal = np.argmax(labels_test , axis=1)

from sklearn.metrics import accuracy_score
AccScore = accuracy_score(labels_predict, labels_test_normal)
print('Accuracy Score is : ', AccScore)

import seaborn as sns
from sklearn.metrics import confusion_matrix

ax= plt.subplot()
cm=confusion_matrix(labels_test_normal, labels_predict)
sns.heatmap(cm, annot=True, fmt='g', ax=ax);

ax.set_xlabel('Predicted labels');ax.set_ylabel('True labels');
ax.set_title('Confusion Matrix');
ax.xaxis.set_ticklabels(['True', 'False']); ax.yaxis.set_ticklabels(['NonViolence', 'Violence']);

from sklearn.metrics import classification_report

ClassificationReport = classification_report(labels_test_normal,labels_predict)
print('Classification Report is : \n', ClassificationReport)

In [8]:
from colorama import Fore

def predict_frames(video_file_path, output_file_path, SEQUENCE_LENGTH):
    """
    Predict frames in a video and save them in a file. This function is called by the predict_frames function of the Cairo class.
    
    Args:
        video_file_path: Path to the video file to be predicted
        output_file_path: Path to the output video
        SEQUENCE_LENGTH: Length of the sequence that will be
    """

    # Read from the video file.
    video_reader = cv2.VideoCapture(video_file_path)

    # Get the width and height of the video.
    original_video_width = int(video_reader.get(cv2.CAP_PROP_FRAME_WIDTH))
    original_video_height = int(video_reader.get(cv2.CAP_PROP_FRAME_HEIGHT))

    # VideoWriter to store the output video in the disk.
    video_writer = cv2.VideoWriter(output_file_path, cv2.VideoWriter_fourcc('m', 'p', '4', 'v'),
                                    video_reader.get(cv2.CAP_PROP_FPS), (original_video_width, original_video_height))

    # Declare a queue to store video frames.
    frames_queue = deque(maxlen = SEQUENCE_LENGTH)

    # Store the predicted class in the video.
    predicted_class_name = ''

    # Initialize counters for violence and non-violence
    violence_count = 0
    non_violence_count = 0

    # Iterate until the video is accessed successfully.
    # This method reads the next video frame from the video_reader.
    while video_reader.isOpened():

        ok, frame = video_reader.read()

        # If the user is not ok break the loop.
        if not ok:
            break

        # Resize the Frame to fixed Dimensions.
        resized_frame = cv2.resize(frame, (IMAGE_HEIGHT, IMAGE_WIDTH))

        # Normalize the resized frame
        normalized_frame = resized_frame / 255

        # Appending the pre-processed frame into the frames list.
        frames_queue.append(normalized_frame)

        # We Need at Least number of SEQUENCE_LENGTH Frames to perform a prediction.
        # Check if the number of frames in the queue are equal to the fixed sequence length.
        # Predicts the class probabilities from the given frames queue.
        if len(frames_queue) == SEQUENCE_LENGTH:

            # Pass the normalized frames to the model and get the predicted probabilities.
            predicted_labels_probabilities = MoBiLSTM_model.predict(np.expand_dims(frames_queue, axis = 0))[0]

            # Get the index of class with highest probability.
            predicted_label = np.argmax(predicted_labels_probabilities)

            # Get the class name using the retrieved index.
            predicted_class_name = CLASSES_LIST[predicted_label]

        # Write predicted class name on top of the frame.
        # This function is called by the Fore.
        if predicted_class_name == "Violence":
            cv2.putText(frame, predicted_class_name, (5, 100), cv2.FONT_HERSHEY_SIMPLEX, 3, (0, 0, 255), 12)
            print(Fore.RED + predicted_class_name)
            violence_count += 1
        else:
            cv2.putText(frame, predicted_class_name, (5, 100), cv2.FONT_HERSHEY_SIMPLEX, 3, (0, 255, 0), 12)
            print(Fore.GREEN + predicted_class_name)
            non_violence_count += 1

        # Write The frame into the disk using the VideoWriter
        video_writer.write(frame)

    # Print out the violations and non violations.
    if violence_count >= non_violence_count:
        print(Fore.WHITE + "This action is " + Fore.RED + "[Violence]")
    else:
        print(Fore.WHITE + "This action is " + Fore.RED + "[Non Violence]")

    video_reader.release()
    video_writer.release()

In [25]:
def predict_webcam():
    """
     Predict webcam and put result in text window. Args : None Tuple ( violence_count non_violence_count )
    """
    # Inside here, put number 0 as to use laptop webcam and use 2 if using external webcam
    video_reader = cv2.VideoCapture(0)

    frames_queue = deque(maxlen = SEQUENCE_LENGTH)

    predicted_class_name = ''

    violence_count = 0
    non_violence_count = 0

    # This function reads the video and returns the frame.
    while True:
        ok, frame = video_reader.read()

        # If the user is not ok break the loop.
        if not ok:
            break

        resized_frame = cv2.resize(frame, (IMAGE_HEIGHT, IMAGE_WIDTH))

        normalized_frame = resized_frame / 255

        frames_queue.append(normalized_frame)

        # This function is used to calculate the number of frames in the queue.
        if len(frames_queue) == SEQUENCE_LENGTH:
            predicted_labels_probabilities = MoBiLSTM_model.predict(np.expand_dims(frames_queue, axis=0))[0]

            predicted_label = np.argmax(predicted_labels_probabilities)

            predicted_class_name = CLASSES_LIST[predicted_label]

            # This method is used to add violations and non violations to the plot.
            if predicted_class_name == "Violence":
                cv2.putText(frame, predicted_class_name, (5,100), cv2.FONT_HERSHEY_SIMPLEX, 3, (0, 0, 255), 12)
                print(Fore.RED + "Violence")
                violence_count +=1
            else:
                cv2.putText(frame, predicted_class_name, (5,100), cv2.FONT_HERSHEY_SIMPLEX, 3, (0, 0, 255), 12)
                print(Fore.GREEN + "Non Violence")
                non_violence_count +=1

            cv2.rectangle(frame, (0, 0), (200,200), (0,255,0), 2)

            frames_queue.clear()

        cv2.imshow('Action Detection', frame)

        # Wait for a key to be pressed.
        if cv2.waitKey(1) & 0xFF ==ord('q'):
            break

    # Print out the violations and non violations.
    if violence_count >= non_violence_count:
        print(Fore.WHITE + "This action is " + Fore.RED + " [Violence]")
    else:
        print(Fore.WHITE + "This action is " + Fore.RED + " [Non Violence]")

    video_reader.release()
    cv2.destroyAllWindows()

In [None]:
plt.style.use("default")

# To show Random Frames from the saved output predicted video (output predicted video doesn't show on the notebook but can be downloaded)
def show_pred_frames(pred_video_path):
    """
    Shows the frames that have been predicted by the predictor. This is useful for visualizing the prediction results in a way that can be visualized by clicking on the pred_video_path
    
    Args:
      pred_video_path: Path to the video
    """

    plt.figure(figsize=(20,15))

    video_reader = cv2.VideoCapture(pred_video_path)

    # Get the number of frames in the video.
    frames_count = int(video_reader.get(cv2.CAP_PROP_FRAME_COUNT))

    # Get Random Frames from the video then Sort it
    random_range = sorted(random.sample(range (SEQUENCE_LENGTH , frames_count ), 12))

    # Plot a random frame position of the video.
    for counter, random_index in enumerate(random_range, 1):

        plt.subplot(5, 4, counter)

        # Set the current frame position of the video.
        video_reader.set(cv2.CAP_PROP_POS_FRAMES, random_index)

        ok, frame = video_reader.read()

        # If the user is not ok break the loop.
        if not ok:
          break

        frame = cv2.cvtColor(frame , cv2.COLOR_BGR2RGB)

        plt.imshow(frame);ax.figure.set_size_inches(20,20);plt.tight_layout()

    video_reader.release()

In [None]:
# Construct the output video path. This is a helper function to test the generation of video files for use with

# Construct the output video path.
test_videos_directory = 'test_videos'
os.makedirs(test_videos_directory, exist_ok = True)

output_video_file_path = f'{test_videos_directory}/Output-Test-Video.mp4'

In [None]:
input_video_file_path = "real-life-violence-situations-dataset/Real Life Violence Dataset/Violence/V_41.mp4"

# Perform Prediction on the Test Video.
predict_frames(input_video_file_path, output_video_file_path, SEQUENCE_LENGTH)

# Show random frames from the output video
# show_pred_frames(output_video_file_path)

# Play the actual video
Video_Play(input_video_file_path)

In [28]:
predict_webcam()

[37mThis action is [31m [Violence]


[ WARN:0@370.048] global cap_v4l.cpp:997 open VIDEOIO(V4L2:/dev/video0): can't open camera by index
[ERROR:0@370.048] global obsensor_uvc_stream_channel.cpp:159 getStreamChannelGroup Camera index out of range
