In [1]:
import os

from tensorflow.keras.preprocessing.image import load_img, img_to_array
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.models import Model
from tensorflow.keras.utils import to_categorical, plot_model
from tensorflow.keras.layers import Input, Dense, LSTM, Embedding, Dropout, add
from tensorflow.keras.applications.vgg16 import VGG16, preprocess_input

import warnings
warnings.filterwarnings('ignore')

In [7]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

In [2]:
base_directory = 'flickr8k'

In [3]:
model = VGG16()

In [4]:
model = Model(inputs=model.inputs, outputs=model.layers[-2].output)
print(model.summary())

Model: "model"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 input_1 (InputLayer)        [(None, 224, 224, 3)]     0         
                                                                 
 block1_conv1 (Conv2D)       (None, 224, 224, 64)      1792      
                                                                 
 block1_conv2 (Conv2D)       (None, 224, 224, 64)      36928     
                                                                 
 block1_pool (MaxPooling2D)  (None, 112, 112, 64)      0         
                                                                 
 block2_conv1 (Conv2D)       (None, 112, 112, 128)     73856     
                                                                 
 block2_conv2 (Conv2D)       (None, 112, 112, 128)     147584    
                                                                 
 block2_pool (MaxPooling2D)  (None, 56, 56, 128)       0     

In [5]:
directory = os.path.join(base_directory,'Images')
features={}

In [6]:
from tqdm.notebook import tqdm

In [None]:
for img_name in os.listdir(directory):
    
    img_path = directory + '/' + img_name
    image = load_img(img_path, target_size=(224, 224))
    image = img_to_array(image)  #pixel to array
    image = image.reshape((1, image.shape[0], image.shape[1], image.shape[2]))
    
    image = preprocess_input(image) #to use with vgg
    feature = model.predict(image, verbose=0) #feature extraction
    id = img_name.split('.')[0]
    features[id] = feature

In [10]:
import pickle
pickle.dump(features, open(os.path.join('/kaggle/working', 'features.pkl'), 'wb'))

In [11]:
with open(os.path.join(base_directory, 'captions.txt'), 'r') as f:
    next(f)  #for iterating through the captions
    captions = f.read()

In [None]:
# create mapping of image to captions
mapping = {}

for line in tqdm(captions.split('\n')):

    words = line.split(',')
    
    if len(line) < 2:   #very small captions
        continue
        
    image_id, caption = words[0], words[1:]

    image_id = image_id.split('.')[0]  #removing the extension
    caption = " ".join(caption)

    if image_id not in mapping:
        mapping[image_id] = []

    mapping[image_id].append(caption)

In [13]:
def text_preprocess(mapping):
    
    for key, captions in mapping.items():
        
        for i in range(len(captions)):

            caption = captions[i]
            caption = caption.lower()
            
            caption = caption.replace('[^A-Za-z]', '') #removing nos and spl chars
            caption = caption.replace('\s+', ' ') #removing spaces
            
            # adding start and end tags to the caption
            caption = 'start ' + " ".join([word for word in caption.split() if len(word)>1]) + ' end'
            captions[i] = caption

In [14]:
text_preprocess(mapping)

In [15]:
captions = []
for key in mapping:
    for caption in mapping[key]:
        captions.append(caption)

In [16]:
len(captions)

40455

In [17]:
tokenizer = Tokenizer()
tokenizer.fit_on_texts(captions)

In [18]:
vocabulary = len(tokenizer.word_index) + 1
vocabulary

8483

In [19]:
max_length = max(len(caption.split()) for caption in captions)
max_length

35

In [20]:
image_ids = list(mapping.keys())

partition = int(len(image_ids) * 0.80)
train = image_ids[:partition]
test = image_ids[partition:]

In [21]:
def CustomDatagenerator(data_keys, mapping, features, tokenizer, max_length, vocab_size, batch_size):
    
    # loop over images
    X1, X2, y = list(), list(), list()
    n = 0
    
    while 1:
        for key in data_keys:
            n=n+1
            captions = mapping[key]
            for caption in captions:
                
                # encode the sequence
                seq = tokenizer.texts_to_sequences([caption])[0]
                
                # split the sequence into X, y pairs
                for i in range(1, len(seq)):
                    seq1, seq2 = seq[:i], seq[i]
                    seq1 = pad_sequences([seq1], maxlen=max_length)[0] #padding captions with different lengths to standard size
                    
                    # encoding output sequence
                    seq2 = to_categorical([seq2], num_classes=vocab_size)[0]
                    
                    # store the sequences
                    X1.append(features[key][0])
                    X2.append(seq1)
                    y.append(seq2)
                    
            if n == batch_size:
                X1, X2, y = np.array(X1), np.array(X2), np.array(y)
                yield [X1, X2], y
                X1, X2, y = list(), list(), list()
                n = 0

In [22]:
inputs1 = Input(shape=(4096,))

fe1 = Dropout(0.4)(inputs1)
fe2 = Dense(256, activation='relu')(fe1)

# sequence feature layers
max_length = max(len(caption.split()) for caption in captions)
inputs2 = Input(shape=(max_length,))

l1 = Embedding(vocabulary, 256, mask_zero=True)(inputs2)
l2 = Dropout(0.4)(l1)
l3 = LSTM(256)(l2)

# Decoder - Generates o/p
decode_1 = add([fe2, l3])
decode_2 = Dense(256, activation='relu')(decode_1)

# output
outputs = Dense(vocabulary, activation='softmax')(decode_2)

model = Model(inputs=[inputs1, inputs2], outputs=outputs)
model.compile(loss='categorical_crossentropy', optimizer='adam')

In [None]:
epochs = 20
batch_size = 32
iters = len(train)//batch_size

for i in range(epochs):
    
    # create data generator
    generator = CustomDatagenerator(train, mapping, features, tokenizer, max_length, vocabulary, batch_size)
    model.fit(generator, epochs=1, steps_per_epoch=iters)

In [30]:
model.save('model.h5')

### Predictions

In [31]:
def index_to_word(integer, tokenizer):
    for word, index in tokenizer.word_index.items():
        if index == integer:
            return word
    return None

In [32]:
def predict_caption(model, image, tokenizer, max_length):
    
    # add start tag for generation process
    start_tag = 'startseq'
    
    # iterate over the max length of sequence
    for i in range(max_length):
        sequence = tokenizer.texts_to_sequences([start_tag])[0]
        sequence = pad_sequences([sequence], max_length)

        pred_word = model.predict([image, sequence], verbose=0)
        pred_word = np.argmax(pred_word)
        word = index_to_word(pred_word, tokenizer)
        
        # stop if word not found
        if word is None:
            break
            
        # append word as input for generating next word
        start_tag += " " + word
        
        # stop if we reach end tag
        if word == 'endseq':
            break
        text_pred = start_tag
      
    return text_pred

In [None]:
from nltk.translate.bleu_score import corpus_bleu
# validate with test data
actual, predicted = [],[]

for key in tqdm(test):
    captions = mapping[key]
    
    # predict the caption for image
    y_pred = predict_caption(model, features[key], tokenizer, max_length)
    
    # split into words
    actual_captions = [caption.split() for caption in captions]
    y_pred = y_pred.split()
    
    # append to the list
    actual.append(actual_captions)
    
    predicted.append(y_pred)
    
# calcuate BLEU score
print("BLEU: %f" % corpus_bleu(actual, predicted, weights=(1.0, 0, 0, 0)))
print("BLEU-2: %f" % corpus_bleu(actual, predicted, weights = (0.5,0.5,0,0)))