<a href="https://colab.research.google.com/github/mohamedalifaragitiai/Video-captioning-using-deep-learning-with-greedy-search/blob/main/Model_Prediction.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Import Libraries

In [None]:
import os
import numpy as np
import pickle
import tensorflow as tf
from tensorflow import keras
import tensorflow.keras.backend as K
import tensorflow_hub as hub
import cv2

%matplotlib inline
import matplotlib.pyplot as plt

In [None]:
# Mount Google Drive into the Colab runtime so the files under /content/drive
# (used by the path variables in this notebook) become reachable.
from google.colab import drive
drive.mount("/content/drive")

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
%ls

[0m[01;34mdrive[0m/  [01;34msample_data[0m/


# Variables

In [None]:
# Work from the Drive data folder. The data path variables defined later in
# this notebook are derived from the current working directory, so this cell
# has to run before they are computed.
os.chdir('/content/drive/MyDrive/MyMaster/Final VD/Data')

In [None]:
%ls

AllVideoDescriptions.txt  model.png              video_frames_dict.pickle
glove.6B.300d.txt         [0m[01;34mTestData[0m/              word_to_id.pickle
id_to_word.pickle         [01;34mvideo_frame_features[0m/  [01;34mYouTubeClips[0m/


In [None]:
# Data locations are resolved against the parent of the current working
# directory; that parent is expected to contain the 'Data' folder.
parent_dir = os.path.dirname(os.getcwd())

data_path = parent_dir + '/Data/'
test_videos_path = parent_dir + '/Data/TestData/'
video_features_path = parent_dir + '/Data/video_frame_features/'

# Saved models (captioning model and EfficientNet feature extractor) live at
# fixed locations on the mounted Drive.
saved_model_path = '/content/drive/MyDrive/MyMaster/Final VD/Video_Captioning/saved_model/whole_model'
effNet_model_path = '/content/drive/MyDrive/MyMaster/Final VD/Video_Captioning/saved_model/effNet'


# Load EfficientNet and Extract frames from video

In [None]:
def extract_frames_from_video(video_name):
    """Sample roughly one frame per second from a test video.

    The video is read from ``test_videos_path + video_name + ".avi"``.
    Frames at indices 0, FPS, 2*FPS, ... are kept, where FPS is the integer
    frame rate reported by OpenCV.

    Args:
        video_name: File name of the video without the ".avi" extension.

    Returns:
        A NumPy array of shape (num_sampled_frames, height, width, 3) holding
        the sampled frames as decoded by OpenCV. When no frame could be
        decoded, an empty uint8 array with a leading dimension of 0.

    Raises:
        IOError: If the video file cannot be opened.
    """
    path_input = test_videos_path + video_name + ".avi"

    cap = cv2.VideoCapture(path_input)
    try:
        if not cap.isOpened():
            raise IOError(f"Could not open video file: {path_input}")

        cap.set(cv2.CAP_PROP_POS_AVI_RATIO, 0)

        frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

        # Some containers report an FPS of 0; a sampling step of 0 is invalid,
        # so fall back to keeping every frame in that case.
        frame_step = max(int(cap.get(cv2.CAP_PROP_FPS)), 1)

        # Only the sampled frames are kept in memory instead of the whole video.
        sampled_frames = []
        frame_index = 0
        while frame_index < frame_count:
            ret, frame = cap.read()
            if not ret:
                # The reported frame count can exceed what is decodable;
                # stop at the first failed read instead of storing None.
                break
            if frame_index % frame_step == 0:
                sampled_frames.append(frame)
            frame_index += 1
    finally:
        # Release the capture handle even when reading fails part-way.
        cap.release()

    if not sampled_frames:
        return np.empty((0, frame_height, frame_width, 3), np.dtype("uint8"))

    return np.stack(sampled_frames)

In [None]:
def save_features_from_video_frames(video_name):
    """Extract EfficientNet features for a video and store them as a .npy file.

    The sampled frames are scaled by 1/255, cropped or padded to 600x600,
    passed through the global ``effNet_model`` and written with ``np.save``
    to ``video_features_path + video_name``.

    Args:
        video_name: File name of the video without extension.
    """
    sampled_frames = extract_frames_from_video(video_name)
    scaled_frames = sampled_frames / 255
    model_input = tf.image.resize_with_crop_or_pad(scaled_frames, 600, 600)

    output_path = video_features_path + video_name
    np.save(output_path, effNet_model.predict(model_input))

In [None]:
# Load the saved EfficientNet feature extractor. save_features_from_video_frames
# reads it through the global name `effNet_model`, so this cell must run before
# any feature extraction.
effNet_model = keras.models.load_model(effNet_model_path)



In [None]:
# Print the layer and parameter overview of the loaded feature extractor.
effNet_model.summary()

Model: "sequential"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 keras_layer (KerasLayer)    (None, 2560)              64097680  
                                                                 
Total params: 64,097,680
Trainable params: 0
Non-trainable params: 64,097,680
_________________________________________________________________


# Load Tokenizer

In [None]:
# Restore the two vocabulary mappings stored in the data folder.
# NOTE: pickle.load can execute arbitrary code; only open trusted files.
id_to_word_file = data_path + 'id_to_word.pickle'
with open(id_to_word_file, "rb") as handle:
    id_to_word = pickle.load(handle)

word_to_id_file = data_path + 'word_to_id.pickle'
with open(word_to_id_file, "rb") as handle:
    word_to_id = pickle.load(handle)

In [None]:
# Build the word -> id lookup table used to encode decoder input tokens.
# Keys and values are both taken from word_to_id so every word is paired with
# its own id regardless of dict ordering, and so this cell can be re-run after
# "<unk>" has been added to id_to_word without a length mismatch.
words = tf.constant(list(word_to_id.keys()))
word_ids = tf.constant(list(word_to_id.values()), dtype=tf.int64)
vocab_init = tf.lookup.KeyValueTensorInitializer(words, word_ids)
# One out-of-vocabulary bucket: words missing from the table map to the id
# directly after the known vocabulary.
num_oov_buckets = 1
table = tf.lookup.StaticVocabularyTable(vocab_init, num_oov_buckets)

In [None]:
# Number of known words (the out-of-vocabulary bucket is not included).
vocab_size = len(words)
# Decode the out-of-vocabulary id as "<unk>". Keying on vocab_size rather than
# len(id_to_word) makes this cell idempotent: re-running it overwrites the same
# entry instead of appending another "<unk>" under a new id.
id_to_word[vocab_size] = "<unk>"

## Data Padding

In [None]:
def padding_part_of_caption_sequence(video_length):
    """Return a list with one "<pad>" token per video frame.

    Args:
        video_length: Number of "<pad>" tokens to produce.
    """
    pad_token = "<pad>"
    return [pad_token] * video_length

def append_bos_token(sequence_list):
    """Return a new list made of `sequence_list` followed by "<bos>".

    The input list is not modified.
    """
    bos_suffix = ["<bos>"]
    return sequence_list + bos_suffix

def append_eos_token(sequence_list):
    """Return a new list made of `sequence_list` followed by "<eos>".

    The input list is not modified.
    """
    eos_suffix = ["<eos>"]
    return sequence_list + eos_suffix

In [None]:
# Load a video's stored frame features and zero-pad them for decoding.
def get_padded_input_cnn(video_name, caption_length):
    """Load cached frame features and append zero rows for the caption steps.

    Args:
        video_name: Video name without extension; features are read from
            ``video_features_path + video_name + ".npy"``.
        caption_length: Maximum number of caption tokens to leave room for.

    Returns:
        A tuple ``(padded_features, number_of_frames)`` where
        ``padded_features`` is a tensor with
        ``number_of_frames + caption_length + 1`` rows and
        ``number_of_frames`` is the row count of the stored feature array.
    """
    stored_features_path = video_features_path + video_name + ".npy"
    features_array = np.load(stored_features_path)
    Number_of_frames = features_array.shape[0]

    # One row more than caption_length (presumably the slot for the "<bos>"
    # token -- confirm against the training pipeline).
    input_cnn_padding_length = caption_length + 1
    # Take width and dtype from the stored features: the width is no longer
    # tied to one specific extractor, and the concatenation keeps the stored
    # dtype instead of being promoted by an integer padding array.
    input_cnn_padding_array = np.zeros(
        (input_cnn_padding_length, features_array.shape[1]),
        dtype=features_array.dtype,
    )
    input_cnn_padded_array = np.concatenate((features_array, input_cnn_padding_array))
    return tf.constant(input_cnn_padded_array), Number_of_frames

# Load the model

In [None]:
# Load the trained captioning model; the inference cells call it with a
# (frame features, token sequence) pair.
model = keras.models.load_model(saved_model_path)

In [None]:
# model.summary()

In [None]:
# tf.keras.utils.plot_model(model)

# Enter Video Name

In [None]:
# Name of the test video to caption, without the ".avi" extension.
videoName = "testVideo8"
# Video names listed by the author: testVideo4, testVideo7, testVideo2, testVideo1, testVideo8, testVideo9, testVideo10, testVideo11

# Extract video frame features using an EfficientNet

In [None]:
# Names of the videos whose features are already cached as .npy files.
_, _, features_files = next(os.walk(video_features_path))
# Strip the extension with splitext instead of cutting a fixed four characters,
# and skip files that are not .npy so unrelated files cannot produce bogus names.
features_files = [
    os.path.splitext(file_name)[0]
    for file_name in features_files
    if file_name.endswith(".npy")
]
# Videos for which feature extraction failed.
unprocessed_video_names = []

In [None]:
# Extract and cache features for the selected video unless they already exist.
if videoName not in features_files:
    try:
        save_features_from_video_frames(videoName)
        features_files.append(videoName)
    except Exception as error:
        # Stay best-effort (record the video as unprocessed) but report why,
        # otherwise the failure only surfaces later as a missing .npy file.
        # Catching Exception rather than everything lets KeyboardInterrupt through.
        unprocessed_video_names.append(videoName)
        print(f"Feature extraction failed for {videoName!r}: {error!r}")



# Inference (Greedy)

In [None]:
# Greedy decoding inputs: padded frame features with room for 20 caption
# tokens, and a decoder prompt of one "<pad>" per frame followed by "<bos>".
sample_cnn_input, Number_of_frames = get_padded_input_cnn(videoName, 20)

decoder_prompt = append_bos_token(padding_part_of_caption_sequence(Number_of_frames))
input_seq = table.lookup(tf.constant(decoder_prompt))

In [None]:
initial_input_seq_length = len(input_seq)
END_TOKEN = 2

next_token = -1
for i in range(20):
    next_token = model.predict(
        (
            tf.expand_dims(sample_cnn_input[: len(input_seq)], axis=0),
            tf.expand_dims(input_seq, axis=0),
        )
    )[0][-1].argmax()

    if next_token == END_TOKEN:
        break

    input_seq = tf.concat(
        [tf.cast(input_seq, tf.int32), tf.cast(tf.constant([next_token]), tf.int32)],
        axis=-1,
    )



In [None]:
# Decode the generated token ids (everything after the initial prompt) into words.
[id_to_word[id_] for id_ in input_seq[initial_input_seq_length:].numpy().tolist()]

['a', 'cat', 'is', 'playing']

# Beam Search

In [None]:
# with probabilities
# Beam search settings: number of parallel hypotheses and maximum caption length.
beam_width = 10
maximum_caption_length = 10

sample_cnn_input, Number_of_frames = get_padded_input_cnn(videoName, maximum_caption_length)

# Decoder prompt: one "<pad>" per frame followed by "<bos>", encoded as ids.
beam_prompt = append_bos_token(padding_part_of_caption_sequence(Number_of_frames))
input_seq = table.lookup(tf.constant(beam_prompt))

In [None]:
# Initial beam state: every beam starts from the same prompt with probability 1
# and its own (empty) list of per-token certainties.
END_TOKEN = 2
initial_input_seq_length = len(input_seq)

sentences = [input_seq] * beam_width
sentence_probabilities = [1] * beam_width
certainty_lists = [[] for _ in range(beam_width)]

In [None]:
# Beam search. Step 0 seeds the beams with the `beam_width` most probable first
# tokens; every later step expands each unfinished beam by all tokens and keeps
# the `beam_width` most probable extensions overall.
#
# A beam that has already produced END_TOKEN is frozen: it is carried forward
# with unchanged probability (padded with another END_TOKEN) instead of being
# expanded. Otherwise tokens generated after the end of the caption keep
# multiplying into the score and the same visible caption can occupy several
# beams with different probabilities.
for i in range(maximum_caption_length):
    if i == 0:
        prediction = model.predict(
            (
                tf.expand_dims(sample_cnn_input[: len(input_seq)], axis=0),
                tf.expand_dims(input_seq, axis=0),
            )
        )[0][-1] # last token in sequence probability

        next_tokens = prediction.argsort()[-beam_width:][::-1]
        for j in range(beam_width):
            sentences[j] = tf.concat(
                [
                    tf.cast(sentences[j], tf.int32),
                    tf.cast(tf.constant([next_tokens[j]]), tf.int32),
                ],
                axis=-1,
            )
            sentence_probabilities[j] = (sentence_probabilities[j] * prediction[next_tokens[j]])
            certainty_lists[j].append(prediction[next_tokens[j]])
        continue

    # A beam is finished once END_TOKEN appears among its generated tokens.
    beam_finished = [
        END_TOKEN in sentences[j][initial_input_seq_length:].numpy().tolist()
        for j in range(beam_width)
    ]
    if all(beam_finished):
        break

    next_token_probabilities = []
    next_sentence_probabilities = []
    for j in range(beam_width):
        if beam_finished[j]:
            # Single candidate: stay finished, certainty 1, probability unchanged.
            # `prediction` still holds the step-0 output and provides the
            # vocabulary-sized shape and dtype.
            token_probabilities = np.zeros_like(prediction)
            token_probabilities[END_TOKEN] = 1
        else:
            token_probabilities = model.predict(
                (
                    tf.expand_dims(sample_cnn_input[: len(sentences[j])], axis=0),
                    tf.expand_dims(sentences[j], axis=0),
                )
            )[0][-1]

        next_token_probabilities.append(token_probabilities)
        next_sentence_probabilities.append(token_probabilities * sentence_probabilities[j])

    # Rank every (beam, token) candidate and keep the best `beam_width`, most
    # probable first. The shape comes from the array itself rather than from
    # vocab_size + num_oov_buckets, so it always matches the model output.
    candidate_probabilities = np.array(next_sentence_probabilities)
    best_flat_indices = np.argsort(candidate_probabilities.ravel())[-beam_width:][::-1]
    best_sentence_indices, best_token_indices = np.unravel_index(
        best_flat_indices, candidate_probabilities.shape
    )

    next_sentences = []
    next_probabilities = []
    next_certainty_lists = []
    for next_sentence_index, next_token_index in zip(best_sentence_indices, best_token_indices):
        next_sentences.append(
            tf.concat(
                [
                    tf.cast(sentences[next_sentence_index], tf.int32),
                    tf.cast(tf.constant([next_token_index]), tf.int32),
                ],
                axis=-1,
            )
        )
        next_probabilities.append(
            candidate_probabilities[next_sentence_index][next_token_index]
        )
        # Build a new list so beams sharing a parent do not share certainties.
        next_certainty_lists.append(
            certainty_lists[next_sentence_index]
            + [next_token_probabilities[next_sentence_index][next_token_index]]
        )

    sentences = next_sentences
    sentence_probabilities = next_probabilities
    certainty_lists = next_certainty_lists



In [None]:
# Print every beam: its caption (cut at the first END_TOKEN if present), the
# sentence probability, and the certainty of each word.
for i in range(beam_width):
    generated_ids = sentences[i][initial_input_seq_length:].numpy().tolist()
    # Explicit membership test instead of a bare `except:` around `.index`,
    # which would also have hidden unrelated errors.
    if END_TOKEN in generated_ids:
        generated_ids = generated_ids[:generated_ids.index(END_TOKEN)]
    predicted_caption = [id_to_word[id_] for id_ in generated_ids]

    print(predicted_caption)
    print("Sentence probability:", sentence_probabilities[i], '\n')

    # zip stops at the shorter sequence, so certainties recorded for tokens
    # at or after END_TOKEN are not printed.
    for word, certainty in zip(predicted_caption, certainty_lists[i]):
        print(word, '\t', certainty)
    print("\n")

['a', 'cat', 'is', 'playing']
Sentence probability: 0.0011712479 

a 	 0.7833383
cat 	 0.16917801
is 	 0.8747256
playing 	 0.09724133


['a', 'small', 'group', 'of', 'kids', 'are', 'playing']
Sentence probability: 0.0009851756 

a 	 0.7833383
small 	 0.1205752
group 	 0.30174574
of 	 0.99583995
kids 	 0.14416866
are 	 0.71174026
playing 	 0.9530341


['a', 'cat', 'is', 'playing']
Sentence probability: 0.0009255728 

a 	 0.7833383
cat 	 0.16917801
is 	 0.8747256
playing 	 0.09724133


['a', 'small', 'group', 'of', 'people', 'are', 'playing']
Sentence probability: 0.00087840436 

a 	 0.7833383
small 	 0.1205752
group 	 0.30174574
of 	 0.99583995
people 	 0.26272553
are 	 0.7720742
playing 	 0.83425546


['a', 'cat', 'is', 'playing']
Sentence probability: 0.0008031002 

a 	 0.7833383
cat 	 0.16917801
is 	 0.8747256
playing 	 0.09724133


['a', 'cat', 'is', 'meowing']
Sentence probability: 0.0007846419 

a 	 0.7833383
cat 	 0.16917801
is 	 0.8747256
meowing 	 0.073722064


['a', 'person', 