Import the necessary libraries

In [22]:
import string 
import numpy as np
from PIL import Image
import os
from pickle import dump, load
import numpy as np

from keras.applications.xception import Xception, preprocess_input
from keras.preprocessing.image import load_img, img_to_array
from keras.preprocessing.text import Tokenizer
from keras.preprocessing.sequence import pad_sequences
from keras.utils import to_categorical
from keras.utils import plot_model
from keras.layers import add
from keras.models import Model, load_model
from keras.layers import Input, Dense, LSTM, Embedding, Dropout

from tqdm.notebook import tqdm as tqdm



# Cleaning The Data

We are using flicker8k dataset which contails 8091 images

In [2]:
"""The class File contains
load_file: Reads the given text file and returns the text
img_captions: Creates a dictionary which maps a file name to 5 captions
data_cleaning: Removes punctuation, removes words containing numbers and turns all alphabets into lowercase
get_vocabulary: Returns a set of all the words used in the captions
store_description: Map each caption to the text file name and store it in a text file
"""
class FileHandling:
    def __init__(self):
        pass
    def load_file(self, file):
        file = open(file,"r")
        text = file.read()
        file.close()
        return text
    def img_captions(self, file_contents):
        img_captions = dict()
        for file_name_raw in file_contents.split('\n'):
            file_name = file_name_raw.split('\t')[0].split('#')[0]
            if file_name not in img_captions.keys():
                img_captions[file_name] = []
            img_captions[file_name].append(file_name_raw.split('\t')[-1])
        return img_captions
    def data_cleaning(self, captions):
        #create a dictionary of ascii values of all punctuation mapped to none values
        img_edited_captions = {}
        punctuation_none_map = str.maketrans('','',string.punctuation)
        for img_name,all_captions in captions.items():
            if img_name not in img_edited_captions.values():
                img_edited_captions[img_name] = []
            for caption in all_captions:
                #remove punctuation
                caption = caption.translate(punctuation_none_map)
                all_words = caption.split()
                #convert to lower case
                all_words = [word.lower() for word in all_words]
                #remove hanging 's and a'
                all_words = [word for word in all_words if len(word) > 1]
                #remove words containing numbers
                all_words = [word for word in all_words if word.isalpha()]
                caption_edited = ' '.join(all_words)
                img_edited_captions[img_name].append(caption_edited)
        return img_edited_captions
    def get_vocabulary(self, captions):
        vocab = set()
        for img in captions.keys():
            for caption in captions[img]:
                vocab.update(caption.split())
        return vocab
            
    def store_description(self, all_captions, file_name):
        export_text = []
        for img,captions in all_captions.items():
            for caption in captions:
                export_text.append("{}\t{}".format(img,caption))
        file = open(file_name,"w")
        file.write('\n'.join(export_text))
        file.close()

In [3]:
File = FileHandling()
#File.load_file('Flickr8k_Text/Flickr8k.token.txt')
file_contents = File.load_file('Flickr8k_Text/Flickr8k.token.txt')
descriptions = File.img_captions(file_contents)
img_captions = File.data_cleaning(descriptions)
vocabulary = File.get_vocabulary(img_captions)
File.store_description(img_captions,'descriptions.txt')

# Extracting Features From Each Image

In [4]:
#extract_feature: Given a directory, this function extracts all the features using Xception model
def extract_features(directory):
    model = Xception(include_top = False, pooling = 'avg')
    features = {}
    for img in tqdm(os.listdir(directory)):
        file_name = directory+'/'+img
        image = Image.open(file_name)
        image = image.resize((299,299))
        image = np.expand_dims(image,axis=0)
        image = image / 127.5
        image = image - 1.0
        feature = model.predict(image)
        features[img] = feature
    return features

In [5]:
img_dataset = 'Flicker8k_Dataset'
features = extract_features(img_dataset)

  0%|          | 0/8091 [00:00<?, ?it/s]









































































































In [6]:
dump(features, open("features.p","wb"))

# Getting Training Features

In [7]:
"""The TrainModel class contains all the necessary functions to train our model
load_photos: Returns the file names from a text file
clean_description: Returns a dictionary where the image names are maped to the captions
load_features: Loads the features for the given list of immages from the previously trained Xception model
"""
class TrainModel:
    def __init__(self):
        pass
    
    def load_photos(self,file_name):
        File = FileHandling()
        file = File.load_file(file_name)
        img_list = file.split('\n')[:-1]
        return img_list
    
    def clean_description(self,file_name,photos):
        File = FileHandling()
        file = File.load_file(file_name)
        descriptions = file.split('\n')
        clean_desc = {}
        for img_desc in descriptions:
            img = img_desc.split('\t')[0]
            if len(img_desc.split('\t')[-1].split()) < 1:
                continue
            if img in photos:
                if img not in clean_desc.keys():
                    clean_desc[img] = []
                clean_desc[img].append("<start> {} <end>".format(img_desc.split('\t')[-1]))
        return clean_desc
        
    def load_features(self,photos):
        all_features = load(open("features.p","rb"))
        features = {}
        for img in photos:
            features[img] = all_features[img]
        return features
                

In [8]:
train_model = TrainModel()
training_img = 'Flickr8k_text/Flickr_8k.trainImages.txt'
train_imgs = train_model.load_photos(training_img)
train_descriptions = train_model.clean_description('descriptions.txt', train_imgs)
train_features = train_model.load_features(train_imgs)

# Tokenizing

Each of the words in the vocabulary are mapped to a special index and stored in a pickle file

In [9]:
"""
dict_list: Given a dictionary, converts the values into a list
create_tokens: maps each word to a unique index
"""
def dict_list(descriptions):
    all_desc = []
    for img in descriptions.keys():
        [all_desc.append(desc) for desc in descriptions[img]]
    return all_desc
def create_tokens(descriptions):
    desc_list = dict_list(descriptions)
    tokenizer = Tokenizer()
    tokenizer.fit_on_texts(desc_list)
    return tokenizer

In [10]:
tokens = create_tokens(train_descriptions)
dump(tokens, open('tokenizer.p', 'wb'))
vocab_size = len(tokens.word_index) + 1
print(vocab_size)

7577


In [11]:
#calculate the maximum length of descriptions
def max_length(descriptions):
    desc_list = dict_list(descriptions)
    return max(len(d.split()) for d in desc_list)
max_length = max_length(train_descriptions)
max_length

34

# Data Generator

In [12]:
def create_sequence(tokenizer, max_length, desc_list, feature):
    X1=list()
    X2=list()
    y=list()
    for desc in desc_list:
        #sequnce encodng
        seq = tokenizer.texts_to_sequences([desc])[0]
        for i in range(0,len(seq)):
            in_seq, out_seq = seq[:i], seq[i]
            #pad input sequence
            in_seq = pad_sequences([in_seq], maxlen = max_length)[0]
            #encode output sequence
            out_seq = to_categorical([out_seq],num_classes=vocab_size)[0]
            X1.append(feature)
            X2.append(in_seq)
            y.append(out_seq)
    return np.array(X1), np.array(X2), np.array(y)

def data_generator(descriptions, features, tokenizer, max_length):
    while 1:
        for key, desc_list in descriptions.items():
            feature = features[key][0]
            input_image, input_seq, output_word = create_sequence(tokenizer, max_length, desc_list, feature)
            yield [[input_image, input_seq], output_word]
            
[a,b],c = next(data_generator(train_descriptions, features,tokens, vocab_size))
print(a.shape,b.shape,c.shape)

(52, 2048) (52, 7577) (52, 7577)


# CNN RNN model

In [20]:
def define_model(vocab_size, max_length):
    #Features changed from CNN model 
    inputs1 = Input(shape=(2048,))
    fel = Dropout(0.5)(inputs1)
    fe2 = Dense(256, activation = 'relu')(fel)
    
    # LSTM sequence model
    inputs2 = Input(shape=(max_length,))
    se1 = Embedding(vocab_size,256,mask_zero=True)(inputs2)
    se2 = Dropout(0.5)(se1)
    se3 = LSTM(256)(se2)
    
    #Merging models
    decoder1 = add([fe2, se3])
    decoder2 = Dense(256, activation='relu')(decoder1)
    outputs = Dense(vocab_size, activation='softmax')(decoder2)
    
    model = Model(inputs = [inputs1,inputs2], outputs = outputs)
    model.compile(loss = 'categorical_crossentropy', optimizer = 'adam')
    
    #summarize
    print(model.summary())
    plot_model(model, to_file='model.png', show_shapes = True)
    return model

# Training

In [25]:
print('Dataset: ',len(train_imgs))
print('Training descriptions: ',len(train_descriptions))
print('Training photos: ',len(train_features))
print('Vocabulary size: ',vocab_size)
print('Description length: ', max_length)

model = define_model(vocab_size,max_length)
epochs = 10
steps = len(train_descriptions)
for i in range(epochs):
    generator = data_generator(train_descriptions, train_features, tokens, max_length)
    model.fit_generator(generator, epochs=1, steps_per_epoch = steps, verbose =1)
    model.save("models/model_{}.h5".format(i))

Dataset:  6000
Training descriptions:  6000
Training photos:  6000
Vocabulary size:  7577
Description length:  34
Model: "model_3"
__________________________________________________________________________________________________
 Layer (type)                Output Shape                 Param #   Connected to                  
 input_11 (InputLayer)       [(None, 34)]                 0         []                            
                                                                                                  
 input_10 (InputLayer)       [(None, 2048)]               0         []                            
                                                                                                  
 embedding_3 (Embedding)     (None, 34, 256)              1939712   ['input_11[0][0]']            
                                                                                                  
 dropout_7 (Dropout)         (None, 2048)                 0         ['input_1

  model.fit_generator(generator, epochs=1, steps_per_epoch = steps, verbose =1)


   2/6000 [..............................] - ETA: 7:07 - loss: 3.3837

  saving_api.save_model(


