# Import Libraries

In [None]:
import os
import re
import json
import random
from collections import defaultdict
from PIL import Image
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.patches as patches
from sklearn.metrics import accuracy_score, f1_score

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from torch.nn.utils.rnn import pad_sequence
from torchvision import transforms, models

# Preprocessing

## Image

In [None]:
def resize_images(input_dir, output_dir, size=(224, 224)):
    """Resize every image in ``input_dir`` and write it to ``output_dir``.

    Each file is opened with Pillow, resized to ``size`` with the LANCZOS
    filter and saved under the same file name. A file that cannot be
    processed is reported and skipped so one bad file does not abort the run.

    Args:
        input_dir: Directory containing the source images.
        output_dir: Directory the resized images are written to; created if
            it does not exist.
        size: Target ``(width, height)`` in pixels.
    """
    os.makedirs(output_dir, exist_ok=True)

    resized_count = 0
    for img_file in os.listdir(input_dir):
        img_path = os.path.join(input_dir, img_file)
        save_path = os.path.join(output_dir, img_file)

        try:
            with Image.open(img_path) as img:
                # ``resize`` returns a new image whose ``format`` is None, so
                # the source format has to be captured before resizing.
                source_format = img.format
                resized = img.resize(size, Image.LANCZOS)
                resized.save(save_path, source_format)
        except Exception as e:
            # Best effort: report the failing file and keep going.
            print(f"Wrong at {img_file}: {e}")
        else:
            resized_count += 1

    # Count the files resized in this call, not whatever the output
    # directory happens to contain.
    print(f"Resized: {resized_count} images.")

In [None]:

# Resize the training images from the read-only Kaggle input dataset into the
# writable working directory.
input_dir = "/kaggle/input/vqa-trainv2/data/places"
output_dir = "/kaggle/working/resized_images_train"
resize_images(input_dir, output_dir)

In [None]:

# Resize the test images the same way; note that this rebinds the module-level
# ``input_dir`` / ``output_dir`` used for the training images.
input_dir = "/kaggle/input/vqa-test/data/places"
output_dir = "/kaggle/working/resized_images_test"
resize_images(input_dir, output_dir)

## Text

### Create Vocab

In [None]:
def tokenizer(sentence):
    """Split a sentence into lowercase word and punctuation tokens.

    The text is lowercased and split on runs of non-word characters. The
    separator runs are kept as tokens as well (with surrounding whitespace
    removed); pieces that are empty or whitespace-only are dropped.

    Args:
        sentence: Text to tokenize.

    Returns:
        List of token strings in their original order.
    """
    pieces = re.split(r'(\W+)', sentence.lower())
    stripped = (piece.strip() for piece in pieces)
    return [token for token in stripped if token]

In [None]:
def make_ans_vocab(annotation_file, save_path="/kaggle/working/answer_vocabs.txt"):
    """Write the answer vocabulary, one answer per line, to ``save_path``.

    Every distinct ``answer`` string found under ``annotations[*].answers`` is
    collected, sorted, and written after a leading ``<unk>`` entry (line 0).

    Args:
        annotation_file: JSON file with a top-level ``annotations`` list.
        save_path: Destination text file.
    """
    # Read and write as UTF-8 explicitly so the file round-trips through
    # ``load_vocab`` (which reads UTF-8) regardless of the platform default.
    with open(annotation_file, 'r', encoding="utf-8") as f:
        annotations = json.load(f)['annotations']

    answers = {ans['answer'] for ann in annotations for ans in ann['answers']}
    # ``<unk>`` is reserved for index 0; never emit it a second time.
    answers.discard('<unk>')
    vocab = ['<unk>'] + sorted(answers)

    with open(save_path, 'w', encoding="utf-8") as f:
        f.writelines(ans + '\n' for ans in vocab)
    print(f" Generated answer vocab with {len(vocab)} unique answers.")


In [None]:
# Build the answer vocabulary from the training annotations (written to the
# default path in /kaggle/working).
annotation_file = "/kaggle/input/vqa-trainv2/data/annotations.json"
make_ans_vocab(annotation_file)


In [None]:
def make_q_vocab(question_file, save_path="/kaggle/working/question_vocabs.txt"):
    """Write the question vocabulary, one token per line, to ``save_path``.

    Every question under ``questions[*].question`` is tokenized with
    ``tokenizer``; the distinct tokens are sorted and written after the two
    reserved entries ``<pad>`` (line 0) and ``<unk>`` (line 1).

    Args:
        question_file: JSON file with a top-level ``questions`` list.
        save_path: Destination text file.
    """
    # Read and write as UTF-8 explicitly so the file round-trips through
    # ``load_vocab`` (which reads UTF-8) regardless of the platform default.
    with open(question_file, 'r', encoding="utf-8") as f:
        data = json.load(f)

    words = set()
    for q in data['questions']:
        words.update(tokenizer(q['question']))

    vocab = ['<pad>', '<unk>'] + sorted(words)

    with open(save_path, 'w', encoding="utf-8") as f:
        f.writelines(w + '\n' for w in vocab)

    print(f"Generated question vocab: {len(vocab)} words.")


In [None]:
# Build the question vocabulary from the training questions (written to the
# default path in /kaggle/working).
question_file = "/kaggle/input/vqa-trainv2/data/questions.json"
make_q_vocab(question_file)

In [None]:
def load_vocab(vocab_file):
    """Read a vocabulary file (one entry per line) into a token -> index map.

    The index of an entry is its zero-based line number. Surrounding
    whitespace is stripped from every line. If an entry occurs on several
    lines, the index of its last occurrence is kept.

    Args:
        vocab_file: Path to a UTF-8 text file.

    Returns:
        Dict mapping each entry to its line index.
    """
    token_to_idx = {}
    with open(vocab_file, encoding="utf-8") as f:
        for line_no, line in enumerate(f):
            token_to_idx[line.strip()] = line_no
    return token_to_idx


In [None]:
def find_image_file(image_id, image_dir):
    """Locate the image file for ``image_id`` inside ``image_dir``.

    A file matches when its name starts with ``id_<image_id>.``; the first
    match in directory-listing order wins.

    Args:
        image_id: Identifier embedded in the file name.
        image_dir: Directory to search (not recursive).

    Returns:
        The joined path of the matching file, or ``None`` if nothing matches.
    """
    prefix = f"id_{image_id}."
    match = next(
        (name for name in os.listdir(image_dir) if name.startswith(prefix)),
        None,
    )
    if match is None:
        return None
    return os.path.join(image_dir, match)

In [None]:
def process_data(question_file, annotation_file, image_dir, output_path, labeled=True):
    """Build the per-question sample list and save it as JSON.

    For every question the function tokenizes the text, maps tokens to
    question-vocabulary indices, resolves the image file and, when
    ``labeled`` is true, attaches the first listed answer and its
    answer-vocabulary index. The vocabularies are read from the files
    generated under ``/kaggle/working``.

    Args:
        question_file: JSON file with a top-level ``questions`` list.
        annotation_file: JSON file with a top-level ``annotations`` list;
            only read when ``labeled`` is true.
        image_dir: Directory holding the ``id_<image_id>.*`` image files.
        output_path: Destination JSON file.
        labeled: Whether to attach ``answer`` / ``ans_token`` to each sample.

    Raises:
        FileNotFoundError: If no image file exists for a question's
            ``image_id``.
    """
    with open(question_file, 'r', encoding="utf-8") as f:
        questions = json.load(f)['questions']

    if labeled:
        with open(annotation_file, 'r', encoding="utf-8") as f:
            annotations = json.load(f)['annotations']
        q_dict = {ann['question_id']: ann for ann in annotations}

    vocab2idx = load_vocab("/kaggle/working/question_vocabs.txt")
    ans_vocab2idx = load_vocab("/kaggle/working/answer_vocabs.txt")
    unk_question_idx = vocab2idx['<unk>']
    unk_answer_idx = ans_vocab2idx['<unk>']

    # Questions may share an image, so scan the directory at most once per
    # image id instead of once per question.
    image_paths = {}
    dataset = []

    for q in questions:
        qu_id = q['question_id']
        img_id = q['image_id']
        qu_sentence = q['question']
        qu2idx = [vocab2idx.get(token, unk_question_idx) for token in tokenizer(qu_sentence)]

        if img_id not in image_paths:
            image_paths[img_id] = find_image_file(img_id, image_dir)
        img_path = image_paths[img_id]
        if img_path is None:
            # Fail with an actionable message instead of the TypeError that
            # os.path.basename(None) would raise.
            raise FileNotFoundError(
                f"No image file starting with 'id_{img_id}.' found in {image_dir} "
                f"(question_id={qu_id})"
            )

        info = {
            'img_name': os.path.basename(img_path),
            'img_path': img_path,
            'question': qu_sentence,
            'qu_tokens': qu2idx,
            'qu_id': qu_id
        }

        if labeled:
            annotation_ans = q_dict[qu_id]['answers']
            valid_ans = [ans['answer'] for ans in annotation_ans]
            # Only the first listed answer is used as the training target.
            info['answer'] = valid_ans[0]
            info['ans_token'] = ans_vocab2idx.get(valid_ans[0], unk_answer_idx)  # stored as an integer

        dataset.append(info)

    with open(output_path, 'w', encoding="utf-8") as f:
        json.dump(dataset, f, indent=4, ensure_ascii=False)

    print(f"Saved JSON at {output_path} | Total samples: {len(dataset)}")


In [None]:
# Build the labeled training sample file from the resized training images.
image_dir = "/kaggle/working/resized_images_train"
question_file = "/kaggle/input/vqa-trainv2/data/questions.json"
annotation_file = "/kaggle/input/vqa-trainv2/data/annotations.json"

process_data(question_file, annotation_file, image_dir, '/kaggle/working/train.json')

In [None]:
# Build the test sample file; `labeled` keeps its default (True), so the test
# annotations are read and answers are attached as well.
image_dir = "/kaggle/working/resized_images_test"
question_file = "/kaggle/input/vqa-test/data/questions.json"
annotation_file = "/kaggle/input/vqa-test/data/annotations.json"

process_data(question_file, annotation_file, image_dir, '/kaggle/working/test.json')

# Dataset


In [None]:
class VQADataset(Dataset):
    """Dataset of (image, padded question tokens, answer index) samples.

    Samples are read from a JSON list in which every entry provides
    ``img_path``, ``qu_tokens`` and ``ans_token``.

    Splits:
        * ``'train'`` keeps the first 200 samples of every run of 210.
        * ``'valid'`` keeps the remaining 10 samples of every run of 210.
        * ``'test'`` keeps every sample.

    Args:
        json_path: Path to the sample JSON file.
        transform: Callable applied to the RGB PIL image. Defaults to
            ``ToTensor`` followed by normalization with the mean/std given
            below.
        max_qu_len: Questions are truncated / zero-padded to this length.
        typeData: One of ``'train'``, ``'valid'`` or ``'test'``.

    Raises:
        ValueError: If ``typeData`` is not one of the supported splits.
    """

    def __init__(self, json_path, transform=None, max_qu_len=20, typeData='train'):
        if typeData not in ('train', 'valid', 'test'):
            # An unknown split used to produce a silently empty dataset.
            raise ValueError(
                f"typeData must be 'train', 'valid' or 'test', got {typeData!r}"
            )

        self.max_qu_len = max_qu_len
        self.typeData = typeData

        with open(json_path, 'r', encoding='utf-8') as f:
            data = json.load(f)

        if typeData == 'train':
            self.data = [
                sample
                for start in range(0, len(data), 210)
                for sample in data[start:start + 200]
            ]
        elif typeData == 'valid':
            self.data = [
                sample
                for start in range(0, len(data), 210)
                for sample in data[start + 200:start + 210]
            ]
        else:
            self.data = data

        self.transform = transform if transform else transforms.Compose([
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
        ])

    def __len__(self):
        """Return the number of samples in the selected split."""
        return len(self.data)

    def __getitem__(self, idx):
        """Return ``(image, question_tokens, answer_token)`` for sample ``idx``."""
        sample = self.data[idx]
        img_path = sample["img_path"]
        question_tokens = torch.tensor(sample["qu_tokens"], dtype=torch.long)
        answer_token = torch.tensor(sample["ans_token"], dtype=torch.long)

        # Bring every question to exactly ``max_qu_len`` tokens; padding uses
        # index 0.
        if len(question_tokens) > self.max_qu_len:
            question_tokens = question_tokens[:self.max_qu_len]
        else:
            padding = torch.zeros(self.max_qu_len - len(question_tokens), dtype=torch.long)
            question_tokens = torch.cat([question_tokens, padding])

        # ``convert`` returns an independent image, so the file can be closed
        # as soon as the conversion is done.
        with Image.open(img_path) as img:
            image = img.convert("RGB")
        image = self.transform(image)

        return image, question_tokens, answer_token


# Model

In [None]:
class ImgEncoder(nn.Module):
    """Project backbone image features to an L2-normalized feature vector.

    Args:
        model: Backbone module mapping an image batch to a
            ``(batch, in_features)`` tensor.
        in_features: Width of the backbone output.
        feature_size: Width of the projected feature vector.
        is_train: When true the backbone is fine-tuned. When false it is used
            as a fixed feature extractor: it runs without gradients and is
            kept in eval mode.
    """

    def __init__(self, model, in_features, feature_size, is_train=False):
        super().__init__()
        self.model = model
        self.fc = nn.Linear(in_features, feature_size)
        self.is_train = is_train
        if not self.is_train:
            self.model.eval()

    def train(self, mode=True):
        """Set the training mode, keeping a frozen backbone in eval mode.

        ``torch.no_grad()`` only disables gradients. Without this override a
        parent ``.train()`` call would still let BatchNorm update its running
        statistics and keep dropout / stochastic depth active inside the
        supposedly frozen backbone.
        """
        super().train(mode)
        if not self.is_train:
            self.model.eval()
        return self

    def forward(self, image):
        """Return the L2-normalized ``(batch, feature_size)`` image feature."""
        if self.is_train:
            img_feature = self.model(image)
        else:
            with torch.no_grad():
                img_feature = self.model(image)

        img_feature = self.fc(img_feature)
        return F.normalize(img_feature, p=2, dim=1)

In [None]:

class Attention(nn.Module):
    """Additive pooling over the time axis of an LSTM output.

    A single linear layer scores every time step; the scores are
    softmax-normalized along dimension 1 and used to take a weighted sum of
    the input along that same dimension.
    """

    def __init__(self, hidden_size):
        super().__init__()
        self.attention = nn.Linear(hidden_size, 1)

    def forward(self, lstm_out):
        """Return the attention-weighted sum of ``lstm_out`` over dim 1."""
        scores = self.attention(lstm_out)
        weights = scores.softmax(dim=1)
        return (weights * lstm_out).sum(dim=1)


class QuEncoder(nn.Module):
    """Encode a batch of question token indices into a feature vector.

    Tokens are embedded, passed through ``tanh`` and fed to a multi-layer
    LSTM (``batch_first=True``). The question summary is either

    * ``with_att=True``: the attention-pooled LSTM outputs, or
    * ``with_att=False``: ``tanh`` of the final hidden and cell states of all
      layers, concatenated per sample,

    and is then projected to ``qu_feature_size`` by a linear layer.
    """

    def __init__(self, qu_vocab_size, word_embed, hidden_size, num_hidden, qu_feature_size, with_att=False):
        super().__init__()
        self.with_att = with_att
        self.word_embedding = nn.Embedding(qu_vocab_size, word_embed)
        self.tanh = nn.Tanh()
        self.lstm = nn.LSTM(word_embed, hidden_size, num_hidden, batch_first=True)

        # The projection input width depends on which summary is used.
        if with_att:
            self.attention = Attention(hidden_size)
            fc_in_features = hidden_size
        else:
            fc_in_features = 2 * num_hidden * hidden_size
        self.fc = nn.Linear(fc_in_features, qu_feature_size)

    def forward(self, question):
        """Return the ``(batch, qu_feature_size)`` question feature."""
        embedded = self.tanh(self.word_embedding(question))
        lstm_out, (hidden, cell) = self.lstm(embedded)

        if self.with_att:
            summary = self.attention(lstm_out)
        else:
            # (layers, batch, 2 * hidden) -> (batch, layers * 2 * hidden)
            states = torch.cat((hidden, cell), dim=2)
            batch_size = states.size(1)
            summary = self.tanh(states.transpose(0, 1).reshape(batch_size, -1))

        return self.fc(summary)

In [None]:

class VQAModel(nn.Module):
    """Answer classifier over concatenated image and question features.

    The image feature (``ImgEncoder``) and the question feature
    (``QuEncoder``), both ``feature_size`` wide, are concatenated and passed
    through dropout -> tanh -> linear(512) -> dropout -> tanh ->
    linear(``ans_vocab_size``).
    """

    def __init__(self, img_model, img_in_features, qu_vocab_size, ans_vocab_size, word_embed, hidden_size, num_hidden, feature_size, with_att=False, is_train_image=False):
        super().__init__()

        self.img_encoder = ImgEncoder(img_model, img_in_features, feature_size, is_train_image)
        self.qu_encoder = QuEncoder(qu_vocab_size, word_embed, hidden_size, num_hidden, feature_size, with_att)

        self.dropout = nn.Dropout(0.5)
        self.tanh = nn.Tanh()

        self.fc1 = nn.Linear(feature_size * 2, 512)
        self.fc2 = nn.Linear(512, ans_vocab_size)

    def forward(self, image, question):
        """Return the unnormalized answer scores, one row per sample."""
        fused = torch.cat(
            [self.img_encoder(image), self.qu_encoder(question)],
            dim=1,
        )
        fused = self.tanh(self.dropout(fused))
        hidden = self.tanh(self.dropout(self.fc1(fused)))
        return self.fc2(hidden)


# Training & Testing

In [None]:
def set_seed(seed=42):
    """Seed Python, NumPy and PyTorch RNGs and make cuDNN deterministic.

    Args:
        seed: Seed applied to ``random``, ``numpy.random``, the PyTorch CPU
            generator and every CUDA generator.
    """
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)

    # Trade cuDNN autotuning for run-to-run reproducibility.
    cudnn = torch.backends.cudnn
    cudnn.deterministic = True
    cudnn.benchmark = False

In [None]:
# Use the GPU when one is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [None]:
# Show which device was selected.
print(device)

In [None]:

def plot_training_history(history):
    """Plot accuracy and loss curves for the training and validation sets.

    Args:
        history: Dict with the per-epoch lists ``train_accuracy``,
            ``valid_accuracy``, ``train_loss`` and ``valid_loss``.
    """
    epochs = range(1, len(history['train_accuracy']) + 1)

    # (train key, train label, valid key, valid label, y label, title)
    panels = (
        ('train_accuracy', 'Train Accuracy', 'valid_accuracy', 'Valid Accuracy',
         'Accuracy (%)', 'Training and Validation Accuracy'),
        ('train_loss', 'Train Loss', 'valid_loss', 'Valid Loss',
         'Loss', 'Training and Validation Loss'),
    )

    plt.figure(figsize=(12, 5))

    for position, panel in enumerate(panels, start=1):
        train_key, train_label, valid_key, valid_label, y_label, title = panel
        plt.subplot(1, 2, position)
        plt.plot(epochs, history[train_key], label=train_label)
        plt.plot(epochs, history[valid_key], label=valid_label)
        plt.xlabel('Epoch')
        plt.ylabel(y_label)
        plt.title(title)
        plt.legend()

    plt.tight_layout()
    plt.show()

In [None]:
def _run_epoch(model, dataloader, criterion, device, optimizer=None):
    """Run one pass over ``dataloader``; train if ``optimizer`` is given, else evaluate.

    Returns ``(mean of the per-batch losses, accuracy in percent)``.
    """
    training = optimizer is not None
    model.train(training)
    loss_sum = 0.0
    correct = 0
    total = 0

    with torch.set_grad_enabled(training):
        for images, questions, answers in dataloader:
            images, questions, answers = images.to(device), questions.to(device), answers.to(device)

            if training:
                optimizer.zero_grad()
            outputs = model(images, questions)
            loss = criterion(outputs, answers)
            if training:
                loss.backward()
                optimizer.step()

            loss_sum += loss.item()
            _, predicted = torch.max(outputs, 1)
            total += answers.size(0)
            correct += (predicted == answers).sum().item()

    return loss_sum / len(dataloader), 100 * correct / total


def train(model, train_dataloader, valid_dataloader, criterion, optimizer, device, num_epochs=10, name="default", patience=10):
    """Train ``model`` with early stopping on validation accuracy.

    After every epoch the current state is written to ``last_<name>.pt``;
    whenever the validation accuracy improves it is also written to
    ``best_<name>.pt``. Training stops early once the validation accuracy has
    not improved for ``patience`` consecutive epochs. The loss/accuracy
    curves are plotted at the end.

    Args:
        model: Module called as ``model(images, questions)``.
        train_dataloader: Yields ``(images, questions, answers)`` batches.
        valid_dataloader: Same format, used for validation.
        criterion: Loss called as ``criterion(outputs, answers)``.
        optimizer: Optimizer over the model parameters.
        device: Device the model and batches are moved to.
        num_epochs: Maximum number of epochs.
        name: Suffix of the checkpoint file names.
        patience: Number of non-improving epochs tolerated before stopping.

    Raises:
        ValueError: If either dataloader yields no batches.
    """
    if len(train_dataloader) == 0 or len(valid_dataloader) == 0:
        # Fail up front instead of with a ZeroDivisionError after an epoch.
        raise ValueError("train_dataloader and valid_dataloader must each yield at least one batch")

    model.to(device)
    # Start below any reachable accuracy so the first epoch always produces a
    # best checkpoint, even when its validation accuracy is 0%.
    best_valid_accuracy = float("-inf")
    epochs_no_improve = 0

    history = {'train_loss': [], 'valid_loss': [], 'train_accuracy': [], 'valid_accuracy': []}

    for epoch in range(num_epochs):
        train_loss, train_accuracy = _run_epoch(model, train_dataloader, criterion, device, optimizer)
        valid_loss, valid_accuracy = _run_epoch(model, valid_dataloader, criterion, device)

        history['train_loss'].append(train_loss)
        history['valid_loss'].append(valid_loss)
        history['train_accuracy'].append(train_accuracy)
        history['valid_accuracy'].append(valid_accuracy)

        print(f"Epoch [{epoch+1}/{num_epochs}]:"
              f" Train Loss: {train_loss:.4f}, Train Accuracy: {train_accuracy:.2f}% |"
              f" Valid Loss: {valid_loss:.4f}, Valid Accuracy: {valid_accuracy:.2f}%")

        checkpoint = {
            'epoch': epoch + 1,
            'model_state_dict': model.state_dict(),
            'optimizer_state_dict': optimizer.state_dict(),
            'valid_accuracy': valid_accuracy,
            'valid_loss': valid_loss
        }

        # Always keep the most recent state.
        torch.save(checkpoint, f"last_{name}.pt")

        # Keep the best state and track early stopping.
        if valid_accuracy > best_valid_accuracy:
            best_valid_accuracy = valid_accuracy
            epochs_no_improve = 0
            torch.save(checkpoint, f"best_{name}.pt")
            print(f"Best model saved at epoch {epoch+1} with validation accuracy: {valid_accuracy:.2f}%")
        else:
            epochs_no_improve += 1

        if epochs_no_improve >= patience:
            print(f"Early stopping at epoch {epoch+1}, no improvement in validation accuracy for {patience} epochs.")
            break

    plot_training_history(history)


In [None]:
def evaluate_test(model, test_dataloader, device):
    """Evaluate ``model`` on ``test_dataloader`` and print accuracy and F1.

    Args:
        model: Module called as ``model(images, questions)``.
        test_dataloader: Yields ``(images, questions, answers)`` batches.
        device: Device the model and the input batches are moved to.

    Returns:
        Tuple ``(accuracy, weighted_f1)``; accuracy is a fraction, not a
        percentage.
    """
    # Inputs are moved to ``device`` below, so the model has to be there too
    # (``train`` does the same); otherwise a freshly built or loaded model
    # fails with a device mismatch.
    model.to(device)
    model.eval()
    all_preds = []
    all_labels = []

    with torch.no_grad():
        for images, questions, answers in test_dataloader:
            images, questions = images.to(device), questions.to(device)

            outputs = model(images, questions)
            _, preds = torch.max(outputs, 1)

            all_preds.extend(preds.cpu().numpy())
            all_labels.extend(answers.cpu().numpy())

    acc = accuracy_score(all_labels, all_preds)
    f1 = f1_score(all_labels, all_preds, average='weighted')

    print(f"Test Accuracy: {acc*100:.2f}%")
    print(f"Test F1-score: {f1:.4f}")

    return acc, f1


In [None]:
# Sample files produced by process_data.
json_path_train = "/kaggle/working/train.json"
json_path_test = "/kaggle/working/test.json"

# Only the training loader is shuffled.
train_dataset = VQADataset(json_path_train, max_qu_len=20, typeData='train')
train_dataloader = DataLoader(train_dataset, batch_size=16, shuffle=True)

# Evaluation loaders keep a fixed order: shuffling does not change accuracy,
# but it makes the per-batch loss average and the RNG stream vary from run to
# run for no benefit.
valid_dataset = VQADataset(json_path_train, max_qu_len=20, typeData='valid')
valid_dataloader = DataLoader(valid_dataset, batch_size=16, shuffle=False)

test_dataset = VQADataset(json_path_test, max_qu_len=20, typeData='test')
test_dataloader = DataLoader(test_dataset, batch_size=16, shuffle=False)

In [None]:
# Maximum number of epochs passed to train() as num_epochs.
epochs = 50

In [None]:
# Hyperparameters shared by every VQAModel built below.
# NOTE(review): qu_vocab_size / ans_vocab_size are hard-coded; presumably they
# equal the number of lines in question_vocabs.txt / answer_vocabs.txt —
# confirm whenever the vocab files are regenerated.
feature_size = 512
qu_vocab_size = 186
ans_vocab_size = 33
word_embed = 128
hidden_size = 256
num_hidden = 4

## Pretrained Model

### With Attention

#### MobileNetV2

In [None]:
# MobileNetV2 backbone (not fine-tuned) + attention question encoder.
set_seed(42)

mobilenet = models.mobilenet_v2(weights=models.MobileNet_V2_Weights.DEFAULT)
mobilenet.classifier = nn.Identity()  # drop the classification head
in_features_mobilenet = 1280

mobilenet_model = VQAModel(
    img_model=mobilenet,
    img_in_features=in_features_mobilenet,
    qu_vocab_size=qu_vocab_size,
    ans_vocab_size=ans_vocab_size,
    word_embed=word_embed,
    hidden_size=hidden_size,
    num_hidden=num_hidden,
    feature_size=feature_size,
    with_att=True,
    is_train_image=False,
)

criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(mobilenet_model.parameters(), lr=0.0005)

In [None]:
# train(mobilenet_model, train_dataloader, valid_dataloader, criterion, optimizer, device, num_epochs=epochs, name='mobile_net')

In [None]:
# evaluate_test(mobilenet_model, test_dataloader, device)

#### ResNet50

In [None]:
# ResNet50 backbone (not fine-tuned) + attention question encoder.
set_seed(42)

resnet = models.resnet50(weights=models.ResNet50_Weights.DEFAULT)
resnet.fc = nn.Identity()  # drop the classification head
in_features_resnet = 2048

resnet_model = VQAModel(
    img_model=resnet,
    img_in_features=in_features_resnet,
    qu_vocab_size=qu_vocab_size,
    ans_vocab_size=ans_vocab_size,
    word_embed=word_embed,
    hidden_size=hidden_size,
    num_hidden=num_hidden,
    feature_size=feature_size,
    with_att=True,
    is_train_image=False,
)

criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(resnet_model.parameters(), lr=0.0005)

In [None]:
# train(resnet_model, train_dataloader, valid_dataloader, criterion, optimizer, device, num_epochs=epochs, name='resnet')

In [None]:
# evaluate_test(resnet_model, test_dataloader, device)

#### EfficientNetB3 

In [None]:
# EfficientNet-B3 backbone (not fine-tuned) + attention question encoder.
set_seed(42)

efficientnet = models.efficientnet_b3(weights=models.EfficientNet_B3_Weights.DEFAULT)
efficientnet.classifier = nn.Identity()  # drop the classification head
in_features_efficientnet = 1536

efficientnet_model = VQAModel(
    img_model=efficientnet,
    img_in_features=in_features_efficientnet,
    qu_vocab_size=qu_vocab_size,
    ans_vocab_size=ans_vocab_size,
    word_embed=word_embed,
    hidden_size=hidden_size,
    num_hidden=num_hidden,
    feature_size=feature_size,
    with_att=True,
    is_train_image=False,
)

criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(efficientnet_model.parameters(), lr=0.0005)

In [None]:
# train(efficientnet_model, train_dataloader, valid_dataloader, criterion, optimizer, device, num_epochs=epochs, name='efficientnet')

In [None]:
# evaluate_test(efficientnet_model, test_dataloader, device)

### Without Attention

#### MobileNetV2

In [None]:
# MobileNetV2 backbone (not fine-tuned) + question encoder without attention.
set_seed(42)

mobilenet = models.mobilenet_v2(weights=models.MobileNet_V2_Weights.DEFAULT)
mobilenet.classifier = nn.Identity()  # drop the classification head
in_features_mobilenet = 1280

mobilenet_model_no_att = VQAModel(
    img_model=mobilenet,
    img_in_features=in_features_mobilenet,
    qu_vocab_size=qu_vocab_size,
    ans_vocab_size=ans_vocab_size,
    word_embed=word_embed,
    hidden_size=hidden_size,
    num_hidden=num_hidden,
    feature_size=feature_size,
    with_att=False,
    is_train_image=False,
)

criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(mobilenet_model_no_att.parameters(), lr=0.0005)

In [None]:
# train(mobilenet_model_no_att, train_dataloader, valid_dataloader, criterion, optimizer, device, num_epochs=epochs, name='mobile_net_no_att')

In [None]:
# evaluate_test(mobilenet_model_no_att, test_dataloader, device)

#### ResNet50

In [None]:
# ResNet50 backbone (not fine-tuned) + question encoder without attention.
set_seed(42)

resnet = models.resnet50(weights=models.ResNet50_Weights.DEFAULT)
resnet.fc = nn.Identity()  # drop the classification head
in_features_resnet = 2048

resnet_model_no_att = VQAModel(
    img_model=resnet,
    img_in_features=in_features_resnet,
    qu_vocab_size=qu_vocab_size,
    ans_vocab_size=ans_vocab_size,
    word_embed=word_embed,
    hidden_size=hidden_size,
    num_hidden=num_hidden,
    feature_size=feature_size,
    with_att=False,
    is_train_image=False,
)

criterion = nn.CrossEntropyLoss()
# NOTE(review): this cell uses lr=0.001 while the other cells use 0.0005 —
# confirm that the difference is intentional.
optimizer = optim.Adam(resnet_model_no_att.parameters(), lr=0.001)

# Re-select the device (same expression as the earlier device cell).
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [None]:
# train(resnet_model_no_att, train_dataloader, valid_dataloader, criterion, optimizer, device, num_epochs=epochs, name='resnet_no_att')

In [None]:
# evaluate_test(resnet_model_no_att, test_dataloader, device)

#### EfficientNetB3 

In [None]:
# EfficientNet-B3 backbone (not fine-tuned) + question encoder without attention.
set_seed(42)

efficientnet = models.efficientnet_b3(weights=models.EfficientNet_B3_Weights.DEFAULT)
efficientnet.classifier = nn.Identity()  # drop the classification head
in_features_efficientnet = 1536

efficientnet_model_no_att = VQAModel(
    img_model=efficientnet,
    img_in_features=in_features_efficientnet,
    qu_vocab_size=qu_vocab_size,
    ans_vocab_size=ans_vocab_size,
    word_embed=word_embed,
    hidden_size=hidden_size,
    num_hidden=num_hidden,
    feature_size=feature_size,
    with_att=False,
    is_train_image=False,
)

criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(efficientnet_model_no_att.parameters(), lr=0.0005)


In [None]:
# train(efficientnet_model_no_att, train_dataloader, valid_dataloader, criterion, optimizer, device, num_epochs=epochs, name='efficientnet_no_att')

In [None]:
# evaluate_test(efficientnet_model_no_att, test_dataloader, device)

## From-Scratch Model

In [None]:
class CNN_Self_Build(nn.Module):
    """Convolutional image feature extractor trained from scratch.

    Four stages of 3x3 convolutions (each followed by BatchNorm and ReLU) with
    64, 128, 256 and 512 channels and 2, 2, 3 and 4 convolutions respectively.
    Every stage ends in a 2x2 max-pool. Global average pooling, dropout and a
    linear layer then produce a ``feature_dim``-wide vector per image.

    Args:
        feature_dim: Width of the output feature vector.
        dropout_rate: Dropout probability applied before the final linear layer.
    """

    def __init__(self, feature_dim=3096, dropout_rate=0.5):
        super().__init__()

        self.block1 = self._make_stage(3, 64, num_convs=2)
        self.block2 = self._make_stage(64, 128, num_convs=2)
        self.block3 = self._make_stage(128, 256, num_convs=3)
        self.block4 = self._make_stage(256, 512, num_convs=4)

        self.global_avg_pool = nn.AdaptiveAvgPool2d((1, 1))

        self.fc = nn.Sequential(
            nn.Dropout(dropout_rate),
            nn.Linear(512, feature_dim)
        )

    @staticmethod
    def _make_stage(in_channels, out_channels, num_convs):
        """Build ``num_convs`` x (conv3x3 -> BatchNorm -> ReLU), then a 2x2 max-pool."""
        layers = []
        channels = in_channels
        for _ in range(num_convs):
            layers.append(nn.Conv2d(in_channels=channels, out_channels=out_channels, kernel_size=3, padding=1))
            layers.append(nn.BatchNorm2d(out_channels))
            layers.append(nn.ReLU(inplace=True))
            channels = out_channels
        layers.append(nn.MaxPool2d(kernel_size=2, stride=2))
        return nn.Sequential(*layers)

    def forward(self, x):
        """Return the ``(batch, feature_dim)`` feature tensor for image batch ``x``."""
        for stage in (self.block1, self.block2, self.block3, self.block4):
            x = stage(x)
        pooled = self.global_avg_pool(x)
        flat = pooled.view(pooled.size(0), -1)
        return self.fc(flat)


### With Attention

In [None]:
# From-scratch CNN backbone (trained jointly) + attention question encoder.
set_seed(42)

cnn_self_build = CNN_Self_Build(feature_dim=2048, dropout_rate=0.5)
in_features_cnn_self_build = 2048

cnn_model = VQAModel(
    img_model=cnn_self_build,
    img_in_features=in_features_cnn_self_build,
    qu_vocab_size=qu_vocab_size,
    ans_vocab_size=ans_vocab_size,
    word_embed=word_embed,
    hidden_size=hidden_size,
    num_hidden=num_hidden,
    feature_size=feature_size,
    with_att=True,
    is_train_image=True,
)

criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(cnn_model.parameters(), lr=0.0005)

In [None]:
# train(cnn_model, train_dataloader, valid_dataloader, criterion, optimizer, device, num_epochs=epochs, name='default')

In [None]:
# evaluate_test(cnn_model, test_dataloader, device)

### Without Attention

In [None]:
# From-scratch CNN backbone (trained jointly) + question encoder without attention.
set_seed(42)

cnn_self_build = CNN_Self_Build(feature_dim=2048, dropout_rate=0.5)
in_features_cnn_self_build = 2048

cnn_model_no_att = VQAModel(
    img_model=cnn_self_build,
    img_in_features=in_features_cnn_self_build,
    qu_vocab_size=qu_vocab_size,
    ans_vocab_size=ans_vocab_size,
    word_embed=word_embed,
    hidden_size=hidden_size,
    num_hidden=num_hidden,
    feature_size=feature_size,
    with_att=False,
    is_train_image=True,
)

criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(cnn_model_no_att.parameters(), lr=0.0005)

In [None]:
# Train the from-scratch model without attention; checkpoints are written as
# last_default_no_att.pt / best_default_no_att.pt.
train(cnn_model_no_att, train_dataloader, valid_dataloader, criterion, optimizer, device, num_epochs=epochs, name='default_no_att')

In [None]:
# Report accuracy and weighted F1 on the test set, using the model's weights
# as they are when this cell runs.
evaluate_test(cnn_model_no_att, test_dataloader, device)

# Implement

In [None]:
# device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# cnn_self_build = CNN_Self_Build(feature_dim=2048, dropout_rate=0.5)

# in_features_cnn_self_build = 2048
# cnn_model_no_att  = VQAModel(cnn_self_build, 
#                              in_features_cnn_self_build, 
#                              qu_vocab_size, 
#                              ans_vocab_size,
#                              word_embed, 
#                              hidden_size, 
#                              num_hidden, 
#                              feature_size, 
#                              with_att=False, 
#                              is_train_image=True)

# checkpoint = torch.load('/kaggle/input/demo_dl_mt/pytorch/default/1/best_default_no_att.pt', map_location=device)
# cnn_model_no_att.load_state_dict(checkpoint['model_state_dict'])
# cnn_model_no_att.to(device)
# cnn_model_no_att.eval()  

In [None]:
# def tokenize_question(question, ques_vocab, max_qu_len=20):
#     tokens = [ques_vocab.get(word, ques_vocab.get("<unk>", 0)) for word in question.lower().split()]
#     if len(tokens) > max_qu_len:
#         tokens = tokens[:max_qu_len]
#     else:
#         tokens += [0] * (max_qu_len - len(tokens))
#     return tokens

In [None]:
# def idx_to_answer(index, ans_vocab_path):
#     with open(ans_vocab_path, 'r', encoding='utf-8') as f:
#         vocab = f.read().splitlines()
#     if 0 <= index < len(vocab):
#         return vocab[index]
#     else:
#         return "<unk>"

In [None]:
# def visualize_result(image_path, question, predicted_answer):
#     image = Image.open(image_path).convert("RGB")

#     plt.figure(figsize=(8, 6))
#     plt.imshow(image)
#     plt.axis('off')

#     # Vẽ box chứa text
#     plt.gca().add_patch(patches.Rectangle((0, 0), image.width, 100, linewidth=0, facecolor='black', alpha=0.6))
#     plt.text(5, 20, f"Câu hỏi: {question}", color='white', fontsize=12, verticalalignment='top')
#     plt.text(5, 60, f"Dự đoán: {predicted_answer}", color='yellow', fontsize=12, verticalalignment='top')

#     plt.show()

In [None]:
# def implement(model, image_path, ques_vocab_path, ans_vocab_path, transform, device):
#     model.to(device)
#     model.eval()
    
#     ques_vocab = load_vocab(ques_vocab_path)

#     image = Image.open(image_path).convert("RGB")
#     image_tensor = transform(image).unsqueeze(0).to(device)

#     question = input("Nhập câu hỏi: ")
#     question_tokens = tokenize_question(question, ques_vocab)
#     question_tensor = torch.tensor(question_tokens, dtype=torch.long).unsqueeze(0).to(device)

#     with torch.no_grad():
#         output = model(image_tensor, question_tensor)
#         predicted_answer_idx = torch.argmax(output, dim=1).item()

#     predicted_answer = idx_to_answer(predicted_answer_idx, ans_vocab_path)
#     print(f"Dự đoán của mô hình: {predicted_answer}")

#     visualize_result(image_path, question, predicted_answer)

#     return predicted_answer


In [None]:
# def implement_random_samples(model, test_json_path, ques_vocab_path, ans_vocab_path, transform, device, sample_size=20):
#     with open(test_json_path, 'r', encoding='utf-8') as f:
#         test_data = json.load(f)
    
#     selected_samples = random.sample(test_data, sample_size)

#     model.to(device)
#     model.eval()
    
#     ques_vocab = load_vocab(ques_vocab_path)

#     results = []

#     for entry in selected_samples:
#         img_path = entry['img_path']
#         question = entry['question']

#         image = Image.open(img_path).convert("RGB")
#         image_tensor = transform(image).unsqueeze(0).to(device)

#         question_tokens = tokenize_question(question, ques_vocab)
#         question_tensor = torch.tensor(question_tokens, dtype=torch.long).unsqueeze(0).to(device)

#         with torch.no_grad():
#             output = model(image_tensor, question_tensor)
#             predicted_answer_idx = torch.argmax(output, dim=1).item()

#         predicted_answer = idx_to_answer(predicted_answer_idx, ans_vocab_path)

#         results.append({
#             'image_path': img_path,
#             'question': question,
#             'predicted_answer': predicted_answer
#         })


#     cols = 5
#     rows = 4

#     plt.figure(figsize=(20, 16))

#     for i, res in enumerate(results):
#         img = Image.open(res['image_path']).convert("RGB")
#         plt.subplot(rows, cols, i + 1)
#         plt.imshow(img)
#         plt.axis('off')

#         plt.gca().add_patch(patches.Rectangle((0, 0), img.width, 60, linewidth=0, facecolor='black', alpha=0.6))
#         plt.text(5, 20, f"Q: {res['question']}", color='white', fontsize=8, verticalalignment='top')
#         plt.text(5, 45, f"A: {res['predicted_answer']}", color='yellow', fontsize=8, verticalalignment='top')

#     plt.tight_layout()
#     plt.show()

In [None]:
# transform = transforms.Compose([
#     transforms.Resize((224, 224)),
#     transforms.ToTensor(),
#     transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
# ])

----

In [None]:
# ques_vocab_path = "/kaggle/working/question_vocabs.txt"
# ans_vocab_path = "/kaggle/working/answer_vocabs.txt"
# device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [None]:
# image_path = "/kaggle/input/vqa-test/data/places/id_751.png"
# predicted_answer = implement(cnn_model_no_att, image_path, ques_vocab_path, ans_vocab_path, transform, device)

In [None]:
# test_json_path = '/kaggle/working/test.json'

# implement_random_samples(cnn_model_no_att, test_json_path, ques_vocab_path, ans_vocab_path, transform, device, sample_size=20)
