# Image captioning with visual attention
This notebook is based on the TensorFlow tutorial on image captioning:
https://www.tensorflow.org/tutorials/text/image_captioning

The model architecture is similar to https://arxiv.org/abs/1502.03044

In [1]:
from __future__ import absolute_import, division, print_function, unicode_literals

import tensorflow as tf

import matplotlib.pyplot as plt

from sklearn.model_selection import train_test_split
from sklearn.utils import shuffle

import keras

import re
import numpy as np
import os
import time
import json
from glob import glob
from PIL import Image
import pickle

Using TensorFlow backend.


In [2]:
# Check that eager execution is on; later cells call .numpy() on tensors.
tf.executing_eagerly()

True

## Download and prepare the MS-COCO dataset
The dataset contains over 82,000 images, each of which has at least 5 different caption annotations. The code below downloads and extracts the dataset automatically.

In [3]:
class MsCoco:
    """Downloads the MS-COCO 2014 caption annotations and training images.

    Both archives are fetched with ``keras.utils.get_file`` into the current
    working directory and extracted there.
    """

    @staticmethod
    def __download_annotations(name_of_zip='captions.zip'):
        """Fetch and extract the annotations archive.

        Args:
            name_of_zip: File name under which the archive is stored locally.

        Returns:
            Path of the training captions JSON inside the extracted archive.
        """
        archive_path = keras.utils.get_file(
            name_of_zip,
            origin='http://images.cocodataset.org/annotations/annotations_trainval2014.zip',
            cache_subdir=os.path.abspath('.'),
            extract=True)
        archive_dir = os.path.dirname(archive_path)
        return f'{archive_dir}/annotations/captions_train2014.json'

    @staticmethod
    def __download_dataset(name_of_zip='train2014.zip', folder_to_extract='train2014'):
        """Fetch and extract the training images unless the zip is already present.

        Args:
            name_of_zip: File name under which the archive is stored locally.
            folder_to_extract: Name of the folder the archive extracts to.

        Returns:
            Path of the image folder, with a trailing slash.
        """
        working_dir = os.path.abspath(".")

        # The presence of the zip in the working directory is taken as the
        # sign that the download already happened.
        if os.path.exists(f'{working_dir}/{name_of_zip}'):
            return f'{working_dir}/{folder_to_extract}/'

        image_zip = keras.utils.get_file(
            name_of_zip,
            origin='http://images.cocodataset.org/zips/train2014.zip',
            cache_subdir=working_dir,
            extract=True)
        return f'{os.path.dirname(image_zip)}/{folder_to_extract}/'

    def download(self, dataset_name_of_zip='train2014.zip',
                 dataset_folder_to_extract='train2014',
                 annotations_name_of_zip='captions.zip'):
        """Download annotations, then images.

        Returns:
            Tuple ``(annotation_file, image_folder)``.
        """
        annotation_file = self.__download_annotations(annotations_name_of_zip)
        image_folder = self.__download_dataset(dataset_name_of_zip,
                                               dataset_folder_to_extract)
        return annotation_file, image_folder

In [4]:
# Helper object used to download the MS-COCO files.
dataset_mscoco = MsCoco()

In [5]:
# Path of the captions JSON and folder (with trailing slash) holding the training images.
annotation_file, train_folder = dataset_mscoco.download()

## Get images vector and annotations

In [6]:
# Parse the captions JSON; the 'annotations' entry is iterated in a later cell.
with open(annotation_file, 'r') as f:
    annotations = json.load(f)

In [7]:
# Parallel lists: all_captions[i] is a caption for the image at all_img_name_vector[i].
all_captions = []
all_img_name_vector = []

In [8]:
# Collect one (caption, image path) pair per annotation.
for annotation in annotations['annotations']:
    # The spaces around the caption are required: the tokenizer splits on
    # whitespace, so without them the markers fuse with the first/last word
    # (e.g. "<start>a") and the '<start>' / '<end>' lookups used for training
    # and decoding no longer match.
    caption = f'<start> {annotation["caption"]} <end>'
    image_id = annotation['image_id']
    # Image files are named COCO_train2014_<12-digit zero-padded id>.jpg
    coco_image_path = train_folder + 'COCO_train2014_' + '%012d.jpg' % (image_id)

    all_img_name_vector.append(coco_image_path)
    all_captions.append(caption)

### Shuffle dataset

In [9]:
# Shuffle captions and image paths together so the pairs stay aligned.
# A fixed seed makes the order (and therefore the later train/validation
# split) reproducible across kernel restarts.
train_captions, img_name_vector = shuffle(all_captions,
                                          all_img_name_vector,
                                          random_state=1)

In [10]:
# Sanity check: shuffling must not change the number of captions.
len(train_captions), len(all_captions)

(414113, 414113)

## Preprocess the images using InceptionV3

First, I will convert the images into InceptionV3's expected format by:

* Resizing the image to 299px by 299px
* Normalizing the pixels with InceptionV3's `preprocess_input`

In [11]:
def load_image(image_path):
    """Read a JPEG file and prepare it as InceptionV3 input.

    Args:
        image_path: Path of the JPEG file to read.

    Returns:
        Tuple ``(image, image_path)``: the 3-channel image resized to 299x299
        and passed through InceptionV3's ``preprocess_input``, plus the path
        unchanged so callers can tell which file the image came from.
    """
    raw_bytes = tf.io.read_file(image_path)
    decoded = tf.image.decode_jpeg(raw_bytes, channels=3)
    resized = tf.image.resize(decoded, (299, 299))
    prepared = tf.keras.applications.inception_v3.preprocess_input(resized)
    return prepared, image_path

## Initialize InceptionV3 and load the pretrained Imagenet weights

In [12]:
# InceptionV3 without its classification head, initialised with ImageNet weights.
image_model = tf.keras.applications.InceptionV3(include_top=False, weights='imagenet')

# The feature extractor maps the network input to the output of its last layer.
model_input = image_model.input
model_output = image_model.layers[-1].output

In [13]:
# Model used to turn preprocessed images into feature maps.
image_features_extract_model = tf.keras.Model(model_input, model_output)

### Create image dataset

In [14]:
# Unique image paths in sorted order: img_name_vector repeats a path once per caption.
encode_train = sorted(set(img_name_vector))

In [15]:
# Stream the unique images through load_image in parallel, 64 images per batch.
image_dataset = tf.data.Dataset.from_tensor_slices(encode_train)
image_dataset = image_dataset.map(load_image, num_parallel_calls=tf.data.experimental.AUTOTUNE).batch(64)

### Cache processing images
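Process the images with the convolutional layers of InceptionV3 and cache the extracted features on disk as `.npy` files. (The original images could then be deleted to free up storage, but the code below does not delete them.)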

For convenience, I will install `tqdm` to see a progress bar while the images are processed.

In [20]:
!pip install -q tqdm

In [21]:
from tqdm import tqdm

In [None]:
# Extract InceptionV3 features batch by batch and cache them on disk.
for image_batch, path_batch in tqdm(image_dataset):
    feature_maps = image_features_extract_model(image_batch)
    # Collapse the two spatial axes into one: (batch, locations, channels).
    flat_shape = (feature_maps.shape[0], -1, feature_maps.shape[3])
    feature_maps = tf.reshape(feature_maps, flat_shape)

    for image_features, path_tensor in zip(feature_maps, path_batch):
        # The target is the image path itself; map_func later reads the
        # features back from "<image path>.npy".
        np.save(path_tensor.numpy().decode("utf-8"), image_features.numpy())

59it [06:20,  6.35s/it]

## Preprocess and tokenize the captions

Find the maximum length of any caption in our dataset

In [None]:
def calc_max_length(tensor):
    """Return the length of the longest element of ``tensor``.

    Args:
        tensor: Non-empty iterable of sized items (e.g. token-id lists).

    Returns:
        The largest ``len(item)`` found.

    Raises:
        ValueError: If ``tensor`` is empty.
    """
    return max(map(len, tensor))

Choose the top 10000 words from the vocabulary

In [None]:
top_k = 10000
# Keep only the top_k most frequent words; everything else maps to "<unk>".
# '<' and '>' are deliberately absent from the filters so the <start>/<end>
# markers survive tokenization. The backslash is written as '\\' because
# '\]' is not a valid escape sequence in a normal string literal.
tokenizer = tf.keras.preprocessing.text.Tokenizer(num_words=top_k,
                                                  oov_token="<unk>",
                                                  filters='!"#$%&()*+.,-/:;=?@[\\]^_`{|}~ ')

In [None]:
# Build the vocabulary from the captions. The captions are converted to id
# sequences once, in the "Create the tokenized vectors" cell below; doing it
# here as well would only repeat the same pass over every caption.
tokenizer.fit_on_texts(train_captions)

In [None]:
# Register index 0 as the padding token; loss_function masks out targets equal to 0.
tokenizer.word_index['<pad>'] = 0
tokenizer.index_word[0] = '<pad>'

Create the tokenized vectors

In [None]:
# One list of token ids per caption.
train_seqs = tokenizer.texts_to_sequences(train_captions)

Pad each vector to the max_length of the captions

If you do not provide a max_length value, pad_sequences calculates it automatically

In [None]:
# Pad at the end ('post') so every caption has the same length.
cap_vector = tf.keras.preprocessing.sequence.pad_sequences(train_seqs, padding='post')

Calculates the max_length, which is used to store the attention weights

In [None]:
# Longest caption in tokens; evaluate() uses it to size the attention plot and cap decoding.
max_length = calc_max_length(train_seqs)

## Split the data into training and testing

Create training and validation sets using an 80-20 split

In [None]:
# 80/20 split of (image path, padded caption) pairs; random_state fixes the split.
img_name_train, img_name_val, cap_train, cap_val = train_test_split(img_name_vector,
                                                                    cap_vector,
                                                                    test_size=0.2,
                                                                    random_state=0)

In [None]:
# Sizes of the training and validation parts (paths and captions must match pairwise).
len(img_name_train), len(cap_train), len(img_name_val), len(cap_val)

(331290, 331290, 82823, 82823)

## Create a tf.data dataset for training

Feel free to change these parameters according to your system's configuration

In [None]:
BATCH_SIZE = 64      # examples per training batch
BUFFER_SIZE = 1000   # shuffle buffer size of the tf.data pipeline
embedding_dim = 256  # size of the word embeddings and of the encoder output
units = 512          # GRU / attention hidden size
vocab_size = len(tokenizer.word_index) + 1
# Number of batches per epoch (ceiling division, since the dataset keeps the
# final partial batch). The training loop divides the sum of per-batch losses
# by this value, so it must count batches, not individual examples.
num_steps = (len(img_name_train) + BATCH_SIZE - 1) // BATCH_SIZE

Shape of the vector extracted from InceptionV3 is (64, 2048)

These two variables represent that vector shape

In [None]:
# Per the note above: each cached feature array is (64, 2048).
features_shape = 2048
# Number of attended locations; evaluate() uses it as the attention plot width.
attention_features_shape = 64

### Load numpy files

In [None]:
def map_func(img_name, cap):
    """Load the cached feature array that belongs to an image.

    Args:
        img_name: Image path as UTF-8 encoded bytes.
        cap: Caption for that image; passed through untouched.

    Returns:
        Tuple ``(features, cap)`` where ``features`` is the array stored in
        ``"<image path>.npy"``.
    """
    feature_file = img_name.decode('utf-8') + '.npy'
    return np.load(feature_file), cap

In [None]:
# One dataset element per (image path, padded caption) training pair.
dataset = tf.data.Dataset.from_tensor_slices((img_name_train, cap_train))

In [None]:
# Replace each image path by its cached features. map_func uses np.load, so it
# is wrapped in tf.numpy_function with the declared output dtypes.
dataset = dataset.map(lambda item1, item2: tf.numpy_function(
          map_func, [item1, item2], [tf.float32, tf.int32]),
          num_parallel_calls=tf.data.experimental.AUTOTUNE)

Shuffle and batch

In [None]:
# Shuffle within a BUFFER_SIZE window, then batch; the last batch may be smaller.
dataset = dataset.shuffle(BUFFER_SIZE).batch(BATCH_SIZE)
# Prepare upcoming batches while the current one is being consumed.
dataset = dataset.prefetch(buffer_size=tf.data.experimental.AUTOTUNE)

## Model

In [None]:
class BahdanauAttention(tf.keras.Model):
    """Additive attention over the encoded image features.

    Scores every feature location against the decoder hidden state and returns
    the attention-weighted sum of the features.
    """

    def __init__(self, units):
        """Create the three dense layers used to compute the scores.

        Args:
            units: Width of the two projection layers.
        """
        super().__init__()
        self.W1 = tf.keras.layers.Dense(units)
        self.W2 = tf.keras.layers.Dense(units)
        self.V = tf.keras.layers.Dense(1)

    def call(self, features, hidden):
        """Compute the context vector and the attention weights.

        Args:
            features: Encoder output; expected to be
                (batch_size, 64, embedding_dim) in this notebook.
            hidden: Decoder hidden state, (batch_size, hidden_size).

        Returns:
            Tuple ``(context_vector, attention_weights)``. The weights keep a
            trailing axis of size 1; the context vector has the feature axis
            summed out.
        """
        # Add a length-1 axis so the hidden state broadcasts over all
        # feature locations.
        hidden_expanded = tf.expand_dims(hidden, 1)

        projected_features = self.W1(features)
        projected_hidden = self.W2(hidden_expanded)
        score = tf.nn.tanh(projected_features + projected_hidden)

        # V maps each location's score vector to a single logit; the softmax
        # runs across locations (axis 1).
        attention_weights = tf.nn.softmax(self.V(score), axis=1)

        # Weighted sum of the features over the location axis.
        context_vector = tf.reduce_sum(attention_weights * features, axis=1)
        return context_vector, attention_weights

In [None]:
class CNN_Encoder(tf.keras.Model):
    """Projects pre-extracted image features to ``embedding_dim`` channels.

    The convolutional features are computed beforehand and cached, so this
    encoder is only a dense layer followed by a ReLU.
    """

    def __init__(self, embedding_dim):
        """
        Args:
            embedding_dim: Output width of the dense projection.
        """
        super().__init__()
        self.fc = tf.keras.layers.Dense(embedding_dim)

    def call(self, x):
        """Apply the dense projection and a ReLU to the features ``x``."""
        return tf.nn.relu(self.fc(x))

In [None]:
class RNN_Decoder(tf.keras.Model):
    """GRU decoder that predicts the next caption token using attention."""

    def __init__(self, embedding_dim, units, vocab_size):
        """
        Args:
            embedding_dim: Size of the token embeddings.
            units: Hidden size of the GRU, the first dense layer and attention.
            vocab_size: Number of token ids; also the width of the logits.
        """
        super().__init__()
        self.units = units

        self.embedding = tf.keras.layers.Embedding(vocab_size, embedding_dim)
        self.gru = tf.keras.layers.GRU(self.units,
                                       return_sequences=True,
                                       return_state=True,
                                       recurrent_initializer='glorot_uniform')
        self.fc1 = tf.keras.layers.Dense(self.units)
        self.fc2 = tf.keras.layers.Dense(vocab_size)

        self.attention = BahdanauAttention(self.units)

    def call(self, x, features, hidden):
        """Run one decoding step.

        Args:
            x: Ids of the previous tokens, one per example, with a length-1
                time axis.
            features: Encoder output for the batch.
            hidden: Previous decoder state; used to compute the attention.

        Returns:
            Tuple ``(logits, state, attention_weights)`` where ``logits`` has
            one row per (example, time step) and ``vocab_size`` columns.
        """
        context_vector, attention_weights = self.attention(features, hidden)

        # Embed the token ids, then prepend the context vector along the
        # channel axis.
        embedded = self.embedding(x)
        gru_input = tf.concat([tf.expand_dims(context_vector, 1), embedded], axis=-1)

        # NOTE(review): `hidden` is not passed to the GRU as its initial
        # state; it only influences the step through the attention above.
        output, state = self.gru(gru_input)

        projected = self.fc1(output)

        # Merge the batch and time axes so fc2 yields one row of logits per token.
        flattened = tf.reshape(projected, (-1, projected.shape[2]))
        logits = self.fc2(flattened)

        return logits, state, attention_weights

    def reset_state(self, batch_size):
        """Return an all-zero hidden state of shape (batch_size, units)."""
        return tf.zeros((batch_size, self.units))

Create models

In [None]:
# Instantiate the encoder/decoder pair used by train_step() and evaluate().
encoder = CNN_Encoder(embedding_dim)
decoder = RNN_Decoder(embedding_dim, units, vocab_size)

In [None]:
optimizer = tf.keras.optimizers.Adam()
# reduction='none' keeps one loss value per example so padding can be masked.
loss_object = tf.keras.losses.SparseCategoricalCrossentropy(
    from_logits=True, reduction='none')


def loss_function(real, pred):
    """Mean cross-entropy in which padded targets contribute zero.

    Args:
        real: Target token ids; id 0 marks padding.
        pred: Logits predicted for those targets.

    Returns:
        Scalar mean of the per-example losses after zeroing the padded ones
        (the mean is still taken over all positions, padded ones included).
    """
    per_example_loss = loss_object(real, pred)
    keep = tf.cast(tf.math.not_equal(real, 0), dtype=per_example_loss.dtype)
    return tf.reduce_mean(per_example_loss * keep)

## Checkpoint

In [None]:
# Directory in which the checkpoint manager stores its files.
checkpoint_path = "./checkpoints/train"

In [None]:
# Track everything needed to resume training: both models and the optimizer.
ckpt = tf.train.Checkpoint(encoder=encoder,
                           decoder=decoder,
                           optimizer = optimizer)

In [None]:
# Keep only the five most recent checkpoints on disk.
ckpt_manager = tf.train.CheckpointManager(ckpt, checkpoint_path, max_to_keep=5)

In [None]:
# Resume from the latest checkpoint if there is one.
start_epoch = 0
if ckpt_manager.latest_checkpoint:
    # The checkpoint path ends in "-<number>"; use that number as the epoch
    # to continue from.
    # NOTE(review): the number counts saves, and the training cell only saves
    # when `epoch % 5 == 0`, so it may not equal the epochs actually completed.
    start_epoch = int(ckpt_manager.latest_checkpoint.split('-')[-1])
    # Actually load the saved encoder/decoder/optimizer state; without this
    # the epoch counter advances but training restarts from fresh weights.
    ckpt.restore(ckpt_manager.latest_checkpoint)

## Training

Adding this in a separate cell because if you run the training cell many times, the loss_plot array will be reset

In [None]:
# One average-loss entry per finished epoch; kept in its own cell so re-running training keeps it.
loss_plot = []

initializing the hidden state for each batch
because the captions are not related from image to image

In [None]:
@tf.function
def train_step(img_tensor, target):
    """Run one optimisation step on a batch using teacher forcing.

    Args:
        img_tensor: Cached image features for the batch.
        target: Padded caption token ids, one row per example.

    Returns:
        Tuple ``(loss, total_loss)``: the loss summed over the decoding steps,
        and that sum divided by the caption length.
    """
    batch_size = target.shape[0]
    caption_length = target.shape[1]

    loss = 0

    # Fresh hidden state for every batch: captions of different images are
    # unrelated to each other.
    hidden = decoder.reset_state(batch_size=batch_size)

    # Every sequence starts from the <start> token.
    start_id = tokenizer.word_index['<start>']
    dec_input = tf.expand_dims([start_id] * batch_size, 1)

    with tf.GradientTape() as tape:
        features = encoder(img_tensor)

        for step in range(1, caption_length):
            predictions, hidden, _ = decoder(dec_input, features, hidden)

            expected_tokens = target[:, step]
            loss += loss_function(expected_tokens, predictions)

            # Teacher forcing: feed the ground-truth token as the next input.
            dec_input = tf.expand_dims(expected_tokens, 1)

    total_loss = (loss / int(caption_length))

    variables = encoder.trainable_variables + decoder.trainable_variables
    gradients = tape.gradient(loss, variables)
    optimizer.apply_gradients(zip(gradients, variables))

    return loss, total_loss

In [None]:
EPOCHS = 20

# Train from the resume point up to EPOCHS, logging every 100 batches.
for epoch in range(start_epoch, EPOCHS):
    epoch_started = time.time()
    total_loss = 0

    for batch, (img_tensor, target) in enumerate(dataset):
        batch_loss, step_loss = train_step(img_tensor, target)
        total_loss += step_loss

        if batch % 100 == 0:
            # Report the summed batch loss divided by the caption length.
            loss_per_token = batch_loss.numpy() / int(target.shape[1])
            print('Epoch {} Batch {} Loss {:.4f}'.format(epoch + 1, batch, loss_per_token))

    # Store the epoch average so it can be plotted afterwards.
    epoch_loss = total_loss / num_steps
    loss_plot.append(epoch_loss)

    # Checkpoint on every fifth epoch (epoch indices 0, 5, 10, ...).
    if epoch % 5 == 0:
        ckpt_manager.save()

    print('Epoch {} Loss {:.6f}'.format(epoch + 1, epoch_loss))
    print('Time taken for 1 epoch {} sec\n'.format(time.time() - epoch_started))

Epoch 1 Batch 0 Loss 1.9880
Epoch 1 Batch 100 Loss 1.1633
Epoch 1 Batch 200 Loss 0.9539
Epoch 1 Batch 300 Loss 0.9819
Epoch 1 Batch 400 Loss 0.8571
Epoch 1 Batch 500 Loss 0.8078
Epoch 1 Batch 600 Loss 0.8228
Epoch 1 Batch 700 Loss 0.7061
Epoch 1 Batch 800 Loss 0.7213
Epoch 1 Batch 900 Loss 0.7475
Epoch 1 Batch 1000 Loss 0.7627
Epoch 1 Batch 1100 Loss 0.7564
Epoch 1 Batch 1200 Loss 0.6764
Epoch 1 Batch 1300 Loss 0.6601
Epoch 1 Batch 1400 Loss 0.7389
Epoch 1 Batch 1500 Loss 0.7000
Epoch 1 Batch 1600 Loss 0.7334
Epoch 1 Batch 1700 Loss 0.6883
Epoch 1 Batch 1800 Loss 0.6961
Epoch 1 Batch 1900 Loss 0.7065
Epoch 1 Batch 2000 Loss 0.7070
Epoch 1 Batch 2100 Loss 0.7137
Epoch 1 Batch 2200 Loss 0.7254
Epoch 1 Batch 2300 Loss 0.6367
Epoch 1 Batch 2400 Loss 0.6626
Epoch 1 Batch 2500 Loss 0.6929
Epoch 1 Batch 2600 Loss 0.6783


In [None]:
# Average training loss per epoch.
fig, ax = plt.subplots()
ax.plot(loss_plot)
ax.set_xlabel('Epochs')
ax.set_ylabel('Loss')
ax.set_title('Loss Plot')
plt.show()

## Caption

In [None]:
def evaluate(image):
    """Generate a caption for one image by sampling from the decoder.

    Args:
        image: Path of the image file to caption.

    Returns:
        Tuple ``(result, attention_plot)``: the generated tokens (including
        ``'<end>'`` when it was produced) and an array with one row of
        attention weights per generated token.
    """
    attention_plot = np.zeros((max_length, attention_features_shape))

    hidden = decoder.reset_state(batch_size=1)

    # Same preprocessing and feature extraction as for the training images,
    # applied to a batch of one.
    temp_input = tf.expand_dims(load_image(image)[0], 0)
    img_tensor_val = image_features_extract_model(temp_input)
    img_tensor_val = tf.reshape(img_tensor_val, (img_tensor_val.shape[0], -1, img_tensor_val.shape[3]))

    features = encoder(img_tensor_val)

    dec_input = tf.expand_dims([tokenizer.word_index['<start>']], 0)
    result = []

    for i in range(max_length):
        predictions, hidden, attention_weights = decoder(dec_input, features, hidden)

        attention_plot[i] = tf.reshape(attention_weights, (-1, )).numpy()

        # Sample the next token from the predicted distribution (not argmax),
        # so repeated calls can give different captions.
        predicted_id = tf.random.categorical(predictions, 1)[0][0].numpy()
        predicted_word = tokenizer.index_word[predicted_id]
        result.append(predicted_word)

        if predicted_word == '<end>':
            break

        # Feed the sampled token back in as the next decoder input.
        dec_input = tf.expand_dims([predicted_id], 0)

    # Always drop the unused rows, whether decoding stopped at '<end>' or at
    # max_length, so the plot has exactly one row per generated token.
    return result, attention_plot[:len(result), :]

In [None]:
def plot_attention(image, result, attention_plot):
    """Show, for every generated token, the attention map over the image.

    Args:
        image: Path of the captioned image.
        result: Generated tokens, as returned by ``evaluate``.
        attention_plot: Array with one row of attention weights per token.
    """
    # Close the file handle as soon as the pixels are copied into an array.
    with Image.open(image) as image_file:
        temp_image = np.array(image_file)

    fig = plt.figure(figsize=(10, 10))

    len_result = len(result)
    # Square grid with enough cells for every token. The previous
    # `len_result // 2` grid was too small for captions of 1, 2, 3 or 5
    # tokens, which made add_subplot raise.
    grid_size = max(int(np.ceil(len_result / 2)), 2)
    for idx in range(len_result):
        # Lay the weights out as an 8x8 grid to overlay on the image.
        temp_att = np.resize(attention_plot[idx], (8, 8))
        ax = fig.add_subplot(grid_size, grid_size, idx + 1)
        ax.set_title(result[idx])
        img = ax.imshow(temp_image)
        # Stretch the 8x8 map to the image extent and blend it on top.
        ax.imshow(temp_att, cmap='gray', alpha=0.6, extent=img.get_extent())

    plt.tight_layout()
    plt.show()

### Captions on the validation set

In [None]:
# Pick a random validation example and caption it.
rid = np.random.randint(0, len(img_name_val))
image = img_name_val[rid]
# Turn the reference token ids back into words, skipping padding (id 0).
real_caption = ' '.join([tokenizer.index_word[i] for i in cap_val[rid] if i not in [0]])
result, attention_plot = evaluate(image)

# Show the reference caption next to the generated one, then the attention maps.
print ('Real Caption:', real_caption)
print ('Prediction Caption:', ' '.join(result))
plot_attention(image, result, attention_plot)

### Try it on your own images

In [None]:
# Download an example image and caption it.
image_url = 'https://tensorflow.org/images/surf.jpg'
# Last four characters of the URL ('.jpg' here) serve as the local file extension.
image_extension = image_url[-4:]
image_path = tf.keras.utils.get_file('image'+image_extension,
                                     origin=image_url)

result, attention_plot = evaluate(image_path)
print ('Prediction Caption:', ' '.join(result))
plot_attention(image_path, result, attention_plot)

# Display the original image as the cell output.
Image.open(image_path)