- Ref and Codes: https://iamaaditya.github.io/2016/04/visual_question_answering_demo_notebook
- Ref Git: https://github.com/iamaaditya/VQA_Keras
- Ref Git: https://github.com/iamaaditya/VQA_Demo
- **Copied and slightly modified from the repos above for self-learning purpose**

<img src="http://i.imgur.com/Za5P1ZZ.png" width="700">

# Import packages

In [1]:
%matplotlib inline
import warnings
import os, argparse
import cv2, spacy, numpy as np
from keras.models import model_from_json
from keras.optimizers import SGD
from sklearn.externals import joblib

from keras.models import Sequential
from keras.layers.core import Flatten, Dense, Dropout
from keras.layers.convolutional import MaxPooling2D, ZeroPadding2D
from keras.layers.convolutional import Conv2D as Convolution2D
import numpy as np

Using TensorFlow backend.


# Pretrained model files

In [13]:
# model with input (embedding question vector, extracted image 4096 vector), and output [0,1000] classification
VQA_model_file_name      = './model/VQA/VQA_MODEL.json' 
VQA_weights_file_name   = './model/VQA/VQA_MODEL_WEIGHTS.hdf5'
# convert from [0,1000] to label text
label_encoder_file_name  = './model/VQA/FULL_labelencoder_trainval.pkl'
# model with input (224 * 224) image and output 4096 feature vector
CNN_weights_file_name   = './model/VQA/vgg16_weights.h5'

# VGG model

## Pop one model layer

In [7]:
def pop(model):
    '''Removes a layer instance on top of the layer stack.
    This code is thanks to @joelthchao https://github.com/fchollet/keras/issues/2371#issuecomment-211734276
    '''
    if not model.outputs:
        raise Exception('Sequential model cannot be popped: model is empty.')
    else:
        model.layers.pop()
        if not model.layers:
            model.outputs = []
            model.inbound_nodes = []
            model.outbound_nodes = []
        else:
            model.layers[-1].outbound_nodes = []
            model.outputs = [model.layers[-1].output]
        model.built = False

    return model

## Load pretrained model weights

In [3]:
def load_model_legacy(model, weight_path):
    ''' this function is used because the weights in this model
    were trained with legacy keras. New keras does not support loading these weights '''

    import h5py
    f = h5py.File(weight_path, mode='r')
    flattened_layers = model.layers

    nb_layers = f.attrs['nb_layers']

    for k in range(nb_layers):
        g = f['layer_{}'.format(k)]
        weights = [g['param_{}'.format(p)] for p in range(g.attrs['nb_params'])]
        if not weights: continue
        if len(weights[0].shape) >2: 
            # swap conv axes
            # note np.rollaxis does not work with HDF5 Dataset array
            weights[0] = np.swapaxes(weights[0],0,3)
            weights[0] = np.swapaxes(weights[0],0,2)
            weights[0] = np.swapaxes(weights[0],1,2)
        flattened_layers[k].set_weights(weights)

    f.close()

## Define model layers

In [8]:
def VGG_16(weights_path=None):
    model = Sequential()
    model.add(ZeroPadding2D((1,1),input_shape=(3,224,224)))
    model.add(Convolution2D(64,( 3, 3), activation='relu'))
    model.add(ZeroPadding2D((1,1)))
    model.add(Convolution2D(64,( 3, 3), activation='relu'))
    model.add(MaxPooling2D((2,2), strides=(2,2), padding='same'))

    model.add(ZeroPadding2D((1,1)))
    model.add(Convolution2D(128,( 3, 3), activation='relu'))
    model.add(ZeroPadding2D((1,1)))
    model.add(Convolution2D(128,( 3, 3), activation='relu'))
    model.add(MaxPooling2D((2,2), strides=(2,2), padding='same'))

    model.add(ZeroPadding2D((1,1)))
    model.add(Convolution2D(256,( 3, 3), activation='relu'))
    model.add(ZeroPadding2D((1,1)))
    model.add(Convolution2D(256,( 3, 3), activation='relu'))
    model.add(ZeroPadding2D((1,1)))
    model.add(Convolution2D(256,( 3, 3), activation='relu'))
    model.add(MaxPooling2D((2,2), strides=(2,2), padding='same'))

    model.add(ZeroPadding2D((1,1)))
    model.add(Convolution2D(512,( 3, 3), activation='relu'))
    model.add(ZeroPadding2D((1,1)))
    model.add(Convolution2D(512,( 3, 3), activation='relu'))
    model.add(ZeroPadding2D((1,1)))
    model.add(Convolution2D(512,( 3, 3), activation='relu'))
    model.add(MaxPooling2D((2,2), strides=(2,2), padding='same'))

    model.add(ZeroPadding2D((1,1)))
    model.add(Convolution2D(512,( 3, 3), activation='relu'))
    model.add(ZeroPadding2D((1,1)))
    model.add(Convolution2D(512,( 3, 3), activation='relu'))
    model.add(ZeroPadding2D((1,1)))
    model.add(Convolution2D(512,( 3, 3), activation='relu'))
    model.add(MaxPooling2D((2,2), strides=(2,2), padding='same'))

    model.add(Flatten())
    model.add(Dense(4096, activation='relu'))
    model.add(Dropout(0.5))
    model.add(Dense(4096, activation='relu'))
    model.add(Dropout(0.5))
    model.add(Dense(1000, activation='softmax'))
    
    if weights_path:
        # model.load_weights(weights_path)
        load_model_legacy(model, weights_path)

    #Remove the last two layers to get the 4096D activations
    model = pop(model)
    model = pop(model)
        

    return model

# Model Inputs

## Extract raw image features as `VGG` input

extract/resize raw image features with `cv2`

In [9]:
def get_image_features(image_file_name):
    ''' Runs the given image_file to VGG 16 model and returns the 
    weights (filters) as a 1, 4096 dimension vector '''
    image_features = np.zeros((1, 4096))
    # Magic_Number = 4096  > Comes from last layer of VGG Model

    # Since VGG was trained as a image of 224x224, every new image
    # is required to go through the same transformation
    im = cv2.resize(cv2.imread(image_file_name), (224, 224))
    im = im.transpose((2,0,1)) # convert the image to RGBA

    
    # this axis dimension is required because VGG was trained on a dimension
    # of 1, 3, 224, 224 (first axis is for the batch size
    # even though we are using only one image, we have to keep the dimensions consistent
    im = np.expand_dims(im, axis=0) 

    image_features[0,:] = model_vgg.predict(im)[0]
    return image_features

## Embed question text with pretrained word vectors as `VQA` input

In [10]:
def get_question_features(question):
    ''' For a given question, a unicode string, returns the time series vector
    with each word (token) transformed into a 300 dimension representation
    calculated using Glove Vector '''
    word_embeddings = spacy.load('en_vectors_web_lg')
    tokens = word_embeddings(question)
    question_tensor = np.zeros((1, 30, 300))
    for j in range(len(tokens)):
        question_tensor[0,j,:] = tokens[j].vector
    return question_tensor

# Model Loading

## get VGG model with loaded weights

In [14]:
def get_image_model(CNN_weights_file_name):
    image_model = VGG_16()
    sgd = SGD(lr=0.1, decay=1e-6, momentum=0.9, nesterov=True)
    image_model.compile(optimizer=sgd, loss='categorical_crossentropy')
    return image_model

In [15]:
from keras.utils import plot_model
model_vgg = get_image_model(CNN_weights_file_name)
plot_model(model_vgg, to_file='model_vgg.png')

## get VQA model with loaded weights

In [16]:
def get_VQA_model(VQA_model_file_name, VQA_weights_file_name):
    ''' Given the VQA model and its weights, compiles and returns the model '''

    # thanks the keras function for loading a model from JSON, this becomes
    # very easy to understand and work. Alternative would be to load model
    # from binary like cPickle but then model would be obfuscated to users
    vqa_model = model_from_json(open(VQA_model_file_name).read())
    vqa_model.load_weights(VQA_weights_file_name)
    vqa_model.compile(loss='categorical_crossentropy', optimizer='rmsprop')
    return vqa_model

In [17]:
warnings.filterwarnings("ignore")
model_vqa  = get_VQA_model(VQA_model_file_name, VQA_weights_file_name)
plot_model(model_vqa, to_file='model_vqa.png')

# Test with an example

In [18]:
image_file_name = './fig/test.jpg'
question = u"What vehicle is in the picture?"

<img src="./fig/test.jpg" width="400">

In [19]:
# get the raw image features
image_features = get_image_features(image_file_name)

In [20]:
# get the raw question features
question_features = get_question_features(question)

In [21]:
y_output = model_vqa.predict([question_features, image_features])
warnings.filterwarnings("ignore")
# This task here is represented as a classification into a 1000 top answers
# this means some of the answers were not part of training and thus would 
# not show up in the result.
# These 1000 answers are stored in the sklearn Encoder class
labelencoder = joblib.load(label_encoder_file_name)
for label in reversed(np.argsort(y_output)[0,-5:]):
    print (str(round(y_output[0,label]*100,2)).zfill(5), "% ", labelencoder.inverse_transform(label))

20.16 %  piano
17.09 %  train
15.23 %  bus
011.1 %  bicycle
09.82 %  airplane
