# Project: Enhancing Visual Reasoning in VQA Tasks through Interactive Learning

For my course project, I want to explore if interactive learning can improve spatial reasoning in Visual Question Answering (VQA) tasks.

## Background

From our readings and discussions so far, one of the key observations I've made is that current neural network models struggle with tasks requiring spatial reasoning or counting. They often rely on **pattern recognition** rather than true spatial understanding. Given this, I am somewhat skeptical of the research path based on the inherent, restrictive approach and am more drawn to exploring **interactive learning**. Can interactive learning bridge this gap, or bring more insights to traditional models?

## Available Resources

**Dataset**: CLEVR, a diagnostic dataset for testing spatial and logical reasoning. It also provides scene graphs as a foundation of interactive learning.  
**Existing Baselines**: Neural network-based VQA models (e.g., CNN + LSTM).

## Current Approaches

1. Implement a baseline VQA model to evaluate counting and spatial reasoning tasks. 
2. Introduce online learning approach, using given answers and programs to iteratively improve model predictions.
Compare model performance across these approaches and analyze their reasoning capabilities.

## Expected Results

Improved accuracy on counting and spatial reasoning tasks through interaction.
Insights into the potential interactive methods for improving static learning in VQA systems.

CLEVR paper: https://arxiv.org/pdf/1612.06890

dataset: https://github.com/facebookresearch/clevr-dataset-gen

In [1]:
import json
import torch
import torch.nn as nn
from pathlib import Path
from torch.utils.data import DataLoader, Dataset
from PIL import Image
from torchvision import models, transforms
import re
from collections import Counter

## 0. Load Data
In preliminary practice, I control the length of program to limit the complexity of questions.

In [2]:
folder_path = "/srv/data/CLEVR_v1.0/"

In [3]:
class CLEVRDataset(Dataset):
    def __init__(self, corpus_dir, split='train', transform=None, max_program_length=None):
        """
        Initialize CLEVR dataset
        :param corpus_dir: root directory path of CLEVR dataset
        :param split: data split ('train', 'val', 'test')
        :param transform: image preprocessing transform
        :param max_program_length: maximum length of program, subset questions
        """
        self.path = Path(corpus_dir)
        self.split = split
        # self.mode = mode  # 'train' or 'test'
        # self.feedback_mode = feedback_mode  # 'none', 'answer', 'program'
        self.transform = transform 
        self.max_program_length = max_program_length

        questions_file = self.path / f'questions/CLEVR_{self.split}_questions.json'
        all_items = json.load(questions_file.open())['questions']

        # subset
        if self.max_program_length is not None:
            self.items = [
                item for item in all_items if len(item.get('program', [])) <= self.max_program_length
            ]
        else:
            self.items = all_items

    def __len__(self):
        return len(self.items)

    def __getitem__(self, idx):
        item = self.items[idx]

        # load image
        img_path = self.path / 'images' / self.split / item['image_filename']
        image = Image.open(img_path).convert('RGB')
        if self.transform:
            image = self.transform(image)

        # basic data
        data = {
            'image': image, 
            'question': item['question'], 
            'answer': item.get('answer', '<NO_ANS>'), 
            'program': item.get('program', [])  # optional
        }

        # # add feedback 
        # if self.mode == 'train' and self.feedback_mode != 'none':
        #     if self.feedback_mode == 'answer':
        #         data['feedback'] = {'answer': item.get('answer', '<NO_ANS>')}
        #     elif self.feedback_mode == 'program':
        #         data['feedback'] = {'program': item.get('program', [])}

        return data

In [4]:
# # load scene graphs

# scene_path = folder_path + "/scenes/CLEVR_val_scenes.json"
# with open(scene_path, 'r') as f:
#     scenes = json.load(f)

# # look into the structure
# print(scenes['scenes'][0])


In [5]:
# # load questions

# questions_path = folder_path + "/questions/CLEVR_val_questions.json"
# with open(questions_path, 'r') as f:
#     questions = json.load(f)

# # # look into the structure
# # print(questions['questions'][0])

In [6]:
# # collect simple 'count' questions as a starting point
# def extract_count_questions(questions_data, max_steps=3):
#     count_questions = []
#     for question in questions_data['questions']:
#         program = question['program']
#         if any(step['function'] == 'count' for step in program) and len(program) <= max_steps:
#             count_questions.append({
#                 'question_index': question['question_index'],
#                 'image_index': question['image_index'],
#                 'image_filename': question['image_filename'],
#                 'question': question['question'],
#                 'answer': question['answer'],
#                 'program': question['program']
#             })
#     return count_questions

# count_questions = extract_count_questions(questions, max_steps=3)

In [7]:
# print(f"Total 'count' questions found: {len(count_questions)}")
# print("Sample 'count' question:", count_questions[:5])

In [8]:
# # iterate the problem and analyze its program
# for question in count_questions:
#     print("Question:", question['question'])
#     print("Answer:", question['answer'])
#     print("Program:")
    
#     for step in question['program']:
#         print(f"  Function: {step['function']}, Inputs: {step['inputs']}, Values: {step['value_inputs']}")
#     print("-" * 40)

In [9]:
# random.shuffle(count_questions)

# # Split data for initialization and testing
# train_questions = count_questions[:200]
# test_questions = count_questions[200:262]

## 1. Baseline VQA Model

In previous lecture, Bill introduced a base VQA model, which utilized LSTM model to process question and VGG16 to extract image features.

In [10]:
train_dataset = CLEVRDataset(folder_path, split='val', max_program_length=3)


In [11]:
# construct a global vocab
def load_all_questions(folder_path):
    all_items = []
    for split in ['train', 'val', 'test']:
        questions_file = Path(folder_path) / f'questions/CLEVR_{split}_questions.json'
        all_items.extend(json.load(questions_file.open())['questions'])
    return all_items

tokenize = lambda text: re.findall(r'\w+|\S', text.lower())
def build_global_vocab(folder_path):
    all_items = load_all_questions(folder_path)

    # debug
    for item in all_items:
        if 'answer' not in item:
            print("Item without 'answer' key:", item)
            break
            
    all_questions = [tokenize(item['question']) for item in all_items if 'question' in item]
    all_answers = [item.get('answer', '<NO_ANS>') for item in all_items]

    question_vocab = Counter([token for question in all_questions for token in question])
    answer_vocab = Counter(all_answers)

    itos_q = ['<PAD>', '<UNK>'] + [s[0] for s in question_vocab.most_common()]
    stoi_q = {s: i for i, s in enumerate(itos_q)}
    pad_idx, unk_idx = stoi_q['<PAD>'], stoi_q['<UNK>']

    itos_a = ['<NO_ANS>'] + [s[0] for s in answer_vocab.most_common()]
    stoi_a = {s: i for i, s in enumerate(itos_a)}

    return question_vocab, answer_vocab, stoi_q, itos_q, stoi_a, itos_a, pad_idx, unk_idx


In [12]:
question_vocab, answer_vocab, stoi_q, itos_q, stoi_a, itos_a, pad_idx, unk_idx = build_global_vocab(folder_path)

Item without 'answer' key: {'image_index': 0, 'split': 'test', 'image_filename': 'CLEVR_test_000000.png', 'question_index': 0, 'question': 'Is there anything else that is the same shape as the small brown matte object?'}


In [13]:
# question_vocab

In [14]:
# answer_vocab

In [15]:
class LSTMEncoder(nn.Module):

    def __init__(self, n_tokens, hidden_size, n_layers, dropout=0.5):
        super().__init__()
        self.embedding = nn.Embedding(n_tokens, hidden_size)
        dropout = dropout if n_layers > 1 else 0
        self.lstm = nn.LSTM(hidden_size, hidden_size, n_layers, dropout=dropout)

    def forward(self, x, x_lens):
        x, hidden = self.lstm(self.embedding(x))
        # take the final token hidden state for each item in batch
        return x[torch.arange(x.size(0)),x_lens-1,:] 

class VQAModel(nn.Module):

    def __init__(self,
                num_answers,
                text_encoder, text_dim,
                image_encoder, image_dim):
        super().__init__()
        self.text_encoder = text_encoder
        self.image_encoder = image_encoder
        self.classifier = nn.Linear(text_dim + image_dim, num_answers)
        self.softmax = nn.Softmax(dim=0)

    def forward(self, text, text_lens, image):
        x_text = text_encoder(text, text_lens)
        x_image = image_encoder(image)
        x = torch.concat((x_text, x_image), axis=1)
        return self.softmax(self.classifier(x))

image_encoder = nn.Sequential(
    models.vgg16(weights=models.VGG16_Weights.IMAGENET1K_V1).features,
    nn.AdaptiveAvgPool2d((7, 7)),
    nn.Flatten(),
    nn.Linear(25088, 4096),
    nn.ReLU(),
    nn.Linear(4096, 512)
)
                              
text_encoder = LSTMEncoder(len(itos_q), 50, 3)
model = VQAModel(len(itos_a), image_encoder, 512, text_encoder, 50)


In [16]:
image_transforms = transforms.Compose([
    transforms.Resize(256),
    transforms.CenterCrop(224),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])


In [17]:
# def collate_fn(samples):
#     tokenized_qs = [list(map(lambda x: stoi_q.get(x, unk_idx), s['question'])) for s in samples]
#     question_lens = torch.LongTensor([len(q) for q in tokenized_qs])
#     qs_maxlen = question_lens.max()
#     tokenized_qs = torch.LongTensor([q + [pad_idx] * (qs_maxlen - len(q)) for q in tokenized_qs])
#     images = torch.stack([image_transforms(s['image']) for s in samples])
#     answers = torch.LongTensor([stoi_a[s['answer']] for s in samples])
#     return tokenized_qs, question_lens, images, answers

# train_loader = DataLoader(train_dataset, batch_size=16, collate_fn=collate_fn)

In [18]:
def collate_fn(samples):
    tokenized_qs = [list(map(lambda x: stoi_q.get(x, unk_idx), tokenize(s['question']))) for s in samples]
    question_lens = torch.LongTensor([len(q) for q in tokenized_qs])
    qs_maxlen = question_lens.max()
    tokenized_qs = torch.LongTensor([q + [pad_idx] * (qs_maxlen - len(q)) for q in tokenized_qs])
    images = torch.stack([image_transforms(s['image']) for s in samples])
    answers = torch.LongTensor([stoi_a.get(s['answer'], -1) for s in samples])  # unknown -> -1
    return tokenized_qs, question_lens, images, answers

train_loader = DataLoader(train_dataset, batch_size=16, collate_fn=collate_fn)

In [19]:
# !nvidia-smi

# !ps -aux | grep python

In [20]:
from tqdm.notebook import tqdm

num_epochs = 5
learning_rate = 0.001
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print(device)
model.to(device)

criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(model.parameters(), lr=learning_rate, momentum=0.9, weight_decay=5e-4)

n_total_step = len(train_loader)
for epoch in range(num_epochs):
   for i, batch in enumerate(tqdm(train_loader)):
       x_text, x_text_lens, x_img, y = (item.to(device) for item in batch)       
       y_hat = model(x_text, x_text_lens, x_img)
       n_corrects = (y_hat.argmax(axis=1)==y).sum().item()
       loss_value = criterion(y_hat, y)
       loss_value.backward()
       optimizer.step()
       optimizer.zero_grad()
       if (i+1) % 250 == 0:
          print(f'epoch {epoch+1}/{num_epochs}, step: {i+1}/{n_total_step}: loss = {loss_value:.5f}, acc = {100*(n_corrects/y.size(0)):.2f}%')
   print()

cuda:0


  0%|          | 0/26 [00:00<?, ?it/s]




  0%|          | 0/26 [00:00<?, ?it/s]




  0%|          | 0/26 [00:00<?, ?it/s]




  0%|          | 0/26 [00:00<?, ?it/s]




  0%|          | 0/26 [00:00<?, ?it/s]




In [21]:
test_dataset = CLEVRDataset(folder_path, split='test', max_program_length=3)
test_loader = DataLoader(test_dataset, batch_size=16, collate_fn=collate_fn)

In [22]:
def calculate_accuracy(model, dataloader, device=device):
    model.eval()  
    correct = 0
    total = 0

    with torch.no_grad():
        for batch in dataloader:
            tokenized_qs, question_lens, images, answers = batch
            
            tokenized_qs = tokenized_qs.to(device)
            question_lens = question_lens.to(device)
            images = images.to(device)
            answers = answers.to(device)

            outputs = model(tokenized_qs, question_lens, images)
            _, predicted = torch.max(outputs, dim=1)

            correct += (predicted == answers).sum().item()
            total += answers.size(0)
            
    accuracy = correct / total
    return accuracy


In [23]:
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
model = model.to(device)

test_accuracy = calculate_accuracy(model, test_loader, device=device)
print(f"Test Accuracy: {test_accuracy * 100:.2f}%") # 7.46%

Test Accuracy: 2.08%


## 2. Online Learning with Program

In [30]:
def build_program_vocab(train_dataset):
    programs = [item.get('program', []) for item in train_dataset]
    
    # extract all functions
    all_functions = [step['function'] for program in programs for step in program]
    function_vocab = Counter(all_functions)

    # build index mapping
    itos_p = ['<PAD>'] + [func for func, _ in function_vocab.most_common()]
    stoi_p = {func: idx for idx, func in enumerate(itof_p)}

    return ftoi_p, itof_p


In [31]:
stoi_p, itos_p = build_program_vocab(train_dataset)

print("String to Index:", stoi_p)
print("Index to String:", itos_p)

String to Index: {'<PAD>': 0, 'scene': 1, 'count': 2, 'filter_color': 3, 'exist': 4, 'filter_shape': 5, 'filter_material': 6, 'filter_size': 7}
Index to String: ['<PAD>', 'scene', 'count', 'filter_color', 'exist', 'filter_shape', 'filter_material', 'filter_size']


In [53]:
def collate_fn_1(samples):
    tokenized_qs = [list(map(lambda x: stoi_q.get(x, unk_idx), tokenize(s['question']))) for s in samples]
    question_lens = torch.LongTensor([len(q) for q in tokenized_qs])
    qs_maxlen = question_lens.max()
    tokenized_qs = torch.LongTensor([q + [pad_idx] * (qs_maxlen - len(q)) for q in tokenized_qs])
    images = torch.stack([image_transforms(s['image']) for s in samples])
    answers = torch.LongTensor([stoi_a.get(s['answer'], -1) for s in samples])  # unknown -> -1
   
    programs = []
    for s in samples:
        program_indices = torch.LongTensor([
            stoi_p.get(step['function'], -1) for step in s['program']  
        ])
        programs.append(program_indices)

    # padding
    max_program_length = max(len(p) for p in programs)
    padded_programs = torch.stack([
        torch.cat([p, torch.full((max_program_length - len(p),), pad_idx)]) for p in programs
    ])
    
    return tokenized_qs, question_lens, images, answers, padded_programs

train_loader_1 = DataLoader(train_dataset, batch_size=16, collate_fn=collate_fn_1)

In [61]:
class VQAModelWithProgram(nn.Module):
    def __init__(self, num_answers, image_encoder, image_dim, text_encoder, text_dim, program_vocab_size, program_dim):
        super().__init__()
        self.image_encoder = image_encoder  
        self.text_encoder = text_encoder    
        self.program_encoder = nn.Embedding(program_vocab_size, program_dim)  
        self.classifier = nn.Linear(image_dim + text_dim + program_dim, num_answers)
        self.softmax = nn.Softmax(dim=0)

    def forward(self, questions, question_lens, programs, images):

        image_features = self.image_encoder(images)
        question_features = self.text_encoder(questions, question_lens)
        program_features = self.program_encoder(programs).mean(dim=1) 

        combined_features = torch.cat((image_features, question_features, program_features), dim=1)

        output = self.classifier(combined_features)
        return self.softmax(output)


In [78]:
import random

def online_learning(model, dataloader, optimizer, criterion, num_iterations=100, device=device):
    performance = [] 

    # convert the dataloader into a list of batches
    all_batches = list(dataloader)
    random.shuffle(all_batches) 

    for iteration, batch in enumerate(dataloader):
        if iteration >= num_iterations:  # control the number of iterations
            break

        # train on the current batch
        model.train()

        model = model.to(device)
        tokenized_qs, question_lens, images, answers, programs = batch
        tokenized_qs = tokenized_qs.to(device)
        question_lens = question_lens.to(device)
        images = images.to(device)
        answers = answers.to(device)
        programs = programs.to(device)

        outputs = model(tokenized_qs, question_lens, programs, images)
        loss = criterion(outputs, answers)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # use remaining unseen batches for testing
        unseen_batches = all_batches[iteration + 1:]
        if unseen_batches:
            test_batch = unseen_batches[0]
            tokenized_qs, question_lens, images, answers, programs = test_batch
            
            tokenized_qs = tokenized_qs.to(device)
            question_lens = question_lens.to(device)
            images = images.to(device)
            answers = answers.to(device)
            programs = programs.to(device)

            model.eval()
            with torch.no_grad():
                outputs = model(tokenized_qs, question_lens, programs, images)
                _, predicted = torch.max(outputs, dim=1)
                test_accuracy = (predicted == answers).float().mean().item()
        else:
            test_accuracy = 0.0  # no more unseen batches

        print(f"Iteration {iteration+1}/{num_iterations}: Loss = {loss.item():.4f}, Test Accuracy = {test_accuracy * 100:.2f}%")

    return performance


In [79]:
model_withprogram = VQAModelWithProgram(len(itos_a), image_encoder, 512, text_encoder, 50, len(ftoi_p), 50)

In [80]:
performance = online_learning(model_withprogram, train_loader_1, optimizer, criterion, num_iterations=25, device=device)

Iteration 1/25: Loss = 3.4026, Test Accuracy = 0.00%
Iteration 2/25: Loss = 3.4032, Test Accuracy = 0.00%
Iteration 3/25: Loss = 3.3998, Test Accuracy = 6.25%
Iteration 4/25: Loss = 3.4023, Test Accuracy = 0.00%
Iteration 5/25: Loss = 3.4005, Test Accuracy = 0.00%
Iteration 6/25: Loss = 3.4014, Test Accuracy = 0.00%
Iteration 7/25: Loss = 3.4009, Test Accuracy = 0.00%
Iteration 8/25: Loss = 3.4040, Test Accuracy = 0.00%
Iteration 9/25: Loss = 3.4008, Test Accuracy = 0.00%
Iteration 10/25: Loss = 3.4012, Test Accuracy = 0.00%
Iteration 11/25: Loss = 3.4011, Test Accuracy = 12.50%
Iteration 12/25: Loss = 3.4028, Test Accuracy = 6.25%
Iteration 13/25: Loss = 3.4007, Test Accuracy = 12.50%
Iteration 14/25: Loss = 3.4005, Test Accuracy = 0.00%
Iteration 15/25: Loss = 3.4003, Test Accuracy = 6.25%
Iteration 16/25: Loss = 3.4022, Test Accuracy = 6.25%
Iteration 17/25: Loss = 3.4044, Test Accuracy = 6.25%
Iteration 18/25: Loss = 3.4017, Test Accuracy = 0.00%
Iteration 19/25: Loss = 3.4036, Tes

The output indicates that the model's predictions are basically random..try to use dynamic fusion weights rather than simple concatenation? How to make the approach more "interactive"?