### Imports

In [7]:
import random
import json 
import os
import pandas as pd
import numpy as np
#import matplotlib.pyplot as plt
import tensorflow as tf
import tensorflow.keras as keras

# Pass an empty device list for the 'GPU' type so that TensorFlow sees no GPUs.
tf.config.set_visible_devices([], 'GPU')

### Load Dataset

In [8]:
# Read the VQA v2 training questions. The encoding is passed explicitly so that
# decoding the JSON text does not depend on the platform's default locale encoding.
with open("v2_OpenEnded_mscoco_train2014_questions.json", encoding="utf-8") as f:
    data = json.load(f)

# One DataFrame row per entry of the top-level 'questions' list.
questions = data['questions']
training_questions_df = pd.DataFrame(questions)

In [9]:
# Read the VQA v2 training annotations. The encoding is passed explicitly so that
# decoding the JSON text does not depend on the platform's default locale encoding.
with open("v2_mscoco_train2014_annotations.json", encoding="utf-8") as f:
    data = json.load(f)

# One DataFrame row per entry of the top-level 'annotations' list.
annotations = data['annotations']
training_annotations_df = pd.DataFrame(annotations)

### Preparing the data

In [10]:
def id_to_path(image_id):
    """Return the relative path of the train2014 COCO image with the given id.

    The id is converted to text and left-padded with zeros to 12 characters,
    then wrapped in the 'train2014/COCO_train2014_<id>.jpg' naming pattern.
    """
    padded_id = str(image_id).zfill(12)
    return 'train2014/COCO_train2014_' + padded_id + '.jpg'

In [11]:
def Preprocessing(questions_df, annotations_df, filename):
    """Join annotations with their question text and write the result to CSV.

    Parameters
    ----------
    questions_df : pandas.DataFrame
        Must contain the columns 'question_id' and 'question'.
    annotations_df : pandas.DataFrame
        Must contain the columns 'question_id', 'image_id', 'question_type'
        and 'multiple_choice_answer'.
    filename : str
        Destination of the CSV file (written with the DataFrame index).

    Returns
    -------
    pandas.DataFrame
        The first rows (``head()``) of the table that was written, with columns
        'question_id', 'image_path', 'question', 'question_type', 'answer'.
    """
    # Select and rename without touching the caller's frame (no in-place ops).
    renamed = annotations_df[
        ['question_id', 'image_id', 'question_type', 'multiple_choice_answer']
    ].rename(columns={'multiple_choice_answer': 'answer', 'image_id': 'image_path'})

    # Left join keeps every annotation even if its question text is missing.
    merged = renamed.merge(questions_df[['question_id', 'question']],
                           on='question_id', how='left')

    # .assign builds a new frame, so the image-id -> path conversion never
    # writes into a slice of `merged` (avoids pandas' SettingWithCopyWarning).
    final = merged[['question_id', 'image_path', 'question', 'question_type', 'answer']].assign(
        image_path=lambda frame: frame['image_path'].apply(id_to_path)
    )
    final.to_csv(filename)

    return final.head()

In [12]:
# Build the merged question/answer table, write it to 'dataset.csv' and display the returned preview.
Preprocessing(training_questions_df, training_annotations_df,'dataset.csv')

Unnamed: 0,question_id,image_path,question,question_type,answer
0,458752000,train2014/COCO_train2014_000000458752.jpg,What is this photo taken looking through?,what is this,net
1,458752001,train2014/COCO_train2014_000000458752.jpg,What position is this man playing?,what,pitcher
2,458752002,train2014/COCO_train2014_000000458752.jpg,What color is the players shirt?,what color is the,orange
3,458752003,train2014/COCO_train2014_000000458752.jpg,Is this man a professional baseball player?,is this,yes
4,262146000,train2014/COCO_train2014_000000262146.jpg,What color is the snow?,what color is the,white


In [13]:
# Reload the table from disk; index_col = 0 uses the first CSV column as the row index.
dataset = pd.read_csv('dataset.csv', index_col = 0)

In [14]:
# Preview the first rows of the reloaded table.
dataset.head()

Unnamed: 0,question_id,image_path,question,question_type,answer
0,458752000,train2014/COCO_train2014_000000458752.jpg,What is this photo taken looking through?,what is this,net
1,458752001,train2014/COCO_train2014_000000458752.jpg,What position is this man playing?,what,pitcher
2,458752002,train2014/COCO_train2014_000000458752.jpg,What color is the players shirt?,what color is the,orange
3,458752003,train2014/COCO_train2014_000000458752.jpg,Is this man a professional baseball player?,is this,yes
4,262146000,train2014/COCO_train2014_000000262146.jpg,What color is the snow?,what color is the,white


### Load GloVe Embeddings

In [15]:
# Load GloVe embeddings

def load_glove_embeddings(glove_file):
    """Parse a GloVe text file into a ``{word: vector}`` dictionary.

    Each non-blank line is expected to hold a word followed by its
    whitespace-separated vector components.

    Parameters
    ----------
    glove_file : str
        Path of the UTF-8 encoded GloVe text file.

    Returns
    -------
    dict
        Maps each word to a 1-D ``numpy.float32`` array.
    """
    embeddings = {}
    with open(glove_file, 'r', encoding='utf-8') as f:
        # Stream the file line by line instead of readlines(): the file can be
        # several GB and does not need to be held in memory as a list of strings.
        for line in f:
            fields = line.split(maxsplit=1)
            if len(fields) < 2:
                # Blank line or a word without a vector: nothing to store.
                continue
            word, str_vector = fields
            # split() with no argument also discards the trailing newline.
            embeddings[word] = np.asarray(str_vector.split(), dtype=np.float32)

    return embeddings

In [None]:
# Parse the GloVe vectors file into the `embeddings` lookup that is used
# when the embedding matrix is filled in below.
glove_file = 'glove.42B.300d.txt'
embeddings = load_glove_embeddings(glove_file)

### Tokenization

In [None]:
# tokenizer for the dataset
from tensorflow.keras.preprocessing.text import Tokenizer

# num_words=1000 caps the word indices emitted by texts_to_sequences / texts_to_matrix;
# oov_token is the placeholder used for words the tokenizer cannot map.
tokenizer = Tokenizer(num_words=1000, oov_token='<OOV>')

# Build the vocabulary from every question concatenated with its answer. With its default settings
# the Keras Tokenizer lowercases the text and strips punctuation before splitting it into words.
tokenizer.fit_on_texts(dataset['question'] + " " + dataset['answer'])

In [None]:
# Get word index (word -> index mapping) built by fit_on_texts.
# Its length, not num_words, determines vocab_size for the embedding matrix below.
word_index = tokenizer.word_index

In [None]:
# Build the embedding matrix (GloVe vectors are 300-dimensional)

embedding_dim = 300
vocab_size = 1 + len(word_index)  # one extra row for the padding index
embeddings_matrix = np.zeros((vocab_size, embedding_dim))

# Copy each known word's GloVe vector into its row; words that are missing
# from GloVe keep their all-zero row.
for token, row in word_index.items():
    glove_vector = embeddings.get(token)
    if glove_vector is None:
        continue
    embeddings_matrix[row] = glove_vector

In [None]:
# Inverse lookup (index -> word), used to turn a predicted index back into text.
reverse_word_index = {index: word for word, index in word_index.items()}

### Preprocessing data

In [15]:
def Preprocessing2(dataset):
    """Turn the question/answer table into arrays for the model.

    Relies on the module-level `tokenizer` having been fitted beforehand.

    Parameters
    ----------
    dataset : pandas.DataFrame
        Must contain the columns 'image_path', 'question' and 'answer'.

    Returns
    -------
    tuple of numpy.ndarray
        ``(image_paths, questions, answers)`` where `questions` holds the token
        ids of each question padded/truncated to 10 entries (padding appended
        at the end) and `answers` is the result of ``tokenizer.texts_to_matrix``
        on the answer column (one row per answer).
    """
    image_paths = np.stack(dataset['image_path'])

    # Tokenize and pad all questions in one batched call instead of one
    # Python-level call per row; the resulting (n_rows, 10) array is the same.
    question_sequences = tokenizer.texts_to_sequences(dataset['question'])
    questions = tf.keras.utils.pad_sequences(question_sequences, padding="post", maxlen=10)

    answers = tokenizer.texts_to_matrix(dataset['answer'])

    return image_paths, questions, answers

In [16]:
# Count the missing values in each column before tokenizing.
dataset.isna().sum()

question_id      0
image_path       0
question         0
question_type    0
answer           0
dtype: int64

In [17]:
# Convert the whole table into (image paths, padded question token ids, answer matrix).
# Note that `questions` is rebound here; it previously held the raw question records.
images, questions, answers = Preprocessing2(dataset)

### Model Building

In [18]:
# List the physical GPUs TensorFlow can detect (this is independent of which devices were made visible).
tf.config.list_physical_devices('GPU')

[PhysicalDevice(name='/physical_device:GPU:0', device_type='GPU')]

In [19]:
# Import pre-trained ResNet50 Architecture
from keras.applications import ResNet50V2
from keras.applications import VGG16

In [20]:
# NOTE: despite the `vgg` name, the backbone instantiated here is ResNet50V2 with ImageNet weights.
vgg = ResNet50V2(weights = 'imagenet', input_shape=(224,224,3))

In [21]:
# Feature extractor: same input as the backbone, output taken from its second-to-last layer.
vgg_model = keras.Model(inputs = vgg.input, outputs = vgg.layers[-2].output)
# Freeze the extractor so that its weights are not updated during training.
vgg_model.trainable = False

In [22]:
# Print the layer-by-layer structure of the frozen feature extractor.
vgg_model.summary()

Model: "model"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 input_1 (InputLayer)           [(None, 224, 224, 3  0           []                               
                                )]                                                                
                                                                                                  
 conv1_pad (ZeroPadding2D)      (None, 230, 230, 3)  0           ['input_1[0][0]']                
                                                                                                  
 conv1_conv (Conv2D)            (None, 112, 112, 64  9472        ['conv1_pad[0][0]']              
                                )                                                                 
                                                                                              

In [23]:
# model inputs: a 224x224 three-channel image and a question encoded as 10 token ids
# (10 matches the maxlen used when padding the tokenized questions)
input_image = keras.layers.Input(shape = (224,224,3), name = 'Image Input')
input_question = keras.layers.Input(shape =(10,), name = "Question Input")

In [24]:
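# NOTE: the backbone used here is ResNet50V2, not VGG16. Keras pairs ResNet50V2 with
# keras.applications.resnet_v2.preprocess_input, which scales input pixels to the range [-1, 1].
# (For comparison, keras.applications.vgg16.preprocess_input returns a preprocessed `numpy.array` or `tf.Tensor`
# of type `float32`: images are converted from RGB to BGR, then each color channel is zero-centered with respect
# to the ImageNet dataset, without scaling.)
# The input pipeline in this notebook instead rescales images to [0, 1] by dividing by 255.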

# get image embedding
# Frozen backbone features followed by two tanh Dense layers (2048 then 1024 units).
x = vgg_model(input_image)
x = keras.layers.Dense(2048, activation = 'tanh', name = 'Dense1')(x)
image_rep = keras.layers.Dense(1024, activation = 'tanh', name = 'Dense2')(x)

# get question embedding
# Non-trainable embedding layer initialised from the GloVe matrix, then two stacked 512-unit LSTMs.
embedding_layer_output = keras.layers.Embedding(vocab_size, embedding_dim, weights = [embeddings_matrix], trainable  = False, name = "EmbeddingsLayer")(input_question)
lstm1_out, hidden1, cell1 = keras.layers.LSTM(512, activation = 'tanh', return_sequences=True, return_state= True, name = "lstm1")(embedding_layer_output)
lstm2_out, hidden2, cell2 = keras.layers.LSTM(512, activation = 'tanh', return_sequences=False, return_state = True, name = "lstm2")(lstm1_out)
# Concatenate the final hidden and cell states of both LSTMs and project them to 1024 units,
# the same width as image_rep.
concat_out = keras.layers.Concatenate(name= 'ConcatenateLayer')([hidden1, cell1, hidden2, cell2])
question_rep = keras.layers.Dense(1024, activation= 'tanh', name = 'Dense3')(concat_out)

In [25]:
# Fuse the two 1024-d representations by element-wise multiplication.
# The built-in Multiply layer computes the same product as a Lambda wrapping tf.multiply,
# but it is a regular serializable layer: a model containing an anonymous Python lambda
# cannot be reloaded from a saved file without deserializing arbitrary code.
combined_rep = keras.layers.Multiply()([image_rep, question_rep])

# Multi-layer perceptron branch
dense1_out = keras.layers.Dense(1000, activation = 'tanh', name = 'Dense4')(combined_rep)
dropdense1_out = keras.layers.Dropout(0.2, name = "Dropout1",)(dense1_out)
dense2_out = keras.layers.Dense(1000, activation = 'tanh', name = 'Dense5')(dropdense1_out)
dropdense2_out = keras.layers.Dropout(0.3, name = "Dropout2",)(dense2_out)
# 1000 output units: one per column of the answer matrix (the tokenizer's num_words).
softmax_out = keras.layers.Dense(1000, activation = 'softmax', name = "LastLayer")(dropdense2_out)

In [26]:
# Assemble the two-input (image, question) model that outputs the softmax over answer words.
vqa_model = keras.Model(inputs = [input_image, input_question], outputs = softmax_out)

# show_trainable adds a column telling which layers are frozen.
vqa_model.summary(show_trainable=True)

Model: "model_1"
_____________________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     Trainable  
 Question Input (InputLayer)    [(None, 10)]         0           []                               Y          
                                                                                                             
 EmbeddingsLayer (Embedding)    (None, 10, 300)      5913600     ['Question Input[0][0]']         N          
                                                                                                             
 Image Input (InputLayer)       [(None, 224, 224, 3  0           []                               Y          
                                )]                                                                           
                                                                                                       

In [27]:
# Keep only the best model (lowest val_loss) on disk, and stop training once
# val_loss has not improved for 5 consecutive epochs.
callbacks_list = [keras.callbacks.ModelCheckpoint(filepath = "best_vqa_model8.keras", save_best_only= True, monitor = 'val_loss'),
                  keras.callbacks.EarlyStopping(monitor = 'val_loss', patience= 5, mode = 'min')]

In [28]:
# RMSprop (learning rate 1e-3) minimising categorical cross-entropy; categorical accuracy is tracked as a metric.
vqa_model.compile(optimizer = keras.optimizers.RMSprop(learning_rate=0.001), loss= 'categorical_crossentropy', 
                  metrics = [keras.metrics.CategoricalAccuracy()])

In [29]:
# Convert the NumPy arrays to tensors; question token ids and the answer matrix are cast to int32.
# These names are rebound in place, so re-running this cell operates on tensors rather than the original arrays.
images = tf.convert_to_tensor(images)
questions = tf.convert_to_tensor(questions, dtype = tf.int32)
answers = tf.convert_to_tensor(answers, dtype = tf.int32)

In [30]:
# Disjoint, contiguous splits over the first 50,000 rows:
#   validation = rows [0, 2000), test = rows [2000, 4000), training = rows [4000, 50000).
# The validation slice previously ran to row 8000, which overlapped the training rows and
# contained the whole test slice, so validation metrics were measured on leaked data.
train_images = images[4000:50000]
train_questions = questions[4000:50000]
train_answers = answers[4000:50000]

valid_images = images[:2000]
valid_questions = questions[:2000]
valid_answers = answers[:2000]

test_images = images[2000:4000]
test_questions = questions[2000:4000]
test_answers = answers[2000:4000]

In [31]:
# Wrap each split as a tf.data.Dataset whose elements are (image path, question token ids, answer row) triples.
training_data = tf.data.Dataset.from_tensor_slices((train_images, train_questions, train_answers))
validation_data = tf.data.Dataset.from_tensor_slices((valid_images, valid_questions, valid_answers))
testing_data = tf.data.Dataset.from_tensor_slices((test_images, test_questions, test_answers))

In [32]:
# tf.data.Dataset.map() traces the mapped function with symbolic tensors, so the image is read with
# TensorFlow ops below: keras.utils.load_img relies on a file path in Python string format and cannot process symbolic tensors.

def preprocess_image(image_path, question, answer):
    """Load one image from disk and repackage the sample as ``((image, question), answer)``.

    The file at `image_path` is read and decoded as a 3-channel JPEG, resized
    to 224x224 and divided by 255.0. `question` and `answer` are passed through
    unchanged.
    """
    raw_bytes = tf.io.read_file(image_path)
    decoded = tf.image.decode_jpeg(raw_bytes, channels=3)
    resized = tf.image.resize(decoded, [224, 224])
    scaled = resized / 255.0

    return (scaled, question), answer

In [33]:
def _build_input_pipeline(dataset, batch_size=64, shuffle_buffer=None):
    """Return `dataset` mapped through preprocess_image, batched and prefetched.

    Parameters
    ----------
    dataset : tf.data.Dataset
        Yields (image path, question, answer) triples.
    batch_size : int
        Number of samples per batch; the last, smaller batch is kept.
    shuffle_buffer : int or None
        When given, samples are shuffled with a buffer of this size before the
        images are loaded (so only paths and small tensors sit in the buffer).
    """
    if shuffle_buffer:
        dataset = dataset.shuffle(shuffle_buffer)

    return (dataset
            .map(preprocess_image, num_parallel_calls=tf.data.AUTOTUNE)  # Parallel processing
            .batch(batch_size=batch_size, num_parallel_calls=tf.data.AUTOTUNE, drop_remainder=False)
            .prefetch(buffer_size=tf.data.AUTOTUNE))  # Asynchronous prefetching


# Only the training stream is shuffled: Model.fit does not shuffle tf.data inputs itself,
# and the rows are stored with all questions about the same image next to each other.
training_data_optimized = _build_input_pipeline(training_data, shuffle_buffer=10_000)

validation_data_optimized = _build_input_pipeline(validation_data)

testing_data_optimized = _build_input_pipeline(testing_data)

In [34]:
# Train for at most 10 epochs; the callbacks checkpoint the best model and may stop training early.
history = vqa_model.fit(training_data_optimized, epochs = 10, callbacks= callbacks_list,
                        validation_data= validation_data_optimized)

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


In [3]:
# Reload the checkpoint file that the ModelCheckpoint callback writes ('best_vqa_model8.keras').
# NOTE(review): the low execution count of this cell suggests it was run in a fresh kernel;
# the imports and the tokenization cells must be re-run first for inference to work.
loaded_model = keras.models.load_model('best_vqa_model8.keras')

In [36]:
# loaded_model.evaluate(testing_data_optimized)

In [4]:
def test_input(model, image_path, input_question):
    """Predict a one-word answer for a single image/question pair.

    Relies on the module-level `tokenizer` and `reverse_word_index`; the
    tokenization cells must have been run in the current kernel.

    Parameters
    ----------
    model : keras.Model
        Model taking ``(image batch, question batch)`` and returning one score per answer index.
    image_path : str
        Path of the image file; it is loaded at 224x224 and divided by 255.
    input_question : str
        Question text; tokenized and padded/truncated to 10 token ids.

    Returns
    -------
    str
        The word whose index has the highest score, or ``'<unknown>'`` when that
        index has no word attached (index 0 is reserved for padding and is
        therefore absent from `reverse_word_index`).
    """
    img = keras.utils.load_img(image_path, target_size=(224, 224))
    img_array = keras.utils.img_to_array(img) / 255
    tokenized_question = keras.utils.pad_sequences(
        tokenizer.texts_to_sequences([input_question]), padding="post", maxlen=10
    )
    # Add the batch dimension expected by the model.
    prediction = model((tf.expand_dims(img_array, axis=0), tokenized_question))

    predicted_index = int(np.argmax(prediction))
    # .get avoids a KeyError when the arg-max lands on an index without a word.
    return reverse_word_index.get(predicted_index, '<unknown>')

In [5]:
# Ask the reloaded model a question about a sample image.
# The tokenizer and reverse_word_index cells must have been executed in this kernel beforehand.
test_input(loaded_model, image_path = 'test_image.jpg', input_question = 'How many people are there?')

NameError: name 'tokenizer' is not defined