In [None]:
 # Reference: Keras video classification example - https://keras.io/examples/vision/video_classification/

### Importing and preparing dataframe

In [None]:
!pip install -q git+https://github.com/tensorflow/docs

  Building wheel for tensorflow-docs (setup.py) ... [?25l[?25hdone


In [None]:
# Delete the `sample_data` directory from the working directory
# (presumably the sample datasets bundled with Colab - not used in this notebook).
!rm -rf sample_data

In [None]:
# Mount Google Drive at /content/gdrive; the dataset root (`datapath`) lives
# under this mount point. Only works inside Google Colab.
from google.colab import drive
drive.mount('/content/gdrive')

Mounted at /content/gdrive


In [None]:
# Root folder of the dataset. It must keep its trailing '/', because the cells
# below build paths by plain string concatenation. The notebook reads the
# subfolders `badtrain`, `goodtrain`, `badtest` and `goodtest` from it.
datapath = '/content/gdrive/MyDrive/data/'

In [None]:
from tensorflow_docs.vis import embed
from tensorflow import keras
from imutils import paths

import matplotlib.pyplot as plt
import tensorflow as tf
import pandas as pd
import numpy as np
import imageio
import cv2
import os

In [None]:
# Side length, in pixels, of the square input the feature extractor is built for.
IMG_SIZE = 224
# NOTE(review): not referenced by any cell shown here (fit() is called without
# a batch_size argument) - confirm whether this is still needed.
BATCH_SIZE = 64
# Number of training epochs passed to fit().
EPOCHS = 10

# Maximum number of frames kept per video; shorter videos are zero-padded and masked.
MAX_SEQ_LENGTH = 20
# Length of the feature vector stored for each frame.
NUM_FEATURES = 2048

In [None]:
# Build the training index: one row per regular file found in the `badtrain`
# and `goodtrain` folders under `datapath`, labelled after its folder.
from os import listdir
from os.path import isfile, join

bad_train_dir = f"{datapath}badtrain"
good_train_dir = f"{datapath}goodtrain"

badvideos = [name for name in listdir(bad_train_dir) if isfile(join(bad_train_dir, name))]
goodvideos = [name for name in listdir(good_train_dir) if isfile(join(good_train_dir, name))]
allvideos = badvideos + goodvideos

# Labels follow the same order as `allvideos`: every 'bad' file first, then every 'good' file.
videoclass = ['bad'] * len(badvideos) + ['good'] * len(goodvideos)

# Two columns: the file name and the class it belongs to.
train = pd.DataFrame({'video_name': allvideos})
train['class'] = videoclass

print(train)

             video_name class
0   20211024_221837.mp4   bad
1   20211024_221843.mp4   bad
2   20211024_221849.mp4   bad
3   20211024_221854.mp4   bad
4   20211024_221859.mp4   bad
..                  ...   ...
68        ry_good_4.MOV  good
69        ry_good_2.MOV  good
70        ry_good_5.MOV  good
71        ry_good_6.MOV  good
72       ry_good_10.MOV  good

[73 rows x 2 columns]


In [None]:
# Build the test index: one row per regular file found in the `badtest` and
# `goodtest` folders under `datapath`, labelled after its folder.
from os import listdir
from os.path import isfile, join

bad_test_dir = f"{datapath}badtest"
good_test_dir = f"{datapath}goodtest"

badtest = [name for name in listdir(bad_test_dir) if isfile(join(bad_test_dir, name))]
goodtest = [name for name in listdir(good_test_dir) if isfile(join(good_test_dir, name))]
alltest = badtest + goodtest

# Labels follow the same order as `alltest`: every 'bad' file first, then every 'good' file.
testclass = ['bad'] * len(badtest) + ['good'] * len(goodtest)

# Two columns: the file name and the class it belongs to.
test = pd.DataFrame({'video_name': alltest})
test['class'] = testclass

print(test)

     video_name class
0     Bad 2.mov   bad
1     Bad 4.mov   bad
2     Bad 3.mov   bad
3     Bad 6.mov   bad
4     Bad 5.mov   bad
5     Bad 7.mov   bad
6    Bad 10.mov   bad
7     Bad 9.mov   bad
8     Bad 8.mov   bad
9    Bad 12.mov   bad
10   Bad 11.mov   bad
11    Bad 1.mov   bad
12   Bad 13.mov   bad
13   Bad 14.mov   bad
14   Bad 15.mov   bad
15   Good 2.mov  good
16   Good 3.mp3  good
17   Good 6.mov  good
18   Good 7.mov  good
19   Good 1.mov  good
20   Good 8.mov  good
21   Good 9.mov  good
22   Good 4.mov  good
23  Good 10.mov  good
24  Good 12.mov  good
25  Good 11.mov  good
26  Good 13.mov  good


### Model Training

In [None]:
def load_video(path, i, max_frames=0):
    """Decode every second frame of one video from the train or test split.

    Args:
        path: Split name, either "train" or "test". It selects the
            module-level `train` / `test` dataframe and the matching
            `<class>train/` or `<class>test/` folder under `datapath`.
        i: Positional row index of the video inside that dataframe.
        max_frames: Stop once this many frames have been kept. The default
            of 0 means "no limit".

    Returns:
        Array of shape (num_frames, IMG_SIZE, IMG_SIZE, 3) with the channels
        in RGB order. When nothing can be decoded, a warning is emitted and an
        empty uint8 array of shape (0, IMG_SIZE, IMG_SIZE, 3) is returned.

    Raises:
        ValueError: If `path` is neither "train" nor "test".
    """
    import warnings

    if path == "train":
        split_df = train
    elif path == "test":
        split_df = test
    else:
        # Previously this fell through and crashed later on an unbound `video`.
        raise ValueError(f"path must be 'train' or 'test', got {path!r}")

    # Positional lookup, so the result does not depend on the dataframe's index labels.
    row = split_df.iloc[i]
    video = datapath + row["class"] + path + "/" + row["video_name"]

    cap = cv2.VideoCapture(video)
    frames = []
    try:
        while True:
            ret, frame = cap.read()
            if not ret:
                break
            # Index of the next frame to be decoded; keeping only the even
            # positions samples every second frame of the video.
            frame_id = cap.get(cv2.CAP_PROP_POS_FRAMES)
            if frame_id % 2 == 0:
                frame = cv2.resize(frame, (IMG_SIZE, IMG_SIZE))
                # OpenCV decodes to BGR, while the ImageNet-pretrained feature
                # extractor expects RGB, so reverse the channel axis.
                frames.append(frame[:, :, ::-1])
                if max_frames > 0 and len(frames) >= max_frames:
                    break
    finally:
        cap.release()

    if not frames:
        # Unreadable or non-video files would otherwise pass through silently.
        warnings.warn(f"No frames could be read from {video}")
        return np.zeros((0, IMG_SIZE, IMG_SIZE, 3), dtype="uint8")
    return np.array(frames)

In [None]:
def build_feature_extractor():
    """Wrap an ImageNet-pretrained InceptionV3 into a frame -> feature-vector model.

    The returned model takes raw (IMG_SIZE, IMG_SIZE, 3) frames, applies the
    InceptionV3 input preprocessing, and returns the average-pooled
    convolutional features (the classification head is not included).
    """
    backbone = keras.applications.InceptionV3(
        include_top=False,
        weights="imagenet",
        pooling="avg",
        input_shape=(IMG_SIZE, IMG_SIZE, 3),
    )

    frame_input = keras.Input((IMG_SIZE, IMG_SIZE, 3))
    scaled = keras.applications.inception_v3.preprocess_input(frame_input)
    features = backbone(scaled)

    return keras.Model(frame_input, features, name="feature_extractor")


# Built once at module level; the feature-extraction helpers reuse this instance.
feature_extractor = build_feature_extractor()


In [None]:
# Maps class-name strings to integer ids. The vocabulary is the sorted set of
# classes present in the training frame, with no out-of-vocabulary bucket.
class_names = np.unique(train['class'])
label_processor = keras.layers.StringLookup(vocabulary=class_names, num_oov_indices=0)
print(label_processor.get_vocabulary())

['bad', 'good']


In [None]:
def prepare_all_videos(df, root_dir):
    """Extract padded per-frame features, masks and labels for a whole split.

    Args:
        df: Dataframe with a "class" column; it supplies the labels and the
            number of samples. Frames themselves are fetched through
            `load_video(root_dir, idx)`, which reads the module-level frame of
            that split, so `df` must be the frame matching `root_dir`.
        root_dir: Split name forwarded to `load_video` ("train" or "test").

    Returns:
        `((frame_features, frame_masks), labels)` where
        `frame_features` is float32 of shape (len(df), MAX_SEQ_LENGTH, NUM_FEATURES),
        `frame_masks` is bool of shape (len(df), MAX_SEQ_LENGTH) with True on
        timesteps that hold a real frame, and `labels` is the output of
        `label_processor` for the "class" column, one row per video.
    """
    num_samples = len(df)
    labels = label_processor(df["class"].values[..., None]).numpy()

    # `frame_masks` and `frame_features` are what we will feed to our sequence model.
    # Timesteps beyond the end of a video stay zero / False (i.e. masked).
    frame_masks = np.zeros(shape=(num_samples, MAX_SEQ_LENGTH), dtype="bool")
    frame_features = np.zeros(
        shape=(num_samples, MAX_SEQ_LENGTH, NUM_FEATURES), dtype="float32"
    )

    for idx in range(num_samples):
        frames = load_video(root_dir, idx)
        length = min(MAX_SEQ_LENGTH, len(frames))
        if length == 0:
            # Nothing decoded for this video: leave the row fully masked
            # rather than calling predict() on an empty batch.
            continue

        # One batched forward pass per video instead of one call per frame;
        # verbose=0 keeps predict() from printing a progress bar per call.
        frame_features[idx, :length] = feature_extractor.predict(
            frames[:length], verbose=0
        )
        frame_masks[idx, :length] = True  # True = not masked, False = masked

    return (frame_features, frame_masks), labels


train_data, train_labels = prepare_all_videos(train, "train")
# The test split must be extracted here too: run_experiment() evaluates on
# `test_data` / `test_labels`, which are otherwise undefined on a fresh kernel.
test_data, test_labels = prepare_all_videos(test, "test")

print(f"Frame features in train set: {train_data[0].shape}")
print(f"Frame masks in train set: {train_data[1].shape}")

Frame features in train set: (73, 20, 2048)
Frame masks in train set: (73, 20)


In [None]:
# Utility for our sequence model.
def get_sequence_model():
    """Build and compile the GRU classifier that consumes per-frame features.

    The model takes a (MAX_SEQ_LENGTH, NUM_FEATURES) feature sequence plus a
    (MAX_SEQ_LENGTH,) boolean mask and outputs a softmax over the classes known
    to `label_processor`.
    """
    num_classes = len(label_processor.get_vocabulary())

    features_in = keras.Input((MAX_SEQ_LENGTH, NUM_FEATURES))
    mask_in = keras.Input((MAX_SEQ_LENGTH,), dtype="bool")

    # Refer to the following tutorial to understand the significance of using `mask`:
    # https://keras.io/api/layers/recurrent_layers/gru/
    first_gru = keras.layers.GRU(16, return_sequences=True)
    hidden = first_gru(features_in, mask=mask_in)
    hidden = keras.layers.GRU(8)(hidden)
    hidden = keras.layers.Dropout(0.4)(hidden)
    hidden = keras.layers.Dense(8, activation="relu")(hidden)
    class_probs = keras.layers.Dense(num_classes, activation="softmax")(hidden)

    model = keras.Model([features_in, mask_in], class_probs)
    model.compile(
        optimizer="adam",
        loss="sparse_categorical_crossentropy",
        metrics=["accuracy"],
    )
    return model


# Utility for running experiments.
def run_experiment(seed=42):
    """Train the sequence model on the training features and score it on the test set.

    Reads the module-level `train_data`, `train_labels`, `test_data` and
    `test_labels`. The weights with the best validation loss are checkpointed
    to /tmp/video_classifier and restored before the test evaluation.

    Args:
        seed: Seed for the permutation applied to the training samples before
            fitting, so the train/validation split is reproducible.

    Returns:
        `(history, seq_model)`: the object returned by `fit` and the trained model.
    """
    filepath = "/tmp/video_classifier"
    checkpoint = keras.callbacks.ModelCheckpoint(
        filepath, save_weights_only=True, save_best_only=True, verbose=1
    )

    # `validation_split` takes the LAST fraction of the samples, before any
    # shuffling. The training frame lists every 'bad' video before every
    # 'good' one, so without this permutation the validation set would hold a
    # single class and the training portion would be heavily skewed.
    order = np.random.default_rng(seed).permutation(len(train_labels))
    shuffled_features = train_data[0][order]
    shuffled_masks = train_data[1][order]
    shuffled_labels = train_labels[order]

    seq_model = get_sequence_model()
    history = seq_model.fit(
        [shuffled_features, shuffled_masks],
        shuffled_labels,
        validation_split=0.3,
        epochs=EPOCHS,
        callbacks=[checkpoint],
    )

    # Evaluate with the best checkpoint rather than the last epoch's weights.
    seq_model.load_weights(filepath)
    _, accuracy = seq_model.evaluate([test_data[0], test_data[1]], test_labels)
    print(f"Test accuracy: {round(accuracy * 100, 2)}%")

    return history, seq_model


# Keep the training history for later inspection instead of binding it to `_`,
# which in IPython is the automatic "last output" variable.
history, sequence_model = run_experiment()


Epoch 1/10
Epoch 00001: val_loss improved from inf to 1.24986, saving model to /tmp/video_classifier
Epoch 2/10
Epoch 00002: val_loss did not improve from 1.24986
Epoch 3/10
Epoch 00003: val_loss improved from 1.24986 to 1.22207, saving model to /tmp/video_classifier
Epoch 4/10
Epoch 00004: val_loss did not improve from 1.22207
Epoch 5/10
Epoch 00005: val_loss did not improve from 1.22207
Epoch 6/10
Epoch 00006: val_loss did not improve from 1.22207
Epoch 7/10
Epoch 00007: val_loss did not improve from 1.22207
Epoch 8/10
Epoch 00008: val_loss did not improve from 1.22207
Epoch 9/10
Epoch 00009: val_loss did not improve from 1.22207
Epoch 10/10
Epoch 00010: val_loss did not improve from 1.22207
Test accuracy: 55.56%


### Model Evaluation

In [None]:
def prepare_single_video(frames):
    """Turn the frames of one video into a padded feature sequence and its mask.

    Args:
        frames: Array of frames for a single video, with the frame index on
            the first axis. Only the first MAX_SEQ_LENGTH frames are used.

    Returns:
        `(frame_features, frame_mask)` where `frame_features` is float32 of
        shape (1, MAX_SEQ_LENGTH, NUM_FEATURES) and `frame_mask` is bool of
        shape (1, MAX_SEQ_LENGTH), True on timesteps that hold a real frame.
        A video with no frames yields all-zero features and an all-False mask.
    """
    frame_mask = np.zeros(shape=(1, MAX_SEQ_LENGTH), dtype="bool")
    frame_features = np.zeros(shape=(1, MAX_SEQ_LENGTH, NUM_FEATURES), dtype="float32")

    length = min(MAX_SEQ_LENGTH, len(frames))
    if length:
        # One batched forward pass instead of one predict() call per frame;
        # verbose=0 keeps predict() from printing a progress bar per call.
        frame_features[0, :length] = feature_extractor.predict(
            frames[:length], verbose=0
        )
        frame_mask[0, :length] = True  # True = not masked, False = masked

    return frame_features, frame_mask


def sequence_prediction(i):
    """Print the class probabilities for test video `i`, most likely class first.

    Loads the frames of row `i` of the test split, extracts their features,
    runs `sequence_model` on them and prints one line per class.

    Returns:
        The frames array that was loaded for the video.
    """
    video_frames = load_video("test", i)
    features, mask = prepare_single_video(video_frames)
    class_probs = sequence_model.predict([features, mask])[0]

    labels = label_processor.get_vocabulary()
    ranked = np.argsort(class_probs)[::-1]  # indices by descending probability
    for class_idx in ranked:
        print(f"  {labels[class_idx]}: {class_probs[class_idx] * 100:5.2f}%")

    return video_frames


# Walk the test split in row order and print the ranked class probabilities
# for each video; `test_frames` ends up holding the frames of the last one.
for idx, video in enumerate(test["video_name"].tolist()):
    print(f"Test video path: {video}")
    test_frames = sequence_prediction(idx)

Test video path: Bad 2.mov
  bad: 62.45%
  good: 37.55%
Test video path: Bad 4.mov
  bad: 68.52%
  good: 31.48%
Test video path: Bad 3.mov
  bad: 67.41%
  good: 32.59%
Test video path: Bad 6.mov
  bad: 67.43%
  good: 32.57%
Test video path: Bad 5.mov
  bad: 66.45%
  good: 33.55%
Test video path: Bad 7.mov
  bad: 70.39%
  good: 29.61%
Test video path: Bad 10.mov
  bad: 68.35%
  good: 31.65%
Test video path: Bad 9.mov
  bad: 67.48%
  good: 32.52%
Test video path: Bad 8.mov
  bad: 67.88%
  good: 32.12%
Test video path: Bad 12.mov
  bad: 67.91%
  good: 32.09%
Test video path: Bad 11.mov
  bad: 63.73%
  good: 36.27%
Test video path: Bad 1.mov
  bad: 65.55%
  good: 34.45%
Test video path: Bad 13.mov
  bad: 69.36%
  good: 30.64%
Test video path: Bad 14.mov
  bad: 65.76%
  good: 34.24%
Test video path: Bad 15.mov
  bad: 64.59%
  good: 35.41%
Test video path: Good 2.mov
  bad: 67.10%
  good: 32.90%
Test video path: Good 3.mp3
  bad: 67.80%
  good: 32.20%
Test video path: Good 6.mov
  bad: 64.13

In [None]:
# def sequence_prediction(path, i):
#     class_vocab = label_processor.get_vocabulary()

#     # frames = load_video(os.path.join("test", path), i)
#     test_data, test_labels = prepare_all_videos(test, "test")
#     probabilities = sequence_model.predict(test_data)[0]

#     for i in np.argsort(probabilities)[::-1]:
#         print(f"  {class_vocab[i]}: {probabilities[i] * 100:5.2f}%")
#     # return frames

# for idx, video in enumerate(test["video_name"].values.tolist()):
#     print(f"Test video path: {video}")
#     test_frames = sequence_prediction(video, idx)
