Following: https://machinelearningmastery.com/develop-a-deep-learning-caption-generation-model-in-python/

In [1]:
#Import libraries
from numpy import array

from os import listdir
from pickle import dump, load
from keras.models import Model
from keras.preprocessing.image import load_img
from keras.preprocessing.image import img_to_array
from keras.preprocessing.text import Tokenizer
from keras.preprocessing.sequence import pad_sequences
from keras.utils import to_categorical, plot_model
from keras.layers import Input, Dense, LSTM, Embedding, Dropout
from keras.layers.merge import add
from keras.callbacks import ModelCheckpoint

import string

import datetime

import random
random.seed(42)

###GloVe
from numpy import asarray
from numpy import zeros

Using TensorFlow backend.


## Prepare Text Data

In [2]:
#mag-check

#Load the file containing all of the descriptions

def load_doc(filename):
    """Read a whole text file into memory.

    Args:
        filename: Path of the text file to read.

    Returns:
        The complete contents of the file as a single string.
    """
    #the context manager closes the file even if read() raises
    with open(filename, 'r') as file:
        return file.read()

#path of the raw caption file
filename = '../input/flickr8k/flickr8k_text/Flickr8k.token.txt'
#read the whole file into memory as a single string
doc = load_doc(filename)

In [3]:
#mag-check m

#extract descriptions for images
def load_descriptions(doc):
    """Build a mapping from image id to its list of descriptions.

    Every usable line of `doc` is split on whitespace. The first token is the
    image id (everything from its first '.' onwards is dropped) and the
    remaining tokens are re-joined with single spaces into one description.

    Args:
        doc: Text with one "<image id> <description>" entry per line.

    Returns:
        dict mapping image id -> list of description strings, in file order.
    """
    mapping = dict()
    #process lines
    for line in doc.split('\n'):
        #split line by white space
        tokens = line.split()
        #skip lines that are too short or hold only whitespace; the latter
        #have no tokens and would fail on tokens[0] below
        if len(line) < 2 or not tokens:
            continue
        #take the first token as the image id, the rest as a description
        image_id, image_desc = tokens[0], tokens[1:]
        #remove filename extension from image id
        image_id = image_id.split('.')[0]
        #convert description tokens back to string
        image_desc = ' '.join(image_desc)
        #create the list if needed, then store the description
        mapping.setdefault(image_id, list()).append(image_desc)
    return mapping

#parse descriptions into a dict: image id -> list of description strings
descriptions = load_descriptions(doc)
#number of distinct image ids found
print('Loaded: %d ' % len(descriptions))

Loaded: 8092 


Next, let's clean the text by performing the usual text-cleaning steps:
- converting words to lowercase
- removing all punctuation
- removing all words that are one character or less in length
- removing all words with numbers in them

In [4]:

def clean_descriptions(descriptions):
    """Normalise every description in place.

    Each description is lower-cased, stripped of punctuation characters and
    reduced to the words that are longer than one character and purely
    alphabetic. The lists inside `descriptions` are modified in place and
    nothing is returned.

    Args:
        descriptions: dict mapping image id -> list of description strings.
    """
    #translation table that deletes every punctuation character
    strip_punct = str.maketrans('', '', string.punctuation)

    def _clean(text):
        #lower-case and strip punctuation token by token
        words = (token.lower().translate(strip_punct) for token in text.split())
        #keep alphabetic words of at least two characters
        return ' '.join(w for w in words if len(w) > 1 and w.isalpha())

    for captions in descriptions.values():
        for idx, text in enumerate(captions):
            captions[idx] = _clean(text)
        
#clean descriptions (the lists inside `descriptions` are rewritten in place)
clean_descriptions(descriptions)

Next, we summarize the size of the vocabulary.

In [5]:
#convert the loaded descriptions into a vocabulary of words
def to_vocabulary(descriptions):
    """Collect the distinct words used across all descriptions.

    Args:
        descriptions: dict mapping image id -> list of description strings.

    Returns:
        set of every whitespace-separated word found in any description.
    """
    all_desc = set()
    #plain loops instead of a list comprehension run only for its side effect
    for desc_list in descriptions.values():
        for d in desc_list:
            all_desc.update(d.split())
    return all_desc

#summarize vocabulary: the set of distinct words in the cleaned descriptions
vocabulary = to_vocabulary(descriptions)
print('Vocabulary Size: %d' %len(vocabulary))

Vocabulary Size: 8763


In [6]:
#save descriptions to file, one per line
def save_descriptions(descriptions, filename):
    """Write all descriptions to a text file, one "<image id> <description>" per line.

    Lines are joined with '\\n' and no trailing newline is written.

    Args:
        descriptions: dict mapping image id -> list of description strings.
        filename: Path of the file to create or overwrite.
    """
    lines = [key + ' ' + desc
             for key, desc_list in descriptions.items()
             for desc in desc_list]
    data = '\n'.join(lines)
    #the context manager closes the file even if write() raises
    with open(filename, 'w') as file:
        file.write(data)

#save the cleaned descriptions; later cells read them back from 'descriptions.txt'
save_descriptions(descriptions, 'descriptions.txt')

## Developing Deep Learning Model

### LOAD DATA

In [7]:
#load doc into memory
#use the function load_doc defined above

#load a pre-defined list of photo identifiers
def load_set(filename):
    """Return the set of photo identifiers listed in a file.

    The file is read with load_doc. Every non-empty line contributes the text
    before its first '.' as one identifier.

    Args:
        filename: Path of the file holding one entry per line.

    Returns:
        set of identifier strings.
    """
    entries = load_doc(filename).split('\n')
    #empty lines are skipped; duplicates collapse because the result is a set
    return {entry.split('.')[0] for entry in entries if len(entry) >= 1}

The function __load_clean_descriptions()__ defined below loads the cleaned text descriptions from 'descriptions.txt' for a given set of identifiers and returns a dictionary mapping each identifier to its list of text descriptions.

The strings __startseq__ and __endseq__ mark the first and last word of each caption. These tokens are added to the descriptions as they are loaded. It is important to do this now, before we encode the text, so that the tokens are also encoded correctly.

In [None]:
#load clean descriptions into memory
def load_clean_descriptions(filename, dataset):
    """Load the descriptions of the images in `dataset`, wrapped in start/end tokens.

    The file is read with load_doc. On each line the first whitespace-separated
    token is the image id and the rest is the description. Descriptions of
    images in `dataset` are stored as 'startseq <description> endseq'.

    Args:
        filename: Path of the file with one "<image id> <description>" per line.
        dataset: Collection of image ids to keep (membership is tested with `in`).

    Returns:
        dict mapping image id -> list of wrapped description strings.
    """
    #load document
    doc = load_doc(filename)
    descriptions = dict()
    for line in doc.split('\n'):
        #split line by whitespace
        tokens = line.split()
        #skip blank lines (e.g. after a trailing newline): they have no image id
        if not tokens:
            continue
        #split id from description
        image_id, image_desc = tokens[0], tokens[1:]
        #skip images not in set
        if image_id not in dataset:
            continue
        #wrap description in tokens
        desc = 'startseq ' + ' '.join(image_desc) + ' endseq'
        #create the list if needed, then store
        descriptions.setdefault(image_id, list()).append(desc)
    return descriptions

In [None]:
#load photo features
def load_photo_features(filename, dataset):
    """Load pickled photo features and keep only the entries named in `dataset`.

    Args:
        filename: Path of a pickle file holding a mapping of image id -> features.
        dataset: Iterable of image ids to keep.

    Returns:
        dict mapping each id in `dataset` to its entry in the pickled mapping.

    Raises:
        KeyError: if an id in `dataset` is missing from the pickled mapping.
    """
    #NOTE: unpickling can execute arbitrary code; only load feature files
    #from a trusted source
    #the context manager closes the file, which load(open(...)) never did
    with open(filename, 'rb') as handle:
        all_features = load(handle)
    #filter features
    features = {k: all_features[k] for k in dataset}
    return features

In [None]:
#load training dataset (6K)
filename = '../input/flickr8k/flickr8k_text/Flickr_8k.trainImages.txt'
#set of image ids listed in the training split file
train = load_set(filename)
print('Dataset: %d' %len(train))
# descriptions: cleaned captions of the training ids, wrapped in startseq/endseq
train_descriptions = load_clean_descriptions('descriptions.txt', train)
print('Descriptions: train = %d' %len(train_descriptions))
# photo features: entries of the pickled feature file restricted to the training ids
train_features = load_photo_features('../input/01-image-features/densenet_features.pkl', train)
print('Photos: train = %d' %len(train_features))

In [None]:
# load validation dataset
filename = '../input/flickr8k/flickr8k_text/Flickr_8k.devImages.txt'
# set of image ids listed in the validation split file
dev = load_set(filename)
print('Dataset: %d' % len(dev))
# descriptions: cleaned captions of the validation ids, wrapped in startseq/endseq
dev_descriptions = load_clean_descriptions('descriptions.txt', dev)
print('Descriptions: dev=%d' % len(dev_descriptions))
# photo features: entries of the pickled feature file restricted to the validation ids
dev_features = load_photo_features('../input/01-image-features/densenet_features.pkl', dev)
# label fixed: this count is for the dev split, not the training split
print('Photos: dev=%d' % len(dev_features))

The description text needs to be encoded as numbers before it can be presented to the model as an input or compared to the model's predictions.

In [None]:

#convert a dictionary of clean descriptions to a list of descriptions
def to_lines(descriptions):
    """Flatten a dict of description lists into one list.

    Args:
        descriptions: dict mapping image id -> list of description strings.

    Returns:
        list of all description strings, in dict iteration order.
    """
    all_desc = list()
    #extend() instead of a list comprehension run only for its side effect
    for desc_list in descriptions.values():
        all_desc.extend(desc_list)
    return all_desc

#fit a tokenizer given caption descriptions
def create_tokenizer(descriptions):
    """Return a Tokenizer fitted on every description in `descriptions`.

    Args:
        descriptions: dict mapping image id -> list of description strings.
    """
    caption_tokenizer = Tokenizer()
    #fit on the flattened list of all captions
    caption_tokenizer.fit_on_texts(to_lines(descriptions))
    return caption_tokenizer

#prepare tokenizer (fitted on the training captions only)
tokenizer = create_tokenizer(train_descriptions)
#one more than the number of indexed words
#NOTE(review): presumably because word indices start at 1 and 0 is left for padding - confirm
vocab_size = len(tokenizer.word_index) + 1
print('Vocabulary Size: %d' %vocab_size)

In [None]:
# create sequences of images, input sequences and output words for an image
def create_sequences(tokenizer, max_length, descriptions, photos):
    """Build training triples (photo feature, caption prefix, next word) for a whole dataset.

    Every caption is integer-encoded with `tokenizer`; each proper prefix of the
    encoded caption becomes one sample whose target is the word following the
    prefix. Reads the module-level `vocab_size` for the one-hot width.

    NOTE: a later cell in this notebook defines create_sequences again with a
    per-photo signature (desc_list, photo); once that cell has run, the name
    no longer refers to this version.

    Args:
        tokenizer: object providing texts_to_sequences().
        max_length: `maxlen` passed to pad_sequences for every prefix.
        descriptions: dict mapping image id -> list of caption strings.
        photos: mapping of image id -> features; element [0] is used per sample.

    Returns:
        Tuple of three arrays: photo features, padded prefixes, one-hot targets.
    """
    image_rows, prefix_rows, target_rows = [], [], []
    for image_id, captions in descriptions.items():
        for caption in captions:
            # integer-encode the caption
            encoded = tokenizer.texts_to_sequences([caption])[0]
            # one sample per split point of the encoded caption
            for cut in range(1, len(encoded)):
                prefix = encoded[:cut]
                next_word = encoded[cut]
                padded_prefix = pad_sequences([prefix], maxlen=max_length)[0]
                one_hot = to_categorical([next_word], num_classes=vocab_size)[0]
                image_rows.append(photos[image_id][0])
                prefix_rows.append(padded_prefix)
                target_rows.append(one_hot)
    return array(image_rows), array(prefix_rows), array(target_rows)

In [None]:
# calculate the length of the description with the most words
def max_length(descriptions):
    """Return the number of words in the longest description.

    Args:
        descriptions: dict mapping image id -> list of description strings.
    """
    word_counts = (len(caption.split()) for caption in to_lines(descriptions))
    return max(word_counts)

### DEFINING THE MODEL

In [None]:
#switch: when True, the cells below load GloVe vectors into an embedding matrix
#and define_model() freezes the embedding layer with those weights
use_GloVe = False

In [None]:
###GloVe
#bring in GloVe word embedding model to map our vocab into vectors

if use_GloVe:
    # Load GloVe vectors
    embeddings_index = {} # word -> float32 coefficient vector
    # the context manager closes the file even if a line fails to parse
    with open('../input/glove-global-vectors-for-word-representation/glove.6B.200d.txt', encoding="utf-8") as f:
        for line in f:
            # first token is the word, the remaining tokens are its coefficients
            values = line.split()
            word = values[0]
            coefs = asarray(values[1:], dtype='float32')
            embeddings_index[word] = coefs

In [None]:
###GloVe 
# create embedding matrix to inject into model

#embedding width used when GloVe is disabled
embedding_dim = 256

if use_GloVe:
    #NOTE(review): presumably 200 to match the glove.6B.200d file loaded above - confirm
    embedding_dim = 200
    # Get dense vector for each of word in vocabulary
    # rows start as zeros, so words not found in the embedding index stay all zeros
    embedding_matrix = zeros((vocab_size, embedding_dim))
    for word, i in tokenizer.word_index.items():
        embedding_vector = embeddings_index.get(word)
        if embedding_vector is not None:
            # copy the pre-trained vector into the row of this word's index
            embedding_matrix[i] = embedding_vector

In [None]:
# define the captioning model
def define_model(vocab_size, max_length):
    """Build and compile the captioning model.

    Two inputs are merged by addition: a 1024-element photo feature vector
    (Dropout -> Dense 256) and an integer word sequence of length `max_length`
    (Embedding -> Dropout -> LSTM 256). The sum goes through a Dense 256 layer
    and a softmax over `vocab_size` words.

    Reads the module-level names `embedding_dim`, `use_GloVe` and, when
    `use_GloVe` is true, `embedding_matrix`.

    Args:
        vocab_size: Size of the embedding input and of the softmax output.
        max_length: Length of the input word sequence.

    Returns:
        The compiled model.
    """
    # feature extractor model
    inputs1 = Input(shape=(1024,))
    fe1 = Dropout(0.5)(inputs1)
    fe2 = Dense(256, activation='relu')(fe1)
    # sequence model
    inputs2 = Input(shape=(max_length,))
    # keep a handle on the embedding layer so it can be addressed directly
    # instead of by its position in model.layers
    embedding_layer = Embedding(vocab_size, embedding_dim, mask_zero=True)
    se1 = embedding_layer(inputs2)
    se2 = Dropout(0.5)(se1)
    se3 = LSTM(256)(se2)
    # decoder model
    decoder1 = add([fe2, se3])
    decoder2 = Dense(256, activation='relu')(decoder1)
    outputs = Dense(vocab_size, activation='softmax')(decoder2)
    # tie it together [image, seq] [word]
    model = Model(inputs=[inputs1, inputs2], outputs=outputs)

    if use_GloVe:
        ###GloVe
        # load the pre-trained vectors and freeze them before compiling
        embedding_layer.set_weights([embedding_matrix])
        embedding_layer.trainable = False

    model.compile(loss='categorical_crossentropy', optimizer='adam')
    # summarize model; summary() prints by itself, wrapping it in print()
    # only added a stray "None" line
    model.summary()
    # plot_model(model, to_file='model.png', show_shapes = True)
    return model

In [None]:
# determine the maximum sequence length
# NOTE(review): this rebinds `max_length` from the helper function to its integer
# result, so re-running this cell without first re-running the cell that defines
# max_length() fails with "'int' object is not callable"
max_length = max_length(train_descriptions)
print('Description Length: %d' % max_length)

### FITTING THE MODEL

In [None]:
# define the model: sized by the training vocabulary and the longest training caption
model = define_model(vocab_size, max_length)

### Train With Progressive Loading

In [None]:
# data generator, intended to be used in a call to model.fit_generator()
def data_generator(descriptions, photos, tokenizer, max_length):
    """Endlessly yield one batch per image for generator-based training.

    For every image id in `descriptions`, the samples for all of its captions
    are built with create_sequences() and yielded as one
    `([photo_features, padded_prefixes], targets)` tuple. After the last image
    the iteration starts over from the first one.

    Args:
        descriptions: dict mapping image id -> list of caption strings.
        photos: mapping of image id -> features; element [0] is used.
        tokenizer: tokenizer passed on to create_sequences().
        max_length: sequence length passed on to create_sequences().

    Yields:
        Tuple (inputs, targets) where inputs is the list [in_img, in_seq].
    """
    # loop for ever over images
    while True:
        for key, desc_list in descriptions.items():
            # retrieve the photo feature
            photo = photos[key][0]
            in_img, in_seq, out_word = create_sequences(tokenizer, max_length, desc_list, photo)
            # yield an (inputs, targets) tuple: unpacking works as before, and a
            # tuple is the form the Keras generator API documents
            yield [in_img, in_seq], out_word

You can see that we are calling the create_sequences() function to create a batch worth of data for a single photo rather than for the entire dataset. This means that we must update the create_sequences() function to remove the "iterate over all descriptions" for-loop.

The updated function is as follows:

In [None]:
# create sequences of images, input sequences and output words for an image
def create_sequences(tokenizer, max_length, desc_list, photo):
    """Build training triples (photo feature, caption prefix, next word) for one photo.

    Every caption is integer-encoded with `tokenizer`; each proper prefix of the
    encoded caption becomes one sample whose target is the word following the
    prefix. Reads the module-level `vocab_size` for the one-hot width.

    Args:
        tokenizer: object providing texts_to_sequences().
        max_length: `maxlen` passed to pad_sequences for every prefix.
        desc_list: list of caption strings belonging to the photo.
        photo: feature entry repeated for every generated sample.

    Returns:
        Tuple of three arrays: photo features, padded prefixes, one-hot targets.
    """
    photo_rows, prefix_rows, target_rows = [], [], []
    for caption in desc_list:
        # integer-encode the caption
        encoded = tokenizer.texts_to_sequences([caption])[0]
        # pair every word after the first with the prefix that precedes it
        for cut, next_word in enumerate(encoded[1:], start=1):
            padded_prefix = pad_sequences([encoded[:cut]], maxlen=max_length)[0]
            one_hot = to_categorical([next_word], num_classes=vocab_size)[0]
            photo_rows.append(photo)
            prefix_rows.append(padded_prefix)
            target_rows.append(one_hot)
    return array(photo_rows), array(prefix_rows), array(target_rows)

In [None]:
# train the model, run epochs manually and save after each epoch

label = 'model'
epoch_loop = 20

start = datetime.datetime.now()
#checkpoint = ModelCheckpoint(model_filepath, monitor='val_loss', verbose=2, save_best_only=True, mode='min')

# per-epoch Keras history dicts, keyed by epoch index
history = {}
# epoch with the lowest validation loss so far (-1 until an epoch has been recorded)
min_val_loss_epoch = -1
# start from +inf so the first epoch is always recorded, whatever the scale of
# the loss (the previous sentinel of 100 left the best epoch at -1 if the
# validation loss never dropped below it)
min_val_loss = float('inf')

for i in range(epoch_loop):
    # fit for one epoch
    # fresh generators every epoch, so each epoch starts from the first image
    train_generator = data_generator(train_descriptions, train_features, tokenizer, max_length)
    validation_generator = data_generator(dev_descriptions, dev_features, tokenizer, max_length)
    hist = model.fit_generator(train_generator,
                        epochs=1,
                        steps_per_epoch=len(train_descriptions),
                        verbose=1,
                        #callbacks = [checkpoint],
                        validation_data = validation_generator,
                        validation_steps = len(dev_descriptions))
    # save model
    model_filepath = label + '_epoch_' + str(i) + '.h5'
    model.save(model_filepath)
    history[i] = hist.history
    # `<=` resolves ties in favour of the later epoch
    if history[i]["val_loss"][0] <= min_val_loss:
        min_val_loss_epoch = i
        min_val_loss = hist.history["val_loss"][0]
end = datetime.datetime.now()

# save history 
history_filepath = label + '_history_.pkl'
with open(history_filepath, "wb") as pcklfile: 
    dump(history, pcklfile)

print("Total time taken: {}".format(end - start))


### EVALUATION

In [None]:
# map an integer to a word
def word_for_id(integer, tokenizer):
    """Return the word whose index in tokenizer.word_index equals `integer`.

    Returns None when no word has that index.
    """
    matches = (word for word, index in tokenizer.word_index.items() if index == integer)
    # first match in iteration order, or None if there is none
    return next(matches, None)
 
# generate a description for an image
def generate_desc(model, tokenizer, photo, max_length):
    """Generate a caption for one photo, one word at a time.

    Starting from 'startseq', the text so far is encoded, padded and fed to
    `model` together with `photo`; the highest-scoring index is mapped back to
    a word and appended. Generation stops after `max_length` words, when the
    index maps to no word, or once 'endseq' has been appended.

    Args:
        model: object providing predict().
        tokenizer: object providing texts_to_sequences() and word_index.
        photo: photo features passed to model.predict alongside the sequence.
        max_length: maximum number of generated words and the padding length.

    Returns:
        The generated text, beginning with 'startseq'.
    """
    # seed the generation process
    words = ['startseq']
    for _ in range(max_length):
        # integer-encode and pad the text generated so far
        encoded = tokenizer.texts_to_sequences([' '.join(words)])[0]
        padded = pad_sequences([encoded], maxlen=max_length)
        # predict the next word and take the most probable index
        scores = model.predict([photo, padded], verbose=0)
        best_index = argmax(scores)
        next_word = word_for_id(best_index, tokenizer)
        # stop if the index cannot be mapped to a word
        if next_word is None:
            break
        words.append(next_word)
        # stop once the end of the sequence is predicted
        if next_word == 'endseq':
            break
    return ' '.join(words)

In [None]:
#define evaluation

def evaluate_model(model, descriptions, photos, tokenizer, max_length):
    """Print corpus-level BLEU-1 to BLEU-4 scores of the model on a dataset.

    A caption is generated for every image with generate_desc() and compared,
    word by word, against all reference descriptions of that image.

    Args:
        model: captioning model passed on to generate_desc().
        descriptions: dict mapping image id -> list of reference captions.
        photos: mapping of image id -> photo features.
        tokenizer: tokenizer passed on to generate_desc().
        max_length: maximum caption length passed on to generate_desc().
    """
    actual, predicted = list(), list()
    # step over the whole set
    for key, desc_list in descriptions.items():
        # generate description
        yhat = generate_desc(model, tokenizer, photos[key], max_length)
        # store actual and predicted as token lists
        references = [d.split() for d in desc_list]
        actual.append(references)
        predicted.append(yhat.split())
    # calculate BLEU score
    print('BLEU-1: %f' % corpus_bleu(actual, predicted, weights=(1.0, 0, 0, 0)))
    print('BLEU-2: %f' % corpus_bleu(actual, predicted, weights=(0.5, 0.5, 0, 0)))
    # three equal weights that sum to 1, like the other rows
    # (0.3 each summed to 0.9 and inflated the score)
    print('BLEU-3: %f' % corpus_bleu(actual, predicted, weights=(1.0 / 3, 1.0 / 3, 1.0 / 3, 0)))
    print('BLEU-4: %f' % corpus_bleu(actual, predicted, weights=(0.25, 0.25, 0.25, 0.25)))

In [None]:
# prepare test set
 
# load test set
filename = '../input/flickr8k/flickr8k_text/Flickr_8k.testImages.txt'
# set of image ids listed in the test split file
test = load_set(filename)
print('Dataset: %d' % len(test))
# descriptions: cleaned captions of the test ids, wrapped in startseq/endseq
test_descriptions = load_clean_descriptions('descriptions.txt', test)
print('Descriptions: test=%d' % len(test_descriptions))
# photo features: entries of the pickled feature file restricted to the test ids
test_features = load_photo_features('../input/01-image-features/densenet_features.pkl', test)
print('Photos: test=%d' % len(test_features))


In [None]:
#print(test)


In [None]:
# reload the best epoch result

from keras.models import load_model

# set to True to load the hard-coded checkpoint below instead of the best epoch
override_model_choice = False

if override_model_choice:
    model_filepath = 'model_epoch_1.h5'
else:
    # identify the best one: the epoch index recorded by the training loop as
    # having the lowest validation loss
    # NOTE(review): this is still -1 if the training loop recorded no epoch - confirm before loading
    best = min_val_loss_epoch
    # same file naming scheme the training loop used when saving each epoch
    model_filepath = label + '_epoch_' + str(best) + '.h5'

# load the model
model = load_model(model_filepath)


In [None]:
#evaluate on test set

from numpy import argmax
from nltk.translate.bleu_score import corpus_bleu

# evaluate model: prints BLEU-1 to BLEU-4 of the reloaded model on the test split
evaluate_model(model, test_descriptions, test_features, tokenizer, max_length)

In [8]:
# function to predict captions
def demo_captions(model, photos, tokenizer, max_length):
    """Generate a caption for every photo in `photos`.

    Args:
        model: captioning model passed on to generate_desc().
        photos: mapping of photo id -> photo features.
        tokenizer: tokenizer passed on to generate_desc().
        max_length: maximum caption length passed on to generate_desc().

    Returns:
        dict mapping photo id -> generated caption.
    """
    return {
        photo_id: generate_desc(model, tokenizer, photos[photo_id], max_length)
        for photo_id in photos
    }

# prepare demo set
filename = '../input/demoimagesforcaptioning/demoImages.txt'
demo_path = '../input/flickr8k/flickr8k_dataset/Flicker8k_Dataset/'

# load demo set
demo = load_set(filename)
print('Dataset: %d' % len(demo))
# photo features
demo_features = load_photo_features('../input/01-image-features/densenet_features.pkl', demo)
print('Photos: demo=%d' % len(demo_features))

demo_captions = demo_captions(model, demo_features, tokenizer, max_length)

%pylab inline
import matplotlib.pyplot as plt
import matplotlib.image as mpimg

for k in demo_features:
    img=mpimg.imread(demo_path + k +'.jpg')
    imgplot = plt.imshow(img)
    plt.show()
    print(demo_captions[k])


Dataset: 23
