## Data collection

In [2]:
# Download the `ucf101_top5.tar.gz` release archive quietly (`-q`) and unpack it
# into the current working directory.
# NOTE(review): presumably this provides the train.csv / test.csv files and the
# video folders read further down — verify against the archive contents.
!wget -q https://github.com/sayakpaul/Action-Recognition-in-TensorFlow/releases/download/v1.0.0/ucf101_top5.tar.gz
!tar -xf ucf101_top5.tar.gz

## Import Dependencies

In [30]:
import os

import tensorflow as tf
import tensorflow.keras as keras
from tensorflow.keras import layers
from tensorflow.keras.applications.densenet import DenseNet121, preprocess_input
from tensorflow.keras.activations import gelu, softmax
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import History
from tensorflow.keras.losses import sparse_categorical_crossentropy


import pandas as pd
import numpy as np
import imageio
import cv2 as cv

## Define Hyperparameters

In [4]:
MAX_SEQ_LENGTH = 20  # number of frame slots kept per video in the feature arrays
NUM_FEATURES = 1024  # length of the feature vector stored for each frame
IMG_SIZE = 128  # side length of the square center crop fed to the feature extractor

EPOCHS = 5  # number of epochs passed to `model.fit`

## Preparing Data

### Load Up The Data Frames

In [2]:
# Read the train/test tables from the working directory. Later cells use their
# 'video_name' and 'tag' columns.
train_df = pd.read_csv('train.csv')
test_df = pd.read_csv('test.csv')

# Report how many rows (one per video) each split contains.
print(f'[INFO]  Total videos for training: {len(train_df)}')
print(f'[INFO]  Total videos for testing: {len(test_df)}')

[INFO]  Total videos for training: 594
[INFO]  Total videos for testing: 224


In [None]:
# Shared Keras layer that center-crops its input to IMG_SIZE x IMG_SIZE; it is
# created once here and reused by `crop_center` for every frame.
center_crop_layer = layers.CenterCrop(IMG_SIZE, IMG_SIZE)


def crop_center(frame: np.ndarray) -> tf.Tensor:
    """Center-crop a single frame with the shared ``center_crop_layer``.

    Args:
        frame: One image array; a leading batch axis is added before cropping.

    Returns:
        The cropped frame as a tensor with all size-one axes squeezed away.
    """
    # The crop layer is fed a batch, so wrap the frame in a batch of one.
    batch_of_one = frame[None, ...]
    cropped_batch = tf.convert_to_tensor(center_crop_layer(batch_of_one))
    # Squeezing removes the temporary batch axis again.
    return tf.squeeze(cropped_batch)


def load_video(path: str, max_frames: int = 0) -> np.ndarray:
    """Decode a video file into an array of center-cropped frames.

    Args:
        path: Location of the video file handed to ``cv.VideoCapture``.
        max_frames: Stop once this many frames have been collected. With the
            default ``0`` the count never matches, so the whole video is read.

    Returns:
        ``np.array`` built from the list of cropped frames (empty when no frame
        could be read).
    """
    capture = cv.VideoCapture(path)
    collected = []
    try:
        ok, raw_frame = capture.read()
        while ok:
            # Reverse the channel order of the decoded frame before cropping.
            reordered = raw_frame[:, :, [2, 1, 0]]
            collected.append(crop_center(reordered))

            if len(collected) == max_frames:
                break
            ok, raw_frame = capture.read()
    finally:
        # Always free the capture handle, even if decoding raised.
        capture.release()
    return np.array(collected)


def build_feature_extractor() -> keras.Model:
    """Build the per-frame feature extractor.

    The model takes an ``(IMG_SIZE, IMG_SIZE, 3)`` image, applies the DenseNet
    ``preprocess_input`` function, and runs an ImageNet-weighted ``DenseNet121``
    without its classification head, using global average pooling.

    Returns:
        A ``keras.Model`` named ``'feature_extractor'``.
    """
    frame_shape = (IMG_SIZE, IMG_SIZE, 3)

    backbone = DenseNet121(
        include_top=False,
        weights='imagenet',
        pooling='avg',
        input_shape=frame_shape,
    )

    frame_input = keras.Input(frame_shape)
    # Preprocessing is part of the model, so callers pass unnormalised frames.
    pooled_features = backbone(preprocess_input(frame_input))
    return keras.Model(frame_input, pooled_features, name='feature_extractor')

In [None]:
# Instantiate the frame feature extractor once at notebook scope;
# `prepare_all_videos` reads this global when extracting features.
feature_extractor = build_feature_extractor()

In [3]:
# Lookup layer that maps class-name strings from the 'tag' column to integer
# indices. The vocabulary is the set of unique training tags; no out-of-vocabulary
# bucket and no mask token are reserved.
label_processor = keras.layers.StringLookup(
    num_oov_indices=0, vocabulary=np.unique(train_df['tag']), mask_token=None
)
# Show the resulting class order (index i corresponds to the i-th entry).
print(label_processor.get_vocabulary())


def prepare_all_videos(df: pd.DataFrame, root_dir: str) -> tuple[np.ndarray, np.ndarray]:
    """Extract per-frame features and integer labels for every video in ``df``.

    Args:
        df: Table with a ``video_name`` column (file names relative to
            ``root_dir``) and a ``tag`` column (class names).
        root_dir: Directory that contains the video files.

    Returns:
        ``(frame_features, labels)``. ``frame_features`` is a float32 array of
        shape ``(len(df), MAX_SEQ_LENGTH, NUM_FEATURES)``; slots for padded or
        all-zero frames stay zero. ``labels`` is the ``label_processor`` output
        for the ``tag`` column reshaped to one label per row.
    """
    video_paths = df['video_name'].values.tolist()
    labels = df['tag'].values
    labels = label_processor(labels[..., None]).numpy()

    frame_features = np.zeros(
        shape=(len(video_paths), MAX_SEQ_LENGTH, NUM_FEATURES), dtype=np.float32
    )

    for idx, path in enumerate(video_paths):
        frames = load_video(os.path.join(root_dir, path))

        if len(frames) == 0:
            # Nothing could be decoded: keep the all-zero feature row instead of
            # failing on a shape mismatch while padding.
            print(f'[WARN]  No frames could be read from: {path}')
            continue

        if len(frames) < MAX_SEQ_LENGTH:
            # Pad short videos with black frames up to MAX_SEQ_LENGTH.
            # `np.concatenate` takes a *sequence* of arrays as its first argument;
            # passing the padding as a second positional argument would be
            # interpreted as `axis` and raise.
            diff = MAX_SEQ_LENGTH - len(frames)
            padding = np.zeros((diff, IMG_SIZE, IMG_SIZE, 3), dtype=frames.dtype)
            frames = np.concatenate([frames, padding], axis=0)

        # After padding there are at least MAX_SEQ_LENGTH frames; only the first
        # MAX_SEQ_LENGTH are used.
        for j in range(MAX_SEQ_LENGTH):
            # All-zero (padding) frames are skipped and keep zero features.
            if np.mean(frames[j]) > 0.0:
                frame_features[idx, j, :] = feature_extractor.predict(
                    frames[None, j, :]
                )

    return frame_features, labels


['CricketShot', 'PlayingCello', 'Punch', 'ShavingBeard', 'TennisSwing']


In [None]:
# Download and unpack the `top5_data_prepared.tar.gz` archive into the working
# directory. `!!cmd` is IPython shorthand for `%sx`: it runs the command and
# returns its output as a list instead of streaming it.
# NOTE(review): presumably the archive holds the precomputed .npy feature/label
# files loaded in the next cell — verify.
!!wget -q https://git.io/JZmf4 -O top5_data_prepared.tar.gz
!!tar -xf top5_data_prepared.tar.gz

In [5]:
# Load the stored feature and label arrays for both splits from .npy files in the
# working directory; `run_experiment` reads these globals.
train_data, train_labels = np.load('train_data.npy'), np.load('train_labels.npy')
test_data, test_labels = np.load('test_data.npy'), np.load('test_labels.npy')

# Sanity-check the shape of the training features.
print(f'[INFO]   Frame features in train set: {train_data.shape}')

[INFO]   Frame features in train set: (594, 20, 1024)


## Build The Transformer-Based Model

#### Build A Custom Layer To Create Positional Embeddings

In [31]:
class PositionalEmbedding(layers.Layer):
    """Adds a learned embedding of each step index to a sequence.

    Args:
        sequence_length: Number of distinct positions the embedding table holds.
        output_dim: Size of each position embedding vector.
        **kwargs: Forwarded to ``layers.Layer``.
    """

    def __init__(self, sequence_length, output_dim, **kwargs):
        super().__init__(**kwargs)
        self.sequence_length = sequence_length
        self.output_dim = output_dim
        self.position_embeddings = layers.Embedding(
            input_dim=sequence_length, output_dim=output_dim
        )

    def build(self, input_shape):
        """Build the wrapped embedding layer with the incoming shape."""
        self.position_embeddings.build(input_shape)

    def call(self, x):
        """Return ``x`` plus the embeddings of positions ``0 .. steps-1``.

        The number of steps is taken from axis 1 of ``x``.
        """
        x = tf.cast(x, self.compute_dtype)
        num_steps = tf.shape(x)[1]
        step_indices = tf.range(num_steps)
        # The per-position embeddings broadcast across the leading axis of `x`.
        return x + self.position_embeddings(step_indices)


#### Build An Encoder Layer Of The Transformer Architecture.

In [32]:
class TransformerEncoder(layers.Layer):
    """Single encoder block: self-attention and a two-layer feed-forward part.

    Each sub-block is followed by a residual addition and layer normalisation.

    Args:
        embed_dim: Used as the attention ``key_dim`` and as the width of the
            second dense layer.
        dense_dim: Width of the first (GELU) dense layer.
        num_heads: Number of attention heads.
        **kwargs: Forwarded to ``layers.Layer``.
    """

    def __init__(self, embed_dim, dense_dim, num_heads, **kwargs):
        super().__init__(**kwargs)

        # Plain configuration values.
        self.embed_dim = embed_dim
        self.dense_dim = dense_dim
        self.num_heads = num_heads

        # Sub-layers; attribute names and creation order are kept as they were.
        self.attention = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim, dropout=0.3
        )

        self.norm_1 = layers.LayerNormalization()
        self.norm2 = layers.LayerNormalization()

        self.dense1 = layers.Dense(dense_dim, activation=gelu)
        self.dense2 = layers.Dense(embed_dim)

    def call(self, x, mask=None):
        """Apply the block to ``x``; ``mask`` is passed as the attention mask."""
        # Self-attention: `x` serves as both query and value.
        attended = self.attention(x, x, attention_mask=mask)
        normed_residual = self.norm_1(x + attended)

        # Feed-forward part applied to the normalised residual.
        projected = self.dense2(self.dense1(normed_residual))
        return self.norm2(normed_residual + projected)

## Train

#### Utility Functions For Training

In [33]:
def get_compiled_model(shape: tuple) -> keras.Model:
    """Assemble and compile the transformer-based classifier.

    Args:
        shape: Shape of one input sample (without the batch axis).

    Returns:
        A compiled ``keras.Model`` with one softmax output per entry in the
        ``label_processor`` vocabulary, trained with Adam and sparse categorical
        cross-entropy and reporting accuracy.
    """
    num_classes = len(label_processor.get_vocabulary())

    inputs = keras.Input(shape=shape)
    hidden = PositionalEmbedding(
        MAX_SEQ_LENGTH, NUM_FEATURES, name='frame_position_embedding'
    )(inputs)
    # One encoder block with a single head and a 4-unit feed-forward layer.
    hidden = TransformerEncoder(
        NUM_FEATURES, 4, 1, name='transformer_layer'
    )(hidden)
    hidden = layers.GlobalMaxPooling1D()(hidden)
    hidden = layers.Dropout(0.5)(hidden)
    outputs = layers.Dense(num_classes, activation=softmax)(hidden)

    model = keras.Model(inputs, outputs)
    model.compile(
        loss=sparse_categorical_crossentropy,
        optimizer=Adam(),
        metrics=['accuracy'],
    )
    return model


def run_experiment() -> keras.Model:
    """Train the classifier, restore the best checkpoint and report test accuracy.

    Uses the notebook-level ``train_data`` / ``train_labels`` for fitting (15 % of
    them held out for validation) and ``test_data`` / ``test_labels`` for the final
    evaluation.

    Returns:
        The model with the checkpointed weights loaded.
    """
    weights_path = '/tmp/video_classifier.weights.h5'

    model = get_compiled_model(train_data.shape[1:])

    # Only the best weights seen so far are written to `weights_path`.
    best_weights_saver = keras.callbacks.ModelCheckpoint(
        weights_path, save_weights_only=True, save_best_only=True, verbose=1
    )
    history: History = model.fit(
        train_data,
        train_labels,
        epochs=EPOCHS,
        validation_split=0.15,
        callbacks=[best_weights_saver],
    )

    # Evaluate the checkpointed weights rather than the last-epoch ones.
    model.load_weights(weights_path)
    test_metrics = model.evaluate(test_data, test_labels)
    accuracy = test_metrics[1]
    print(f'Test accuracy: {round(accuracy * 100, 2)}%')

    return model


## Model Training

In [34]:
# Train and evaluate once; the returned model is kept at notebook scope and used
# by `predict_action` in the inference section.
trained_model = run_experiment()

Epoch 1/5
Epoch 1: val_loss improved from inf to 2.16958, saving model to /tmp/video_classifier.weights.h5
Epoch 2/5
Epoch 2: val_loss did not improve from 2.16958
Epoch 3/5
Epoch 3: val_loss did not improve from 2.16958
Epoch 4/5
Epoch 4: val_loss improved from 2.16958 to 1.79922, saving model to /tmp/video_classifier.weights.h5
Epoch 5/5
Epoch 5: val_loss did not improve from 1.79922
Test accuracy: 89.73%


## Inference

In [1]:
def load_video(
    path: 'str | None' = None, max_frames: int = 0, offload_to_cpu: bool = False
) -> np.ndarray:
    """Load the frames of a video, or random placeholder frames when no path is given.

    This definition replaces any earlier ``load_video`` in the notebook, so it has
    to accept the arguments used by ``predict_action``
    (``load_video(path, offload_to_cpu=True)``).

    Args:
        path: Video file to decode. When ``None`` the function keeps its previous
            behaviour and returns random frames.
        max_frames: Stop after this many frames; the default ``0`` reads the
            whole video.
        offload_to_cpu: Accepted for call-site compatibility only; it has no
            effect here.

    Returns:
        Array of center-cropped frames read from ``path``, or an array of shape
        ``(30, IMG_SIZE, IMG_SIZE, 3)`` with uniform random values in ``[0, 1)``
        when ``path`` is ``None``.
    """
    if path is None:
        # Backward-compatible placeholder: synthetic frames, no file access.
        return np.random.rand(30, IMG_SIZE, IMG_SIZE, 3)

    cap = cv.VideoCapture(path)
    frames = []
    try:
        while True:
            success, frame = cap.read()
            if not success:
                break
            # Reverse the channel order (OpenCV decodes frames as BGR).
            frame = frame[:, :, [2, 1, 0]]
            frames.append(crop_center(frame))

            if len(frames) == max_frames:
                break
    finally:
        # Release the capture handle even if decoding raised.
        cap.release()
    return np.array(frames)


# Bare DenseNet121 backbone: no classification head, global average pooling,
# inputs of shape (IMG_SIZE, IMG_SIZE, 3).
# NOTE(review): unlike `build_feature_extractor`, this model does not apply
# `preprocess_input` to its inputs — confirm that is intended wherever it is used.
base_net = keras.applications.DenseNet121(
    include_top=False, pooling='avg', input_shape=(IMG_SIZE, IMG_SIZE, 3))


def prepare_single_video(frames) -> np.ndarray:
    """Turn the frames of one video into the feature tensor the classifier expects.

    Features are computed with ``feature_extractor`` (which applies
    ``preprocess_input`` before DenseNet121), the same model that
    ``prepare_all_videos`` uses, so inference features match that pipeline.

    Args:
        frames: Array-like of frames, each ``(IMG_SIZE, IMG_SIZE, 3)``. Videos
            shorter than ``MAX_SEQ_LENGTH`` are padded with black frames; only the
            first ``MAX_SEQ_LENGTH`` frames are used. The argument is not modified.

    Returns:
        float32 array of shape ``(1, MAX_SEQ_LENGTH, NUM_FEATURES)``; slots for
        padded or all-zero frames stay zero.
    """
    frame_features = np.zeros(
        shape=(1, MAX_SEQ_LENGTH, NUM_FEATURES), dtype='float32')

    frames = np.asarray(frames)
    if len(frames) == 0:
        # No frames at all: return the all-zero features instead of failing on a
        # shape mismatch while padding.
        return frame_features

    if len(frames) < MAX_SEQ_LENGTH:
        diff = MAX_SEQ_LENGTH - len(frames)
        padding = np.zeros((diff, IMG_SIZE, IMG_SIZE, 3), dtype=frames.dtype)
        frames = np.concatenate([frames, padding], axis=0)

    # After padding there are at least MAX_SEQ_LENGTH frames available.
    for j in range(MAX_SEQ_LENGTH):
        # All-zero (padding) frames are skipped and keep zero features.
        if np.mean(frames[j]) > 0.0:
            frame_features[0, j, :] = feature_extractor.predict(frames[None, j, :])

    return frame_features


def predict_action(path):
    """Classify one video from the ``test`` directory and print ranked probabilities.

    Args:
        path: File name of the video, joined onto the ``'test'`` directory.

    Returns:
        The frames returned by ``load_video`` for this video.
    """
    video_path = os.path.join('test', path)
    frames = load_video(video_path, offload_to_cpu=True)

    # `predict` returns one row per sample; there is a single sample here.
    probabilities = trained_model.predict(prepare_single_video(frames))[0]

    class_vocab = label_processor.get_vocabulary()
    # Class indices ordered from most to least probable.
    ranked_indices = np.argsort(probabilities)[::-1]
    for i in ranked_indices:
        print(f'  {class_vocab[i]}: {probabilities[i] * 100:5.2f}%')

    return frames


def to_gif(images):
    """Write ``images`` to ``animation.gif`` at 10 frames per second.

    Args:
        images: Array of frames; values are cast to ``uint8`` before saving.
    """
    imageio.mimsave('animation.gif', images.astype(np.uint8), fps=10)


# Pick one test video at random (`test_df['video_name']` holds the video file
# names), print its ranked class probabilities, and save the first
# MAX_SEQ_LENGTH returned frames as `animation.gif`.
test_video = np.random.choice(test_df['video_name'].values.tolist())
print(f'Test video path: {test_video}')
test_frames = predict_action(test_video)
to_gif(test_frames[:MAX_SEQ_LENGTH])

Test video path: v_ShavingBeard_g03_c02.avi
 1/1 ━━━━━━━━━━━━━━━━━━━━ 20s 20s/step
 1/1 ━━━━━━━━━━━━━━━━━━━━ 0s 9ms/step
 1/1 ━━━━━━━━━━━━━━━━━━━━ 0s 8ms/step
 1/1 ━━━━━━━━━━━━━━━━━━━━ 0s 8ms/step
 1/1 ━━━━━━━━━━━━━━━━━━━━ 0s 9ms/step
 1/1 ━━━━━━━━━━━━━━━━━━━━ 0s 8ms/step
 1/1 ━━━━━━━━━━━━━━━━━━━━ 0s 8ms/step
 1/1 ━━━━━━━━━━━━━━━━━━━━ 0s 9ms/step
 1/1 ━━━━━━━━━━━━━━━━━━━━ 0s 9ms/step
 1/1 ━━━━━━━━━━━━━━━━━━━━ 0s 10ms/step
 1/1 ━━━━━━━━━━━━━━━━━━━━ 0s 11ms/step
 1/1 ━━━━━━━━━━━━━━━━━━━━ 0s 12ms/step
 1/1 ━━━━━━━━━━━━━━━━━━━━ 0s 12ms/step
 1/1 ━━━━━━━━━━━━━━━━━━━━ 0s 9ms/step
 1/1 ━━━━━━━━━━━━━━━━━━━━ 0s 9ms/step
 1/1 ━━━━━━━━━━━━━━━━━━━━ 0s 9ms/step
 1/1 ━━━━━━━━━━━━━━━━━━━━ 0s 9ms/step
 1/1 ━━━━━━━━━━━━━━━━━━━━ 0s 9ms/step
 1/1 ━━━━━━━━━━━━━━━━━━━━ 0s 9ms/step
 1/1 ━━━━━━━━━━━━━━━━━━━━ 0s 9ms/step
 1/1 ━━━━━━━━━━━━━━━━━━━━ 1s 557ms/step
  ShavingBeard: 100.00%
  Punch:  0.00%
  CricketShot:  0.00%
  TennisSwing:  0.00%
  PlayingCello:  0.00%


The performance of our model is far from optimal, because it was trained on a
small dataset.