## Import Modules

In [None]:
import os  # handling the files
import pickle  # storing numpy features, like image features
import numpy as np
from tqdm.notebook import tqdm  # UI to show how much data has been processed

from tensorflow.keras.applications.vgg16 import VGG16, preprocess_input
from tensorflow.keras.preprocessing.image import load_img, img_to_array  # loading and preprocessing the images
from tensorflow.keras.preprocessing.text import Tokenizer # preprocessing the text
from tensorflow.keras.preprocessing.sequence import pad_sequences  # to even out the lengths of the sequences
from tensorflow.keras.models import Model
from tensorflow.keras.utils import to_categorical, plot_model  # interpretation of the model in terms of an image
from tensorflow.keras.layers import Input, Dense, LSTM, Embedding, Dropout, add

from nltk.translate.bleu_score import corpus_bleu

from PIL import Image  # to load the image
import matplotlib.pyplot as plt

In [None]:
# Dataset root: the notebook reads the `Images` folder and `captions.txt` from here
BASE_DIR = '/kaggle/input/flickr8k'
# Output directory: the cached image features and the trained model are written here
WORKING_DIR = '/kaggle/working'

In [None]:
# Load VGG16 model
model = VGG16()

# Restructure the model
# Why? - We only want VGG16 as a feature extractor, not as a classifier.
# The last layer is `predictions` (Dense), which we drop; the second-to-last
# layer, `fc2` (Dense), becomes the new output and provides the image features.
model = Model(inputs=model.inputs, outputs=model.layers[-2].output)

# Summarize
# model.summary() prints the table itself and returns None, so wrapping it in
# print() would only add a stray "None" line to the output.
model.summary()

In [None]:
# Run every image through the truncated VGG16 and cache the result
# features: image ID (file name up to the first '.') -> model output for that image
features = {}
directory = os.path.join(BASE_DIR, 'Images')

for img_name in tqdm(os.listdir(directory)):
    img_path = f'{directory}/{img_name}'
    # Load the file resized to 224x224 and turn the pixels into a numpy array
    image = img_to_array(load_img(img_path, target_size=(224, 224)))
    # Add a leading batch axis of size 1, then apply the VGG-specific preprocessing
    image = preprocess_input(np.expand_dims(image, axis=0))
    # verbose=0 keeps predict() from printing a progress line per image
    feature = model.predict(image, verbose=0)
    # 'blahblah.jpg' -> 'blahblah'
    image_id = img_name.partition('.')[0]
    features[image_id] = feature

In [None]:
# Store Extracted Features in a File
# The context manager guarantees the file is flushed and closed even if
# pickling fails part-way through.
with open(os.path.join(WORKING_DIR, 'features.pkl'), 'wb') as f:
    pickle.dump(features, f)

In [None]:
# Load Features from Pickle
# Re-reads the features.pkl file from WORKING_DIR and rebinds `features` to its contents
with open(os.path.join(WORKING_DIR, 'features.pkl'), 'rb') as f:
    features = pickle.load(f)

## Load the Captions Data

In [None]:
# Open in text mode: the mapping cell splits captions_doc on the str '\n',
# which raises TypeError if captions_doc is a bytes object (as 'rb' would produce).
with open(os.path.join(BASE_DIR, 'captions.txt'), 'r', encoding='utf-8') as f:
    # The first line of captions.txt is a header, not a caption, so skip it.
    # The None default avoids a StopIteration if the file is empty.
    next(f, None)
    captions_doc = f.read()  # Reads the rest of captions.txt as one string
# Note - Each line of captions_doc looks like
# "1000268201_693b08cb0e.jpg,A child in a pink dress is climbing up a set of stairs in an entry way ."

In [None]:
# Build the lookup: image ID -> list of every caption given for that image
mapping = {}
for line in tqdm(captions_doc.split('\n')):  # one captions.txt row per iteration
    # Blank or one-character rows carry no "file,caption" pair - skip them
    if len(line) < 2:
        continue
    # The first comma-separated field is the file name; everything after it is the caption.
    # A caption that itself contains commas arrives as several pieces.
    image_name, *caption_parts = line.split(',')
    # 'blahblah.jpg' -> 'blahblah'
    image_id = image_name.split('.')[0]
    # Re-assemble the caption pieces, separated by single spaces
    caption = " ".join(caption_parts)
    # An image has several captions, one per row: start its list on first sight, then append
    mapping.setdefault(image_id, []).append(caption)

In [None]:
# Number of images we have (one key in `mapping` per distinct image ID)
len(mapping)

In [None]:
# Pre-process the Captions
import re

# Matches every character that is neither a lowercase letter nor whitespace.
# Whitespace must survive this step, otherwise the words would fuse together.
_NON_LETTERS = re.compile(r'[^a-z\s]')


def clean(mapping):
    """Normalise every caption in ``mapping`` in place.

    Each caption is lower-cased, stripped of everything except letters and
    whitespace, stripped of single-character words (e.g. 'a'), and wrapped in
    the ``startseq`` / ``endseq`` markers that tell the model where a caption
    begins and ends.

    Args:
        mapping: dict of image ID -> list of caption strings. The lists are
            modified in place.

    Returns:
        None.

    Note:
        The function is not idempotent: calling it twice wraps the captions in
        a second pair of markers.
    """
    for captions in mapping.values():
        for i, caption in enumerate(captions):
            # str.replace() treats its argument literally, so a real regex
            # substitution is required to drop digits and punctuation.
            caption = _NON_LETTERS.sub('', caption.lower())
            # split() with no argument already collapses runs of whitespace
            words = [word for word in caption.split() if len(word) > 1]
            # Join with spaces so the markers stay separate tokens
            captions[i] = ' '.join(['startseq', *words, 'endseq'])
'''
After calling clean(mapping), an entry such as
mapping['img_01'] = ['A cute panda', 'A chonky panda']
becomes
mapping['img_01'] = ['startseq cute panda endseq', 'startseq chonky panda endseq']
NOTE: Earlier I had used <start> and <end> to denote the start and end of a sentence, but the special characters get removed during cleaning anyway, so I changed the markers to startseq and endseq.
'''
            

In [None]:
# Normalise the captions first. clean() is defined earlier but was never
# invoked, so no caption carried the startseq/endseq markers that the
# tokenizer, the data generator and predict_caption() all rely on.
# NOTE: clean() wraps captions in place - run this cell only once per fresh `mapping`.
clean(mapping)

# Store all captions in a single flat list
all_captions = [caption for captions in mapping.values() for caption in captions]

In [None]:
# Sanity check: total number of captions, then the first ten of them
print(len(all_captions))
print(all_captions[:10])

In [None]:
# Tokenize the text
# fit_on_texts builds tokenizer.word_index (word -> integer index) from all captions
tokenizer = Tokenizer()
tokenizer.fit_on_texts(all_captions)
# Number of distinct words plus one (= 8483 in this case)
# presumably the +1 accounts for index 0, which padding uses - confirm against the Keras Tokenizer docs
vocab_size = len(tokenizer.word_index) + 1
vocab_size

In [None]:
# Get maximum length of the caption available
# Length is measured in whitespace-separated words; every input sequence is padded to this length later on
max_length = max(len(caption.split()) for caption in all_captions)  # = 35 in this case
max_length

##  Train-test Split

In [None]:
# Train Test Split
image_ids = list(mapping.keys())
split_ratio = 0.90  # fraction of the images used for training
# Index of the first test image. The slices below need this name to exist;
# previously only `split_ratio` was assigned, so `split` raised NameError.
split = int(len(image_ids) * split_ratio)
train = image_ids[:split]
test = image_ids[split:]

In [None]:
# Kaggle has maximum 13GB of RAM
# If you feed the entire thing directly, it'll crash
# Create data generator to divide data into batches and get it, because we have limited memory

def data_generator(data_keys, mapping, features, tokenizer, max_length, vocab_size, batch_size):
    """Yield training batches forever, one batch per ``batch_size`` images.

    Every caption of every image is expanded into (prefix, next word) pairs:
    for a caption encoded as ``seq``, each ``seq[:i]`` is an input and
    ``seq[i]`` is the word to predict.

    Args:
        data_keys: iterable of image IDs to draw from (it is iterated repeatedly).
        mapping: dict of image ID -> list of caption strings.
        features: dict of image ID -> image feature array; element ``[0]`` of
            the stored array is used as the feature vector.
        tokenizer: object whose ``texts_to_sequences`` maps text to integer indices.
        max_length: length every input sequence is padded to.
        vocab_size: number of classes for the one-hot encoded target word.
        batch_size: number of *images* (not samples) per yielded batch.

    Yields:
        ``([X1, X2], y)`` where X1 holds image features, X2 the padded input
        sequences and y the one-hot next-word targets, all as numpy arrays.

    Note:
        Images left over when ``data_keys`` is exhausted are not yielded on
        their own; they are carried over into the next pass over ``data_keys``.
    """
    X1, X2, y = [], [], []
    n = 0  # images collected since the last yield
    while True:
        for key in data_keys:
            n += 1
            for caption in mapping[key]:
                # Encode the caption: one integer index per word
                seq = tokenizer.texts_to_sequences([caption])[0]
                for i in range(1, len(seq)):
                    # Everything before position i is the input, word i is the target
                    in_seq, out_seq = seq[:i], seq[i]
                    # pad_sequences works on a batch, so wrap the single
                    # sequence in a list and take element [0] of the result
                    in_seq = pad_sequences([in_seq], maxlen=max_length)[0]
                    # One-hot encode the target word
                    out_seq = to_categorical([out_seq], num_classes=vocab_size)[0]

                    X1.append(features[key][0])  # Image features
                    X2.append(in_seq)  # Text features
                    y.append(out_seq)  # Target word
            if n == batch_size:
                # The model consumes numpy arrays, not Python lists
                yield [np.array(X1), np.array(X2)], np.array(y)
                # Start the next batch from scratch so memory does not keep growing
                X1, X2, y = [], [], []
                n = 0

What the code above does is:

Say we have a caption: startseq girl going inside wooden building endseq
Each iteration produces a new (X, y) pair.
In the first iteration X = startseq, and the model should predict y = girl. In the second iteration X = startseq girl, and the model should predict y = going.
X is the input sequence, and y is the word that follows it.

This is how the sequence flow works

In [1]:
'''
       X                                             y
    startseq                                         girl
    startseq girl                                    going
    startseq girl going                              inside
    startseq girl going inside                       wooden 
    startseq girl going inside wooden                building
    startseq girl going inside wooden building       endseq
    startseq girl going inside wooden building endseq

'''

'\n       X                                             y\n    <start>                                         girl\n    <start> girl                                    going\n    <start> girl going                              inside\n    <start> girl going inside                       wooden \n    <start> girl going inside wooden                building\n    <start> girl going inside wooden building       <end>\n    <start> girl going inside wooden building <end>\n\n'

## Model Creation

In [None]:
# Encoder model :

# Image-feature layers
inputs1 = Input(shape=(4096,))  # one 4096-value image feature vector per sample
fe1 = Dropout(0.4)(inputs1)
# Project the image features down to 256 values. The layer must be fed fe1;
# feeding it fe2 referenced a name that did not exist yet (NameError).
fe2 = Dense(256, activation='relu')(fe1)

# Sequence-feature layers
inputs2 = Input(shape=(max_length,))
se1 = Embedding(vocab_size, 256, mask_zero=True)(inputs2)  # mask_zero=True, because we're padding the sequence
se2 = Dropout(0.4)(se1)
se3 = LSTM(256)(se2)

# Decoder model :
# add() sums the two 256-value vectors element-wise (it does not concatenate them)
decoder1 = add([fe2, se3])
decoder2 = Dense(256, activation='relu')(decoder1)
outputs = Dense(vocab_size, activation='softmax')(decoder2)  # one probability per vocabulary entry

# Model with 2 inputs, and 1 output
model = Model(inputs=[inputs1, inputs2], outputs=outputs)
# Loss and optimizer
model.compile(loss='categorical_crossentropy', optimizer='adam')

# Plot the model
# plot_model is a function and must be called with parentheses, not indexed
plot_model(model, show_shapes=True)  # You can even save this as an image

Since we had already extracted the features using VGG, we didn't use any CNNs in the code snippet for the layers above

In [None]:
# Train the Model
# Feeding batches from data_generator keeps memory use low at the cost of speed;
# loading every sample at once would be faster but might exhaust the RAM.
epochs, batch_size = 15, 64
steps = len(train) // batch_size  # number of batches drawn from the generator per epoch

for _ in range(epochs):
    # A fresh generator per epoch, so every epoch starts from the first training image
    generator = data_generator(
        data_keys=train,
        mapping=mapping,
        features=features,
        tokenizer=tokenizer,
        max_length=max_length,
        vocab_size=vocab_size,
        batch_size=batch_size,
    )
    # Each batch delivers X1 (image features), X2 (input sequences) and y (target words).
    # No validation data is passed here; evaluation happens after training.
    model.fit(generator, steps_per_epoch=steps, epochs=1, verbose=1)
    # This won't show any validation accuracy, because we don't have any validation generator
    # We'll do testing and validation after training the model
    

In [None]:
# Save the trained model
# Build the path with os.path.join, consistent with every other path in this notebook
model.save(os.path.join(WORKING_DIR, 'best_model.h5'))

## Generate Captions for the Image

In [None]:
# The model outputs integer indices; this helper turns an index back into its word

def idx_to_word(integer, tokenizer):
    """Return the word that ``tokenizer.word_index`` maps to ``integer``.

    Args:
        integer: the index to look up.
        tokenizer: object with a ``word_index`` dict of word -> index.

    Returns:
        The first word whose index equals ``integer``, or None if there is none.
    """
    matches = (word for word, index in tokenizer.word_index.items() if index == integer)
    return next(matches, None)

In [None]:
# Generate caption for an image
def predict_caption(model, image, tokenizer, max_length):
    """Greedily generate a caption, one word per step.

    Starting from 'startseq', the text generated so far is encoded, padded to
    ``max_length`` and passed to the model together with the image features;
    the highest-scoring word is appended and the loop repeats.

    Args:
        model: trained model taking ``[image, sequence]`` as input.
        image: image features passed to the model unchanged.
        tokenizer: tokenizer used to encode text and to map indices back to words.
        max_length: padding length, and the maximum number of words generated.

    Returns:
        The generated text, including the leading 'startseq' and, if the model
        produced it, the trailing 'endseq'.
    """
    in_text = 'startseq'
    for _ in range(max_length):
        # Text so far -> integer indices -> fixed-length padded batch of one
        encoded = tokenizer.texts_to_sequences([in_text])[0]
        padded = pad_sequences([encoded], max_length)
        # One score per vocabulary entry; take the index of the largest
        scores = model.predict([image, padded], verbose=0)
        word = idx_to_word(np.argmax(scores), tokenizer)
        # An index with no word attached cannot extend the caption
        if word is None:
            break
        in_text = in_text + " " + word
        # 'endseq' marks a finished caption
        if word == 'endseq':
            break
    return in_text

In [None]:
# Validate with Text Data
# actual[i] holds the tokenised reference captions of test image i,
# predicted[i] the tokenised caption generated for it.
actual, predicted = [], []
for key in tqdm(test):
    # Reference captions for this image (`mapping`, not the misspelt `mappingp`)
    captions = mapping[key]
    # Generate a caption from the cached features of *this* image. `features`
    # is the ID -> features dict; `feature` is only the leftover array of the
    # last image processed during feature extraction.
    y_pred = predict_caption(model, features[key], tokenizer, max_length)
    # corpus_bleu expects lists of words, not strings
    actual.append([caption.split() for caption in captions])
    predicted.append(y_pred.split())

# Calculate BLEU Score - Score we need to consider when we have text data
# Higher the score, the better. Ranges from 0-1
# Increase number of epochs and see if the score improves
print("BLEU-1: %f" % corpus_bleu(actual, predicted, weights=(1.0, 0, 0, 0)))
print("BLEU-2: %f" % corpus_bleu(actual, predicted, weights=(0.5, 0.5, 0, 0)))

## Visualize the Results

In [None]:
def generate_caption(image_name):
    """Print the reference captions and the predicted caption for one image, then show it.

    Args:
        image_name: file name inside the dataset's ``Images`` folder,
            e.g. "1009434119_febe49276a.jpg".
    """
    # 'blahblah.jpg' -> 'blahblah', the key used by `mapping` and `features`
    image_id = image_name.split('.')[0]
    image = Image.open(os.path.join(BASE_DIR, "Images", image_name))
    references = mapping[image_id]
    print('------------------ Actual ------------------')
    for reference in references:
        print(reference)
    prediction = predict_caption(model, features[image_id], tokenizer, max_length)
    print('--------------------- Predicted ------------------')
    print(prediction)
    plt.imshow(image)

In [None]:
# The file name had lost its leading '1' (compare the example inside
# generate_caption), so the image could not be found.
generate_caption('1009434119_febe49276a.jpg')