# RATING Project

Comic Mischief Detection Task

Files:

1. "train.csv" : 
-- Contains multiclass classification content annotations for each video scene used in the training set.
-- Annotations are at the scene level and do not correspond to a specific modality.
-- A ".csv" file containing the video URLs as well as the IDs of the scenes used in the training set.
-- Videos are available in the form of URLs, collected from the YouTube and IMDb websites.
-- Contains metadata about the videos.
-- Four content categories related to comic mischief are used (Sarcasm, Slapstick Humor, Gory Humor, Mature Humor).

2. "val.csv" : 
-- Contains multiclass classification content annotations for each video scene used in the validation set.
-- You can use this set for model hyperparameter tuning before using the test set.


3. "test.csv" : 
-- Contains multiclass classification content annotations for each video scene used in the test set.
-- You can use this set for evaluating your method.

In [22]:
from tensorflow_docs.vis import embed
from tensorflow.keras import layers
from tensorflow import keras
from sklearn.metrics import classification_report, f1_score, precision_score, recall_score,accuracy_score, confusion_matrix
from tensorflow.keras import backend as K
from tensorflow.keras.applications.vgg19 import VGG19
from tensorflow.keras.models import Model
from keras_preprocessing import image

import tensorflow as tf
import pandas as pd
import numpy as np
import glob
import cv2
import os

In [23]:
## Global variables

# Per-split data directories under the current working directory.
# NOTE(review): these three paths are not referenced by any other cell visible here.
train_dir = os.getcwd() + "/train_data/"
val_dir = os.getcwd() + "/val_data/"
test_dir = os.getcwd() + "/test_data/"

# Hyperparameters
MAX_SEQ_LENGTH = 128  # frames per video: padding target and positional-embedding table size
FRAME_GAP = 11  # NOTE(review): unused in the visible cells; load_video keeps every 5th frame instead
NUM_FEATURES = 1024  # per-frame feature vector length; also the transformer embedding width
IMG_SIZE = 224  # frames and spectrograms are resized to IMG_SIZE x IMG_SIZE

EPOCHS = 10  # NOTE(review): not used by the evaluation cells visible here
BATCH_SIZE = 64  # NOTE(review): not used by the evaluation cells visible here

## Load train, validation, and test sets

In [24]:
# Video inputs and label arrays for each split, loaded from pre-saved .npy files
# (presumably per-frame features extracted offline — TODO confirm against the extraction notebook).
train_image_data, train_labels = np.load("seq_length_128/extracted_data/train_data.npy"), np.load("seq_length_128/extracted_data/train_labels.npy")
val_image_data, val_labels = np.load("seq_length_128/extracted_data/val_data.npy"), np.load("seq_length_128/extracted_data/val_labels.npy")
test_image_data, test_labels = np.load("seq_length_128/extracted_data/test_data.npy"), np.load("seq_length_128/extracted_data/test_labels.npy")

# Paths of the spectrogram image files for each split.
# NOTE(review): glob.glob does not guarantee any ordering, yet the CNN evaluation
# pairs the loaded spectrograms with the *_labels arrays by position — confirm
# that the file order matches the label order before trusting the audio metrics.
train_spectrograms = glob.glob('audio_classification/extracted_train_spectrogram/*')
val_spectrograms = glob.glob('audio_classification/extracted_val_spectrogram/*')
test_spectrograms = glob.glob('audio_classification/extracted_test_spectrogram/*')

## Utility functions

In [25]:
# Utilities to open video files using CV2
def crop_center_square(frame):
    """Return the largest centered square region of ``frame``.

    Args:
        frame: Array whose first two axes are (rows, columns); any trailing
            axes (e.g. color channels) are kept as-is.

    Returns:
        A slice of ``frame`` whose first two axes both have length
        ``min(rows, columns)``, centered along the longer axis.
    """
    height, width = frame.shape[0:2]
    side = min(height, width)
    # Offsets are computed with integer division, so an odd surplus leaves
    # the extra row/column on the far side of the crop.
    top = (height // 2) - (side // 2)
    left = (width // 2) - (side // 2)
    return frame[top:top + side, left:left + side]

def load_video(path, max_frames=0, resize=(IMG_SIZE, IMG_SIZE)):
    """Read a video with OpenCV and return a subsampled stack of frames.

    Every 5th decoded frame (indices 0, 5, 10, ...) is center-cropped to a
    square, resized to ``resize`` and has its channel order reversed
    (OpenCV decodes BGR, so the result is RGB).

    Args:
        path: Path of the video file passed to ``cv2.VideoCapture``.
        max_frames: Stop once this many frames have been kept. With the
            default of 0 the limit is never reached and the whole video is read.
        resize: Target size handed to ``cv2.resize``.

    Returns:
        ``np.array`` built from the list of kept frames (empty when no frame
        could be read).
    """
    capture = cv2.VideoCapture(path)
    kept = []
    frame_index = 0
    try:
        while True:
            ok, raw = capture.read()
            if not ok:
                # End of stream, or the file could not be decoded.
                break

            if frame_index % 5 == 0:
                squared = crop_center_square(raw)
                resized = cv2.resize(squared, resize)
                # Reverse the channel axis: BGR -> RGB.
                kept.append(resized[:, :, [2, 1, 0]])

            frame_index += 1

            if len(kept) == max_frames:
                break
    finally:
        # Always free the capture handle, even if decoding raised.
        capture.release()
    return np.array(kept)

In [26]:
def recall_m(y_true, y_pred):
    """Recall computed over all elements of the given tensors.

    Both the element-wise product ``y_true * y_pred`` and ``y_true`` are
    clipped to [0, 1] and rounded before being summed, so an element counts
    as a true positive only when that product rounds to 1.

    Returns:
        true positives / (actual positives + epsilon); the epsilon avoids a
        division by zero when there are no positive labels.
    """
    hits = K.round(K.clip(y_true * y_pred, 0, 1))
    actual_positives = K.round(K.clip(y_true, 0, 1))
    return K.sum(hits) / (K.sum(actual_positives) + K.epsilon())

def precision_m(y_true, y_pred):
    """Precision computed over all elements of the given tensors.

    Both the element-wise product ``y_true * y_pred`` and ``y_pred`` are
    clipped to [0, 1] and rounded before being summed.

    Returns:
        true positives / (predicted positives + epsilon); the epsilon avoids
        a division by zero when nothing is predicted positive.
    """
    hits = K.round(K.clip(y_true * y_pred, 0, 1))
    flagged = K.round(K.clip(y_pred, 0, 1))
    return K.sum(hits) / (K.sum(flagged) + K.epsilon())

def f1_m(y_true, y_pred):
    """F1 score: harmonic mean of ``precision_m`` and ``recall_m``.

    An epsilon is added to the denominator so the result is 0 rather than
    NaN when both precision and recall are 0.
    """
    p = precision_m(y_true, y_pred)
    r = recall_m(y_true, y_pred)
    numerator = p * r
    denominator = p + r + K.epsilon()
    return 2 * (numerator / denominator)

## Evaluate Transformer-based model on test set

In [27]:
# Embedding Layer
class PositionalEmbedding(layers.Layer):
    """Adds a learned per-position embedding to a sequence of frame features.

    Args:
        sequence_length: Number of distinct positions in the embedding table
            (the ``input_dim`` of the inner ``Embedding`` layer). Inputs must
            not have more timesteps than this.
        output_dim: Width of each position embedding; it is added to the
            inputs, so it has to match their last dimension.
        **kwargs: Forwarded to ``layers.Layer`` (e.g. ``name``).
    """

    def __init__(self, sequence_length, output_dim, **kwargs):
        super().__init__(**kwargs)
        self.position_embeddings = layers.Embedding(
            input_dim=sequence_length, output_dim=output_dim
        )
        self.sequence_length = sequence_length
        self.output_dim = output_dim

    def call(self, inputs):
        """Return ``inputs`` plus the embedding of each timestep index."""
        # The inputs are of shape: `(batch_size, frames, num_features)`
        length = tf.shape(inputs)[1]
        positions = tf.range(start=0, limit=length, delta=1)
        embedded_positions = self.position_embeddings(positions)
        # The (frames, output_dim) embeddings broadcast over the batch axis.
        return inputs + embedded_positions

    def compute_mask(self, inputs, mask=None):
        """Mark a timestep as valid when any of its feature values is nonzero.

        Any incoming ``mask`` is ignored; the mask is derived from ``inputs``
        alone, so all-zero (padded) frames are masked out downstream.
        """
        mask = tf.reduce_any(tf.cast(inputs, "bool"), axis=-1)
        return mask

    def get_config(self):
        """Return the constructor arguments so the layer can be serialized.

        Without this, saving or cloning a model that contains the layer
        fails because Keras cannot recreate it from its config.
        """
        config = super().get_config()
        config.update(
            {
                "sequence_length": self.sequence_length,
                "output_dim": self.output_dim,
            }
        )
        return config

In [28]:
# Subclassed layer
class TransformerEncoder(layers.Layer):
    """Single transformer encoder block: self-attention plus feed-forward.

    Each sub-block is wrapped in a residual connection followed by layer
    normalization.

    Args:
        embed_dim: Width of the input/output features; also used as the
            attention ``key_dim`` and the output size of the projection.
        dense_dim: Hidden width of the feed-forward projection.
        num_heads: Number of attention heads.
        **kwargs: Forwarded to ``layers.Layer`` (e.g. ``name``).
    """

    def __init__(self, embed_dim, dense_dim, num_heads, **kwargs):
        super().__init__(**kwargs)
        self.embed_dim = embed_dim
        self.dense_dim = dense_dim
        self.num_heads = num_heads
        self.attention = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim, dropout=0.3
        )
        self.dense_proj = keras.Sequential(
            [layers.Dense(dense_dim, activation=tf.nn.gelu), layers.Dense(embed_dim),]
        )
        self.layernorm_1 = layers.LayerNormalization()
        self.layernorm_2 = layers.LayerNormalization()

    def call(self, inputs, mask=None):
        """Apply self-attention and the feed-forward projection to ``inputs``.

        Args:
            inputs: Sequence tensor that attends to itself.
            mask: Optional per-timestep mask; when given it is expanded with
                a middle axis before being used as the attention mask.
        """
        if mask is not None:
            # Insert an axis so the per-key mask broadcasts over all queries.
            mask = mask[:, tf.newaxis, :]

        attention_output = self.attention(inputs, inputs, attention_mask=mask)
        proj_input = self.layernorm_1(inputs + attention_output)
        proj_output = self.dense_proj(proj_input)
        return self.layernorm_2(proj_input + proj_output)

    def get_config(self):
        """Return the constructor arguments so the layer can be serialized.

        Without this, saving or cloning a model that contains the layer
        fails because Keras cannot recreate it from its config.
        """
        config = super().get_config()
        config.update(
            {
                "embed_dim": self.embed_dim,
                "dense_dim": self.dense_dim,
                "num_heads": self.num_heads,
            }
        )
        return config

In [29]:
def get_transformer_model():
    """Build, compile and summarize the video transformer classifier.

    Architecture: positional embedding -> one transformer encoder block ->
    global max pooling over time -> dropout -> 4 sigmoid outputs.

    Returns:
        The compiled ``keras.Model``. It is compiled with Adam (lr 1e-4),
        binary cross-entropy, and the metrics accuracy, f1_m, precision_m and
        recall_m in that order, which is the order ``evaluate`` reports them.
    """
    feature_dim = NUM_FEATURES
    feedforward_units = 4
    attention_heads = 1
    # One sigmoid unit per label (presumably the four content categories).
    num_labels = 4

    frame_inputs = keras.Input(shape=(None, None))
    embedded = PositionalEmbedding(
        MAX_SEQ_LENGTH, feature_dim, name="frame_position_embedding"
    )(frame_inputs)
    encoded = TransformerEncoder(
        feature_dim, feedforward_units, attention_heads, name="transformer_layer"
    )(embedded)
    pooled = layers.GlobalMaxPooling1D()(encoded)
    regularized = layers.Dropout(0.5)(pooled)
    outputs = layers.Dense(num_labels, activation="sigmoid")(regularized)
    model = keras.Model(frame_inputs, outputs)

    model.compile(
        optimizer=keras.optimizers.Adam(learning_rate=1e-4),
        loss='binary_crossentropy',
        metrics=['accuracy', f1_m, precision_m, recall_m],
    )
    model.summary()
    return model

In [30]:
# Rebuild the transformer classifier and restore its trained weights.
filepath = os.getcwd() + "/seq_length_128/video_chkpt_128/video_classifier"
transformer = get_transformer_model()
transformer.load_weights(filepath)

# evaluate the transformer model
# evaluate() returns the loss followed by the compiled metrics in order.
# The F1 value is bound to `test_f1` instead of `f1_score` so that the sklearn
# `f1_score` function imported at the top of the notebook is not overwritten
# by a float (which would make later `f1_score(...)` calls fail).
loss, accuracy, test_f1, precision, recall = transformer.evaluate(test_image_data, test_labels, verbose=0)
print(f"Test accuracy: {round(accuracy * 100, 2)}%")
print(f"F1 score: {round(test_f1, 2)}")
print(f"Precision: {round(precision, 2)}")
print(f"Recall: {round(recall, 2)}")

Model: "model_8"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
input_9 (InputLayer)         [(None, None, None)]      0         
_________________________________________________________________
frame_position_embedding (Po (None, None, 1024)        131072    
_________________________________________________________________
transformer_layer (Transform (None, None, 1024)        4211716   
_________________________________________________________________
global_max_pooling1d_8 (Glob (None, 1024)              0         
_________________________________________________________________
dropout_8 (Dropout)          (None, 1024)              0         
_________________________________________________________________
dense_26 (Dense)             (None, 4)                 4100      
Total params: 4,346,888
Trainable params: 4,346,888
Non-trainable params: 0
_________________________________________________

In [28]:
def prepare_single_video(frames):
    """Turn the frames of one video into a padded per-frame feature tensor.

    Args:
        frames: Array of frames for a single video, indexed by frame on the
            first axis (presumably IMG_SIZE x IMG_SIZE x 3 each, as the
            zero padding uses that shape).

    Returns:
        float32 array of shape ``(1, MAX_SEQ_LENGTH, NUM_FEATURES)``. Rows
        for padded or all-dark frames stay zero; videos longer than
        ``MAX_SEQ_LENGTH`` frames are truncated.
    """
    features = np.zeros(shape=(1, MAX_SEQ_LENGTH, NUM_FEATURES), dtype="float32")

    # Pad shorter videos with black frames up to MAX_SEQ_LENGTH.
    missing = MAX_SEQ_LENGTH - len(frames)
    if missing > 0:
        blank = np.zeros((missing, IMG_SIZE, IMG_SIZE, 3))
        frames = np.concatenate((frames, blank))

    # Add a leading batch axis so the loop below sees a single "video".
    batched = frames[None, ...]

    for video_idx, clip in enumerate(batched):
        usable = min(MAX_SEQ_LENGTH, clip.shape[0])
        for frame_idx in range(usable):
            if not np.mean(clip[frame_idx, :]) > 0.0:
                # Padding / all-zero frame: its feature row is already zero.
                continue
            features[video_idx, frame_idx, :] = feature_extractor.predict(
                clip[None, frame_idx, :]
            )

    return features


In [29]:
# Per-video rounded predictions, in the order of test_df['path'].
predict = []
actual = []

# NOTE(review): `test_df` is not defined in the cells visible here; it must be
# a DataFrame with a 'path' column whose row order matches `test_labels` —
# confirm where it is loaded.
for path in test_df['path']:
    frames = load_video(os.path.join("test", path))
    frame_features = prepare_single_video(frames)
    # Use `transformer`, the model built and restored from the checkpoint
    # above; the previous name `transformer_model` is never defined in this
    # notebook and raised a NameError.
    y_pred = transformer.predict(frame_features)[0]

    # round probabilities to class labels
    y_pred = y_pred.round()
    predict.append(y_pred)

In [30]:
# Call the metrics through the module alias: the bare name `f1_score` may have
# been rebound to a float by an evaluation cell, which would make a direct
# `f1_score(...)` call fail with "'float' object is not callable".
from sklearn import metrics as sk_metrics

# Weighted averages: per-label scores weighted by each label's support.
recall = sk_metrics.recall_score(y_true=test_labels, y_pred=predict, average='weighted')
print("Recall", recall)
precision = sk_metrics.precision_score(y_true=test_labels, y_pred=predict, average='weighted')
print("Precision", precision)
f1 = sk_metrics.f1_score(y_true=test_labels, y_pred=predict, average='weighted')
print("F1 score", f1)

Recall 0.3516483516483517
Precision 0.5134746206174778
F1 score 0.4059426489003954


  _warn_prf(average, modifier, msg_start, len(result))


In [31]:
# Per-label precision / recall / F1 for the rounded video predictions in `predict`.
print(classification_report(test_labels, predict))

              precision    recall  f1-score   support

           0       0.22      0.08      0.12        24
           1       0.00      0.00      0.00        18
           2       0.50      0.83      0.62         6
           3       0.89      0.58      0.70        43

   micro avg       0.68      0.35      0.46        91
   macro avg       0.40      0.37      0.36        91
weighted avg       0.51      0.35      0.41        91
 samples avg       0.27      0.27      0.27        91



  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))


<a style='text-decoration:none;line-height:16px;display:flex;color:#5B5B62;padding:10px;justify-content:end;' href='https://deepnote.com?utm_source=created-in-deepnote-cell&projectId=7327af42-8a03-4c46-b38e-e6931aa020f3' target="_blank">
 </img>
Created in <span style='font-weight:600;margin-left:4px;'>Deepnote</span></a>

## Evaluate CNN model on test set

In [31]:
def get_cnn_model():
    """Build, compile and summarize the VGG19-based spectrogram classifier.

    The output of VGG19's second-to-last layer feeds a new 4096-unit ReLU
    layer followed by a 4-unit sigmoid layer. ``weights`` is not passed to
    ``VGG19``, so the library default applies.

    Returns:
        The compiled ``Model``. It is compiled with Nesterov SGD, binary
        cross-entropy, and the metrics accuracy, f1_m, precision_m and
        recall_m in that order, which is the order ``evaluate`` reports them.
    """
    backbone = VGG19(include_top=True, input_shape=(IMG_SIZE, IMG_SIZE, 3))
    # Branch off just before VGG19's own final layer.
    penultimate = backbone.layers[-2].output
    hidden = layers.Dense(
        4096, activation='relu', name='predictions1', dtype='float32'
    )(penultimate)
    scores = layers.Dense(
        4, activation='sigmoid', name='predictions', dtype='float32'
    )(hidden)
    model = Model(backbone.input, scores)

    model.compile(
        optimizer=keras.optimizers.SGD(
            learning_rate=0.0000001, decay=1e-6, momentum=0.9, nesterov=True
        ),
        loss='binary_crossentropy',
        metrics=['accuracy', f1_m, precision_m, recall_m],
    )

    model.summary()
    return model

In [32]:
def _load_spectrogram_images(paths):
    """Load every image in ``paths`` at IMG_SIZE x IMG_SIZE into one array.

    Args:
        paths: Iterable of image file paths.

    Returns:
        ``np.array`` stacking the per-image arrays in the order of ``paths``
        (an empty array when ``paths`` is empty).
    """
    return np.array(
        [
            image.img_to_array(image.load_img(p, target_size=(IMG_SIZE, IMG_SIZE)))
            for p in paths
        ]
    )


# One image array per split, in the same order as the corresponding path list.
train_audio_data = _load_spectrogram_images(train_spectrograms)
val_audio_data = _load_spectrogram_images(val_spectrograms)
test_audio_data = _load_spectrogram_images(test_spectrograms)

In [34]:
# Rebuild the CNN classifier and restore its trained weights.
filepath = os.getcwd() + "/audio_classification/audio_chkpt_sgd/audio_classifier"
# filepath = os.getcwd() + "/audio_temp/audio_classifier"
cnn_model = get_cnn_model()
cnn_model.load_weights(filepath)

# evaluate the cnn model
# evaluate() returns the loss followed by the compiled metrics in order.
# The F1 value is bound to `test_f1` instead of `f1_score` so that the sklearn
# `f1_score` function imported at the top of the notebook is not overwritten
# by a float (which would make later `f1_score(...)` calls fail).
loss, accuracy, test_f1, precision, recall = cnn_model.evaluate(test_audio_data, test_labels, verbose=0)
print(f"Test accuracy: {round(accuracy * 100, 2)}%")
print(f"F1 score: {round(test_f1, 2)}")
print(f"Precision: {round(precision, 2)}")
print(f"Recall: {round(recall, 2)}")

Model: "model_10"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
input_11 (InputLayer)        [(None, 224, 224, 3)]     0         
_________________________________________________________________
block1_conv1 (Conv2D)        (None, 224, 224, 64)      1792      
_________________________________________________________________
block1_conv2 (Conv2D)        (None, 224, 224, 64)      36928     
_________________________________________________________________
block1_pool (MaxPooling2D)   (None, 112, 112, 64)      0         
_________________________________________________________________
block2_conv1 (Conv2D)        (None, 112, 112, 128)     73856     
_________________________________________________________________
block2_conv2 (Conv2D)        (None, 112, 112, 128)     147584    
_________________________________________________________________
block2_pool (MaxPooling2D)   (None, 56, 56, 128)       0  

In [None]:
from sklearn.metrics import accuracy_score
from sklearn.metrics import classification_report

# Threshold the CNN's sigmoid outputs at 0.5 to obtain hard 0/1 labels.
y_pred = (cnn_model.predict(test_audio_data) > 0.5).astype(int)

# Accuracy over the thresholded predictions, then the per-label report.
print(accuracy_score(test_labels, y_pred))
print(classification_report(test_labels, y_pred))

## Visualization

In [None]:
# def predict_action(path):
#     class_vocab = label_processor.get_vocabulary()

#     frames = load_video(os.path.join("test", path))
#     frame_features = prepare_single_video(frames)
#     probabilities = trained_model.predict(frame_features)[0]

#     for i in np.argsort(probabilities)[::-1]:
#         print(f"  {class_vocab[i]}: {probabilities[i] * 100:5.2f}%")

#     # pred = np.argsort(probabilities)[::-1]
#     # print(pred)
#     return frames


# # This utility is for visualization.
# # Referenced from:
# # https://www.tensorflow.org/hub/tutorials/action_recognition_with_tf_hub
# def to_gif(images):
#     converted_images = images.astype(np.uint8)
#     imageio.mimsave("animation.gif", converted_images, fps=10)
#     return embed.embed_file("animation.gif")