In [1]:
import h5py
import json
import numpy as np

In [2]:
# Directory (relative to the notebook) that holds every dataset file used below.
data = 'data/'
# Precomputed image features (HDF5) and the JSON lookup files consumed by read_images().
img_feat = data + 'VQA_image_features.h5'
img_ids = data + 'image_ids_vqa.json'
map_features_to_id = data + 'VQA_img_features2id.json'
img_info = data + 'imgid2imginfo.json'

# Question files for the train / validation / test splits, consumed by read_text().
questions_train = data + 'vqa_questions_train.json'
questions_validation = data + 'vqa_questions_valid.json'
questions_test = data + 'vqa_questions_test.json'

# Annotation (answer) files for the same three splits, consumed by read_text().
annotations_train = data + 'vqa_annotations_train.json'
annotations_validation = data + 'vqa_annotations_valid.json'
annotations_test = data + 'vqa_annotations_test.json'

In [3]:
def read_images():
    """Load the precomputed image features and their lookup tables.

    The file locations come from the module-level constants ``img_feat``,
    ``img_ids``, ``map_features_to_id`` and ``img_info``.

    Returns:
        tuple: ``(image_ids, image_features, feature_mapping, image_info)``

        * ``image_ids`` -- the ``'image_ids'`` entry of the IDs JSON file.
        * ``image_features`` -- NumPy array holding the whole
          ``'img_features'`` dataset of the HDF5 file.
        * ``feature_mapping`` -- the ``'VQA_imgid2id'`` entry of the mapping
          JSON file (used by ``image_id_to_features`` to index
          ``image_features``).
        * ``image_info`` -- the parsed content of the info JSON file.
    """
    # np.asarray materialises the dataset in memory while the file is open, so
    # the handle can be closed right away (the previous version leaked it).
    with h5py.File(img_feat, 'r') as features_file:
        image_features = np.asarray(features_file['img_features'])

    # load IDs file
    with open(img_ids, 'r') as file:
        image_ids = json.load(file)['image_ids']

    # load feature mapping file
    with open(map_features_to_id, 'r') as file:
        feature_mapping = json.load(file)['VQA_imgid2id']

    # load info file
    with open(img_info, 'r') as file:
        image_info = json.load(file)

    return image_ids, image_features, feature_mapping, image_info

In [4]:
def _load_questions(path):
    """Return ``[question, image_id]`` pairs from the 'questions' list of a JSON file."""
    with open(path, 'r') as file:
        return [[entry['question'], entry['image_id']]
                for entry in json.load(file)['questions']]


def _load_answers(path):
    """Return the 'multiple_choice_answer' of every entry in the 'annotations' list of a JSON file."""
    with open(path, 'r') as file:
        return [entry['multiple_choice_answer']
                for entry in json.load(file)['annotations']]


def read_text():
    """Load questions and answers for the train, validation and test splits.

    File locations come from the module-level ``questions_*`` and
    ``annotations_*`` path constants.

    Returns:
        tuple: ``(q_train, q_validation, q_test, a_train, a_validation, a_test)``.
        Each ``q_*`` is a list of ``[question, image_id]`` pairs and each
        ``a_*`` is a list of answer values, both in file order.
    """
    q_train = _load_questions(questions_train)
    q_validation = _load_questions(questions_validation)
    q_test = _load_questions(questions_test)

    a_train = _load_answers(annotations_train)
    a_validation = _load_answers(annotations_validation)
    a_test = _load_answers(annotations_test)

    return q_train, q_validation, q_test, a_train, a_validation, a_test

In [13]:
## Display image from URL

import os
from PIL import Image
from urllib import request

def show_image(imgid2info, id):
    """Download an image from its Flickr URL and open it in the default viewer.

    Args:
        imgid2info: mapping from image-id strings to dicts that contain a
            ``'flickr_url'`` entry.
        id: image id; converted with ``str()`` before the lookup. (The name
            shadows the builtin but is kept so existing callers still work.)

    Raises:
        KeyError: if the id or its ``'flickr_url'`` entry is missing.
    """
    import tempfile

    url = imgid2info[str(id)]['flickr_url']

    # A unique temporary file avoids clobbering (or leaving behind) a fixed
    # 'temp-image.jpg' in the working directory.
    handle, img_name = tempfile.mkstemp(suffix='.jpg')
    os.close(handle)
    try:
        request.urlretrieve(url, img_name)
        # Close the image before deleting the file: removing a file that is
        # still open fails on some platforms.
        with Image.open(img_name) as img:
            img.show()
    finally:
        # Clean up even when the download or decoding fails.
        os.remove(img_name)
    
#show_image(image_info, 111756)

In [14]:
import torch
import torch.autograd as autograd
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import numpy as np
import random

# Set to False to force CPU execution even when CUDA is available.
USE_GPU = True

# Use CUDA only when it is both requested and available; otherwise use the CPU.
device = torch.device('cuda' if USE_GPU and torch.cuda.is_available() else 'cpu')

print(device)

cuda


In [15]:
def image_id_to_features(image_id):
    """Return the entry of ``image_features`` that belongs to ``image_id``.

    The id is converted to a string, looked up in the module-level
    ``feature_mapping`` and the result is used to index ``image_features``.
    """
    return image_features[feature_mapping[str(image_id)]]

In [16]:
# Question [text, image_id] pairs and answers for the train / validation / test splits.
(q_train, q_validation, q_test,
 a_train, a_validation, a_test) = read_text()

In [17]:
# Image ids, feature array, image-id -> feature-index mapping and per-image info.
(image_ids,
 image_features,
 feature_mapping,
 image_info) = read_images()

In [18]:
# TODO: increase size of datasets.
# Number of question/answer pairs taken from each split (currently every item).
train_len = len(q_train)
validation_len = len(q_validation)
test_len = len(q_test)

In [19]:
import sys
import torch
# Make the local InferSent checkout importable; the `models` import below relies on this entry.
# NOTE(review): this cell hard-codes absolute, user-specific paths — consider a configurable root directory.
sys.path.append('/home/mau_engr/InferSent/')

from models import InferSent
# Encoder version: picks the infersent<V>.pkl weights file and is passed to the model as 'version'.
V = 1
MODEL_PATH = '/home/mau_engr/InferSent/encoder/infersent%s.pkl' % V
# Settings handed to the InferSent constructor.
# NOTE(review): presumably these must match the configuration the checkpoint was trained with — confirm against the InferSent README.
params_model = {'bsize': 64, 'word_emb_dim': 300, 'enc_lstm_dim': 2048,
                'pool_type': 'max', 'dpout_model': 0.0, 'version': V}
infersent = InferSent(params_model)
# Restore the pretrained encoder weights from MODEL_PATH.
infersent.load_state_dict(torch.load(MODEL_PATH))

# Word-vector file registered with the encoder (GloVe 840B, 300-d according to the file name).
W2V_PATH = '/home/mau_engr/InferSent/dataset/GloVe/glove.840B.300d.txt'
infersent.set_w2v_path(W2V_PATH)


In [20]:
# Lower-cased question text of every split (each item of q_* is a [question, image_id] pair).
qtrain_sents = [question.lower() for question, _ in q_train]
qvalid_sents = [question.lower() for question, _ in q_validation]
qtest_sents = [question.lower() for question, _ in q_test]

# The encoder vocabulary is built from the questions of all three splits.
total_vocab = [*qtrain_sents, *qtest_sents, *qvalid_sents]
length = infersent.build_vocab(total_vocab)

# Encode each split in turn (train, validation, test) into sentence embeddings.
qtrain_embeddings, qvalid_embeddings, qtest_embeddings = (
    infersent.encode(sentences, tokenize=True)
    for sentences in (qtrain_sents, qvalid_sents, qtest_sents)
)

Found 7190(/7235) words with w2v vectors
Vocab size : 7190


In [21]:
import string
import nltk
from nltk.corpus import stopwords

# Set of NLTK's English stop words.
# NOTE(review): `sw` is not referenced anywhere else in the visible notebook — confirm whether it is still needed.
sw = set(stopwords.words('english'))

def _pair_split(embeddings, answers, questions, count):
    """Build the (embedding, answer) tuples and image features of the first ``count`` items of one split."""
    text = [(embeddings[i], answers[i]) for i in range(count)]
    images = [image_id_to_features(questions[i][1]) for i in range(count)]
    return text, images


def organize_data():
    """Pair question embeddings with answers and image features for every split.

    Reads the module-level question embeddings (``q*_embeddings``), answers
    (``a_*``), questions (``q_*``) and split sizes (``*_len``).

    Returns:
        tuple: ``(train_text, train_images, validation_text,
        validation_images, test_text, test_images)``. Each ``*_text`` is a
        list of ``(question_embedding, answer)`` tuples and each ``*_images``
        is the list of image features for the same positions.
    """
    train_text, train_images = _pair_split(
        qtrain_embeddings, a_train, q_train, train_len)
    validation_text, validation_images = _pair_split(
        qvalid_embeddings, a_validation, q_validation, validation_len)
    test_text, test_images = _pair_split(
        qtest_embeddings, a_test, q_test, test_len)

    return train_text, train_images, validation_text, validation_images, test_text, test_images

In [22]:
def shuffle_data(text_features, visual_features):
    """Shuffle two parallel sequences with the same random permutation.

    Items are paired position by position, the pairs are shuffled with
    ``random.shuffle`` and then split again.

    Returns:
        tuple: ``(shuffled_text, shuffled_visual)`` as two new lists; the
        inputs are left untouched.
    """
    pairs = list(zip(text_features, visual_features))
    random.shuffle(pairs)

    shuffled_text = []
    shuffled_visual = []
    for text_item, visual_item in pairs:
        shuffled_text.append(text_item)
        shuffled_visual.append(visual_item)
    return shuffled_text, shuffled_visual

In [23]:
def vocabulary():
    """Index the distinct answers of the module-level ``train_text``.

    Returns:
        tuple: ``(annotation_vocab, annotation_vocab_lookup)`` where the
        first maps each answer to its index (in order of first appearance)
        and the second is the list that maps an index back to its answer.
    """
    answer_to_index = {}
    index_to_answer = []
    for _, answer in train_text:
        if answer in answer_to_index:
            continue
        answer_to_index[answer] = len(index_to_answer)
        index_to_answer.append(answer)
    return answer_to_index, index_to_answer

In [24]:
from collections import defaultdict
import operator

def select_frequent_answers(train_text, train_images, maxAnswers):
    """Keep only the samples whose answer is among the most frequent ones.

    Args:
        train_text: list of ``(question, answer)`` tuples.
        train_images: image features, parallel to ``train_text``.
        maxAnswers: number of most frequent answers to keep. Ties are
            resolved in favour of the answer seen first (stable sort).

    Returns:
        tuple: ``(new_train_text, new_train_images)`` -- the filtered lists,
        in their original order. Both are empty when ``train_text`` is empty
        (the previous version raised ``ValueError`` in that case).
    """
    answer_fq = defaultdict(int)
    for _question, answer in train_text:
        answer_fq[answer] += 1

    ranked = sorted(answer_fq.items(), key=operator.itemgetter(1), reverse=True)
    # A set gives O(1) membership tests in the filter loop below (the previous
    # tuple was scanned linearly for every sample).
    top_answers = {answer for answer, _count in ranked[:maxAnswers]}

    new_train_text = []
    new_train_images = []
    for (ques, ans), img in zip(train_text, train_images):
        if ans in top_answers:
            new_train_text.append((ques, ans))
            new_train_images.append(img)

    return new_train_text, new_train_images

In [25]:
# Build the three splits, then restrict the training set to the samples whose
# answer is one of the 1000 most frequent training answers.
train_text, train_images, validation_text, validation_images, test_text, test_images = organize_data()
train_text, train_images = select_frequent_answers(train_text, train_images, 1000)
annotation_vocab, annotation_vocab_lookup = vocabulary()

n_questions = len(train_text)
annotation_vocab_size = len(annotation_vocab)
image_feature_length = len(train_images[0])
# Derive the question-embedding width from the data instead of hard-coding
# 4096, so the classifier input size follows the encoder configuration.
question_feature_length = len(train_text[0][0])
input_vector_size = image_feature_length + question_feature_length
print(n_questions)
print(annotation_vocab_size)

43502
1000


In [26]:
class BoWClassifier(nn.Module):
    """Feed-forward classifier over a fixed-size input vector.

    Architecture::

        Linear(vocab_size, 2048) -> tanh -> dropout(0.5)
        -> Linear(2048, 1024)    -> tanh -> dropout(0.5)
        -> Linear(1024, num_labels) -> log_softmax

    Args:
        num_labels: number of output classes.
        vocab_size: length of each input vector.
    """

    def __init__(self, num_labels, vocab_size):
        super(BoWClassifier, self).__init__()

        self.linear1 = nn.Linear(vocab_size, 2048)
        self.linear2 = nn.Linear(2048, 1024)
        self.linear3 = nn.Linear(1024, num_labels)

    def forward(self, bow_vec):
        """Return log-probabilities of shape ``(batch, num_labels)``.

        Args:
            bow_vec: float tensor of shape ``(batch, vocab_size)``.
        """
        # F.dropout defaults to training=True, which kept dropout active even
        # after model.eval(); tie it to the module's mode instead.
        hidden1 = torch.tanh(self.linear1(bow_vec))
        hidden1 = F.dropout(hidden1, p=0.5, training=self.training)
        hidden2 = torch.tanh(self.linear2(hidden1))
        hidden2 = F.dropout(hidden2, p=0.5, training=self.training)
        scores = F.log_softmax(self.linear3(hidden2), dim=1)

        return scores

In [27]:
def make_target(label, annotation_vocab):
    """Return the class index of ``label`` as a one-element long tensor on ``device``.

    Args:
        label: answer to look up.
        annotation_vocab: mapping from answer to class index.
    """
    class_index = annotation_vocab[label]
    target = torch.LongTensor([class_index])
    return target.to(device=device)

In [28]:
def train(model, num_epochs, batch_size):
    """Train ``model`` in place on the module-level training set.

    Reads the module-level ``train_text``, ``train_images``,
    ``annotation_vocab``, ``loss_function``, ``optimizer`` and ``device``.

    Args:
        model: network taking a ``(batch, features)`` float tensor and
            returning per-class scores accepted by ``loss_function``.
        num_epochs: number of passes over the training set.
        batch_size: maximum number of samples per optimisation step.

    Returns:
        The same ``model`` object, after training.
    """
    model.train()

    for _ in range(num_epochs):
        # Reshuffle the whole training set every epoch. The previous version
        # only permuted samples inside a batch, which leaves the batch (and
        # therefore the averaged loss) unchanged. shuffle_data returns new
        # lists, so the module-level data keeps its order.
        epoch_text, epoch_images = shuffle_data(train_text, train_images)

        for start in range(0, len(epoch_text), batch_size):
            text_batch = epoch_text[start:start + batch_size]
            image_batch = epoch_images[start:start + batch_size]

            # Size the batch by the samples actually present: the last batch
            # may be short, and padding it with zero rows labelled as class 0
            # (as before) trains on spurious examples.
            rows = [np.concatenate((question, image))
                    for (question, _), image in zip(text_batch, image_batch)]
            # One host-to-device copy per batch instead of two per sample.
            in_mat = torch.from_numpy(np.stack(rows)).to(device=device, dtype=torch.float32)
            out_vec = torch.tensor([annotation_vocab[label] for _, label in text_batch],
                                   dtype=torch.long, device=device)

            log_probs = model(in_mat)
            batch_loss = loss_function(log_probs, out_vec)

            optimizer.zero_grad()
            batch_loss.backward()
            optimizer.step()

    return model

In [29]:
def _predict_answer(model, question, visual_features):
    """Return the answer string that ``model`` scores highest for one question/image pair."""
    text_tensor = torch.from_numpy(question).to(device=device)
    visual_tensor = torch.from_numpy(visual_features).to(device=device)
    model_input = torch.cat((text_tensor, visual_tensor)).view(1, -1)
    _, best_index = torch.max(model(model_input), 1)
    return annotation_vocab_lookup[best_index.item()]


def _safe_ratio(correct, total):
    """Return ``correct / total`` as a float, or 0.0 when ``total`` is zero."""
    return float(correct) / total if total else 0.0


def calculate_accuracy(model, text, image_features):
    """Return the percentage of samples whose predicted answer equals the actual one.

    The model is switched to evaluation mode for the duration of the call
    (so layers that honour ``model.training`` behave deterministically) and
    its previous mode is restored afterwards.

    Args:
        model: trained classifier.
        text: list of ``(question_embedding, actual_answer)`` tuples.
        image_features: image features, parallel to ``text``.

    Returns:
        float: accuracy in percent (0-100); 0.0 when ``text`` is empty.
    """
    was_training = model.training
    model.eval()
    try:
        with torch.no_grad():
            counter = sum(
                1
                for (question, actual_answer), visual_features in zip(text, image_features)
                if _predict_answer(model, question, visual_features) == actual_answer
            )
    finally:
        model.train(was_training)

    return _safe_ratio(counter, len(text)) * 100


def calculate_type_accuracy(model, text, image_features):
    """Return overall accuracy plus accuracy per answer type.

    A sample is of type yes/no when its actual answer is ``'yes'`` or
    ``'no'``, of type number when the answer ``isdigit()``, and of type other
    in every remaining case. The model is evaluated in evaluation mode and
    its previous mode is restored afterwards.

    Args:
        model: trained classifier.
        text: list of ``(question_embedding, actual_answer)`` tuples.
        image_features: image features, parallel to ``text``.

    Returns:
        tuple: ``(accuracy, yes_no, number, other)``. ``accuracy`` is a
        percentage (0-100) while the three per-type values are fractions
        (0-1), as before. A type without samples yields 0.0 instead of
        raising ``ZeroDivisionError``.
    """
    totals = {'yes': 0, 'number': 0, 'other': 0}
    rights = {'yes': 0, 'number': 0, 'other': 0}

    was_training = model.training
    model.eval()
    try:
        with torch.no_grad():
            for (question, actual_answer), visual_features in zip(text, image_features):
                predicted_answer = _predict_answer(model, question, visual_features)

                if actual_answer == 'yes' or actual_answer == 'no':
                    answer_type = 'yes'
                elif actual_answer.isdigit():
                    answer_type = 'number'
                else:
                    answer_type = 'other'

                totals[answer_type] += 1
                if predicted_answer == actual_answer:
                    rights[answer_type] += 1
    finally:
        model.train(was_training)

    accuracy = _safe_ratio(sum(rights.values()), len(text)) * 100
    return (accuracy,
            _safe_ratio(rights['yes'], totals['yes']),
            _safe_ratio(rights['number'], totals['number']),
            _safe_ratio(rights['other'], totals['other']))

In [None]:
# Hyper-parameter grid: every combination below is trained from scratch.
epochs = [20]
learning_rates = [1e-4]
batch_sizes = [32]

# Best validation accuracy seen so far and the configuration/model behind it.
# Starting from -inf guarantees that the first trained model is recorded, so
# best_model is a real model afterwards even if every accuracy is 0.
best = float('-inf')
best_values = None
best_model = None

# No `torch.cuda.device(0)` context here: every tensor and the model are
# placed explicitly on `device`, and the context would defeat the CPU
# fallback chosen when `device` was created.
for epoch in epochs:
    for lr in learning_rates:
        for bs in batch_sizes:
            print('LR= ' + str(lr) + ', BS= ' + str(bs) + ', epochs= ' + str(epoch))
            model = BoWClassifier(annotation_vocab_size, input_vector_size).to(device=device)
            # train() reads loss_function and optimizer as module-level names.
            loss_function = nn.NLLLoss()
            optimizer = optim.Adam(model.parameters(), lr=lr)
            train_model = train(model, epoch, bs)
            train_accuracy = calculate_accuracy(train_model, train_text, train_images)
            print(train_accuracy)
            validation_accuracy = calculate_accuracy(train_model, validation_text, validation_images)
            print(validation_accuracy)

            if validation_accuracy > best:
                best = validation_accuracy
                best_values = (epoch, lr, bs)
                best_model = train_model

LR= 0.0001, BS= 32, epochs= 20




In [None]:
# Evaluate the best model on the test split. The printed tuple is
# (overall accuracy in percent, yes/no accuracy, number accuracy, other accuracy);
# the three per-type values are fractions, not percentages.
print(calculate_type_accuracy(best_model, test_text, test_images))