In [None]:
import json
import os
import random
import string
import zipfile

import nltk
import numpy as np
import requests
import torch
from PIL import Image
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
from tqdm import tqdm

Download VQA v2 data

In [None]:
# Remote archives needed for VQA v2: the two image zips plus the
# annotation and question zips for the train and val splits.
urls = dict(
    train_images="http://images.cocodataset.org/zips/train2014.zip",
    val_images="http://images.cocodataset.org/zips/val2014.zip",
    annotations_train="https://s3.amazonaws.com/cvmlp/vqa/mscoco/vqa/v2_Annotations_Train_mscoco.zip",
    annotations_val="https://s3.amazonaws.com/cvmlp/vqa/mscoco/vqa/v2_Annotations_Val_mscoco.zip",
    questions_train="https://s3.amazonaws.com/cvmlp/vqa/mscoco/vqa/v2_Questions_Train_mscoco.zip",
    questions_val="https://s3.amazonaws.com/cvmlp/vqa/mscoco/vqa/v2_Questions_Val_mscoco.zip",
)

# Local directory that receives the downloaded archives and their contents.
data_dir = 'VQA_data'

def download_file(url, save_path, chunk_size=1024 * 1024, timeout=30):
    """Stream ``url`` to ``save_path`` while showing a tqdm progress bar.

    The body is written to ``save_path + '.part'`` first and only renamed to
    ``save_path`` once the transfer finished, so an interrupted or failed
    download never leaves a truncated file under the final name (callers
    that test ``os.path.exists(save_path)`` would otherwise mistake it for a
    complete download).

    Args:
        url: Address of the file to fetch.
        save_path: Destination path of the finished file.
        chunk_size: Number of bytes requested per ``iter_content`` chunk.
        timeout: Timeout in seconds handed to ``requests.get``.

    Raises:
        requests.HTTPError: If the server answers with an error status.
        requests.RequestException: On connection problems or timeouts.
    """
    print(f"Downloading {url}")
    partial_path = f"{save_path}.part"

    try:
        # The context manager releases the connection even if writing fails.
        with requests.get(url, stream=True, timeout=timeout) as response:
            # Fail loudly instead of saving an HTML error page as the archive.
            response.raise_for_status()

            total_size = int(response.headers.get('content-length', 0))
            with open(partial_path, 'wb') as file, tqdm(
                desc=save_path,
                # A missing Content-Length means "unknown", not "zero bytes".
                total=total_size or None,
                unit='B',
                unit_scale=True
            ) as bar:
                for data in response.iter_content(chunk_size=chunk_size):
                    file.write(data)
                    bar.update(len(data))

        os.replace(partial_path, save_path)
    except BaseException:
        # Also covers KeyboardInterrupt (interrupting the notebook cell).
        if os.path.exists(partial_path):
            os.remove(partial_path)
        raise

    print(f"Completed: {save_path}")

# Make sure the target directory exists, then fetch every archive that is not
# on disk yet. The check is done per file rather than on the directory: a
# directory left behind by an interrupted or partial run must not cause the
# remaining downloads to be skipped while success is still reported.
os.makedirs(data_dir, exist_ok=True)

for url in urls.values():
    filename = url.split("/")[-1]
    save_path = os.path.join(data_dir, filename)

    if not os.path.exists(save_path):
        download_file(url, save_path)
    else:
        print(f"File already exists: {save_path}")

print("Dataset is downloaded successfully.")

Unzip the downloaded archives

In [None]:
def unzip_file(zip_path, extract_to):
    """Extract every member of the zip archive ``zip_path`` into ``extract_to``.

    Prints one line before and one line after the extraction. Returns None.
    """
    print(f"Extracting {zip_path}")
    archive = zipfile.ZipFile(zip_path, 'r')
    try:
        archive.extractall(extract_to)
    finally:
        # Close the archive whether or not the extraction succeeded.
        archive.close()
    print(f"Completed: {zip_path}")

# Archives to unpack into data_dir. Despite its name, `image_files` also
# lists the annotation and question archives, not only the image zips.
archive_names = (
    'train2014.zip',
    'val2014.zip',
    'v2_Annotations_Train_mscoco.zip',
    'v2_Annotations_Val_mscoco.zip',
    'v2_Questions_Train_mscoco.zip',
    'v2_Questions_Val_mscoco.zip',
)
image_files = [os.path.join(data_dir, archive_name) for archive_name in archive_names]

for image_file in image_files:
    unzip_file(image_file, data_dir)

Extracting VQA_data/v2_Annotations_Train_mscoco.zip
Completed: VQA_data/v2_Annotations_Train_mscoco.zip
Extracting VQA_data/v2_Annotations_Val_mscoco.zip
Completed: VQA_data/v2_Annotations_Val_mscoco.zip
Extracting VQA_data/v2_Questions_Train_mscoco.zip
Completed: VQA_data/v2_Questions_Train_mscoco.zip
Extracting VQA_data/v2_Questions_Val_mscoco.zip
Completed: VQA_data/v2_Questions_Val_mscoco.zip


Get GloVe pretrained word embeddings

In [None]:
glove_file_path = 'glove.6B.50d.txt'
glove_zip_url = 'http://nlp.stanford.edu/data/glove.6B.zip'
glove_zip_path = 'glove.6B.zip'

# Fetch and unpack the GloVe archive only when the 50-d vectors are missing.
if not os.path.exists(glove_file_path):
    # Reuse the streaming helper from the dataset-download cell: a plain
    # requests.get(...) followed by response.content would hold the whole
    # archive in memory before a single byte reaches the disk.
    download_file(glove_zip_url, glove_zip_path)

    # Unpack into the current working directory.
    with zipfile.ZipFile(glove_zip_path, 'r') as zip_ref:
        zip_ref.extractall()

    # The archive is no longer needed once its contents are extracted.
    os.remove(glove_zip_path)

def load_glove_embeddings(file_path, embedding_dim=50):
    """Read a GloVe-style text file into a ``{word: tensor}`` dictionary.

    Each non-blank line is split on whitespace; the first field is taken as
    the word and the remaining fields are parsed as float32 components.

    Args:
        file_path: Path to the UTF-8 encoded embeddings text file.
        embedding_dim: Kept for backward compatibility; the value is not
            used, the vector length is whatever each line provides.

    Returns:
        dict mapping each word to a 1-D ``torch.float32`` tensor.
    """
    word_embeddings = {}
    with open(file_path, 'r', encoding='utf-8') as f:
        for line in f:
            values = line.split()
            if not values:
                # Blank line (e.g. a trailing newline): nothing to parse, and
                # values[0] below would raise IndexError.
                continue
            word = values[0]
            vector = np.asarray(values[1:], dtype='float32')
            # The array was freshly created above, so the tensor can share
            # its memory instead of copying it once more.
            word_embeddings[word] = torch.from_numpy(vector)
    return word_embeddings

# Load the embedding table and print the vector of one sample word as a
# quick sanity check; a 50-d zero vector stands in for an unknown word.
word_embeddings = load_glove_embeddings(glove_file_path)

word = "computer"
if word in word_embeddings:
    w_embedding = word_embeddings[word]
else:
    w_embedding = torch.zeros(50)
print(f"Embedding for '{word}':\n", w_embedding)


Embedding for 'computer':
 tensor([ 0.0791, -0.8150,  1.7901,  0.9165,  0.1080, -0.5563, -0.8443, -1.4951,
         0.1342,  0.6363,  0.3515,  0.2581, -0.5503,  0.5106,  0.3741,  0.1209,
        -1.6166,  0.8365,  0.1420, -0.5235,  0.7345,  0.1221, -0.4908,  0.3253,
         0.4531, -1.5850, -0.6385, -1.0053,  0.1045, -0.4298,  3.1810, -0.6219,
         0.1682, -1.0139,  0.0641,  0.5784, -0.4556,  0.7378,  0.3720, -0.5772,
         0.6644,  0.0551,  0.0379,  1.3275,  0.3099,  0.5070,  1.2357,  0.1274,
        -0.1143,  0.2071])


Get the maximum number of tokens in a question

In [None]:
import json

def get_max_tokens(questions_file):
    """Return the largest whitespace-token count among a file's questions.

    ``questions_file`` is a JSON file whose top-level ``'questions'`` entry is
    a list of records, each with a ``'question'`` string. Every question is
    lower-cased and split on whitespace. Returns 0 when the list is empty.
    """
    with open(questions_file, 'r') as handle:
        entries = json.load(handle)['questions']

    token_counts = (len(entry['question'].lower().split()) for entry in entries)
    return max(token_counts, default=0)


questions_file_train = 'VQA_data/v2_OpenEnded_mscoco_train2014_questions.json'
questions_file_val = 'VQA_data/v2_OpenEnded_mscoco_val2014_questions.json'

# Longest question per split (train first, then val); the larger of the two
# is the sequence length that covers every question.
max_tokens_train, max_tokens_val = (
    get_max_tokens(path) for path in (questions_file_train, questions_file_val)
)

print(f"Max tokens in questions: {max(max_tokens_train, max_tokens_val)}")

Max tokens in questions: 23


Process the training and validation sets

In [None]:
# NOTE: this repeats the GloVe download step of the earlier
# "Get GloVe ..." cell; when that cell already ran, the guard below is a no-op.
glove_file_path = 'glove.6B.50d.txt'
glove_zip_url = 'http://nlp.stanford.edu/data/glove.6B.zip'
glove_zip_path = 'glove.6B.zip'

# Fetch and unpack the GloVe archive only when the 50-d vectors are missing.
if not os.path.exists(glove_file_path):
    # Reuse the streaming helper from the dataset-download cell: a plain
    # requests.get(...) followed by response.content would hold the whole
    # archive in memory before a single byte reaches the disk.
    download_file(glove_zip_url, glove_zip_path)

    # Unpack into the current working directory.
    with zipfile.ZipFile(glove_zip_path, 'r') as zip_ref:
        zip_ref.extractall()

    # The archive is no longer needed once its contents are extracted.
    os.remove(glove_zip_path)

def load_glove_embeddings(file_path, embedding_dim=50):
    """Read a GloVe-style text file into a ``{word: tensor}`` dictionary.

    Each non-blank line is split on whitespace; the first field is taken as
    the word and the remaining fields are parsed as float32 components.

    Args:
        file_path: Path to the UTF-8 encoded embeddings text file.
        embedding_dim: Kept for backward compatibility; the value is not
            used, the vector length is whatever each line provides.

    Returns:
        dict mapping each word to a 1-D ``torch.float32`` tensor.
    """
    word_embeddings = {}
    with open(file_path, 'r', encoding='utf-8') as f:
        for line in f:
            values = line.split()
            if not values:
                # Blank line (e.g. a trailing newline): nothing to parse, and
                # values[0] below would raise IndexError.
                continue
            word = values[0]
            vector = np.asarray(values[1:], dtype='float32')
            # The array was freshly created above, so the tensor can share
            # its memory instead of copying it once more.
            word_embeddings[word] = torch.from_numpy(vector)
    return word_embeddings

# Embedding table that VQADataset.__getitem__ looks up as the global `glove`.
# NOTE(review): the earlier GloVe cell parses this same file into
# `word_embeddings`, so the table is held in memory twice -- consider reusing it.
glove = load_glove_embeddings(file_path=glove_file_path)

def tokenize_question(question):
    """Lower-case ``question`` and split it into punctuation-trimmed tokens.

    The text is split on whitespace and ASCII punctuation is stripped from
    both ends of every piece, so ``"dog?"`` becomes ``"dog"`` and can be found
    in a word-keyed embedding table instead of falling back to the
    unknown-word vector. Punctuation inside a token (``"what's"``,
    ``"well-lit"``) is left untouched, and pieces consisting only of
    punctuation are dropped.

    The result never has more tokens than ``question.lower().split()``, so a
    maximum length derived from plain whitespace splitting remains a valid
    upper bound.

    Args:
        question: Raw question string.

    Returns:
        list[str]: The cleaned, lower-cased tokens in their original order.
    """
    trimmed = (piece.strip(string.punctuation) for piece in question.lower().split())
    return [token for token in trimmed if token]

def tokens_to_embeddings(tokens, embedding_model, max_length=23, embedding_dim=50):
    """Turn a token sequence into a fixed-size ``(max_length, embedding_dim)`` tensor.

    Tokens beyond ``max_length`` are dropped; shorter sequences are padded
    with zero vectors. A token missing from ``embedding_model`` is also
    represented by a zero vector.

    Args:
        tokens: Iterable of token strings.
        embedding_model: Mapping (anything with ``.get``) from token to a
            1-D tensor of length ``embedding_dim``.
        max_length: Number of rows in the result.
        embedding_dim: Length of each embedding vector. Defaults to 50, the
            value that used to be hard-coded here.

    Returns:
        torch.Tensor of shape ``(max_length, embedding_dim)``.
    """
    # One shared zero vector serves both unknown tokens and padding;
    # torch.stack copies its inputs, so sharing it is safe.
    zero_vector = torch.zeros(embedding_dim)

    # Truncate before the lookup so dropped tokens are never embedded.
    kept_tokens = list(tokens)[:max_length]
    embeddings = [embedding_model.get(token, zero_vector) for token in kept_tokens]
    embeddings.extend([zero_vector] * (max_length - len(embeddings)))

    if not embeddings:
        # torch.stack rejects an empty list (happens when max_length == 0).
        return torch.empty(0, embedding_dim)
    return torch.stack(embeddings)

def create_answer_idx(train_annotations_file, val_annotations_file):
    """Build answer <-> index lookup tables from two annotation files.

    Both JSON files are read (train first, then val), the
    ``'multiple_choice_answer'`` of every record under ``'annotations'`` is
    collected, and the distinct answers are numbered in sorted order.

    Returns:
        tuple ``(answer_to_idx, idx_to_answer)`` of mutually inverse dicts.
    """
    vocabulary = set()
    for annotations_file in (train_annotations_file, val_annotations_file):
        with open(annotations_file, 'r') as handle:
            records = json.load(handle)['annotations']
        vocabulary.update(record['multiple_choice_answer'] for record in records)

    ordered_answers = sorted(vocabulary)
    idx_to_answer = dict(enumerate(ordered_answers))
    answer_to_idx = {answer: idx for idx, answer in idx_to_answer.items()}
    return answer_to_idx, idx_to_answer

class VQADataset(Dataset):
    """Dataset yielding ``(image, question_embeddings, answer_idx)`` samples.

    Questions and annotations are read from two JSON files and paired purely
    by list position.
    NOTE(review): this assumes both files list their entries in the same
    order -- confirm, or pair them through an explicit id instead.

    Args:
        img_dir: Directory holding the image files. If the path contains the
            substring ``'train'`` the ``COCO_train2014_`` file-name prefix is
            used, otherwise ``COCO_val2014_``.
        questions_file: JSON file with a top-level ``'questions'`` list.
        annotations_file: JSON file with a top-level ``'annotations'`` list.
        answer_to_idx: Mapping from answer string to class index.
        transform: Optional callable applied to the loaded RGB PIL image.
        max_length: Number of token rows in the question embedding tensor.
        embedding_model: Optional token -> vector mapping. When ``None``
            (the default) the module-level ``glove`` table is used.
    """

    def __init__(self, img_dir, questions_file, annotations_file, answer_to_idx,
                 transform=None, max_length=23, embedding_model=None):
        self.img_dir = img_dir
        self.transform = transform
        self.max_length = max_length
        self.embedding_model = embedding_model
        with open(questions_file, 'r') as f:
            self.questions_data = json.load(f)['questions']
        with open(annotations_file, 'r') as f:
            self.annotations_data = json.load(f)['annotations']
        self.answer_to_idx = answer_to_idx

    def __len__(self):
        """Number of samples, i.e. the number of question records."""
        return len(self.questions_data)

    def __getitem__(self, idx):
        """Load the image, embed the question and look up the answer index."""
        question_entry = self.questions_data[idx]

        img_id = question_entry['image_id']
        split = 'train' if 'train' in self.img_dir else 'val'
        img_path = os.path.join(self.img_dir, f"COCO_{split}2014_{img_id:012d}.jpg")

        # Force three channels: a non-RGB file (e.g. grayscale) would
        # otherwise reach a 3-channel Normalize with the wrong channel count.
        # convert() returns an independent image, so the file handle can be
        # closed right away.
        with Image.open(img_path) as raw_img:
            img = raw_img.convert('RGB')

        if self.transform:
            img = self.transform(img)

        embedding_model = self.embedding_model if self.embedding_model is not None else glove
        question_tokens = tokenize_question(question_entry['question'])
        question_embeddings = tokens_to_embeddings(question_tokens, embedding_model, self.max_length)

        answer = self.annotations_data[idx]['multiple_choice_answer']
        answer_idx = self.answer_to_idx[answer]

        return img, question_embeddings, answer_idx

# Locations of the extracted images and of the question / annotation files.
img_dir_train = 'VQA_data/train2014'
img_dir_val = 'VQA_data/val2014'
questions_file_train = 'VQA_data/v2_OpenEnded_mscoco_train2014_questions.json'
annotations_file_train = 'VQA_data/v2_mscoco_train2014_annotations.json'
questions_file_val = 'VQA_data/v2_OpenEnded_mscoco_val2014_questions.json'
annotations_file_val = 'VQA_data/v2_mscoco_val2014_annotations.json'

# One answer vocabulary shared by both splits.
answer_to_idx, idx_to_answer = create_answer_idx(annotations_file_train, annotations_file_val)


def _make_image_transform():
    """Return the image pipeline used by both splits: resize, to-tensor, normalize."""
    return transforms.Compose([
        transforms.Resize((224, 224)),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
    ])


train_dataset = VQADataset(
    img_dir=img_dir_train,
    questions_file=questions_file_train,
    annotations_file=annotations_file_train,
    answer_to_idx=answer_to_idx,
    transform=_make_image_transform(),
    max_length=23,
)

val_dataset = VQADataset(
    img_dir=img_dir_val,
    questions_file=questions_file_val,
    annotations_file=annotations_file_val,
    answer_to_idx=answer_to_idx,
    transform=_make_image_transform(),
    max_length=23,
)

# Only the training loader shuffles its samples.
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False)

# Pull a single batch and print the tensor shapes as a smoke test.
for images, question_embeddings, answers in train_loader:
    print(f"Batch images: {images.shape}")
    print(f"Batch question embeddings: {question_embeddings.shape}")
    print(f"Batch answers: {answers.shape}")
    break


Batch images: torch.Size([32, 3, 224, 224])
Batch question embeddings: torch.Size([32, 23, 50])
Batch answers: torch.Size([32])
