In [None]:
!pip -q install opendatasets

In [None]:
!pip install pyspark



# **Importing the required libraries**

In [None]:
from pyspark.sql import SparkSession
import tensorflow as tf
from tensorflow.keras.applications import MobileNetV2
from tensorflow.keras.preprocessing import image
from tensorflow.keras.applications.mobilenet_v2 import preprocess_input
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense
import numpy as np
import os
import cv2
import pandas as pd
from tensorflow.keras.layers import Dropout
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, FloatType

# Mount Google Drive so the dataset folders and output paths under
# /content/drive/MyDrive used later in this notebook are reachable.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


# **Spark Session**

In [None]:
# Step 1: Setup Spark Session
spark = SparkSession.builder \
    .appName("VideoClassification") \
    .getOrCreate()

# Constants
IMG_SIZE = 75          # frames are resized to IMG_SIZE x IMG_SIZE before feature extraction
BATCH_SIZE = 20
EPOCHS = 10
MAX_SEQ_LENGTH = 16    # number of frames sampled from every video
# Length of one per-frame feature vector. MobileNetV2 with include_top=False
# and pooling='avg' emits 1280 values per frame (not 2048), which is also the
# last dimension of the feature tensors produced further down.
NUM_FEATURES = 1280

# Load and broadcast the model so every Spark task can reach the same
# MobileNetV2 feature extractor through bc_model.value.
model = MobileNetV2(include_top=False, weights='imagenet', input_shape=(IMG_SIZE, IMG_SIZE, 3), pooling='avg')
bc_model = spark.sparkContext.broadcast(model)


  self.pid = _posixsubprocess.fork_exec(


Downloading data from https://storage.googleapis.com/tensorflow/keras-applications/mobilenet_v2/mobilenet_v2_weights_tf_dim_ordering_tf_kernels_1.0_224_no_top.h5


# **Extracting Frames**

In [None]:
# Step 2: Load and preprocess video data
def preprocess_frames(video_path, frame_rate=1):
    """Sample a fixed number of frames from a video and scale them to [0, 1].

    ``MAX_SEQ_LENGTH`` frames are read at evenly spaced positions, resized to
    ``IMG_SIZE`` x ``IMG_SIZE`` and divided by 255. When fewer frames can be
    read (short or unreadable video), the remaining slots are filled with
    all-zero frames so the result always has ``MAX_SEQ_LENGTH`` entries.

    Args:
        video_path: Path of the video file to read.
        frame_rate: Currently unused; kept so existing callers keep working.

    Returns:
        A list of ``MAX_SEQ_LENGTH`` frames, each a nested Python list of
        shape (IMG_SIZE, IMG_SIZE, 3) holding float values.
    """
    frames = []
    video_reader = cv2.VideoCapture(video_path)
    try:
        video_frames_count = int(video_reader.get(cv2.CAP_PROP_FRAME_COUNT))
        # Distance between sampled frames; never below 1 so short videos
        # are still read frame by frame.
        skip_frames_window = max(video_frames_count // MAX_SEQ_LENGTH, 1)

        for frame_counter in range(MAX_SEQ_LENGTH):
            video_reader.set(cv2.CAP_PROP_POS_FRAMES, frame_counter * skip_frames_window)
            success, frame = video_reader.read()
            if not success:
                break
            resized_frame = cv2.resize(frame, (IMG_SIZE, IMG_SIZE))
            normalized_frame = resized_frame / 255.0
            frames.append(normalized_frame.astype(np.float32).tolist())
    finally:
        # Release the capture handle even if reading or resizing raised.
        video_reader.release()

    # Pad with black frames so every video yields the same sequence length.
    blank_frame = np.zeros((IMG_SIZE, IMG_SIZE, 3), dtype=np.float32)
    while len(frames) < MAX_SEQ_LENGTH:
        frames.append(blank_frame.tolist())

    return frames

# Function to extract features using the broadcasted MobileNetV2 model
def extract_features(frames):
    """Compute MobileNetV2 features for a batch of preprocessed frames.

    Args:
        frames: Sequence of frames; converted with ``np.array`` before
            inference.

    Returns:
        The output of ``predict`` on the broadcast model for that batch.
    """
    # Pull the network out of the Spark broadcast variable.
    feature_model = bc_model.value
    batch = np.array(frames)
    return feature_model.predict(batch)

# Function to save features to a Parquet file
def save_features_to_parquet(features, output_path):
    """Write a feature array to a Parquet file.

    Args:
        features: Array-like of feature rows; wrapped in a ``pandas.DataFrame``.
        output_path: Destination file path. Missing parent directories are
            created first.
    """
    df = pd.DataFrame(features)
    # A frame built from a plain array gets integer column labels, which some
    # Parquet writers reject; string labels are accepted everywhere and do not
    # change the stored values.
    df.columns = df.columns.astype(str)

    parent_dir = os.path.dirname(output_path)
    if parent_dir:
        os.makedirs(parent_dir, exist_ok=True)

    df.to_parquet(output_path, index=False)

# Processing function
def process_video(video_path, output_directory, class_label):
    """Turn one video into a Parquet file of per-frame features.

    Args:
        video_path: Path of the source video.
        output_directory: Folder that receives the Parquet file.
        class_label: Class id; embedded in the file name and returned.

    Returns:
        Tuple ``(output_path, class_label)``.
    """
    # Frames -> MobileNetV2 features.
    features = extract_features(preprocess_frames(video_path))

    # File name: <video name without extension>_class<label>.parquet
    video_stem, _extension = os.path.splitext(os.path.basename(video_path))
    output_path = os.path.join(output_directory, f"{video_stem}_class{class_label}.parquet")

    save_features_to_parquet(features, output_path)
    return (output_path, class_label)


In [None]:
# Directories (the position of each folder in this list is its class label)
video_directories = [
    "/content/drive/MyDrive/BigData/Class 0 - Safe",
    "/content/drive/MyDrive/BigData/Class 1 - Fantasy Violence",
    "/content/drive/MyDrive/BigData/Class 2 - Sex, Nudity"
]
output_directory = "/content/drive/MyDrive/BigData/Test"

# File extensions treated as videos (compared case-insensitively)
VIDEO_EXTENSIONS = ('.mp4', '.avi')

# Collect all video file paths with their class labels.
# os.listdir returns entries in arbitrary order, so the listing is sorted to
# keep the order of `results` (and therefore the seeded train/test split
# built from it) the same on every run.
video_paths = [
    (os.path.join(video_directory, video_file), class_label)
    for class_label, video_directory in enumerate(video_directories)
    for video_file in sorted(os.listdir(video_directory))
    if video_file.lower().endswith(VIDEO_EXTENSIONS)
]

# Make sure the destination folder exists before tasks start writing to it.
os.makedirs(output_directory, exist_ok=True)

# Create an RDD from the video paths
video_paths_rdd = spark.sparkContext.parallelize(video_paths)

# Process videos in parallel; each element is a (video_path, class_label) pair
results = video_paths_rdd.map(lambda video_path_label: process_video(video_path_label[0], output_directory, video_path_label[1])).collect()


print("Results: ", results)


Results:  [('/content/drive/MyDrive/BigData/Test/Copy of RandomCN 1.mp4part13_class0.parquet', 0), ('/content/drive/MyDrive/BigData/Test/Copy of RandomCN 1.mp4part3_class0.parquet', 0), ('/content/drive/MyDrive/BigData/Test/Copy of RandomCN 1.mp4part5_class0.parquet', 0), ('/content/drive/MyDrive/BigData/Test/Copy of RandomCN 1.mp4part17_class0.parquet', 0), ('/content/drive/MyDrive/BigData/Test/Copy of RandomCN 1.mp4part14_class0.parquet', 0), ('/content/drive/MyDrive/BigData/Test/Copy of RandomCN 1.mp4part4_class0.parquet', 0), ('/content/drive/MyDrive/BigData/Test/Copy of RandomCN 1.mp4part2_class0.parquet', 0), ('/content/drive/MyDrive/BigData/Test/Copy of RandomCN 1.mp4part1_class0.parquet', 0), ('/content/drive/MyDrive/BigData/Test/Copy of RandomCN 1.mp4part9_class0.parquet', 0), ('/content/drive/MyDrive/BigData/Test/Copy of RandomCN 1.mp4part0_class0.parquet', 0), ('/content/drive/MyDrive/BigData/Test/Copy of RandomCN 3.mp4part11_class0.parquet', 0), ('/content/drive/MyDrive/Big

# **Encoding and Splitting Training-Testing Sets**

In [None]:
from sklearn.model_selection import train_test_split

# Extract features and labels from the results
# (each result is an (output_path, class_label) pair)
features_list = []
labels_list = []

for features_path, label in results:
    features_list.append(pd.read_parquet(features_path).values)
    labels_list.append(label)

# Convert to numpy arrays
features = np.array(features_list)
labels = np.array(labels_list)

# Ensure features have the correct shape for LSTM input:
# (videos, frames per video, features per frame)
features = np.reshape(features, (features.shape[0], features.shape[1], -1))

# Split data into training and testing sets.
# stratify=labels keeps the class proportions the same in both splits, which
# matters with a 10% test split on a small three-class dataset.
# NOTE(review): the file names in `results` look like parts cut from the same
# source video; presumably parts of one recording can end up in both splits.
# Consider a group-aware split if that is the case.
X_train, X_test, y_train, y_test = train_test_split(
    features, labels, test_size=0.1, random_state=42, stratify=labels
)

# Save the split data
np.save('/content/drive/MyDrive/BigData/X_train.npy', X_train)
np.save('/content/drive/MyDrive/BigData/X_test.npy', X_test)
np.save('/content/drive/MyDrive/BigData/y_train.npy', y_train)
np.save('/content/drive/MyDrive/BigData/y_test.npy', y_test)

print("Data processing and splitting completed.")

Data processing and splitting completed.


In [None]:
# Report the feature and label shapes of each split, training first.
for split_features, split_labels in ((X_train, y_train), (X_test, y_test)):
    print(split_features.shape, split_labels.shape)

(113, 16, 1280) (113,)
(13, 16, 1280) (13,)


In [None]:
# Display the training feature tensor (NumPy abbreviates the repr of large arrays).
X_train

array([[[1.0606327e+00, 0.0000000e+00, 1.7031278e-01, ...,
         5.8634591e-01, 0.0000000e+00, 4.8865478e-02],
        [2.5613976e-01, 0.0000000e+00, 1.7089987e-01, ...,
         2.0910603e-01, 0.0000000e+00, 2.8398398e-01],
        [6.3855624e-01, 0.0000000e+00, 7.4252551e-03, ...,
         4.9017906e-02, 4.4912466e-01, 4.5948696e-01],
        ...,
        [1.4702004e+00, 6.2738791e-02, 1.5029595e+00, ...,
         0.0000000e+00, 1.7512474e+00, 4.6759129e-02],
        [1.2162323e+00, 6.5420115e-01, 1.7532545e+00, ...,
         0.0000000e+00, 7.5528002e-01, 0.0000000e+00],
        [4.8062414e-01, 7.9621263e-02, 1.6409785e+00, ...,
         0.0000000e+00, 1.8228166e-01, 0.0000000e+00]],

       [[0.0000000e+00, 0.0000000e+00, 0.0000000e+00, ...,
         0.0000000e+00, 0.0000000e+00, 0.0000000e+00],
        [0.0000000e+00, 7.0095319e-01, 0.0000000e+00, ...,
         0.0000000e+00, 1.1166382e-01, 6.5884691e-01],
        [0.0000000e+00, 7.0277110e-02, 8.8806796e-01, ...,
         1.869

In [None]:
# Display the training labels (one class id per video).
y_train

array([1, 1, 0, 1, 0, 1, 0, 0, 0, 1, 0, 1, 0, 1, 0, 2, 2, 0, 0, 0, 2, 2,
       0, 1, 0, 0, 1, 2, 0, 1, 1, 0, 0, 1, 0, 2, 0, 1, 1, 0, 1, 2, 0, 0,
       0, 0, 1, 0, 1, 0, 2, 0, 1, 2, 2, 0, 0, 1, 0, 0, 0, 1, 1, 0, 1, 2,
       2, 1, 1, 1, 0, 1, 1, 2, 1, 2, 0, 1, 1, 0, 1, 2, 1, 1, 0, 2, 1, 1,
       2, 0, 0, 2, 0, 1, 0, 0, 0, 2, 1, 1, 2, 1, 1, 1, 2, 0, 1, 1, 2, 0,
       1, 1, 2])

# **Building The Model**

In [None]:
# Define the MobileLSTM model
def create_mobilelstm_model(input_shape, num_classes=3, lstm_units=64, dropout_rate=0.5):
    """Build the stacked-LSTM classifier that consumes per-frame features.

    Args:
        input_shape: ``(timesteps, features_per_frame)`` of one input sequence.
        num_classes: Number of output classes (size of the softmax layer).
        lstm_units: Hidden units in each of the two LSTM layers.
        dropout_rate: Dropout applied after each LSTM layer.

    Returns:
        An uncompiled ``Sequential`` model.
    """
    model = Sequential()
    # First LSTM returns the full sequence so the second LSTM can consume it.
    model.add(LSTM(lstm_units, return_sequences=True, input_shape=input_shape))
    model.add(Dropout(dropout_rate))
    # Second LSTM collapses the sequence into a single vector.
    model.add(LSTM(lstm_units, return_sequences=False))
    model.add(Dropout(dropout_rate))
    model.add(Dense(num_classes, activation='softmax'))
    return model

# **Spark Pipeline**

In [None]:
from pyspark.sql.types import StructType, StructField, ArrayType, FloatType, IntegerType


def _build_sequence_schema(features_col, labels_col):
    """Schema with a nested float-array column and a float label column."""
    return StructType([
        StructField(features_col, ArrayType(ArrayType(FloatType())), True),
        StructField(labels_col, FloatType(), True)
    ])


# ---- Training split ----
# Labels are stored as floats to match the FloatType label column.
y_train = y_train.astype(np.float32)

# One (nested feature list, label) tuple per video
data_train = [(sequence.tolist(), float(label)) for sequence, label in zip(X_train, y_train)]

schema = _build_sequence_schema("features_train", "labels_train")
train_df = spark.createDataFrame(data_train, schema)

# ---- Test split ----
y_test = y_test.astype(np.float32)

data_test = [(sequence.tolist(), float(label)) for sequence, label in zip(X_test, y_test)]

schema = _build_sequence_schema("features_test", "labels_test")
test_df = spark.createDataFrame(data_test, schema)


In [None]:
# Preview both Spark DataFrames (show() prints only the first rows).
train_df.show()
test_df.show()

+--------------------+------------+
|      features_train|labels_train|
+--------------------+------------+
|[[1.0606327, 0.0,...|         1.0|
|[[0.0, 0.0, 0.0, ...|         1.0|
|[[0.0, 0.0, 1.295...|         0.0|
|[[0.0, 0.00719769...|         1.0|
|[[0.0, 0.9712226,...|         0.0|
|[[0.24123171, 0.1...|         1.0|
|[[0.0, 0.0, 0.104...|         0.0|
|[[0.0, 2.2483082,...|         0.0|
|[[0.0, 0.01189200...|         0.0|
|[[0.0, 0.0, 0.017...|         1.0|
|[[0.6432096, 0.09...|         0.0|
|[[0.0, 0.07047849...|         1.0|
|[[0.15117094, 0.0...|         0.0|
|[[0.0, 0.09764412...|         1.0|
|[[0.0, 0.0, 2.111...|         0.0|
|[[0.0, 0.0, 0.0, ...|         2.0|
|[[0.8973733, 0.0,...|         2.0|
|[[0.03134775, 1.1...|         0.0|
|[[0.0, 0.0, 0.359...|         0.0|
|[[0.0, 0.90706986...|         0.0|
+--------------------+------------+
only showing top 20 rows

+--------------------+-----------+
|       features_test|labels_test|
+--------------------+-----------+
|[[1.

In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql.functions import col, udf

# Pipeline stages must be Spark ML Estimators/Transformers; NumPy arrays and a
# Keras model are not, and Pipeline.fit raises TypeError for them. This cell
# therefore trains a Spark-native baseline: each video's frame features are
# mean-pooled into one vector and fed to a multinomial logistic regression.

# Average the per-frame feature rows of one video into a single dense vector.
mean_pool_frames = udf(
    lambda sequence: Vectors.dense(np.asarray(sequence, dtype=np.float64).mean(axis=0)),
    VectorUDT(),
)


def to_ml_frame(df, features_col, labels_col):
    """Return `df` reshaped to the `features` / `label` columns Spark ML expects."""
    return df.select(
        mean_pool_frames(col(features_col)).alias("features"),
        col(labels_col).cast("double").alias("label"),
    )


# train_df and test_df use different column names, so both are mapped onto
# the same schema before they go through the pipeline.
train_ml_df = to_ml_frame(train_df, "features_train", "labels_train")
test_ml_df = to_ml_frame(test_df, "features_test", "labels_test")

pipeline = Pipeline(stages=[
    LogisticRegression(featuresCol="features", labelCol="label", maxIter=50)
])

fitted_pipeline = pipeline.fit(train_ml_df)
prediction_train = fitted_pipeline.transform(train_ml_df)
prediction = fitted_pipeline.transform(test_ml_df)

In [None]:
# Create the model
# The LSTM input is (timesteps, features per frame), i.e. the two trailing
# axes of the training tensor.
timesteps, feature_dim = X_train.shape[1], X_train.shape[2]
input_shape = (timesteps, feature_dim)
model = create_mobilelstm_model(input_shape)

In [None]:
# Compile the model
# sparse_categorical_crossentropy takes class-index targets rather than
# one-hot vectors, which fits the 1-D label arrays used here.
model.compile(optimizer='adam', loss='sparse_categorical_crossentropy', metrics=['accuracy'])

# **Fitting the Model**

In [None]:
# Train the model
# The epoch count comes from the EPOCHS constant defined with the other
# settings; the returned History object is kept so the loss/accuracy curves
# can be inspected afterwards.
# NOTE(review): validation_data is the same split that the evaluation cell
# scores later, so the reported test accuracy is not from untouched data.
history = model.fit(X_train, y_train, epochs=EPOCHS, batch_size=32, validation_data=(X_test, y_test))

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


<keras.src.callbacks.History at 0x7b34fa22fc70>

In [None]:
# Evaluate the model
# evaluate() yields the loss followed by accuracy because the model was
# compiled with metrics=['accuracy'].
loss, accuracy = model.evaluate(X_test, y_test)
print(f"Test Accuracy: {accuracy * 100:.2f}%")

Test Accuracy: 92.31%


In [None]:
# Make predictions (optional)
predictions = model.predict(X_test)
# argmax over the class axis turns each softmax row into a class index
predicted_labels = np.argmax(predictions, axis=1)

# Save the model (optional)
# This file is reloaded with load_model() in the inference section below.
model.save('/content/drive/MyDrive/BigData/MobileLSTM_model.h5')



  saving_api.save_model(


In [None]:
from tensorflow.keras.models import load_model
from tensorflow.keras.preprocessing import image

# Load the pre-trained MobileLSTM model
MobileLSTM_model = load_model('/content/drive/MyDrive/BigData/MobileLSTM_model.h5')

# Define the image height and width based on your model's input requirements
# (75 x 75 matches the frame size used when the training features were extracted)
IMAGE_HEIGHT = 75
IMAGE_WIDTH = 75

# Define the classes list
# Order matters: the position of each name is the class index the model predicts,
# which follows the order of the class folders used to label the training videos.
CLASSES_LIST = ['Class 0 - Safe', 'Class 1 - Fantasy Violence', 'Class 2 - Sex, Nudity']

In [None]:
# Load the MobileNetV2 model for feature extraction, excluding the top layer
feature_extractor = MobileNetV2(weights='imagenet', include_top=False, pooling='avg', input_shape=(IMAGE_HEIGHT, IMAGE_WIDTH, 3))
def extract_features(frames):
    """Resize, scale and embed raw video frames with MobileNetV2.

    NOTE: this definition replaces the earlier ``extract_features`` (the one
    reading ``bc_model``) for the rest of the session. Unlike that one, it
    takes raw frames and does the resizing and 1/255 scaling itself.

    Args:
        frames: Non-empty sequence of image arrays as returned by
            ``cv2.VideoCapture.read``.

    Returns:
        The output of ``feature_extractor.predict`` for the batch.

    Raises:
        ValueError: If ``frames`` is empty.
    """
    if len(frames) == 0:
        raise ValueError("extract_features needs at least one frame")
    # Preprocess frames for MobileNetV2; cv2.resize takes dsize as (width, height)
    frames = np.array([cv2.resize(frame, (IMAGE_WIDTH, IMAGE_HEIGHT)) for frame in frames])
    frames = frames / 255.0  # Normalize the frames
    features = feature_extractor.predict(frames)
    return features



In [None]:
def predict_video(video_file_path, SEQUENCE_LENGTH):
    """Classify a single video and print the predicted class and confidence.

    Frames are sampled at evenly spaced positions, embedded with
    ``extract_features`` and passed to ``MobileLSTM_model``. Videos that yield
    fewer than ``SEQUENCE_LENGTH`` frames are padded with black frames, the
    same way short videos were padded when the training features were built.

    Args:
        video_file_path: Path of the video to classify.
        SEQUENCE_LENGTH: Number of frames fed to the sequence model.

    Returns:
        Tuple ``(predicted_class_name, confidence)``; the same information is
        also printed.

    Raises:
        ValueError: If no frame at all could be read from the video.
    """
    video_reader = cv2.VideoCapture(video_file_path)

    # Declare a list to store video frames we will extract.
    frames_list = []

    try:
        # Get the number of frames in the video.
        video_frames_count = int(video_reader.get(cv2.CAP_PROP_FRAME_COUNT))

        # Calculate the interval after which frames will be added to the list.
        skip_frames_window = max(video_frames_count // SEQUENCE_LENGTH, 1)

        # Iterating the number of times equal to the fixed length of sequence.
        for frame_counter in range(SEQUENCE_LENGTH):
            # Set the current frame position of the video.
            video_reader.set(cv2.CAP_PROP_POS_FRAMES, frame_counter * skip_frames_window)

            success, frame = video_reader.read()

            if not success:
                break

            # Append the frame to the frames list
            frames_list.append(frame)
    finally:
        # Free the capture handle as soon as reading is done, even on error.
        video_reader.release()

    if not frames_list:
        raise ValueError(f"No frames could be read from video: {video_file_path}")

    # Pad short videos with black frames so the sequence length matches what
    # the model was trained on.
    blank_frame = np.zeros((IMAGE_HEIGHT, IMAGE_WIDTH, 3), dtype=np.uint8)
    frames_list.extend([blank_frame] * (SEQUENCE_LENGTH - len(frames_list)))

    # Extract features from the frames using MobileNetV2
    frames_features = extract_features(frames_list)

    # Expand dimensions to match model input shape (adds the batch axis)
    frames_features = np.expand_dims(frames_features, axis=0)

    # Passing the pre-processed frames to the model and get the predicted probabilities.
    predicted_labels_probabilities = MobileLSTM_model.predict(frames_features)[0]

    # Get the index of class with highest probability.
    predicted_label = int(np.argmax(predicted_labels_probabilities))

    # Get the class name using the retrieved index.
    predicted_class_name = CLASSES_LIST[predicted_label]
    confidence = predicted_labels_probabilities[predicted_label]

    # Display the predicted class along with the prediction confidence.
    print(f'Predicted: {predicted_class_name}\nConfidence: {confidence}')

    return predicted_class_name, float(confidence)

# **Prediction For The Video**

In [None]:
# Example usage
# 16 matches the number of frames per video used when the training features were built
SEQUENCE_LENGTH = 16
# Specifying video to be predicted
input_video_file_path = "/content/drive/MyDrive/BigData/Class 0 - Safe/Video_31.mp4"

# Perform Single Prediction on the Test Video.
predict_video(input_video_file_path, SEQUENCE_LENGTH)

Predicted: Class 0 - Safe
Confidence: 0.9015423059463501
