### Imports

In [1]:
import os
import json
import random
import warnings
import re
import numpy as np
import torch
import torch.nn as nn

from PIL import Image
from collections import defaultdict, Counter
from torch.utils.data import Dataset, DataLoader
from torchvision import models, transforms
from sklearn.metrics import accuracy_score, f1_score
import matplotlib.pyplot as plt

warnings.filterwarnings("ignore")

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
device

device(type='cuda')

In [2]:
SEED = 777
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)
torch.cuda.manual_seed(SEED)
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False

### Dataset Loading

In [3]:
IMAGE_DIR = "VQA_RAD/VQA_RAD Image Folder"
JSON_PATH = "VQA_RAD/VQA_RAD Dataset Public.json"

with open(JSON_PATH, "r") as f:
    raw_data = json.load(f)

len(raw_data)

2248

In [4]:
def normalize_answer(ans):
    ans = ans.lower().strip()
    if ans in ["yes", "y"]:
        return 1
    if ans in ["no", "n"]:
        return 0
    return None

In [5]:
samples = []

for item in raw_data:
    if item.get("image_organ", "").lower() != "chest":
        continue

    if item.get("answer_type", "").lower() != "closed":
        continue

    label = normalize_answer(item.get("answer", ""))
    if label is None:
        continue

    image_name = item.get("image_name")
    if image_name is None:
        continue

    samples.append({
        "image_path": os.path.join(IMAGE_DIR, image_name),  # synpicXXXX.jpg
        "image_id": image_name,
        "question": item["question"].lower(),
        "label": label
    })

len(samples)

477

### Image-level Data Splitting

In [6]:
def image_level_split(samples, seed=SEED):
    random.seed(seed)

    image_to_samples = defaultdict(list)
    for s in samples:
        image_to_samples[s["image_id"]].append(s)

    image_ids = list(image_to_samples.keys())
    random.shuffle(image_ids)

    n = len(image_ids)
    train_ids = image_ids[:int(0.8 * n)]
    val_ids   = image_ids[int(0.8 * n):int(0.9 * n)]
    test_ids  = image_ids[int(0.9 * n):]

    def collect(ids):
        out = []
        for i in ids:
            out.extend(image_to_samples[i])
        return out

    return collect(train_ids), collect(val_ids), collect(test_ids)

train_samples, val_samples, test_samples = image_level_split(samples)
len(train_samples), len(val_samples), len(test_samples)

(365, 49, 63)

### Image Augmentation

In [7]:
train_transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.RandomRotation(10),
    transforms.ColorJitter(brightness=0.1, contrast=0.1),
    transforms.ToTensor(),
    transforms.Normalize(
        mean=[0.485, 0.456, 0.406],
        std=[0.229, 0.224, 0.225]
    )
])

eval_transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(
        mean=[0.485, 0.456, 0.406],
        std=[0.229, 0.224, 0.225]
    )
])

### Vocabulary + Tokenization

In [8]:
def tokenize(text):
    text = re.sub(r"[^a-z0-9\s]", "", text.lower())
    return text.split()

In [9]:
word_counter = Counter()
for s in train_samples:
    word_counter.update(tokenize(s["question"]))

vocab = {"<pad>": 0, "<unk>": 1}
for w in word_counter:
    vocab[w] = len(vocab)

vocab_size = len(vocab)
vocab_size

362

In [10]:
def encode_question(q, vocab, max_len=30):
    tokens = tokenize(q)
    ids = [vocab.get(t, vocab["<unk>"]) for t in tokens][:max_len]
    return ids + [0] * (max_len - len(ids))

### Dataset

In [11]:
class VQARadMultimodalDataset(Dataset):
    def __init__(self, samples, transform, vocab):
        self.samples = samples
        self.transform = transform
        self.vocab = vocab

    def __len__(self):
        return len(self.samples)

    def __getitem__(self, idx):
        s = self.samples[idx]
        image = Image.open(s["image_path"]).convert("RGB")
        image = self.transform(image)

        question = torch.tensor(
            encode_question(s["question"], self.vocab),
            dtype=torch.long
        )

        label = torch.tensor(s["label"], dtype=torch.long)
        return image, question, label

### DataLoaders

In [12]:
BATCH_SIZE = 32

train_loader = DataLoader(
    VQARadMultimodalDataset(train_samples, train_transform, vocab),
    batch_size=BATCH_SIZE, shuffle=True
)

val_loader = DataLoader(
    VQARadMultimodalDataset(val_samples, eval_transform, vocab),
    batch_size=BATCH_SIZE, shuffle=False
)

test_loader = DataLoader(
    VQARadMultimodalDataset(test_samples, eval_transform, vocab),
    batch_size=BATCH_SIZE, shuffle=False
)

### Model + Freeze Helpers

In [13]:
class ResNetBiLSTMVQA(nn.Module):
    def __init__(self, vocab_size, embed_dim=300, lstm_hidden=128, dropout=0.3):
        super().__init__()

        self.cnn = models.resnet50(pretrained=True)
        cnn_dim = self.cnn.fc.in_features
        self.cnn.fc = nn.Identity()

        self.embedding = nn.Embedding(vocab_size, embed_dim, padding_idx=0)
        self.lstm = nn.LSTM(embed_dim, lstm_hidden, batch_first=True, bidirectional=True)

        self.classifier = nn.Sequential(
            nn.Linear(cnn_dim + 2*lstm_hidden, 512),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(512, 2)
        )

    def forward(self, image, question):
        img_feat = self.cnn(image)
        emb = self.embedding(question)
        _, (h, _) = self.lstm(emb)
        q_feat = torch.cat([h[-2], h[-1]], dim=1)
        return self.classifier(torch.cat([img_feat, q_feat], dim=1))

In [14]:
def freeze_cnn(model):
    for p in model.cnn.parameters():
        p.requires_grad = False

def unfreeze_cnn(model):
    for p in model.cnn.parameters():
        p.requires_grad = True

### Training Utilities

In [15]:
criterion = nn.CrossEntropyLoss()

def train_epoch(model, loader, optimizer):
    model.train()
    total = 0
    for img, q, y in loader:
        img, q, y = img.to(device), q.to(device), y.to(device)
        optimizer.zero_grad()
        loss = criterion(model(img, q), y)
        loss.backward()
        optimizer.step()
        total += loss.item()
    return total / len(loader)

In [16]:
@torch.no_grad()
def evaluate(model, loader):
    model.eval()
    preds, labels = [], []
    for img, q, y in loader:
        img, q = img.to(device), q.to(device)
        p = model(img, q).argmax(1)
        preds.extend(p.cpu().numpy())
        labels.extend(y.numpy())
    return accuracy_score(labels, preds), f1_score(labels, preds, average="macro")

### Hyperparameter Tuning

In [17]:
param_grid = [
    {"embed_dim": 300, "lstm_hidden": 128, "dropout": 0.3, "lr": 1e-4},
    {"embed_dim": 300, "lstm_hidden": 128, "dropout": 0.5, "lr": 5e-5},
    {"embed_dim": 100, "lstm_hidden": 64,  "dropout": 0.3, "lr": 1e-4},
]

In [18]:
class EarlyStopping:
    def __init__(self, patience=10, min_delta=0.0):
        self.patience = patience
        self.min_delta = min_delta
        self.best_score = None
        self.counter = 0
        self.early_stop = False

    def step(self, score):
        if self.best_score is None:
            self.best_score = score
        elif score < self.best_score + self.min_delta:
            self.counter += 1
            if self.counter >= self.patience:
                self.early_stop = True
        else:
            self.best_score = score
            self.counter = 0

In [19]:
def run_experiment(params):
    model = ResNetBiLSTMVQA(
        vocab_size,
        embed_dim=params["embed_dim"],
        lstm_hidden=params["lstm_hidden"],
        dropout=params["dropout"]
    ).to(device)

    freeze_cnn(model)

    optimizer = torch.optim.AdamW(
        filter(lambda p: p.requires_grad, model.parameters()),
        lr=params["lr"],
        weight_decay=1e-4
    )

    scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
        optimizer, mode="max", patience=2, factor=0.5
    )

    early_stopping = EarlyStopping(patience=3)
    best_val_f1 = 0.0

    for epoch in range(20):
        if epoch == 5:
            unfreeze_cnn(model)
            optimizer = torch.optim.AdamW(
                model.parameters(),
                lr=optimizer.param_groups[0]["lr"],
                weight_decay=1e-4
            )

        train_epoch(model, train_loader, optimizer)
        val_acc, val_f1 = evaluate(model, val_loader)

        scheduler.step(val_f1)
        best_val_f1 = max(best_val_f1, val_f1)

        early_stopping.step(val_f1)
        if early_stopping.early_stop:
            break

    return best_val_f1

In [20]:
results = []
for p in param_grid:
    print("Testing:", p)
    f1 = run_experiment(p)
    results.append({**p, "val_f1": f1})

results

Testing: {'embed_dim': 300, 'lstm_hidden': 128, 'dropout': 0.3, 'lr': 0.0001}


Testing: {'embed_dim': 300, 'lstm_hidden': 128, 'dropout': 0.5, 'lr': 5e-05}


Testing: {'embed_dim': 100, 'lstm_hidden': 64, 'dropout': 0.3, 'lr': 0.0001}


[{'embed_dim': 300,
  'lstm_hidden': 128,
  'dropout': 0.3,
  'lr': 0.0001,
  'val_f1': 0.5811965811965812},
 {'embed_dim': 300,
  'lstm_hidden': 128,
  'dropout': 0.5,
  'lr': 5e-05,
  'val_f1': 0.4555555555555556},
 {'embed_dim': 100,
  'lstm_hidden': 64,
  'dropout': 0.3,
  'lr': 0.0001,
  'val_f1': 0.6165884194053208}]

### Model Training

In [21]:
BEST_MODEL = {
    "embed_dim": 300,
    "lstm_hidden": 128,
    "dropout": 0.3
}

BEST_LR = 1e-4

model = ResNetBiLSTMVQA(vocab_size, **BEST_MODEL).to(device)
freeze_cnn(model)

optimizer = torch.optim.AdamW(
    filter(lambda p: p.requires_grad, model.parameters()),
    lr=BEST_LR,
    weight_decay=1e-4
)

scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
    optimizer, mode="max", patience=3, factor=0.5
)


In [22]:
early_stopping = EarlyStopping(patience=10)
best_val_f1 = 0.0

for epoch in range(30):

    if epoch == 5:
        print("Unfreezing CNN backbone")
        unfreeze_cnn(model)
        optimizer = torch.optim.AdamW(
            model.parameters(),
            lr=optimizer.param_groups[0]["lr"],
            weight_decay=1e-4
        )

    train_loss = train_epoch(model, train_loader, optimizer)
    val_acc, val_f1 = evaluate(model, val_loader)

    scheduler.step(val_f1)

    if val_f1 > best_val_f1:
        best_val_f1 = val_f1
        torch.save(model.state_dict(), "best_resnet_bilstm_vqa.pt")

    print(
        f"Epoch {epoch+1:02d} | "
        f"Train Loss: {train_loss:.4f} | "
        f"Val Acc: {val_acc:.4f} | "
        f"Val Macro-F1: {val_f1:.4f}"
    )

    early_stopping.step(val_f1)
    if early_stopping.early_stop:
        print(f"Early stopping triggered at epoch {epoch+1}")
        break


Epoch 01 | Train Loss: 0.6885 | Val Acc: 0.6939 | Val Macro-F1: 0.5455


Epoch 02 | Train Loss: 0.6815 | Val Acc: 0.7143 | Val Macro-F1: 0.4167


Epoch 03 | Train Loss: 0.6617 | Val Acc: 0.7143 | Val Macro-F1: 0.4167


Epoch 04 | Train Loss: 0.6636 | Val Acc: 0.7143 | Val Macro-F1: 0.4167


Epoch 05 | Train Loss: 0.6419 | Val Acc: 0.7143 | Val Macro-F1: 0.4167
Unfreezing CNN backbone


Epoch 06 | Train Loss: 0.6424 | Val Acc: 0.7143 | Val Macro-F1: 0.4167


Epoch 07 | Train Loss: 0.5303 | Val Acc: 0.7143 | Val Macro-F1: 0.4167


Epoch 08 | Train Loss: 0.4842 | Val Acc: 0.7143 | Val Macro-F1: 0.4167


Epoch 09 | Train Loss: 0.4613 | Val Acc: 0.7143 | Val Macro-F1: 0.4167


Epoch 10 | Train Loss: 0.4428 | Val Acc: 0.5714 | Val Macro-F1: 0.4035


Epoch 11 | Train Loss: 0.4663 | Val Acc: 0.6531 | Val Macro-F1: 0.4450
Early stopping triggered at epoch 11


### Test Evaluation

In [23]:
test_acc, test_f1 = evaluate(model, test_loader)
print("Test Acc:", test_acc)
print("Test F1:", test_f1)

Test Acc: 0.5555555555555556
Test F1: 0.5288461538461539
