In [None]:
from __future__ import absolute_import, division, print_function, unicode_literals

In [None]:
!pip install -q tensorflow==2.0.0-alpha0
import tensorflow as tf


#Generating plots of images

import matplotlib.pyplot as plt

#Scikit learn 

from sklearn.model_selection import train_test_split
from sklearn.utils import shuffle

import re
import numpy as np
import os
import time
import json
from glob import glob
from PIL import Image
import pickle

In [None]:
# Download and extract the COCO 2014 annotation archive into the current
# working directory (cache_subdir is given as an absolute path).
annotation_zip = tf.keras.utils.get_file('captions.zip', 
                                         cache_subdir=os.path.abspath('.'),
                                         origin = 'http://images.cocodataset.org/annotations/annotations_trainval2014.zip',
                                         extract = True)
# Path of the COCO caption annotations inside the extracted archive.
# NOTE(review): a later cell reassigns `annotation_file` to the VQA annotation
# JSON, so this value is not what the answer-parsing cell reads.
annotation_file = os.path.dirname(annotation_zip)+'/annotations/captions_train2014.json'

# Download the training images only when the zip is not already present in
# the working directory; either way PATH ends up pointing at the image folder.
name_of_zip = 'train2014.zip'
if not os.path.exists(os.path.abspath('.') + '/' + name_of_zip):
  image_zip = tf.keras.utils.get_file(name_of_zip,
                                      cache_subdir=os.path.abspath('.'),
                                      origin = 'http://images.cocodataset.org/zips/train2014.zip',
                                      extract = True)
  PATH = os.path.dirname(image_zip) + '/train2014/'
else:
  # Zip already downloaded: assume it was extracted next to it earlier.
  PATH = os.path.abspath('.') + '/train2014/'
  

In [None]:
# Show where the COCO training images are expected to live.
print(PATH)
# Fetch and unpack the VQA v2 training questions and annotations into the
# working directory. These commands run unconditionally on every execution.
# NOTE(review): `unzip -a` requests text-file conversion; presumably not needed
# for JSON payloads - confirm before relying on it.
!wget https://s3.amazonaws.com/cvmlp/vqa/mscoco/vqa/v2_Questions_Train_mscoco.zip
! unzip -a v2_Questions_Train_mscoco.zip
!wget https://s3.amazonaws.com/cvmlp/vqa/mscoco/vqa/v2_Annotations_Train_mscoco.zip
! unzip -a v2_Annotations_Train_mscoco.zip

In [None]:
#storing caption and image name in vectors
import collections
import operator

# VQA v2 training annotations (one record per question, several human answers each).
annotation_file = 'v2_mscoco_train2014_annotations.json'

with open(annotation_file, 'r') as f:
  annotations = json.load(f)

# Vote weight contributed by a single human answer, keyed by the annotator's
# self-reported confidence. Unknown confidence values contribute nothing.
CONFIDENCE_WEIGHTS = {'yes': 4, 'maybe': 2, 'no': 1}

# Parallel lists: entry i of each list describes the same annotation record.
all_answers = []
all_answers_qids = []
all_img_name_vector = []

for annot in annotations['annotations']:
  # Accumulate a confidence-weighted score per distinct answer string.
  ans_dic = collections.defaultdict(int)
  for each in annot['answers']:
    ans_dic[each['answer']] += CONFIDENCE_WEIGHTS.get(each['answer_confidence'], 0)

  # Highest score wins; ties resolve to the answer that was seen first.
  most_fav = max(ans_dic.items(), key=operator.itemgetter(1))[0]
  # Keep a space before <end> so the marker is a separate token, matching the
  # '<start> ... <end>' format used for the questions.
  caption = '<start> ' + most_fav + ' <end>'

  image_id = annot['image_id']
  question_id = annot['question_id']
  full_coco_image_path = PATH + 'COCO_train2014_' + '%012d.jpg' % (image_id)

  all_img_name_vector.append(full_coco_image_path)
  all_answers.append(caption)
  all_answers_qids.append(question_id)

In [None]:
# Load the VQA v2 open-ended training questions.
question_file = 'v2_OpenEnded_mscoco_train2014_questions.json'
with open(question_file, 'r') as f:
  questions = json.load(f)

# Parallel lists, one entry per question record, in file order.
question_ids = []
all_questions = []
all_img_name_vector_2 = []

for annot in questions['questions']:
  # Wrap the raw question text in start/end markers.
  caption = '<start> {} <end>'.format(annot['question'])
  image_id = annot['image_id']
  # COCO file names carry the image id zero-padded to 12 digits.
  full_coco_image_path = '{}COCO_train2014_{}'.format(PATH, '%012d.jpg' % (image_id))

  question_ids.append(annot['question_id'])
  all_questions.append(caption)
  all_img_name_vector_2.append(full_coco_image_path)

In [None]:
# Answer-side lists: sizes and a small sample.
print(len(all_img_name_vector), len(all_answers), len(all_answers_qids))
print(all_img_name_vector[10:15], all_answers[10:15], all_answers_qids[10:15])
# Question-side lists: report the length of the question-side image list
# (all_img_name_vector_2), not the answer-side one, so a mismatch is visible.
print(len(all_img_name_vector_2), len(all_questions), len(question_ids))
print(all_img_name_vector_2[10:15], all_questions[10:15], question_ids[10:15])


In [None]:
# Pair every answer with its question through the shared question id rather
# than trusting that both JSON files list their records in the same order.
# A missing id raises KeyError, which surfaces inconsistent input files early.
question_by_id = dict(zip(question_ids, all_questions))
aligned_questions = [question_by_id[qid] for qid in all_answers_qids]

# Shuffle the three parallel lists together with a fixed seed.
train_answers, train_questions, img_name_vector = shuffle(all_answers, aligned_questions,
                                                          all_img_name_vector,
                                                          random_state=1)
# Work on a small subset to keep feature extraction and training fast.
num_examples = 1000
train_answers = train_answers[:num_examples]
train_questions = train_questions[:num_examples]
img_name_vector = img_name_vector[:num_examples]

print(img_name_vector[0], train_questions[0], train_answers[0])

print(len(img_name_vector), len(train_questions), len(train_answers))

Image Feature Vector using InceptionV3

In [None]:
def load_image(image_path):
  """Read one JPEG file and prepare it as InceptionV3 input.

  Args:
    image_path: path of the image file, passed to `tf.io.read_file`.

  Returns:
    A `(image, image_path)` tuple: the image decoded to 3 channels, resized to
    299x299 and run through InceptionV3's `preprocess_input`, together with
    the unchanged path so callers can tell which file the tensor came from.
  """
  raw_bytes = tf.io.read_file(image_path)
  decoded = tf.image.decode_jpeg(raw_bytes, channels=3)
  resized = tf.image.resize(decoded, (299,299))
  prepared = tf.keras.applications.inception_v3.preprocess_input(resized)
  return prepared, image_path

In [None]:
# InceptionV3 pre-trained on ImageNet, without its classification head.
image_model = tf.keras.applications.InceptionV3(include_top = False,
                                                weights='imagenet')
# Re-wire the network so it maps the original input to the output of its
# last layer; this is the feature extractor used to pre-compute image features.
new_input = image_model.input
hidden_layer = image_model.layers[-1].output

image_features_extract_model = tf.keras.Model(new_input, hidden_layer)

In [None]:
# Several questions can refer to the same image, so extract features once
# per unique image path.
encode_train = sorted(set(img_name_vector))

# Build a pipeline that loads and preprocesses images in parallel and feeds
# them to the extractor in batches of 16 (tune to the available memory).
image_dataset = tf.data.Dataset.from_tensor_slices(encode_train)
image_dataset = image_dataset.map(
    load_image, num_parallel_calls=tf.data.experimental.AUTOTUNE).batch(16)

for img, path in image_dataset:
  batch_features = image_features_extract_model(img)
  # Collapse the spatial axes into one: (batch, H, W, C) -> (batch, H*W, C).
  batch_features = tf.reshape(batch_features,
                              (batch_features.shape[0], -1, batch_features.shape[3]))
  # print(batch_features.shape)

  # Store each image's feature map as a .npy file keyed by the image path
  # (np.save is given the image path as the target file name).
  for bf, p in zip(batch_features, path):
    path_of_feature = p.numpy().decode("utf-8")
    np.save(path_of_feature, bf.numpy())
    

In [None]:
def calc_max_length(tensor):
  """Return the length of the longest sequence in `tensor`.

  Args:
    tensor: an iterable of sized sequences (e.g. a list of token-id lists).

  Returns:
    The largest `len(t)` over all elements, or 0 when `tensor` is empty
    (instead of raising ValueError from `max()` on an empty iterable).
  """
  return max((len(t) for t in tensor), default=0)

In [None]:
# Limit the vocabulary to the top_k most frequent words; everything else is
# mapped to the "<unk>" out-of-vocabulary token.
top_k = 10000
# NOTE(review): the filter string contains '/' twice and no backslash; it looks
# like `[\]` was intended between '@' and '^' - confirm. '<' and '>' are absent
# from the filter, so the <start>/<end> markers keep their brackets.
tokenizer = tf.keras.preprocessing.text.Tokenizer(num_words=top_k,
                                                  oov_token="<unk>",
                                                  filters='!"#$%&()*+.,-/:;=?@[/]^_`{|}~')
tokenizer.fit_on_texts(train_questions)
train_questions_seqs = tokenizer.texts_to_sequences(train_questions)

# Keep a handle on the question vocabulary. This is the same dict object as
# tokenizer.word_index, so later additions to word_index show up here too.
# NOTE: the print below dumps the whole vocabulary into the cell output.
print(tokenizer.word_index)
ques_vocab = tokenizer.word_index
#print(train_question_seqs)

In [None]:
# Register index 0 as the padding token in both lookup directions, so the
# zeros produced by padding map back to '<pad>'.
tokenizer.word_index['<pad>'] = 0
tokenizer.index_word[0] = '<pad>'

In [None]:
# Convert every question into a list of token ids.
# NOTE(review): this repeats the texts_to_sequences call already made in the
# tokenizer-fitting cell; one of the two looks redundant.
train_questions_seqs = tokenizer.texts_to_sequences(train_questions)

In [None]:
# Pad every question to the length of the longest one, adding the padding at
# the end ('post'). When the max length parameter is not provided,
# pad_sequences derives it from the data automatically.
question_vector = tf.keras.preprocessing.sequence.pad_sequences(train_questions_seqs, padding='post')
#cap_vector

In [None]:
# Length (in tokens) of the longest tokenized question.
max_length = calc_max_length(train_questions_seqs)
print(max_length)

# Alias under the name that is later passed to CoattentionModel as max_q.
max_q = max_length

Creating one-hot answer vectors

In [None]:
from numpy import array
from numpy import argmax

from sklearn.preprocessing import LabelEncoder
from sklearn.preprocessing import OneHotEncoder

# Answers are treated as class labels: one class per distinct answer string.
data=train_answers
values=array(data)
print(values[:10])

# Map each distinct answer string to an integer class id.
label_encoder = LabelEncoder()
integer_encoded = label_encoder.fit_transform(values)

# Answer string -> class id lookup (classes_ is ordered by class id).
ans_vocab = {l: i for i, l in enumerate(label_encoder.classes_)}
print(ans_vocab)

# Request a dense array. Newer scikit-learn releases name the flag
# `sparse_output` and reject `sparse`; older releases only know `sparse`.
# An unknown keyword raises TypeError, so fall back accordingly.
try:
  onehot_encoder = OneHotEncoder(sparse_output=False)
except TypeError:
  onehot_encoder = OneHotEncoder(sparse=False)
# OneHotEncoder expects a 2-D column of samples.
integer_encoded = integer_encoded.reshape(len(integer_encoded), 1)
onehot_encoded = onehot_encoder.fit_transform(integer_encoded)

print(onehot_encoded[0], len(onehot_encoded))

# One row per example, one column per answer class.
answer_vector = onehot_encoded
len_ans_vocab = len(onehot_encoded[0])

# answer_vector = integer_encoded
# ans_vocab = {l: i for i, l in enumerate(label_encoder.classes_)}
print(answer_vector)

In [None]:
# Width of one padded question vector and of one one-hot answer vector.
print(len(question_vector[0]), len(answer_vector[0]))

Test Train Split

In [None]:
# Hold out 20% of the examples for validation. The three arrays are split
# together so image path, question and answer stay aligned; random_state
# fixes the split across runs.
img_name_train, img_name_val, question_train, question_val, answer_train, answer_val = train_test_split(img_name_vector,
                                                                                                        question_vector,
                                                                                                        answer_vector,
                                                                                                        test_size=0.2,
                                                                                                        random_state=0)

In [None]:
# Sizes of the train/validation partitions for images, questions and answers.
print(len(img_name_train), len(img_name_val), len(question_train), len(question_val), len(answer_train), len(answer_val))

In [None]:
# Display the shape of the padded training-question array.
question_train.shape

In [None]:
# Pipeline hyper-parameters, reduced to fit the available system; the values
# in the trailing comments are the larger settings they were reduced from.
BATCH_SIZE = 16 #64
# NOTE(review): the training dataset is built with dataset.shuffle(BUFFER_SIZE);
# a buffer of 1 presumably leaves the example order unchanged - confirm intent.
BUFFER_SIZE = 1 #1000
#embedding_dim = 256
#units = 512
#vocab_size = len(tokenizer.word_index)+1
# Number of full batches per epoch.
num_steps = len(img_name_train) // BATCH_SIZE
# The cached InceptionV3 feature map per image has shape (64, 2048);
# the two variables below record those two dimensions.
features_shape = 2048
attention_features_shape = 64

In [None]:
def map_func(img_name, cap, ans):
  """Swap an image path for its cached feature array.

  Args:
    img_name: UTF-8 encoded bytes of the image path; the features are read
      from the file at that path with '.npy' appended.
    cap: passed through unchanged.
    ans: passed through unchanged.

  Returns:
    `(features, cap, ans)` where `features` is the array loaded from disk.
  """
  feature_file = img_name.decode('utf-8')+'.npy'
  return np.load(feature_file), cap, ans

In [None]:
# One element per training example: (image path, question ids, one-hot answer).
# Questions and answers are cast to float32 to match the dtypes declared for
# tf.numpy_function below.
dataset = tf.data.Dataset.from_tensor_slices((img_name_train, question_train.astype(np.float32), answer_train.astype(np.float32)))

# Replace each image path with its cached feature array. map_func is plain
# Python/NumPy, so it is wrapped in tf.numpy_function; the map runs in parallel.
dataset = dataset.map(lambda item1, item2, item3: tf.numpy_function(map_func, [item1, item2, item3], [tf.float32, tf.float32, tf.float32]),
                      num_parallel_calls=tf.data.experimental.AUTOTUNE)

# Shuffle with the configured buffer, group into batches, and prefetch so
# input loading overlaps with training.
dataset = dataset.shuffle(BUFFER_SIZE).batch(BATCH_SIZE)
dataset = dataset.prefetch(buffer_size=tf.data.experimental.AUTOTUNE)

In [None]:
# One element per validation example: (image path, question ids, one-hot answer).
test_dataset = tf.data.Dataset.from_tensor_slices((img_name_val, question_val.astype(np.float32), answer_val.astype(np.float32)))

# Replace each image path with its cached feature array, in parallel.
test_dataset = test_dataset.map(lambda item1, item2, item3: tf.numpy_function(
    map_func, [item1, item2, item3], [tf.float32, tf.float32, tf.float32]),
                      num_parallel_calls=tf.data.experimental.AUTOTUNE)

# Batch the validation data like the training data. Without this the models
# receive single examples with no leading batch axis. No shuffling is needed
# for evaluation.
test_dataset = test_dataset.batch(BATCH_SIZE)
test_dataset = test_dataset.prefetch(buffer_size=tf.data.experimental.AUTOTUNE)


In [None]:
# Display the training dataset object (its repr shows element shapes/dtypes).
dataset

Model 1

Append Image as Word Model: treats the image as one extra word and appends it to the end of the question. The extended word sequence is then fed to a GRU.

In [None]:
class AppendImageAsWordModel(tf.keras.Model):
  """VQA classifier that appends the image to the question as one extra "word".

  The image features are flattened and projected to the word-embedding size,
  appended after the embedded question tokens, and the resulting sequence is
  run through a GRU. A dense layer maps the final GRU output to one score per
  answer class.

  The model returns unnormalised scores (logits): pair it with a loss created
  with `from_logits=True`, and apply `tf.nn.softmax` to get probabilities.

  Args:
    embedding_size: size of the word embeddings and of the projected image.
    rnn_size: number of GRU units.
    output_size: number of answer classes.
  """

  def __init__(self, embedding_size, rnn_size, output_size):
    super(AppendImageAsWordModel, self).__init__()
    # Image branch: flatten the feature map and squeeze it to embedding_size.
    self.flatten = tf.keras.layers.Flatten()
    self.condense = tf.keras.layers.Dense(embedding_size, activation='relu')

    # Question branch: embedding table sized from the module-level tokenizer.
    self.embedding = tf.keras.layers.Embedding(input_dim=len(tokenizer.word_index) + 1, output_dim=embedding_size)

    # Only the output at the last time step is kept.
    self.gru = tf.keras.layers.GRU(rnn_size,
                                   return_sequences = False,
                                   return_state = False)
    # No activation here: a softmax output combined with a from_logits=True
    # loss would apply softmax twice and flatten the gradients.
    self.logits = tf.keras.layers.Dense(output_size)

  def call(self, x, sents, hidden=None):
    """Score the answer classes for a batch of (image, question) pairs.

    Args:
      x: image features; everything after the batch axis is flattened.
      sents: padded question token ids, one row per example.
      hidden: optional initial GRU state of shape (batch, rnn_size). When
        omitted, the GRU starts from its default zero state.

    Returns:
      Tensor of shape (batch, output_size) with unnormalised class scores.
    """
    image_word = self.condense(self.flatten(x))
    # Give the image a length-1 time axis so it can be appended as a "word".
    image_word = tf.expand_dims(image_word, axis=1)
    embedded_question = self.embedding(sents)
    sequence = tf.concat([embedded_question, image_word], axis=1)
    output = self.gru(sequence, initial_state = hidden)
    return self.logits(output)

  def init_state(self, batch_size, rnn_size):
    """Return an all-zero GRU state of shape (batch_size, rnn_size)."""
    return tf.zeros((batch_size, rnn_size))


In [None]:
# Model 1: 256-d embeddings, 256 GRU units, one output per answer class.
append_image_word_model = AppendImageAsWordModel(256, 256, len_ans_vocab)
# The loss is configured with from_logits=True, i.e. it expects the model to
# return unnormalised scores and applies the softmax itself.
crossentropy = tf.keras.losses.CategoricalCrossentropy(from_logits=True)

def calc_loss(labels, logits):
  """Categorical cross-entropy between one-hot `labels` and `logits`."""
  return crossentropy(labels, logits)

optimizer_append = tf.keras.optimizers.Adam()

# NOTE: a later cell defines another `train_step` with a different signature;
# re-run this cell before training this model again.
# @tf.function
def train_step(input_imgs, input_sents, labels, initial_state):
  """Run one optimisation step on a batch and return its loss.

  Args:
    input_imgs: batch of cached image features.
    input_sents: batch of padded question token ids.
    labels: batch of one-hot answer vectors.
    initial_state: initial GRU state, one row per example in the batch.

  Returns:
    The scalar loss of this batch.
  """
  with tf.GradientTape() as tape:
    my_model_output = append_image_word_model(input_imgs, input_sents, initial_state)
    loss = calc_loss(labels, my_model_output)
  variables = append_image_word_model.trainable_variables
  gradients = tape.gradient(loss, variables)
  optimizer_append.apply_gradients(zip(gradients, variables))
  return loss

EPOCHS = 40
for epoch in range(EPOCHS):
  for (batch, (img_tensor, question, answer)) in enumerate(dataset):
    # Size the zero state from the batch actually received: the last batch of
    # an epoch is shorter than BATCH_SIZE whenever the dataset size is not a
    # multiple of it, and a BATCH_SIZE-row state would not match it.
    init_states = append_image_word_model.init_state(img_tensor.shape[0], 256)
    loss = train_step(img_tensor, question, answer, init_states)
  if epoch%10==0:
    # Reports the loss of the last batch of the epoch.
    print("Epoch #%d, Loss %.4f" % (epoch, loss))

Model 2 
Separate Image as Word Model

In [None]:
class SeparateImageAsWordModel(tf.keras.Model):
  """VQA classifier that encodes question and image separately.

  The question is embedded and summarised by a GRU; the image features are
  flattened and projected by a dense layer. Both vectors are concatenated and
  mapped to one score per answer class.

  The model returns unnormalised scores (logits), matching the
  `from_logits=True` losses used in this notebook; apply `tf.nn.softmax` to
  get probabilities.

  Args:
    embedding_size: size of the word embeddings and of the projected image.
    rnn_size: number of GRU units.
    output_size: number of answer classes.
  """

  def __init__(self, embedding_size, rnn_size, output_size):
    super(SeparateImageAsWordModel, self).__init__()
    # Image branch: flatten the feature map and squeeze it to embedding_size.
    self.flatten = tf.keras.layers.Flatten()
    self.condense = tf.keras.layers.Dense(embedding_size, activation='relu')

    # Question branch: embedding table sized from the module-level tokenizer.
    self.embedding = tf.keras.layers.Embedding(input_dim=len(tokenizer.word_index) + 1, output_dim=embedding_size)

    # Only the output at the last time step is kept.
    self.gru = tf.keras.layers.GRU(rnn_size,
                                   return_sequences = False,
                                   return_state = False)
    # No activation: emit raw scores for a from_logits=True loss.
    self.logits = tf.keras.layers.Dense(output_size)

  def call(self, x, sents, hidden=None):
    """Score the answer classes for a batch of (image, question) pairs.

    The second parameter was previously misspelled `snets` while the body
    read `sents`, so every call raised NameError; it is now `sents`.

    Args:
      x: image features; everything after the batch axis is flattened.
      sents: padded question token ids, one row per example.
      hidden: optional initial GRU state of shape (batch, rnn_size). When
        omitted, the GRU starts from its default zero state.

    Returns:
      Tensor of shape (batch, output_size) with unnormalised class scores.
    """
    image_vector = self.condense(self.flatten(x))            # (batch, embedding_size)
    embedded_question = self.embedding(sents)
    question_vector = self.gru(embedded_question, initial_state=hidden)  # (batch, rnn_size)
    # Question and image summaries side by side. No extra length-1 axis is
    # inserted, so the result lines up with (batch, classes) one-hot labels.
    joint = tf.concat([question_vector, image_vector], axis=-1)
    return self.logits(joint)

  def init_state(self, batch_size, rnn_size):
    """Return an all-zero GRU state of shape (batch_size, rnn_size)."""
    return tf.zeros((batch_size, rnn_size))

Model 3 Co Attention Model

In [None]:
import tensorflow as tf
from tensorflow.keras.layers import Dense, Flatten, Dropout, Embedding, LSTM, Activation, ZeroPadding1D, Conv1D

class CoattentionModel(tf.keras.Model):
  """Co-attention VQA classifier over word-level and sentence-level features.

  For each text level (embedded words, then LSTM outputs) the model
  alternates attention: summarise the question, use that summary to attend
  over image regions, then use the attended image to re-attend over the
  question. The word-level result is fed into the sentence-level prediction,
  and a final dense layer produces one score per answer class.

  The model returns unnormalised scores (logits), matching a loss created
  with `from_logits=True`.

  Args:
    ans_vocab: sized collection of answer classes; its length sets the
      number of outputs.
    max_q: maximum question length (stored on the instance only).
    ques_vocab: sized question vocabulary; its length sizes the embedding.
  """

  def __init__(self, ans_vocab, max_q, ques_vocab):
    super(CoattentionModel, self).__init__(name='CoattentionModel')
    self.ans_vocab = ans_vocab
    self.max_q = max_q
    self.ques_vocab = ques_vocab

    # Layers infer their input width on first call, so no input_shape hints
    # are given (the former hints did not match the actual feature sizes).
    self.ip_dense = Dense(256, activation='relu')
    num_words = len(ques_vocab) + 2
    self.word_level_feats = Embedding(input_dim=num_words, output_dim=256)
    self.lstm_layer = LSTM(256, return_sequences=True)
    self.dropout_layer = Dropout(0.5)
    self.tan_layer = Activation('tanh')
    self.dense_image = Dense(256, activation='relu')
    self.dense_text = Dense(256, activation='relu')
    # Attention scorers emit one raw score per position; the softmax is taken
    # across positions in attention_img / attention_ques. A softmax inside a
    # 1-unit Dense normalises over that single unit and always yields 1.0.
    self.image_attention = Dense(1)
    self.text_attention = Dense(1)
    self.dense_word_level = Dense(256, activation='relu')
    self.dense_phrase_level = Dense(256, activation='relu')  # not used by call()
    self.dense_sent_level = Dense(256, activation='relu')
    # No activation: ReLU here would clip every negative logit to zero.
    self.dense_final = Dense(len(ans_vocab))

  def affinity(self, image_feat, text_feat, level, prev_att, training=None):
    """Build the hidden representation that the attention scorers read.

    Args:
      image_feat: projected image features, assumed (batch, regions, 256).
      text_feat: text features, assumed (batch, tokens, 256).
      level: 0 = text only; 1 = image guided by the attended text in
        `prev_att`; 2 = text guided by the attended image in `prev_att`.
      prev_att: attended vector from the previous step, assumed (batch, 256);
        ignored when `level` is 0.
      training: forwarded to dropout; None lets Keras decide.

    Returns:
      tanh-activated, dropout-regularised hidden features per position.

    Raises:
      ValueError: if `level` is not 0, 1 or 2.
    """
    if level == 0:
      hidden = self.dense_text(text_feat)
    elif level == 1:
      # Broadcast the text summary over every image region.
      guide = tf.expand_dims(self.dense_text(prev_att), 1)
      hidden = self.dense_image(image_feat) + guide
    elif level == 2:
      # Broadcast the image summary over every token.
      guide = tf.expand_dims(self.dense_image(prev_att), 1)
      hidden = self.dense_text(text_feat) + guide
    else:
      raise ValueError('level must be 0, 1 or 2, got %r' % (level,))
    return self.dropout_layer(self.tan_layer(hidden), training=training)

  def attention_ques(self, text_feat, text):
    """Attention-weighted sum of `text_feat` over the token axis.

    NOTE(review): padded token positions are not masked out - confirm whether
    that matters for the question lengths in use.
    """
    weights = tf.nn.softmax(self.text_attention(text), axis=1)
    return tf.reduce_sum(weights * text_feat, 1)

  def attention_img(self, image_feat, img):
    """Attention-weighted sum of `image_feat` over the region axis."""
    weights = tf.nn.softmax(self.image_attention(img), axis=1)
    return tf.reduce_sum(weights * image_feat, 1)

  def call(self, image_feat, question_encoding, training=None):
    """Score the answer classes for a batch of (image, question) pairs.

    Args:
      image_feat: cached image features, assumed (batch, regions, channels).
      question_encoding: padded question token ids, (batch, tokens).
      training: enables dropout when True; None lets Keras decide.

    Returns:
      Tensor of shape (batch, len(ans_vocab)) with unnormalised class scores.
    """
    # Project the image features to the common 256-d space.
    image_feat = self.ip_dense(image_feat)

    # Text features: word level (embeddings) and sentence level (LSTM outputs).
    word_feat = self.word_level_feats(question_encoding)
    sent_feat = self.lstm_layer(word_feat)

    # Word level: question -> image -> question.
    word_text_attention = self.attention_ques(
        word_feat, self.affinity(image_feat, word_feat, 0, 0, training=training))
    word_img_attention = self.attention_img(
        image_feat, self.affinity(image_feat, word_feat, 1, word_text_attention, training=training))
    word_text_attention = self.attention_ques(
        word_feat, self.affinity(image_feat, word_feat, 2, word_img_attention, training=training))

    word_pred = self.dropout_layer(
        self.tan_layer(self.dense_word_level(word_img_attention + word_text_attention)),
        training=training)

    # Sentence level: same alternation on the LSTM outputs.
    sent_text_attention = self.attention_ques(
        sent_feat, self.affinity(image_feat, sent_feat, 0, 0, training=training))
    sent_img_attention = self.attention_img(
        image_feat, self.affinity(image_feat, sent_feat, 1, sent_text_attention, training=training))
    sent_text_attention = self.attention_ques(
        sent_feat, self.affinity(image_feat, sent_feat, 2, sent_img_attention, training=training))

    # Combine the sentence-level summary with the word-level prediction.
    sent_pred = self.dropout_layer(
        self.tan_layer(self.dense_sent_level(
            tf.concat([sent_img_attention + sent_text_attention, word_pred], -1))),
        training=training)

    return self.dense_final(sent_pred)



In [None]:
# Model 3, sized from the answer classes and question vocabulary built above.
model = CoattentionModel(ans_vocab, max_q, ques_vocab)

# Answers are one-hot vectors, hence the (non-sparse) categorical loss; with
# from_logits=True the loss expects unnormalised scores from the model.
loss_function = tf.keras.losses.CategoricalCrossentropy(from_logits=True) #sparse
optimizer = tf.keras.optimizers.Adam()
# Running means of the per-batch loss for the training and test passes.
train_loss_metric = tf.keras.metrics.Mean(name='train_loss')
test_loss_metric = tf.keras.metrics.Mean(name='test_loss')

# Accuracy against the one-hot answers for the training and test passes.
train_accuracy_metric = tf.keras.metrics.CategoricalAccuracy(name='train_accuracy')
test_accuracy_metric = tf.keras.metrics.CategoricalAccuracy(name='test_accuracy')


In [None]:
def train_step(images, questions, answers, model):
  with tf.GradientTape() as tape:

    #Forward pass
    predictions = model(images, questions)
    train_loss = loss_function(y_true=answers, y_pred=predictions)

    #Backward Pass
    gradients = tape.gradient(train_loss, model.trainable_variables)
    optimizer.apply_gradients(zip(gradients, model.trainable_variables))

    #Record Results
    train_loss_metric(train_loss)
    train_accuracy_metric(answers, predictions)

    def test_step(images, questions, answers, model):
      predictions = model(images, questions)
      test_loss = loss_function(y_true=answers, y_pred=predictions)

      #Record Results
      test_loss_metric(test_loss)
      test_accuracy_metric(answers, predictions)



In [None]:
EPOCHS = 40
# Per-epoch history, e.g. for plotting learning curves afterwards.
train_loss = []
test_loss = []
train_acc = []
test_acc = []
template = 'Epoch : {}, Loss : {:.4f}, Accuracy : {:.2f}, Test Loss: {:.4f}, Test Accuracy : {:.2f}'
for epoch in range(EPOCHS):
  # Clear the accumulators so each epoch reports its own numbers instead of a
  # running average over every epoch so far. Older Keras names the method
  # reset_states, newer Keras reset_state; use whichever exists.
  for metric in (train_loss_metric, train_accuracy_metric,
                 test_loss_metric, test_accuracy_metric):
    reset = getattr(metric, 'reset_state', None) or metric.reset_states
    reset()

  # Training pass.
  for (batch, (img_tensor, question, answer)) in enumerate(dataset):
    train_step(img_tensor, question, answer, model)
  # Evaluation pass on the held-out split.
  for (batch, (img_tensor, question, answer)) in enumerate(test_dataset):
    test_step(img_tensor, question, answer,model)

  train_loss.append(train_loss_metric.result())
  test_loss.append(test_loss_metric.result())
  train_acc.append(train_accuracy_metric.result()*100)
  test_acc.append(test_accuracy_metric.result()*100)
  print(template.format(epoch+1, 
                        train_loss_metric.result(),
                        train_accuracy_metric.result()*100,
                        test_loss_metric.result(),
                        test_accuracy_metric.result()*100))