In [75]:
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from torchvision import models
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from collections import Counter

In [76]:
from torchvision import transforms

train_transforms = transforms.Compose([
    transforms.Resize((256, 256)),
    transforms.RandomCrop(224),
    transforms.RandomHorizontalFlip(p=0.5),
    transforms.RandomRotation(10),
    transforms.ColorJitter(brightness=0.3, contrast=0.3, saturation=0.3),
    transforms.RandomGrayscale(p=0.1),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

test_transforms = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

In [102]:
class VQAmodel(nn.Module):
    def __init__(self, vocab_size, num_ans):
        super().__init__()
        resnet = models.resnet18(pretrained=True)
        self.img_encoder = nn.Sequential(*list(resnet.children())[:-1])
        
        for name, param in self.img_encoder.named_parameters():
            if 'layer3' not in name and 'layer4' not in name:
                param.requires_grad = False
        
        self.img_fc = nn.Sequential(
            nn.Linear(512, 768),
            nn.BatchNorm1d(768),
            nn.ReLU(),
            nn.Dropout(0.3)
        )
        
        self.ques_encoder = nn.Embedding(vocab_size, 300)
        self.lstm = nn.LSTM(300, 768, num_layers=2, batch_first=True, dropout=0.3)
        
        self.fusion = nn.Sequential(
            nn.Linear(768*2, 768),
            nn.BatchNorm1d(768),
            nn.ReLU(),
            nn.Dropout(0.4)
        )
        
        self.classifier = nn.Sequential(
            nn.Linear(768, 384),
            nn.BatchNorm1d(384),
            nn.ReLU(),
            nn.Dropout(0.5),
            nn.Linear(384, num_ans)
        )
    
    def forward(self, image, question):
        img_encod = self.img_encoder(image)
        img_encod = img_encod.view(img_encod.size(0), -1)
        img_encod = self.img_fc(img_encod)
        
        q_encod = self.ques_encoder(question)
        _, (q_features, _) = self.lstm(q_encod)
        q_features = q_features[-1]
        
        combined = torch.cat([img_encod, q_features], dim=1)
        fused = self.fusion(combined)
        
        result = self.classifier(fused)
        return result

In [103]:
from PIL import Image

class CustomDataset(Dataset):
    def __init__(self, question, answer, image_path, transform):
        super().__init__()
        self.question = question
        self.answer = answer
        self.image_path = image_path
        self.transform = transform

    def __len__(self):
        return len(self.question)

    def __getitem__(self, idx):
        image = Image.open(self.image_path[idx]).convert('RGB')
        image = self.transform(image)
        
        question = torch.tensor(self.question[idx], dtype=torch.long)
        answer = torch.tensor(self.answer[idx], dtype=torch.long)
        
        return image, question, answer

In [104]:
train_data = pd.read_csv('dataset//data_train.csv')
ques = train_data.iloc[:, 0].tolist()
imgs = train_data.iloc[:, 2].tolist()
ans = train_data.iloc[:, 1].tolist()

answer_counts = Counter(ans)
top_answers = [a for a, _ in answer_counts.most_common(100)]

filtered_data = [(q, a, img) for q, a, img in zip(ques, ans, imgs) if a in top_answers]
ques = [x[0] for x in filtered_data]
ans = [x[1] for x in filtered_data]
imgs = [x[2] for x in filtered_data]

print(f"Original samples: {len(train_data)}")
print(f"Filtered samples: {len(ques)}")
print(f"Answer classes reduced to 120")
print(f"Samples per class: {len(ques)/120:.1f}")

Original samples: 9974
Filtered samples: 7525
Answer classes reduced to 120
Samples per class: 62.7


In [105]:
def make_vocab(sentences):
    words = []
    for line in sentences:
        word = line.lower().strip().split()
        words.extend(word)
    words = ['<PAD>', '<UNK>'] + sorted(set(words))
    word_to_idx = {word: i for i, word in enumerate(words)}
    return word_to_idx

In [106]:
def tokenize(vocab, sentences, max_len=25):
    tokenized = []
    for line in sentences:
        ques = []
        words = line.lower().strip().split()
        for word in words:
            if word in vocab:
                ques.append(vocab[word])
            else:
                ques.append(vocab['<UNK>'])
        
        if len(ques) < max_len:
            ques += [vocab['<PAD>']] * (max_len - len(ques))
        else:
            ques = ques[:max_len]
        
        tokenized.append(ques)
    return tokenized

In [107]:
image_path = ['dataset//images//' + img + '.png' for img in imgs]

In [108]:
vocab = make_vocab(ques)
vocab_size = len(vocab)
tokenized_ques = tokenize(vocab, ques)

unique_answers = sorted(set(ans))
ans_to_idx = {answer: idx for idx, answer in enumerate(unique_answers)}
num_ans = len(ans_to_idx)
tokenized_ans = [ans_to_idx[a] for a in ans]

print(f"Vocab size: {vocab_size}")
print(f"Answer classes: {num_ans}")

Vocab size: 888
Answer classes: 100


In [109]:
train_idx, val_idx = train_test_split(range(len(tokenized_ques)), test_size=0.15, random_state=42)

train_ques = [tokenized_ques[i] for i in train_idx]
train_ans = [tokenized_ans[i] for i in train_idx]
train_imgs = [image_path[i] for i in train_idx]

val_ques = [tokenized_ques[i] for i in val_idx]
val_ans = [tokenized_ans[i] for i in val_idx]
val_imgs = [image_path[i] for i in val_idx]

train_dataset = CustomDataset(train_ques, train_ans, train_imgs, train_transforms)
val_dataset = CustomDataset(val_ques, val_ans, val_imgs, test_transforms)

train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True, pin_memory=True, num_workers=0)
val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False, pin_memory=True, num_workers=0)

print(f"Train samples: {len(train_dataset)}")
print(f"Val samples: {len(val_dataset)}")

Train samples: 6396
Val samples: 1129


In [110]:
epochs = 50
learning_rate = 0.0005

In [111]:
# Verify one sample
idx = 0
print(f"Question: {ques[idx]}")
print(f"Answer: {ans[idx]}")
print(f"Image: {image_path[idx]}")
print(f"\nTokenized question: {tokenized_ques[idx]}")
print(f"Tokenized answer: {tokenized_ans[idx]}")
print(f"Answer should be in range [0, {num_ans-1}]")

Question: what is the object on the shelves
Answer: cup
Image: dataset//images//image100.png

Tokenized question: [866, 422, 785, 519, 524, 785, 681, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
Tokenized answer: 34
Answer should be in range [0, 99]


In [112]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Using: {device}")

model = VQAmodel(vocab_size, num_ans).to(device)
model.train()

criterion_loss = nn.CrossEntropyLoss(label_smoothing=0.1)
optimizer = torch.optim.AdamW(model.parameters(), lr=learning_rate, weight_decay=0.01)
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.5, patience=3, verbose=True)

Using: cuda




In [113]:
from torch.cuda.amp import autocast, GradScaler

scaler = GradScaler()
best_val_loss = float('inf')
patience = 7
patience_counter = 0

for epoch in range(epochs):
    model.train()
    epoch_loss = 0
    correct = 0
    total = 0
    
    for batch_idx, (images, questions, answers) in enumerate(train_loader):
        images, questions, answers = images.to(device), questions.to(device), answers.to(device)
        
        optimizer.zero_grad()
        
        with autocast():
            outputs = model(images, questions)
            loss = criterion_loss(outputs, answers)
        
        scaler.scale(loss).backward()
        scaler.unscale_(optimizer)
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
        scaler.step(optimizer)
        scaler.update()
        
        epoch_loss += loss.item()
        _, predicted = torch.max(outputs, 1)
        total += answers.size(0)
        correct += (predicted == answers).sum().item()
    
    train_loss = epoch_loss / len(train_loader)
    train_acc = 100 * correct / total
    
    model.eval()
    val_loss = 0
    val_correct = 0
    val_total = 0
    
    with torch.no_grad():
        for images, questions, answers in val_loader:
            images, questions, answers = images.to(device), questions.to(device), answers.to(device)
            
            with autocast():
                outputs = model(images, questions)
                loss = criterion_loss(outputs, answers)
            
            val_loss += loss.item()
            _, predicted = torch.max(outputs, 1)
            val_total += answers.size(0)
            val_correct += (predicted == answers).sum().item()
    
    avg_val_loss = val_loss / len(val_loader)
    val_acc = 100 * val_correct / val_total
    
    scheduler.step(avg_val_loss)
    
    print(f'Epoch {epoch+1:2d}: Train Loss={train_loss:.4f} Acc={train_acc:.2f}% | Val Loss={avg_val_loss:.4f} Acc={val_acc:.2f}%')
    
    if avg_val_loss < best_val_loss:
        best_val_loss = avg_val_loss
        patience_counter = 0
        torch.save(model.state_dict(), 'vqa_best_Bigger_Model.pth')
        print('Best model saved')
    else:
        patience_counter += 1
        print(f'  Patience: {patience_counter}/{patience}')
        if patience_counter >= patience:
            print('Early stopping!')
            break

print('\n Training Complete!')

  scaler = GradScaler()
  with autocast():
  with autocast():


Epoch  1: Train Loss=4.4186 Acc=4.71% | Val Loss=4.1790 Acc=8.06%
Best model saved
Epoch  2: Train Loss=3.9896 Acc=10.58% | Val Loss=3.8161 Acc=11.51%
Best model saved
Epoch  3: Train Loss=3.7872 Acc=12.54% | Val Loss=3.7751 Acc=13.11%
Best model saved
Epoch  4: Train Loss=3.5806 Acc=16.39% | Val Loss=3.4512 Acc=17.36%
Best model saved
Epoch  5: Train Loss=3.3937 Acc=19.82% | Val Loss=3.4094 Acc=17.89%
Best model saved
Epoch  6: Train Loss=3.2780 Acc=21.62% | Val Loss=3.3376 Acc=21.26%
Best model saved
Epoch  7: Train Loss=3.2135 Acc=22.53% | Val Loss=3.2749 Acc=20.55%
Best model saved
Epoch  8: Train Loss=3.1346 Acc=25.89% | Val Loss=3.1869 Acc=23.56%
Best model saved
Epoch  9: Train Loss=3.0471 Acc=27.99% | Val Loss=3.1217 Acc=25.42%
Best model saved
Epoch 10: Train Loss=2.9724 Acc=29.24% | Val Loss=3.1329 Acc=26.22%
  Patience: 1/7
Epoch 11: Train Loss=2.8953 Acc=31.94% | Val Loss=3.0975 Acc=27.28%
Best model saved
Epoch 12: Train Loss=2.8451 Acc=33.61% | Val Loss=3.0402 Acc=28.88%


In [117]:
test_df = pd.read_csv('dataset//data_eval.csv')
test_ques = test_df.iloc[:, 0].tolist()
test_ans = test_df.iloc[:, 1].tolist()
test_imgs = test_df.iloc[:, 2].tolist()

test_tokenized_ques = tokenize(vocab, test_ques, max_len=25)
test_tokenized_ans = []
valid_indices = []

for idx, a in enumerate(test_ans):
    if a in ans_to_idx:
        test_tokenized_ans.append(ans_to_idx[a])
        valid_indices.append(idx)

test_tokenized_ques_filtered = [test_tokenized_ques[i] for i in valid_indices]
test_imgs_filtered = [test_imgs[i] for i in valid_indices]
test_image_path = ['dataset//images//' + img + '.png' for img in test_imgs_filtered]

test_dataset = CustomDataset(test_tokenized_ques_filtered, test_tokenized_ans, test_image_path, test_transforms)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False, pin_memory=True)

print(f"Total test samples: {len(test_ans)}")
print(f"Valid samples: {len(valid_indices)}")

Total test samples: 2494
Valid samples: 1864


In [None]:
model.load_state_dict(torch.load('vqa_best_Bigger_Model.pth', map_location=device, weights_only=True))
model.eval()

test_correct = 0
test_total = 0
test_loss = 0

with torch.no_grad():
    for images, questions, answers in test_loader:
        images, questions, answers = images.to(device), questions.to(device), answers.to(device)
        
        with autocast():
            outputs = model(images, questions)
            loss = criterion_loss(outputs, answers)
        
        test_loss += loss.item()
        _, predicted = torch.max(outputs, 1)
        test_total += answers.size(0)
        test_correct += (predicted == answers).sum().item()

test_acc = 100 * test_correct / test_total
avg_test_loss = test_loss / len(test_loader)

print(f'\n{"="*60}')
print(f'FINAL TEST RESULTS:')
print(f'  Loss: {avg_test_loss:.4f}')
print(f'  Accuracy: {test_acc:.2f}%')

  with autocast():



FINAL TEST RESULTS:
  Loss: 3.0232
  Accuracy: 28.97%
