In [1]:
from tensorflow import keras
from keras.applications.vgg16 import VGG16
import keras.utils as image
from keras.applications.vgg16 import preprocess_input
from keras.preprocessing.text import Tokenizer
from keras.utils import pad_sequences
from tensorflow.keras.utils import to_categorical
from keras.utils.vis_utils import plot_model
from tensorflow.keras import layers

from keras.callbacks import ModelCheckpoint

import os
from tqdm import tqdm
import pickle
import string


In [2]:
## **Feature extraction using VGG16** 
directory = 'F:\image-caption-generator\Flickr8k_Dataset'
VGG_model = VGG16()
VGG_model = keras.Model(inputs = VGG_model.inputs, outputs = VGG_model.layers[-2].output)
print(VGG_model.summary())

feature = dict()

for i in tqdm(os.listdir(directory)):
    file_name = directory + '/' + i
    img = image.load_img(file_name, target_size = (224, 224))
    img = image.img_to_array(img)
    img = img.reshape((1,224,224,3))
    img = preprocess_input(img)

    features = VGG_model.predict(img)
    image_id = i.split('.')[0]

    feature[image_id] = features

print("Extracted Features: %d" % len(feature))
pickle.dump(feature, open('features.pkl','wb'))

KeyboardInterrupt: 

In [3]:
filename = "F:\\image-caption-generator\\Flickr8k_text\\Flickr8k.token.txt"

# Loading the caption data
file = open(filename, 'r')
caption = file.read()
file.close()

# Extracting the caption for image data
descriptions = dict()
for line in caption.split('\n'):
    tokens = line.split()

    if len(line) < 2:
        continue

    image_id, image_description = tokens[0], tokens[1:]
    image_id = image_id.split('.')[0]
    image_description = ' '.join(image_description)

    if image_id not in descriptions:
        descriptions[image_id] = list()

    descriptions[image_id].append(image_description)

print("Loaded Captions", len(descriptions))

# Cleaning the extracted caption descriptions of the image data
table = str.maketrans('', '', string.punctuation)
for key, desc_list in descriptions.items():
    for i in range(len(desc_list)):
        description = desc_list[i]

        # Tokenize
        description = description.split()
        
        # Convert to lowercase
        description = [term.lower() for term in description]

        # Remove punctuation from text
        description = [w.translate(table) for w in description]

        # Remove single-character words from captions
        description = [term for term in description if len(term) > 1]

        # Remove tokens with numbers
        description = [term for term in description if term.isalpha()]

        # Store the result as a string
        desc_list[i] = ' '.join(description)

# Creating vocabulary
all_desc = set()
for key in descriptions.keys():
    [all_desc.update(d.split()) for d in descriptions[key]]

print('Vocabulary Size', len(all_desc))

# Save to file
lines = []
for key, desc_list in descriptions.items():
    for description in desc_list:
        lines.append(key + ' ' + description)
data = '\n'.join(lines)

file = open('descriptions.txt', 'w')
file.write(data)
file.close()


Loaded Captions 8092
Vocabulary Size 8763


In [4]:
def load_doc(file_name):
    file = open(file_name, 'r')
    caption = file.read()
    file.close()
    return caption

In [5]:
def load_set(file_name):
    document = load_doc(file_name)
    data_set = list()
    
    # process line by line
    for line in document.split('\n'):
        # skip empty lines
        if len(line) < 1:
            continue
        
        # get the image identifier
        identifier = line.split('.')[0]
        data_set.append(identifier)

    return set(data_set)

In [6]:
# load clean descriptions into memory
def load_clean_descriptions(file_name, data_set):
    # load document
    doc = load_doc(file_name)
#     print(doc)
    desc = dict()
#     print(doc.split('\n'))
    for line in doc.split('\n'):
        # split line by white space
        tokens = line.split()
        
        # split id from description
        image_id, image_desc = tokens[0], tokens[1:]
        if image_id in data_set:
            # create list
            if image_id not in list(desc.keys()):
                desc[image_id] = list()
                
            # wrap desc in tokens
            description = 'startseq ' + ' '.join(image_desc) + ' endseq'
            
            # store
            desc[image_id].append(description)
    return desc

In [7]:
# load photo features
def load_photo_features(file_name, data_set):
    # load all features
    all_features = pickle.load(open(file_name, 'rb'))
    
    # filter features
    features = {k: all_features[k] for k in data_set}
    return features

In [8]:
def to_lines(desc):
    all_descriptions = list()
    for key in desc.keys():
        list(all_descriptions.append(d) for d in desc[key])
    return all_descriptions

In [9]:
def create_tokenizer(desc):
    lines = to_lines(desc)
    tokenizer = Tokenizer()
    tokenizer.fit_on_texts(lines)
    return tokenizer

In [10]:
def max_length(desc):
    lines = to_lines(desc)
    return max(len(d.split()) for d in lines)

In [11]:
filename = r"F:\image-caption-generator\Flickr8k_text\Flickr_8k.trainImages.txt"

train = load_set(filename)
print("Dataset:",len(train))

# descriptions
train_descriptions = load_clean_descriptions(r"F:\image-caption-generator\workspace\descriptions.txt", train)
print("Descriptions: train=",len(train_descriptions))

# photo features
train_features = load_photo_features(r"F:\image-caption-generator\workspace\features.pkl", train)
print("Photos: train=",len(train_features))

# prepare tokenizer
tokenizer = create_tokenizer(train_descriptions)
vocab_size = len(tokenizer.word_index) + 1
print("Vocabulary Size:",vocab_size)

# determine the maximum sequence length
max_length = max_length(train_descriptions)
print("Description Length:",max_length)

Dataset: 6000
Descriptions: train= 6000
Photos: train= 6000
Vocabulary Size: 7579
Description Length: 34


In [12]:
import numpy as np

# create sequences of images, input sequences and output words for an image
def create_sequences(tokenizer, max_length, desc_list, photo):
    x_1 = []
    x_2 = []
    y = []
    
    for description in desc_list:
        # encode the sequence
        sequence = tokenizer.texts_to_sequences([description])[0]
        
        # splitting one seqeunce into multiple x and y pairs
        for i in range(1, len(sequence)):
            # split in input and output pair
            input_seq, output_seq = sequence[:i], sequence[i]
            
            # pad input sequence
            in_seq = pad_sequences([input_seq], maxlen=max_length)[0]
            
            # encode output sequence
            out_seq = to_categorical([output_seq], num_classes=vocab_size)[0]
            
            # store
            x_1.append(photo)
            x_2.append(input_seq)
            y.append(output_seq)
            
    x_1 = np.array(x_1,object)
    x_2 = np.array(x_2, object)
    y = np.array(y, object)
    print(x_1.shape)
    print(x_2.shape)
    print(y.shape)
    return x_1, x_2, y

In [13]:
# Below code is used to progressively load the batch of data
# data generator will be used in the model.fit_generator()
def data_generator(desc, photos, tokenizer, max_length, batch_size):
    # loop for ever over images
    x_1, x_2, y = list(), list(), list()
    n = 0
    while True:
        for key, desc_list in desc.items():
            n+=1
            # retrieving the photo features
            photo = photos[key][0]
            
            for description in desc_list:
                # encode the sequence
                sequence = tokenizer.texts_to_sequences([description])[0]

                # splitting one seqeunce into multiple x and y pairs
                for i in range(1, len(sequence)):
                    # split in input and output pair
                    input_seq, output_seq = sequence[:i], sequence[i]

                    # pad input sequence
                    in_seq = pad_sequences([input_seq], maxlen=max_length)[0]

                    # encode output sequence
                    out_seq = to_categorical([output_seq], num_classes=vocab_size)[0]

                    # store
                    x_1.append(photo)
                    x_2.append(in_seq)
                    y.append(out_seq)

            if n == batch_size:
                x_1, x_2, y = np.array(x_1), np.array(x_2), np.array(y)
                yield [x_1, x_2], y
                x_1, x_2, y = list(), list(), list()
                n = 0

In [14]:
# define the captioning model
def define_model(vocab_size, max_length):
    # feature extractor model
    inputs1 = keras.Input(shape=(4096,))
    fe_1 = layers.Dropout(0.5)(inputs1)
    fe_2 = layers.Dense(256, activation=tf.nn.relu)(fe_1)
    
    # The sequence model takes input sentences or descriptions to be fed to the embedding layer
    # sequence model
    inputs2 = layers.Input(shape=(max_length,))
    
    # the parameter mask_true is set as true to ignore the padded values
    # The input sequences are of length 34 words
    se_1 = layers.Embedding(vocab_size, 256, mask_zero=True)(inputs2)
    
    # using a dropout layer to reduce overfitting
    se_2 = layers.Dropout(0.5)(se_1)
        
    # After this, we will use an LSTM layer having 256 memory units 
    # to process these text descriptions of the sentences.
    se_3 = layers.LSTM(256)(se_2)
    
    # decodeer model
    # the decoder model merges the vectors from both the 
    # input models by doing an addition operation.
    decoder_1 = layers.add([fe_2, se_3])
    decoder_2 = layers.Dense(256, activation=tf.nn.relu)(decoder_1)
    outputs = layers.Dense(vocab_size, activation=tf.nn.softmax)(decoder_2)
    
    # tie it together [image, sequence] [word]
    model = keras.Model(inputs=[inputs1, inputs2], outputs=outputs)
    model.compile(loss=keras.losses.CategoricalCrossentropy(), optimizer=keras.optimizers.Adam())
    
    # summarize mode
    print(model.summary())
    
    plot_model(model, show_shapes=True)
    return model

In [18]:
import tensorflow as tf

# train the model
model = define_model(vocab_size, max_length)
epochs = 100
batch_size = 64
steps = len(train_descriptions) // batch_size

for i in range(epochs):
    # create the data generator
    generator = data_generator(train_descriptions, train_features, tokenizer, max_length, batch_size)
    
    # fit for one epoch
    model.fit_generator(generator, epochs=1, steps_per_epoch=steps, verbose=1)
    
    # save the model
    model.save('model_' + str(i) + '.h5')

Model: "model"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 input_2 (InputLayer)           [(None, 34)]         0           []                               
                                                                                                  
 input_1 (InputLayer)           [(None, 4096)]       0           []                               
                                                                                                  
 embedding (Embedding)          (None, 34, 256)      1940224     ['input_2[0][0]']                
                                                                                                  
 dropout (Dropout)              (None, 4096)         0           ['input_1[0][0]']                
                                                                                              

  model.fit_generator(generator, epochs=1, steps_per_epoch=steps, verbose=1)


 2/93 [..............................] - ETA: 51s - loss: 8.7075  

KeyboardInterrupt: 

In [15]:
import tensorflow as tf
model = tf.keras.models.load_model('model_494.h5')
epochs = 6
batch_size = 64
steps = len(train_descriptions) // batch_size

for i in range(epochs):
    # create the data generator
    generator = data_generator(train_descriptions, train_features, tokenizer, max_length, batch_size)
    
    # fit for one epoch
    model.fit_generator(generator, epochs=1, steps_per_epoch=steps, verbose=1)
    
    # save the model
    model.save('model_' + str(i + 495) + '.h5')

  model.fit_generator(generator, epochs=1, steps_per_epoch=steps, verbose=1)




In [15]:
def word_for_id(integer, tokenizer):
    for word, index in tokenizer.word_index.items():
        if index == integer:
            return word
    return None

In [16]:
def generate_desc(model, tokenizer, photo, max_length):
    # seed the generation process
    in_text = "startseq"
    for i in range(max_length):
        # integer encode input sequence
        sequence = tokenizer.texts_to_sequences([in_text])[0]
        
        # pad input
        sequence = pad_sequences([sequence], maxlen=max_length)
        
        # predict next word
        yhat = model.predict([photo, sequence], verbose=0)
        
        # convert proba to integer
        yhat = np.argmax(yhat)
        
        # map integer to word
        word = word_for_id(yhat, tokenizer)
        
        if word is None:
            break
           
        # append as input for generating the next word
        in_text += ' ' + word
        
        # we will stop if we predict the endseq
        if word == 'endseq':
            break
            
    return in_text

In [17]:
from nltk.translate.bleu_score import corpus_bleu

# evalute the skill of the mode
def evaluate_model(model, descriptions, photos, tokenizer, max_length):
    actual, predicted = list(), list()
    # step over the whole set
    for key, desc_list in tqdm(descriptions.items()):
        # generate description
        yhat = generate_desc(model, tokenizer, photos[key], max_length)
        
        # store actual and predicted
        references = [d.split() for d in desc_list]
        actual.append(references)
        predicted.append(yhat.split())
        
    # calculate BLEU score
    print("BLEU-1: %f" % corpus_bleu(actual, predicted, weights=(1.0, 0,0, 0)))
    print("BLEU-2: %f" % corpus_bleu(actual, predicted, weights=(0.5, 0.5, 0, 0)))
    print("BLEU-3: %f" % corpus_bleu(actual, predicted, weights=(0.256, 0.25, 0.25, 0.25)))

In [17]:
test_set_path = r"F:\image-caption-generator\Flickr8k_text\Flickr_8k.testImages.txt"
test = load_set(test_set_path)
print("Dataset: %d" % len(test))

# descriptions
test_descriptions = load_clean_descriptions(r"F:\image-caption-generator\workspace\descriptions.txt", test)
print("Descriptions: test=%d" % len(test_descriptions))

# photo features
test_features = load_photo_features(r"F:\image-caption-generator\workspace\features.pkl", test)
print('Photos: test=%d' % len(test_features))

# load the model which has minimum loss, in this case it was model_18
model_name = r"F:\image-caption-generator\workspace\model\model_500.h5"
model = keras.models.load_model(model_name)

# evalute the model
evaluate_model(model, test_descriptions, test_features, tokenizer, max_length)

Dataset: 1000
Descriptions: test=1000
Photos: test=1000


KeyboardInterrupt: 

In [18]:
feature_read = pickle.load(open(r"F:\image-caption-generator\workspace\features.pkl" ,'rb'))
import tensorflow as tf
model = tf.keras.models.load_model('model/model_500.h5')


In [19]:
from PIL import Image
import matplotlib.pyplot as plt

def visualize_prediction(test_path, num_photos_to_predict, test_set_path, test_desc_path, test_features_path):
    for i in range(num_photos_to_predict):
        rand_image_num = np.random.randint(0, len(os.listdir(test_path)))
        image_path = os.path.join(test_path, os.listdir(test_path)[rand_image_num])
        image_name = os.path.basename(image_path)
        image_name = image_name.split('.')[0]
        image = Image.open(image_path)
        
        test_set = load_set(test_set_path)
        test_descriptions = load_clean_descriptions(test_desc_path, test_set)
        test_features = load_photo_features(test_features_path, test_set)
        
        #captions = descriptions[image_name]
        
        print('---------------------Actual---------------------')
        #for caption in captions:
            #print(caption)
        
        # predict the caption
        y_pred = generate_desc(model, tokenizer, feature_read[image_name], max_length)
        
        print('--------------------Predicted--------------------')

        print(y_pred)
        plt.imshow(image)

In [20]:
from PIL import Image
test_path = r"C:\Users\srija\OneDrive\Desktop\images"
test_set_path = r"F:\image-caption-generator\Flickr8k_text\Flickr_8k.trainImages.txt"
test_desc_path = r"F:\image-caption-generator\workspace\descriptions.txt"
test_features_path = r"F:\image-caption-generator\workspace\features.pkl"

visualize_prediction(test_path, 1, test_set_path, test_desc_path, test_features_path)

---------------------Actual---------------------


KeyError: '1'