In [2]:
import tensorflow as tf 
from tensorflow.keras.preprocessing.text import Tokenizer 
from tensorflow.keras.preprocessing.sequence import pad_sequences 
from tensorflow.keras.layers import Dense, Layer, Embedding, GRU, Bidirectional, LSTM, GlobalAveragePooling2D , Dropout
from tensorflow.keras.utils import Sequence 
import numpy as np
import cv2
import pandas as pd 
from tqdm.notebook  import tqdm

# Tokenizer

In [3]:
with open("/kaggle/input/datasett/combined_qa(2).txt" , 'r') as file:
    text = file.read()
text = text.split('\n')
tokenizer = Tokenizer(filters = '' , oov_token= '<OOV>')
tokenizer.fit_on_texts(text)

In [4]:
sample_questions = ' answer of this question is '
sequece_tokens = tokenizer.texts_to_sequences([sample_questions])
print(sequece_tokens)

[[8, 4, 6, 9, 3]]


In [5]:
sequences = tokenizer.texts_to_sequences(text)

In [6]:
word_index = tokenizer.word_index


In [7]:
start_token = len(word_index) + 1
end_token = len(word_index) + 2
tokenizer.word_index['start'] = start_token
tokenizer.word_index['end']  = end_token
tokenizer.index_word[start_token] = 'start'
tokenizer.index_word[end_token]  = 'end'

In [8]:
tokenizer.word_index['start'] , tokenizer.word_index['end'] , tokenizer.index_word[3134] , tokenizer.index_word[3135]

(3134, 3135, 'start', 'end')

In [9]:
sequences_with_se = [[start_token] + seq + [end_token]  for seq in sequences ]

In [10]:
se = tokenizer.texts_to_sequences(["what is between the the two white and black garbage bins in the image1 ? answer of this question is  chair"])[0]
print(se)

[10, 3, 85, 2, 2, 521, 37, 66, 48, 145, 446, 5, 2, 327, 7, 8, 4, 6, 9, 3, 26]


In [11]:
[tokenizer.index_word[idx] for idx in se]

['what',
 'is',
 'between',
 'the',
 'the',
 'two',
 'white',
 'and',
 'black',
 'garbage',
 'bins',
 'in',
 'the',
 'image1',
 '?',
 'answer',
 'of',
 'this',
 'question',
 'is',
 'chair']

In [12]:
len(sequences_with_se)

12469

In [13]:
pad_len = max(len(seq) for seq in sequences_with_se)
pad_len

39

In [14]:
padded_sequence = pad_sequences(sequences_with_se ,maxlen=pad_len ,padding= 'post' )

In [15]:
tokenizer.index_word[0] = '<pad>'
tokenizer.word_index['<pad>'] = 0

In [16]:
[tokenizer.index_word[seq] for seq in padded_sequence[0]]

['start',
 'what',
 'is',
 'on',
 'the',
 'left',
 'side',
 'of',
 'the',
 'white',
 'oven',
 'on',
 'the',
 'floor',
 'and',
 'on',
 'right',
 'side',
 'of',
 'the',
 'blue',
 'armchair',
 'in',
 'the',
 'image1',
 '?',
 'answer',
 'of',
 'this',
 'question',
 'is',
 'garbage_bin',
 'end',
 '<pad>',
 '<pad>',
 '<pad>',
 '<pad>',
 '<pad>',
 '<pad>']

In [17]:
padded_sequence = np.array(padded_sequence)

In [18]:
padded_sequence[:4]

array([[3134,   10,    3,   11,    2,   19,   15,    4,    2,   37,  139,
          11,    2,   33,   66,   11,   18,   15,    4,    2,   71,  351,
           5,    2,  327,    7,    8,    4,    6,    9,    3,   77, 3135,
           0,    0,    0,    0,    0,    0],
       [3134,   10,    3,   11,    2,   19,   15,    4,    2, 1001, 1637,
          66,   11,    2,   18,   15,    4,    2,   26,    5,    2,  327,
           7,    8,    4,    6,    9,    3,   13, 3135,    0,    0,    0,
           0,    0,    0,    0,    0,    0],
       [3134,   10,    3,   85,    2,    2,  521,   37,   66,   48,  145,
         446,    5,    2,  327,    7,    8,    4,    6,    9,    3,   26,
        3135,    0,    0,    0,    0,    0,    0,    0,    0,    0,    0,
           0,    0,    0,    0,    0,    0],
       [3134,   16,   17,   35,   12,   85,    2, 1001, 1637,   66,    2,
          37,  139,   11,    2,   33,    5,    2,  327,    7,    8,    4,
           6,    9,    3,   45, 3135,    0,    0,  

In [19]:
def tokenize(sequence, tokenizer , pad_len):
    tokens = tokenizer.texts_to_sequences([sequence])[0]
    padded_sequence = pad_sequences([tokens] ,maxlen=pad_len ,padding= 'post' )[0]
    return padded_sequence

In [20]:
seq = "what is between the the two white and black garbage bins in the image1 ? answer of this question is  chair"
tokenize(seq , tokenizer , pad_len)

array([ 10,   3,  85,   2,   2, 521,  37,  66,  48, 145, 446,   5,   2,
       327,   7,   8,   4,   6,   9,   3,  26,   0,   0,   0,   0,   0,
         0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0],
      dtype=int32)

# CustomDataGenerator

In [21]:

class CustomDataGenerator(Sequence):
    def __init__(self, dataframe, tokenizer, pad_len, batch_size=32, new_shape=(224, 224), shuffle=True):
        self.dataframe = dataframe
        self.tokenizer = tokenizer
        self.shuffle = shuffle
        self.pad_len = pad_len
        self.batch_size = batch_size
        self.new_shape = new_shape
        self.indexes = np.arange(len(dataframe))
        self.on_epoch_end()
        
    def __len__(self):
        return int(np.floor(len(self.dataframe) / self.batch_size))
                   
    def __getitem__(self, index):
        start_index = index * self.batch_size 
        end_index = (index + 1) * self.batch_size 
        batch_indexes = self.indexes[start_index:end_index]
        batch_images, batch_texts, batch_decoder_input, batch_decoder_target = [], [], [], []
        
        for idx in batch_indexes:
            row = self.dataframe.iloc[idx]
            image_id = row['image_id']
            image_path = f"/kaggle/input/visual-question-answering-computer-vision-nlp/dataset/images/{image_id}.png"     
            question = row['question']
            answer = row['answer']
            image_data = cv2.imread(image_path)
            image_data = cv2.resize(image_data, self.new_shape)
            image_data = image_data / 255.0  
            
            text = f"start {question} answer of this question is {answer} end"
            tokenized_text = self.tokenize(text)
            
            decoder_input = tokenized_text[:-1]
            decoder_target = tokenized_text[1:]
            
            batch_images.append(image_data)
            batch_texts.append(tokenized_text)
            batch_decoder_input.append(decoder_input)
            batch_decoder_target.append(decoder_target)
            
        batch_images = np.array(batch_images, dtype=np.float32)
        batch_decoder_input = np.array(batch_decoder_input, dtype=np.int32)
        batch_decoder_target = np.array(batch_decoder_target, dtype=np.int32)
        
        encoder_input = tf.convert_to_tensor(batch_images, dtype=tf.float32)
        decoder_input = tf.convert_to_tensor(batch_decoder_input, dtype=tf.int32)
        decoder_target = tf.convert_to_tensor(batch_decoder_target, dtype=tf.int32)
        
        return (encoder_input, decoder_input), decoder_target
    
    def on_epoch_end(self):
        if self.shuffle:
            np.random.shuffle(self.indexes)
            
    def tokenize(self, sequence):
        tokens = self.tokenizer.texts_to_sequences([sequence])[0]
        padded_sequence = pad_sequences([tokens], maxlen=self.pad_len, padding='post')[0]
        return padded_sequence


In [22]:
train_df = pd.read_csv("/kaggle/input/visual-question-answering-computer-vision-nlp/dataset/data_train.csv")
val_df = pd.read_csv("/kaggle/input/visual-question-answering-computer-vision-nlp/dataset/data_eval.csv")

In [23]:
train_data = CustomDataGenerator(train_df , tokenizer , pad_len)

In [24]:
for inputs , target in train_data:
    print(inputs[0].shape)
    print(inputs[1].shape)
    print(target.shape)
    break

(32, 224, 224, 3)
(32, 38)
(32, 38)


In [25]:

def detokenize_sequence( tokenizer,sequence):
    word_index = tokenizer.word_index
    index_to_word = {idx: word for word, idx in word_index.items()}
    
    if isinstance(sequence, tf.Tensor):
        sequence = sequence.numpy()
    
    # Ensure sequence is 1-dimensional
    sequence = sequence.flatten()

    text = " ".join([index_to_word.get(int(token), '') for token in sequence if int(token) != 0])
    return text

In [26]:
for inputs , targets in train_data:
    print("Image input", inputs[0].shape)
    encoder_input = [detokenize_sequence(tokenizer , ip) for ip in inputs[1]]
    print("Encoder Input :", encoder_input)
    print("\n")
    target = [detokenize_sequence(tokenizer , tar) for tar in targets]
    print("Encoder Output :" ,target)
    break

Image input (32, 224, 224, 3)
Encoder Input : ['start what object is stuck on the right storage rack answer of this question is tissue_roll end', 'start what object is found on the right top answer of this question is oven end', 'start what is on the left side of the monitor answer of this question is remote_control end', 'start what objects are found in front of the table answer of this question is books, toy, photo end', 'start what are the dark brown objects in this picture answer of this question is piano, piano_bench end', 'start what is to the right of the piano answer of this question is stroller end', 'start what is the object on the floor close to the wall divider answer of this question is stool end', 'start what is on the table answer of this question is tablecloth end', 'start what is found in front the man on the left side answer of this question is door end', 'start what is on the flush tank answer of this question is tissue_box end', 'start what is colour of television a

# VQA Model

In [272]:
class ImageEncoder(tf.keras.Model):
    def __init__(self , input_shape):
        super(ImageEncoder, self).__init__()
        self.input_shape = input_shape
        self.base_model = tf.keras.applications.ResNet50(weights='imagenet', include_top=False, input_shape=self.input_shape)
        self.base_model.trainable = False 
        self.model = tf.keras.Sequential([
            self.base_model,
            GlobalAveragePooling2D(),
            Dropout(0.5),
            Dense(256, activation='relu')
        ])
        
    def call(self, x):
        x = self.model(x)
        return x 
    
    
class LanguageEncoder(tf.keras.Model):
    def __init__(self, vocab_size, emb_dim, rnn_units):
        super(LanguageEncoder, self).__init__()
        self.embedding = Embedding(input_dim=vocab_size, output_dim=emb_dim)
        self.rnn = Bidirectional(LSTM(rnn_units, return_sequences=True))

    def call(self, text):
        emb = self.embedding(text)
        output = self.rnn(emb)
        return output 


# Attention mechanism
# Additive attention are used
class InsampleAttention(tf.keras.layers.Layer):
    def __init__(self, units):
        super(InsampleAttention, self).__init__()
        self.w1 = Dense(units)
        self.w2 = Dense(units)
        self.v = Dense(1)
        
    def call(self, query, values):
        query = tf.expand_dims(query, axis=1)
        score = self.v(tf.nn.tanh(self.w1(query) + self.w2(values)))
        attention_weights = tf.nn.softmax(score, axis=1)
        context_vector = attention_weights * values
        context_vector = tf.reduce_sum(context_vector, axis=1)
        return context_vector, attention_weights

class CrossSampleAttention(Layer):
    def __init__(self, units):
        super(CrossSampleAttention, self).__init__()
        self.w1 = Dense(units)
        self.w2 = Dense(units)
        self.v = Dense(1)
        
    def call(self, query, values):
        values = tf.expand_dims(values, axis=1)
        
        query = self.w1(query)
        value = self.w2(values)

        score = self.v(tf.nn.tanh(query + value))
        attention_weights = tf.nn.softmax(score, axis=1)
        context_vector = attention_weights * values
        context_vector = tf.reduce_sum(context_vector, axis=1)
        return context_vector, attention_weights

    
class Decoder(tf.keras.Model):
    def __init__(self, vocab_size, emb_dim, rnn_units):
        super(Decoder, self).__init__()
        self.embedding = Embedding(vocab_size, emb_dim)
        self.rnn = LSTM(rnn_units, return_sequences=True, return_state=True)
        self.dense = Dense(vocab_size)
        
    def call(self, context_vector, target_sequence, training=False):

        target_emb = self.embedding(target_sequence)
        context_vector = tf.expand_dims(context_vector, axis=1)
    
        context_vector = tf.cast(context_vector, dtype=tf.float32)

        context_vector = tf.tile(context_vector, [1, tf.shape(target_sequence)[1], 1])
        rnn_input = tf.concat([target_emb, context_vector], axis=-1)
        rnn_output, _, _ = self.rnn(rnn_input, training=training)
        logits = self.dense(rnn_output)
        logits = logits[:, :38, :] 
        
        return logits


class VQAModel(tf.keras.Model):
    def __init__(self, vocab_size, emb_dim, rnn_units):
        super(VQAModel, self).__init__()
        self.image_encoder = ImageEncoder(input_shape=(224, 224, 3)) 
        self.language_encoder = LanguageEncoder(vocab_size, emb_dim, rnn_units)
        self.insample_attention = InsampleAttention(rnn_units)
        self.cross_attention = CrossSampleAttention(rnn_units)
        self.decoder = Decoder(vocab_size, emb_dim, rnn_units)
        self.dense = Dense(vocab_size)

    def call(self, images, texts, training=False):
        image_feature = self.image_encoder(images)
        text_feature = self.language_encoder(texts)
        
        context_vector_is, _ = self.insample_attention(image_feature, text_feature)
        context_vector_cs, _ = self.cross_attention(text_feature, image_feature)

        concat_context = tf.concat([context_vector_is, context_vector_cs], axis=-1)
        
        return concat_context

    def decode(self, context_vector, target_sequence, training=False):
        # Ensure target_sequence is properly formatted for the decoder
        return self.decoder(target_sequence, context_vector, training=training)


In [273]:
vocab_size =len(tokenizer.word_index)+1 
emb_dims = 512 
rnn_units = 128 
head = 8 
layer = 6
model = VQAModel(vocab_size , emb_dims , rnn_units )
model.summary()

# Custom training Method

In [274]:
loss_object = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)
optimizer = tf.keras.optimizers.Adam()
train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name="train_accuracy")
val_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name='val_accuracy')
train_loss = tf.keras.metrics.Mean(name='train_loss')
val_loss = tf.keras.metrics.Mean(name='val_loss')

@tf.function
def train_step(model, encoder_inputs, decoder_inputs, decoder_outputs):
    with tf.GradientTape() as tape:
        context_vector = model(encoder_inputs, decoder_inputs, training=True)
        predictions = model.decode(context_vector, decoder_inputs, training=True)
       
        loss = loss_object(decoder_outputs, predictions)
        
    gradients = tape.gradient(loss, model.trainable_variables)
    optimizer.apply_gradients(zip(gradients, model.trainable_variables))
    
    train_loss.update_state(loss)
    train_accuracy.update_state(decoder_outputs, predictions)

@tf.function
def val_step(model, encoder_inputs, decoder_inputs, decoder_outputs):
    context_vector = model(encoder_inputs, decoder_inputs, training=False)
    predictions = model.decode(context_vector, decoder_inputs, training=False)
    loss = loss_object(decoder_outputs, predictions)
    
    val_loss.update_state(loss)
    val_accuracy.update_state(decoder_outputs, predictions)


In [275]:
train_data = CustomDataGenerator(train_df , tokenizer , pad_len)
val_data = CustomDataGenerator(val_df , tokenizer , pad_len)

In [None]:
epochs = 5
train_steps = len(train_data) + 1
val_steps = len(val_data) + 1

for epoch in range(epochs):
    train_loss.reset_state()
    train_accuracy.reset_state()
    val_loss.reset_state()
    val_accuracy.reset_state()

    print(f"Epoch {epoch + 1} / {epochs}")

    train_tqdm = tqdm(train_data, total=len(train_data), desc="Training", unit='batch')
    for step, (inputs, target) in enumerate(train_tqdm):
        encoder_inputs, decoder_inputs = inputs
        train_step(model, encoder_inputs, decoder_inputs, target)
        train_tqdm.set_postfix(loss=train_loss.result().numpy(), accuracy=train_accuracy.result().numpy() * 100)
        if step + 1 >= train_steps:
            break

    val_data_tqdm = tqdm(val_data, total=len(val_data), desc="Validation", unit='batch')
    for step, (inputs, target) in enumerate(val_data_tqdm):
        val_encoder_inputs, val_decoder_inputs = inputs
        val_step(model, val_encoder_inputs, val_decoder_inputs, target)
        val_data_tqdm.set_postfix(loss=val_loss.result().numpy(), accuracy=val_accuracy.result().numpy() * 100)
        if step + 1 >= val_steps:
            break

    template = "Epoch {} Loss: {:.4f}, Accuracy: {:.2f}%, Val Loss: {:.4f}, Val Accuracy: {:.2f}%"
    print(template.format(epoch + 1,
                          train_loss.result().numpy(),
                          train_accuracy.result().numpy() * 100,
                          val_loss.result().numpy(),
                          val_accuracy.result().numpy() * 100))


Epoch 1 / 5


Training:   0%|          | 0/311 [00:00<?, ?batch/s]

Validation:   0%|          | 0/77 [00:00<?, ?batch/s]

Epoch 1 Loss: 2.0432, Accuracy: 61.35%, Val Loss: 2.0573, Val Accuracy: 61.50%
Epoch 2 / 5


Training:   0%|          | 0/311 [00:00<?, ?batch/s]

Validation:   0%|          | 0/77 [00:00<?, ?batch/s]

Epoch 2 Loss: 2.0300, Accuracy: 61.32%, Val Loss: 2.0267, Val Accuracy: 61.42%
Epoch 3 / 5


Training:   0%|          | 0/311 [00:00<?, ?batch/s]

Validation:   0%|          | 0/77 [00:00<?, ?batch/s]

Epoch 3 Loss: 2.0077, Accuracy: 61.54%, Val Loss: 2.0128, Val Accuracy: 62.18%
Epoch 4 / 5


Training:   0%|          | 0/311 [00:00<?, ?batch/s]

In [None]:
model.save("knr.h5")

In [281]:

def process_image(image_path, new_shape=(224, 224)):
    image_data = cv2.imread(image_path)
    image_data = cv2.resize(image_data, new_shape)
    image_data = image_data / 255.0  
    return image_data
import tensorflow as tf

def generate_answer(model, image, question, start_token, end_token, max_length=38):
    context_vector = model(image[tf.newaxis], question[tf.newaxis])  # Add batch dimension

    decoder_inputs = tf.fill([1, 1], start_token) 

    predicted_tokens = []

    for _ in range(max_length):
        predictions = model.decode(context_vector, decoder_inputs) 
        
        next_token_logits = predictions[:, -1, :]  
        next_token = tf.argmax(next_token_logits, axis=-1) 
        next_token = tf.cast(next_token, tf.int32)

        predicted_tokens.append(next_token[0].numpy())

        decoder_inputs = tf.concat([decoder_inputs, next_token[:, tf.newaxis]], axis=1)

        if next_token[0].numpy() == end_token:
            break

    return predicted_tokens

start_token = 3134  
end_token = 3135  

processed_image = process_image("/kaggle/input/visual-question-answering-computer-vision-nlp/dataset/images/image1.png")

question = "What is on the left side of the white oven on the floor and on right side of the blue armchair in the image1?"
tokenized_question = tokenize(question, tokenizer, 38)

predicted_tokens = generate_answer(model, processed_image, tokenized_question, start_token, end_token)


predicted_text =[detokenize_sequence(tokenizer,pre)for pre in predicted_tokens] 
print(predicted_text)


(1, 256)
(1, 256)
(1, 512)
(1, 1, 1)
(1, 512, 512)
(1, 1, 2)
(1, 512, 512)
(1, 1, 3)
(1, 512, 512)
(1, 1, 4)
(1, 512, 512)
(1, 1, 5)
(1, 512, 512)
(1, 1, 6)
(1, 512, 512)
(1, 1, 7)
(1, 512, 512)
(1, 1, 8)
(1, 512, 512)
(1, 1, 9)
(1, 512, 512)
(1, 1, 10)
(1, 512, 512)
(1, 1, 11)
(1, 512, 512)
(1, 1, 12)
(1, 512, 512)
(1, 1, 13)
(1, 512, 512)
(1, 1, 14)
(1, 512, 512)
(1, 1, 15)
(1, 512, 512)
(1, 1, 16)
(1, 512, 512)
(1, 1, 17)
(1, 512, 512)
(1, 1, 18)
(1, 512, 512)
(1, 1, 19)
(1, 512, 512)
(1, 1, 20)
(1, 512, 512)
(1, 1, 21)
(1, 512, 512)
(1, 1, 22)
(1, 512, 512)
(1, 1, 23)
(1, 512, 512)
(1, 1, 24)
(1, 512, 512)
(1, 1, 25)
(1, 512, 512)
(1, 1, 26)
(1, 512, 512)
(1, 1, 27)
(1, 512, 512)
(1, 1, 28)
(1, 512, 512)
(1, 1, 29)
(1, 512, 512)
(1, 1, 30)
(1, 512, 512)
(1, 1, 31)
(1, 512, 512)
(1, 1, 32)
(1, 512, 512)
(1, 1, 33)
(1, 512, 512)
(1, 1, 34)
(1, 512, 512)
(1, 1, 35)
(1, 512, 512)
(1, 1, 36)
(1, 512, 512)
(1, 1, 37)
(1, 512, 512)
(1, 1, 38)
(1, 512, 512)
['but', 'most', 'game_table', 'w