Setup

In [32]:
from tensorflow_docs.vis import embed

from tensorflow import keras
from keras import layers

import matplotlib.pyplot as plt
import tensorflow as tf
import pandas as pd
import numpy as np
import imageio
import cv2
import os

Data preparation

In [33]:
# INDEX 5
# NOTE(review): "INDEX" presumably refers to the training-run/experiment number
# whose weights are loaded below - confirm against the training notebook.
# Sequence length and per-frame feature size used for the Transformer model.
MAX_SEQ_LENGTH_TRANSFORM = 40
NUM_FEATURES_TRANSFORM = 1024

# INDEX 10 (GRU) and 11 (LSTM)
# Sequence length and per-frame feature size used for the GRU and LSTM models.
MAX_SEQ_LENGTH_RNN = 40
NUM_FEATURES_RNN = 2048

# Side length (pixels) of the square frames given to the CNN feature extractors.
IMG_SIZE = 128

# Number of units in the final softmax layer of every classifier.
CLASSES = 4

CNN (DenseNet121 or InceptionV3)

In [34]:
# Shared Keras layer that center-crops to IMG_SIZE x IMG_SIZE; built once and reused.
center_crop_layer = layers.CenterCrop(IMG_SIZE, IMG_SIZE)


def crop_center(frame):
    """Center-crop a single frame and return it as a NumPy array.

    The frame gets a leading batch axis before going through
    ``center_crop_layer``; the result is converted to NumPy and every
    length-1 axis (including that batch axis) is squeezed away.
    """
    batched = frame[None, ...]
    return center_crop_layer(batched).numpy().squeeze()

def IsBlack(frame, y, x):
    """Return True when every pixel in the window around ``(y, x)`` is zero.

    The window covers rows ``[y - r, y + r)`` and columns ``[x - r, x + r)``
    with ``r = 2``, clipped to the frame. Earlier revisions restarted the
    column scan at offset 0 (ignoring the left half of the window) and gave
    up as soon as the window touched the top/left border, so any point within
    two pixels of those borders was reported as black without being examined.

    Args:
        frame: image array indexed as ``frame[row, col]`` (optionally with a
            trailing channel axis).
        y: row of the window centre.
        x: column of the window centre.

    Returns:
        bool: True if the clipped window contains only zeros. A window that
        lies completely outside the frame is empty and counts as black.
    """
    height, width = frame.shape[:2]
    r = 2

    # Clamp both ends into [0, size] so negative values can never wrap around.
    row_start = max(y - r, 0)
    row_stop = max(min(y + r, height), 0)
    col_start = max(x - r, 0)
    col_stop = max(min(x + r, width), 0)

    return not frame[row_start:row_stop, col_start:col_stop].any()


def get_dimension(frame):
    """Locate the non-black content area of a frame.

    Black rows are skipped from the top and bottom along the centre column;
    the horizontal extent is then found by walking outwards from the centre,
    five rows below the detected top edge, until a black window is reached.
    One extra pixel of margin is added on every side and the result is
    clamped to the frame, so it is always safe to use as
    ``frame[up:down, left:right]`` (an unclamped ``left`` of -1 would wrap
    around and produce a wrong or empty crop).

    Args:
        frame: image array of shape ``(height, width, ...)``.

    Returns:
        tuple: ``(right, up, left, down)`` slice bounds, with ``right`` and
        ``down`` exclusive.
    """
    height, width = frame.shape[:2]
    half_width = width // 2
    half_height = height // 2

    up = 0
    down = height - 1
    left = half_width - 1
    right = half_width + 1

    # Vertical extent: skip black bands above and below the content.
    while up < half_height and IsBlack(frame, up, half_width):
        up += 1
    while down > half_height and IsBlack(frame, down, half_width):
        down -= 1

    # Horizontal extent: probe slightly below the top edge of the content.
    probe_row = up + 5
    while left > 0 and not IsBlack(frame, probe_row, left):
        left -= 1
    while right < width and not IsBlack(frame, probe_row, right):
        right += 1

    # One pixel of margin per side, kept inside the frame.
    up = max(up - 1, 0)
    down = min(down + 1, height)
    left = max(left - 1, 0)
    right = min(right + 1, width)

    return right, up, left, down

def load_video(path, max_frames=0):
    """Read a video and return its frames cropped to the content area.

    The crop rectangle is computed once from the first frame with
    ``get_dimension`` and applied to every frame, which is then passed
    through ``crop_center`` and has its channel order reversed
    (``[2, 1, 0]``; OpenCV decodes frames as BGR, so this yields RGB).

    Args:
        path: location of the video file handed to ``cv2.VideoCapture``.
        max_frames: stop after this many frames. With the default ``0`` the
            stop condition never matches, so the whole video is read.

    Returns:
        np.ndarray: the processed frames stacked along the first axis.

    Raises:
        OSError: if not even the first frame can be read (missing file,
            unsupported codec, ...). Previously this surfaced as an
            ``AttributeError`` on ``None`` inside ``get_dimension``.
    """
    cap = cv2.VideoCapture(path)
    frames = []

    try:
        ret, frame = cap.read()
        if not ret:
            raise OSError(f"Could not read any frame from video: {path!r}")

        # The crop rectangle is derived from the first frame only.
        right, up, left, down = get_dimension(frame)

        while ret:
            frame = frame[up:down, left:right]
            frame = crop_center(frame)
            frame = frame[:, :, [2, 1, 0]]
            frames.append(frame)

            if len(frames) == max_frames:
                break
            ret, frame = cap.read()
    finally:
        # Always release the capture handle, even when reading fails.
        cap.release()
    return np.array(frames)

In [39]:
def build_feature_extractor():
    """Build the DenseNet121-based per-frame feature extractor.

    The backbone uses ImageNet weights, has no classification head and
    applies global average pooling; inputs go through DenseNet's own
    ``preprocess_input`` first.

    NOTE: a later cell defines another function with this same name (the
    InceptionV3 variant), so this definition is only valid until that cell
    runs.
    """
    frame_shape = (IMG_SIZE, IMG_SIZE, 3)
    backbone = keras.applications.DenseNet121(
        weights="imagenet",
        include_top=False,
        pooling="avg",
        input_shape=frame_shape,
    )

    frame_input = keras.Input(frame_shape)
    normalized = keras.applications.densenet.preprocess_input(frame_input)
    return keras.Model(frame_input, backbone(normalized), name="feature_extractor")


# Extractor paired with the Transformer classifier (see get_type).
feature_extractor_TRANSFORMER = build_feature_extractor()

In [40]:
def build_feature_extractor():
    """Build the InceptionV3-based per-frame feature extractor.

    The backbone uses ImageNet weights, has no classification head and
    applies global average pooling; inputs go through InceptionV3's own
    ``preprocess_input`` first.

    NOTE: this reuses the name of the DenseNet121 builder defined in an
    earlier cell and replaces it from here on.
    """
    frame_shape = (IMG_SIZE, IMG_SIZE, 3)
    backbone = keras.applications.InceptionV3(
        weights="imagenet",
        include_top=False,
        pooling="avg",
        input_shape=frame_shape,
    )

    frame_input = keras.Input(frame_shape)
    normalized = keras.applications.inception_v3.preprocess_input(frame_input)
    return keras.Model(frame_input, backbone(normalized), name="feature_extractor")


# Extractor paired with the GRU and LSTM classifiers (see get_type).
feature_extractor_RNN = build_feature_extractor()

TRANSFORMER

In [45]:
class PositionalEmbedding(layers.Layer):
    """Adds a learned per-position embedding to a sequence of frame features.

    Attribute names are left exactly as they are because the surrounding
    model has its weights restored with ``load_weights``.
    """

    def __init__(self, sequence_length, output_dim, **kwargs):
        """Create the embedding table.

        Args:
            sequence_length: number of rows in the position embedding table.
            output_dim: size of each position embedding vector.
            **kwargs: forwarded to ``layers.Layer``.
        """
        super().__init__(**kwargs)
        self.position_embeddings = layers.Embedding(
            input_dim=sequence_length, output_dim=output_dim
        )
        self.sequence_length = sequence_length
        self.output_dim = output_dim

    def call(self, inputs):
        """Return ``inputs`` plus the embeddings of positions ``0..frames-1``."""
        # The inputs are of shape: `(batch_size, frames, num_features)`
        length = tf.shape(inputs)[1]
        positions = tf.range(start=0, limit=length, delta=1)
        embedded_positions = self.position_embeddings(positions)
        # The position embeddings carry no batch axis; the addition broadcasts them.
        return inputs + embedded_positions

    def compute_mask(self, inputs, mask=None):
        """Build a mask that is True where any value along the last axis is non-zero.

        The incoming ``mask`` argument is ignored and replaced.
        """
        mask = tf.reduce_any(tf.cast(inputs, "bool"), axis=-1)
        return mask

class TransformerEncoder(layers.Layer):
    """Encoder block: self-attention and a two-layer projection, each followed
    by a residual connection and layer normalization.

    Attribute names and creation order are left exactly as they are because
    the surrounding model has its weights restored with ``load_weights``.
    """

    def __init__(self, embed_dim, dense_dim, num_heads, **kwargs):
        """Create the sub-layers.

        Args:
            embed_dim: attention ``key_dim`` and output width of the projection.
            dense_dim: width of the hidden (GELU) projection layer.
            num_heads: number of attention heads.
            **kwargs: forwarded to ``layers.Layer``.
        """
        super().__init__(**kwargs)
        self.embed_dim = embed_dim
        self.dense_dim = dense_dim
        self.num_heads = num_heads
        self.attention = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim, dropout=0.3
        )
        self.dense_proj = keras.Sequential(
            [layers.Dense(dense_dim, activation=tf.nn.gelu), layers.Dense(embed_dim),]
        )
        self.layernorm_1 = layers.LayerNormalization()
        self.layernorm_2 = layers.LayerNormalization()

    def call(self, inputs, mask=None):
        """Apply the encoder block to ``inputs``, optionally using ``mask``."""
        if mask is not None:
            # Insert an axis at position 1 so the mask is rank 3 for the attention layer.
            mask = mask[:, tf.newaxis, :]

        # Self-attention: the inputs are passed as both query and value.
        attention_output = self.attention(inputs, inputs, attention_mask=mask)
        # Residual connection + normalization around the attention output.
        proj_input = self.layernorm_1(inputs + attention_output)
        proj_output = self.dense_proj(proj_input)
        # Residual connection + normalization around the projection output.
        return self.layernorm_2(proj_input + proj_output)
    
def get_compiled_model():
    """Assemble and compile the Transformer-based video classifier.

    Pipeline: positional embedding -> one encoder block -> global max
    pooling over frames -> dropout -> softmax over ``CLASSES`` outputs.
    Compiled with Adam, sparse categorical cross-entropy and accuracy.
    """
    frame_features = keras.Input(shape=(None, None))

    position_layer = PositionalEmbedding(
        MAX_SEQ_LENGTH_TRANSFORM,
        NUM_FEATURES_TRANSFORM,
        name="frame_position_embedding",
    )
    hidden = position_layer(frame_features)

    encoder_layer = TransformerEncoder(
        NUM_FEATURES_TRANSFORM,  # embed_dim
        4,  # dense_dim
        1,  # num_heads
        name="transformer_layer",
    )
    hidden = encoder_layer(hidden)

    hidden = layers.GlobalMaxPooling1D()(hidden)
    hidden = layers.Dropout(0.5)(hidden)
    class_probabilities = layers.Dense(CLASSES, activation="softmax")(hidden)

    classifier = keras.Model(frame_features, class_probabilities)
    classifier.compile(
        loss="sparse_categorical_crossentropy",
        optimizer="adam",
        metrics=["accuracy"],
    )
    return classifier


def run_experiment():
    """Build the Transformer classifier, restore its saved weights and print a summary.

    NOTE: later cells define other functions with this same name for the
    GRU and LSTM models.
    """
    transformer = get_compiled_model()
    transformer.load_weights(f"Model/Transformer/video_classifier")
    transformer.summary()
    return transformer


TRANSFORMER_model = run_experiment()

Model: "model_8"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 input_29 (InputLayer)       [(None, None, None)]      0         
                                                                 
 frame_position_embedding (P  (None, None, 1024)       40960     
 ositionalEmbedding)                                             
                                                                 
 transformer_layer (Transfor  (None, None, 1024)       4211716   
 merEncoder)                                                     
                                                                 
 global_max_pooling1d_4 (Glo  (None, 1024)             0         
 balMaxPooling1D)                                                
                                                                 
 dropout_8 (Dropout)         (None, 1024)              0         
                                                           

GRU

In [44]:
def get_sequence_model():
    """Assemble and compile the GRU-based video classifier.

    Takes two inputs, the per-frame features and a boolean frame mask, and
    stacks GRU(16) -> GRU(8) -> dropout -> Dense(8, relu) -> softmax.

    NOTE: a later cell defines the LSTM variant under this same name.
    """
    features_in = keras.Input((MAX_SEQ_LENGTH_RNN, NUM_FEATURES_RNN))
    mask_in = keras.Input((MAX_SEQ_LENGTH_RNN,), dtype="bool")

    # Background on the `mask` argument:
    # https://keras.io/api/layers/recurrent_layers/gru/
    first_recurrent = keras.layers.GRU(16, return_sequences=True)
    hidden = first_recurrent(features_in, mask=mask_in)
    hidden = keras.layers.GRU(8)(hidden)
    hidden = keras.layers.Dropout(0.4)(hidden)
    hidden = keras.layers.Dense(8, activation="relu")(hidden)
    class_probabilities = keras.layers.Dense(CLASSES, activation="softmax")(hidden)

    classifier = keras.Model([features_in, mask_in], class_probabilities)
    classifier.compile(
        optimizer="adam",
        loss="sparse_categorical_crossentropy",
        metrics=["accuracy"],
    )
    return classifier

def run_experiment():
    """Build the GRU classifier, restore its saved weights and print a summary.

    NOTE: this reuses the name of the Transformer loader from an earlier cell.
    """
    gru_classifier = get_sequence_model()
    gru_classifier.load_weights(f"Model/GRU/video_classifier")
    gru_classifier.summary()
    return gru_classifier


GRU_model = run_experiment()

Model: "model_7"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 input_27 (InputLayer)          [(None, 40, 2048)]   0           []                               
                                                                                                  
 input_28 (InputLayer)          [(None, 40)]         0           []                               
                                                                                                  
 gru_2 (GRU)                    (None, 40, 16)       99168       ['input_27[0][0]',               
                                                                  'input_28[0][0]']               
                                                                                                  
 gru_3 (GRU)                    (None, 8)            624         ['gru_2[0][0]']            

LSTM

In [43]:
def get_sequence_model():
    """Assemble and compile the LSTM-based video classifier.

    Takes two inputs, the per-frame features and a boolean frame mask, and
    stacks LSTM(16) -> LSTM(8) -> dropout -> Dense(8, relu) -> softmax.

    NOTE: this replaces the GRU builder of the same name from an earlier cell.
    """
    features_in = keras.Input((MAX_SEQ_LENGTH_RNN, NUM_FEATURES_RNN))
    mask_in = keras.Input((MAX_SEQ_LENGTH_RNN,), dtype="bool")

    first_recurrent = keras.layers.LSTM(16, return_sequences=True)
    hidden = first_recurrent(features_in, mask=mask_in)
    hidden = keras.layers.LSTM(8)(hidden)
    hidden = keras.layers.Dropout(0.4)(hidden)
    hidden = keras.layers.Dense(8, activation="relu")(hidden)
    class_probabilities = keras.layers.Dense(CLASSES, activation="softmax")(hidden)

    classifier = keras.Model([features_in, mask_in], class_probabilities)
    classifier.compile(
        optimizer="adam",
        loss="sparse_categorical_crossentropy",
        metrics=["accuracy"],
    )
    return classifier

def run_experiment():
    """Build the LSTM classifier, restore its saved weights and print a summary.

    NOTE: this reuses the name of the loaders defined in earlier cells.
    """
    lstm_classifier = get_sequence_model()
    lstm_classifier.load_weights(f"Model/LSTM/video_classifier")
    lstm_classifier.summary()
    return lstm_classifier


LSTM_model = run_experiment()

Model: "model_6"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 input_25 (InputLayer)          [(None, 40, 2048)]   0           []                               
                                                                                                  
 input_26 (InputLayer)          [(None, 40)]         0           []                               
                                                                                                  
 lstm_2 (LSTM)                  (None, 40, 16)       132160      ['input_25[0][0]',               
                                                                  'input_26[0][0]']               
                                                                                                  
 lstm_3 (LSTM)                  (None, 8)            800         ['lstm_2[0][0]']           

Inference

In [50]:
def get_type(index):
    """Return ``(feature_extractor, max_seq_length, num_features)`` for a model index.

    Index 0 selects the Transformer settings; every other value selects the
    settings shared by the recurrent (GRU/LSTM) models.
    """
    if index == 0:
        return (
            feature_extractor_TRANSFORMER,
            MAX_SEQ_LENGTH_TRANSFORM,
            NUM_FEATURES_TRANSFORM,
        )
    return feature_extractor_RNN, MAX_SEQ_LENGTH_RNN, NUM_FEATURES_RNN
    
def get_model(index):
    """Return the classifier for a model index: 0 Transformer, 1 GRU, anything else LSTM."""
    if index == 0:
        return TRANSFORMER_model
    return GRU_model if index == 1 else LSTM_model

In [62]:
def prepare_single_video(frames, index):
    """Convert the frames of one video into the inputs expected by a classifier.

    Videos shorter than the model's sequence length are padded with all-zero
    frames; longer ones are truncated. Each frame with a positive mean is run
    through the CNN feature extractor selected by ``index``; all-zero frames
    keep an all-zero feature vector.

    Args:
        frames: array of frames with shape ``(num_frames, IMG_SIZE, IMG_SIZE, 3)``.
        index: model selector forwarded to ``get_type``.

    Returns:
        tuple: ``(frame_features, frame_mask)`` where ``frame_features`` is a
        float32 array of shape ``(1, max_seq_length, num_features)`` and
        ``frame_mask`` a bool array of shape ``(1, max_seq_length)``.
    """
    feature_extractor, max_seq_length, num_features = get_type(index)
    frame_features = np.zeros(shape=(1, max_seq_length, num_features), dtype="float32")
    # Sized from the selected model's sequence length; the former global
    # `MAX_SEQ_LENGTH` is not defined anywhere in this notebook.
    frame_mask = np.zeros(shape=(1, max_seq_length), dtype="bool")

    if len(frames) < max_seq_length:
        diff = max_seq_length - len(frames)
        padding = np.zeros((diff, IMG_SIZE, IMG_SIZE, 3))
        # np.concatenate takes a sequence of arrays; its second positional
        # argument is the axis, so the arrays must be passed as one tuple.
        frames = np.concatenate((frames, padding))

    length = min(max_seq_length, frames.shape[0])
    for j in range(length):
        frame = frames[j]
        # Black (padding) frames are skipped: their feature row is already zero.
        if np.mean(frame) > 0.0:
            frame_features[0, j, :] = feature_extractor.predict(frame[None, ...], verbose=0)

    # NOTE(review): padding happens before `length` is computed, so padded
    # frames are marked valid (1) as well - confirm this matches how the
    # masks were built during training.
    frame_mask[0, :length] = 1

    return frame_features, frame_mask


def predict_action(path, index):
    """Classify one video with the selected model and print the ranked classes.

    Args:
        path: video file to load with ``load_video``.
        index: model selector (see ``get_model`` / ``get_type``).

    Returns:
        The frames returned by ``load_video`` for ``path``.
    """
    classifier = get_model(index)
    class_vocab = np.arange(CLASSES)
    frames = load_video(path)
    features, mask = prepare_single_video(frames, index)

    # Index 0 (Transformer) takes the features alone; the recurrent models
    # take the features together with the frame mask.
    model_inputs = features if index == 0 else [features, mask]
    probabilities = classifier.predict(model_inputs)[0]

    print(class_vocab)
    print(probabilities)
    # Most probable class first.
    ranking = np.argsort(probabilities)[::-1]
    for i in ranking:
        print(f"  {class_vocab[i]}: {probabilities[i] * 100:5.2f}%")
    return frames

In [63]:
# Sample clip used for the comparison below (absolute path on the author's machine).
test_video = f"D://Facultad//PPS//Proyecto Ecografia//Noviembre2020//PAC15//PRE.mov"

# Run the same clip through each classifier; the number is the model index
# understood by get_model / get_type.
for banner, model_index in (("TRANSFORMER:", 0), ("GRU:", 1), ("LSTM:", 2)):
    print(banner)
    test_frames = predict_action(test_video, model_index)

[0 1 2 3]
[0.06462812 0.23497145 0.02162581 0.6787746 ]
  3: 67.88%
  1: 23.50%
  0:  6.46%
  2:  2.16%
