# INTRO

In [None]:
from IPython.core.interactiveshell import InteractiveShell
# Display the value of every bare expression in a cell, not only the last one.
InteractiveShell.ast_node_interactivity = "all"

import os
import tensorflow as tf
import numpy as np
import json


# Seed TensorFlow and NumPy so that repeated runs of the notebook
# are reproducible.
SEED = 1234
tf.random.set_seed(SEED)
np.random.seed(SEED)

# Get current working directory
cwd = os.getcwd()

# Mount Google Drive; the dataset paths below live under the mount point.
from google.colab import drive
drive.mount('/content/drive/')

# Dataset root. The image folder is derived from it so that the two paths
# cannot drift apart if the dataset is moved.
data_dir=r"/content/drive/MyDrive/anndl-2020-vqa.zip (Unzipped Files)/VQA_Dataset"
image_dir=os.path.join(data_dir, "Images")

# Sanity check: show what the dataset folder contains.
os.listdir(data_dir)


Mounted at /content/drive/


['Images', 'test_questions.json', 'train_questions_annotations.json', 'Splits']

# PREPROCESSING TEXT

In [None]:
# Training questions and their answers, collected in the iteration order of
# the annotation file; later cells read both lists by position.
question_sentences = []
answer = []
# Test questions, in the iteration order of the test file.
question_sentences_test = []


# Context managers close each JSON file as soon as it has been parsed
# (the handles used to stay open for the lifetime of the kernel).
with open(os.path.join(data_dir, 'train_questions_annotations.json'), encoding='utf-8') as f:
  g=json.load(f)
for line in g:
  answer.append(g[line]['answer'])
  h=g[line]['question']
  # Prefix the start-of-sentence marker and drop the question marks.
  question_sentences.append('<sos>' + h.replace('?', ''))

# Number of training samples; a later cell uses it to size the
# train/validation split.
length=len(answer)

type(question_sentences[0])
print('\n')

with open(os.path.join(data_dir, 'test_questions.json'), encoding='utf-8') as f:
  g=json.load(f)
for line in g:
  h=g[line]['question']
  question_sentences_test.append('<sos>' + h.replace('?', ''))


from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences

# Upper bound on the vocabulary size kept by the tokenizers.
MAX_NUM_WORDS=60000

# Create Tokenizer to convert words to integers.
# It is fitted on the training questions only.
quest_tokenizer = Tokenizer(num_words= MAX_NUM_WORDS)
quest_tokenizer.fit_on_texts(question_sentences)
quest_tokenized = quest_tokenizer.texts_to_sequences(question_sentences)

# word -> integer index mapping learnt from the training questions
quest_wtoi = quest_tokenizer.word_index
print('Total question words:', len(quest_wtoi))
max_quest_length = max(len(sentence) for sentence in quest_tokenized)
print('Max question sentence length:', max_quest_length)

# A second tokenizer is fitted on the test questions; in this cell it is only
# used to report the test vocabulary size. The test questions themselves are
# encoded with the TRAINING tokenizer (quest_tokenizer) so that word indices
# agree between training and test.
quest_test_tokenizer = Tokenizer(num_words= MAX_NUM_WORDS)
quest_test_tokenizer.fit_on_texts(question_sentences_test)
quest_test_tokenized = quest_tokenizer.texts_to_sequences(question_sentences_test)

quest_test_wtoi = quest_test_tokenizer.word_index
# NOTE(review): this prints the TEST vocabulary size although the label is the
# same as the one used for the training vocabulary above.
print('Total question words:', len(quest_test_wtoi))
max_quest_test_length = max(len(sentence) for sentence in quest_test_tokenized)
print('Max test question sentence length:', max_quest_test_length)

# Pad training and test sequences to one common length: the longer of the two
# maxima computed above.
max_length=max(max_quest_length,max_quest_test_length)
quest_encoder_inputs = pad_sequences(quest_tokenized, maxlen=max_length)
type(quest_encoder_inputs)

quest_test_encoder_inputs = pad_sequences(quest_test_tokenized, maxlen=max_length)
type(quest_test_encoder_inputs)

# Number of answer classes; must equal the number of entries in labels_dict.
num_answers=58
# Answer string -> class index.
labels_dict =  {
        '0': 0,
        '1': 1,
        '2': 2,
        '3': 3,
        '4': 4,
        '5': 5,
        'apple': 6,
        'baseball': 7,
        'bench': 8,
        'bike': 9,
        'bird': 10,
        'black': 11,
        'blanket': 12,
        'blue': 13,
        'bone': 14,
        'book': 15,
        'boy': 16,
        'brown': 17,
        'cat': 18,
        'chair': 19,
        'couch': 20,
        'dog': 21,
        'floor': 22,
        'food': 23,
        'football': 24,
        'girl': 25,
        'grass': 26,
        'gray': 27,
        'green': 28,
        'left': 29,
        'log': 30,
        'man': 31,
        'monkey bars': 32,
        'no': 33,
        'nothing': 34,
        'orange': 35,
        'pie': 36,
        'plant': 37,
        'playing': 38,
        'red': 39,
        'right': 40,
        'rug': 41,
        'sandbox': 42,
        'sitting': 43,
        'sleeping': 44,
        'soccer': 45,
        'squirrel': 46,
        'standing': 47,
        'stool': 48,
        'sunny': 49,
        'table': 50,
        'tree': 51,
        'watermelon': 52,
        'white': 53,
        'wine': 54,
        'woman': 55,
        'yellow': 56,
        'yes': 57
  }
assert len(labels_dict) == num_answers, "labels_dict and num_answers disagree"


# Class index of every training answer (raises KeyError on an unknown answer).
ans_indices = [labels_dict[a] for a in answer]
# Pin the one-hot width to num_answers: without num_classes the width is
# inferred as max(label) + 1, which silently shrinks the targets whenever the
# highest class is absent from the annotations.
ans_Y = tf.keras.utils.to_categorical(ans_indices, num_classes=num_answers)


str



Total question words: 4641
Max question sentence length: 22
Total question words: 1374
Max test question sentence length: 19


numpy.ndarray

numpy.ndarray

# PREPROCESSING IMAGE


In [None]:
from tensorflow.keras.preprocessing.image import ImageDataGenerator

# Switch between the augmenting generator and the plain one.
apply_data_augmentation = False

# Keyword arguments of the training ImageDataGenerator for each mode.
_augmenting_config = dict(rotation_range=10,
                          width_shift_range=10,
                          height_shift_range=10,
                          zoom_range=0.3,
                          horizontal_flip=True,
                          vertical_flip=True,
                          fill_mode='reflect')
_plain_config = dict(rescale=1./255)

# Create training ImageDataGenerator object
img_data_gen = ImageDataGenerator(
    **(_augmenting_config if apply_data_augmentation else _plain_config))
    

In [None]:
from PIL import Image

class CustomDataset(tf.keras.utils.Sequence):

  """
    Per-sample VQA dataset built on tf.keras.utils.Sequence.

    Every item is a pair ``({'input_1': image, 'input_2': question}, answer)``:
      - the image is read from ``<dataset_dir>/Images/<image_id>.png``, where
        the image ids come from ``train_questions_annotations.json`` in
        ``dataset_dir``;
      - the question and the answer are read, at the same index, from the
        notebook-level arrays ``quest_encoder_inputs`` and ``ans_Y``.

    Arguments:
      - dataset_dir: folder holding the annotation file and the ``Images`` folder
      - which_subset: augmentation is applied only when this is ``'training'``
      - img_generator: optional ImageDataGenerator used to augment the image
      - mask_generator: accepted for interface compatibility, not used
      - preprocessing_function: optional callable applied to the image array last
      - out_shape: size the image is resized to; handed to PIL's ``resize``,
        which expects ``(width, height)``

    Note:
      - the dataset returns single samples; batching is left to the caller
        (e.g. through a tf.data.Dataset).
  """

  def __init__(self, dataset_dir, which_subset, img_generator=None, mask_generator=None, 
               preprocessing_function=None, out_shape=[256, 256]):

    # Read the annotations from the directory given by the caller (not from a
    # notebook global) and close the file once it is parsed.
    with open(os.path.join(dataset_dir, 'train_questions_annotations.json'), encoding='utf-8') as f:
      subset_file = json.load(f)

    self.which_subset = which_subset
    self.dataset_dir = dataset_dir
    # One image id per annotation entry, in the iteration order of the file.
    self.subset_filenames = [subset_file[key]['image_id'] for key in subset_file]
    self.img_generator = img_generator
    self.preprocessing_function = preprocessing_function
    # Copy into a tuple so the shared default list can never be mutated.
    self.out_shape = tuple(out_shape)

  def __len__(self):
    """Return the number of samples (one per annotation entry)."""
    return len(self.subset_filenames)

  @staticmethod
  def _stretch_to_uint8(arr):
    """Min-max stretch ``arr`` to the 0..255 range and cast it to uint8."""
    lo, hi = arr.min(), arr.max()
    if hi == lo:
      # Constant image: the stretch would divide by zero, return all zeros.
      return np.zeros_like(arr, dtype='uint8')
    return ((arr - lo) * (1/(hi - lo) * 255)).astype('uint8')

  def __getitem__(self, index):
    """Return ``(inputs, answer)`` for the sample at position ``index``."""
    # Read the image
    curr_filename = self.subset_filenames[index]
    img = Image.open(os.path.join(self.dataset_dir, 'Images', curr_filename + '.png'))

    # Resize to the configured size (previously hard-coded to 256x256)
    img = img.resize(self.out_shape)

    # Keep the first three channels, then stretch the contrast
    img_arr = self._stretch_to_uint8(np.array(img)[..., :3])

    if self.which_subset == 'training' and self.img_generator is not None:
      # Data augmentation: draw a transformation from the ImageDataGenerator
      # and apply it. No seed is passed: a fixed seed reseeds NumPy on every
      # call, so each image would get the very same "random" transformation.
      img_t = self.img_generator.get_random_transform(img_arr.shape)
      img_arr = self.img_generator.apply_transform(img_arr, img_t)

    if self.preprocessing_function is not None:
      img_arr = self.preprocessing_function(img_arr)

    # Encoded question and one-hot answer produced by the text preprocessing
    # cell (notebook globals).
    q = quest_encoder_inputs[index]
    a = ans_Y[index]

    inputs = {'input_1': img_arr, 'input_2': q}
    return inputs, a

In [None]:
from tensorflow.keras.applications.vgg16 import preprocess_input 

img_h = 256
img_w = 256

dataset = CustomDataset(data_dir, 'training', 
                        img_generator=img_data_gen,
                        preprocessing_function=preprocess_input,
                        out_shape=[img_w, img_h])

# Signature of one sample: ({'input_1': image, 'input_2': question}, answer).
# The question width is the PADDED width (max_length), which is what
# quest_encoder_inputs actually has, and the answer width is num_answers.
types2= ( { 'input_1':tf.float32, 'input_2' : tf.int64}, tf.int64) 
shapes2 = ({ 'input_1':[img_h, img_w, 3], 'input_2' : [max_length]},
          [num_answers])

full_dataset = tf.data.Dataset.from_generator(lambda: dataset,
                                               output_types=types2,
                                               output_shapes=shapes2)

# Batch size and 80/20 split; the first train_size samples are used for
# training, the remaining ones for validation.
bs=64
train_size = int(0.8 * length)
valid_size=length-train_size

#full_dataset = full_dataset.shuffle()
# Batch each split, then repeat it so that fit() can run several epochs.
train_dataset = full_dataset.take(train_size).batch(bs).repeat()
valid_dataset = full_dataset.skip(train_size).batch(bs).repeat()


In [None]:
# Let's test data generator
# -------------------------
# NOTE(review): time, cm and plt are imported but not used in this cell.
import time
from matplotlib import cm
import matplotlib.pyplot as plt

%matplotlib inline


# Show the dataset object, then pull one batch and display its image tensor
# (the 'input_1' entry of the inputs dict).
train_dataset
next(iter(train_dataset))[0]['input_1']


# MODEL

In [None]:
# Import Keras through TensorFlow, like the rest of the notebook does:
# mixing objects of the standalone `keras` package with `tf.keras` ones in
# the same model is error-prone.
from tensorflow.keras.layers import Conv2D, MaxPooling2D, Flatten
from tensorflow.keras.layers import Input, LSTM, Embedding, Dense
from tensorflow.keras.models import Model, Sequential

# Define CNN for Image Input: three convolutional stages, each closed by a
# 2x2 max pooling, followed by a flatten.
vision_model = Sequential([
    Conv2D(64, (3, 3), activation='relu', padding='same', input_shape=(256, 256, 3)),
    Conv2D(64, (3, 3), activation='relu'),
    MaxPooling2D((2, 2)),
    Conv2D(128, (3, 3), activation='relu', padding='same'),
    Conv2D(128, (3, 3), activation='relu'),
    MaxPooling2D((2, 2)),
    Conv2D(256, (3, 3), activation='relu', padding='same'),
    Conv2D(256, (3, 3), activation='relu'),
    Conv2D(256, (3, 3), activation='relu'),
    MaxPooling2D((2, 2)),
    Flatten(),
])

# NOTE(review): the data pipeline feeds this input under the key 'input_1',
# i.e. it relies on the auto-generated layer name; presumably that only holds
# when the model cells are run once on a fresh kernel - confirm.
image_input = Input(shape=(256, 256, 3))
encoded_image = vision_model(image_input)

In [None]:
# Define RNN for language input.
# The input width is max_length, the width the question arrays were padded
# to, rather than the longest TRAINING question.
question_input = Input(shape=[max_length], dtype='int32')
# +1 because the Tokenizer word indices start at 1 (0 is the padding value).
embedded_question = Embedding(len(quest_wtoi)+1, output_dim=256, input_length=max_length)(question_input)
encoded_question = LSTM(256)(embedded_question)

In [None]:
# Fuse the question encoding with the image encoding and classify the result
# over the 58 answer classes.

fusion_layer = tf.keras.layers.Concatenate()
merged = fusion_layer([encoded_question, encoded_image])
answer_classifier = Dense(58, activation='softmax')
output = answer_classifier(merged)
vqa_model = Model(inputs=[image_input, question_input], outputs=output)
vqa_model.summary()

Model: "model_1"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
input_2 (InputLayer)            [(None, 22)]         0                                            
__________________________________________________________________________________________________
embedding (Embedding)           (None, 22, 256)      1188352     input_2[0][0]                    
__________________________________________________________________________________________________
input_1 (InputLayer)            [(None, 256, 256, 3) 0                                            
__________________________________________________________________________________________________
lstm (LSTM)                     (None, 256)          525312      embedding[0][0]                  
____________________________________________________________________________________________

In [None]:
# Training configuration
# ----------------------

# Step size of the Adam optimizer
lr = 1e-3

# Categorical cross-entropy as loss, accuracy as the monitored metric
loss = tf.keras.losses.CategoricalCrossentropy()
optimizer = tf.keras.optimizers.Adam(learning_rate=lr)
metrics = ['accuracy']

# Attach loss, optimizer and metrics to the model
vqa_model.compile(loss=loss, optimizer=optimizer, metrics=metrics)

import os
from datetime import datetime

cwd = os.getcwd()

# Every run gets its own timestamped folder under exps_dir.
exps_dir = os.path.join('/content/drive/My Drive/KerasRNN', 'translation_experiments')
now = datetime.now().strftime('%b%d_%H-%M-%S')
exp_name = 'exp'
exp_dir = os.path.join(exps_dir, exp_name + '_' + str(now))

# Sub-folders for the checkpoints and the TensorBoard logs.
ckpt_dir = os.path.join(exp_dir, 'ckpts')
tb_dir = os.path.join(exp_dir, 'tb_logs')
# makedirs also creates the missing parents (exps_dir, exp_dir); exist_ok
# replaces the separate "exists?" check before each creation.
for directory in (ckpt_dir, tb_dir):
    os.makedirs(directory, exist_ok=True)

callbacks = []

# Model checkpoint
# ----------------
ckpt_callback = tf.keras.callbacks.ModelCheckpoint(filepath=os.path.join(ckpt_dir, 'cp_{epoch:02d}.ckpt'), 
                                                   save_weights_only=True)  # False to save the model directly
callbacks.append(ckpt_callback)

# Visualize Learning on Tensorboard
# ---------------------------------
# By default shows losses and metrics for both training and validation
tb_callback = tf.keras.callbacks.TensorBoard(log_dir=tb_dir,
                                             profile_batch=0,
                                             histogram_freq=1)  # if 1 shows weights histograms
callbacks.append(tb_callback)

# Early Stopping
# --------------
# Stop when the validation loss has not improved for 3 consecutive epochs.
early_stop = True
if early_stop:
    es_callback = tf.keras.callbacks.EarlyStopping(monitor='val_loss', patience=3)
    callbacks.append(es_callback)

# ---------------------------------



# How to visualize Tensorboard

# 1. tensorboard --logdir EXPERIMENTS_DIR --port PORT     <- from terminal
# 2. localhost:PORT   <- in your browser

# RUN

In [None]:
# Start from previously saved weights instead of a random initialisation.
pretrained_weights_path = "/content/drive/My Drive/Weights/Custom_R_15Epochs.h5"
vqa_model.load_weights(pretrained_weights_path)

In [None]:
# Both datasets repeat forever, so fit() needs the number of batches that
# makes up one pass. Round up to a whole number: the last batch of each split
# is partial, and a fractional step count is not a valid number of steps.
steps_per_epoch = int(np.ceil(train_size / bs))
validation_steps = int(np.ceil(valid_size / bs))

# `shuffle` is not passed: fit() ignores it when x is a tf.data.Dataset.
vqa_model.fit(
  x=train_dataset,
  epochs=3,
  steps_per_epoch=steps_per_epoch,
  validation_data=valid_dataset,
  validation_steps=validation_steps,
  callbacks=callbacks
)


Epoch 1/3
 19/735 [..............................] - ETA: 2:01:54 - loss: 0.6219 - accuracy: 0.7490

In [None]:
import json
def read_questions_test(path):
    """Read a questions JSON file and return its question ids and image ids.

    Args:
        path: path of a JSON file mapping each question id to an object that
            has an 'image_id' entry.

    Returns:
        A tuple ``(ids, image_ids)`` of two lists aligned by position, in the
        iteration order of the file: the question ids (the JSON keys) and the
        corresponding image ids.

    Raises:
        KeyError: if an entry has no 'image_id'.
    """
    # Decode explicitly as UTF-8, like the other JSON reads of the notebook,
    # instead of relying on the platform default encoding.
    with open(path, 'r', encoding='utf-8') as file:
        qs = json.load(file)

    ids = list(qs)
    image_ids = [entry['image_id'] for entry in qs.values()]
    return (ids, image_ids)


# Question ids and image ids of the test set, aligned by position.
ids_test, test_image_ids = read_questions_test(os.path.join(data_dir, 'test_questions.json'))                    
  

In [None]:
# Setup Test 
from PIL import Image
def load_and_proccess_image(image_path):
    """Open an image, resize it to 256x256 and divide its pixel values by 255.

    Returns the result as a NumPy array that keeps every channel of the image.
    """
    resized = Image.open(image_path).resize([256,256])
    return np.array(resized)/255.0

## Build the model-ready image array of the test question at `index`
def load_img_from_index(index):
    """Load the image of test question ``index`` the way training images are prepared.

    The image listed in ``test_image_ids[index]`` is read from ``image_dir``,
    resized to 256x256, reduced to its first three (R, G, B) channels,
    min-max stretched to 0..255 and passed through the VGG16
    ``preprocess_input``: the same steps CustomDataset applies to the
    training images.

    Two differences with the previous implementation, both of which made the
    test inputs inconsistent with the training inputs:
      - channels ``1:4`` (G, B, alpha) were selected instead of ``0:3``;
      - pixels were only divided by 255.

    Returns:
        A ``(256, 256, 3)`` array.
    """
    # Imported here so the function does not depend on another cell's import.
    from tensorflow.keras.applications.vgg16 import preprocess_input

    img_path = os.path.join(image_dir, str(test_image_ids[index]) + '.png')
    img = Image.open(img_path).resize([256, 256])

    rgb = np.array(img)[..., :3]
    lo, hi = rgb.min(), rgb.max()
    if hi == lo:
        # Constant image: the stretch would divide by zero.
        stretched = np.zeros_like(rgb, dtype='uint8')
    else:
        stretched = ((rgb - lo) * (1/(hi - lo) * 255)).astype('uint8')

    return preprocess_input(stretched)

In [None]:
# Inspect the first test image: array shape before and after channel selection.
index=0
# NOTE: `id` shadows the builtin; the name is kept because later cells may
# read it.
id=os.path.join(image_dir,str(test_image_ids[index]) + '.png')
img=load_and_proccess_image(id)
img.shape
# Keep the R, G and B channels (0..2), as the training pipeline does; the
# previous slice 1:4 selected G, B and alpha.
img=np.array(img[:,:,:3])
img.shape

In [None]:
# Predict the answer of a single test question, displaying the intermediate
# shapes along the way.
results = {}
index=0

# load image using test_image_ids and add the batch dimension
img=load_img_from_index(index)
img=img[np.newaxis,:,:,:]
img.shape
# encoded question of the same sample, with the batch dimension
q=quest_test_encoder_inputs[index]
q=q[np.newaxis,:]
q.shape
inputs={'input_1': img,'input_2':q}
# NOTE: despite its name, this holds the output of the model's softmax layer.
out_sigmoid = vqa_model.predict(x=inputs)
out_sigmoid.shape
out_sigmoid
max(out_sigmoid[0,:])
predicted_class = tf.argmax(out_sigmoid[0,:])
predicted_class.shape
predicted_class
# Key the result by the question id (as the full evaluation loop does), not
# by whatever `id` was left behind by a previous cell, and store the plain
# number rather than a tensor.
results[ids_test[index]]=predicted_class.numpy()


In [None]:
## Evaluate on test set

import os
from datetime import datetime

def create_csv(results, results_dir='./'):
    """Write the predictions to a timestamped CSV file.

    The file is named ``results_<Mon><day>_<H>-<M>-<S>.csv``, is created in
    ``results_dir`` and contains the header ``Id,Category`` followed by one
    ``<key>,<value>`` line per entry of ``results``.

    Args:
        results: mapping from sample id to predicted class. Keys and values
            are converted with ``str``, so non-string ids are accepted.
        results_dir: existing directory the file is written to.
    """
    csv_fname = 'results_' + datetime.now().strftime('%b%d_%H-%M-%S') + '.csv'

    with open(os.path.join(results_dir, csv_fname), 'w') as f:
        f.write('Id,Category\n')
        f.writelines(str(key) + ',' + str(value) + '\n'
                     for key, value in results.items())

# Predict the answer class of every test question.
# ids_test, test_image_ids and quest_test_encoder_inputs are aligned by
# position, so one running index addresses all three.
results = {}
for index, question_id in enumerate(ids_test):
  # Progress trace every 10 samples
  if index % 10 == 0:
    print(index)
  # Image and encoded question of the sample, each with a batch dimension
  img = load_img_from_index(index)[np.newaxis, ...]
  q = quest_test_encoder_inputs[index][np.newaxis, :]
  inputs = {'input_1': img, 'input_2': q}
  # predict_on_batch runs the single-sample batch directly; predict() sets up
  # its batching machinery on every call, which is wasted work here.
  out_sigmoid = vqa_model.predict_on_batch(inputs)
  # Index of the most probable class, stored as a plain int
  results[question_id] = int(np.argmax(out_sigmoid[0]))




In [None]:
# Write the submission file to the root of the mounted Drive.
save_dir=r"/content/drive/MyDrive"
create_csv(results, results_dir=save_dir)