In [None]:
# ---------- display the value of every top-level expression in a cell, not only the last one ----------
from IPython.core.interactiveshell import InteractiveShell
InteractiveShell.ast_node_interactivity="all"
#-------------------------- imports and global configuration (no CPU/GPU selection is done here) --------
from PIL import Image
import numpy as np
import json
import tensorflow as tf
from tensorflow.keras.preprocessing.sequence import pad_sequences
import os
 
# Global configuration shared by the cells below.
COMMIT = True           # not referenced by any cell visible in this notebook
SEED = 1234             # seed applied to every random number generator used here
DATASET_SPLIT = 0.8     # presumably the training fraction of the train/validation split
img_h = 224             # image height fed to the network
img_w = 224             # image width fed to the network
BATCH_SIZE = 32

# Seed both TensorFlow and NumPy: batch shuffling in the data generator relies on
# NumPy's global RNG (np.random.shuffle), which tf.random.set_seed does not touch.
tf.random.set_seed(SEED)
np.random.seed(SEED)
cwd = os.getcwd()

# Answer vocabulary: every admissible answer string, listed in the order that
# defines its integer class id (position in the tuple == class id).
_ANSWER_LABELS = (
    '0', '1', '2', '3', '4', '5',
    'apple', 'baseball', 'bench', 'bike', 'bird', 'black', 'blanket', 'blue',
    'bone', 'book', 'boy', 'brown',
    'cat', 'chair', 'couch',
    'dog',
    'floor', 'food', 'football',
    'girl', 'grass', 'gray', 'green',
    'left', 'log',
    'man', 'monkey bars',
    'no', 'nothing',
    'orange',
    'pie', 'plant', 'playing',
    'red', 'right', 'rug',
    'sandbox', 'sitting', 'sleeping', 'soccer', 'squirrel', 'standing', 'stool', 'sunny',
    'table', 'tree',
    'watermelon', 'white', 'wine', 'woman',
    'yellow', 'yes',
)

# Map answer string -> class id.
classes = {label: class_id for class_id, label in enumerate(_ANSWER_LABELS)}

N_CLASSES = len(classes)

In [None]:
# Dataset locations, all resolved relative to the current working directory.
train_json_path, test_json_path, imgs_path = (
    cwd + relative_path
    for relative_path in (
        '/VQA_Dataset/train_questions_annotations.json',
        '/VQA_Dataset/test_questions.json',
        '/VQA_Dataset/Images',
    )
)

In [None]:
from PIL import Image
from tensorflow.keras.applications.vgg16 import preprocess_input 
class DataGenerator(tf.keras.utils.Sequence):
    """Keras ``Sequence`` that serves (question, image) batches for the VQA model.

    Each batch is ``[questions, images]`` where ``questions`` has shape
    ``(n, max_length)`` and ``images`` has shape ``(n, img_h, img_w, n_channels)``.
    When ``to_fit`` is true the batch is returned as ``(X, Y)`` with ``Y`` the
    answer ids of the same samples.

    Args:
        answers_ID: indexable of answer ids, one per sample (ignored when
            ``to_fit`` is false, so it may be ``None``).
        image_name: indexable of image file stems; ``'.png'`` is appended and
            the file is read from ``imgs_path``.
        train_input_questions: indexable of padded token-id sequences, each of
            length ``max_length``.
        max_length: length of every padded question.
        to_fit: whether ``__getitem__`` also returns the targets.
        batch_size: number of samples per batch (the last batch may be smaller).
        dim: ``(height, width)`` every image is resized to.
        n_channels: number of image channels kept.
        n_classes: stored for reference; not used by the generator itself.
        shuffle: reshuffle the sample order at the end of every epoch.
    """

    def __init__(self, answers_ID, image_name, train_input_questions, max_length, to_fit=True,
                 batch_size=32, dim=(img_h, img_w), n_channels=3, n_classes=N_CLASSES, shuffle=False):
        # Newer Keras versions expect Sequence subclasses to initialise the base class.
        super().__init__()
        self.answers_ID = answers_ID
        self.train_input_questions = train_input_questions
        self.image_name = image_name
        self.to_fit = to_fit
        self.batch_size = batch_size
        self.dim = dim
        self.n_channels = n_channels
        self.n_classes = n_classes
        self.shuffle = shuffle
        self.img_h = dim[0]
        self.img_w = dim[1]
        self.max_length = max_length
        self.on_epoch_end()

    def __len__(self):
        """Number of batches per epoch, counting a trailing partial batch."""
        # ceil (not floor) so the last len % batch_size samples are not silently dropped,
        # which would otherwise yield fewer predictions than inputs at inference time.
        return int(np.ceil(len(self.image_name) / self.batch_size))

    def __getitem__(self, index):
        """Return batch number ``index``: ``(X, Y)`` when fitting, otherwise ``X``."""
        batch_indexes = self.indexes[index * self.batch_size:(index + 1) * self.batch_size]
        X = self._generate_X(batch_indexes)
        if self.to_fit:
            # Targets: answer ids of the samples in this batch.
            Y = np.asarray([self.answers_ID[k] for k in batch_indexes])
            return X, Y
        return X

    def on_epoch_end(self):
        """Rebuild the global sample order, shuffling it when requested."""
        self.indexes = np.arange(len(self.image_name))
        if self.shuffle:
            np.random.shuffle(self.indexes)

    def _generate_X(self, batch_indexes):
        """Assemble the ``[questions, images]`` inputs for the given sample indexes."""
        # Size the buffers on the actual batch (the last one may be partial) and on the
        # generator's own dimensions rather than on the module-level img_h / img_w.
        n_samples = len(batch_indexes)
        imm = np.empty((n_samples, self.img_h, self.img_w, self.n_channels))
        quest = np.empty((n_samples, self.max_length))
        # i = position inside the batch, ID = global sample index.
        for i, ID in enumerate(batch_indexes):
            imm[i,] = self._load_image(self.image_name[ID], self.img_w, self.img_h)
            quest[i,] = self.train_input_questions[ID]
        return [quest, imm]

    def _load_image(self, image_name, img_w, img_h):
        """Load ``<imgs_path>/<image_name>.png``, resize it and apply VGG16 preprocessing."""
        image_path = os.path.join(imgs_path, image_name + '.png')
        # Context manager closes the underlying file handle once the pixels are copied.
        with Image.open(image_path) as img:
            # PIL's resize expects (width, height); the resulting array is (height, width, C).
            # Only the first three channels are kept (drops an alpha channel if present).
            image = np.array(img.resize((img_w, img_h)))[:, :, :3]
        # VGG16-specific preprocessing is used instead of a plain 1/255 rescale.
        image = preprocess_input(image)
        return image

In [None]:
# helpers to extract data

def readTestJson(data):
    """Split the test-question mapping into three parallel lists.

    Args:
        data: mapping ``question_id -> {'image_id': ..., 'question': str}``.

    Returns:
        ``(image_ids, question_ids, questions)`` where ``questions[i]`` is the
        question text with every ``?`` removed, split on single spaces.
    """
    image_ids, question_ids, tokenized_questions = [], [], []

    for question_id, entry in data.items():
        image_ids.append(entry['image_id'])
        question_ids.append(question_id)
        tokenized_questions.append(entry['question'].replace("?", "").split(" "))

    return image_ids, question_ids, tokenized_questions

In [None]:
# Collect the questions of both train and test sets: the vocabulary is built on their union.

tokenizer = tf.keras.preprocessing.text.Tokenizer()

# The `with` blocks close the files on exit, so no explicit close() is needed.
with open(train_json_path, 'r') as train_file:
    train_data = json.load(train_file)

with open(test_json_path, 'r') as test_file:
    test_data = json.load(test_file)

# Training annotations -> parallel lists of image ids, tokenised questions and answer class ids.
key_list = list(train_data.keys())
image_IDs, questions, answers = [], [], []

for key in key_list:
    annotation = train_data[key]
    question_tokens = annotation['question'].replace("?", "").split(" ")
    image_IDs.append(annotation['image_id'])
    questions.append(question_tokens)
    answers.append(classes[annotation['answer']])

test_images, test_questionsID, test_questions = readTestJson(test_data)

# Tokenizer corpus: training questions followed by test questions.
all_questions_4Tokenizer = questions + test_questions



In [None]:
# Fit the vocabulary on train + test questions; index 0 is reserved for padding,
# hence the +1 on the vocabulary size.
tokenizer.fit_on_texts(all_questions_4Tokenizer)
words_number = len(tokenizer.word_index) + 1

train_sequences = tokenizer.texts_to_sequences(questions)
all_sequences = tokenizer.texts_to_sequences(all_questions_4Tokenizer)
test_sequences = tokenizer.texts_to_sequences(test_questions)

# Pad every question to the longest one found in either set.
max_length = max(len(sequence) for sequence in all_sequences)
train_input_questions = pad_sequences(train_sequences, maxlen=max_length)

# NOTE: test_questions is rebound from token lists to the padded id matrix, so this
# cell must not be re-run without first re-running the cell that loads the JSON files.
test_questions = pad_sequences(test_sequences, maxlen=max_length)

# Split the annotated samples into training and validation sets.
# The ratio comes from the shared DATASET_SPLIT constant, and the draw uses a dedicated
# seeded generator so the very same split is produced on every run (important because
# weights are reloaded across runs: a different split would leak former training
# samples into validation).
perc = DATASET_SPLIT
split_rng = np.random.RandomState(SEED)

train_images = []
train_questions = []
train_answers = []
valid_images = []
valid_questions = []
valid_answers = []

for i in range(len(image_IDs)):
    if split_rng.rand() < perc:
        train_images.append(image_IDs[i])
        train_questions.append(train_input_questions[i])
        train_answers.append(answers[i])
    else:
        valid_images.append(image_IDs[i])
        valid_questions.append(train_input_questions[i])
        valid_answers.append(answers[i])

In [None]:
# Data generators for training, validation and test.
# Arguments shared by all three are factored out; only the data, the batch size,
# the presence of targets and the shuffling differ.
_shared_generator_kwargs = dict(max_length=max_length,
                                dim=(img_h, img_w),
                                n_classes=N_CLASSES)

# Training: targets included, order reshuffled every epoch.
training_generator = DataGenerator(answers_ID=train_answers,
                                   image_name=train_images,
                                   train_input_questions=train_questions,
                                   to_fit=True,
                                   batch_size=BATCH_SIZE,
                                   shuffle=True,
                                   **_shared_generator_kwargs)

# Validation: targets included, fixed order.
validation_generator = DataGenerator(answers_ID=valid_answers,
                                     image_name=valid_images,
                                     train_input_questions=valid_questions,
                                     to_fit=True,
                                     batch_size=BATCH_SIZE,
                                     shuffle=False,
                                     **_shared_generator_kwargs)

# Test: no targets, one sample per batch, fixed order so predictions line up with the question ids.
test_generator = DataGenerator(answers_ID=None,
                               image_name=test_images,
                               train_input_questions=test_questions,
                               to_fit=False,
                               batch_size=1,
                               shuffle=False,
                               **_shared_generator_kwargs)

In [None]:
# Build the model: a frozen VGG16 image encoder and an Embedding+LSTM question
# encoder whose outputs are concatenated and classified over the answer classes.

# Size of the image projection and of the LSTM state that get concatenated.
INPUT_SIZE_MERGE = 256

# Convolutional part of VGG16 (no classifier head) with ImageNet weights; every layer is frozen.
base_model = tf.keras.applications.VGG16(input_shape=(img_h, img_w, 3), include_top=False, weights='imagenet')
for layer in base_model.layers:
    layer.trainable = False

# Image branch: VGG16 features -> global average pooling -> Dense projection
# (no activation argument is given to the Dense layer).
vision_model = tf.keras.models.Sequential()
vision_model.add(base_model)
vision_model.add(tf.keras.layers.GlobalAveragePooling2D())
vision_model.add(tf.keras.layers.Dense(INPUT_SIZE_MERGE))
image_input = tf.keras.layers.Input(shape=(img_h, img_w, 3))
encoded_image = vision_model(image_input)

# Text branch: padded token ids -> 256-dimensional embeddings -> LSTM.
question_input = tf.keras.layers.Input(shape=[max_length])
embedded_question = tf.keras.layers.Embedding(input_dim=words_number, output_dim=256, input_length=max_length)(question_input)
encoded_question = tf.keras.layers.LSTM(units=INPUT_SIZE_MERGE)(embedded_question)

# Final model (text + image): concatenate both encodings, softmax over the answer classes.
# The input order [question, image] matches the list returned by DataGenerator._generate_X.
merged = tf.keras.layers.concatenate([encoded_question, encoded_image])
output = tf.keras.layers.Dense(len(classes), activation='softmax')(merged)
vqa_model = tf.keras.models.Model(inputs=[question_input, image_input], outputs=output)

# Resume from the weights stored under <cwd>/model/VQA.
# NOTE(review): with load_weights = True this presumably fails when no checkpoint has
# been saved yet (e.g. the first run on a fresh checkout) -- confirm and set it to False then.
load_weights = True 
if load_weights:
    model_name = "VQA" 
    model_dir = os.path.join(cwd, "model")
    vqa_model.load_weights(os.path.join(model_dir, model_name))

In [None]:
#-------- callback that sets the learning rate at the start of every epoch ----------
class CLR(tf.keras.callbacks.Callback):
    """Apply a user-supplied schedule to the optimizer's learning rate.

    Args:
        schedule: callable ``schedule(epoch, current_lr) -> new_lr`` invoked at
            the beginning of each epoch.

    Raises:
        ValueError: at epoch start, if the optimizer exposes neither
            ``learning_rate`` nor the legacy ``lr`` attribute.
    """

    def __init__(self, schedule):
        super(CLR, self).__init__()
        self.schedule = schedule

    def on_epoch_begin(self, epoch, logs=None):
        optimizer = self.model.optimizer
        # Resolve the attribute once and use it for the check, the read and the write:
        # `learning_rate` is the canonical name, `lr` only a legacy alias that is not
        # guaranteed to exist on every optimizer implementation.
        if hasattr(optimizer, "learning_rate"):
            lr_variable = optimizer.learning_rate
        elif hasattr(optimizer, "lr"):
            lr_variable = optimizer.lr
        else:
            raise ValueError('non hai settato lr')
        lr = float(tf.keras.backend.get_value(lr_variable))
        scheduled_lr = self.schedule(epoch, lr)
        tf.keras.backend.set_value(lr_variable, scheduled_lr)

In [None]:
#---------------- definisco callbacks ------------------------------------
from tensorflow.keras.callbacks import EarlyStopping
    
# Callbacks handed to fit(); early stopping is switched on by the flag below.
early_stop = True

callbacks = []
if early_stop:
    # NOTE(review): patience=20 exceeds the EP = 15 epochs set in the training cell,
    # so with the current settings this callback never stops the run.
    es_callback = EarlyStopping(monitor='val_loss', patience=20)
    callbacks += [es_callback]
    
#--------------------------- learning-rate lookup table (standard) -------------
# List of (epoch, learning_rate) entries; left empty here and filled in by the
# training cell before fit() is called.
LUT_STD = []
#------------------- schedule function handed to the CLR callback ---------------
def get_lr_std(epoch, lr):
    """Return the learning rate to use at ``epoch`` according to ``LUT_STD``.

    Assumes the entries of ``LUT_STD`` are ordered by ascending epoch.

    Args:
        epoch: index of the epoch about to start.
        lr: learning rate currently set on the optimizer.

    Returns:
        * ``lr`` unchanged when the table is empty;
        * the first entry's rate for epochs before the first entry;
        * the last entry's rate for epochs after the last entry;
        * the matching entry's rate when ``epoch`` equals an entry's epoch
          (also printed);
        * ``lr`` unchanged for epochs that fall between two entries.
    """
    # An empty table means "no schedule": keep the current rate instead of
    # failing with an IndexError on LUT_STD[0].
    if not LUT_STD:
        return lr
    first_entry = LUT_STD[0]
    last_entry = LUT_STD[-1]
    if epoch < first_entry[0]:
        return first_entry[1]
    elif epoch > last_entry[0]:
        return last_entry[1]
    for entry in LUT_STD:
        if epoch == entry[0]:
            print("\nnuovo lr: "+str(entry[1]))
            return entry[1]
    return lr

# Register the per-epoch learning-rate scheduler driven by get_lr_std.
lr_scheduler_callback = CLR(get_lr_std)
callbacks.append(lr_scheduler_callback)

In [None]:
# Compile the model.

# Loss object kept for reference; compile() below selects the same loss by name.
loss = tf.keras.losses.SparseCategoricalCrossentropy()

# Targets are integer class ids, hence the sparse loss and the sparse accuracy metric.
_compile_options = {
    'optimizer': "Adam",
    'loss': 'sparse_categorical_crossentropy',
    'metrics': ['sparse_categorical_accuracy'],
}
vqa_model.compile(**_compile_options)

In [None]:
#---------- train the model ---------------------------------------------------
EP = 15  # number of epochs

# (epoch, learning_rate) entries read by get_lr_std at the start of each epoch.
LUT_STD = [
    (0, 1e-6),
    (2, 1e-6),
    (8, 1e-6),
]

# Keep the whole VGG16 backbone frozen for this training run.
for backbone_layer in base_model.layers:
    backbone_layer.trainable = False

vqa_model.fit(
    training_generator,
    validation_data=validation_generator,
    epochs=EP,
    callbacks=callbacks,
)

In [None]:
#------------- save the weights --------------------------------------------------
save_weights = True

if save_weights:
    model_name = "VQA"
    model_dir = os.path.join(cwd, 'model')
    # exist_ok=True creates the directory only when missing, without the
    # check-then-create race of a separate os.path.exists() test.
    os.makedirs(model_dir, exist_ok=True)
    vqa_model.save_weights(os.path.join(model_dir, model_name))

In [None]:
# Run inference on the test set: one row of class probabilities per test question.
pred = vqa_model.predict(x=test_generator)

In [None]:
# creo csv
import os
from datetime import datetime

def create_csv(results, results_dir='./'):
    """Write ``results`` to a timestamped CSV file inside ``results_dir``.

    The file is named ``results_<Mon><day>_<HH>-<MM>-<SS>.csv`` and contains the
    header ``Id,Category`` followed by one ``key,value`` line per entry of
    ``results``, in iteration order.

    Args:
        results: mapping from question id to predicted category.
        results_dir: directory in which the file is created.
    """
    timestamp = datetime.now().strftime('%b%d_%H-%M-%S')
    csv_path = os.path.join(results_dir, 'results_' + timestamp + '.csv')

    with open(csv_path, 'w') as csv_file:
        csv_file.write('Id,Category\n')
        csv_file.writelines(
            str(question_id) + ',' + str(category) + '\n'
            for question_id, category in results.items()
        )


In [None]:
# Map every test question id to its most probable class id, then export to CSV.
# Predictions are matched to question ids by position.
results = {
    test_questionsID[row]: np.argmax(probabilities)
    for row, probabilities in enumerate(pred)
}

create_csv(results)