In [2]:
from os import listdir
from os import path
from pickle import dump
from keras.models import Model
from keras.models import load_model
from keras.preprocessing.text import Tokenizer
from keras.preprocessing.sequence import pad_sequences
from keras.utils.np_utils import to_categorical
from keras.applications.vgg16 import VGG16, preprocess_input
from keras.layers import Input
from keras.layers import Dense
from keras.layers import Dropout
from keras.layers import LSTM
from keras.layers import Embedding
from keras.utils import plot_model
from keras.layers.merge import add
from keras.callbacks import ModelCheckpoint
from keras.preprocessing.image import load_img
from keras.preprocessing.image import img_to_array
from nltk.translate.bleu_score import corpus_bleu

from pickle import load
from numpy import array
from numpy import argmax

import string


  from ._conv import register_converters as _register_converters
Using TensorFlow backend.


Preparing Data

In [13]:

def feature_extraction(directory):
    model = VGG16()
    model.layers.pop() #restructuring the model for our use removing the last layer
    model = Model(inputs=model.inputs, outputs=model.layers[-1].output)
    
    print(model.summary())
    
    features = dict()
    for name in listdir(directory):
        # load an image from file
        filename = directory + '/' + name
        image = load_img(filename, target_size=(224, 224))
        image = img_to_array(image)
        image = image.reshape((1, image.shape[0], image.shape[1], image.shape[2]))
        image = preprocess_input(image)
        feature = model.predict(image, verbose=0)
        image_id = name.split('.')[0]
        features[image_id] = feature
    print('Preprocessing Completed')
    return features


directory = 'Flicker8k_Dataset'
features = feature_extraction(directory)
print('Extracted Features: %d' % len(features))
dump(features, open('features.pkl', 'wb'))

_________________________________________________________________
Layer (type)                 Output Shape              Param #   
input_1 (InputLayer)         (None, 224, 224, 3)       0         
_________________________________________________________________
block1_conv1 (Conv2D)        (None, 224, 224, 64)      1792      
_________________________________________________________________
block1_conv2 (Conv2D)        (None, 224, 224, 64)      36928     
_________________________________________________________________
block1_pool (MaxPooling2D)   (None, 112, 112, 64)      0         
_________________________________________________________________
block2_conv1 (Conv2D)        (None, 112, 112, 128)     73856     
_________________________________________________________________
block2_conv2 (Conv2D)        (None, 112, 112, 128)     147584    
_________________________________________________________________
block2_pool (MaxPooling2D)   (None, 56, 56, 128)       0         
__________

In [3]:
def load_doc(filename):
    file = open(filename, 'r')
    text = file.read()
    file.close()
    return text

filename = 'Flickr8k_text/Flickr8k.token.txt'
doc = load_doc(filename)

    Extracting Description for images

In [4]:
def load_description(doc):
    mapping = dict()
    
    for line in doc.split('\n'):
        tokens = line.split()
        if len(line)<2:
            continue
        image_id, image_des = tokens[0], tokens[1:]
        image_id = image_id.split('.')[0]
        image_des = ' '.join(image_des)
        
        if image_id not in mapping:
            mapping[image_id] = list()
        
        mapping[image_id].append(image_des)
    return mapping

description = load_description(doc)
print('Loaded: %d ' % len(description))

Loaded: 8092 


    Cleaning Descriptions

In [5]:

def clean_description(description):
    #removing punctuations
    table = str.maketrans('','', string.punctuation)
    for key, des_list in description.items():
        for i in range(len(des_list)):
            desc = des_list[i]
            desc = desc.split()
            # convert to lower case
            desc = [word.lower() for word in desc]
            # remove punctuation from each token
            desc = [w.translate(table) for w in desc]
            # remove hanging 's' and 'a'
            desc = [word for word in desc if len(word)>1]
            # remove tokens with numbers in them
            desc = [word for word in desc if word.isalpha()]
            # store as string
            des_list[i] =  ' '.join(desc)
clean_description(description)

In [5]:
# Saving Descriptions in a text file

def save_description(description, filename):
    lines = list()
    for key, des_list in description.items():
        for desc in des_list:
            lines.append(key+' '+desc)
    data = '\n'.join(lines)
    file = open(filename, 'w')
    file.write(data)
    file.close()

save_description(description, 'File_Description.txt')

Training the model and essential functions

In [6]:
def load_doc(filename):
    file = open(filename, 'r')
    text = file.read()
    file.close()
    return text

def load_set(filename):
    doc = load_doc(filename)
    dataset = list()
    for line in doc.split('\n'):
        if len(line) < 1:
            continue
        identifier = line.split('.')[0]
        dataset.append(identifier)
    return set(dataset)

def load_clean_descriptions(filename, dataset):
    doc = load_doc(filename)
    descriptions = dict()
    for line in doc.split('\n'):
        tokens = line.split()
        image_id, image_desc = tokens[0], tokens[1:]
        if image_id in dataset:
            if image_id not in descriptions:
                descriptions[image_id] = list()
            desc = 'startseq ' + ' '.join(image_desc) + ' endseq'
            descriptions[image_id].append(desc)
    return descriptions

def load_photo_features(filename, dataset):
    all_features = load(open(filename, 'rb'))
    features = {k: all_features[k] for k in dataset}
    return features

In [7]:
def to_lines(descriptions):
    all_desc = list()
    for key in descriptions.keys():
        [all_desc.append(d) for d in descriptions[key]]
    return all_desc



def create_tokenizer(descriptions):
    lines = to_lines(descriptions)
    tokenizer = Tokenizer()
    tokenizer.fit_on_texts(lines)
    return tokenizer



def max_length(descriptions):
    lines = to_lines(descriptions)
    return max(len(d.split()) for d in lines)

def word_for_id(integer, tokenizer):
    for word, index in tokenizer.word_index.items():
        if index == integer:
            return word
    return None



In [8]:
def generate_desc(model, tokenizer, photo, max_length):
    in_text = 'startseq'
    for _ in range(max_length):

        sequence = tokenizer.texts_to_sequences([in_text])[0]

        sequence = pad_sequences([sequence], maxlen=max_length)

        yhat = model.predict([photo,sequence], verbose=0)

        yhat = argmax(yhat)

        word = word_for_id(yhat, tokenizer) #mapping iteger to word

        if word is None:
            break
        in_text += ' ' + word

        if word == 'endseq':
            break
    return in_text

def cleanup_summary(summary):
    index = summary.find('startseq ')
    if index > -1:
        summary = summary[len('startseq '):]
    index = summary.find(' endseq')
    if index > -1:
        summary = summary[:index]
    return summary

In [9]:
#Creating the Training Sequence adding <startseq> and <endseq>

def create_sequences(tokenizer, max_length, desc_list, photo):
    X1, X2, y = list(), list(), list()
    for desc in desc_list:
        seq = tokenizer.texts_to_sequences([desc])[0]
        for i in range(1, len(seq)):
            in_seq, out_seq = seq[:i], seq[i]
            in_seq = pad_sequences([in_seq], maxlen=max_length)[0] #pad sequence
            out_seq = to_categorical([out_seq], num_classes=vocab_size)[0] #encode output
            X1.append(photo)
            X2.append(in_seq)
            y.append(out_seq)
    return array(X1), array(X2), array(y)

In [10]:
# evaluate the skill of the model
def evaluate_model(model, descriptions, photos, tokenizer, max_length):
    actual, predicted = list(), list()
    # step over the whole set
    for key, desc_list in descriptions.items():
        yhat = generate_desc(model, tokenizer, photos[key], max_length)
        # store actual and predicted
        references = [d.split() for d in desc_list]
        actual.append(references)
        predicted.append(yhat.split())
    # calculate BLEU score
    print('BLEU-1: %f' % corpus_bleu(actual, predicted, weights=(1.0, 0, 0, 0)))
    print('BLEU-2: %f' % corpus_bleu(actual, predicted, weights=(0.5, 0.5, 0, 0)))
    print('BLEU-3: %f' % corpus_bleu(actual, predicted, weights=(0.3, 0.3, 0.3, 0)))
    print('BLEU-4: %f' % corpus_bleu(actual, predicted, weights=(0.25, 0.25, 0.25, 0.25)))

Captioning Model

In [11]:
def caption_model(vocab_size, max_length):
    inputs1 = Input(shape=(4096,))
    feature_extractor1 = Dropout(0.5)(inputs1)
    feature_extractor2 = Dense(256, activation='relu')(feature_extractor1)
    
    inputs2 = Input(shape=(max_length,))
    seq_mod1 = Embedding(vocab_size, 256, mask_zero=True)(inputs2)
    seq_mod2 = Dropout(0.5)(seq_mod1)
    seq_mod3 = LSTM(256)(seq_mod2)
    
    decoder1 = add([feature_extractor2,seq_mod3])
    decoder2 = Dense(256, activation='relu')(decoder1)
    outputs = Dense(vocab_size, activation='softmax')(decoder2)
    
    model = Model(inputs=[inputs1, inputs2], outputs=outputs) #combining them
    
    model.compile(loss='categorical_crossentropy', optimizer='adam')
    
    print(model.summary())
    return model

In [12]:
# to be used in model.fit due to large size of the data
from tqdm import tqdm

def data_generator(description, photos, tokenizer, max_length):
    while 1:
        for key, desc_list in description.items():
            photo = photos[key][0]
            in_img, in_seq, out_word = create_sequences(tokenizer, max_length, desc_list, photo)
            yield[[in_img, in_seq], out_word]

In [13]:
filename = 'Flickr8k_text/Flickr_8k.trainImages.txt'
train = load_set(filename)
print('Dataset: %d' % len(train))

train_descriptions = load_clean_descriptions('File_Description.txt', train)
print('Descriptions: train=%d' % len(train_descriptions))

train_features = load_photo_features('features.pkl', train)
print('Photos: train=%d' % len(train_features))

tokenizer = create_tokenizer(train_descriptions)
vocab_size = len(tokenizer.word_index) + 1
print('Vocabulary Size: %d' % vocab_size)

max_length = max_length(train_descriptions)
print('Description Length: %d' % max_length)

Dataset: 6000
Descriptions: train=6000
Photos: train=6000
Vocabulary Size: 7579
Description Length: 34


In [32]:


model = caption_model(vocab_size, max_length)
epochs = 10
steps = len(train_descriptions)
for i in range(epochs):
    generator = data_generator(train_descriptions, train_features, tokenizer, max_length)
    model.fit_generator(generator, epochs=1, steps_per_epoch=steps, verbose=1)
    model.save('model_' + str(i) + '.h5')

__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
input_18 (InputLayer)           (None, 34)           0                                            
__________________________________________________________________________________________________
input_17 (InputLayer)           (None, 4096)         0                                            
__________________________________________________________________________________________________
embedding_9 (Embedding)         (None, 34, 256)      1940224     input_18[0][0]                   
__________________________________________________________________________________________________
dropout_17 (Dropout)            (None, 4096)         0           input_17[0][0]                   
__________________________________________________________________________________________________
dropout_18

In [14]:
# For testing the model

filename = 'Flickr8k_text/Flickr_8k.devImages.txt'
test = load_set(filename)
print('Dataset: %d' % len(test))

test_descriptions = load_clean_descriptions('File_Description.txt', test)
print('Descriptions: test=%d' % len(test_descriptions))

test_features = load_photo_features('features.pkl', test)
print('Photos: test=%d' % len(test_features))

Dataset: 1000
Descriptions: test=1000
Photos: test=1000


In [33]:
filename = 'model_9.h5'
model = load_model(filename)
# evaluate model
evaluate_model(model, test_descriptions, test_features, tokenizer, max_length)

BLEU-1: 0.536539
BLEU-2: 0.279648
BLEU-3: 0.188560
BLEU-4: 0.084853


In [15]:
filename = 'model_8.h5'
model = load_model(filename)
# evaluate model
evaluate_model(model, test_descriptions, test_features, tokenizer, max_length)

BLEU-1: 0.542413
BLEU-2: 0.285089
BLEU-3: 0.192270
BLEU-4: 0.087394


In [16]:
filename = 'model_7.h5'
model = load_model(filename)
# evaluate model
evaluate_model(model, test_descriptions, test_features, tokenizer, max_length)

BLEU-1: 0.545240
BLEU-2: 0.288171
BLEU-3: 0.194931
BLEU-4: 0.090875


In [17]:
filename = 'model_6.h5'
model = load_model(filename)
# evaluate model
evaluate_model(model, test_descriptions, test_features, tokenizer, max_length)

BLEU-1: 0.541171
BLEU-2: 0.285414
BLEU-3: 0.193602
BLEU-4: 0.089726
