This is a generic script for fine-tuning a VisualBERT model on VQA tasks.
Simply format your data as per the requirements below and this should work.

In [None]:
from google.colab import drive
# Mount Google Drive at /content/drive so the dataset folder under MyDrive is reachable.
drive.mount('/content/drive')

Mounted at /content/drive


## Data format
* the images (stored in a single folder)
* the questions (stored in a JSON) -- with the following fields per question: `image_id`, `question`, `question_id`, `answer_choices`, `answer_id`
* the annotations (stored in a JSON) a.k.a. the answers to the questions.

Refer to https://github.com/multimodal/multimodal/blob/master/test/data/vqa2/val/v2_OpenEnded_mscoco_val2014_questions.json for a sample of what the question JSON file should look like.

In [None]:
!pip install -q git+https://github.com/huggingface/transformers.git

In [None]:
import os
# Root folder of the dataset on the mounted Drive (note the trailing slash:
# other paths are built by plain string concatenation).
home = '/content/drive/MyDrive/ReadyForFineTuning/MultiModalQA/'
# Folder holding the merged images referenced by each question's image_id.
imroot = home+'merged_images'
# Work from the dataset folder so relative paths resolve against it.
os.chdir(home)

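This piece of code converts data in VLQAv1 into the desired format. When a question has multiple images, it is pointed at a single merged image (the merge call itself is currently commented out, so the merged files are expected to already exist under `merged_images`).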

In [None]:
# Base name of the dataset: the input is read from home + prefix + '.jsonl'
# and the generated output files are named after it.
prefix = 'mmqa'

In [None]:
import json
import ast

# Skeleton of the VQA-style question file; "questions" is filled in below.
dumpdict = {
    "info": {},
    "task_type": "Open-Ended",
    "data_type": "mmqa",
    "license": {},
    "data_subtype": prefix,
    "questions": [],
}

# Read the raw records: one JSON object per line.
with open(home+prefix+'.jsonl', encoding='utf-8') as f:
    data = f.readlines()
print(len(data))

for line in data:
    # Skip blank lines (e.g. a trailing newline at the end of the file),
    # which json.loads would otherwise reject.
    if not line.strip():
        continue
    ijson = json.loads(line)
    # 'images' and 'answer_choices' hold string-encoded Python literals.
    imlist = ast.literal_eval(ijson['images'])
    anslist = [str(choice) for choice in ast.literal_eval(ijson['answer_choices'])]
    # NOTE(review): this assumes every record lists at least two images and
    # that the merged file already exists; a record with a single image
    # raises IndexError on imlist[1] -- confirm against the data.
    first_name = imlist[0].replace("./images/", "", 1).replace(".png", "", 1)
    second_name = imlist[1].replace("./images/", "", 1)
    dumpdict['questions'].append({
        "image_id": imroot+'/Merged_'+first_name+"#"+second_name,
        "question": ijson['question']+" "+ijson['passage'],
        "question_id": ijson['qid'],
        "answer_choices": anslist,
        "answer_id": int(ijson['answer']),
    })

# The output is a single indented JSON document even though the file name ends
# in .jsonl; the name is kept because the next cell reads this exact path.
with open('/content/'+prefix+'_vbertft.jsonl', 'w', encoding='utf-8') as w:
    w.write(json.dumps(dumpdict, indent=4))

print(dumpdict)

### Read questions

First, we read the questions.

In [None]:
import json
# Load the converted question file; the context manager closes the handle
# as soon as parsing is done instead of leaving it open for the session.
with open('/content/'+prefix+'_vbertft.jsonl', 'r') as f:
    data_questions = json.load(f)
questions = data_questions['questions']

print(data_questions.keys())
print("Number of questions:", len(questions))

That's quite a lot! Let's take a look at the first one:

In [None]:
# Display the first question record as the cell output.
questions[0]

In [None]:
from os import listdir
from os.path import isfile, join

# -- CHANGE THIS BASED ON WHERE IMAGES ARE STORED --
#root = home+'images'

# Keep only regular files found directly inside the image folder
# (sub-directories are ignored).
file_names = []
for entry in listdir(imroot):
    if isfile(join(imroot, entry)):
        file_names.append(entry)

# Show the names and how many images were found.
print(file_names)
print(len(file_names))

In [None]:
from PIL import Image

# Open the image referenced by the first question; leaving `image` as the
# last expression makes the notebook render it.
path = questions[0]['image_id']
image = Image.open(path)
image

In [None]:
from transformers import AutoTokenizer, VisualBertForQuestionAnswering
import torch

# Load the pretrained "bert-base-uncased" tokenizer.
tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased")

# NOTE(review): despite its name, `config` receives whatever
# VisualBertForQuestionAnswering.from_pretrained returns, i.e. the pretrained
# model object rather than a configuration -- confirm this is intended.
config = VisualBertForQuestionAnswering.from_pretrained("uclanlp/visualbert-vqa")

In [None]:
import torch
from PIL import Image

class VQADataset(torch.utils.data.Dataset):
    """Dataset yielding one processed (image, question) pair plus a one-hot answer label.

    Args:
        questions: sequence of dicts; each item is read for the keys
            'image_id' (a path handed to ``Image.open``), 'question',
            'answer_choices' and 'answer_id'.
        processor: callable invoked as
            ``processor(image, text, padding="max_length", truncation=True,
            return_tensors="pt")``; it is expected to return a mutable mapping
            of tensors that carry a leading batch axis.
    """

    def __init__(self, questions, processor):
        self.questions = questions
        self.processor = processor

    def __len__(self):
        """Return the number of question records."""
        return len(self.questions)

    @staticmethod
    def _drop_batch_dim(encoding):
        """Strip the leading batch axis from every tensor in *encoding*, in place.

        Only axis 0 is squeezed so that any other size-1 axis is preserved
        (a bare ``squeeze()`` would collapse those too).
        """
        for key, value in list(encoding.items()):
            encoding[key] = value.squeeze(0)
        return encoding

    @staticmethod
    def _one_hot(num_choices, answer_id):
        """Return a float vector of length *num_choices* with 1.0 at *answer_id*."""
        targets = torch.zeros(num_choices)
        targets[answer_id] = 1.0
        return targets

    def __getitem__(self, idx):
        """Return the processor's encoding for item *idx* with a 'labels' entry added."""
        question = self.questions[idx]

        # Close the underlying file as soon as the pixels have been copied
        # into the converted RGB image.
        with Image.open(question['image_id']) as raw_image:
            image = raw_image.convert("RGB")
        text = question['question']

        encoding = self.processor(image, text, padding="max_length", truncation=True, return_tensors="pt")
        self._drop_batch_dim(encoding)

        # One slot per answer choice, 1.0 at the position of the correct one.
        encoding["labels"] = self._one_hot(len(question['answer_choices']), question['answer_id'])

        return encoding

In [None]:
from transformers import VisualBertForQuestionAnswering

# NOTE(review): `processor` is loaded through
# VisualBertForQuestionAnswering.from_pretrained (the model class), yet
# VQADataset calls it as processor(image, text, ...) and a later cell calls
# processor.decode(...). Confirm which tokenizer / feature pipeline was
# intended here.
processor = VisualBertForQuestionAnswering.from_pretrained("uclanlp/visualbert-vqa")

In [None]:
# Wrap the loaded question records and the processor into a dataset.
dataset = VQADataset(questions=questions,processor=processor) ##annotations=annotations[:10],

In [None]:
# Sanity check: list the fields produced for one example.
dataset[1].keys()

In [None]:
# Turn the first example's token ids back into text.
# NOTE(review): decode is normally a tokenizer method -- confirm that the
# object bound to `processor` actually provides it.
processor.decode(dataset[0]['input_ids'])

In [None]:
# Position(s) of the non-zero entries in the first example's label vector,
# i.e. the index of its correct answer choice.
labels = torch.nonzero(dataset[0]['labels']).squeeze().tolist()

In [None]:
from transformers import VisualBertForQuestionAnswering

# Use the GPU when one is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# NOTE(review): num_labels=2 presumably matches two answer choices per
# question; the label vector built by VQADataset has len(answer_choices)
# entries -- confirm the two agree for this data.
model = VisualBertForQuestionAnswering.from_pretrained("uclanlp/visualbert-vqa", num_labels=2)

# Move the model's parameters to the selected device.
model.to(device)

In [None]:
from torch.utils.data import DataLoader

def collate_fn(batch):
    """Stack the per-example tensors of *batch* into one dict of batched tensors.

    Args:
        batch: list of mappings, each providing the keys 'input_ids',
            'pixel_values', 'attention_mask', 'token_type_ids' and 'labels'.

    Returns:
        dict with the keys 'input_ids', 'attention_mask', 'token_type_ids',
        'pixel_values' and 'labels' (in that order); each value is the
        ``torch.stack`` of the corresponding per-example tensors.
    """
    # Gather each field across the batch first ...
    gather_order = ("input_ids", "pixel_values", "attention_mask", "token_type_ids", "labels")
    columns = {field: [sample[field] for sample in batch] for field in gather_order}

    # ... then stack along a new leading axis, emitting the keys in the
    # order the training loop receives them.
    output_order = ("input_ids", "attention_mask", "token_type_ids", "pixel_values", "labels")
    return {field: torch.stack(columns[field]) for field in output_order}

# Serve the dataset one example per batch in shuffled order, batching through collate_fn.
train_dataloader = DataLoader(dataset, collate_fn=collate_fn, batch_size=1, shuffle=True)

Let's verify a batch:

In [None]:
# Pull a single batch from the loader for inspection.
batch = next(iter(train_dataloader))

In [None]:
# Print every field's name and tensor shape as a sanity check.
for k,v in batch.items():
  print(k, v.shape)

## Train a model

Finally, let's train a model!

In [None]:
from tqdm.notebook import tqdm
optimizer = torch.optim.AdamW(model.parameters(), lr=5e-5)

# -- CHANGE BASED ON HOW MANY EPOCHS YOU WANT TO RUN FOR
e = 5

# Switch the model to training mode before optimizing.
model.train()
for epoch in range(e):  # one full pass over the dataloader per epoch
    print(f"Epoch: {epoch}")
    for batch in tqdm(train_dataloader):
        # Move every tensor of the batch onto the training device.
        batch = {name: tensor.to(device) for name, tensor in batch.items()}

        # Clear the gradients accumulated by the previous step.
        optimizer.zero_grad()

        # Forward pass; the loss is read from the model output.
        outputs = model(**batch)
        loss = outputs.loss
        print("Loss:", loss.item())

        # Backward pass followed by the parameter update.
        loss.backward()
        optimizer.step()

## Inference

Let's verify whether the model has actually learned something:

# Single Instance Inference

In [None]:
# Index of the dataset example used for the single-instance check.
exid = 1
example = dataset[exid]
# Show which fields the example carries.
print(example.keys())

In [None]:
# add batch dimension + move to GPU
example = {k: v.unsqueeze(0).to(device) for k,v in example.items()}

# forward pass
outputs = model(**example)

In [None]:
# The highest-scoring entry along the last axis is the predicted answer index.
logits = outputs.logits
predicted_class = logits.argmax(-1).item()
print("Predicted answer:", predicted_class)
# The example is fetched again from the dataset to read its one-hot label;
# the argmax of that vector is the ground-truth answer index.
print("Ground-truth answer:", dataset[exid]['labels'].argmax(-1).item())

# Batch Inference

In [None]:
import csv
tasktype = "2way"
filprefix = prefix+"_"+tasktype

count = 0  # number of correct predictions
st = 0
end = 251
# Never index past the end of the dataset when it holds fewer than `end` items.
stop = min(end, len(dataset))

# Inference only: leave training mode and disable gradient tracking.
model.eval()
# newline='' is what the csv module asks for so that row terminators are
# not translated a second time by the file object.
with open(filprefix+'.csv', 'w', newline='') as csv_file:
    spamwriter = csv.writer(csv_file)
    # NOTE(review): the "qid" column receives the dataset index (exid), not
    # the record's question_id -- confirm which one consumers expect.
    spamwriter.writerow(["qid","pred_ans","gt_ans","correctness"])
    with torch.no_grad():
        for exid in range(st, stop):
            # Build the example once and read its label before the tensors
            # are batched and moved to the device.
            example = dataset[exid]
            gtclass = example['labels'].argmax(-1).item()
            example = {name: tensor.unsqueeze(0).to(device) for name, tensor in example.items()}

            outputs = model(**example)
            logits = outputs.logits
            predicted_class = logits.argmax(-1).item()

            correctness = int(gtclass == predicted_class)
            count += correctness
            spamwriter.writerow([exid, predicted_class, gtclass, correctness])