In [1]:
import numpy as np
import pandas as pd
import pickle
import json
import re
from collections import defaultdict
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense
from google.colab import drive

# Mounting the drive to access files.
# The mount point is /content/MyDrive, so the Drive contents are addressed
# below as /content/MyDrive/MyDrive/...
drive.mount('/content/MyDrive')
# Extract the annotation and question archives into local working directories
# (/content/annotations and /content/questions).
# NOTE(review): re-running this cell with the files already extracted may make
# unzip ask about overwriting; confirm and consider adding -o or -n.
!unzip /content/MyDrive/MyDrive/v2_Annotations_Train_mscoco.zip -d /content/annotations
!unzip /content/MyDrive/MyDrive/v2_Questions_Train_mscoco.zip -d /content/questions

Mounted at /content/MyDrive
Archive:  /content/MyDrive/MyDrive/v2_Annotations_Train_mscoco.zip
  inflating: /content/annotations/v2_mscoco_train2014_annotations.json  
Archive:  /content/MyDrive/MyDrive/v2_Questions_Train_mscoco.zip
  inflating: /content/questions/v2_OpenEnded_mscoco_train2014_questions.json  


In [8]:
# Global variable for dimension size: combined question+image feature vectors
# are zero-padded to this length before being fed to the model.
GLOBAL_DIM = 11600

# Loading image embeddings from a CSV file
image_embeddings = pd.read_csv("/content/i_embeddings.csv")

# Loading question embeddings using pickle.
# NOTE(security): pickle.load can execute arbitrary code, so only load files
# that come from a trusted source.
# The context manager guarantees the file handle is closed after loading.
with open("/content/q_embeddings.pkl", "rb") as embeddings_file:
    question_embeddings = pickle.load(embeddings_file)

# Extracting vectors from question embeddings.
# An explicit copy is taken because the array is modified in place later on;
# without it those writes could go through to the DataFrame column (or fail
# when pandas hands back a read-only view).
question_embeddings_processed = question_embeddings['vec'].to_numpy(copy=True)

In [9]:
# Reshaping question embeddings to flatten them into 1-D tensors.
# A target shape of [-1] lets TensorFlow infer the flattened length, which
# equals the previous shape[1] * 768 for the inputs that used to work.
# Unlike the explicit size, it is a no-op on an already flattened entry, so
# this cell can be re-run safely instead of failing on shape[1].
for i in range(question_embeddings_processed.shape[0]):
    question_embeddings_processed[i] = tf.reshape(question_embeddings_processed[i], [-1])

# Function to align dimensions of question and image embeddings
def shrink_dimensions(question,image):
    """Concatenate a question vector and an image vector and zero-pad to GLOBAL_DIM.

    Args:
        question: 1-D tensor (or array-like) of question features.
        image: 1-D tensor (or array-like) of image features.

    Returns:
        A 1-D tensor of length GLOBAL_DIM holding ``question`` followed by
        ``image`` followed by trailing zeros.

    Raises:
        ValueError: if the concatenated vector is longer than GLOBAL_DIM
            (padding cannot shorten it).
    """
    # Build the concatenation once and reuse it for both the size and the pad.
    combined = tf.concat([question, image], 0)
    pad_amount = GLOBAL_DIM - combined.shape[0]
    if pad_amount < 0:
        raise ValueError(
            f"Combined feature length {combined.shape[0]} exceeds GLOBAL_DIM ({GLOBAL_DIM})"
        )
    # Pad only at the end of the single axis.
    return tf.pad(combined, tf.constant([[0, pad_amount]]), "CONSTANT")

# Length of every entry in the 'question' column
question_length = np.array(list(map(len, question_embeddings['question'])))

# Image IDs in first-seen order with repeats dropped; every ID is then shifted down by 1
seen = set()
ordered_unique_ids = []
for img_id in question_embeddings['image_id']:
    if img_id in seen:
        continue
    seen.add(img_id)
    ordered_unique_ids.append(img_id)
image_ids = np.array(ordered_unique_ids) - 1

In [3]:
# Parse the annotations JSON directly from the file handle and keep only the
# list stored under the top-level 'annotations' key
with open("/content/annotations/v2_mscoco_train2014_annotations.json", 'r') as annotations_file:
    annotations = json.load(annotations_file)['annotations']

In [10]:
# Tally how often each answer string occurs across all annotations.
# Answers containing any character that is neither a word character nor
# whitespace are left out of the tally.
ans_sorted = defaultdict(int)
for annotation in annotations:
    for answer_entry in annotation['answers']:
        answer_text = answer_entry['answer']
        if re.search(r"[^\w\s]", answer_text) is None:
            ans_sorted[answer_text] += 1

In [12]:
# Rank the answers from most to least frequent; '<unk>' is reserved as the
# first vocabulary entry, so it must not already be one of the answers
ans = sorted(ans_sorted, key=lambda answer_text: ans_sorted[answer_text], reverse=True)
if any(candidate == '<unk>' for candidate in ans):
    raise ValueError("'<unk>' found in answers")

# Write the vocabulary: '<unk>' followed by the 499 most frequent answers,
# one entry per line (500 lines at most)
vocab_entries = ['<unk>'] + ans[:499]
with open('/content/vocab.txt', 'w') as vocab_file:
    vocab_file.writelines(entry + '\n' for entry in vocab_entries)

# Pattern used to split sentences on runs of non-word characters
# (the capturing group keeps the separators in the split result)
GLOBAL_SPLIT = re.compile(r'(\W+)')

In [13]:
# Class for handling vocabulary and tokenization
class VocabDict:
    """Vocabulary backed by a text file holding one entry per line.

    Attributes:
        word_L: entries in file order, each stripped of surrounding whitespace.
        convertToDict: mapping from entry to its line index.
        tokenToDict: index of '<unk>', or None when the file has no such entry.
    """

    def __init__(self, vocab_file):
        """Read ``vocab_file`` and build the entry-to-index mapping."""
        self.word_L = self._load_words(vocab_file)
        self.convertToDict = dict(zip(self.word_L, range(len(self.word_L))))
        self.tokenToDict = self.convertToDict.get('<unk>')

    def _load_words(self, path_ann):
        """Return the whitespace-stripped lines of the file at ``path_ann``."""
        with open(path_ann, 'r') as vocab_handle:
            return list(map(str.strip, vocab_handle))

    def convertWords(self, word):
        """Return the index of ``word``, or the '<unk>' index when it is unknown."""
        if word in self.convertToDict:
            return self.convertToDict[word]
        return self.tokenToDict

    def tokenize_and_index(self, sentence):
        """Lower-case ``sentence``, split it with GLOBAL_SPLIT and index each piece.

        Pieces that are empty after stripping whitespace are dropped.
        """
        indices = []
        for piece in GLOBAL_SPLIT.split(sentence.lower()):
            token = piece.strip()
            if token:
                indices.append(self.convertWords(token))
        return indices

# Load the answer vocabulary written earlier and keep its entry-to-index mapping,
# together with an array of all the indices
answer_vocab_source = VocabDict('/content/vocab.txt')
ans_vocab = answer_vocab_source.convertToDict
answers = np.array([*ans_vocab.values()])

In [15]:
# Empty accumulators: x_tensor collects the combined feature vectors and
# q_id the matching question IDs (filled in lockstep by the loop below)
x_tensor, q_id = [], []

# Function to parse image vectors
def parse_image_vector(vector_str):
    """Parse the text form of a numeric vector into a 1-D float array.

    Square brackets are treated as separators and the remaining text is split
    on any run of whitespace, including newlines. Two numbers separated only
    by a line break are therefore kept apart rather than fused into one token.

    Args:
        vector_str: text such as ``"[0.1 0.2\\n 0.3]"``.

    Returns:
        numpy.ndarray of dtype float64 (shape ``(0,)`` when there are no numbers).

    Raises:
        ValueError: if any token is not a valid number.
    """
    tokens = vector_str.replace('[', ' ').replace(']', ' ').split()
    return np.array(tokens, dtype=float)

# Constructing the input tensor by combining question and image features.
# Parsed image vectors are cached per image ID so that the embeddings table is
# scanned and parsed once per image rather than once per question.
image_feature_cache = {}
for i in range(question_embeddings.shape[0]):
    row = question_embeddings.iloc[i]
    image_id = row['image_id']

    if image_id not in image_feature_cache:
        image_feature_cache[image_id] = image_embeddings.loc[
            image_embeddings['Image ID'] == image_id, 'Image Vector'
        ].apply(parse_image_vector).tolist()
    image_features = image_feature_cache[image_id]

    # No embedding row for this image: skip the question instead of failing
    # later with an IndexError on image_features[0].
    if not image_features:
        print(f"Error: No image embedding found for image_id {image_id} at index {i}")
        continue

    if not all(array.shape == image_features[0].shape for array in image_features):
        print(f"Error: Different shapes found in image_features at index {i}")
        continue

    # Only questions whose flattened embedding is shorter than 11000 values are kept.
    # NOTE(review): presumably this leaves room for the image vector within GLOBAL_DIM; confirm.
    if question_embeddings_processed[i].shape[0] < 11000:
        # When several rows match, the first one is used.
        image_vector = tf.convert_to_tensor(image_features[0], dtype=tf.float32)
        x_tensor.append(shrink_dimensions(question_embeddings_processed[i], image_vector))
        q_id.append(row["question_id"])

# Function to find unique answer for a given question ID
# Cache for the question_id -> answer index, keyed by the identity of the list it was built from.
_answer_index_cache = {"source": None, "index": {}}

def unique_answer(question_id, annotation_list=None):
    """Return the 'multiple_choice_answer' recorded for ``question_id``.

    The first call builds a dictionary index over the annotations, so repeated
    lookups no longer rescan the whole list. The index is rebuilt whenever a
    different list object is supplied (for example after the annotations are
    reloaded). In-place edits to the same list object are not detected.

    Args:
        question_id: value compared against each annotation's 'question_id'.
        annotation_list: optional list of annotation dicts; defaults to the
            module-level ``annotations``.

    Returns:
        The answer of the first annotation with a matching 'question_id', or
        None when no annotation matches.
    """
    source = annotations if annotation_list is None else annotation_list
    if _answer_index_cache["source"] is not source:
        index = {}
        for entry in source:
            # setdefault keeps the first occurrence, like a front-to-back scan.
            index.setdefault(entry['question_id'], entry['multiple_choice_answer'])
        _answer_index_cache["source"] = source
        _answer_index_cache["index"] = index
    return _answer_index_cache["index"].get(question_id)

In [16]:
# Converting tensors to numpy arrays and initializing labels.
# Entries that are already numpy arrays (no .numpy method) are left alone, so
# re-running this cell does not fail.
for i in range(len(x_tensor)):
    if hasattr(x_tensor[i], "numpy"):
        x_tensor[i] = x_tensor[i].numpy()
x_train = np.array(x_tensor)
# One one-hot row per sample; 500 columns match the vocabulary size limit and
# the width of the model's output layer.
label = np.zeros(shape=(len(x_tensor), 500))

# Assigning labels to training data.
# Answers missing from the vocabulary (including None for questions without
# an annotation) fall back to index 0, the position of '<unk>' in vocab.txt.
# Using dict.get instead of a bare except lets unrelated errors surface.
for i in range(len(q_id)):
    key = ans_vocab.get(unique_answer(q_id[i]), 0)
    label[i][key] = 1

In [19]:
# Fully connected classifier: two ReLU hidden layers of 200 units followed by
# a 500-way softmax; the input width is taken from the first feature vector
input_width = x_tensor[0].shape[0]
model = Sequential([
    Dense(200, input_dim=input_width, activation='relu'),
    Dense(200, activation='relu'),
    Dense(500, activation='softmax'),
])
model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])

# Short aliases for the training inputs and one-hot targets
x, y = x_train, label

In [20]:
# Fit the model for 50 epochs in mini-batches of 64, keeping the returned
# History object
history = model.fit(
    x,
    y,
    batch_size=64,
    epochs=50,
)

Epoch 1/50
Epoch 2/50
Epoch 3/50
Epoch 4/50
Epoch 5/50
Epoch 6/50
Epoch 7/50
Epoch 8/50
Epoch 9/50
Epoch 10/50
Epoch 11/50
Epoch 12/50
Epoch 13/50
Epoch 14/50
Epoch 15/50
Epoch 16/50
Epoch 17/50
Epoch 18/50
Epoch 19/50
Epoch 20/50
Epoch 21/50
Epoch 22/50
Epoch 23/50
Epoch 24/50
Epoch 25/50
Epoch 26/50
Epoch 27/50
Epoch 28/50
Epoch 29/50
Epoch 30/50
Epoch 31/50
Epoch 32/50
Epoch 33/50
Epoch 34/50
Epoch 35/50
Epoch 36/50
Epoch 37/50
Epoch 38/50
Epoch 39/50
Epoch 40/50
Epoch 41/50
Epoch 42/50
Epoch 43/50
Epoch 44/50
Epoch 45/50
Epoch 46/50
Epoch 47/50
Epoch 48/50
Epoch 49/50
Epoch 50/50


In [21]:
# Function to predict the answer
def pred_ans(i):
    """Predict the answer class for the question at positional row ``i``.

    Args:
        i: positional index into ``question_embeddings`` and
            ``question_embeddings_processed``.

    Returns:
        A numpy array with one element, the argmax class index of the model
        output. Returns None (after printing a message) when the image has no
        embedding row or its embedding rows disagree in shape.
    """
    # Single .loc selection instead of chained indexing; parsing reuses the
    # shared helper rather than a duplicated inline lambda.
    image_rows = image_embeddings.loc[
        image_embeddings['Image ID'] == question_embeddings.iloc[i]['image_id'],
        'Image Vector',
    ]
    image_features = image_rows.apply(parse_image_vector).tolist()

    # Without this guard an unmatched image would raise IndexError below.
    if not image_features:
        print(f"Error: No image embedding found for question at index {i}")
        return None

    if not all(array.shape == image_features[0].shape for array in image_features):
        print(f"Error: Different shapes found in image_features at index {i}")
        return None

    # When several rows match, the first one is used.
    image = tf.convert_to_tensor(image_features[0], dtype=tf.float32)
    question = question_embeddings_processed[i]
    # The model expects a batch axis, hence the (1, GLOBAL_DIM) shape.
    t = shrink_dimensions(question, image).numpy().reshape(1, GLOBAL_DIM)
    predictions = model.predict(t)
    predicted_class = np.argmax(predictions, axis=1)
    return predicted_class

# Parallel lists of vocabulary entries and their indices, used to map a
# predicted index back to its answer string
key_list = [*ans_vocab.keys()]
val_list = [*ans_vocab.values()]

In [None]:
# Unzipping the image dataset from the mounted Drive into /content/train2014.
# NOTE(review): image paths built later use /content/train2014/train2014/..., so
# the archive presumably contains a top-level train2014 folder; confirm.
!unzip /content/MyDrive/MyDrive/train2014.zip -d /content/train2014

In [None]:
# Function to retrieve VQA details for a given question ID
def get_vqa_details(question_id):
    """Collect the image path, question text, predicted and actual answer.

    Despite its name, ``question_id`` is used as a positional row index into
    ``question_embeddings``.

    Returns:
        The string "Question ID is out of range." when ``question_id`` is not
        below the number of rows; otherwise a 4-tuple
        ``(image_path, question, predicted_answer, actual_answer)``, where
        ``predicted_answer`` is "Error in prediction" if no prediction could
        be made.
    """
    if question_id >= len(question_embeddings):
        return "Question ID is out of range."

    record = question_embeddings.iloc[question_id]
    image_id = record['image_id']
    question = record['question']

    # Image file name uses the image ID zero-padded to 12 characters
    image_path_ann = f'/content/train2014/train2014/COCO_train2014_{str(image_id).zfill(12)}.jpg'

    # Map the predicted class index back to its vocabulary entry
    predicted_answer_id = pred_ans(question_id)
    predicted_answer = (
        "Error in prediction"
        if predicted_answer_id is None
        else key_list[val_list.index(predicted_answer_id[0])]
    )

    # Ground-truth answer looked up by the row's real question ID
    actual_answer = unique_answer(record["question_id"])

    return image_path_ann, question, predicted_answer, actual_answer

# Displaying the image and VQA details
from IPython.display import Image, display
pred_id = 11
vqa_details = get_vqa_details(pred_id)
if isinstance(vqa_details, str):
    # get_vqa_details returns a plain message string for an out-of-range
    # index; show it instead of failing on tuple unpacking.
    print(vqa_details)
else:
    image_path, question, predicted_answer, actual_answer = vqa_details
    print(f"Image Path: {image_path}")
    print(f"Question: {question}")
    print(f"Predicted Answer: {predicted_answer}")
    print(f"Actual Answer: {actual_answer}")
    display(Image(filename=image_path))