# Introduction

This interactive notebook runs a CNN-LSTM model for video captioning. More information and credits can be found on our [GitHub](https://github.com/teresa-tommy/VideoCaptioningCapstone). To use the notebook, download the data files from our GitHub, or supply your own data organized the same way. Then run the cells in order, updating file paths as necessary. When the notebook finishes, you will have a set of generated captions for the video dataset. To compute metrics from these captions, see our notebook Metrics.ipynb, also available on our [GitHub](https://github.com/teresa-tommy/VideoCaptioningCapstone).

## Setup
This notebook was originally designed for use in Google Colab. If you are running it locally through Jupyter Notebook or another tool, the cell below is not needed. In either case, make sure all file paths are updated accordingly.
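As a rough sketch of how the same notebook could run in both environments, the snippet below mounts Google Drive only when the `google.colab` package can be imported. The helper names `running_in_colab` and `data_root`, and the local folder name `KerasModel`, are assumptions made for illustration and are not part of the original notebook.

```python
def running_in_colab():
    """Return True when the google.colab package can be imported."""
    try:
        import google.colab  # noqa: F401
    except ImportError:
        return False
    return True


def data_root(colab_root="drive/MyDrive/KerasModel", local_root="KerasModel"):
    """Pick a base folder for data and model files.

    local_root is a hypothetical local folder; change it to match your setup.
    """
    return colab_root if running_in_colab() else local_root


if running_in_colab():
    # Only reached inside Colab, where this import is available
    from google.colab import drive
    drive.mount('/content/drive')

print(data_root())
```

With a helper like this, the paths in the config could be built from `data_root()` instead of being edited by hand for each environment.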

In [1]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


The following cells install all necessary dependencies and add the necessary import statements.

In [2]:
!pip install tensorflow
!pip install keras_preprocessing
!pip install dill
!pip install -r drive/MyDrive/KerasModel/requirements.txt

Collecting absl-py==0.11.0 (from -r drive/MyDrive/KerasModel/requirements.txt (line 1))
  Using cached absl_py-0.11.0-py3-none-any.whl (127 kB)
Collecting cachetools==4.2.1 (from -r drive/MyDrive/KerasModel/requirements.txt (line 3))
  Using cached cachetools-4.2.1-py3-none-any.whl (12 kB)
Collecting certifi==2020.12.5 (from -r drive/MyDrive/KerasModel/requirements.txt (line 4))
  Using cached certifi-2020.12.5-py2.py3-none-any.whl (147 kB)
Collecting chardet==4.0.0 (from -r drive/MyDrive/KerasModel/requirements.txt (line 5))
  Using cached chardet-4.0.0-py2.py3-none-any.whl (178 kB)
Collecting dill==0.3.3 (from -r drive/MyDrive/KerasModel/requirements.txt (line 6))
  Using cached dill-0.3.3-py2.py3-none-any.whl (81 kB)
Collecting flatbuffers==1.12 (from -r drive/MyDrive/KerasModel/requirements.txt (line 7))
  Using cached flatbuffers-1.12-py2.py3-none-any.whl (15 kB)
Collecting future==0.18.2 (from -r drive/MyDrive/KerasModel/requirements.txt (line 8))
  Using cached future-0.18.2.tar

In [3]:
import functools
import json
import operator
import os
import random
import shutil
import time

import cv2
import joblib
import numpy as np
from tensorflow.keras.applications import VGG16
from tensorflow.keras.layers import Input, LSTM, Dense
from tensorflow.keras.models import Model, load_model
from tensorflow.keras.preprocessing.text import Tokenizer

The next cells define and create a config containing the parameters used throughout the rest of the notebook. Because we are only testing a pre-trained model, the hyperparameters do not need to be adjusted. The main parameter to check is test_path, which should point to the directory containing the testing data.

In [4]:
class Config:
    def __init__(self):
        self.train_path = "drive/MyDrive/KerasModel/data/training_data/"
        self.test_path = "drive/MyDrive/KerasModel/data/testing_data/"
        self.batch_size = 320
        self.learning_rate = 0.0007
        self.epochs = 150
        self.latent_dim = 512
        self.num_encoder_tokens = 4096
        self.num_decoder_tokens = 1500
        self.time_steps_encoder = 80
        self.max_probability = -1
        self.save_model_path = 'drive/MyDrive/KerasModel/model_final'
        self.validation_split = 0.15
        self.max_length = 10
        self.search_type = 'greedy'

In [5]:
config = Config()
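Before running the later cells, it can help to confirm that the folders and files they read actually exist. The following is a minimal sketch, assuming the layout implied by the code in this notebook: a `video` folder and a `testing_id.txt` file under the test path, plus the tokenizer, encoder, and decoder weight files under the model path. The function name `missing_inputs` is hypothetical.

```python
import os


def missing_inputs(test_path, save_model_path, num_decoder_tokens=1500):
    """List the expected files and folders that are not present."""
    expected = [
        os.path.join(test_path, "video"),
        os.path.join(test_path, "testing_id.txt"),
        os.path.join(save_model_path, "tokenizer" + str(num_decoder_tokens)),
        os.path.join(save_model_path, "encoder_model.h5"),
        os.path.join(save_model_path, "decoder_model_weights.h5"),
    ]
    return [path for path in expected if not os.path.exists(path)]
```

Calling `missing_inputs(config.test_path, config.save_model_path, config.num_decoder_tokens)` should return an empty list when everything is in place; any paths it returns are the ones to fix first.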

## Data Pre-processing

The following cells define methods for extracting features from the video data and then run feature extraction on the requested dataset. If a video's features have already been extracted, the notebook skips that video, so you do not have to extract all the features in one session.

In [6]:

def video_to_frames(video, config):
    """Write every frame of the video to a temporary folder and return the image paths."""
    # Start from an empty temporary folder
    path = os.path.join(config.test_path, 'temporary_images')
    if os.path.exists(path):
        shutil.rmtree(path)
    os.makedirs(path)
    # Path to video file
    video_path = os.path.join(config.test_path, 'video', video)
    count = 0
    image_list = []
    cap = cv2.VideoCapture(video_path)
    while cap.isOpened():
        ret, frame = cap.read()
        # ret is False once no more frames can be read
        if not ret:
            break
        frame_path = os.path.join(path, 'frame%d.jpg' % count)
        cv2.imwrite(frame_path, frame)
        image_list.append(frame_path)
        count += 1

    cap.release()
    cv2.destroyAllWindows()
    return image_list

In [7]:
def model_cnn_load():
    model = VGG16(weights="imagenet", include_top=True, input_shape=(224, 224, 3))
    out = model.layers[-2].output
    model_final = Model(inputs=model.input, outputs=out)
    return model_final


def load_image(path):
    img = cv2.imread(path)
    img = cv2.resize(img, (224, 224))
    return img


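def extract_features(video, model, config):
    """
    :param video: file name of the video whose frames are extracted and converted into a numpy array
    :param model: the pretrained VGG16 model
    :param config: the Config object that provides test_path
    :return: numpy array of shape (80, 4096), one feature vector per sampled frame
    """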
    video_id = video.split(".")[0]
    print(video_id)
    print(f'Processing video {video}')

    image_list = video_to_frames(video, config)
    samples = np.round(np.linspace(
        0, len(image_list) - 1, 80))
    image_list = [image_list[int(sample)] for sample in samples]
    images = np.zeros((len(image_list), 224, 224, 3))
    for i in range(len(image_list)):
        img = load_image(image_list[i])
        images[i] = img
    images = np.array(images)
    fc_feats = model.predict(images, batch_size=128)
    img_feats = np.array(fc_feats)
    # cleanup
    shutil.rmtree(os.path.join(config.test_path, 'temporary_images'))
    return img_feats


def extract_feats_pretrained_cnn(config):
    """
    saves the numpy features from all the videos
    """
    model = model_cnn_load()
    print('Model loaded')

    if not os.path.isdir(os.path.join(config.test_path, 'feat')):
        os.mkdir(os.path.join(config.test_path, 'feat'))

    i = 1

    video_list = os.listdir(os.path.join(config.test_path, 'video'))
    for video in video_list:
        print("video " + str(i))
        i = i + 1
        # Features are saved as <video id>.avi.npy, the name that get_test_data later loads
        outfile = os.path.join(config.test_path, 'feat', video.split(".")[0] + '.avi.npy')
        print(outfile)
        if os.path.exists(outfile):
            print(f"Skipping video {video}, already processed")
            continue

        img_feats = extract_features(video, model, config)
        np.save(outfile, img_feats)
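The frame sampling step in `extract_features` picks 80 evenly spaced frame indices with `np.linspace` and `np.round`. The sketch below reproduces that idea in plain Python so the behavior is easier to inspect; the function name `sample_indices` is hypothetical and the code is an illustration rather than the notebook's own implementation.

```python
def sample_indices(n_frames, n_samples=80):
    """Return n_samples evenly spaced frame indices from 0 to n_frames - 1."""
    if n_frames < 1:
        raise ValueError("the video produced no frames")
    if n_samples == 1:
        return [0]
    step = (n_frames - 1) / (n_samples - 1)
    return [int(round(i * step)) for i in range(n_samples)]


print(sample_indices(10, 4))  # [0, 3, 6, 9]
```

Two consequences follow from this scheme: a video with fewer than 80 frames has some frames repeated so that the output still contains 80 entries, and a video that yields no frames at all (for example, a file OpenCV cannot open) has nothing to sample from, so it is worth checking for before extraction.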

In [8]:
extract_feats_pretrained_cnn(config)

Model loaded
video 1
drive/MyDrive/KerasModel/data/testing_data/feat/klteYv1Uv9A_27_33.avi.npy
klteYv1Uv9A_27_33
Processing video klteYv1Uv9A_27_33.avi
video 2
drive/MyDrive/KerasModel/data/testing_data/feat/j2Dhf-xFUxU_13_20.avi.npy
j2Dhf-xFUxU_13_20
Processing video j2Dhf-xFUxU_13_20.avi
video 3
drive/MyDrive/KerasModel/data/testing_data/feat/UXs3eq68ZjE_250_255.avi.npy
UXs3eq68ZjE_250_255
Processing video UXs3eq68ZjE_250_255.avi
video 4
drive/MyDrive/KerasModel/data/testing_data/feat/TZ860P4iTaM_15_28.avi.npy
TZ860P4iTaM_15_28
Processing video TZ860P4iTaM_15_28.avi
video 5
drive/MyDrive/KerasModel/data/testing_data/feat/u4T76jsPin0_0_11.avi.npy
u4T76jsPin0_0_11
Processing video u4T76jsPin0_0_11.avi
video 6
drive/MyDrive/KerasModel/data/testing_data/feat/WTf5EgVY5uU_124_128.avi.npy
WTf5EgVY5uU_124_128
Processing video WTf5EgVY5uU_124_128.avi
video 7
drive/MyDrive/KerasModel/data/testing_data/feat/ZbtpcGi2DWY_161_170.avi.npy
ZbtpcGi2DWY_161_170
Processing video ZbtpcGi2DWY_161_170.avi

## Running the model

The rest of the cells are responsible for defining the necessary methods for running the model and generating captions based on the previously extracted set of features.
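To make the idea of greedy caption generation concrete before the full implementation, here is a minimal sketch that uses a hypothetical lookup table in place of the trained decoder. At each step it picks the single most likely next word and stops at the end token. It is not the notebook's `greedy_search`, which also feeds the LSTM states back into the decoder at every step; the names `greedy_decode` and `TOY_TABLE` and all probabilities are made up for illustration.

```python
def greedy_decode(next_token_probs, bos, eos, max_steps=15):
    """Pick the most likely token at each step until eos or max_steps."""
    tokens = []
    previous = bos
    for _ in range(max_steps):
        probs = next_token_probs(previous)
        best = max(probs, key=probs.get)
        if best == eos:
            break
        tokens.append(best)
        previous = best
    return tokens


# Hypothetical toy "model": a fixed table of next-word probabilities
TOY_TABLE = {
    "bos": {"a": 0.7, "the": 0.3},
    "a": {"man": 0.6, "dog": 0.4},
    "man": {"is": 0.9, "eos": 0.1},
    "is": {"running": 0.8, "eos": 0.2},
    "running": {"eos": 0.95, "a": 0.05},
}

caption = " ".join(greedy_decode(TOY_TABLE.get, bos="bos", eos="eos"))
print(caption)  # a man is running
```

Greedy decoding only ever follows one candidate sentence, which makes it fast; beam search keeps several candidates at each step at the cost of more decoder calls.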

In [9]:
def inference_model(config):
    """Returns the model that will be used for inference"""
    with open(os.path.join(config.save_model_path, 'tokenizer' + str(config.num_decoder_tokens)), 'rb') as file:
        tokenizer = joblib.load(file)
    # loading encoder model. This remains the same
    inf_encoder_model = load_model(os.path.join(config.save_model_path, 'encoder_model.h5'))

    # inference decoder model loading
    decoder_inputs = Input(shape=(None, config.num_decoder_tokens))
    decoder_dense = Dense(config.num_decoder_tokens, activation='softmax')
    decoder_lstm = LSTM(config.latent_dim, return_sequences=True, return_state=True)
    decoder_state_input_h = Input(shape=(config.latent_dim,))
    decoder_state_input_c = Input(shape=(config.latent_dim,))
    decoder_states_inputs = [decoder_state_input_h, decoder_state_input_c]
    decoder_outputs, state_h, state_c = decoder_lstm(decoder_inputs, initial_state=decoder_states_inputs)
    decoder_states = [state_h, state_c]
    decoder_outputs = decoder_dense(decoder_outputs)
    inf_decoder_model = Model(
        [decoder_inputs] + decoder_states_inputs,
        [decoder_outputs] + decoder_states)
    inf_decoder_model.load_weights(os.path.join(config.save_model_path, 'decoder_model_weights.h5'))
    return tokenizer, inf_encoder_model, inf_decoder_model

In [13]:
class VideoDescriptionInference():
    def __init__(self, config):
        """
        Initialize the parameters for the model
        """
        self.latent_dim = config.latent_dim
        self.num_encoder_tokens = config.num_encoder_tokens
        self.num_decoder_tokens = config.num_decoder_tokens
        self.time_steps_encoder = config.time_steps_encoder
        self.max_probability = config.max_probability

        # models
        self.tokenizer, self.inf_encoder_model,  self.inf_decoder_model = inference_model(config)
        self.save_model_path = config.save_model_path
        self.test_path = config.test_path
        self.search_type = config.search_type

    def greedy_search(self, loaded_array):
        """
        :param loaded_array: the numpy feature array produced by converting a video to frames and extracting features
        :return: the final sentence which has been predicted greedily
        """
        inv_map = self.index_to_word()
        states_value = self.inf_encoder_model.predict(
            loaded_array.reshape(-1, self.time_steps_encoder, self.num_encoder_tokens))
        # One-hot encode the begin-of-sentence token as the first decoder input
        target_seq = np.zeros((1, 1, self.num_decoder_tokens))
        sentence = ''
        target_seq[0, 0, self.tokenizer.word_index['bos']] = 1
        for i in range(15):
            output_tokens, h, c = self.inf_decoder_model.predict([target_seq] + states_value)
            states_value = [h, c]
            output_tokens = output_tokens.reshape(self.num_decoder_tokens)
            y_hat = np.argmax(output_tokens)
            # Index 0 is reserved by the tokenizer and maps to no word
            if y_hat == 0:
                continue
            if inv_map.get(y_hat) is None:
                break
            else:
                sentence = sentence + inv_map[y_hat] + ' '
                # Feed the predicted word back in as the next decoder input
                target_seq = np.zeros((1, 1, self.num_decoder_tokens))
                target_seq[0, 0, y_hat] = 1
        # Drop the final generated token before returning the caption
        return ' '.join(sentence.split()[:-1])

    def decode_sequence2bs(self, input_seq):
        states_value = self.inf_encoder_model.predict(input_seq)
        target_seq = np.zeros((1, 1, self.num_decoder_tokens))
        target_seq[0, 0, self.tokenizer.word_index['bos']] = 1
        self.beam_search(target_seq, states_value, [], [], 0)
        return decode_seq

    def beam_search(self, target_seq, states_value, prob, path, lens):
        """
        :param target_seq: the array that is fed into the model to predict the next word
        :param states_value: previous state that is fed into the lstm cell
        :param prob: list of probabilities of the words predicted so far
        :param path: list of words predicted so far for this sentence
        :param lens: number of words
        :return: None; the most probable sentence found so far is stored in the global decode_seq
        """
        global decode_seq
        node = 2
        output_tokens, h, c = self.inf_decoder_model.predict(
            [target_seq] + states_value)
        output_tokens = output_tokens.reshape(self.num_decoder_tokens)
        sampled_token_index = output_tokens.argsort()[-node:][::-1]
        states_value = [h, c]
        for i in range(node):
            if sampled_token_index[i] == 0:
                sampled_char = ''
            else:
                sampled_char = list(self.tokenizer.word_index.keys())[
                    list(self.tokenizer.word_index.values()).index(sampled_token_index[i])]
            MAX_LEN = 12
            if sampled_char != 'eos' and lens <= MAX_LEN:
                p = output_tokens[sampled_token_index[i]]
                if sampled_char == '':
                    p = 1
                prob_new = list(prob)
                prob_new.append(p)
                path_new = list(path)
                path_new.append(sampled_char)
                target_seq = np.zeros((1, 1, self.num_decoder_tokens))
                target_seq[0, 0, sampled_token_index[i]] = 1.
                self.beam_search(target_seq, states_value, prob_new, path_new, lens + 1)
            else:
                p = output_tokens[sampled_token_index[i]]
                prob_new = list(prob)
                prob_new.append(p)
                p = functools.reduce(operator.mul, prob_new, 1)
                if p > self.max_probability:
                    decode_seq = path
                    self.max_probability = p

    def decoded_sentence_tuning(self, decoded_sentence):
        decode_str = []
        filter_string = ['bos', 'eos']
        uni_gram = {}
        last_string = ""
        for idx2, c in enumerate(decoded_sentence):
            if c in uni_gram:
                uni_gram[c] += 1
            else:
                uni_gram[c] = 1
            if last_string == c and idx2 > 0:
                continue
            if c in filter_string:
                continue
            if len(c) > 0:
                decode_str.append(c)
            if idx2 > 0:
                last_string = c
        return decode_str

    def index_to_word(self):
        # inverts word tokenizer
        index_to_word = {value: key for key, value in self.tokenizer.word_index.items()}
        return index_to_word

    def get_test_data(self):
        """
        loads all the numpy files
        :return: two lists containing all the video arrays and the video Id
        """
        X_test = []
        X_test_filename = []
        with open(os.path.join(self.test_path, 'testing_id.txt')) as testing_file:
            lines = testing_file.readlines()
            for filename in lines:
                filename = filename.strip()
                f = np.load(os.path.join(self.test_path, 'feat', filename + '.npy'))
                X_test.append(f)
                X_test_filename.append(filename[:-4])
            X_test = np.array(X_test)
        return X_test, X_test_filename

    def test(self):
        """
            writes the captions of all the testing videos in a text file
        """
        X_test, X_test_filename = self.get_test_data()

        # generate inference test outputs
        # results will be saved to the file with the path stated below
        with open(os.path.join(self.test_path, 't_%s.txt' % self.search_type), 'w') as file:
            for idx, x in enumerate(X_test):
                file.write(X_test_filename[idx] + ',')
                if self.search_type == 'greedy':
                    start = time.time()
                    decoded_sentence = self.greedy_search(x.reshape(-1, 80, 4096))
                    file.write(decoded_sentence + ',{:.2f}'.format(time.time()-start))
                else:
                    start = time.time()
                    decoded_sentence = self.decode_sequence2bs(x.reshape(-1, 80, 4096))
                    decode_str = self.decoded_sentence_tuning(decoded_sentence)
                    for d in decode_str:
                        file.write(d + ' ')
                    file.write(',{:.2f}'.format(time.time() - start))
                file.write('\n')

                # re-init max prob
                self.max_probability = -1



In [14]:
video_to_text = VideoDescriptionInference(config)
video_to_text.test()
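Based on the `file.write` calls in `test()`, each line of the results file (for example `t_greedy.txt` inside the test path) has the form `video_id,caption,seconds`. The following is a minimal sketch for reading that file back into a dictionary, assuming video ids contain no commas; the function names `parse_caption_line` and `load_captions` are hypothetical.

```python
def parse_caption_line(line):
    """Split one 'video_id,caption,seconds' line into its three fields."""
    video_id, rest = line.rstrip("\n").split(",", 1)
    caption, seconds = rest.rsplit(",", 1)
    return video_id, caption.strip(), float(seconds)


def load_captions(results_path):
    """Return {video_id: caption} for every non-empty line in the results file."""
    captions = {}
    with open(results_path) as results_file:
        for line in results_file:
            if line.strip():
                video_id, caption, _ = parse_caption_line(line)
                captions[video_id] = caption
    return captions
```

For a greedy run, `load_captions(os.path.join(config.test_path, 't_greedy.txt'))` would give one caption per video id, which is a convenient starting point for inspecting results by hand.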





## References

The above code was adapted from the following:
- [Medium Article](https://medium.com/analytics-vidhya/video-captioning-with-keras-511984a2cfff)
- [GitHub](https://github.com/Shreyz-max/Video-Captioning)