###Mount Drive

In [1]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [2]:
# Since we've shared this drive with you, please use the correct file path from your drive since it'll go to SharedDrive for you
%cd '/content/drive/MyDrive/vqa'

/content/drive/MyDrive/vqa


In [4]:
!pip install opencv-python

Collecting opencv-python
  Obtaining dependency information for opencv-python from https://files.pythonhosted.org/packages/a1/f6/57de91ea40c670527cd47a6548bf2cbedc68cec57c041793b256356abad7/opencv_python-4.8.1.78-cp37-abi3-macosx_11_0_arm64.whl.metadata
  Downloading opencv_python-4.8.1.78-cp37-abi3-macosx_11_0_arm64.whl.metadata (19 kB)
Downloading opencv_python-4.8.1.78-cp37-abi3-macosx_11_0_arm64.whl (33.1 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m33.1/33.1 MB[0m [31m9.4 MB/s[0m eta [36m0:00:00[0m00:01[0m00:01[0mm
[?25hInstalling collected packages: opencv-python
Successfully installed opencv-python-4.8.1.78


In [5]:
import os
#os.chdir("/content/drive/MyDrive/vqa")
os.chdir("./")

###Import relevant libraries

In [6]:
import os
import numpy as np
import torch
import torchvision.models as models
import torch.utils.data as data
import torchvision.transforms as transforms
import torch.nn as nn
import torch.optim as optim
from torch.optim import lr_scheduler
from PIL import Image
import re
import time
import cv2
import warnings

###Setting the constants and parameter values

In [9]:
input_dir = './input_dir/'
log_dir = './logs'
model_dir = './models'
# maximum length of question, the length in the VQA dataset is 26
max_qst_length = 30
# maximum number of answers
max_num_ans = 10
# embedding size of feature vector for image and question
embed_size = 1024
# embedding size of the word used as the input for the LSTM
word_embed_size = 300
# Number of layers in the LSTM
num_layers = 2
# Hidden size in the LSTM
hidden_size = 64
# Learning rate, step size and decay rate used while initializing the Step learning rate Scheduler
learning_rate = 0.001
step_size = 10
gamma = 0.1
#Number of epochs it is trained on
num_epochs = 30
#Batch size, number of workers and the steps after which the model parameters are saved
batch_size = 256
num_workers = 4
save_step = 1

#device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
device = torch.device('mps')

###Helper Functions for Handling Text

In [10]:
SENTENCE_SPLIT_REGEX = re.compile(r'(\W+)')

# create tokens
def tokenize(sentence):
    tokens = SENTENCE_SPLIT_REGEX.split(sentence.lower())
    tokens = [t.strip() for t in tokens if len(t.strip()) > 0]
    return tokens

# returns a file as list of lines
def load_str_list(fname):
    with open(fname) as f:
        lines = f.readlines()
    lines = [l.strip() for l in lines]
    return lines


# Tokenizes the text and then gives the index of the word from the vocab txt file of answers and questions
class VocabDict:

    def __init__(self, vocab_file):
        self.word_list = load_str_list(vocab_file)
        self.word2idx_dict = {w:n_w for n_w, w in enumerate(self.word_list)}
        self.vocab_size = len(self.word_list)
        self.unk2idx = self.word2idx_dict['<unk>'] if '<unk>' in self.word2idx_dict else None

    def idx2word(self, n_w):

        return self.word_list[n_w]

    def word2idx(self, w):
        if w in self.word2idx_dict:
            return self.word2idx_dict[w]
        elif self.unk2idx is not None:
            return self.unk2idx
        else:
            raise ValueError('word %s not in dictionary (while dictionary does not contain <unk>)' % w)

    def tokenize_and_index(self, sentence):
        inds = [self.word2idx(w) for w in tokenize(sentence)]

        return inds

###Building the Dataset and DataLoader

In [20]:
from data_loader import VqaDataset,get_loader

###Model: Image Encoding Block

In [21]:
#Block 1: Image Channel which creates the image embedding
class ImgEncoder(nn.Module):

    def __init__(self, embed_size):
        super(ImgEncoder, self).__init__()
        model = models.vgg19(pretrained=True)
        in_features = model.classifier[-1].in_features  # input size of feature vector
        model.classifier = nn.Sequential(
            *list(model.classifier.children())[:-1])    # remove last fc layer

        self.model = model                              # loaded model without last fc layer
        self.fc = nn.Linear(in_features, embed_size)    # feature vector of image

    def forward(self, image):

        with torch.no_grad():
            img_feature = self.model(image)                  # [batch_size, vgg16(19)_fc=4096]
        img_feature = self.fc(img_feature)                   # [batch_size, embed_size]

        l2_norm = img_feature.norm(p=2, dim=1, keepdim=True).detach()
        img_feature = img_feature.div(l2_norm)               # l2-normalized feature vector

        return img_feature

###Model: Question Encoding Block

In [22]:
class QstEncoder(nn.Module):

    def __init__(self, qst_vocab_size, word_embed_size, embed_size, num_layers, hidden_size):

        super(QstEncoder, self).__init__()
        self.word2vec = nn.Embedding(qst_vocab_size, word_embed_size)
        self.tanh = nn.Tanh()
        self.lstm = nn.LSTM(word_embed_size, hidden_size, num_layers)
        self.fc = nn.Linear(2*num_layers*hidden_size, embed_size)     # 2 for hidden and cell states

    def forward(self, question):

        qst_vec = self.word2vec(question)                             # [batch_size, max_qst_length=30, word_embed_size=300]
        qst_vec = self.tanh(qst_vec)
        qst_vec = qst_vec.transpose(0, 1)                             # [max_qst_length=30, batch_size, word_embed_size=300]
        _, (hidden, cell) = self.lstm(qst_vec)                        # [num_layers=2, batch_size, hidden_size=512]
        qst_feature = torch.cat((hidden, cell), 2)                    # [num_layers=2, batch_size, 2*hidden_size=1024]
        qst_feature = qst_feature.transpose(0, 1)                     # [batch_size, num_layers=2, 2*hidden_size=1024]
        qst_feature = qst_feature.reshape(qst_feature.size()[0], -1)  # [batch_size, 2*num_layers*hidden_size=2048]
        qst_feature = self.tanh(qst_feature)
        qst_feature = self.fc(qst_feature)                            # [batch_size, embed_size]

        return qst_feature

###Model: Combine Image Encoding and Question Encoding Block

In [23]:
class VqaModel(nn.Module):

    def __init__(self, embed_size, qst_vocab_size, ans_vocab_size, word_embed_size, num_layers, hidden_size):

        super(VqaModel, self).__init__()
        self.img_encoder = ImgEncoder(embed_size)
        self.qst_encoder = QstEncoder(qst_vocab_size, word_embed_size, embed_size, num_layers, hidden_size)
        self.tanh = nn.Tanh()
        self.dropout = nn.Dropout(0.5)
        self.fc1 = nn.Linear(embed_size, ans_vocab_size)
        self.fc2 = nn.Linear(ans_vocab_size, ans_vocab_size)

    def forward(self, img, qst):

        img_feature = self.img_encoder(img)                     # [batch_size, embed_size]
        qst_feature = self.qst_encoder(qst)                     # [batch_size, embed_size]

        # Elementwise multiplication of image and question vectors for fusion
        combined_feature = torch.mul(img_feature, qst_feature)  # [batch_size, embed_size]
        combined_feature = self.tanh(combined_feature)
        combined_feature = self.dropout(combined_feature)
        combined_feature = self.fc1(combined_feature)           # [batch_size, ans_vocab_size=1000]
        combined_feature = self.tanh(combined_feature)
        combined_feature = self.dropout(combined_feature)
        combined_feature = self.fc2(combined_feature)           # [batch_size, ans_vocab_size=1000]

        return combined_feature

In [24]:
# Get data loader for train and test - it's a dictionary with the key train having the train dataloader and same for test
data_loader = get_loader(
        input_dir=input_dir,
        input_vqa_train='train.npy',
        input_vqa_valid='valid.npy',
        max_qst_length=max_qst_length,
        max_num_ans=max_num_ans,
        batch_size=batch_size,
        num_workers=num_workers)

qst_vocab_size = data_loader['train'].dataset.qst_vocab.vocab_size
ans_vocab_size = data_loader['train'].dataset.ans_vocab.vocab_size
ans_unk_idx = data_loader['train'].dataset.ans_vocab.unk2idx

# Initializing the model
model = VqaModel(
        embed_size=embed_size,
        qst_vocab_size=qst_vocab_size,
        ans_vocab_size=ans_vocab_size,
        word_embed_size=word_embed_size,
        num_layers=num_layers,
        hidden_size=hidden_size).to(device)

# Initializing the loss function
criterion = nn.CrossEntropyLoss()

# Choosing which parameters to update in the optimizer
params = list(model.img_encoder.fc.parameters()) \
      + list(model.qst_encoder.parameters()) \
      + list(model.fc1.parameters()) \
      + list(model.fc2.parameters())

# Initializing the optimizer and learning rate scheduler
optimizer = optim.Adam(params, lr=learning_rate)
scheduler = lr_scheduler.StepLR(optimizer, step_size=step_size, gamma=gamma)
last_time = 0

###Training Loop

In [25]:

for epoch in range(num_epochs):

    for phase in ['train', 'valid']:

        running_loss = 0.0
        running_corr = 0

        batch_step_size = len(data_loader[phase].dataset) / batch_size

        if phase == 'train':
            scheduler.step()
            model.train()
        else:
            model.eval()

        for batch_idx, batch_sample in enumerate(data_loader[phase]):

            image = batch_sample['image'].to(device)
            question = batch_sample['question'].to(device)
            label = batch_sample['answer_label'].to(device)
            multi_choice = batch_sample['answer_multi_choice']  # not tensor, list.

            optimizer.zero_grad()

            with torch.set_grad_enabled(phase == 'train'):

                output = model(image, question)      # size: [batch_size X ans_vocab_size=1000]
                _, pred = torch.max(output, 1)  # size: [batch_size]

                loss = criterion(output, label)

                if phase == 'train':
                    loss.backward()
                    optimizer.step()

            # Evaluation metric
            running_loss += loss.item()
            running_corr += torch.stack([(ans == pred.cpu()) for ans in multi_choice]).any(dim=0).sum()

            # Print the average loss in a mini-batch.
            if batch_idx % 10 == 0:
                time_taken = time.time() - last_time
                time_left = (((batch_step_size - batch_idx) * time_taken)/10) * (num_epochs - epoch)
                print('| {} SET | Epoch [{:02d}/{:02d}], Step [{:04d}/{:04d}], Loss: {:.4f}, Time left: {:.2f} hr'
                      .format(phase.upper(), epoch+1, num_epochs, batch_idx, int(batch_step_size), loss.item(), time_left/3600))
                last_time = time.time()
        # Print the average loss and accuracy in an epoch.
        epoch_loss = running_loss / batch_step_size
        epoch_acc = running_corr.double() / len(data_loader[phase].dataset)

        print('| {} SET | Epoch [{:02d}/{:02d}], Loss: {:.4f}, Acc: {:.4f}\n'
              .format(phase.upper(), epoch+1, num_epochs, epoch_loss, epoch_acc))



        # Log the loss and accuracy in an epoch.
        with open(os.path.join(log_dir, '{}-log-epoch-{:02}.txt')
                  .format(phase, epoch+1), 'w') as f:
            f.write(str(epoch+1) + '\t'
                    + str(epoch_loss) + '\t'
                    + str(epoch_acc.item()))


    # Save the model check points.
    if (epoch+1) % save_step == 0:
        torch.save(model, os.path.join(model_dir, '-epoch-{:02d}.pt'.format(epoch+1)))

| TRAIN SET | Epoch [01/30], Step [0000/0057], Loss: 6.8989, Time left: 81396587.76 hr
| TRAIN SET | Epoch [01/30], Step [0010/0057], Loss: 4.3280, Time left: 1.10 hr
| TRAIN SET | Epoch [01/30], Step [0020/0057], Loss: 4.5443, Time left: 0.83 hr
| TRAIN SET | Epoch [01/30], Step [0030/0057], Loss: 4.2608, Time left: 0.63 hr


###Inference

In [None]:
image_path = './input_dir/Resized_Images/test_img.jpeg'
question = 'What does the sign say?'
saved_model = './models/best_modelb.pt'
max_qst_length=30

In [None]:
warnings.filterwarnings("ignore")

qst_vocab = load_str_list("/content/drive/MyDrive/vqa/dataset/vocab_questions.txt")
ans_vocab = load_str_list("/content/drive/MyDrive/vqa/dataset/vocab_answers.txt")
word2idx_dict = {w:n_w for n_w, w in enumerate(qst_vocab)}
unk2idx = word2idx_dict['<unk>'] if '<unk>' in word2idx_dict else None
qst_vocab_size = len(qst_vocab)
ans_vocab_size = len(ans_vocab)

def word2idx(w):
        if w in word2idx_dict:
            return word2idx_dict[w]
        elif unk2idx is not None:
            return unk2idx
        else:
            raise ValueError('word %s not in dictionary (while dictionary does not contain <unk>)' % w)

image = cv2.imread(image_path)
image = cv2.resize(image, dsize=(224,224), interpolation = cv2.INTER_AREA)
image = torch.from_numpy(image).float()
image = image.to(device)
image = image.unsqueeze(dim=0)
image = image.view(1,3,224,224)

try:
  q_list = list(question.split(" "))
except:
  q_list = list(question.split(1))

idx = 'valid'
qst2idc = np.array([word2idx('<pad>')] * max_qst_length)  # padded with '<pad>' in 'ans_vocab'
qst2idc[:len(q_list)] = [word2idx(w) for w in q_list]

question = qst2idc
question = torch.from_numpy(question).long()

question = question.to(device)
question = question.unsqueeze(dim=0)

net = torch.load(saved_model)
net = net.to(device)

net.eval()

output = net(image, question)
predicts = torch.softmax(output, 1)
probs, indices = torch.topk(predicts, k=5, dim=1)
probs = probs.squeeze()
indices = indices.squeeze()
print("predicted - probabilty")
for i in range(5):
    print("'{}' - {:.4f}".format(ans_vocab[indices[i].item()], probs[i].item()))

predicted - probabilty
'yes' - 0.3498
'no' - 0.2768
'green' - 0.0493
'4' - 0.0321
'1' - 0.0250
