<a href="https://colab.research.google.com/github/zaidalyafeai/AttentioNN/blob/master/Attention_in_Image_Captioning.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## Introduction
Traditional image captioning models architectures suffer from a bottleneck problem. Usually, we use a pretrained model to extract fixed features that are fed directly to an RNN model to generate the caption. However, this representation affects the captioning result as we progress in time because we look at the image as a whole not in parts. The basic idea behind attention is forcing the model to assign weights to different parts of the image which makes the captioning process more effective. 

In [0]:
!pip install tensorflow-gpu==2.0.0

## Imports

In [0]:
import tensorflow as tf
import numpy as np
from sklearn.model_selection import train_test_split
import matplotlib.pyplot as plt
from tqdm import tqdm
import glob
import re
import os
import json 
import cv2
import time
import random
from tensorflow.keras.applications.resnet50 import preprocess_input
print(tf.__version__)

2.0.0


## Dataset
We use Ms COCO 2014

In [0]:
annotation_zip = tf.keras.utils.get_file('captions.zip',
                                          cache_subdir=os.path.abspath('.'),
                                          origin = 'http://images.cocodataset.org/annotations/annotations_trainval2014.zip',
                                          extract = True)
annotation_file = os.path.dirname(annotation_zip)+'/annotations/captions_train2014.json'

name_of_zip = 'train2014.zip'
if not os.path.exists(os.path.abspath('.') + '/' + name_of_zip):
  image_zip = tf.keras.utils.get_file(name_of_zip,
                                      cache_subdir=os.path.abspath('.'),
                                      origin = 'http://images.cocodataset.org/zips/train2014.zip',
                                      extract = True)
  PATH = os.path.dirname(image_zip)+'/train2014/'
else:
  PATH = os.path.abspath('.')+'/train2014/'

Downloading data from http://images.cocodataset.org/annotations/annotations_trainval2014.zip
Downloading data from http://images.cocodataset.org/zips/train2014.zip


## Feature Extraction Model

We use ResNet 50 to extract the features of each image. We remove the last few layers. The choice of the output layer is for performance reasons. 

In [0]:
image_model = tf.keras.applications.ResNet50(include_top=False,
                                                weights='imagenet', input_shape = (224, 224, 3))
new_input = image_model.input
hidden_layer = image_model.get_layer('conv5_block3_2_relu').output

feature_extraction_model = tf.keras.Model(new_input, hidden_layer)

Downloading data from https://github.com/keras-team/keras-applications/releases/download/resnet/resnet50_weights_tf_dim_ordering_tf_kernels_notop.h5


In [0]:
list(feature_extraction_model.outputs[0].shape)

[None, 7, 7, 512]

## Extract and Save Features 

In [0]:
IMG_SIZE = 224

# helper function to extract the image path and the npy path to save the features
def create_img_npy_paths(imageid):
  append_zeros = ('').join(['0']*(12-len(imageid))) +imageid
  image_path = 'train2014/COCO_train2014_'+append_zeros+'.jpg'
  npy_path = f'features/{imageid}.npy'
  return image_path , npy_path 

# for each image we extract and save the features
def load_featurize_save(imageid, save = True):
  image_path, npy_path = create_img_npy_paths(imageid)

  #load and preprocess
  image = tf.image.decode_jpeg(tf.io.read_file(image_path), channels=3)
  image = tf.image.resize(image, [IMG_SIZE, IMG_SIZE])
  image = preprocess_input(image)
  image = tf.expand_dims(image, 0)

  #extract the features 
  feature = feature_extraction_model(image)
  if save:
    np.save(npy_path, feature)
  return tf.squeeze(feature, 0)

In [0]:
# create a new foulder to save the features 
if not os.path.isdir('features'):
  os.makedirs('features')

feature_paths = []
caps = []
img_paths = []

# load the annotations, we only use 10000 image, captions pair 
y = json.load(open(annotation_file, 'r'))
annotations = np.random.choice(y['annotations'], 10000)    

# loop over the images, load them and extract then save features 
for element in tqdm(annotations):
  caption = element['caption']
  imageid = str(element['image_id'])
  load_featurize_save(imageid)
  image_path, npy_path = create_img_npy_paths(imageid)
  img_paths.append(image_path)
  feature_paths.append(npy_path)
  caps.append(caption)

100%|██████████| 10000/10000 [15:45<00:00, 10.58it/s]


## Captions Preprocessing

In this section we do some preprocessing. Our main task is to map each word to a unique index. 

In [0]:
def preprocess_cap(stmt):
  #remove new line character
  stmt = stmt.replace("\n", "")
  
  #only keep alphanumerics
  stmt = re.sub(r'([^\s\w]|_)+', "", stmt.lower().strip())
  
  #attach start, end special symbols 
  stmt = '<s> '+stmt+' <e>'
  
  return stmt

processed_caps = [preprocess_cap(cap) for cap in caps]
unique_words = set(' '.join(processed_caps).split(' '))
num_words = len(unique_words)

print('The number of unique words ', num_words)

The number of unique words  5000


Map each character to an integer using `tf.keras.preprocessing`

In [0]:
#helper function to find the largest statement in a corpus
def get_max_stmt(stmts):
  return max([len(stmt) for stmt in stmts])

def get_tensors_dicts(stmts):

  # how many words we use 
  most_frequent = 1000

  #tokenzie using spaces and convert to integers 
  tk = tf.keras.preprocessing.text.Tokenizer(split = ' ', filters = "", num_words= most_frequent, oov_token = '<u>')
  tk.fit_on_texts(stmts)

  # create the word indices and indices to words
  tk.word_index = {e:i for e,i in tk.word_index.items() if i < most_frequent} 
  tk.word_index['<p>'] = 0

  word2index = tk.word_index
  index2word = {word2index[k]:k for k in word2index.keys()}

  # convert the text to sequences
  sequences = tk.texts_to_sequences(stmts)

  #pad the sequences to have the same length 
  max_stmt = get_max_stmt(sequences)
  output = tf.keras.preprocessing.sequence.pad_sequences(sequences, maxlen = max_stmt, padding = "post")  
  
  return output, word2index, index2word

Get the input tensors and output tensors

In [0]:
cap_tensors, word2index, index2word = get_tensors_dicts(processed_caps)

## Create Dataset

In [0]:
def parse_data(path, cap):
  features = np.load(path.decode('utf-8'))
  return features, cap

In [0]:
BATCH_SIZE = 64

#random split
paths_train, paths_valid, caps_train, caps_valid = train_test_split(feature_paths, cap_tensors, test_size=0.2)

#training dataset
train_dataset = tf.data.Dataset.from_tensor_slices((paths_train, caps_train)).shuffle(len(paths_train))
train_dataset = train_dataset.map(lambda item1, item2: tf.numpy_function(
          parse_data, [item1, item2], [tf.float32, tf.int32]),
          num_parallel_calls=tf.data.experimental.AUTOTUNE)
train_dataset = train_dataset.batch(BATCH_SIZE, drop_remainder=True)

## Create Models
Instintiate some variables

In [0]:
units = 1024
embedding_dim = 256
feature_vector_shape = (49, 512) 
vocab_size = len(index2word)
caps_max_length = cap_tensors.shape[1]

## Attention Mechanism

Givn the output from the pretrained model as a list of features $a = [a_1, a_2, \cdots , a_n]$ and the input hidden state $h_0$ are processed by a special network called the attention network. This results in attention weights which are values between 0 and 1 that tell us which hidden states are most important to us at each stage of the decoder. In this notebook use the following network 

$$\text{Attention Network} = \text{softmax}(V(\tanh(W_1(a)+ W_2(h_0))))$$

Where $W_1,W_2$ and $V$ are dense layers with $units, units$ and $1$ neurons respectively. This results in an output tensor of size $[\text{batch_sz}, n, 1]$  called the attention_weights. Then the attention weights are multiplied element wise by $a$ to generate the context vector

$$\text{Context Vector} = \text{attention_weights} \odot a$$
Finally the context vector is concatenated by the embedded input vector as an input to the decoder.

### Encoder

In [0]:
def gru(units):
  return tf.keras.layers.GRU(units, 
                             return_sequences=True, 
                             return_state=True, 
                             recurrent_initializer='glorot_uniform')

def get_encoder(feature_vector_shape, embedding_dim, batch_sz):
  
    input = tf.keras.layers.Input(feature_vector_shape)
    
    # apply dense layer output x: [batch_sz, embedding_dim]
    x = tf.keras.layers.Dense(embedding_dim, activation='tanh')(input)
    
    return tf.keras.models.Model(inputs = input, outputs = x)

### Decoder

In [0]:
def get_decoder(vocab_size, embedding_dim, units, batch_sz):
  
  enc_output = tf.keras.layers.Input((feature_vector_shape[0], embedding_dim))
  enc_hidden = tf.keras.layers.Input((units,))
  dec_input = tf.keras.layers.Input((1,))

  W1 = tf.keras.layers.Dense(units)
  W2 = tf.keras.layers.Dense(units)
  V = tf.keras.layers.Dense(1)
      
  x = tf.keras.layers.Embedding(vocab_size, embedding_dim)(dec_input)
  
  #1. attention network output [batch_sz, feature_vector_size, 1]
  score = V(tf.nn.tanh(W1(enc_output) + W2(tf.expand_dims(enc_hidden, axis = 1))))

  #2. attention weights output [batch_sz, feature_vector_size , 1]
  attention_weights = tf.nn.softmax(score, axis = 1)

  #3. context_vector output [batch_sz, 1, units]
  context_vector = attention_weights * enc_output
  context_vector = tf.reduce_sum(context_vector, axis=1, keepdims = True)
  
  #3. concatenate with the output [batch_sz, 1, units + embedding_dim]
  x = tf.concat([x, context_vector], axis = -1)
  
  #4. apply GRU output x:[batch_sz, 1, units] h:[batch_sz, units]
  x, h = gru(units)(x)
  
  #5. reshape and dense output [batch_sz, vocab_size]
  x = tf.reduce_sum(x, axis = 1)
  output = tf.keras.layers.Dense(vocab_size)(x)
 
  return tf.keras.models.Model(inputs = [dec_input, enc_hidden, enc_output], outputs = [output, h, attention_weights])

In [0]:
encoder = get_encoder(feature_vector_shape, embedding_dim, BATCH_SIZE)
decoder = get_decoder(vocab_size, embedding_dim, units, BATCH_SIZE)

## Loss function

In [0]:
optimizer = tf.optimizers.Adam()

def loss_function(real, pred):
  # mask out the <u> and <p> tags because they don't contribute to captioning
  loss = tf.nn.sparse_softmax_cross_entropy_with_logits(labels=real, logits=pred, ) * (1 - np.equal(real, 0)) * (1 - np.equal(real, 1))
  return tf.reduce_mean(loss)

## Training

In [33]:
import time

EPOCHS = 30

for epoch in range(EPOCHS):
    start = time.time()
    
    total_loss = 0
    
    #loop over the training tensors 
    for (batch, (features, caps)) in enumerate(train_dataset):
      
        features = tf.reshape(features, (BATCH_SIZE, feature_vector_shape[0], feature_vector_shape[1]))

        
        loss = 0
        
        with tf.GradientTape() as tape:

            # encode the features
            enc_output = encoder(features)
            enc_hidden = tf.zeros((BATCH_SIZE, units))

            # create the initial input to the decoder 
            dec_input = tf.expand_dims([word2index['<s>']] * BATCH_SIZE, 1)      
            

            attention_sum = tf.zeros((BATCH_SIZE, 49, 1))

            # Teacher forcing - feeding the target as the next input
            for t in range(1, caps.shape[1]):
              
                # passing enc_output to the decoder
                predictions, enc_hidden, attention_weights = decoder([dec_input, enc_hidden, enc_output])
                attention_sum += attention_weights

                # evaluate the of captioning 
                loss += loss_function(caps[:, t], predictions)   

                # evaluate the next input 
                dec_input = tf.expand_dims(caps[:, t], 1)

            # Doubly stochastic regularization https://arxiv.org/abs/1502.03044
            # we want the sum of the weights accross t to sum to 1   
            loss += 0.05 * tf.reduce_sum((1-attention_sum)**2)

        #calculate the loss 
        batch_loss = (loss / int(caps.shape[1]))
        total_loss += batch_loss
        
        # backprop
        variables = encoder.variables + decoder.variables
        gradients = tape.gradient(loss, variables)
        optimizer.apply_gradients(zip(gradients, variables))
        
        N_BATCH = batch
    
    #show accumulative loss 
    print('Epoch {} Train Loss {:.4f}'.format(epoch + 1,
                                        total_loss / N_BATCH))

Epoch 1 Train Loss 0.2154
Epoch 2 Train Loss 0.1960
Epoch 3 Train Loss 0.1778
Epoch 4 Train Loss 0.1605
Epoch 5 Train Loss 0.1451
Epoch 6 Train Loss 0.1298
Epoch 7 Train Loss 0.1166
Epoch 8 Train Loss 0.1040
Epoch 9 Train Loss 0.0953
Epoch 10 Train Loss 0.0835


## Test

In [0]:
def postprocess_activations(activations):

  #resize and convert to image 
  output = cv2.resize(activations, (224, 224), )
  output = output/output.max() # maybe normalize
  output = output *255
  return 255 - output.astype('uint8')

def apply_heatmap(weights, img):
  #generate heat maps 
  heatmap = cv2.applyColorMap(weights, cv2.COLORMAP_JET)
  heatmap = cv2.addWeighted(heatmap, 0.6, img, 0.4, 0)
  return heatmap

def plot_output(output, words):
  fig = plt.figure(figsize = (20, 20))
  n = len(output)
  for i in range(n):
    a=fig.add_subplot(1,n,i+1)
    plt.subplots_adjust(wspace = 0.0005)
    plt.imshow(output[i])
    plt.axis('off')
    plt.title(words[i])
    
def caption(imageid):
 
    # load the image
    image_path, npy_path = create_img_npy_paths(imageid)
    numpy_image = cv2.imread(image_path)[:,:,::-1]
    numpy_image = cv2.resize(numpy_image, (224, 224))
    
    # extract the features  
    features = load_featurize_save(imageid, save = False)
    features = tf.reshape(features, (1, feature_vector_shape[0], feature_vector_shape[1]))
    
    #feed encoder 
    enc_out = encoder(features)

    enc_hidden = tf.zeros((1, units))
    # prepare first input to the decoder 
    dec_input = tf.expand_dims([word2index['<s>']], 0)
    
    result = ""
    words = ['']
    output = [numpy_image.copy()]

    for t in range(10):
        
        # feed decoder 
        predictions, enc_hidden, attention_weights = decoder([dec_input, enc_hidden, enc_out])

        # extract the attention weights and post process them 
        attention_image = attention_weights.numpy().reshape((7,7))
        heatmap = postprocess_activations(attention_image)

        output.append(apply_heatmap(heatmap, numpy_image))

        # predict next word 
        predicted_id = tf.argmax(predictions[0]).numpy()
        
        next_word = index2word[predicted_id]+ ' '
        result += next_word + ' '
        words.append(next_word)
        
        
        # exit on end token 
        if index2word[predicted_id] == '<e>':
            plot_output(output, words)
            return result
        
        # the predicted ID is fed back into the model
        dec_input = tf.expand_dims([predicted_id], 0)
    
    
    plot_output(output, words)
    return result

In [0]:
for path in annotations[0:10]:
  caption(str(path['image_id']))

# References

1. https://www.tensorflow.org/tutorials/text/image_captioning
2. https://medium.com/syncedreview/a-brief-overview-of-attention-mechanism-13c578ba9129