In [None]:
!pip install -q git+https://github.com/tensorflow/docs

  Building wheel for tensorflow-docs (setup.py) ... done


In [None]:
from tensorflow_docs.vis import embed
from tensorflow import keras
from imutils import paths

import matplotlib.pyplot as plt
import tensorflow as tf
import pandas as pd 
import numpy as np
import imageio
import cv2
import os

from google.colab import drive
# Mount Google Drive at /content/drive (Colab-only helper); the CSV manifests
# and video folders used by this notebook are read from paths under this mount.
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
# Load the train and test manifests from the mounted Drive folder.
# Later cells read the "video_name" and "tag" columns of these frames.
train_df, test_df = (
    pd.read_csv(manifest_path)
    for manifest_path in (
        "/content/drive/MyDrive/GMM_group/train.csv",
        "/content/drive/MyDrive/GMM_group/test.csv",
    )
)

In [None]:
def crop_center_square(frame):
    """Return the largest centered square region of ``frame``.

    Args:
        frame: Array whose first two axes are (height, width); any trailing
            axes (e.g. channels) are kept untouched.

    Returns:
        A slice of ``frame`` whose height and width both equal
        ``min(height, width)``, centered on the original image.
    """
    height, width = frame.shape[:2]
    side = min(height, width)
    top = height // 2 - side // 2
    left = width // 2 - side // 2
    return frame[top : top + side, left : left + side]

def load_video(path, max_frames=0):
    """Decode a video into an array of center-cropped 224x224 frames.

    Each decoded frame is center-cropped to a square, resized to 224x224 and
    has its first three channels reordered as [2, 1, 0] (OpenCV decodes to
    BGR by default, so this yields RGB).

    Args:
        path: Location of the video file, passed to ``cv2.VideoCapture``.
        max_frames: Stop once this many frames have been collected. With the
            default of 0 the limit never triggers, so the whole video is read.

    Returns:
        ``np.array`` built from the list of processed frames; an empty array
        when no frame could be read.
    """
    capture = cv2.VideoCapture(path)
    collected = []
    try:
        ok, raw_frame = capture.read()
        while ok:
            squared = crop_center_square(raw_frame)
            resized = cv2.resize(squared, (224, 224))
            collected.append(resized[:, :, [2, 1, 0]])

            # Checked right after appending, so a limit of 0 is never hit.
            if len(collected) == max_frames:
                break
            ok, raw_frame = capture.read()
    finally:
        # Always release the capture handle, even if decoding raised.
        capture.release()
    return np.array(collected)

In [None]:
def build_cnn_model():
    """Build the per-frame feature extractor.

    Wraps an ImageNet-weighted ResNet50 (no classification head, global
    average pooling) behind a 224x224x3 input that is first passed through
    ResNet50's own ``preprocess_input``.

    Returns:
        A ``keras.Model`` named "cnn_model" mapping an image batch to the
        pooled ResNet50 output.
    """
    backbone = keras.applications.ResNet50(
        weights="imagenet",
        include_top=False,
        pooling="avg",
        input_shape=(224, 224, 3),
    )
    image_input = keras.Input((224, 224, 3))
    preprocessed = keras.applications.resnet50.preprocess_input(image_input)
    pooled_features = backbone(preprocessed)
    return keras.Model(image_input, pooled_features, name="cnn_model")

# Shared feature extractor used by the video-preparation helpers.
cnn_model = build_cnn_model()

In [None]:
# Newer Keras releases expose StringLookup directly under `keras.layers` and
# may no longer ship the `experimental.preprocessing` alias; older releases
# only have the experimental path. Prefer the stable location and fall back
# to the legacy one so this cell works on both.
_StringLookup = getattr(keras.layers, "StringLookup", None)
if _StringLookup is None:
    _StringLookup = keras.layers.experimental.preprocessing.StringLookup

# Maps each distinct "tag" string of the training set to an integer index;
# num_oov_indices=0 reserves no slot for unseen labels.
label_processor = _StringLookup(
    num_oov_indices=0, vocabulary=np.unique(train_df["tag"])
)
print(label_processor.get_vocabulary())

In [None]:
def prepare_all_videos(df, root_dir):
    """Extract padded CNN features, masks and labels for every video in ``df``.

    Args:
        df: DataFrame with a "video_name" column (file names relative to
            ``root_dir``) and a "tag" column (class labels).
        root_dir: Directory that contains the video files.

    Returns:
        ``((frame_features, frame_masks), labels)`` where
        ``frame_features`` is float32 of shape (num_videos, 60, 2048),
        ``frame_masks`` is bool of shape (num_videos, 60) and is True for the
        positions that hold a real frame, and ``labels`` is the output of
        ``label_processor`` applied to the "tag" column with a trailing axis
        added.
    """
    max_seq_length = 60   # frames kept per video; shorter videos are zero-padded
    num_features = 2048   # width of the cnn_model output copied into each slot

    video_paths = df["video_name"].values.tolist()
    labels = df["tag"].values
    labels = label_processor(labels[..., None]).numpy()

    num_samples = len(df)
    frame_masks = np.zeros(shape=(num_samples, max_seq_length), dtype="bool")
    frame_features = np.zeros(
        shape=(num_samples, max_seq_length, num_features), dtype="float32"
    )

    for idx, path in enumerate(video_paths):
        # Only the first `max_seq_length` frames are ever used, so stop
        # decoding there instead of loading the whole video into memory.
        frames = load_video(os.path.join(root_dir, path), max_frames=max_seq_length)

        length = min(max_seq_length, len(frames))
        if length == 0:
            # Unreadable/empty video: leave its row zero-padded and fully masked.
            continue

        # One batched forward pass per video instead of one call per frame.
        frame_features[idx, :length] = cnn_model.predict(frames[:length], verbose=0)
        frame_masks[idx, :length] = True

    return (frame_features, frame_masks), labels

In [None]:
# Run the CNN feature extraction over every train and test video.
# Each *_data value is a (frame_features, frame_masks) tuple.
train_data, train_labels = prepare_all_videos(train_df, "/content/drive/MyDrive/GMM_group/train")
test_data, test_labels = prepare_all_videos(test_df, "/content/drive/MyDrive/GMM_group/test")

# Sanity-check the array shapes produced for the training set.
print(f"Frame features in train set: {train_data[0].shape}")
print(f"Frame masks in train set: {train_data[1].shape}")

Frame features in train set: (280, 60, 2048)
Frame masks in train set: (280, 60)


In [None]:
def get_rnn_model():
    """Build and compile the GRU sequence classifier.

    The model takes two inputs: frame features of shape (60, 2048) and a
    boolean mask of shape (60,), which is passed as ``mask`` to the first GRU.
    The output is a softmax with one unit per entry of the
    ``label_processor`` vocabulary.

    Returns:
        A compiled ``keras.Model`` (Adam optimizer, sparse categorical
        cross-entropy loss, accuracy metric).
    """
    num_classes = len(label_processor.get_vocabulary())
    layers = keras.layers

    features_in = keras.Input((60, 2048))
    mask_in = keras.Input((60,), dtype="bool")

    hidden = layers.GRU(16, return_sequences=True)(features_in, mask=mask_in)
    hidden = layers.GRU(8)(hidden)
    hidden = layers.Dropout(0.4)(hidden)
    hidden = layers.Dense(8, activation="relu")(hidden)
    class_probs = layers.Dense(num_classes, activation="softmax")(hidden)

    model = keras.Model([features_in, mask_in], class_probs)
    model.compile(
        optimizer="adam",
        loss="sparse_categorical_crossentropy",
        metrics=["accuracy"],
    )
    return model

def train_and_test_rnn_model():
    """Train a fresh RNN on the training features and score it on the test set.

    Reads the module-level ``train_data``/``train_labels`` for fitting and
    ``test_data``/``test_labels`` for evaluation.

    Returns:
        ``(history, rnn_model)``: the object returned by ``fit`` and the
        trained model.
    """
    rnn_model = get_rnn_model()
    history = rnn_model.fit(
        [train_data[0], train_data[1]],
        train_labels,
        validation_split=0,  # no validation subset is held out during fitting
        epochs=20,
    )

    # Evaluate on the held-out test split: the figure is reported as
    # "Test accuracy", so scoring the training data here would be misleading.
    _, accuracy = rnn_model.evaluate([test_data[0], test_data[1]], test_labels)
    print(f"Test accuracy: {round(accuracy * 100, 2)}%")

    return history, rnn_model

In [None]:
# Train the sequence model; the training history is discarded and the fitted
# model is kept in `rnn_model`, which `sequence_prediction` reads later.
_, rnn_model = train_and_test_rnn_model()

In [None]:
def prepare_single_video(frames):
    """Turn one video's frames into padded CNN features and a validity mask.

    Args:
        frames: Array of frames indexed along axis 0, as returned by
            ``load_video``. Only the first 60 frames are used.

    Returns:
        ``(frame_features, frame_mask)``: a float32 array of shape
        (1, 60, 2048) and a bool array of shape (1, 60). Positions past the
        end of the video stay zero in the features and False in the mask.
    """
    frame_mask = np.zeros(shape=(1, 60), dtype="bool")
    frame_features = np.zeros(shape=(1, 60, 2048), dtype="float32")

    # The number of frames is the length of axis 0. (Axis 1 is the frame
    # height, which would overrun videos shorter than 60 frames.)
    length = min(60, len(frames))
    if length > 0:
        # One batched forward pass instead of one call per frame.
        frame_features[0, :length] = cnn_model.predict(frames[:length], verbose=0)
        frame_mask[0, :length] = True

    return frame_features, frame_mask

def sequence_prediction(path):
    """Classify one video and print the class probabilities, highest first.

    Args:
        path: Video path, joined onto "test" with ``os.path.join``. An
            absolute ``path`` therefore replaces the "test" prefix entirely.

    Returns:
        The frames loaded from the video, so the caller can display them.
    """
    class_vocab = label_processor.get_vocabulary()

    video_path = os.path.join("test", path)
    frames = load_video(video_path)
    print(frames.shape)

    features, mask = prepare_single_video(frames)
    batch_probabilities = rnn_model.predict([features, mask])
    probabilities = batch_probabilities[0]  # single-video batch -> first row

    ranked_indices = np.argsort(probabilities)[::-1]
    for i in ranked_indices:
        print(f"  {class_vocab[i]}: {probabilities[i] * 100:5.2f}%")
    return frames

def to_gif(images):
    """Write ``images`` to "animation.gif" at 10 fps and return its embed.

    Args:
        images: Array of frames; cast to ``uint8`` before saving.

    Returns:
        Whatever ``embed.embed_file`` returns for the written GIF.
    """
    gif_path = "animation.gif"
    imageio.mimsave(gif_path, images.astype(np.uint8), fps=10)
    return embed.embed_file(gif_path)

In [None]:
# Pick one test video at random (no seed is set, so the choice differs per run).
test_video = np.random.choice(test_df["video_name"].values.tolist())
print(f"Test video path: {test_video}")
# An absolute path is passed, so the "test" prefix that sequence_prediction
# joins in front of it is dropped by os.path.join.
test_frames = sequence_prediction(f'/content/drive/MyDrive/GMM_group/test/{test_video}')
# Last expression of the cell: render the first 60 frames as an embedded GIF.
to_gif(test_frames[:60])