In [None]:
import tensorflow as tf
import numpy as np 
import os
from PIL import Image
import json
import matplotlib.pyplot as plt

# Single seed value handed to every random number generator seeded below.
SEED = 1234

np.random.seed(SEED)      # NumPy's legacy global RNG
tf.random.set_seed(SEED)  # TensorFlow's global seed

def fromCwd(path):
    """Return `path` joined onto the current working directory."""
    cwd = os.getcwd()
    return os.path.join(cwd, path)

In [None]:
# Pick the dataset/output locations depending on where the notebook runs.
if os.path.exists("/kaggle"):
    # kaggle settings
    dataset_dir = "/kaggle/input/vqaset/VQA_Dataset"
    output_dir = "/kaggle/working"
elif os.path.exists("/content/gdrive"):
    # colab settings here
    # Left as empty placeholders: an empty string makes the joined paths
    # below relative to the current working directory.
    dataset_dir = ""
    output_dir = ""
else:
    # local settings
    dataset_dir = fromCwd("data")
    output_dir = fromCwd("output")


# `exist_ok=True` replaces the separate exists() check. The guard skips the
# call for an empty path, for which os.makedirs raises FileNotFoundError.
if output_dir:
    os.makedirs(output_dir, exist_ok=True)

train_json_path = os.path.join(dataset_dir, 'train_questions_annotations.json')
img_path = os.path.join(dataset_dir, 'Images')

test_json_path = os.path.join(dataset_dir, 'test_questions.json')

In [None]:
# Maps every possible answer string to its class index. The index of an
# answer is its position in the list below, so the order must not change.
labels_dict = {
    label: index
    for index, label in enumerate([
        '0', '1', '2', '3', '4', '5',
        'apple', 'baseball', 'bench', 'bike', 'bird', 'black', 'blanket',
        'blue', 'bone', 'book', 'boy', 'brown',
        'cat', 'chair', 'couch',
        'dog',
        'floor', 'food', 'football',
        'girl', 'grass', 'gray', 'green',
        'left', 'log',
        'man', 'monkey bars',
        'no', 'nothing',
        'orange',
        'pie', 'plant', 'playing',
        'red', 'right', 'rug',
        'sandbox', 'sitting', 'sleeping', 'soccer', 'squirrel', 'standing',
        'stool', 'sunny',
        'table', 'tree',
        'watermelon', 'white', 'wine', 'woman',
        'yellow', 'yes',
    ])
}

In [None]:
# utility functions

# lower text in data items
def normalize_question(question):
    """Return the question text converted to lowercase."""
    lowered = question.lower()
    return lowered

# find max question length in the set
def calc_max_question_len(data):
    """Return the largest number of whitespace-separated words in any question.

    `data` maps a key to a record holding a "question" string. The result is
    0 when `data` is empty.
    """
    word_counts = (len(data[key]["question"].split()) for key in data)
    return max(word_counts, default=0)


# convert answer to integer
def convert_answer(answer):
    """Return the one-hot float32 encoding of an answer string.

    The class index is looked up in `labels_dict` (KeyError for an unknown
    answer) and the vector has one entry per label.
    """
    class_index = labels_dict[answer]
    n_classes = len(labels_dict)
    return tf.keras.utils.to_categorical(class_index, num_classes=n_classes, dtype='float32')

# generate image path from image_id
def make_image_path(img_id):
    """Return the path of the PNG file for `img_id` inside `img_path`."""
    file_name = img_id + ".png"
    return os.path.join(img_path, file_name)

def load_image(path, size):
    """Load an image as an RGB array scaled by 1/255.

    Args:
        path: location of the image file.
        size: target size handed as-is to PIL's `Image.resize`.

    Returns:
        NumPy array of the resized RGB image with every value divided by 255.
    """
    resized = Image.open(path).convert('RGB').resize(size)
    pixels = np.array(resized)
    return pixels / 255.

#################################################################################################################


# tokenize and pad question
def tokenize_question(question, tokenizer, pad_to_size):
    """Convert a question into a fixed-length list of token ids.

    The result always has exactly ``pad_to_size + 1`` entries: the token ids
    of the question (at most ``pad_to_size`` of them), the ``<eos>`` id, and
    then zeros (``<pad>``) up to the fixed length.

    Args:
        question: question text.
        tokenizer: object exposing ``texts_to_sequences`` and a ``word_index``
            mapping that contains the ``'<eos>'`` key.
        pad_to_size: maximum number of question tokens kept (``<eos>`` not
            included).

    Returns:
        List of int of length ``pad_to_size + 1``.

    Raises:
        KeyError: if ``'<eos>'`` is missing from ``tokenizer.word_index``.
    """
    eos_id = tokenizer.word_index['<eos>']

    # Tokenize, then truncate: a question longer than `pad_to_size` (e.g. one
    # from another split) must still produce the fixed output length.
    tokens = tokenizer.texts_to_sequences([question])[0][:pad_to_size]

    # <eos> marks the end of the real tokens, so it goes before the padding.
    tokens.append(eos_id)  # TODO: is it really needed?

    # pad with zeros (<pad>) up to the fixed length
    tokens.extend([0] * (pad_to_size + 1 - len(tokens)))

    return tokens



In [None]:
# load train json
# The loaded object is a dict: item key -> record with "question", "answer"
# and "image_id". Explicit UTF-8 keeps decoding independent of the platform's
# default text encoding.
with open(train_json_path, encoding='utf-8') as json_file:
    train_data = json.load(json_file)

# normalize questions and answers (records are updated in place):
#   "question"   -> lowercased text
#   "answer"     -> one-hot vector
#   "image_path" -> path of the image file derived from "image_id"
for item in train_data.values():
    item["question"] = normalize_question(item["question"])
    item["answer"] = convert_answer(item["answer"])
    item["image_path"] = make_image_path(item["image_id"])

# get max question length (counted in whitespace-separated words)
max_question_len = calc_max_question_len(train_data)

In [None]:
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences


# fit tokenizer to data
def fit_tokenizer(tok, data):
    """Fit `tok` on the distinct questions of `data` and register `<eos>`.

    Args:
        tok: tokenizer exposing ``fit_on_texts`` and a ``word_index`` dict.
        data: dict whose values are records with a "question" string.

    Returns:
        The same tokenizer, fitted, with ``'<eos>'`` added to ``word_index``
        under the next free index.
    """
    unique_questions = {data[key]['question'] for key in data}

    # Iteration order of a set of strings can differ between interpreter
    # runs; sorting gives the tokenizer the texts in a fixed order.
    tok.fit_on_texts(sorted(unique_questions))

    # create custom eos token as last token
    # +1 because 0 index is reserved and real last element has value of len(tok.word_index)
    tok.word_index['<eos>'] = len(tok.word_index) + 1
    return tok


# One global tokenizer is fitted on the training questions and shared by the
# rest of the notebook — presumably so that every split is encoded with the
# same vocabulary.
# `num_words` is a word count, so it is given as an int (same value as the
# previous float literal 10e4).
max_words = 100000
tokenizer = Tokenizer(num_words=max_words, oov_token='<unk>')

tokenizer = fit_tokenizer(tokenizer, train_data)


# tokenize questions: every record gets a "question_tok" list of token ids
for item in train_data.values():
    item["question_tok"] = tokenize_question(item["question"], tokenizer, max_question_len)

In [None]:
# print sample
# NOTE(review): assumes the training JSON contains an item with key '1';
# a missing key raises KeyError here.
print(train_data['1'])


In [None]:
# make validation data
import random

valid_percent = 0.2

print("Original size: {}".format(len(train_data)))

# Dedicated RNG seeded with SEED: the global `random` module is not seeded
# anywhere in this notebook, so sampling from it would produce a different
# split on every run.
split_rng = random.Random(SEED)

# create subsample of train data
# random.sample() requires a sequence: passing a dict view is deprecated
# since Python 3.9 and raises TypeError from 3.11, so sample from a list of keys.
valid_keys = split_rng.sample(list(train_data), int(len(train_data) * valid_percent))

# move the sampled items from the train set into the validation set
valid_data = {key: train_data.pop(key) for key in valid_keys}

# NOTE: this cell is not idempotent — running it again takes another slice
# out of the already reduced `train_data`.
print("train ({}) + valid ({}) = {}".format(len(train_data), len(valid_data), len(train_data)+len(valid_data)))

In [None]:
# create custom data generator

class DataGenerator(tf.keras.utils.Sequence):
    """Index-based access to the items of a data dict, one sample per index.

    Each element is a pair ``(features, answer)`` where ``features`` holds the
    loaded image under 'image' and the tokenized question under 'question'.
    """

    def __init__(self, data, image_shape):
        """Store the data dict, the image size and the ordered list of item keys."""
        self.data = data
        self.image_shape = image_shape
        self.id_list = list(data)

    def __len__(self):
        """Number of items (single samples, not batches)."""
        return len(self.id_list)

    def __getitem__(self, index):
        """Return ``(features, answer)`` for the item at position `index`."""
        item = self.data[self.id_list[index]]
        features = {
            'image': load_image(item["image_path"], self.image_shape),
            'question': item["question_tok"],
        }
        return features, item['answer']
    


In [None]:
target_image_shape = [224, 224]
batch_size = 32

prefetch_sz = tf.data.experimental.AUTOTUNE

# Structure of one sample yielded by the generators: (features dict, answer).
# The question length is max_question_len + 1 because of the trailing <eos> token.
out_type = ({'image': tf.float32, 'question': tf.int32}, tf.float32)
out_shape = (
    {'image': target_image_shape + [3], 'question': [max_question_len + 1]},
    [len(labels_dict)],
)

train_gen = DataGenerator(train_data, image_shape=target_image_shape)
valid_gen = DataGenerator(valid_data, image_shape=target_image_shape)


def _as_batched_dataset(generator):
    """Wrap `generator` into a batched, endlessly repeating, prefetching dataset."""
    samples = tf.data.Dataset.from_generator(
        lambda: generator,
        output_shapes=out_shape,
        output_types=out_type,
    )
    return samples.batch(batch_size).repeat().prefetch(prefetch_sz)


# NOTE(review): neither dataset is shuffled.
train_dataset = _as_batched_dataset(train_gen)
valid_dataset = _as_batched_dataset(valid_gen)


In [None]:
# Show the element spec of both datasets, one per line.
print(train_dataset, valid_dataset, sep="\n")

## Model

In [None]:
num_classes = len(labels_dict)
img_shape = target_image_shape + [3]

# Length of a tokenized question: longest training question plus the trailing
# <eos> token. Stored under its own name instead of incrementing
# `max_question_len` in place, so re-running this cell does not keep growing it.
question_seq_len = max_question_len + 1

# ------------------------ CNN ----------------------

# The input is created explicitly with the name 'image' so that it matches the
# 'image' key of the dataset feature dicts (no renaming of a layer's private
# `_name` afterwards).
image_input = tf.keras.Input(shape=img_shape, name='image')
vgg = tf.keras.applications.VGG16(include_top=False, input_tensor=image_input)

# freeze the first 75% of the VGG16 layers, fine-tune the rest
n_frozen = int(len(vgg.layers) * 0.75)
for layer in vgg.layers[:n_frozen]:
    layer.trainable = False

vgg_output = tf.keras.layers.GlobalAveragePooling2D()(vgg.output)
# tanh instead of softmax: a softmax over 1024 hidden features makes them sum
# to 1, so each is tiny and the element-wise product with the question
# features below would be close to zero.
vgg_output = tf.keras.layers.Dense(units=1024, activation='tanh')(vgg_output)
#vgg.summary()

cnn = tf.keras.Model(inputs=vgg.input, outputs=vgg_output)


# ------------------------ RNN ----------------------

EMBEDDING_SIZE = 32

# ENCODER
# -------
# in keras out = layer(input)

encoder_input = tf.keras.Input(shape=[question_seq_len], name='question')

# +1 on the vocabulary size because index 0 is reserved for padding;
# mask_zero=True lets the LSTM skip the padded positions. The sequence length
# is already fixed by `encoder_input`, so `input_length` is not passed.
encoder_embedding_layer = tf.keras.layers.Embedding(len(tokenizer.word_index) + 1, EMBEDDING_SIZE, mask_zero=True)

encoder_embedding_out = encoder_embedding_layer(encoder_input)

# 128 is the size of the LSTM state; it is a free hyperparameter and is not
# tied to the question length or the embedding size.
encoder = tf.keras.layers.LSTM(units=128, return_state=True)

encoder_output, h, c = encoder(encoder_embedding_out)
# tanh for the same reason as on the image branch
encoder_output = tf.keras.layers.Dense(units=1024, activation='tanh')(encoder_output)
encoder_states = [h, c]

rnn = tf.keras.Model(inputs=encoder_input, outputs=encoder_output)


# ---------------------Merging--------------------------------

x1 = cnn.output
x2 = rnn.output

# element-wise product of the 1024-d image and question features
merging_layer = tf.keras.layers.Multiply()([x1, x2])
classifier = tf.keras.layers.Dense(units=512, activation='relu')(merging_layer)
classifier = tf.keras.layers.Dense(units=512, activation='relu')(classifier)
# one probability per answer class
classifier = tf.keras.layers.Dense(units=num_classes, activation='softmax')(classifier)

VQA_model = tf.keras.Model(inputs=[cnn.input, rnn.input], outputs=classifier)

VQA_model.summary()

In [None]:

# Optimization params
# -------------------

# learning rate
lr = 1e-5

# Loss: categorical cross-entropy
loss = tf.keras.losses.CategoricalCrossentropy()

# Adam optimizer using the learning rate above
optimizer = tf.keras.optimizers.Adam(learning_rate=lr)

# Validation metrics
# ------------------
metrics = ['accuracy']

# Compile Model
VQA_model.compile(loss=loss, metrics=metrics, optimizer=optimizer)

In [None]:
epochs = 10

# Number of batches needed for one full pass over each set. Ceil division is
# used because the datasets keep the last, smaller batch: flooring would skip
# that batch in every validation run and would give 0 steps for a set smaller
# than one batch.
train_steps = (len(train_gen) + batch_size - 1) // batch_size
valid_steps = (len(valid_gen) + batch_size - 1) // batch_size

# Both datasets repeat forever, so the step counts define where an epoch ends.
history = VQA_model.fit(
    train_dataset,
    validation_data=valid_dataset,
    epochs=epochs,
    verbose=1,
    callbacks=None,
    steps_per_epoch=train_steps,
    validation_steps=valid_steps
)

In [None]:
# plot graphs
# not always we have tensorboard enabled

mtr = ['loss', 'accuracy']

plt.figure(figsize=(25, 8))

# one subplot per metric, training and validation curves drawn together
for idx, m in enumerate(mtr, start=1):
    x, val_x = history.history[m], history.history['val_' + m]

    plt.subplot(1, len(mtr), idx)
    plt.plot(x, label='Training ' + m)
    plt.plot(val_x, label='Validation ' + m)
    plt.legend(loc='lower right')
    plt.title('Training and Validation ' + m)


In [None]:

def predict(item, model):
    """Predict the answer class of a single item.

    Args:
        item: dict with an "image_path" and a "question" string (expected to
            be normalized the same way as the training questions).
        model: Keras model with an input layer named 'question' and taking a
            feature dict with the keys 'image' and 'question'.

    Returns:
        int index of the most probable class.
    """
    img = load_image(item["image_path"], target_image_shape)

    # Token length expected by the model, read from its 'question' input so
    # that it always agrees with the way the model was built.
    question_len = int(model.get_layer('question').output.shape[1])
    # tokenize_question appends <eos> itself, hence the -1
    question = tokenize_question(item["question"], tokenizer, question_len - 1)

    # the model works on batches: add a leading batch dimension of size 1
    dt = {
        'image': np.expand_dims(img, axis=0).astype('float32'),
        'question': np.array([question], dtype='int32'),
    }

    res = model.predict(dt, verbose=0)
    return int(np.argmax(res[0]))  # return index of best class that is our prediction
    

In [None]:
import os
from datetime import datetime

def create_csv(results, results_dir='./'):
    """Write `results` to a time-stamped CSV file inside `results_dir`.

    The file is named ``results_<Mon><day>_<H>-<M>-<S>.csv``, starts with the
    header ``Id,Category`` and has one ``key,value`` line per entry of
    `results` (keys must be strings, values are converted with `str`).
    """
    timestamp = datetime.now().strftime('%b%d_%H-%M-%S')
    target = os.path.join(results_dir, 'results_' + timestamp + '.csv')

    with open(target, 'w') as f:
        f.write('Id,Category\n')
        f.writelines(key + ',' + str(value) + '\n' for key, value in results.items())

In [None]:
# load test json
# The loaded object is a dict: item key -> record with "question" and "image_id".
with open(test_json_path, encoding='utf-8') as json_file:
    test_data = json.load(json_file)

# normalize questions and resolve image paths
# (read from the test records themselves, not from train_data)
for item in test_data.values():
    item["question"] = normalize_question(item["question"])
    item["image_path"] = make_image_path(item["image_id"])


# make predictions: item key -> predicted class index
output = {}
for k in test_data:
    output[k] = predict(test_data[k], VQA_model)

create_csv(output, results_dir=output_dir)

print("Ok :3")
