# Homework 3 - Visual Question Answering
The notebook is divided into several sections:

* Setup - Importing libraries, defining the create_csv function, mounting Drive and unzipping the dataset in the proper Drive directory. Indeed, the notebook was created using the Drive integration with Colab, therefore the main directory is the folder /AN2DL/VisualQuestionAnswering, which was created in advance with the dataset in it.
* Preparing the data - The training set and the validation set are prepared, preprocessing images and creating the Datasets objects to be used by the models.
* Models:
  * First Model
  * Second Model
  * Third Model (VGG-16)

  In each model section, the datasets are created, the architecture is defined, the optimization parameters are set, the callbacks are created, the model is trained and finally the predictions on the test set are computed, exporting the results in a csv format.

# Setup

In [None]:
# Importing the necessary libraries and setting the seed(s) to make the code replicable
from IPython.core.interactiveshell import InteractiveShell
# Display the value of every top-level expression of a cell, not only the last one
InteractiveShell.ast_node_interactivity = "all"

import os
import tensorflow as tf
import numpy as np
from PIL import Image
from datetime import datetime
import json

# A single seed is shared by the TensorFlow and the NumPy random generators
SEED = 1234
tf.random.set_seed(SEED)
np.random.seed(SEED)

In [None]:
# Defining the create_csv function, which will be used to export the prediction results on the test set
def create_csv(results, results_dir='./'):
  """Export the predictions into a timestamped csv file.

  The file is created inside results_dir and is named
  'results_<Mon><day>_<HH>-<MM>-<SS>.csv'. It contains the 'Id,Category'
  header followed by one 'key,value' row for each entry of results.

  Args:
    results: dictionary mapping each item ID to its predicted category.
    results_dir: directory in which the csv file is written.
  """
  csv_fname = 'results_' + datetime.now().strftime('%b%d_%H-%M-%S') + '.csv'

  with open(os.path.join(results_dir, csv_fname), 'w') as f:
    f.write('Id,Category\n')
    # Both key and value go through str(), so that non-string IDs (e.g. integers) do not raise a TypeError
    f.writelines(str(key) + ',' + str(value) + '\n' for key, value in results.items())

In [None]:
# Mounting Drive to Colab, as the Drive folder /AN2DL/VisualQuestionAnswering is the main directory for this homework.
# The mount point is /content/drive, which is the prefix used by the Drive paths in the rest of the notebook.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
# Unzipping the dataset (named "anndl-2020-vqa.zip"), which has to be previously put in the homework directory
!unzip '/content/drive/My Drive/AN2DL/VisualQuestionAnswering/anndl-2020-vqa.zip'

# Saving the directories of the dataset and of its images (to be used later)
# NOTE(review): later cells hard-code '/content/VQA_Dataset', so cwd is presumably /content - confirm
cwd = os.getcwd()                                                               # This is the current working directory, in which the dataset has been unzipped
dataset_dir = os.path.join(cwd, 'VQA_Dataset')                                  # This is the dataset directory, which contains the Images folders, along with the training and test questions json
images_dir = os.path.join(dataset_dir, 'Images')                                # This is the directory which contains the images for the homework

# Answers labels
A simple dictionary mapping each possible answer to an integer number.

In [None]:
# Lookup table from each answer (target) of the training set to its corresponding integer.
# The answers are listed in the same order as on the homework page on Kaggle,
# so the position of an answer in the sequence below is exactly its label (from 0 to 57).
labels_dict = {
  answer: label
  for label, answer in enumerate([
    '0', '1', '2', '3', '4', '5',
    'apple', 'baseball', 'bench', 'bike', 'bird', 'black', 'blanket', 'blue',
    'bone', 'book', 'boy', 'brown', 'cat', 'chair', 'couch', 'dog',
    'floor', 'food', 'football', 'girl', 'grass', 'gray', 'green', 'left',
    'log', 'man', 'monkey bars', 'no', 'nothing', 'orange', 'pie', 'plant',
    'playing', 'red', 'right', 'rug', 'sandbox', 'sitting', 'sleeping', 'soccer',
    'squirrel', 'standing', 'stool', 'sunny', 'table', 'tree', 'watermelon', 'white',
    'wine', 'woman', 'yellow', 'yes',
  ])
}

# Data Preparation (First alternative - Standard)
Each training json item is split into three different lists (questions, image IDs, answers). Then, questions are tokenized and padded so that they all have the same length. Additionally, answers are converted into integers using the previously defined dictionary. Finally, the lists are split to create separate training and validation data.

In [None]:
# Reading the training annotations and unpacking every entry into three parallel lists:
# the questions, the image IDs and the answers (targets), all in the same order.
with open('/content/VQA_Dataset/train_questions_annotations.json') as json_file:
  training_json = json.load(json_file)

training_json_list = list(training_json.values())

question_input = [entry['question'] for entry in training_json_list]
image_ids = [entry['image_id'] for entry in training_json_list]
output = [entry['answer'] for entry in training_json_list]

In [None]:
# The questions are passed through a Tokenizer to convert words to integers.
# Then, padding is applied so that they all have the same length (max_question_length).
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences

# Tokenization: the vocabulary is fitted on all the questions, then each question becomes a list of word indices
question_tokenizer = Tokenizer()
question_tokenizer.fit_on_texts(question_input)
question_tokenized = question_tokenizer.texts_to_sequences(question_input)

# Word -> integer index dictionary learned by the Tokenizer
question_wtoi = question_tokenizer.word_index
print('Unique words in the questions:', len(question_wtoi))

# Number of tokens of the longest question, used as the common length for padding
max_question_length = max(len(sentence) for sentence in question_tokenized)
print('Maximum questions length:', max_question_length)

# Padding every tokenized question to max_question_length
question_encoder_inputs = pad_sequences(question_tokenized, maxlen=max_question_length)
print("Shape of the padded questions:", question_encoder_inputs.shape)

In [None]:
# Mapping every textual answer to its integer label through the dictionary previously defined
output_number = [labels_dict[answer] for answer in output]

In [None]:
# Splitting tokenized+padded questions, image IDs and answers (targets) labels into training and validation sets.
# The items are split in their original order: roughly the first 70% goes to training and the rest to validation.
# In particular, image IDs are written into two separate txt files (one ID per line, without a trailing newline),
# to be used later in the CustomDataset object.
# The txt files are opened in 'w' mode: in append mode, re-running this cell (or running it after the other
# data preparation alternative) would pile up further IDs in the files, misaligning images, questions and answers.
num_items = len(image_ids)

# An item at 1-based position p belongs to the training set when p < 0.7 * num_items
num_train_items = sum(1 for position in range(1, num_items + 1) if position < 0.7 * num_items)

train_question_input = list(question_encoder_inputs[:num_train_items])
valid_question_input = list(question_encoder_inputs[num_train_items:])
train_output = list(output_number[:num_train_items])
valid_output = list(output_number[num_train_items:])

# The context managers guarantee that the files are flushed and closed even if a write fails
with open('/content/VQA_Dataset/train_images_names.txt', 'w') as train_images_names:
  _ = train_images_names.write('\n'.join(image_ids[:num_train_items]))

with open('/content/VQA_Dataset/valid_images_names.txt', 'w') as valid_images_names:
  _ = valid_images_names.write('\n'.join(image_ids[num_train_items:]))

# Data Preparation (Second alternative - Split by category)
Training json items are divided according to the category they belong to, after having converted answers into integers using the previously defined dictionary. Then, each item is split into three different lists (for each category, for a total of nine lists) to get questions, image IDs and answers. Finally, questions are tokenized and padded so they all have the same length and everything is split into training and validation data, respecting the category distribution.

In [None]:
# Loading the training annotations and keeping only their entries as a list (the json keys are dropped)
with open('/content/VQA_Dataset/train_questions_annotations.json') as json_file:
  training_json = list(json.load(json_file).values())

In [None]:
# Converting every answer in the training list into the corresponding integer, according to the dictionary previously defined.
# The conversion happens in place; answers that are already integers are left untouched,
# so that re-running this cell does not fail with a KeyError.
for item in training_json:
  if isinstance(item['answer'], str):
    item['answer'] = labels_dict[item['answer']]

In [None]:
# Routing every training item to the list of its category:
# labels 33 and 57 are the yes/no answers, labels up to 5 are the counting answers, everything else is "other"
yes_no_items, counting_items, other_items = [], [], []

for entry in training_json:
  label = entry['answer']
  if label in (33, 57):
    category_items = yes_no_items
  elif label <= 5:
    category_items = counting_items
  else:
    category_items = other_items
  category_items.append(entry)

In [None]:
# Shuffling each category list in place; every shuffle uses its own generator seeded with SEED
import random

for items_to_shuffle in (yes_no_items, counting_items, other_items):
  random.Random(SEED).shuffle(items_to_shuffle)

In [None]:
# Unpacking each category list into three parallel lists: questions, image IDs and answers

# Yes/no category
yes_no_questions = [entry['question'] for entry in yes_no_items]
yes_no_image_ids = [entry['image_id'] for entry in yes_no_items]
yes_no_answers = [entry['answer'] for entry in yes_no_items]

# Counting category
counting_questions = [entry['question'] for entry in counting_items]
counting_image_ids = [entry['image_id'] for entry in counting_items]
counting_answers = [entry['answer'] for entry in counting_items]

# Other category
other_questions = [entry['question'] for entry in other_items]
other_image_ids = [entry['image_id'] for entry in other_items]
other_answers = [entry['answer'] for entry in other_items]

In [None]:
# Splitting data into training and validation, using a 70/30 split ratio.
# Data distribution is maintained, since 70% of each category is put in the training set and 30% in the validation set.
# Within both sets the categories appear in the order: yes/no, counting, other.
TRAIN_PERC = 0.7

train_questions = []
valid_questions = []
train_image_ids = []
valid_image_ids = []
train_answers = []
valid_answers = []

for category_questions, category_image_ids, category_answers in (
    (yes_no_questions, yes_no_image_ids, yes_no_answers),
    (counting_questions, counting_image_ids, counting_answers),
    (other_questions, other_image_ids, other_answers)):
  # The same cut index is used for the three parallel lists of a category, so they stay aligned
  cut = round(TRAIN_PERC * len(category_questions))

  train_questions.extend(category_questions[:cut])
  valid_questions.extend(category_questions[cut:])
  train_image_ids.extend(category_image_ids[:cut])
  valid_image_ids.extend(category_image_ids[cut:])
  train_answers.extend(category_answers[:cut])
  valid_answers.extend(category_answers[cut:])

# Writing training and validation image IDs into two txt files (one ID per line, without a trailing newline),
# to be used by the CustomDataset later.
# The files are opened in 'w' mode: in append mode, re-running this cell (or running it after the other
# data preparation alternative) would pile up further IDs in the files, misaligning images, questions and answers.
with open('/content/VQA_Dataset/train_images_names.txt', 'w') as train_images_txt:
  _ = train_images_txt.write('\n'.join(train_image_ids))

with open('/content/VQA_Dataset/valid_images_names.txt', 'w') as valid_images_txt:
  _ = valid_images_txt.write('\n'.join(valid_image_ids))

In [None]:
# The questions are put together into a single list and passed through a Tokenizer to convert words to integers.
# Then, padding is applied so that they all have the same length (max_question_length).
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences

# Merging training questions and validation questions into a single list
# (training questions first, so the first len(train_questions) rows of the padded output are the training ones)
question_input = []
question_input.extend(train_questions)
question_input.extend(valid_questions)

# Tokenization: the vocabulary is fitted on all the questions, then each question becomes a list of word indices
question_tokenizer = Tokenizer()
question_tokenizer.fit_on_texts(question_input)
question_tokenized = question_tokenizer.texts_to_sequences(question_input)

# Word -> integer index dictionary learned by the Tokenizer
question_wtoi = question_tokenizer.word_index
print('Unique words in the questions:', len(question_wtoi))

# Number of tokens of the longest question, used as the common length for padding
max_question_length = max(len(sentence) for sentence in question_tokenized)
print('Maximum questions length:', max_question_length)

# Padding every tokenized question to max_question_length
question_encoder_inputs = pad_sequences(question_tokenized, maxlen=max_question_length)
print("Shape of the padded questions:", question_encoder_inputs.shape)

In [None]:
# Splitting the tokenized+padded questions back into training and validation questions.
# The padded questions were built from train_questions followed by valid_questions, so the boundary is
# exactly len(train_questions). Recomputing it as round(TRAIN_PERC * total) is not equivalent: the training
# set is a sum of per-category rounded sizes, which can differ from the rounded size of the whole set,
# and a shifted boundary would misalign the questions with their answers and image IDs.
# Additionally, training and validation answers are renamed just to avoid modifying the dataset structure.
train_output = train_answers
valid_output = valid_answers

num_train_questions = len(train_questions)
train_question_input = question_encoder_inputs[:num_train_questions]
valid_question_input = question_encoder_inputs[num_train_questions:]

# Custom Dataset

In [None]:
# A CustomDataset object is defined, to be used later in each model.
# It selects the right data (training/validation) as specified in the input.
# Additionally, the input parameters must include the right questions and answers (targets) prepared before, according to the selected subset.
# A proper preprocessing function can also be included.
# The output of the CustomDataset is composed by a tuple ((image, question), answer).

class CustomDataset(tf.keras.utils.Sequence):
  """Sequence returning one ((image, question), answer) sample per index.

  The image IDs of the selected subset are read from 'train_images_names.txt' or
  'valid_images_names.txt' inside dataset_dir (one ID per line). The sample at position i
  pairs the i-th image ID with the i-th question and the i-th answer received as input.

  Args:
    dataset_dir: directory containing the two txt files and the 'Images' folder.
    which_subset: either 'training' or 'validation'.
    train_question_input, train_output: questions and answers used when which_subset is 'training'.
    valid_question_input, valid_output: questions and answers used when which_subset is 'validation'.
    preprocessing_function: optional callable applied to the image array before returning it.
    out_shape: optional (width, height) the image is resized to; if None the image keeps its size.

  Raises:
    ValueError: if which_subset is neither 'training' nor 'validation'.
  """

  def __init__(self, dataset_dir, which_subset, train_question_input=None, valid_question_input=None, train_output=None, valid_output=None, preprocessing_function=None, out_shape=None):
    super().__init__()

    if which_subset == 'training':
      subset_file = os.path.join(dataset_dir, 'train_images_names.txt')
      self.question_input = train_question_input
      self.output = train_output
    elif which_subset == 'validation':
      subset_file = os.path.join(dataset_dir, 'valid_images_names.txt')
      self.question_input = valid_question_input
      self.output = valid_output
    else:
      # Failing here with a clear message, instead of an UnboundLocalError on subset_file below
      raise ValueError("which_subset must be 'training' or 'validation', got " + repr(which_subset))

    # One image ID per line; blank lines are skipped since they do not identify any image
    with open(subset_file, 'r') as f:
      subset_filenames = [line.strip() for line in f if line.strip()]

    self.which_subset = which_subset
    self.dataset_dir = dataset_dir
    self.subset_filenames = subset_filenames
    self.preprocessing_function = preprocessing_function
    self.out_shape = out_shape

  def __len__(self):
    # Number of samples (not of batches): batching is left to the consumer of the dataset
    return len(self.subset_filenames)

  def __getitem__(self, index):
    curr_filename = self.subset_filenames[index]
    img_path = os.path.join(self.dataset_dir, 'Images', curr_filename + '.png')

    # The context manager closes the image file; convert() returns an independent in-memory copy
    with Image.open(img_path) as img_file:
      img = img_file.convert('RGB')

    if self.out_shape is not None:
      img = img.resize(tuple(self.out_shape))
    img_arr = np.array(img)

    curr_question = self.question_input[index]
    curr_output = self.output[index]

    if self.preprocessing_function is not None:
      img_arr = self.preprocessing_function(img_arr)

    return ((img_arr, curr_question), curr_output)

# First Model

In [None]:
# Creating the CustomDataset object for the model, followed by the actual dataset that will be used for training

# Image dimensions to work with
img_h = 256
img_w = 256

# Batch size
bs = 32

# Training Dataset
# No preprocessing_function is passed, so the images are only converted to RGB and resized
dataset = CustomDataset('/content/VQA_Dataset', 'training', train_question_input=train_question_input, train_output=train_output, out_shape=[img_w, img_h])

# Each element is ((image, question), answer): a float32 image, an int32 question and a scalar int32 answer
train_dataset = tf.data.Dataset.from_generator(lambda: dataset,
                                               output_types=((tf.float32, tf.int32), tf.int32),
                                               output_shapes=(([img_h, img_w, 3], [max_question_length]), ()))

# The dataset is batched and repeated indefinitely: the number of batches per epoch is given to fit()
train_dataset = train_dataset.batch(bs)                                                          
train_dataset = train_dataset.repeat()

# Validation Dataset
dataset_valid = CustomDataset('/content/VQA_Dataset', 'validation', valid_question_input=valid_question_input, valid_output=valid_output, out_shape=[img_w, img_h])

valid_dataset = tf.data.Dataset.from_generator(lambda: dataset_valid,
                                               output_types=((tf.float32, tf.int32), tf.int32),
                                               output_shapes=(([img_h, img_w, 3], [max_question_length]), ()))

valid_dataset = valid_dataset.batch(bs)
valid_dataset = valid_dataset.repeat()

In [None]:
# Importing Keras libraries for easier use
from keras.layers import Conv2D, MaxPooling2D, Flatten
from keras.layers import Input, LSTM, Embedding, Dense
from keras.models import Model, Sequential

# Defining the CNN part, which deals with the image
# It is composed of two convolutional layers using 64 3x3 filters, a 2x2 MaxPooling, two convolutional layers using 128 3x3 filters, a MaxPooling,
# three convolutional layers using 256 3x3 filters, a MaxPooling, and the final Flatten layer
vision_model = Sequential()
vision_model.add(Conv2D(64, (3, 3), activation='relu', padding='same', input_shape=(img_h, img_w, 3)))
vision_model.add(Conv2D(64, (3, 3), activation='relu'))
vision_model.add(MaxPooling2D((2, 2)))
vision_model.add(Conv2D(128, (3, 3), activation='relu', padding='same'))
vision_model.add(Conv2D(128, (3, 3), activation='relu'))
vision_model.add(MaxPooling2D((2, 2)))
vision_model.add(Conv2D(256, (3, 3), activation='relu', padding='same'))
vision_model.add(Conv2D(256, (3, 3), activation='relu'))
vision_model.add(Conv2D(256, (3, 3), activation='relu'))
vision_model.add(MaxPooling2D((2, 2)))
vision_model.add(Flatten())

image_input = Input(shape=(img_h, img_w, 3))
encoded_image = vision_model(image_input)

# Defining the RNN part, which deals with the question
# It is composed of an Embedding of the question, followed by a single LSTM layer with 256 units
# The Embedding input dimension is the vocabulary size plus one, since the Tokenizer word indices start from 1
question_input_model = Input(shape=[max_question_length], dtype='int32')
embedded_question = Embedding(input_dim=len(question_wtoi)+1, output_dim=256, input_length=max_question_length)(question_input_model)
encoded_question = LSTM(256)(embedded_question)

# Concatenating the CNN and the RNN parts to get the output
# The final Dense layer has one softmax unit for each possible answer in labels_dict
merged = tf.keras.layers.concatenate([encoded_question, encoded_image])
output_model = Dense(len(labels_dict), activation='softmax')(merged)
vqa_model = Model(inputs=[image_input, question_input_model], outputs=output_model)

# Printing out a summary of the network
vqa_model.summary()

In [None]:
# Optimization parameters

# Loss function
# The sparse version is used because the targets are integer labels, not one-hot vectors
loss = tf.keras.losses.SparseCategoricalCrossentropy()

# Learning rate and Optimizer
lr = 1e-3
optimizer = tf.keras.optimizers.Adam(learning_rate=lr)

# Validation metrics
metrics = ['accuracy']

# Compile Model
vqa_model.compile(optimizer=optimizer, loss=loss, metrics=metrics)

In [None]:
# Setting up the callbacks and Early Stopping
# The purpose of this piece of code is to create a "vqa_experiments" folder inside the directory of this homework (if not already created).
# Inside it, it creates a folder called "VQA_" followed by the date and the time of execution, to recognize the experiment.
# Then, it sets up the callback for the training of the model, saving the model weights at the end of an epoch only if the model improved in accuracy on the Validation set.
# Moreover, Early Stopping is also inserted in the callbacks, to monitor the loss on the Validation set and to stop the training procedure if it does not improve for "patience" epochs.
# Finally, the model is fitted using the training and validation data defined before.

# Creating the "vqa_experiments" folder if not already created
exps_dir = os.path.join(cwd, 'drive/My Drive/AN2DL/VisualQuestionAnswering/', 'vqa_experiments')
if not os.path.exists(exps_dir):
  os.makedirs(exps_dir)

now = datetime.now().strftime('%b%d_%H-%M-%S')

# Creating the folder in which the model weights will be saved
model_name = 'VQA'

exp_dir = os.path.join(exps_dir, model_name + '_' + str(now))
if not os.path.exists(exp_dir):
  os.makedirs(exp_dir)

# Setting up the callback to save the model weights after each epoch only if there is an improvement in terms of validation accuracy
callbacks = []

ckpt_dir = os.path.join(exp_dir, 'ckpts')
if not os.path.exists(ckpt_dir):
  os.makedirs(ckpt_dir)

# NOTE(review): ckpt_dir itself is passed as the checkpoint filepath (not a file inside it) - confirm where the weight files end up
ckpt_callback = tf.keras.callbacks.ModelCheckpoint(ckpt_dir, 
                                                   monitor='val_accuracy',
                                                   mode='max',
                                                   verbose=0,
                                                   save_best_only=True,
                                                   save_weights_only=True)
callbacks.append(ckpt_callback)

# Early Stopping is inserted in the callbacks, stopping the training procedure if the validation loss does not improve for too long
early_stop = True
if early_stop:
  es_callback = tf.keras.callbacks.EarlyStopping(monitor='val_loss', patience=5)
  callbacks.append(es_callback)

# Fitting the model
# It can go on up to 100 epochs, but the Early Stopping callback explained before allows to stop much earlier.
# Both datasets repeat indefinitely, so the number of batches per epoch is given explicitly (samples // batch size)
vqa_model.fit(train_dataset,
              epochs=100,
              steps_per_epoch=(len(dataset) // bs),
              validation_data=valid_dataset,
              validation_steps=(len(dataset_valid) // bs),
              callbacks=callbacks)

In [None]:
# Loading the best weights of the trained model
# exp_dir is joined to the experiments folder so that a bare experiment folder name can also be used here
full_path = os.path.join('/content/drive/My Drive/AN2DL/VisualQuestionAnswering/vqa_experiments', exp_dir)
latest = tf.train.latest_checkpoint(full_path)

# latest_checkpoint returns None when no checkpoint is found: failing here with a clear message
# is better than letting load_weights fail on a None path
if latest is None:
  raise FileNotFoundError('No checkpoint found in ' + full_path)

vqa_model.load_weights(latest)

In [None]:
# Checking the model predictions on the validation set
import time
from matplotlib import cm
import matplotlib.pyplot as plt
%matplotlib inline

# Iterator over the batched validation dataset: each next() call returns one ((images, questions), targets) batch
iterator = iter(valid_dataset)

In [None]:
# Visualizing a target for an item in the validation set and its corresponding model prediction
(image, question), target = next(iterator)

# Only the first sample of the batch is inspected
image = image[0]
question = question[0]
target = target[0]

# Adding back the batch dimension, since the model expects batched inputs
x = tf.expand_dims(image,0)
y = tf.expand_dims(question, 0)
# Despite its name, out_sigmoid holds the output of the final softmax layer of the model
out_sigmoid = vqa_model.predict([x,y])
predicted_class = tf.argmax(out_sigmoid, -1)

print("Target:")
tf.print(target)
print("Prediction:")
print(predicted_class.numpy()[0])

In [None]:
# Reading the test questions json and unpacking it into three parallel lists:
# the item IDs (the json keys), the questions and the image IDs
with open('/content/VQA_Dataset/test_questions.json') as json_file:
  test_json = json.load(json_file)

id_test = list(test_json)
question_input_test = [entry['question'] for entry in test_json.values()]
image_ids_test = [entry['image_id'] for entry in test_json.values()]

In [None]:
# Using the same Tokenizer of the training part to convert the words in the questions to integers,
# so that each word gets the same index it had during training
question_tokenized_test = question_tokenizer.texts_to_sequences(question_input_test)

# Padding to the same max question length used for training
question_encoder_inputs_test = pad_sequences(question_tokenized_test, maxlen=max_question_length)

In [None]:
# Computing the predictions on the test set, giving each image with the related question to the model and computing the argmax of the output
# In particular, each image is first converted to RGB and resized to the same dimensions used during training
# No further preprocessing is applied to the image: the training and validation datasets of this model were created
# without a preprocessing function, so the test images must be fed to the model in the same way
# The predicted output is then saved into the results_test list (one tensor with a single label per test item)
results_test = []

for index, question in enumerate(question_encoder_inputs_test):
  img_test = Image.open(os.path.join(images_dir, image_ids_test[index] + '.png')).convert('RGB').resize((img_w, img_h))
  img_arr_test = np.array(img_test)
  # Adding the batch dimension to both inputs, since the model expects batched inputs
  img_arr_test = tf.expand_dims(img_arr_test, 0)

  question = tf.expand_dims(question, 0)

  prediction = vqa_model.predict([img_arr_test, question])
  results_test.append(tf.argmax(prediction, -1))

In [None]:
# Creating the dictionary to be written in the output csv file
# It will contain the ID of each test item as the key, and the corresponding predicted output as the value
# (the loop variable is not called "id", which would overwrite the Python builtin for the rest of the session)
dictionary = {test_id: result.numpy()[0] for test_id, result in zip(id_test, results_test)}

# Exporting the dictionary created into a csv file
create_csv(dictionary, '/content/drive/My Drive/AN2DL/VisualQuestionAnswering/')

# Second Model

In [None]:
# Creating the CustomDataset object for the model, followed by the actual dataset that will be used for training

# Image dimensions to work with
img_h = 256
img_w = 256

# Batch size
bs = 32

# Training Dataset
# No preprocessing_function is passed, so the images are only converted to RGB and resized
dataset = CustomDataset('/content/VQA_Dataset', 'training', train_question_input=train_question_input, train_output=train_output, out_shape=[img_w, img_h])

# Each element is ((image, question), answer): a float32 image, an int32 question and a scalar int32 answer
train_dataset = tf.data.Dataset.from_generator(lambda: dataset,
                                               output_types=((tf.float32, tf.int32), tf.int32),
                                               output_shapes=(([img_h, img_w, 3], [max_question_length]), ()))

# The dataset is batched and repeated indefinitely: the number of batches per epoch is given to fit()
train_dataset = train_dataset.batch(bs)                                                          
train_dataset = train_dataset.repeat()

# Validation Dataset
dataset_valid = CustomDataset('/content/VQA_Dataset', 'validation', valid_question_input=valid_question_input, valid_output=valid_output, out_shape=[img_w, img_h])

valid_dataset = tf.data.Dataset.from_generator(lambda: dataset_valid,
                                               output_types=((tf.float32, tf.int32), tf.int32),
                                               output_shapes=(([img_h, img_w, 3], [max_question_length]), ()))

valid_dataset = valid_dataset.batch(bs)
valid_dataset = valid_dataset.repeat()

In [None]:
# Importing Keras libraries for easier use
from keras.layers import Conv2D, MaxPooling2D, Flatten
from keras.layers import Input, LSTM, Embedding, Dense
from keras.models import Model, Sequential

# Defining the CNN part, which deals with the image
# It is composed of two convolutional layers using 64 3x3 filters, a 2x2 MaxPooling, two convolutional layers using 128 3x3 filters, a MaxPooling,
# three convolutional layers using 256 3x3 filters, a MaxPooling, and the final Flatten layer
vision_model = Sequential()
vision_model.add(Conv2D(64, (3, 3), activation='relu', padding='same', input_shape=(img_h, img_w, 3)))
vision_model.add(Conv2D(64, (3, 3), activation='relu'))
vision_model.add(MaxPooling2D((2, 2)))
vision_model.add(Conv2D(128, (3, 3), activation='relu', padding='same'))
vision_model.add(Conv2D(128, (3, 3), activation='relu'))
vision_model.add(MaxPooling2D((2, 2)))
vision_model.add(Conv2D(256, (3, 3), activation='relu', padding='same'))
vision_model.add(Conv2D(256, (3, 3), activation='relu'))
vision_model.add(Conv2D(256, (3, 3), activation='relu'))
vision_model.add(MaxPooling2D((2, 2)))
vision_model.add(Flatten())

image_input = Input(shape=(img_h, img_w, 3))
encoded_image = vision_model(image_input)

# Defining the RNN part, which deals with the question
# It is composed of an Embedding of the question, followed by three LSTM layers, each one with 256 units
# The first two LSTM layers return the whole sequence, so that the next LSTM layer receives one vector per word
question_input_model = Input(shape=[max_question_length], dtype='int32')
embedded_question = Embedding(input_dim=len(question_wtoi)+1, output_dim=256, input_length=max_question_length)(question_input_model)
encoded_question_1 = LSTM(256, return_sequences=True)(embedded_question)
encoded_question_2 = LSTM(256, return_sequences=True)(encoded_question_1)
encoded_question_3 = LSTM(256)(encoded_question_2)

# Concatenating the CNN and the RNN parts to get the output
# The final Dense layer has one softmax unit for each possible answer in labels_dict
merged = tf.keras.layers.concatenate([encoded_question_3, encoded_image])
output_model = Dense(len(labels_dict), activation='softmax')(merged)
vqa_model_2 = Model(inputs=[image_input, question_input_model], outputs=output_model)

# Printing out a summary of the network
vqa_model_2.summary()

In [None]:
# Optimization parameters

# Loss function
# The sparse version is used because the targets are integer labels, not one-hot vectors
loss = tf.keras.losses.SparseCategoricalCrossentropy()

# Learning rate and Optimizer
lr = 1e-3
optimizer = tf.keras.optimizers.Adam(learning_rate=lr)

# Validation metrics
metrics = ['accuracy']

# Compile Model
# The model compiled here must be the second one (vqa_model_2), which is the one fitted in this section:
# compiling vqa_model would leave vqa_model_2 without loss and optimizer
vqa_model_2.compile(optimizer=optimizer, loss=loss, metrics=metrics)

In [None]:
# Setting up the callbacks and Early Stopping
# The purpose of this piece of code is to create a "vqa_experiments" folder inside the directory of this homework (if not already created).
# Inside it, it creates a folder called "VQA_2_" followed by the date and the time of execution, to recognize the experiment.
# Then, it sets up the callback for the training of the model, saving the model weights at the end of an epoch only if the model improved in accuracy on the Validation set.
# Moreover, Early Stopping is also inserted in the callbacks, to monitor the loss on the Validation set and to stop the training procedure if it does not improve for "patience" epochs.
# Finally, the model is fitted using the training and validation data defined before.

# Creating the "vqa_experiments" folder if not already created
exps_dir = os.path.join(cwd, 'drive/My Drive/AN2DL/VisualQuestionAnswering/', 'vqa_experiments')
if not os.path.exists(exps_dir):
  os.makedirs(exps_dir)

now = datetime.now().strftime('%b%d_%H-%M-%S')

# Creating the folder in which the model weights will be saved
model_name = 'VQA_2'

exp_dir = os.path.join(exps_dir, model_name + '_' + str(now))
if not os.path.exists(exp_dir):
  os.makedirs(exp_dir)

# Setting up the callback to save the model weights after each epoch only if there is an improvement in terms of validation accuracy
callbacks = []

ckpt_dir = os.path.join(exp_dir, 'ckpts')
if not os.path.exists(ckpt_dir):
  os.makedirs(ckpt_dir)

# NOTE(review): ckpt_dir itself is passed as the checkpoint filepath (not a file inside it) - confirm where the weight files end up
ckpt_callback = tf.keras.callbacks.ModelCheckpoint(ckpt_dir, 
                                                   monitor='val_accuracy',
                                                   mode='max',
                                                   verbose=0,
                                                   save_best_only=True,
                                                   save_weights_only=True)
callbacks.append(ckpt_callback)

# Early Stopping is inserted in the callbacks, stopping the training procedure if the validation loss does not improve for too long
early_stop = True
if early_stop:
  es_callback = tf.keras.callbacks.EarlyStopping(monitor='val_loss', patience=8)
  callbacks.append(es_callback)

# Fitting the model
# It can go on up to 100 epochs, but the Early Stopping callback explained before allows to stop much earlier.
# Both datasets repeat indefinitely, so the number of batches per epoch is given explicitly (samples // batch size)
vqa_model_2.fit(train_dataset,
                epochs=100,
                steps_per_epoch=(len(dataset) // bs),
                validation_data=valid_dataset,
                validation_steps=(len(dataset_valid) // bs),
                callbacks=callbacks)

In [None]:
# Loading the best weights of the trained model
# exp_dir is joined to the experiments folder so that a bare experiment folder name can also be used here
full_path = os.path.join('/content/drive/My Drive/AN2DL/VisualQuestionAnswering/vqa_experiments', exp_dir)
latest = tf.train.latest_checkpoint(full_path)

# latest_checkpoint returns None when no checkpoint is found: failing here with a clear message
# is better than letting load_weights fail on a None path
if latest is None:
  raise FileNotFoundError('No checkpoint found in ' + full_path)

vqa_model_2.load_weights(latest)

In [None]:
# Checking the model predictions on the validation set
import time
from matplotlib import cm
import matplotlib.pyplot as plt
%matplotlib inline

iterator = iter(valid_dataset)

In [None]:
# Comparing the target of one validation item with the class predicted by the model
# The next element is taken from the validation iterator and only its first sample is kept
(image_batch, question_batch), target_batch = next(iterator)
image, question, target = image_batch[0], question_batch[0], target_batch[0]

# A leading dimension of size 1 is added back, so that the single sample can be given to predict
x = image[tf.newaxis, ...]
y = question[tf.newaxis, ...]

# The predicted class is the index with the highest score in the model output
class_scores = vqa_model_2.predict([x, y])
predicted_class = tf.argmax(class_scores, axis=-1)

print("Target:")
tf.print(target)
print("Prediction:")
print(predicted_class.numpy()[0])

In [None]:
# Reading the json file of the test set: it is indexed by item ID, and each entry holds a question and an image ID
with open('/content/VQA_Dataset/test_questions.json') as json_file:
  test_json = json.load(json_file)

# Three parallel lists (same order as the json file): item IDs, questions and image IDs
id_test = list(test_json)
question_input_test = [entry['question'] for entry in test_json.values()]
image_ids_test = [entry['image_id'] for entry in test_json.values()]

In [None]:
# Using the same Tokenizer of the training part to convert the words in the test questions to integers,
# so that each word gets the same index it had during training
# NOTE(review): how words never seen in training are handled depends on how question_tokenizer was configured - confirm
question_tokenized_test = question_tokenizer.texts_to_sequences(question_input_test)

# Padding to the same max question length used for training, so that every encoded question has the same length
question_encoder_inputs_test = pad_sequences(question_tokenized_test, maxlen=max_question_length)

In [None]:
# Predicting the answer of every test item, one (image, question) pair at a time
# Each image is loaded as RGB, resized to the dimensions used during training and preprocessed
# The argmax of each model output is collected, in the same order as the test items, into the results_test list
results_test = []

for image_id, encoded_question in zip(image_ids_test, question_encoder_inputs_test):
  image_path = os.path.join(images_dir, image_id + '.png')
  img_test = Image.open(image_path).convert('RGB').resize((img_w, img_h))

  # Both inputs get a leading dimension of size 1 before being given to predict
  img_batch = tf.expand_dims(preprocess_input(np.array(img_test)), 0)
  question_batch = tf.expand_dims(encoded_question, 0)

  prediction = vqa_model_2.predict([img_batch, question_batch])
  results_test.append(tf.argmax(prediction, -1))

In [None]:
# Creating the dictionary to be written in the output csv file
# It contains the ID of each test item as the key, and the corresponding predicted output as the value
# Each entry of results_test holds the prediction for a single item, so its first (and only) element is extracted
# A comprehension is used so that no loop variable (in particular one named like the builtin "id") is left in the notebook scope
dictionary = {test_id: results_test[position].numpy()[0]
              for position, test_id in enumerate(id_test)}

# Exporting the dictionary created into a csv file
create_csv(dictionary, '/content/drive/My Drive/AN2DL/VisualQuestionAnswering/')

# Third Model (using VGG)

In [None]:
# Building the training and validation pipelines for the VGG-based model
# The images are preprocessed with the function that comes with VGG-16
from tensorflow.keras.applications.vgg16 import preprocess_input

# Image dimensions to work with (height and width)
img_h = 400
img_w = 700

# Batch size
bs = 16

# Types and shapes of the ((image, question), target) samples, shared by the two pipelines
sample_types = ((tf.float32, tf.int32), tf.int32)
sample_shapes = (([img_h, img_w, 3], [max_question_length]), ())

# Training Dataset: the CustomDataset is wrapped in a tf.data pipeline, which is batched and repeated indefinitely
dataset = CustomDataset('/content/VQA_Dataset',
                        'training',
                        train_question_input=train_question_input,
                        train_output=train_output,
                        preprocessing_function=preprocess_input,
                        out_shape=[img_w, img_h])

train_dataset = tf.data.Dataset.from_generator(lambda: dataset,
                                               output_types=sample_types,
                                               output_shapes=sample_shapes).batch(bs).repeat()

# Validation Dataset: same pipeline, built on the validation questions and answers
dataset_valid = CustomDataset('/content/VQA_Dataset',
                              'validation',
                              valid_question_input=valid_question_input,
                              valid_output=valid_output,
                              preprocessing_function=preprocess_input,
                              out_shape=[img_w, img_h])

valid_dataset = tf.data.Dataset.from_generator(lambda: dataset_valid,
                                               output_types=sample_types,
                                               output_shapes=sample_shapes).batch(bs).repeat()

In [None]:
# Loading the VGG-16 architecture with the ImageNet weights, leaving out its top part
vgg = tf.keras.applications.VGG16(include_top=False,
                                  weights='imagenet',
                                  input_shape=(img_h, img_w, 3))

# Fine-Tuning switch:
# - when it is off, the whole VGG-16 is frozen
# - when it is on, only the first "freeze_until" layers are frozen, while the remaining ones are left untouched
finetuning = True

if not finetuning:
  vgg.trainable = False
else:
  freeze_until = 13
  for frozen_layer in vgg.layers[:freeze_until]:
    frozen_layer.trainable = False

In [None]:
# Importing Keras classes for easier use
# They are taken from tensorflow.keras, i.e. the same Keras implementation that provides the VGG-16 base
# and the concatenate layer used below, so that the model is not built out of two different Keras packages
from tensorflow.keras.layers import Conv2D, MaxPooling2D, Flatten
from tensorflow.keras.layers import Input, LSTM, Embedding, Dense
from tensorflow.keras.models import Model, Sequential

# Defining the CNN part, which deals with the image
# It consists in the VGG-16 architecture imported before, followed by a Flatten layer that turns its output into a vector
vision_model = Sequential()
vision_model.add(vgg)
vision_model.add(Flatten())

image_input = Input(shape=(img_h, img_w, 3))
encoded_image = vision_model(image_input)

# Defining the RNN part, which deals with the question
# It consists in an Embedding, followed by three LSTM layers, each one with 256 units
# The first two LSTM layers return the whole sequence, so that the next LSTM can process it; the last one returns a single vector
# NOTE(review): the "+1" in input_dim presumably accounts for the index 0 used for padding - confirm against the tokenizer setup
question_input_model = Input(shape=[max_question_length], dtype='int32')
embedded_question = Embedding(input_dim=len(question_wtoi)+1, output_dim=256, input_length=max_question_length)(question_input_model)
encoded_question_1 = LSTM(256, return_sequences=True)(embedded_question)
encoded_question_2 = LSTM(256, return_sequences=True)(encoded_question_1)
encoded_question_3 = LSTM(256)(encoded_question_2)

# Concatenating the CNN and the RNN parts to get the output
# The final Dense layer has one softmax output for each entry of labels_dict
merged = tf.keras.layers.concatenate([encoded_question_3, encoded_image])
output_model = Dense(len(labels_dict), activation='softmax')(merged)
vqa_model_3 = Model(inputs=[image_input, question_input_model], outputs=output_model)

# Printing out a summary of the network
vqa_model_3.summary()

In [None]:
# Optimization setup for the VGG-based model

# Adam optimizer with its learning rate
lr = 1e-3
optimizer = tf.keras.optimizers.Adam(learning_rate=lr)

# Sparse categorical cross-entropy as the loss function: the targets given by the datasets are integer class indices
loss = tf.keras.losses.SparseCategoricalCrossentropy()

# Accuracy is the metric reported during training and validation
metrics = ['accuracy']

# Compiling the model with the settings above
vqa_model_3.compile(loss=loss, optimizer=optimizer, metrics=metrics)

In [None]:
# Setting up the callbacks and Early Stopping
# The purpose of this piece of code is to create a "vqa_experiments" folder inside the directory of this homework (if not already created).
# Inside it, it creates a folder called "VQA_3_" followed by the date and the time of execution, to recognize the experiment.
# Then, it sets up the callback for the training of the model, saving the model weights after each epoch, only if the model improved in accuracy on the Validation set.
# Moreover, Early Stopping is also inserted in the callbacks, to monitor the loss on the Validation set and to stop the training procedure if it does not improve for "patience" epochs.
# Finally, the model is fitted using the training and validation data defined before.

# Creating the "vqa_experiments" folder if not already created
# exist_ok=True creates the folder only when it is missing, without a separate existence check
exps_dir = os.path.join(cwd, 'drive/My Drive/AN2DL/VisualQuestionAnswering/', 'vqa_experiments')
os.makedirs(exps_dir, exist_ok=True)

# Timestamp used to tell the experiments apart
now = datetime.now().strftime('%b%d_%H-%M-%S')

# Creating the folder in which the model weights will be saved
model_name = 'VQA_3'

exp_dir = os.path.join(exps_dir, model_name + '_' + str(now))
os.makedirs(exp_dir, exist_ok=True)

# Setting up the callback to save the model weights after each epoch only if there is an improvement in terms of validation accuracy
callbacks = []

ckpt_dir = os.path.join(exp_dir, 'ckpts')
os.makedirs(ckpt_dir, exist_ok=True)

# NOTE(review): ckpt_dir is passed as the checkpoint path itself rather than as a file inside it.
# With save_weights_only=True the weights presumably end up next to that folder (inside exp_dir),
# which is where the weight-loading cell looks for them - confirm before changing either side.
ckpt_callback = tf.keras.callbacks.ModelCheckpoint(ckpt_dir, 
                                                   monitor='val_accuracy',
                                                   mode='max',
                                                   verbose=0,
                                                   save_best_only=True,
                                                   save_weights_only=True)
callbacks.append(ckpt_callback)

# Early Stopping is inserted in the callbacks, stopping the training procedure if the validation loss does not improve for "patience" epochs
early_stop = True
if early_stop:
  es_callback = tf.keras.callbacks.EarlyStopping(monitor='val_loss', patience=5)
  callbacks.append(es_callback)

# Fitting the model
# It can go on up to 100 epochs, but the Early Stopping callback explained before allows to stop much earlier.
# Both datasets are repeated indefinitely, so the number of steps is given explicitly: the number of full batches of size bs in each of them.
vqa_model_3.fit(train_dataset,
                epochs=100,
                steps_per_epoch=(len(dataset) // bs),
                validation_data=valid_dataset,
                validation_steps=(len(dataset_valid) // bs),
                callbacks=callbacks)


In [None]:
# Loading the best weights of the trained model
# If exp_dir is already an absolute path, os.path.join discards the first argument and full_path is exp_dir itself
full_path = os.path.join('/content/drive/My Drive/AN2DL/VisualQuestionAnswering/vqa_experiments', exp_dir)
latest = tf.train.latest_checkpoint(full_path)

# tf.train.latest_checkpoint returns None when no checkpoint is found in the folder:
# failing here with a clear message is better than passing None to load_weights
if latest is None:
  raise FileNotFoundError('No checkpoint found in ' + full_path)

vqa_model_3.load_weights(latest)

In [None]:
# Checking the model predictions on the validation set
import time
from matplotlib import cm
import matplotlib.pyplot as plt
%matplotlib inline

# Iterator over the validation dataset: each next() call on it yields one batch of ((image, question), target) elements
iterator = iter(valid_dataset)

In [None]:
# Comparing the target of one validation item with the class predicted by the model
# The next batch is taken from the validation iterator and only its first sample is kept
(image_batch, question_batch), target_batch = next(iterator)
image, question, target = image_batch[0], question_batch[0], target_batch[0]

# A leading dimension of size 1 is added back, so that the single sample can be given to predict
x = image[tf.newaxis, ...]
y = question[tf.newaxis, ...]

# The predicted class is the index with the highest score in the softmax output
class_scores = vqa_model_3.predict([x, y])
predicted_class = tf.argmax(class_scores, axis=-1)

print("Target:")
tf.print(target)
print("Prediction:")
print(predicted_class.numpy()[0])

In [None]:
# Reading the json file of the test set: it is indexed by item ID, and each entry holds a question and an image ID
with open('/content/VQA_Dataset/test_questions.json') as json_file:
  test_json = json.load(json_file)

# Three parallel lists (same order as the json file): item IDs, questions and image IDs
id_test = list(test_json)
question_input_test = [entry['question'] for entry in test_json.values()]
image_ids_test = [entry['image_id'] for entry in test_json.values()]

In [None]:
# Using the same Tokenizer of the training part to convert the words in the test questions to integers,
# so that each word gets the same index it had during training
# NOTE(review): how words never seen in training are handled depends on how question_tokenizer was configured - confirm
question_tokenized_test = question_tokenizer.texts_to_sequences(question_input_test)

# Padding to the same max question length used for training, which is also the length of the question input of the model
question_encoder_inputs_test = pad_sequences(question_tokenized_test, maxlen=max_question_length)

In [None]:
# Predicting the answer of every test item, one (image, question) pair at a time
# Each image is loaded as RGB, resized to the dimensions used during training and preprocessed like the training images
# The argmax of each model output is collected, in the same order as the test items, into the results_test list
results_test = []

for image_id, encoded_question in zip(image_ids_test, question_encoder_inputs_test):
  image_path = os.path.join(images_dir, image_id + '.png')
  img_test = Image.open(image_path).convert('RGB').resize((img_w, img_h))

  # Both inputs get a leading dimension of size 1 before being given to predict
  img_batch = tf.expand_dims(preprocess_input(np.array(img_test)), 0)
  question_batch = tf.expand_dims(encoded_question, 0)

  prediction = vqa_model_3.predict([img_batch, question_batch])
  results_test.append(tf.argmax(prediction, -1))

In [None]:
# Creating the dictionary to be written in the output csv file
# It contains the ID of each test item as the key, and the corresponding predicted output as the value
# Each entry of results_test holds the prediction for a single item, so its first (and only) element is extracted
# A comprehension is used so that no loop variable (in particular one named like the builtin "id") is left in the notebook scope
dictionary = {test_id: results_test[position].numpy()[0]
              for position, test_id in enumerate(id_test)}

# Exporting the dictionary created into a csv file
create_csv(dictionary, '/content/drive/My Drive/AN2DL/VisualQuestionAnswering/')