## **DISCLAIMER:**


* Before running the notebook, create a shortcut in the root folder of your Google Drive to the folder containing the weights of our best models (the notebook looks for the weights under `My Drive/VQA_Weights`): https://drive.google.com/drive/folders/1BdriY_6wRI7ulC5qK4PxWvYo-gdNoZ9W?usp=sharing

* Also create a shortcut in the root folder of your Google Drive to the dataset folder, which is already split into subfolders (the notebook looks for the data under `My Drive/VQA`): https://drive.google.com/drive/folders/1-VT0sJeAFrYBfhBFAE_S_Puy33QkUxV8?usp=sharing

## **Importing the necessary packages**

In [None]:
import os
import tensorflow as tf
from datetime import datetime

import json
import random
import numpy as np
from PIL import Image
import matplotlib.pyplot as plt
from tensorflow.keras.preprocessing.text import Tokenizer 
from tensorflow.keras.preprocessing.sequence import pad_sequences
from sklearn.preprocessing import LabelEncoder
from sklearn.preprocessing import OneHotEncoder
from sklearn.model_selection import train_test_split
import math

# Set the seed for easy reproducibility.
# Seed every random number generator imported by this notebook, not only
# TensorFlow's, so that Python- and NumPy-level randomness is repeatable too.
SEED = 1234
random.seed(SEED)
np.random.seed(SEED)
tf.random.set_seed(SEED)

# Get current working directory
cwd = os.getcwd()

In [None]:
# Mount Google Drive under /content/drive: every dataset and weight path used
# later in the notebook lives below that mount point.
# force_remount=True asks Colab to mount again even if the drive is already mounted.
from google.colab import drive
drive.mount('/content/drive', force_remount=True)

Mounted at /content/drive


In [None]:
# Seed Python's own random number generator.
random.seed(1234)

# Size (height, width) the images are resized to before entering the network.
img_h, img_w = 200, 350

# General hyper-parameters.
# NOTE(review): max_words, embedding_dim, num_classes, val_split and max_len
# do not appear to be referenced by the other visible cells — confirm before
# relying on them.
max_words = 100
embedding_dim = 70
num_classes = 58
val_split = 0.8
max_len = 25
batch_size = 32

# Every possible answer, listed in the order of its class index.
_ANSWER_LABELS = [
    '0', '1', '2', '3', '4', '5',
    'apple', 'baseball', 'bench', 'bike', 'bird', 'black', 'blanket', 'blue',
    'bone', 'book', 'boy', 'brown', 'cat', 'chair', 'couch', 'dog', 'floor',
    'food', 'football', 'girl', 'grass', 'gray', 'green', 'left', 'log', 'man',
    'monkey bars', 'no', 'nothing', 'orange', 'pie', 'plant', 'playing', 'red',
    'right', 'rug', 'sandbox', 'sitting', 'sleeping', 'soccer', 'squirrel',
    'standing', 'stool', 'sunny', 'table', 'tree', 'watermelon', 'white',
    'wine', 'woman', 'yellow', 'yes',
]

# Maps an answer string to its integer class index (its position in the list above).
labels_dict = {answer: index for index, answer in enumerate(_ANSWER_LABELS)}


def get_number_of_labels():
    """Return the number of answer classes defined in ``labels_dict``."""
    return len(labels_dict)


def decode_img(image_tensor: tf.Tensor, image_size: tf.shape, rescale=True):
    """Decode PNG bytes into a resized, 3-channel float32 image.

    Args:
        image_tensor: Encoded PNG content (e.g. the result of ``tf.io.read_file``).
        image_size: Target size forwarded to ``tf.image.resize``; the callers in
            this notebook pass ``(img_h, img_w)``.
        rescale: When True the image keeps the value range produced by
            ``tf.image.convert_image_dtype``; when False the result is
            multiplied by 255.0.

    Returns:
        The decoded and resized float32 image tensor.
    """
    decoded = tf.io.decode_png(image_tensor, channels=3)
    as_float = tf.image.convert_image_dtype(decoded, tf.float32)
    resized = tf.image.resize(as_float, image_size)
    return resized if rescale else resized * 255.0

## **Tokenizer**

In [None]:
# Passed as ``num_words`` to the Keras Tokenizer.
MAX_NUM_WORDS = 300
# Length every tokenized question is padded to (``maxlen`` of pad_sequences).
MAX_WORDS_IN_SENTENCE = 100


def get_question_shape() -> int:
    """Return the fixed length of a tokenized and padded question."""
    return MAX_WORDS_IN_SENTENCE


class CustomTokenizer(object):
    """Word-level tokenizer fitted on the questions of a training annotations file.

    Wraps a Keras ``Tokenizer`` created with ``num_words=MAX_NUM_WORDS`` and an
    explicit out-of-vocabulary token.
    """

    # Explicit string placeholder for out-of-vocabulary words. Keras expects a
    # string here; the placeholder still takes index 1 of the word index, just
    # as the former ``oov_token=True`` did, so encoded sequences are unchanged.
    OOV_TOKEN = "[OOV]"

    def __init__(self, train_file_path):
        """Fit the tokenizer on every question found in ``train_file_path``.

        Args:
            train_file_path: Path of a JSON file holding a mapping whose values
                are dicts with a ``'question'`` string.
        """
        with open(train_file_path, "r") as train_file:
            questions_list = json.load(train_file)

        all_train_questions = [entry['question'].replace("?", "") for entry in questions_list.values()]
        # Register the start/end markers that the data generators add around
        # each question. A plain string keeps the corpus homogeneous; Keras
        # splits it into the same two tokens as the former ["sos", "eos"] list.
        # NOTE(review): both markers occur only once in the fitted corpus, so
        # with num_words=MAX_NUM_WORDS they may rank outside the kept
        # vocabulary and be encoded as the OOV token — confirm.
        all_train_questions.append("sos eos")

        self.tokenizer = Tokenizer(num_words=MAX_NUM_WORDS, oov_token=self.OOV_TOKEN)
        self.tokenizer.fit_on_texts(all_train_questions)

        self.questions_wtoi = self.tokenizer.word_index
        print('Total questions words:', len(self.questions_wtoi))

    def get_wtoi(self):
        """Return the word -> index dictionary learned by the tokenizer."""
        return self.questions_wtoi

    def tokenize_and_pad_list(self, question_list):
        """Convert question strings into integer sequences of fixed length.

        Args:
            question_list: List of question strings.

        Returns:
            The sequences produced by ``pad_sequences`` with
            ``maxlen=MAX_WORDS_IN_SENTENCE``, one row per question.
        """
        question_tokenized = self.tokenizer.texts_to_sequences(question_list)
        return pad_sequences(question_tokenized, maxlen=MAX_WORDS_IN_SENTENCE)

## **Training/Validation Generator and Test Generator**

In [None]:
class CustomTrainValidGenerator(object):
    """Builds ``tf.data`` pipelines for the training and validation questions.

    The annotated questions are split once, at construction time, into a
    training list and a validation list. Each list can then be turned into a
    batched, endlessly repeating dataset whose elements are
    ``((question_tokens, image), one_hot_label)``.
    """

    # Image ids are grouped into numbered sub-folders of this many ids each
    # (folder number = image_id // IMAGES_PER_FOLDER + 1).
    IMAGES_PER_FOLDER = 1500

    def __init__(self, questions_file_path: str, image_directory: str,
                 target_image_size: tuple, tokenizer: CustomTokenizer, valid_split: float,
                 split_seed, rescale_image=True):
        """Load the annotated questions and split them into train/validation.

        Args:
            questions_file_path: JSON file holding a mapping whose values are
                dicts with ``'question'``, ``'image_id'`` and ``'answer'`` keys.
            image_directory: Root folder containing the numbered image sub-folders.
            target_image_size: Size the images are resized to, forwarded to
                ``decode_img``.
            tokenizer: Fitted ``CustomTokenizer`` used to encode the questions.
            valid_split: Passed as ``test_size`` to ``train_test_split``.
            split_seed: Random state of the train/validation split.
            rescale_image: Forwarded to ``decode_img`` as ``rescale``.
        """
        self.tokenizer = tokenizer

        with open(questions_file_path, 'r') as f:
            self.questions_list = json.load(f)

        self.all_train_questions = list(self.questions_list.values())

        self.train_questions_list, self.valid_questions_list = train_test_split(
            self.all_train_questions, shuffle=True, test_size=valid_split, random_state=split_seed)
        self.target_image_size = target_image_size
        self.image_directory = image_directory
        self.rescale_image = rescale_image

    def _image_path(self, image_id):
        """Return the PNG path of ``image_id`` (a string of digits) inside its numbered sub-folder."""
        # Integer division gives the same folder as the former int(id / 1500)
        # for non-negative ids, without going through floating point.
        folder = str(int(image_id) // self.IMAGES_PER_FOLDER + 1)
        return os.path.join(self.image_directory, folder, image_id + '.png')

    def _dataset(self, questions_list, batch_size, do_shuffle=True):
        """Create the batched, repeating dataset for ``questions_list``.

        Args:
            questions_list: List of question dicts (``'question'``,
                ``'image_id'``, ``'answer'``).
            batch_size: Number of samples per batch.
            do_shuffle: Whether to shuffle the samples before batching.

        Returns:
            A ``tf.data.Dataset`` yielding ``((questions, images), labels)`` batches.
        """

        def load_image(image_filepath):
            # Read the file lazily inside the pipeline and decode it to the target size.
            image_bytes = tf.io.read_file(image_filepath)
            return decode_img(image_bytes, self.target_image_size, rescale=self.rescale_image)

        # Training questions are wrapped in explicit start/end markers.
        question_strings = ["sos {} eos".format(question['question']) for question in questions_list]
        image_paths = [self._image_path(question['image_id']) for question in questions_list]
        label_ids = [labels_dict[question['answer']] for question in questions_list]

        question_tokens = self.tokenizer.tokenize_and_pad_list(question_strings)
        one_hot_labels = tf.one_hot(label_ids, depth=get_number_of_labels(), dtype=tf.int32)

        question_dataset = tf.data.Dataset.from_tensor_slices(question_tokens)
        label_dataset = tf.data.Dataset.from_tensor_slices(one_hot_labels)
        # The paths are sliced directly as string tensors; no unicode
        # decode/encode round-trip is needed to feed tf.io.read_file.
        image_dataset = tf.data.Dataset.from_tensor_slices(tf.constant(image_paths, dtype=tf.string))
        image_dataset = image_dataset.map(load_image, num_parallel_calls=tf.data.experimental.AUTOTUNE)

        dataset = tf.data.Dataset.zip(((question_dataset, image_dataset), label_dataset))
        if do_shuffle:
            dataset = dataset.shuffle(buffer_size=batch_size * 5)
        # Batch first so the last (possibly partial) batch closes each pass,
        # then repeat forever; prefetch goes last so it overlaps with training.
        dataset = dataset.batch(batch_size)
        dataset = dataset.repeat()
        dataset = dataset.prefetch(1)
        return dataset

    def get_train_dataset(self, batch_size):
        """Return the shuffled, repeating training dataset."""
        return self._dataset(self.train_questions_list, batch_size, do_shuffle=True)

    def get_valid_dataset(self, batch_size):
        """Return the repeating validation dataset (not shuffled)."""
        # Validation metrics do not depend on sample order, so shuffling is skipped.
        return self._dataset(self.valid_questions_list, batch_size, do_shuffle=False)

    def get_train_samples(self):
        """Return the number of training questions."""
        return len(self.train_questions_list)

    def get_valid_samples(self):
        """Return the number of validation questions."""
        return len(self.valid_questions_list)

class CustomTestGenerator(object):
    """Builds the ``tf.data`` pipeline used to predict answers for the test questions."""

    # Image ids are grouped into numbered sub-folders of this many ids each
    # (folder number = image_id // IMAGES_PER_FOLDER + 1).
    IMAGES_PER_FOLDER = 1500

    def __init__(self, questions_file_path: str, image_directory: str,
                 target_image_size: tuple, tokenizer: CustomTokenizer, rescale_image=True):
        """Load the test questions.

        Args:
            questions_file_path: JSON file holding a mapping from question id to
                a dict with ``'question'`` and ``'image_id'`` keys.
            image_directory: Root folder containing the numbered image sub-folders.
            target_image_size: Size the images are resized to, forwarded to
                ``decode_img``.
            tokenizer: Fitted ``CustomTokenizer`` used to encode the questions.
            rescale_image: Forwarded to ``decode_img`` as ``rescale``.
        """
        self.tokenizer = tokenizer

        with open(questions_file_path, 'r') as f:
            # Raw mapping (question id -> question dict); kept because callers
            # need the ids to label the predictions.
            self.questions_list1 = json.load(f)

        self.questions_list = list(self.questions_list1.values())

        self.target_image_size = target_image_size
        self.image_directory = image_directory
        self.rescale_image = rescale_image

    def _image_path(self, image_id):
        """Return the PNG path of ``image_id`` (a string of digits) inside its numbered sub-folder."""
        # Integer division gives the same folder as the former int(id / 1500)
        # for non-negative ids, without going through floating point.
        folder = str(int(image_id) // self.IMAGES_PER_FOLDER + 1)
        return os.path.join(self.image_directory, folder, image_id + '.png')

    def _dataset(self, questions_list, batch_size):
        """Create the batched, repeating prediction dataset for ``questions_list``.

        Args:
            questions_list: List of question dicts (``'question'``, ``'image_id'``).
            batch_size: Number of samples per batch.

        Returns:
            A ``tf.data.Dataset`` yielding ``((questions, images), questions)``
            batches, in the same order as ``questions_list``.
        """

        def load_image(image_filepath):
            # Read the file lazily inside the pipeline and decode it to the target size.
            image_bytes = tf.io.read_file(image_filepath)
            return decode_img(image_bytes, self.target_image_size, rescale=self.rescale_image)

        # Wrap the questions in the same "sos ... eos" markers used for the
        # training questions, so the network sees identically formatted input
        # at prediction time.
        question_strings = ["sos {} eos".format(question['question']) for question in questions_list]
        image_paths = [self._image_path(question['image_id']) for question in questions_list]

        question_tokens = self.tokenizer.tokenize_and_pad_list(question_strings)

        question_dataset = tf.data.Dataset.from_tensor_slices(question_tokens)
        # The paths are sliced directly as string tensors; no unicode
        # decode/encode round-trip is needed to feed tf.io.read_file.
        image_dataset = tf.data.Dataset.from_tensor_slices(tf.constant(image_paths, dtype=tf.string))
        image_dataset = image_dataset.map(load_image, num_parallel_calls=tf.data.experimental.AUTOTUNE)

        # Trick to have as input 2 arrays: the question dataset is reused as a
        # placeholder in the target position so each element has the
        # (inputs, targets) structure.
        dataset = tf.data.Dataset.zip(((question_dataset, image_dataset), question_dataset))
        dataset = dataset.batch(batch_size)
        dataset = dataset.repeat()
        dataset = dataset.prefetch(1)
        return dataset

    def get_number_of_samples(self):
        """Return the number of test questions."""
        return len(self.questions_list)

    def get_dataset(self, batch_size):
        """Return the prediction dataset over all test questions."""
        return self._dataset(self.questions_list, batch_size=batch_size)

## **Data Loading**

In [None]:
def read_train_valid_data(train_questions_path: str, image_dir: str, img_h, img_w, tokenizer, split_seed,
                          batch_size=32, valid_split=0.2):
    """Build the training and validation datasets from the annotated questions.

    Args:
        train_questions_path: Path of the JSON file with the annotated questions.
        image_dir: Root folder of the images.
        img_h: Height the images are resized to.
        img_w: Width the images are resized to.
        tokenizer: Fitted ``CustomTokenizer`` used to encode the questions.
        split_seed: Random state of the train/validation split.
        batch_size: Number of samples per batch.
        valid_split: Fraction of the questions held out for validation
            (defaults to the previously hard-coded 0.2).

    Returns:
        A tuple ``(train_dataset, valid_dataset, train_samples, valid_samples)``.
    """
    data_generator = CustomTrainValidGenerator(train_questions_path, image_dir, (img_h, img_w), tokenizer,
                                               split_seed=split_seed, valid_split=valid_split)
    train_dataset = data_generator.get_train_dataset(batch_size)
    valid_dataset = data_generator.get_valid_dataset(batch_size)

    return train_dataset, valid_dataset, data_generator.get_train_samples(), data_generator.get_valid_samples()


def read_test_data(test_questions_path: str, image_dir, img_h, img_w, tokenizer, batch_size=32):
    """Build the prediction dataset for the test questions.

    Returns:
        A tuple ``(test_dataset, questions)`` where ``questions`` is the raw
        content loaded from the test questions JSON file (the generator's
        ``questions_list1`` attribute).
    """
    target_size = (img_h, img_w)
    generator = CustomTestGenerator(test_questions_path, image_dir, target_size, tokenizer)
    return generator.get_dataset(batch_size), generator.questions_list1

In [None]:
# Question files and image folder on the mounted Google Drive.
# Train and test questions refer to images stored under the same root folder.
train_questions_path = "/content/drive/My Drive/VQA/train_questions_annotations.json"
test_questions_path = "/content/drive/My Drive/VQA/test_questions.json"
train_image_dir = "/content/drive/My Drive/VQA/Images"
test_image_dir = "/content/drive/My Drive/VQA/Images"

# Fit the tokenizer on the training questions, then build both datasets.
tokenizer = CustomTokenizer(train_questions_path)
train_dataset, valid_dataset, train_samples, valid_samples = read_train_valid_data(
    train_questions_path,
    train_image_dir,
    img_h=img_h,
    img_w=img_w,
    tokenizer=tokenizer,
    split_seed=SEED,
    batch_size=batch_size,
)

Total questions words: 4643


## **Model**

In [None]:
class LSTM_CNN(object):
    """Visual question answering network: a CNN image branch joined with an LSTM question branch."""

    # Dimensionality of the word embeddings learned by the question branch.
    EMBEDDING_SIZE = 50

    def get_image_model(self, img_h, img_w, application_name="vgg16", fine_tuning=True):
        """Return an ImageNet-pretrained convolutional backbone without its classifier head.

        Args:
            img_h: Input image height.
            img_w: Input image width.
            application_name: One of ``"vgg16"``, ``"resnet50v2"`` or
                ``"inceptionresnetv2"``.
            fine_tuning: When False the backbone weights are frozen.

        Returns:
            The Keras model of the selected backbone.

        Raises:
            NotImplementedError: If ``application_name`` is not supported.
        """
        input_shape = (img_h, img_w, 3)
        # pooling=None (the value, not the string "None") keeps the 4D feature
        # map as output, which get_model reshapes into a sequence.
        if application_name == "vgg16":
            model = tf.keras.applications.vgg16.VGG16(include_top=False,
                                                      weights='imagenet',
                                                      input_shape=input_shape,
                                                      pooling=None)
        elif application_name == "resnet50v2":
            model = tf.keras.applications.resnet_v2.ResNet50V2(include_top=False,
                                                               weights='imagenet',
                                                               input_shape=input_shape,
                                                               pooling=None)
        elif application_name == "inceptionresnetv2":
            model = tf.keras.applications.inception_resnet_v2.InceptionResNetV2(include_top=False,
                                                                                weights='imagenet',
                                                                                input_shape=input_shape,
                                                                                pooling=None)
        else:
            # NotImplemented is a constant, not an exception class: calling it
            # raised an unrelated TypeError instead of this message.
            raise NotImplementedError("Transfer from this model is not implemented.")

        if not fine_tuning:
            model.trainable = False
        return model

    def get_question_model(self, question_len, wtoi):
        """Return the question branch: an embedding followed by two stacked LSTMs.

        Args:
            question_len: Length (int) of the padded question sequences.
            wtoi: Word -> index dictionary; its size defines the embedding vocabulary.

        Returns:
            A Keras model mapping token sequences to a 128-dimensional vector.
        """
        # The input shape is given as an explicit 1-tuple rather than a bare int.
        question_input = tf.keras.layers.Input(shape=(question_len,))
        # +1 because index 0 is not assigned to any word by the tokenizer.
        embedded = tf.keras.layers.Embedding(len(wtoi) + 1, self.EMBEDDING_SIZE,
                                             input_length=question_len)(question_input)
        encoded = tf.keras.layers.LSTM(128, return_sequences=True, stateful=False)(embedded)
        encoded = tf.keras.layers.LSTM(128, return_sequences=False, stateful=False)(encoded)
        return tf.keras.Model(inputs=question_input, outputs=encoded)

    def get_model(self, question_len, wtoi, img_h, img_w, batch_size, seed, fine_tuning=True, application_name="vgg16"):
        """Build and compile the full question + image classifier.

        Args:
            question_len: Length of the padded question sequences.
            wtoi: Word -> index dictionary of the tokenizer.
            img_h: Input image height.
            img_w: Input image width.
            batch_size: Unused by the architecture; kept so existing calls keep working.
            seed: Seed of the dropout layer.
            fine_tuning: Whether the image backbone stays trainable.
            application_name: Backbone name, see ``get_image_model``.

        Returns:
            A compiled Keras model taking ``[question_tokens, image]`` as input and
            returning a softmax over ``get_number_of_labels()`` classes.
        """
        # NOTE(review): the backbone receives images in whatever value range
        # decode_img produces; confirm this matches the preprocessing the
        # pretrained weights expect.
        cnn_model = self.get_image_model(img_h, img_w, fine_tuning=fine_tuning, application_name=application_name)

        # Flatten the spatial grid of the feature map into a sequence of
        # feature vectors and summarize it with an LSTM.
        _, feat_h, feat_w, feat_c = cnn_model.output_shape
        image_features = tf.keras.layers.Reshape(target_shape=(feat_h * feat_w, feat_c))(cnn_model.output)
        image_features = tf.keras.layers.LSTM(units=256, return_state=False)(image_features)

        lstm_model = self.get_question_model(question_len, wtoi)

        merged = tf.keras.layers.concatenate([image_features, lstm_model.output])
        merged = tf.keras.layers.Dense(units=256, activation="relu")(merged)
        merged = tf.keras.layers.Dropout(0.2, seed=seed)(merged)
        class_probabilities = tf.keras.layers.Dense(units=get_number_of_labels(), activation="softmax")(merged)

        # Input order (question first, image second) matches the element
        # structure produced by the data generators.
        model = tf.keras.Model(inputs=[lstm_model.input, cnn_model.input], outputs=class_probabilities)

        model.compile(loss=tf.keras.losses.CategoricalCrossentropy(), metrics=["accuracy"], optimizer="adam")

        return model

In [None]:
# Build the network. The question length comes from get_question_shape() so it
# always equals the padding length applied by the tokenizer (previously a
# hard-coded 100).
model = LSTM_CNN().get_model(get_question_shape(), tokenizer.get_wtoi(), img_h=img_h, img_w=img_w,
                             application_name="inceptionresnetv2", seed=SEED, batch_size=batch_size)
model.summary()

Downloading data from https://storage.googleapis.com/tensorflow/keras-applications/inception_resnet_v2/inception_resnet_v2_weights_tf_dim_ordering_tf_kernels_notop.h5
Model: "model_1"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
input_1 (InputLayer)            [(None, 200, 350, 3) 0                                            
__________________________________________________________________________________________________
conv2d (Conv2D)                 (None, 99, 174, 32)  864         input_1[0][0]                    
__________________________________________________________________________________________________
batch_normalization (BatchNorma (None, 99, 174, 32)  96          conv2d[0][0]                     
__________________________________________________________________________________________________
activation (Activation) 

## **Training**

In [None]:
cwd = os.getcwd()

# Root folder on Google Drive that collects one sub-folder per training run.
exps_dir = '/content/drive/My Drive/VQA_Weights'
os.makedirs(exps_dir, exist_ok=True)

# Each run gets its own timestamped folder.
now = datetime.now().strftime('%b%d_%H-%M-%S')
model_name = 'CNN'
exp_dir = os.path.join(exps_dir, model_name + '_' + str(now))
os.makedirs(exp_dir, exist_ok=True)

callbacks = []
# Model checkpoint
# ----------------

ckpt_dir = os.path.join(exp_dir, 'ckpts')
os.makedirs(ckpt_dir, exist_ok=True)
ckpt_callback = tf.keras.callbacks.ModelCheckpoint(filepath=os.path.join(ckpt_dir, 'cp_{epoch:02d}.ckpt'),
                                                   save_weights_only=True)  # False to save the model directly
# Checkpointing is currently disabled: uncomment the next line to register the
# callback and have the weights written to ckpt_dir during training.
#callbacks.append(ckpt_callback)

In [None]:
# Both datasets repeat forever, so the number of steps per epoch is given
# explicitly. `use_multiprocessing` was dropped: it only applies to Python
# generator / Sequence inputs, not to tf.data datasets. The History object is
# kept so the training curves can be inspected afterwards.
history = model.fit(x=train_dataset, epochs=30, steps_per_epoch=math.ceil(train_samples / batch_size),
                    callbacks=callbacks, validation_data=valid_dataset,
                    validation_steps=math.ceil(valid_samples / batch_size))

Epoch 1/30
Epoch 2/30
Epoch 3/30
Epoch 4/30
Epoch 5/30
Epoch 6/30
Epoch 7/30
Epoch 8/30
Epoch 9/30
Epoch 10/30
Epoch 11/30
Epoch 12/30
Epoch 13/30
Epoch 14/30
Epoch 15/30
Epoch 16/30
Epoch 17/30
Epoch 18/30
Epoch 19/30
Epoch 20/30
Epoch 21/30
Epoch 22/30
Epoch 23/30
Epoch 24/30
Epoch 25/30
Epoch 26/30
Epoch 27/30
Epoch 28/30
Epoch 29/30
Epoch 30/30


<tensorflow.python.keras.callbacks.History at 0x7f9a721190f0>

## **Prediction**

In [None]:
# Restore the weights saved by an earlier run (its epoch-09 checkpoint) before predicting.
best_checkpoint_path = '/content/drive/My Drive/VQA_Weights/CNN_Jan18_08-15-09/ckpts/cp_09.ckpt'
model.load_weights(best_checkpoint_path)

<tensorflow.python.training.tracking.util.CheckpointLoadStatus at 0x7f407751c828>

In [None]:
def create_csv(results, results_dir='./'):
    """Write ``results`` to a timestamped ``results_<time>.csv`` file.

    Args:
        results: Mapping from question id (string) to predicted category.
        results_dir: Folder the CSV file is created in.
    """
    timestamp = datetime.now().strftime('%b%d_%H-%M-%S')
    csv_path = os.path.join(results_dir, 'results_' + timestamp + '.csv')

    with open(csv_path, 'w') as f:
        # Header row, followed by one "id,category" row per prediction.
        f.write('Id,Category\n')
        f.writelines(key + ',' + str(value) + '\n' for key, value in results.items())


def predict_submissions(model, test_dataset, questions_list, batch_size, result_dir="./"):
    """Predict an answer class for every test question and save the result as CSV.

    Args:
        model: Trained Keras model.
        test_dataset: Dataset of test inputs, in the same order as ``questions_list``.
        questions_list: Mapping whose keys are the question ids.
        batch_size: Batch size of ``test_dataset``; used to compute the number
            of prediction steps.
        result_dir: Folder the CSV file is written to.
    """
    # The dataset is consumed for a fixed number of steps: enough batches to cover every question.
    n_steps = math.ceil(len(questions_list) / batch_size)
    class_scores = model.predict(x=test_dataset, steps=n_steps, verbose=1)
    predicted_class = np.argmax(class_scores, axis=1)

    # Pair each question id with the class predicted at the same position.
    results = dict(zip(questions_list.keys(), predicted_class))
    create_csv(results, results_dir=result_dir)

    print("Wrote file csv")

In [None]:
    # Build the test dataset, then predict and write the submission CSV.
    test_dataset, questions_list = read_test_data(
        test_questions_path,
        test_image_dir,
        img_h=img_h,
        img_w=img_w,
        tokenizer=tokenizer,
        batch_size=batch_size,
    )
    predict_submissions(model, test_dataset, questions_list, batch_size)

Wrote file csv
