# **Visual Question Answering**

This notebook presents an approach to solving the Visual Question Answering (VQA) task by leveraging a pretrained **ResNet** model to extract image embeddings, combined with various models for processing the textual component of the input.

## **Import libraries**

In [None]:
# standard libs
import os
import io
import glob
import time
import json
import re
import pickle
from datetime import datetime as dt # timing
import sys
from sklearn.metrics import confusion_matrix
from collections import Counter
import seaborn as sns

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from matplotlib.pyplot import imshow
from tqdm import tqdm
from PIL import Image

# ResNet
import torch.nn as nn
from torchvision.models import resnet50, ResNet50_Weights

# PyTorch
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from torch.optim import lr_scheduler

# TorchVision
import torchvision
from torchvision import transforms, utils, models
from transformers import BertTokenizer, BertModel

# TensorBoard for logging
from torch.utils.tensorboard import SummaryWriter

## **Data preprocessing**

In [None]:
# connect to Drive
# Mounts Google Drive at /content/drive so the paths used below resolve.
# NOTE(review): `google.colab` is presumably only importable inside Colab — confirm before running elsewhere.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
# Seed the NumPy and PyTorch random number generators.
SEED = 24
np.random.seed(SEED)
torch.manual_seed(SEED)

# path
# `full_path` is the Drive folder the pickle loaders in later cells are pointed at.
drive_path = '/content/drive/MyDrive'
new_folder = 'ResNet50'
full_path = os.path.join(drive_path, new_folder)

# Create the folder when it is missing; an existing folder is left untouched.
os.makedirs(full_path, exist_ok=True)

In [None]:
# functions for loading the data

def load_split_lists(folder_path, train_filename="train_list.pkl", val_filename="val_list.pkl"):
    """
    Read the pickled training and validation lists stored in `folder_path`.

    Both files are unpickled (training first, then validation), their sizes
    are printed, and the pair `(train_list, val_list)` is returned.
    """
    loaded = []
    for filename in (train_filename, val_filename):
        with open(os.path.join(folder_path, filename), "rb") as handle:
            loaded.append(pickle.load(handle))

    train_list, val_list = loaded

    print(f"Loaded {len(train_list)} training items.")
    print(f"Loaded {len(val_list)} validation items.")

    return train_list, val_list

def load_features_by_split(split_name, folder_path):
    """
    Read `<split_name>_image_features.pkl` from `folder_path`.

    The pickle is expected to hold a mapping with the keys "img_ids" and
    "features"; both values are returned as `(img_ids, features)`.
    """
    pickle_path = os.path.join(folder_path, f"{split_name}_image_features.pkl")
    with open(pickle_path, "rb") as handle:
        payload = pickle.load(handle)

    img_ids, features = payload["img_ids"], payload["features"]
    print(f"Loaded features for {split_name}: {len(img_ids)} items.")
    return img_ids, features

## **Encode questions and answers**

In [None]:
# bert
# Module-level tokenizer shared by create_question_vocab() and encode_questions() below.
global_bert_tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')
print(f"BERT tokenizer loaded for pre-processing. Vocabulary size: {global_bert_tokenizer.vocab_size}")

# used for padding_idx
# Placeholder only: a later cell rebinds this name to the dict returned by
# create_question_vocab(); model constructors in this notebook read
# question_vocab['<PAD>'] from this global.
question_vocab = None

def create_question_vocab(qa_list):
    '''
    Build a token -> id dictionary from the questions in `qa_list`.

    Every question is lower-cased and split with the shared BERT tokenizer.
    Ids 0 and 1 are reserved for '<PAD>' and '<UNK>'; the remaining tokens
    follow in sorted order.
    '''
    seen_tokens = set()
    for entry in qa_list:
        seen_tokens.update(global_bert_tokenizer.tokenize(entry['question'].lower()))

    # special tokens come first so their ids do not depend on the data
    ordered_vocab = ['<PAD>', '<UNK>'] + sorted(seen_tokens)
    vocab_dict = {token: position for position, token in enumerate(ordered_vocab)}

    print(f"Created custom question vocabulary with {len(vocab_dict)} entries.")
    return vocab_dict

def encode_questions(qa_list, question_vocab, max_length = 25):
    '''
    Turn every question in `qa_list` into a list of exactly `max_length` ids.

    Questions are lower-cased, split with the shared BERT tokenizer and mapped
    through `question_vocab` (unknown tokens become '<UNK>'). Short questions
    are right-padded with '<PAD>'; long ones are cut off at `max_length`.
    '''
    encoded_questions = []

    for entry in qa_list:
        tokens = global_bert_tokenizer.tokenize(entry['question'].lower())

        # token -> id, falling back to the <UNK> id
        ids = []
        for token in tokens:
            ids.append(question_vocab.get(token, question_vocab['<UNK>']))

        # pad on the right or truncate to the fixed length
        missing = max_length - len(ids)
        if missing > 0:
            ids.extend(question_vocab['<PAD>'] for _ in range(missing))
        else:
            ids = ids[:max_length]

        encoded_questions.append(ids)

    return encoded_questions


def create_answer_vocab(qalist, top=1000):
    """
    Build an answer -> id dictionary from the most frequent answers.

    Every annotator answer and every consensus ('multiple_choice_answer')
    answer is lower-cased and counted. The `top - 1` most frequent answers
    get ids 0, 1, ... (ties resolved by descending answer string) and
    '<UNK>' is appended as the last entry.
    """
    tally = Counter()
    for annotation in qalist:
        tally.update(entry['answer'].lower() for entry in annotation['answers'])
        tally[annotation['multiple_choice_answer'].lower()] += 1

    # highest count first; equal counts fall back to the answer text
    ranked = sorted(tally.items(), key=lambda pair: (pair[1], pair[0]), reverse=True)
    keep = max(0, min(top - 1, len(ranked)))
    answers_list = [answer for answer, _ in ranked[:keep]]

    if '<UNK>' not in answers_list:
        answers_list.append('<UNK>')

    return dict(zip(answers_list, range(len(answers_list))))

def encode_answers(qa_list, answer_vocab, max_answers=10):
    '''
    Encode the answers of every annotation as answer-vocabulary ids.

    qa_list (list): annotations with an 'answers' list (dicts holding an
        'answer' string) and a 'multiple_choice_answer' string.
    answer_vocab (dict): answer string -> id. When it has no '<UNK>' entry,
        the last id (len(answer_vocab) - 1) is used as the unknown id.
    max_answers (int): width of each row in the multi-answer array.

    Returns (encoded_answers, multi_answers):
      - encoded_answers: one id per annotation. It is the id of the consensus
        answer; when that answer is out of vocabulary, one of the
        in-vocabulary annotator answers is drawn with np.random.choice, and
        the unknown id is used when there is none.
      - multi_answers: int64 array of shape (len(qa_list), max_answers) with
        the distinct in-vocabulary annotator answer ids in ascending order,
        right-padded with -11.
    '''
    pad_id = -11  # negative, so it can never equal a vocabulary id
    unk_answer_id = answer_vocab.get('<UNK>', len(answer_vocab) - 1)

    encoded_answers = []
    encoded_multi_answers = []

    for annotation in qa_list:
        raw_answers = (entry['answer'].lower() for entry in annotation['answers'])

        # Sorting the ids makes the result independent of set iteration order,
        # which keeps the seeded np.random.choice below reproducible.
        answers_ids = sorted({answer_vocab[a] for a in raw_answers if a in answer_vocab})
        if not answers_ids:
            answers_ids = [unk_answer_id]

        # Fixed-width row: truncate first so a long answer list cannot produce
        # a ragged (non-rectangular) array, then pad.
        row = answers_ids[:max_answers]
        row = row + [pad_id] * (max_answers - len(row))
        encoded_multi_answers.append(row)

        primary_answer_id = answer_vocab.get(annotation['multiple_choice_answer'].lower(), unk_answer_id)

        # consensus answer is UNK, but other annotators gave valid answers: pick one of those
        if primary_answer_id == unk_answer_id:
            candidates = [aid for aid in answers_ids if aid != unk_answer_id]
            if candidates:
                primary_answer_id = int(np.random.choice(candidates))

        encoded_answers.append(primary_answer_id)

    multi_answers = np.array(encoded_multi_answers, dtype=np.int64).reshape(-1, max_answers)
    return encoded_answers, multi_answers

def filter_dataset(qa_list, answer_type):
    '''
    Return the entries of `qa_list` whose 'answer_type' equals `answer_type`,
    in their original order.
    '''
    kept = []
    for entry in qa_list:
        if entry['answer_type'] == answer_type:
            kept.append(entry)
    return kept

## **Create dataset**

In [None]:
# VQA Dataset Class
class VQADataset(Dataset):
    """
    Pairs pre-extracted image features with encoded questions and answers.

    Each item is a dict with the tensors 'image', 'question', 'answer' and
    'multi_answer'.
    """

    def __init__(self, json_list, question_vocab, answer_vocab, split, train_img_feats=None, val_img_feats=None, answer_type=None, max_question_length=25):
        """
          json_list (list): List of question-answer dictionaries.
          question_vocab (dict): The vocabulary mapping question tokens to IDs.
          answer_vocab (dict): The vocabulary mapping answers to IDs.
          split (str): 'train' or 'val' to determine which image features to use.
          train_img_feats (tuple): (img_ids, features) for training images.
          val_img_feats (tuple): (img_ids, features) for validation images.
          answer_type (str, optional): Filters the dataset by answer type (e.g., 'yes/no').
          max_question_length (int): Max length for question tokenization.

          Raises ValueError when `split` is not 'train'/'val' or when the
          features for the chosen split were not supplied.
        """
        # Fail fast, before the (potentially slow) encoding below, with a
        # message that names the actual problem.
        feats_by_split = {'train': train_img_feats, 'val': val_img_feats}
        if split not in feats_by_split:
            raise ValueError(f"split must be 'train' or 'val', got {split!r}")
        if feats_by_split[split] is None:
            raise ValueError(f"split={split!r} requires {split}_img_feats=(img_ids, features)")

        if answer_type is not None:
            self.text_data = filter_dataset(json_list, answer_type)
            print(f"Dataset filtered for answer_type: '{answer_type}'. New size: {len(self.text_data)}")
        else:
            self.text_data = json_list

        self.questions_encoded = encode_questions(self.text_data, question_vocab, max_length=max_question_length)
        self.answers_encoded, self.multi_answers = encode_answers(self.text_data, answer_vocab)

        self.img_ids_list, self.img_features = feats_by_split[split]

        # map image_id -> index for quick lookup
        self.img_id_to_idx = {img_id: idx for idx, img_id in enumerate(self.img_ids_list)}

    def __len__(self):
        """Number of question-answer entries (after optional filtering)."""
        return len(self.text_data)

    def __getitem__(self, index):
        """
        take an item from the dataset at the specified index

        index (int): The index of the item to retrieve.

        Returns a dictionary containing 'image', 'question', 'answer', and
        'multi_answer' tensors. When the entry's image id has no pre-loaded
        features, a warning is printed and an all-zero feature vector is used.
        """
        img_id = self.text_data[index]['image_id']
        img_idx = self.img_id_to_idx.get(img_id)

        if img_idx is None:
            print(f"Warning: Image ID {img_id} not found in pre-loaded image features for index {index}. Returning dummy data.")
            item_image = torch.zeros(self.img_features.shape[1], dtype=torch.float32)
        else:
            item_image = self.img_features[img_idx]

        return {
            'image': item_image,
            'question': torch.tensor(self.questions_encoded[index], dtype=torch.long),
            'answer': torch.tensor(self.answers_encoded[index], dtype=torch.long),
            'multi_answer': torch.tensor(self.multi_answers[index], dtype=torch.long),
        }


## **Deeper LSTM**

In [None]:
class LSTM(nn.Module):
    """
    Two-layer LSTM question encoder fused with image features by
    point-wise multiplication, followed by a two-layer classifier.

    forward() returns softmax probabilities (not logits) over `output_dim`
    answer classes.
    """

    def __init__(self, vocab_size, img_dim, output_dim, dropout_p = 0.5, padding_idx = None):
        """
        vocab_size (int): number of rows in the question embedding table.
        img_dim (int): size of the incoming image feature vector.
        output_dim (int): number of answer classes.
        dropout_p (float): dropout probability used throughout the model.
        padding_idx (int, optional): id of the padding token. When omitted,
            it is read from the notebook-level `question_vocab['<PAD>']`
            (the previous behaviour), which requires that global to be built
            before the model is constructed.
        """
        super().__init__()

        if padding_idx is None:
            # backward-compatible fallback to the notebook global
            padding_idx = question_vocab['<PAD>']

        # initializing the layers
        self.embedding = nn.Embedding(vocab_size, 300, padding_idx = padding_idx)
        self.fc_embed = nn.Linear(300, 300)
        self.lstm = nn.LSTM(300, 512, num_layers =2, batch_first = True)
        # 4 * 512: hidden and cell state of both LSTM layers are concatenated
        self.fc_lstm = nn.Linear(4*512, 1024)
        self.fc_i = nn.Linear(img_dim, 1024)
        self.fc1 = nn.Linear(1024, 1000)
        self.fc2 = nn.Linear(1000, output_dim) # dynamically matches answer vocab size
        self.dropout = nn.Dropout(dropout_p)

        # initializing weights
        self.init_weights()


    def forward(self, img_feat_input, text_input):
        """
        img_feat_input: image features, one row of size `img_dim` per sample.
        text_input: integer token ids, one row per question.

        Returns the softmax distribution over answers for every sample.
        """
        # with gelu non-linearity
        text_embedded = F.gelu(self.fc_embed(self.embedding(text_input)))

        _, (lstm_hidden, lstm_cell)  = self.lstm(text_embedded)

        # concat final hidden and cell states of both layers
        lstm_concat = torch.cat([lstm_hidden[0], lstm_hidden[1], lstm_cell[0], lstm_cell[1]], 1)

        # fully connected with gelu non-linearity
        fc_lstm = self.dropout(F.gelu(self.fc_lstm(lstm_concat)))

        # l2 normalization of the image features before projecting them
        fc_i = F.gelu(self.fc_i(F.normalize(img_feat_input, dim = 1, p=2)))

        # point-wise multiplication fuses both modalities
        pw_mul = fc_i * fc_lstm

        # dropout
        fc_1 = self.dropout(F.gelu(self.fc1(self.dropout(pw_mul))))

        # output
        output = F.softmax(self.fc2(fc_1), dim=1)

        return output

    def init_weights(self):
        """Re-initialise the LSTM input/hidden weights and all linear weights with Kaiming-uniform."""
        init = torch.nn.init.kaiming_uniform_
        for layer in self.lstm.all_weights:
            # the first two entries per layer are the weight matrices; biases are left as created
            for hidden in layer[:2]:
                init(hidden)

        init(self.fc_embed.weight)
        init(self.fc_lstm.weight)
        init(self.fc_i.weight)
        init(self.fc1.weight)
        init(self.fc2.weight)

### **Prepare for training**

In [None]:
# config
# Hyper-parameters read by the DataLoaders, the OneCycleLR scheduler and the training loop below.
num_workers = 2
batch_size = 512
epochs = 15

# data loading
# Question/answer annotation lists for both splits.
train_list, val_list = load_split_lists(full_path)

# Pre-extracted image features plus the image ids they belong to.
train_img_ids, train_features_np = load_features_by_split("train2014", full_path)
val_img_ids, val_features_np = load_features_by_split("val2014", full_path)

# Convert the loaded features to float32 tensors.
train_features = torch.tensor(train_features_np, dtype=torch.float32)
val_features = torch.tensor(val_features_np, dtype=torch.float32)

# The feature width (second dimension) becomes the model's img_dim.
actual_img_dim = train_features.shape[1]
print(f"Detected image feature dimension (img_dim): {actual_img_dim}")

In [None]:
# create vocabularies
# Both vocabularies are built from the training split only.
# Rebinding `question_vocab` here replaces the None placeholder defined earlier.
question_vocab = create_question_vocab(train_list)
answer_vocab = create_answer_vocab(train_list, top=1000)  # top-k answers
# Number of answer classes; used as the model's output size.
output_vocab_size = len(answer_vocab)

print(f"Question vocab size: {len(question_vocab)}")
print(f"Answer vocab size: {output_vocab_size}")

In [None]:
# initialize datasets
# Each dataset encodes its questions/answers on construction and keeps a
# reference to the feature tensor of its own split.
train_dataset = VQADataset(json_list=train_list,
                             question_vocab=question_vocab,
                             answer_vocab=answer_vocab,
                             split='train',
                             train_img_feats=(train_img_ids, train_features))

val_dataset = VQADataset(json_list=val_list,
                            question_vocab=question_vocab,
                            answer_vocab=answer_vocab,
                            split='val',
                            val_img_feats=(val_img_ids, val_features))

# free up memory after dataset creation
# NOTE: these statements only remove the notebook-level names. The datasets
# still hold the annotation lists and feature tensors they were given, so
# those objects stay alive.
del train_list
del val_list
del train_features_np
del val_features_np
del train_features
del val_features
print("Data loaded and datasets initialized. Memory for raw data cleared.")

# initialize DataLoaders
# Only the training loader shuffles; validation keeps dataset order.
train_dataloader = DataLoader(train_dataset,
                              batch_size=batch_size,
                              shuffle=True,
                              num_workers=num_workers,
                              pin_memory=True)

val_dataloader = DataLoader(val_dataset,
                            batch_size=batch_size,
                            shuffle=False,
                            num_workers=num_workers,
                            pin_memory=True)

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Using device: {device}")

# model initialization
model = LSTM(
    vocab_size=len(question_vocab),
    img_dim=actual_img_dim,
    output_dim=output_vocab_size,
    dropout_p=0.5
)
model.to(device)
print(f"Model expected output size: {model.fc2.out_features}")

# Optimizer and Learning Rate Scheduler
# The scheduler is sized for epochs * len(train_dataloader) steps, so it must
# be stepped once per training batch.
optimizer = torch.optim.Adamax(model.parameters(), lr=0.001)
onecycle_scheduler = lr_scheduler.OneCycleLR(optimizer,
                                             steps_per_epoch=len(train_dataloader),
                                             max_lr=0.005,
                                             epochs=epochs)

### **Training**

In [None]:
# Targets equal to <UNK> are ignored by the loss; <UNK> predictions are never counted as correct.
unk_answer_idx = answer_vocab['<UNK>']
criterion = torch.nn.NLLLoss(ignore_index=unk_answer_idx).to(device)

min_loss = float('inf')

# per-epoch history, plotted in the next cell
train_losses_lstm = []
train_accs_lstm = []
val_losses_lstm = []
val_accs_lstm = []

save_dir = "/content/drive/MyDrive/vqa_models"
os.makedirs(save_dir, exist_ok=True)
best_model_path = os.path.join(save_dir, "best_model_lstm.pth")

print(f"Starting Training Loop, saving best model to: {best_model_path}")

for epoch in range(epochs):
    start = dt.now()
    train_loss = 0.0
    train_correct = 0
    model.train()

    for data in tqdm(train_dataloader, desc=f"Epoch {epoch+1} Training"):
        images = data['image'].to(device)
        questions = data['question'].to(device)
        answers = data['answer']
        multi_answers = data['multi_answer']

        optimizer.zero_grad()
        output = model(images, questions)
        predictions = output.detach().argmax(dim=1).cpu()

        # A prediction that matches any annotator answer becomes the target for
        # that sample. The -11 padding in multi_answers can never equal a class
        # index, so it is never matched.
        accepted = (multi_answers == predictions.unsqueeze(1)).any(dim=1)
        answers = torch.where(accepted, predictions, answers)

        # the model returns probabilities; NLLLoss expects log-probabilities
        answers_gpu = answers.to(device)
        loss = criterion(torch.log(output + 1e-7), answers_gpu)

        loss.backward()
        optimizer.step()
        onecycle_scheduler.step()  # OneCycleLR is stepped once per batch

        # -1 matches no target, so an <UNK> prediction always counts as wrong
        predictions[predictions == unk_answer_idx] = -1
        correct = (predictions == answers).sum().item()

        train_correct += correct
        train_loss += loss.item()

    avg_train_loss = train_loss / len(train_dataloader)
    avg_train_acc = train_correct / len(train_dataloader.dataset)
    train_losses_lstm.append(avg_train_loss)
    train_accs_lstm.append(avg_train_acc)

    model.eval()
    val_loss = 0.0
    val_correct = 0

    # labels/predictions of this epoch's validation pass (for the confusion matrix)
    all_true = []
    all_pred = []

    with torch.no_grad():
        for data in tqdm(val_dataloader, desc=f"Epoch {epoch+1} Validation"):
            images = data['image'].to(device)
            questions = data['question'].to(device)
            answers = data['answer']
            multi_answers = data['multi_answer']

            output = model(images, questions)
            # validation loss is measured against the unmodified targets
            vloss = criterion(torch.log(output + 1e-7), answers.to(device))
            val_loss += vloss.item()

            predictions = output.argmax(dim=1).cpu()

            # same "any annotator answer counts" rule as in training
            accepted = (multi_answers == predictions.unsqueeze(1)).any(dim=1)
            answers = torch.where(accepted, predictions, answers)

            predictions[predictions == unk_answer_idx] = -1
            correct = (predictions == answers).sum().item()
            val_correct += correct

            # labels for confusion matrix
            all_true.extend(answers.tolist())
            all_pred.extend(predictions.tolist())

    avg_val_loss = val_loss / len(val_dataloader)
    avg_val_acc = val_correct / len(val_dataloader.dataset)
    val_losses_lstm.append(avg_val_loss)
    val_accs_lstm.append(avg_val_acc)

    epoch_duration = dt.now() - start

    print(f'\nEpoch {epoch+1} Summary => '
          f'Train Loss: {avg_train_loss:.5f} | Train Acc: {avg_train_acc:.4f} | '
          f'Val Loss: {avg_val_loss:.5f} | Val Acc: {avg_val_acc:.4f} [{epoch_duration}]')

    # save best model based on validation loss
    if avg_val_loss < min_loss:
        print(f"Validation loss improved ({min_loss:.5f} -> {avg_val_loss:.5f}). Saving model...")
        min_loss = avg_val_loss
        torch.save(model.state_dict(), best_model_path)

    print("\n")

# reported once, after the last epoch
print(f"Training finished. Best model saved at: {best_model_path}")

### **Plot**

In [None]:
# Plot only as many epochs as every history list has.
min_len = min(len(train_losses_lstm), len(val_losses_lstm),
              len(train_accs_lstm), len(val_accs_lstm))

# x-axis values. Kept under its own name so the integer hyper-parameter
# `epochs` (used by range(epochs) and OneCycleLR) is not overwritten when
# this cell runs before a training cell is re-executed.
epoch_axis = range(1, min_len + 1)

plt.figure(figsize=(14, 5))

# Plot Loss
plt.subplot(1, 2, 1)
plt.plot(epoch_axis, train_losses_lstm[:min_len], 'b-', label='Training Loss')
plt.plot(epoch_axis, val_losses_lstm[:min_len], 'r-', label='Validation Loss')
plt.title('Loss over Epochs')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.legend()
plt.grid(True)

# Plot Accuracy
plt.subplot(1, 2, 2)
plt.plot(epoch_axis, train_accs_lstm[:min_len], 'b-', label='Training Accuracy')
plt.plot(epoch_axis, val_accs_lstm[:min_len], 'r-', label='Validation Accuracy')
plt.title('Accuracy over Epochs')
plt.xlabel('Epoch')
plt.ylabel('Accuracy')
plt.legend()
plt.grid(True)

plt.tight_layout()
plt.show()

In [None]:
# answer id -> answer string, used for the axis labels
idx2word = dict((index, word) for word, index in answer_vocab.items())

# labels and predictions gathered by the validation loop
all_true = np.array(all_true)
all_pred = np.array(all_pred)

# drop the samples whose prediction was rewritten to -1 (<UNK>) in the validation loop
mask = all_pred != -1
all_true_filtered, all_pred_filtered = all_true[mask], all_pred[mask]

# the ten most frequent ground-truth labels define the rows/columns
true_label_counts = Counter(all_true_filtered)
top_label_indices = [pair[0] for pair in true_label_counts.most_common(10)]

# confusion matrix restricted to those labels
cm = confusion_matrix(all_true_filtered, all_pred_filtered, labels=top_label_indices)

def idx_to_word(idx):
    """Return the answer string for `idx`, or `str(idx)` when the id is not in the vocabulary."""
    if idx in idx2word:
        return idx2word[idx]
    return str(idx)

disp_labels = list(map(idx_to_word, top_label_indices))

plt.figure(figsize=(10, 10))
sns.heatmap(cm, annot=True, fmt='d', xticklabels=disp_labels, yticklabels=disp_labels, cmap='Blues')
plt.xlabel('Predicted Answers')
plt.ylabel('True Answers')
plt.title('Confusion Matrix (Top 10 Answers)')
plt.xticks(rotation=45, ha='right')
plt.tight_layout()
plt.show()

## **LSTM with attention**

In [None]:
# Attention Mechanism
class Attention(nn.Module):
    """
    Question-guided attention over image features with several glimpses.

    The image features are projected to 1024 channels, combined additively
    with the question vector, and turned into one attention map per glimpse.
    The output concatenates the attention-weighted feature sums of all
    glimpses, i.e. `glimpses * 1024` values per sample.
    """

    def __init__(self, img_dim = 1024, glimpses = 2):
        super().__init__()

        self.glimpses  = glimpses
        self.img_conv = nn.Conv2d(img_dim, 1024, 1)
        self.glimpse_conv = nn.Conv2d(1024, glimpses, 1)
        self.dropout = nn.Dropout(0.5)

    def forward(self, i_feats, q_feats):
        batch = i_feats.size(0)

        # flattened (N, C) features are treated as a 1x1 feature map; 4D input is used as is
        spatial = i_feats[..., None, None] if len(i_feats.shape) == 2 else i_feats

        # 1x1 convolution projects the image features
        projected = F.gelu(self.img_conv(spatial))

        # broadcast the question vector over every spatial location
        question_map = q_feats[..., None, None].expand_as(projected)
        fused = F.gelu(projected + question_map)

        # one score map per glimpse, flattened over the spatial positions
        scores = self.dropout(F.gelu(self.glimpse_conv(fused)))
        scores = scores.view(batch, self.glimpses, -1)

        # softmax over the spatial positions; extra axis lines up with the channels
        attention = F.softmax(scores, dim=-1).unsqueeze(2)

        # (N, 1, channels, positions) so every glimpse weights the same features
        flat_feats = projected.view(projected.size(0), 1, projected.size(1), -1)

        # weighted sum over positions, then concatenate the glimpses
        pooled = (attention * flat_feats).sum(dim=-1)
        return pooled.view(batch, -1)


class LSTM_Att_bert(nn.Module):
    """
    Bidirectional LSTM question encoder on top of frozen BERT word
    embeddings, combined with question-guided image attention.

    forward() returns softmax probabilities (not logits) over `output_dim`
    answer classes.

    NOTE(review): the embedding rows are indexed by BERT vocabulary ids. If
    the questions fed to this model are encoded with a different vocabulary
    (the notebook's encode_questions maps tokens through its own
    question_vocab), the pretrained vectors are looked up under the wrong
    tokens — confirm, and consider encoding with the tokenizer's own ids.
    """

    def __init__(self, img_dim, output_dim, dropout_p = 0.5): # output_dim is the answer vocabulary size
        super().__init__()

        # Only the word-embedding matrix of the pretrained model is used.
        bert_model_instance = BertModel.from_pretrained('bert-base-uncased')
        bert_word_embeddings = bert_model_instance.embeddings.word_embeddings.weight.detach()
        bert_embedding_dim = bert_word_embeddings.shape[1]

        # from_pretrained is a classmethod: call it on the class so no throwaway
        # randomly-initialised table is allocated first, and pass padding_idx to
        # it directly so the setting actually applies to the returned module.
        self.embedding = nn.Embedding.from_pretrained(bert_word_embeddings, freeze = True, padding_idx = 0)
        self.fc_embed = nn.Linear(bert_embedding_dim, bert_embedding_dim)

        # lstm takes bert embeddings
        self.lstm = nn.LSTM(bert_embedding_dim, 1024, num_layers =2, bidirectional = True, batch_first = True)

        # fc
        self.fc_cell = nn.Linear(1024, 1024)

        # attention
        self.attention = Attention(img_dim)

        # fc layers
        # 2048 attention outputs (2 glimpses x 1024) + 1024 question features
        self.fc1 = nn.Linear(2048+1024, 1024)
        self.fc2 = nn.Linear(1024, output_dim) # dynamic output size
        self.dropout = nn.Dropout(dropout_p)

        # initializing weights
        self.init_weights()


    def forward(self, img_input, text_input):
        """
        img_input: image features, one entry per sample.
        text_input: integer token ids, one row per question.

        Returns the softmax distribution over answers for every sample.
        """
        # gelu non-linearity
        text_embedded = F.gelu(self.fc_embed(self.embedding(text_input)))

        # only the final cell states are used
        _, (_, lstm_cell)  = self.lstm(text_embedded)

        # question features from the last entry of the stacked cell states
        question_feats = self.dropout(F.gelu(self.fc_cell(lstm_cell[-1])))

        # L2-normalized img feats
        img_feats = F.normalize(img_input, dim = 1, p=2)

        # attention-weighted image features
        weighted_img_attention = self.attention(img_feats, question_feats)

        # combine
        combined = torch.cat([weighted_img_attention, question_feats], dim = 1)

        # fc
        fc_1 = self.dropout(F.gelu(self.fc1(combined)))

        # output
        output = F.softmax(self.fc2(fc_1), dim=1)

        return output

    def init_weights(self):
        """Re-initialise the LSTM weight matrices, both attention convolutions and all linear weights with Kaiming-uniform."""
        init = torch.nn.init.kaiming_uniform_
        for layer in self.lstm.all_weights:
            # the first two entries per layer/direction are the weight matrices; biases are left as created
            for hidden in layer[:2]:
                init(hidden)

        init(self.fc_embed.weight)
        init(self.attention.img_conv.weight)
        init(self.attention.glimpse_conv.weight)
        init(self.fc_cell.weight)
        init(self.fc1.weight)
        init(self.fc2.weight)

### **Prepare for training**

In [None]:
# Hyper-parameters for the attention model run (re-declared so this section can run on its own).
num_workers = 2
batch_size = 512
epochs = 15

# Data Loading
# Reloaded here because the previous section deleted these names after building its datasets.
train_list, val_list = load_split_lists(full_path)

train_img_ids, train_features_np = load_features_by_split("train2014", full_path)
val_img_ids, val_features_np = load_features_by_split("val2014", full_path)

train_features = torch.tensor(train_features_np, dtype=torch.float32)
val_features = torch.tensor(val_features_np, dtype=torch.float32)

# The feature width (second dimension) becomes the model's img_dim.
actual_img_dim = train_features.shape[1]
print(f"Detected image feature dimension (img_dim): {actual_img_dim}")

# create vocabs
# Both vocabularies are built from the training split only.
question_vocab = create_question_vocab(train_list)
answer_vocab = create_answer_vocab(train_list, top=1000) # top 1000 answers
output_vocab_size = len(answer_vocab)

# Initialize datasets
train_dataset = VQADataset(json_list=train_list,
                             question_vocab=question_vocab, # question_vocab
                             answer_vocab=answer_vocab,
                             split='train',
                             train_img_feats=(train_img_ids, train_features))

val_dataset = VQADataset(json_list=val_list,
                            question_vocab=question_vocab, # question_vocab
                            answer_vocab=answer_vocab,
                            split='val',
                            val_img_feats=(val_img_ids, val_features))

# Clear lists and numpy arrays to free up memory after dataset creation
# NOTE: these statements only remove the notebook-level names. The datasets
# still hold the annotation lists and feature tensors they were given, so
# those objects stay alive.
del train_list
del val_list
del train_features_np
del val_features_np
del train_features
del val_features
print("Data loaded and datasets initialized. Memory for raw data cleared.")

# Initialize DataLoaders
# Only the training loader shuffles; validation keeps dataset order.
train_dataloader = DataLoader(train_dataset,
                              batch_size=batch_size,
                              shuffle=True,
                              num_workers=num_workers,
                                  pin_memory=True)

val_dataloader = DataLoader(val_dataset,
                            batch_size=batch_size,
                            shuffle=False,
                            num_workers=num_workers,
                            pin_memory=True)

print("DataLoaders Initialized")

In [None]:
# Use the GPU when one is available, otherwise fall back to the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Using device: {device}")

# Model Initialization
# Rebinds `model`, replacing the model of the previous section.
model = LSTM_Att_bert(img_dim=actual_img_dim, output_dim=output_vocab_size, dropout_p=0.5)
model.to(device)
print(f"Model initialized and moved to {device}.")
print(f"Model expected output size: {model.fc2.out_features}")

# Optimizer and Learning Rate Scheduler
# The scheduler is sized for epochs * len(train_dataloader) steps, so it must
# be stepped once per training batch.
optimizer = torch.optim.Adamax(model.parameters(), lr=0.001)
onecycle_scheduler = lr_scheduler.OneCycleLR(optimizer,
                                             steps_per_epoch=len(train_dataloader),
                                             max_lr=0.005,
                                             epochs=epochs)


### **Training**

In [None]:
# Targets equal to <UNK> are ignored by the loss; <UNK> predictions are never counted as correct.
unk_answer_idx = answer_vocab['<UNK>']
criterion = torch.nn.NLLLoss(ignore_index=unk_answer_idx).to(device)

min_loss = float('inf')

# per-epoch history, plotted in the next cell
train_losses_lstm_att = []
train_accs_lstm_att = []
val_losses_lstm_att = []
val_accs_lstm_att = []

save_dir = "/content/drive/MyDrive/vqa_models"
# torch.save does not create missing directories; make this cell runnable on its own
os.makedirs(save_dir, exist_ok=True)
best_model_path = os.path.join(save_dir, "best_model_lstm_attention.pth")

print(f"\n--- Starting Training Loop for LSTM with Attention ---")
print(f"Saving best model to: {best_model_path}")
cell_start = dt.now()

for epoch in range(epochs):
    start = dt.now()
    train_loss = 0.0
    train_correct = 0
    model.train()

    for data in tqdm(train_dataloader, desc=f"Epoch {epoch+1} Training"):
        images = data['image'].to(device)
        questions = data['question'].to(device)
        answers = data['answer']
        multi_answers = data['multi_answer']

        optimizer.zero_grad()
        output = model(images, questions)
        predictions = output.detach().argmax(dim=1).cpu()

        # A prediction that matches any annotator answer becomes the target for
        # that sample. The -11 padding in multi_answers can never equal a class
        # index, so it is never matched.
        accepted = (multi_answers == predictions.unsqueeze(1)).any(dim=1)
        answers = torch.where(accepted, predictions, answers)

        # the model returns probabilities; NLLLoss expects log-probabilities
        answers_gpu = answers.to(device)
        loss = criterion(torch.log(output + 1e-7), answers_gpu)

        loss.backward()
        optimizer.step()
        onecycle_scheduler.step()  # OneCycleLR is stepped once per batch

        # -1 matches no target, so an <UNK> prediction always counts as wrong
        predictions[predictions == unk_answer_idx] = -1
        correct = (predictions == answers).sum().item()

        train_correct += correct
        train_loss += loss.item()

    avg_train_loss = train_loss / len(train_dataloader)
    avg_train_acc = train_correct / len(train_dataloader.dataset)
    train_losses_lstm_att.append(avg_train_loss)
    train_accs_lstm_att.append(avg_train_acc)

    model.eval()
    val_loss = 0.0
    val_correct = 0

    # labels/predictions of this epoch's validation pass (for the confusion matrix)
    all_true = []
    all_pred = []

    with torch.no_grad():
        for data in tqdm(val_dataloader, desc=f"Epoch {epoch+1} Validation"):
            images = data['image'].to(device)
            questions = data['question'].to(device)
            answers = data['answer']
            multi_answers = data['multi_answer']

            output = model(images, questions)
            # validation loss is measured against the unmodified targets
            vloss = criterion(torch.log(output + 1e-7), answers.to(device))
            val_loss += vloss.item()

            predictions = output.argmax(dim=1).cpu()

            # same "any annotator answer counts" rule as in training
            accepted = (multi_answers == predictions.unsqueeze(1)).any(dim=1)
            answers = torch.where(accepted, predictions, answers)

            predictions[predictions == unk_answer_idx] = -1
            correct = (predictions == answers).sum().item()
            val_correct += correct

            # collect labels for confusion matrix
            all_true.extend(answers.tolist())
            all_pred.extend(predictions.tolist())

    avg_val_loss = val_loss / len(val_dataloader)
    avg_val_acc = val_correct / len(val_dataloader.dataset)
    val_losses_lstm_att.append(avg_val_loss)
    val_accs_lstm_att.append(avg_val_acc)

    # keep the checkpoint with the lowest validation loss
    if avg_val_loss < min_loss:
        print(f"Validation loss improved from {min_loss:.5f} to {avg_val_loss:.5f}. Saving model...")
        min_loss = avg_val_loss
        torch.save(model.state_dict(), best_model_path)

    epoch_duration = dt.now() - start
    print(f'\nEpoch {epoch+1} Summary => '
          f'Train Loss: {avg_train_loss:.5f} | Train Acc: {avg_train_acc:.4f} | '
          f'Val Loss: {avg_val_loss:.5f} | Val Acc: {avg_val_acc:.4f} [{epoch_duration}]')

print("\n")
print(f"Training finished. Best model saved at: {best_model_path}")

### **Plot**

In [None]:
# Plot only as many epochs as every history list has.
min_len = min(len(train_losses_lstm_att), len(val_losses_lstm_att),
              len(train_accs_lstm_att), len(val_accs_lstm_att))

# x-axis values. Kept under its own name so the integer hyper-parameter
# `epochs` (used by range(epochs) and OneCycleLR) is not overwritten when
# this cell runs before a training cell is re-executed.
epoch_axis = range(1, min_len + 1)

plt.figure(figsize=(14, 5))

# Plot Loss
plt.subplot(1, 2, 1)
plt.plot(epoch_axis, train_losses_lstm_att[:min_len], 'b-', label='Training Loss')
plt.plot(epoch_axis, val_losses_lstm_att[:min_len], 'r-', label='Validation Loss')
plt.title('Loss over Epochs')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.legend()
plt.grid(True)

# Plot Accuracy
plt.subplot(1, 2, 2)
plt.plot(epoch_axis, train_accs_lstm_att[:min_len], 'b-', label='Training Accuracy')
plt.plot(epoch_axis, val_accs_lstm_att[:min_len], 'r-', label='Validation Accuracy')
plt.title('Accuracy over Epochs')
plt.xlabel('Epoch')
plt.ylabel('Accuracy')
plt.legend()
plt.grid(True)

plt.tight_layout()
plt.show()

In [None]:
# answer id -> answer string, used for the axis labels
idx2word = dict((index, word) for word, index in answer_vocab.items())

# labels and predictions gathered by the validation loop
all_true = np.array(all_true)
all_pred = np.array(all_pred)

# drop the samples whose prediction was rewritten to -1 (<UNK>) in the validation loop
mask = all_pred != -1
all_true_filtered, all_pred_filtered = all_true[mask], all_pred[mask]

# the ten most frequent ground-truth labels define the rows/columns
true_label_counts = Counter(all_true_filtered)
top_label_indices = [pair[0] for pair in true_label_counts.most_common(10)]

# confusion matrix restricted to those labels
cm = confusion_matrix(all_true_filtered, all_pred_filtered, labels=top_label_indices)

def idx_to_word(idx):
    """Return the answer string for `idx`, or `str(idx)` when the id is not in the vocabulary."""
    if idx in idx2word:
        return idx2word[idx]
    return str(idx)

disp_labels = list(map(idx_to_word, top_label_indices))

plt.figure(figsize=(10, 10))
sns.heatmap(cm, annot=True, fmt='d', xticklabels=disp_labels, yticklabels=disp_labels, cmap='Blues')
plt.xlabel('Predicted Answers')
plt.ylabel('True Answers')
plt.title('Confusion Matrix (Top 10 Answers)')
plt.xticks(rotation=45, ha='right')
plt.tight_layout()
plt.show()

## **1-D CNN**

In [None]:
# CNN1D model
class CNN1D(nn.Module):
    """VQA model that encodes the question with parallel 1-D convolutions.

    The question tokens are embedded, passed through one ``Conv1d`` per kernel
    size, max-pooled over the sequence axis and concatenated. The question and
    image vectors are each projected to 1024 dimensions, fused by element-wise
    multiplication and classified by two linear layers.

    Args:
        vocab_size: Number of rows in the question embedding table.
        img_dim: Width of the pre-extracted image feature vector.
        embedding_dim: Width of the word embeddings.
        num_filters: Output channels of every convolution.
        kernel_sizes: Iterable of convolution widths. The question length fed
            to ``forward`` must be at least the largest kernel size because
            the convolutions use no padding.
        output_dim: Number of answer classes.
        dropout_p: Dropout probability used throughout the head.
        padding_idx: Index of the ``<PAD>`` token. When ``None`` (default) it
            is read from the notebook-level ``question_vocab['<PAD>']``, which
            is what the original implementation always did.
    """

    def __init__(self, vocab_size, img_dim, embedding_dim=300, num_filters=512,
                 kernel_sizes=(3, 4, 5), output_dim=1000, dropout_p=0.5,
                 padding_idx=None):
        super().__init__()

        if padding_idx is None:
            # Backward-compatible fallback to the global vocabulary.
            padding_idx = question_vocab['<PAD>']
        # Immutable default plus a local copy: callers may still pass a list.
        kernel_sizes = tuple(kernel_sizes)

        self.embedding = nn.Embedding(vocab_size, embedding_dim, padding_idx=padding_idx)

        # One convolution per kernel size, all reading the embedded question.
        self.convs = nn.ModuleList([
            nn.Conv1d(in_channels=embedding_dim,
                      out_channels=num_filters,
                      kernel_size=k) for k in kernel_sizes
        ])

        # Width of the concatenated, max-pooled convolution outputs.
        self.cnn_output_dim = num_filters * len(kernel_sizes)

        # Question branch.
        self.fc_question = nn.Linear(self.cnn_output_dim, 1024)

        # Image branch.
        self.fc_image = nn.Linear(img_dim, 1024)

        # Fusion and classification layers.
        self.fc1 = nn.Linear(1024, 1000)
        self.fc2 = nn.Linear(1000, output_dim)
        self.dropout = nn.Dropout(dropout_p)

        self.init_weights()

    def forward(self, img_feat_input, text_input):
        """Return class probabilities (softmax over ``output_dim``).

        Args:
            img_feat_input: Image features; L2-normalised along dim 1 here.
            text_input: Integer token ids, indexed as (batch, sequence).

        Returns:
            Tensor of shape (batch, output_dim) whose rows sum to 1. Callers
            must not apply another softmax; take ``log`` for an NLL loss.
        """
        embedded = self.embedding(text_input)

        # Conv1d expects (batch, channels, length): move embedding dim to axis 1.
        embedded = embedded.permute(0, 2, 1)

        # Convolve, then max-pool over the whole remaining sequence length.
        cnn_outputs = [F.relu(conv(embedded)) for conv in self.convs]
        pooled_outputs = [F.max_pool1d(conv_out, conv_out.size(2)).squeeze(2) for conv_out in cnn_outputs]

        question_features = torch.cat(pooled_outputs, 1)

        fc_question = self.dropout(F.gelu(self.fc_question(question_features)))

        fc_image = F.gelu(self.fc_image(F.normalize(img_feat_input, dim=1, p=2)))

        # Multiplicative fusion of the two modalities.
        pw_mul = fc_image * fc_question

        fc_1 = self.dropout(F.gelu(self.fc1(self.dropout(pw_mul))))
        output = F.softmax(self.fc2(fc_1), dim=1)

        return output

    def init_weights(self):
        """Initialise all layers; keeps the ``<PAD>`` embedding at zero."""
        init = nn.init.kaiming_uniform_

        nn.init.uniform_(self.embedding.weight, -0.1, 0.1)
        # uniform_ also overwrites the padding row. Because padding_idx blocks
        # its gradient, that random vector would stay frozen and leak into
        # every padded position, so reset it to zero explicitly.
        if self.embedding.padding_idx is not None:
            with torch.no_grad():
                self.embedding.weight[self.embedding.padding_idx].fill_(0)

        # Kaiming-uniform weights and zero biases for conv and linear layers.
        layers = list(self.convs) + [self.fc_question, self.fc_image, self.fc1, self.fc2]
        for layer in layers:
            init(layer.weight)
            if layer.bias is not None:
                nn.init.zeros_(layer.bias)

### **Prepare for training**

In [None]:
# --- Hyper-parameters for the 1-D CNN run ---
num_workers = 2
batch_size = 512
epochs = 15
max_question_length = 25  # maximum number of words per question

# --- Annotations and pre-extracted image features ---
train_list, val_list = load_split_lists(full_path)

train_img_ids, train_features_np = load_features_by_split("train2014", full_path)
val_img_ids, val_features_np = load_features_by_split("val2014", full_path)

train_features = torch.tensor(train_features_np, dtype=torch.float32)
val_features = torch.tensor(val_features_np, dtype=torch.float32)

# The model's image input width is taken from the data, not hard-coded.
actual_img_dim = train_features.size(1)
print(f"Detected image feature dimension (img_dim): {actual_img_dim}")

# --- Vocabularies (built from the training split only) ---
question_vocab = create_question_vocab(train_list)
answer_vocab = create_answer_vocab(train_list)

model_output_size = len(answer_vocab)
for message in (f"Custom question vocabulary size: {len(question_vocab)}",
                f"Answer vocabulary size: {len(answer_vocab)}",
                f"Model's final output layer size: {model_output_size}"):
    print(message)

# --- Datasets ---
# Arguments that are identical for the training and validation datasets.
shared_dataset_args = dict(question_vocab=question_vocab,
                           answer_vocab=answer_vocab,
                           max_question_length=max_question_length)

train_dataset = VQADataset(json_list=train_list,
                           split='train',
                           train_img_feats=(train_img_ids, train_features),
                           **shared_dataset_args)

val_dataset = VQADataset(json_list=val_list,
                         split='val',
                         val_img_feats=(val_img_ids, val_features),
                         **shared_dataset_args)

# Drop the notebook-level references to the raw data.
# NOTE(review): memory is only reclaimed if the datasets do not keep their own
# references to these objects (the feature tensors were passed into
# VQADataset) - confirm.
del train_list, val_list, train_features_np, val_features_np, train_features, val_features

# --- DataLoaders (only the training loader is shuffled) ---
loader_args = dict(batch_size=batch_size, num_workers=num_workers, pin_memory=True)

train_dataloader = DataLoader(train_dataset, shuffle=True, **loader_args)
val_dataloader = DataLoader(val_dataset, shuffle=False, **loader_args)

In [None]:
# Pick the GPU when one is available.
if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')
print(f"Using device: {device}")

# Build the 1-D CNN VQA model; the classifier width follows the answer vocabulary.
model = CNN1D(
    vocab_size=len(question_vocab),
    img_dim=actual_img_dim,
    embedding_dim=300,        # width of the word embeddings
    num_filters=512,          # output channels per convolution
    kernel_sizes=[3, 4, 5],   # convolution widths
    output_dim=model_output_size,
    dropout_p=0.5,
).to(device)
print(f"Model initialized and moved to {device}.")
print(f"Model expected output size: {model.fc2.out_features}")

# Adamax optimizer with a one-cycle schedule sized for one step per batch
# (steps_per_epoch * epochs steps in total).
optimizer = torch.optim.Adamax(model.parameters(), lr=0.001)
onecycle_scheduler = lr_scheduler.OneCycleLR(
    optimizer,
    max_lr=0.005,
    epochs=epochs,
    steps_per_epoch=len(train_dataloader),
)

# Negative log-likelihood on log-probabilities; samples whose target is <UNK>
# are excluded from the loss through ignore_index.
unk_answer_idx = answer_vocab['<UNK>']
criterion = nn.NLLLoss(ignore_index=unk_answer_idx).to(device)

### **Training**

In [None]:
# Per-epoch history for the 1-D CNN model; consumed by the plotting cell.
train_losses_cnn = []
train_accs_cnn = []
val_losses_cnn = []
val_accs_cnn = []

min_loss = float('inf')  # lowest validation loss seen so far


save_dir = "/content/drive/MyDrive/vqa_models"
# torch.save does not create missing directories, so make sure the target exists.
os.makedirs(save_dir, exist_ok=True)
best_model_path = os.path.join(save_dir, "best_model_1dCnn.pth")


def _accept_alternative_answers(answers, predictions, multi_answers):
    """Relabel wrongly-predicted samples whose prediction is a valid alternative.

    For every sample where ``predictions`` differs from ``answers`` but the
    predicted id occurs in ``multi_answers[idx]``, the target is replaced by
    the prediction. ``answers`` is modified in place and also returned.

    Assigning the prediction directly is equivalent to looking the value up in
    ``multi_answers[idx]``, and it does not fail when the same id appears more
    than once among the alternatives (the previous ``.item()`` lookup did).
    """
    for idx in torch.where(answers != predictions)[0]:
        if predictions[idx] in multi_answers[idx]:
            answers[idx] = predictions[idx]
    return answers


def _count_correct(predictions, answers):
    """Count matches, never crediting an <UNK> prediction (mapped to -1)."""
    predictions_for_acc = predictions.clone()
    predictions_for_acc[predictions_for_acc == unk_answer_idx] = -1
    return (predictions_for_acc == answers).sum().item()


print("\n--- Starting Training Loop ---")
cell_start = dt.now()  # Total time counter

for epoch in range(epochs):
    start = dt.now()  # Epoch timer
    train_loss = 0.0
    train_correct = 0
    model.train()

    for data in tqdm(train_dataloader, desc=f"Epoch {epoch+1} Training"):
        images = data['image'].to(device)
        questions = data['question'].to(device)
        answers = data['answer']
        multi_answers = data['multi_answer']

        optimizer.zero_grad()
        output = model(images, questions)
        predictions = torch.argmax(output.cpu(), dim=1)

        # Accept any annotated alternative answer as the training target.
        answers = _accept_alternative_answers(answers, predictions, multi_answers)

        # The model returns probabilities; NLLLoss needs log-probabilities.
        # The epsilon guards against log(0).
        answers_gpu = answers.to(device)
        loss = criterion(torch.log(output + 1e-7), answers_gpu)

        loss.backward()
        optimizer.step()
        onecycle_scheduler.step()  # the one-cycle schedule advances per batch

        train_correct += _count_correct(predictions, answers)
        train_loss += loss.item()

    # Loss is averaged over batches, accuracy over samples.
    avg_train_loss = train_loss / len(train_dataloader)
    avg_train_acc = train_correct / len(train_dataloader.dataset)
    train_losses_cnn.append(avg_train_loss)
    train_accs_cnn.append(avg_train_acc)

    # Validation
    model.eval()
    val_loss = 0.0
    val_correct = 0

    # Labels/predictions of this epoch's validation pass (for the confusion
    # matrix); reset every epoch so only the last epoch is kept.
    all_true = []
    all_pred = []

    with torch.no_grad():
        for data in tqdm(val_dataloader, desc=f"Epoch {epoch+1} Validation"):
            images = data['image'].to(device)
            questions = data['question'].to(device)
            answers = data['answer']
            multi_answers = data['multi_answer']

            output = model(images, questions)
            # NOTE: the validation loss uses the original targets (computed
            # before relabelling), unlike the training loss above.
            vloss = criterion(torch.log(output + 1e-7), answers.to(device))
            val_loss += vloss.item()

            predictions = torch.argmax(output.cpu(), 1)

            answers = _accept_alternative_answers(answers, predictions, multi_answers)

            val_correct += _count_correct(predictions, answers)

            # Collect labels for the confusion matrix (raw argmax predictions).
            all_true.extend(answers.tolist())
            all_pred.extend(predictions.tolist())

    avg_val_loss = val_loss / len(val_dataloader)
    avg_val_acc = val_correct / len(val_dataloader.dataset)
    val_losses_cnn.append(avg_val_loss)
    val_accs_cnn.append(avg_val_acc)

    epoch_duration = dt.now() - start
    print(f'\nEpoch {epoch+1} Summary => '
          f'Train Loss: {avg_train_loss:.5f} | Train Acc: {avg_train_acc:.4f} | '
          f'Val Loss: {avg_val_loss:.5f} | Val Acc: {avg_val_acc:.4f} [{epoch_duration}]')

    # Checkpoint whenever the validation loss improves.
    if avg_val_loss < min_loss:
        print(f"Validation loss improved ({min_loss:.5f} -> {avg_val_loss:.5f}). Saving model...")
        min_loss = avg_val_loss
        torch.save(model.state_dict(), best_model_path)

total_duration = dt.now() - cell_start
print(f"\n--- Training Finished! Total time: {total_duration} ---")

### **Plot**

In [None]:
# Plot and save the 1-D CNN training curves (one figure per metric).
# The four histories are cut to their common length so x and y always match.
min_len = min(len(train_losses_cnn), len(val_losses_cnn),
              len(train_accs_cnn), len(val_accs_cnn))

# Use a dedicated name for the x-axis. Rebinding `epochs` here would replace
# the integer epoch-count hyper-parameter with a range object and break
# `range(epochs)` / `OneCycleLR(epochs=epochs)` when those cells are re-run.
epoch_axis = range(1, min_len + 1)

# (metric name, training history, validation history, output image file)
curves = [
    ('Loss', train_losses_cnn, val_losses_cnn, 'cnn_loss_plot.png'),
    ('Accuracy', train_accs_cnn, val_accs_cnn, 'cnn_accuracy_plot.png'),
]
for metric, train_hist, val_hist, out_file in curves:
    plt.figure(figsize=(7, 5))
    plt.plot(epoch_axis, train_hist[:min_len], 'b-', label=f'Training {metric}')
    plt.plot(epoch_axis, val_hist[:min_len], 'r-', label=f'Validation {metric}')
    plt.title(f'{metric} over Epochs')
    plt.xlabel('Epoch')
    plt.ylabel(metric)
    plt.legend()
    plt.grid(True)
    plt.tight_layout()
    plt.savefig(out_file)  # save before show(), while the figure is still current
    plt.show()

In [None]:
# Confusion matrix over the 10 most frequent ground-truth answers, built from
# the labels/predictions collected during the last validation pass.

# Invert the answer vocabulary: class index -> answer string.
idx2word = {v: k for k, v in answer_vocab.items()}

all_true = np.array(all_true)
all_pred = np.array(all_pred)

# The validation loop stores raw argmax predictions, so the -1 sentinel (used
# only inside the accuracy computation) never appears in `all_pred`.
# <UNK> predictions therefore have to be dropped by their vocabulary index;
# the -1 test is kept in case sentinel-mapped predictions are ever passed in.
unk_idx = answer_vocab.get('<UNK>', -1)
mask = (all_pred != -1) & (all_pred != unk_idx)
all_true_filtered = all_true[mask]
all_pred_filtered = all_pred[mask]

# The ten ground-truth labels that occur most often.
true_label_counts = Counter(all_true_filtered)
top_label_indices = [label for label, _ in true_label_counts.most_common(10)]

# Rows/columns follow the order of `top_label_indices`.
cm = confusion_matrix(all_true_filtered, all_pred_filtered, labels=top_label_indices)


def idx_to_word(idx):
    """Return the answer string for class index `idx`, or `str(idx)` if unknown."""
    return idx2word.get(idx, str(idx))


disp_labels = [idx_to_word(idx) for idx in top_label_indices]

plt.figure(figsize=(10, 10))
sns.heatmap(cm, annot=True, fmt='d', xticklabels=disp_labels, yticklabels=disp_labels, cmap='Blues')
plt.xlabel('Predicted Answers')
plt.ylabel('True Answers')
plt.title('Confusion Matrix (Top 10 Answers)')
plt.xticks(rotation=45, ha='right')
plt.tight_layout()
plt.savefig('cnn_confusion_matrix.png')
plt.show()

## **RNN**

In [None]:
# RNN model
class RNN(nn.Module):
    """VQA model that encodes the question with a stacked vanilla RNN.

    Question tokens are embedded, passed through a linear+GELU layer and fed
    to ``nn.RNN``. The final hidden states of all layers are concatenated as
    the question vector. Question and image vectors are each projected to 1024
    dimensions, fused by element-wise multiplication and classified.

    Args:
        vocab_size: Number of rows in the question embedding table.
        img_dim: Width of the pre-extracted image feature vector.
        embedding_dim: Width of the word embeddings.
        rnn_hidden_size: Hidden size of each RNN layer.
        num_rnn_layers: Number of stacked RNN layers.
        output_dim: Number of answer classes.
        dropout_p: Dropout probability (also used between RNN layers when
            there is more than one).
        padding_idx: Index of the ``<PAD>`` token. When ``None`` (default) it
            is read from the notebook-level ``question_vocab['<PAD>']``, which
            is what the original implementation always did.
    """

    def __init__(self, vocab_size, img_dim, embedding_dim=300, rnn_hidden_size=512,
                 num_rnn_layers=2, output_dim=1000, dropout_p=0.5, padding_idx=None):
        super().__init__()

        if padding_idx is None:
            # Backward-compatible fallback to the global vocabulary.
            padding_idx = question_vocab['<PAD>']

        self.embedding = nn.Embedding(vocab_size, embedding_dim, padding_idx=padding_idx)
        self.fc_embed = nn.Linear(embedding_dim, embedding_dim)

        self.rnn = nn.RNN(input_size=embedding_dim,
                          hidden_size=rnn_hidden_size,
                          num_layers=num_rnn_layers,
                          batch_first=True,
                          # nn.RNN only applies dropout between stacked layers
                          dropout=dropout_p if num_rnn_layers > 1 else 0)

        # Projects the concatenated per-layer final hidden states.
        self.fc_rnn = nn.Linear(num_rnn_layers * rnn_hidden_size, 1024)

        # Image branch.
        self.fc_image = nn.Linear(img_dim, 1024)

        # Fusion and classification layers.
        self.fc1 = nn.Linear(1024, 1000)
        self.fc2 = nn.Linear(1000, output_dim)
        self.dropout = nn.Dropout(dropout_p)

        self.init_weights()

    def forward(self, img_feat_input, text_input):
        """Return class probabilities (softmax over ``output_dim``).

        Args:
            img_feat_input: Image features; L2-normalised along dim 1 here.
            text_input: Integer token ids, indexed as (batch, sequence).

        Returns:
            Tensor of shape (batch, output_dim) whose rows sum to 1. Callers
            must not apply another softmax; take ``log`` for an NLL loss.
        """
        embedded = F.gelu(self.fc_embed(self.embedding(text_input)))

        # Sequences are not packed, so h_n is the state after the last time
        # step of the padded sequence.
        _, h_n = self.rnn(embedded)

        # (layers, batch, hidden) -> (batch, layers * hidden)
        rnn_concat = h_n.permute(1, 0, 2).reshape(h_n.size(1), -1)

        fc_question = self.dropout(F.gelu(self.fc_rnn(rnn_concat)))

        fc_image = F.gelu(self.fc_image(F.normalize(img_feat_input, dim=1, p=2)))

        # Multiplicative fusion of the two modalities.
        pw_mul = fc_image * fc_question

        fc_1 = self.dropout(F.gelu(self.fc1(self.dropout(pw_mul))))
        output = F.softmax(self.fc2(fc_1), dim=1)

        return output

    def init_weights(self):
        """Kaiming-uniform initialisation; keeps the ``<PAD>`` embedding at zero."""
        init = nn.init.kaiming_uniform_

        # Embedding and linear layer weights (linear biases keep their defaults).
        for module in (self.embedding, self.fc_embed, self.fc_rnn,
                       self.fc_image, self.fc1, self.fc2):
            init(module.weight)

        # The init above also overwrites the padding row. Because padding_idx
        # blocks its gradient, that random vector would stay frozen and be fed
        # to the RNN at every padded position, so reset it to zero explicitly.
        if self.embedding.padding_idx is not None:
            with torch.no_grad():
                self.embedding.weight[self.embedding.padding_idx].fill_(0)

        # RNN weights get Kaiming-uniform values, RNN biases are zeroed.
        for name, param in self.rnn.named_parameters():
            if 'weight' in name:
                init(param)
            elif 'bias' in name:
                nn.init.constant_(param, 0)

### **Prepare for training**

In [None]:
# --- Hyper-parameters for the RNN run ---
num_workers = 2
batch_size = 512
epochs = 15
max_question_length = 25

# --- Annotations and pre-extracted image features ---
train_list, val_list = load_split_lists(full_path)

train_img_ids, train_features_np = load_features_by_split("train2014", full_path)
val_img_ids, val_features_np = load_features_by_split("val2014", full_path)

train_features = torch.tensor(train_features_np, dtype=torch.float32)
val_features = torch.tensor(val_features_np, dtype=torch.float32)

# The model's image input width is taken from the data, not hard-coded.
actual_img_dim = train_features.size(1)
print(f"Detected image feature dimension (img_dim): {actual_img_dim}")

# --- Vocabularies (built from the training split only) ---
question_vocab = create_question_vocab(train_list)
answer_vocab = create_answer_vocab(train_list)

model_output_size = len(answer_vocab)
for message in (f"Custom question vocabulary size: {len(question_vocab)}",
                f"Answer vocabulary size: {len(answer_vocab)}",
                f"Model's final output layer size: {model_output_size}"):
    print(message)

# --- Datasets ---
# Arguments that are identical for the training and validation datasets.
shared_dataset_args = dict(question_vocab=question_vocab,
                           answer_vocab=answer_vocab,
                           max_question_length=max_question_length)

train_dataset = VQADataset(json_list=train_list,
                           split='train',
                           train_img_feats=(train_img_ids, train_features),
                           **shared_dataset_args)

val_dataset = VQADataset(json_list=val_list,
                         split='val',
                         val_img_feats=(val_img_ids, val_features),
                         **shared_dataset_args)

# Drop the notebook-level references to the raw data.
# NOTE(review): memory is only reclaimed if the datasets do not keep their own
# references to these objects (the feature tensors were passed into
# VQADataset) - confirm.
del train_list, val_list, train_features_np, val_features_np, train_features, val_features
print("Data loaded and datasets initialized. Memory for raw data cleared.")

# --- DataLoaders (only the training loader is shuffled) ---
loader_args = dict(batch_size=batch_size, num_workers=num_workers, pin_memory=True)

train_dataloader = DataLoader(train_dataset, shuffle=True, **loader_args)
val_dataloader = DataLoader(val_dataset, shuffle=False, **loader_args)

In [None]:
# Pick the GPU when one is available.
if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')
print(f"Using device: {device}")

# Build the RNN VQA model; the classifier width follows the answer vocabulary.
model = RNN(
    vocab_size=len(question_vocab),
    img_dim=actual_img_dim,
    embedding_dim=300,      # width of the word embeddings
    rnn_hidden_size=512,    # hidden size of each RNN layer
    num_rnn_layers=2,       # number of stacked RNN layers
    output_dim=model_output_size,
    dropout_p=0.5,
).to(device)
print(f"Model initialized and moved to {device}.")
print(f"Model expected output size: {model.fc2.out_features}")

# Adamax optimizer with a one-cycle schedule sized for one step per batch
# (steps_per_epoch * epochs steps in total).
optimizer = torch.optim.Adamax(model.parameters(), lr=0.001)
onecycle_scheduler = lr_scheduler.OneCycleLR(
    optimizer,
    max_lr=0.005,
    epochs=epochs,
    steps_per_epoch=len(train_dataloader),
)

# Negative log-likelihood on log-probabilities; samples whose target is <UNK>
# are excluded from the loss through ignore_index.
unk_answer_idx = answer_vocab['<UNK>']
criterion = nn.NLLLoss(ignore_index=unk_answer_idx).to(device)


### **Training**

In [None]:
# Per-epoch history for the RNN model; consumed by the plotting cell.
train_losses_rnn = []
train_accs_rnn = []
val_losses_rnn = []
val_accs_rnn = []

min_loss = float('inf')  # lowest validation loss seen so far


save_dir = "/content/drive/MyDrive/vqa_models"
# torch.save does not create missing directories, so make sure the target exists.
os.makedirs(save_dir, exist_ok=True)
best_model_path = os.path.join(save_dir, "best_model_RNN.pth")


def _accept_alternative_answers(answers, predictions, multi_answers):
    """Relabel wrongly-predicted samples whose prediction is a valid alternative.

    For every sample where ``predictions`` differs from ``answers`` but the
    predicted id occurs in ``multi_answers[idx]``, the target is replaced by
    the prediction. ``answers`` is modified in place and also returned.

    Assigning the prediction directly is equivalent to looking the value up in
    ``multi_answers[idx]``, and it does not fail when the same id appears more
    than once among the alternatives (the previous ``.item()`` lookup did).
    """
    for idx in torch.where(answers != predictions)[0]:
        if predictions[idx] in multi_answers[idx]:
            answers[idx] = predictions[idx]
    return answers


def _count_correct(predictions, answers):
    """Count matches, never crediting an <UNK> prediction (mapped to -1)."""
    predictions_for_acc = predictions.clone()
    predictions_for_acc[predictions_for_acc == unk_answer_idx] = -1
    return (predictions_for_acc == answers).sum().item()


cell_start = dt.now()  # time counter

for epoch in range(epochs):
    start = dt.now()
    train_loss = 0.0
    train_correct = 0
    model.train()

    for data in tqdm(train_dataloader, desc=f"Epoch {epoch+1} Training"):
        images = data['image'].to(device)
        questions = data['question'].to(device)
        answers = data['answer']
        multi_answers = data['multi_answer']

        optimizer.zero_grad()
        output = model(images, questions)
        predictions = torch.argmax(output.cpu(), dim=1)

        # Accept any annotated alternative answer as the training target.
        answers = _accept_alternative_answers(answers, predictions, multi_answers)

        # The model returns probabilities; NLLLoss needs log-probabilities.
        # The epsilon guards against log(0).
        answers_gpu = answers.to(device)
        loss = criterion(torch.log(output + 1e-7), answers_gpu)

        loss.backward()
        optimizer.step()
        onecycle_scheduler.step()  # the one-cycle schedule advances per batch

        train_correct += _count_correct(predictions, answers)
        train_loss += loss.item()

    # Loss is averaged over batches, accuracy over samples.
    avg_train_loss = train_loss / len(train_dataloader)
    avg_train_acc = train_correct / len(train_dataloader.dataset)

    train_losses_rnn.append(avg_train_loss)
    train_accs_rnn.append(avg_train_acc)

    # Validation
    model.eval()
    val_loss = 0.0
    val_correct = 0

    # Labels/predictions of this epoch's validation pass (for the confusion
    # matrix); reset every epoch so only the last epoch is kept.
    all_true = []
    all_pred = []

    with torch.no_grad():
        for data in tqdm(val_dataloader, desc=f"Epoch {epoch+1} Validation"):
            images = data['image'].to(device)
            questions = data['question'].to(device)
            answers = data['answer']
            multi_answers = data['multi_answer']

            output = model(images, questions)
            # NOTE: the validation loss uses the original targets (computed
            # before relabelling), unlike the training loss above.
            vloss = criterion(torch.log(output + 1e-7), answers.to(device))
            val_loss += vloss.item()

            predictions = torch.argmax(output.cpu(), 1)

            answers = _accept_alternative_answers(answers, predictions, multi_answers)

            val_correct += _count_correct(predictions, answers)

            # Collect labels for the confusion matrix (raw argmax predictions).
            all_true.extend(answers.tolist())
            all_pred.extend(predictions.tolist())

    avg_val_loss = val_loss / len(val_dataloader)
    avg_val_acc = val_correct / len(val_dataloader.dataset)

    val_losses_rnn.append(avg_val_loss)
    val_accs_rnn.append(avg_val_acc)

    epoch_duration = dt.now() - start
    print(f'\nEpoch {epoch+1} Summary => '
          f'Train Loss: {avg_train_loss:.5f} | Train Acc: {avg_train_acc:.4f} | '
          f'Val Loss: {avg_val_loss:.5f} | Val Acc: {avg_val_acc:.4f} [{epoch_duration}]')

    # Checkpoint whenever the validation loss improves.
    if avg_val_loss < min_loss:
        print(f"Validation loss improved ({min_loss:.5f} -> {avg_val_loss:.5f}). Saving model...")
        min_loss = avg_val_loss
        torch.save(model.state_dict(), best_model_path)

total_duration = dt.now() - cell_start
print(f"\n--- Training Finished! Total time: {total_duration} ---")

### **Plot**

In [None]:
# Plot and save the RNN training curves (one figure per metric).
# The four histories are cut to their common length so x and y always match.
min_len = min(len(train_losses_rnn), len(val_losses_rnn),
              len(train_accs_rnn), len(val_accs_rnn))

# Use a dedicated name for the x-axis. Rebinding `epochs` here would replace
# the integer epoch-count hyper-parameter with a range object and break
# `range(epochs)` / `OneCycleLR(epochs=epochs)` when those cells are re-run.
epoch_axis = range(1, min_len + 1)

# (metric name, training history, validation history, output image file)
curves = [
    ('Loss', train_losses_rnn, val_losses_rnn, 'rnn_loss_plot.png'),
    ('Accuracy', train_accs_rnn, val_accs_rnn, 'rnn_accuracy_plot.png'),
]
for metric, train_hist, val_hist, out_file in curves:
    plt.figure(figsize=(7, 5))
    plt.plot(epoch_axis, train_hist[:min_len], 'b-', label=f'Training {metric}')
    plt.plot(epoch_axis, val_hist[:min_len], 'r-', label=f'Validation {metric}')
    plt.title(f'{metric} over Epochs')
    plt.xlabel('Epoch')
    plt.ylabel(metric)
    plt.legend()
    plt.grid(True)
    plt.tight_layout()
    plt.savefig(out_file)  # save before show(), while the figure is still current
    plt.show()

In [None]:
# Confusion matrix over the 10 most frequent ground-truth answers, built from
# the labels/predictions collected during the last validation pass.

# Invert the answer vocabulary: class index -> answer string.
idx2word = {v: k for k, v in answer_vocab.items()}

all_true = np.array(all_true)
all_pred = np.array(all_pred)

# The validation loop stores raw argmax predictions, so the -1 sentinel (used
# only inside the accuracy computation) never appears in `all_pred`.
# <UNK> predictions therefore have to be dropped by their vocabulary index;
# the -1 test is kept in case sentinel-mapped predictions are ever passed in.
unk_idx = answer_vocab.get('<UNK>', -1)
mask = (all_pred != -1) & (all_pred != unk_idx)
all_true_filtered = all_true[mask]
all_pred_filtered = all_pred[mask]

# The ten ground-truth labels that occur most often.
true_label_counts = Counter(all_true_filtered)
top_label_indices = [label for label, _ in true_label_counts.most_common(10)]

# Rows/columns follow the order of `top_label_indices`.
cm = confusion_matrix(all_true_filtered, all_pred_filtered, labels=top_label_indices)


def idx_to_word(idx):
    """Return the answer string for class index `idx`, or `str(idx)` if unknown."""
    return idx2word.get(idx, str(idx))


disp_labels = [idx_to_word(idx) for idx in top_label_indices]

plt.figure(figsize=(10, 10))
sns.heatmap(cm, annot=True, fmt='d', xticklabels=disp_labels, yticklabels=disp_labels, cmap='Blues')
plt.xlabel('Predicted Answers')
plt.ylabel('True Answers')
plt.title('Confusion Matrix (Top 10 Answers)')
plt.xticks(rotation=45, ha='right')
plt.tight_layout()
plt.savefig('rnn_confusion_matrix.png')
plt.show()

## **TEST MODELS**

In [None]:
!wget -nc http://images.cocodataset.org/zips/val2014.zip
!unzip -q val2014.zip
!rm *.zip

### **Implement trained ResNet model structure**

In [None]:
class Identity(nn.Module):
    """Pass-through layer: ``forward`` returns its input unchanged.

    Used to replace the ResNet classification head so the encoder emits its
    pooled features directly.
    """

    def forward(self, x):
        """Return `x` itself (same object, no copy)."""
        return x

class SimCLR(nn.Module):
    """ResNet-50 encoder followed by an MLP projection head (2048 -> 256).

    Args:
        linear_eval: When True, ``forward`` takes a single image batch tensor.
            When False, ``forward`` takes a sequence of tensors and
            concatenates them along the batch axis first.
        unfreeze_layer4: When True, parameters whose name contains "layer4"
            stay trainable; every other encoder parameter is frozen.
    """

    def __init__(self, linear_eval=False, unfreeze_layer4=True):
        super().__init__()
        self.linear_eval = linear_eval

        # Default pretrained ResNet-50 with its classifier removed, so the
        # encoder outputs the 2048-d pooled features.
        backbone = resnet50(weights=ResNet50_Weights.DEFAULT)
        backbone.fc = Identity()

        # Freeze everything, except layer4 when requested.
        train_layer4 = bool(unfreeze_layer4)
        for param_name, param in backbone.named_parameters():
            param.requires_grad = train_layer4 and "layer4" in param_name

        self.encoder = backbone

        # Projection head. The layer order fixes the state_dict keys, so it
        # must not change if saved checkpoints are to load.
        head_layers = [
            nn.Linear(2048, 1024),
            nn.BatchNorm1d(1024),
            nn.ReLU(inplace=True),
            nn.Dropout(p=0.3),
            nn.Linear(1024, 512),
            nn.BatchNorm1d(512),
            nn.ReLU(inplace=True),
            nn.Linear(512, 256),
        ]
        self.projection = nn.Sequential(*head_layers)

    def forward(self, x):
        """Encode `x` and return its 256-d projection."""
        # Outside linear-eval mode, the given views are stacked into one batch.
        batch = x if self.linear_eval else torch.cat(x, dim=0)
        return self.projection(self.encoder(batch))

### **LSTM**

In [None]:
# Checkpoint locations: trained LSTM VQA model and the SimCLR image encoder.
# NOTE: `full_path` named the ResNet50 data folder earlier in the notebook; it
# is rebound to a checkpoint file here, so the data-loading cells must not be
# re-run afterwards without resetting it.
full_path = '/content/drive/MyDrive/vqa_models/best_model_lstm.pth'
simclr_path = '/content/drive/MyDrive/ResNet50/ResNet50_def.pth'

if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
print("Using device:", device)

# Class index -> answer string; make sure the question vocab has a <PAD> entry.
idx_to_answer = dict(zip(answer_vocab.values(), answer_vocab.keys()))
question_vocab.setdefault('<PAD>', 0)

# VQA model. img_dim matches the 256-d output of the SimCLR projection head.
# NOTE(review): output_dim=1000 is hard-coded and must equal the classifier
# size stored in the checkpoint - confirm against the training configuration.
img_dim = 256
vocab_size = len(question_vocab)

model = LSTM(vocab_size=vocab_size, img_dim=img_dim, output_dim=1000)
model = model.to(device)
model.load_state_dict(torch.load(full_path, map_location=device))
model.eval()

# Image encoder. linear_eval=True makes forward() accept a plain image batch
# instead of a list of views to concatenate.
simclr_model = SimCLR(linear_eval=True)
simclr_model = simclr_model.to(device)
simclr_model.load_state_dict(torch.load(simclr_path, map_location=device))
simclr_model.eval()


# Image preprocessing: fixed 224x224 resize, tensor conversion, per-channel
# normalisation.
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(
        mean=[0.485, 0.456, 0.406],
        std=[0.229, 0.224, 0.225],
    ),
])

# Extract Features
def extract_image_features(image_path):
    """Return the SimCLR projection of the image stored at `image_path`.

    The image is loaded as RGB, preprocessed with the notebook-level
    `transform`, and encoded by `simclr_model` without gradient tracking.
    The batch axis added for the model is removed from the result.
    """
    rgb_image = Image.open(image_path).convert("RGB")
    # The encoder expects a batch, so add a leading axis of size 1.
    single_batch = transform(rgb_image).unsqueeze(0).to(device)
    with torch.no_grad():
        embedding = simclr_model(single_batch)
    return embedding.squeeze(0)

# Encode Question
def encode_single_question(question, question_vocab, max_length=25):
    """Turn a question string into a (1, max_length) tensor of token ids.

    The question is lower-cased and split on whitespace. Words missing from
    `question_vocab` map to the `<UNK>` id (0 if the vocab has none). The
    sequence is truncated to `max_length` and right-padded with the `<PAD>` id.
    The result is moved to the notebook-level `device`.
    """
    words = question.lower().strip().split()
    unk_id = question_vocab.get('<UNK>', 0)
    ids = [question_vocab.get(word, unk_id) for word in words[:max_length]]

    pad_id = question_vocab['<PAD>']
    ids.extend([pad_id] * (max_length - len(ids)))

    return torch.tensor(ids).unsqueeze(0).to(device)

# Prediction
def predict_topk(image_path, question_text, k=5):
    """Print the `k` most likely answers for a question about an image.

    Uses the notebook-level `model`, `question_vocab` and `idx_to_answer`,
    prints the ranked answers with their scores, and shows the image.
    """
    image_tensor = extract_image_features(image_path).unsqueeze(0)
    question_tensor = encode_single_question(question_text, question_vocab)

    with torch.no_grad():
        output = model(image_tensor, question_tensor)
        # NOTE(review): if model.forward already returns softmax probabilities
        # (as the CNN1D and RNN models in this notebook do), this second
        # softmax flattens the reported confidences - confirm for LSTM.
        probs = F.softmax(output, dim=1).squeeze(0)

        topk_probs, topk_indices = probs.topk(k)
        topk_answers = [idx_to_answer.get(index.item(), "<UNK>") for index in topk_indices]

    print(f"\nQuestion: {question_text}\n")
    print("Top Predictions:")
    for rank, (answer, prob) in enumerate(zip(topk_answers, topk_probs), start=1):
        print(f"{rank}. {answer} ({prob.item()*100:.2f}%)")

    # Show the image the question refers to.
    img = Image.open(image_path)
    plt.imshow(img)
    plt.axis('off')
    plt.title(f"Q: {question_text}")
    plt.show()

# Example Call
# Demo: ask one question about a single COCO val2014 image.
question = "Which animal is depicted in the figure?"

image_id = 107087
image_filename = f"COCO_val2014_{image_id:012d}.jpg"  # ids are zero-padded to 12 digits
image_path = "/content/val2014/" + image_filename

predict_topk(image_path, question, k=10)

### **LSTM-Attention**

In [None]:
# Checkpoint of the trained LSTM-Attention VQA model.
# NOTE: `full_path` named the ResNet50 data folder earlier in the notebook; it
# is rebound to a checkpoint file here. `simclr_path` is not set in this cell
# and must already exist from the LSTM test cell.
full_path = '/content/drive/MyDrive/vqa_models/best_model_lstm_attention.pth'

if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
print("Using device:", device)

# Class index -> answer string; make sure the question vocab has a <PAD> entry.
idx_to_answer = dict(zip(answer_vocab.values(), answer_vocab.keys()))
question_vocab.setdefault('<PAD>', 0)

# VQA model. img_dim matches the 256-d output of the SimCLR projection head.
# NOTE(review): output_dim=1000 is hard-coded and must equal the classifier
# size stored in the checkpoint - confirm against the training configuration.
img_dim = 256
vocab_size = len(question_vocab)  # not passed to LSTM_Att_bert below

model = LSTM_Att_bert(img_dim=img_dim, output_dim=1000)
model = model.to(device)
model.load_state_dict(torch.load(full_path, map_location=device))
model.eval()

# Image encoder. linear_eval=True makes forward() accept a plain image batch
# instead of a list of views to concatenate.
simclr_model = SimCLR(linear_eval=True)
simclr_model = simclr_model.to(device)
simclr_model.load_state_dict(torch.load(simclr_path, map_location=device))
simclr_model.eval()

# Image preprocessing: fixed 224x224 resize, tensor conversion, per-channel
# normalisation.
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(
        mean=[0.485, 0.456, 0.406],
        std=[0.229, 0.224, 0.225],
    ),
])

# Extract Features from Image
def extract_image_features(image_path):
    """Return the SimCLR projection of the image stored at `image_path`.

    The image is loaded as RGB, preprocessed with the notebook-level
    `transform`, and encoded by `simclr_model` without gradient tracking.
    The batch axis added for the model is removed from the result.
    """
    rgb_image = Image.open(image_path).convert("RGB")
    # The encoder expects a batch, so add a leading axis of size 1.
    single_batch = transform(rgb_image).unsqueeze(0).to(device)
    with torch.no_grad():
        # The tensor is passed directly (not wrapped in a list) because the
        # encoder was built with linear_eval=True.
        embedding = simclr_model(single_batch)
    return embedding.squeeze(0)

# Encode Question
def encode_single_question(question, question_vocab, max_length=25):
    """Turn a question string into a (1, max_length) tensor of token ids.

    The question is lower-cased and split on whitespace. Words missing from
    `question_vocab` map to the `<UNK>` id (0 if the vocab has none). The
    sequence is truncated to `max_length` and right-padded with the `<PAD>` id.
    The result is moved to the notebook-level `device`.
    """
    words = question.lower().strip().split()
    unk_id = question_vocab.get('<UNK>', 0)
    ids = [question_vocab.get(word, unk_id) for word in words[:max_length]]

    pad_id = question_vocab['<PAD>']
    ids.extend([pad_id] * (max_length - len(ids)))

    return torch.tensor(ids).unsqueeze(0).to(device)

# Prediction
def predict_topk(image_path, question_text, k=5):
    """Print the `k` most likely answers for a question about an image.

    Uses the notebook-level `model`, `question_vocab` and `idx_to_answer`,
    prints the ranked answers with their scores, and shows the image.
    """
    image_tensor = extract_image_features(image_path).unsqueeze(0)
    # NOTE(review): the question is encoded with the custom `question_vocab`;
    # the model name (LSTM_Att_bert) suggests it may expect BERT token ids -
    # confirm against the model definition.
    question_tensor = encode_single_question(question_text, question_vocab)

    with torch.no_grad():
        output = model(image_tensor, question_tensor)
        # NOTE(review): if model.forward already returns softmax probabilities
        # (as the CNN1D and RNN models in this notebook do), this second
        # softmax flattens the reported confidences - confirm.
        probs = F.softmax(output, dim=1).squeeze(0)

        topk_probs, topk_indices = probs.topk(k)
        topk_answers = [idx_to_answer.get(index.item(), "<UNK>") for index in topk_indices]

    print(f"\nQuestion: {question_text}\n")
    print("Top Predictions:")
    for rank, (answer, prob) in enumerate(zip(topk_answers, topk_probs), start=1):
        print(f"{rank}. {answer} ({prob.item()*100:.2f}%)")

    # Show the image the question refers to.
    img = Image.open(image_path)
    plt.imshow(img)
    plt.axis('off')
    plt.title(f"Q: {question_text}")
    plt.show()

# Example Call
# Demo: ask one question about a single COCO val2014 image.
question = "Which animal is depicted in the figure?"

image_id = 107087
image_filename = f"COCO_val2014_{image_id:012d}.jpg"  # ids are zero-padded to 12 digits
image_path = "/content/val2014/" + image_filename

predict_topk(image_path, question, k=10)

### **1D-CNN**

In [None]:
# Checkpoint of the trained 1-D CNN VQA model.
# NOTE: `full_path` named the ResNet50 data folder earlier in the notebook; it
# is rebound to a checkpoint file here. `simclr_path` is not set in this cell
# and must already exist from the LSTM test cell.
full_path = '/content/drive/MyDrive/vqa_models/best_model_1dCnn.pth'
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Using device:", device)

# Class index -> answer string; make sure the question vocab has a <PAD> entry.
idx_to_answer = {v: k for k, v in answer_vocab.items()}
question_vocab['<PAD>'] = question_vocab.get('<PAD>', 0)

# VQA model. img_dim matches the 256-d output of the SimCLR projection head.
img_dim = 256
vocab_size = len(question_vocab)

# Size the classifier from the answer vocabulary, exactly as the training cell
# does (output_dim=len(answer_vocab)). Relying on the class default of 1000
# makes load_state_dict fail whenever the vocabulary has a different size.
model = CNN1D(vocab_size=vocab_size,
              img_dim=img_dim,
              output_dim=len(answer_vocab)).to(device)
model.load_state_dict(torch.load(full_path, map_location=device))
model.eval()

# Image encoder. linear_eval=True makes forward() accept a plain image batch
# instead of a list of views to concatenate.
simclr_model = SimCLR(linear_eval=True).to(device)
simclr_model.load_state_dict(torch.load(simclr_path, map_location=device))
simclr_model.eval()

# Image preprocessing: fixed 224x224 resize, tensor conversion, per-channel
# normalisation.
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406],
                         std=[0.229, 0.224, 0.225]),
])

def extract_image_features(image_path):
    """Compute the SimCLR feature vector for a single image file.

    The image is converted to RGB, passed through the notebook-level
    ``transform`` pipeline, moved to ``device`` and fed to ``simclr_model``
    without gradient tracking.

    Args:
        image_path: Path of the image file to embed.

    Returns:
        The output of ``simclr_model`` with the batch dimension squeezed out.
        NOTE(review): presumably a vector of length ``img_dim``; confirm
        against the SimCLR definition.
    """
    # Close the file handle deterministically; convert() has already decoded
    # the pixels into a new image by the time the block exits.
    with Image.open(image_path) as raw_image:
        image = raw_image.convert("RGB")

    image_tensor = transform(image).unsqueeze(0).to(device)
    with torch.no_grad():
        # Pass tensor directly, not wrapped in list
        features = simclr_model(image_tensor)
    return features.squeeze(0)

# Encode Question
def encode_single_question(question, question_vocab, max_length=25):
    """Turn a question string into a fixed-length tensor of vocabulary ids.

    The text is lowercased, stripped and split on whitespace. Words missing
    from ``question_vocab`` map to the ``'<UNK>'`` id (0 when the vocabulary
    has no such entry). The sequence is cut to ``max_length`` ids and
    right-padded with the ``'<PAD>'`` id.

    Args:
        question: Question text.
        question_vocab: Mapping from token to integer id; must contain ``'<PAD>'``.
        max_length: Length of the encoded sequence.

    Returns:
        Tensor of shape ``(1, max_length)`` placed on the notebook-level ``device``.
    """
    words = question.lower().strip().split()[:max_length]
    unk_id = question_vocab.get('<UNK>', 0)
    ids = [question_vocab.get(word, unk_id) for word in words]

    pad_id = question_vocab['<PAD>']
    ids.extend([pad_id] * (max_length - len(ids)))

    return torch.tensor(ids).unsqueeze(0).to(device)

# Prediction
def predict_topk(image_path, question_text, k=5):
    """Print the top-k predicted answers for an image/question pair and show the image.

    Uses the notebook-level ``model``, ``question_vocab`` and ``idx_to_answer``
    together with the ``extract_image_features`` and ``encode_single_question``
    helpers.

    Args:
        image_path: Path of the image file to query.
        question_text: Question about the image, as plain text.
        k: Number of predictions to list. Capped at the number of scores the
            model outputs, because ``torch.topk`` raises for a larger ``k``.

    Returns:
        None. The ranking is printed and the image is displayed with matplotlib.
    """
    image_tensor = extract_image_features(image_path).unsqueeze(0)
    question_tensor = encode_single_question(question_text, question_vocab)

    with torch.no_grad():
        output = model(image_tensor, question_tensor)
        probs = F.softmax(output, dim=1).squeeze(0)

        # Never request more entries than there are scores to rank.
        k = min(k, probs.size(-1))
        topk_probs, topk_indices = torch.topk(probs, k)
        topk_answers = [idx_to_answer.get(i.item(), "<UNK>") for i in topk_indices]

    print(f"\nQuestion: {question_text}\n")
    print("Top Predictions:")
    ranked = zip(topk_answers, topk_probs.tolist())
    for rank, (answer, prob) in enumerate(ranked, start=1):
        print(f"{rank}. {answer} ({prob*100:.2f}%)")

    # The context manager releases the file handle once the figure is drawn.
    with Image.open(image_path) as img:
        plt.imshow(img)
        plt.axis('off')
        plt.title(f"Q: {question_text}")
        plt.show()

# Example: query the 1D-CNN model with one fixed COCO val2014 image
question = "Which animal is depicted in the figure?"

# COCO file names embed the image id zero-padded to 12 digits
image_id = 107087
image_filename = f"COCO_val2014_{image_id:012d}.jpg"
image_path = f"/content/val2014/{image_filename}"

predict_topk(image_path=image_path, question_text=question, k=10)

### **RNN**

In [None]:
# Checkpoint of the RNN VQA model (this rebinds the notebook-level `full_path`)
full_path = '/content/drive/MyDrive/vqa_models/best_model_RNN.pth'

# Run on the GPU when one is visible, otherwise on the CPU
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
print("Using device:", device)

# Inverse answer lookup (class index -> answer string)
idx_to_answer = dict(zip(answer_vocab.values(), answer_vocab.keys()))
# Make sure a padding id exists; an existing entry is left untouched
question_vocab.setdefault('<PAD>', 0)

# VQA model: the vocabulary size is measured after the <PAD> entry is ensured
vocab_size = len(question_vocab)
img_dim = 256

model = RNN(vocab_size=vocab_size, img_dim=img_dim).to(device)
model.load_state_dict(torch.load(full_path, map_location=device))
model.eval()

# SimCLR image encoder; linear_eval=True to avoid augmentation stacking
simclr_model = SimCLR(linear_eval=True).to(device)
simclr_model.load_state_dict(torch.load(simclr_path, map_location=device))
simclr_model.eval()

# Image preprocessing: fixed 224x224 resize, tensor conversion, ImageNet mean/std normalisation
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(
        mean=[0.485, 0.456, 0.406],
        std=[0.229, 0.224, 0.225],
    ),
])

def extract_image_features(image_path):
    """Compute the SimCLR feature vector for a single image file.

    The image is converted to RGB, passed through the notebook-level
    ``transform`` pipeline, moved to ``device`` and fed to ``simclr_model``
    without gradient tracking.

    Args:
        image_path: Path of the image file to embed.

    Returns:
        The output of ``simclr_model`` with the batch dimension squeezed out.
        NOTE(review): presumably a vector of length ``img_dim``; confirm
        against the SimCLR definition.
    """
    # Close the file handle deterministically; convert() has already decoded
    # the pixels into a new image by the time the block exits.
    with Image.open(image_path) as raw_image:
        image = raw_image.convert("RGB")

    image_tensor = transform(image).unsqueeze(0).to(device)
    with torch.no_grad():
        # Pass tensor directly, not wrapped in list
        features = simclr_model(image_tensor)
    return features.squeeze(0)

# Encode Question
def encode_single_question(question, question_vocab, max_length=25):
    """Turn a question string into a fixed-length tensor of vocabulary ids.

    The text is lowercased, stripped and split on whitespace. Words missing
    from ``question_vocab`` map to the ``'<UNK>'`` id (0 when the vocabulary
    has no such entry). The sequence is cut to ``max_length`` ids and
    right-padded with the ``'<PAD>'`` id.

    Args:
        question: Question text.
        question_vocab: Mapping from token to integer id; must contain ``'<PAD>'``.
        max_length: Length of the encoded sequence.

    Returns:
        Tensor of shape ``(1, max_length)`` placed on the notebook-level ``device``.
    """
    words = question.lower().strip().split()[:max_length]
    unk_id = question_vocab.get('<UNK>', 0)
    ids = [question_vocab.get(word, unk_id) for word in words]

    pad_id = question_vocab['<PAD>']
    ids.extend([pad_id] * (max_length - len(ids)))

    return torch.tensor(ids).unsqueeze(0).to(device)

# Prediction
def predict_topk(image_path, question_text, k=5):
    """Print the top-k predicted answers for an image/question pair and show the image.

    Uses the notebook-level ``model``, ``question_vocab`` and ``idx_to_answer``
    together with the ``extract_image_features`` and ``encode_single_question``
    helpers.

    Args:
        image_path: Path of the image file to query.
        question_text: Question about the image, as plain text.
        k: Number of predictions to list. Capped at the number of scores the
            model outputs, because ``torch.topk`` raises for a larger ``k``.

    Returns:
        None. The ranking is printed and the image is displayed with matplotlib.
    """
    image_tensor = extract_image_features(image_path).unsqueeze(0)
    question_tensor = encode_single_question(question_text, question_vocab)

    with torch.no_grad():
        output = model(image_tensor, question_tensor)
        probs = F.softmax(output, dim=1).squeeze(0)

        # Never request more entries than there are scores to rank.
        k = min(k, probs.size(-1))
        topk_probs, topk_indices = torch.topk(probs, k)
        topk_answers = [idx_to_answer.get(i.item(), "<UNK>") for i in topk_indices]

    print(f"\nQuestion: {question_text}\n")
    print("Top Predictions:")
    ranked = zip(topk_answers, topk_probs.tolist())
    for rank, (answer, prob) in enumerate(ranked, start=1):
        print(f"{rank}. {answer} ({prob*100:.2f}%)")

    # The context manager releases the file handle once the figure is drawn.
    with Image.open(image_path) as img:
        plt.imshow(img)
        plt.axis('off')
        plt.title(f"Q: {question_text}")
        plt.show()

# Example: query the RNN model with one fixed COCO val2014 image
question = "Which animal is depicted in the figure?"

# COCO file names embed the image id zero-padded to 12 digits
image_id = 107087
image_filename = f"COCO_val2014_{image_id:012d}.jpg"
image_path = f"/content/val2014/{image_filename}"

predict_topk(image_path=image_path, question_text=question, k=10)

## **Compare Models**

In [None]:
# val_losses_lstm = np.load('/content/drive/MyDrive/Colab Notebooks/DL-CV-project/ResNet50-ALL/val_losses_lstm.npy')
# val_losses_lstm_att = np.load('/content/drive/MyDrive/Colab Notebooks/DL-CV-project/ResNet50-ALL/val_losses_lstm_att.npy')
# val_losses_cnn = np.load('/content/drive/MyDrive/Colab Notebooks/DL-CV-project/ResNet50-ALL/val_losses_cnn.npy')
# val_losses_rnn = np.load('/content/drive/MyDrive/Colab Notebooks/DL-CV-project/ResNet50-ALL/val_losses_rnn.npy')

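# NOTE(review): the RNN and LSTM accuracy files below look swapped (val_accs_rnn reads
# val_accs_lstm.npy and val_accs_lstm reads val_accs_rnn.npy) - confirm the intended
# mapping before uncommenting, otherwise the two curves will be mislabeled.
# val_accs_rnn = np.load('/content/drive/MyDrive/Colab Notebooks/DL-CV-project/ResNet50-ALL/val_accs_lstm.npy')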
# val_accs_cnn = np.load('/content/drive/MyDrive/Colab Notebooks/DL-CV-project/ResNet50-ALL/val_accs_cnn.npy')
# val_accs_lstm_att = np.load('/content/drive/MyDrive/Colab Notebooks/DL-CV-project/ResNet50-ALL/val_accs_lstm_att.npy')
# val_accs_lstm = np.load('/content/drive/MyDrive/Colab Notebooks/DL-CV-project/ResNet50-ALL/val_accs_rnn.npy')

In [None]:
def _plot_validation_curves(frame, title, ylabel, out_file):
    """Draw one curve per non-'Epoch' column of ``frame``, save it to ``out_file`` and show it."""
    plt.figure(figsize=(12, 5))
    # Keeping the loop variable inside this helper stops it from overwriting
    # the notebook-level `model` (the loaded network) with a plain string.
    for model_name in frame.columns.drop('Epoch'):
        plt.plot(frame['Epoch'], frame[model_name], label=model_name)

    plt.title(title)
    plt.xlabel('Epoch')
    plt.ylabel(ylabel)
    plt.legend()
    plt.grid(True)
    plt.tight_layout()
    # Save before show(): show() may leave an empty current figure behind.
    plt.savefig(out_file, dpi=300, bbox_inches='tight')
    plt.show()


# Epoch axis is sized from the validation history that is actually plotted,
# so it cannot disagree in length with the DataFrame columns below.
epochs = range(1, len(val_losses_lstm) + 1)

# Create DataFrame for Losses
df_losses = pd.DataFrame({
    'Epoch': epochs,
    'LSTM': val_losses_lstm,
    'LSTM_Attention': val_losses_lstm_att,
    '1D_CNN': val_losses_cnn,
    'RNN': val_losses_rnn
})

# Create DataFrame for Accuracies
df_accs = pd.DataFrame({
    'Epoch': epochs,
    'LSTM': val_accs_lstm,
    'LSTM_Attention': val_accs_lstm_att,
    '1D_CNN': val_accs_cnn,
    'RNN': val_accs_rnn
})

# Plot Losses
_plot_validation_curves(
    df_losses, 'Validation Loss Comparison', 'Loss', 'val_loss_comparison.png'
)

# Plot Accuracies
_plot_validation_curves(
    df_accs, 'Validation Accuracy Comparison', 'Accuracy', 'val_accuracy_comparison.png'
)