In [3]:
import os
import pickle
import re

import contractions
import joblib
import numpy as np
from keras.applications.inception_v3 import InceptionV3, preprocess_input
from tensorflow.keras.layers import Input, Dense, LSTM, Dropout, Embedding, add
from tensorflow.keras.models import Model
from tensorflow.keras.preprocessing.image import load_img, img_to_array
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.saving import load_model
from tensorflow.keras.utils import plot_model
from tensorflow.keras.utils import to_categorical
from tqdm.auto import tqdm

In [None]:
#===========use the code or load directly from pickle from below cell
# Image encoder: InceptionV3 with its last (classification) layer removed, so
# predict() returns the output of the second-to-last layer.
model = InceptionV3()
model = Model(inputs=model.inputs, outputs=model.layers[-2].output)

directory = os.path.join('d:/evoastra/','images')

# Encode one image at a time and keep only the resulting feature array, so the
# preprocessed 299x299 pixel arrays are never all held in memory together.
features = {}
for img_name in tqdm(os.listdir(directory)):
    # load the image from file, resized to 299x299
    img_path = os.path.join(directory, img_name)
    image = load_img(img_path, target_size=(299, 299))
    # convert image pixels to a numpy array and add a leading batch axis
    image = np.expand_dims(img_to_array(image), axis=0)
    # Use the preprocessing that ships with InceptionV3 rather than a plain
    # division by 255. NOTE: features pickled with the old `/ 255` scaling are
    # not comparable with these; regenerate the pickle and retrain the
    # captioning model after switching.
    image = preprocess_input(image)
    # image id = file name up to the first dot (same rule as the captions cell)
    image_id = img_name.split('.')[0]
    features[image_id] = model.predict(image, verbose=0)

# persist the features so this slow cell can be skipped on later runs
with open('./feaatures.pkl','wb') as pkl_file:
    pickle.dump(features, pkl_file)
#======================================

In [5]:
# Load the cached image features. The context manager closes the file handle.
# pickle can execute arbitrary code while loading: only open files you created.
with open('./feaatures.pkl','rb') as pkl_file:
    features = pickle.load(pkl_file)

In [None]:
#===========use the code or load directly from pickle from below cell
with open('./captions.txt','r') as f:
    # skip the first line (presumably a header row); the default keeps an
    # empty file from raising StopIteration
    next(f, None)
    captions_doc = f.read()

# create mapping of image id -> list of captions
mapping = {}
for line in tqdm(captions_doc.split('\n')):
    # split the line by comma; lines without a comma carry no caption
    tokens = line.split(',')
    if len(tokens) < 2:
        continue
    image_id, caption = tokens[0], tokens[1:]
    # remove file extension from image_id (everything from the first dot on)
    image_id = image_id.split('.')[0]
    # commas inside the caption were split apart above: re-join with spaces
    caption = ' '.join(caption)
    # create the list on first sight of this image, then store the caption
    mapping.setdefault(image_id, []).append(caption)

# persist the raw (not yet cleaned) mapping
with open('mappings.pkl','wb') as pkl_file:
    pickle.dump(mapping, pkl_file)
#======================================

In [7]:
# Load the cached image-id -> captions mapping; the file is closed on exit.
# pickle can execute arbitrary code while loading: only open files you created.
with open('mappings.pkl','rb') as pkl_file:
    mapping = pickle.load(pkl_file)

In [None]:
#===========use the code or load directly from pickle from below cell
def clean_text(mapping):
    """Normalise every caption stored in ``mapping``, modifying it in place.

    ``mapping`` maps a key to a list of caption strings. Each caption is
    lower-cased, has its contractions expanded, loses every character that is
    not a-z or whitespace, has whitespace runs collapsed, loses one-character
    words, and is finally wrapped as ``'start <caption> end'``.

    Returns None; the caption lists inside ``mapping`` are overwritten.
    """
    for captions in tqdm(mapping.values()):
        rewritten = []
        for raw in captions:
            # lower-case first, then expand contractions
            text = contractions.fix(raw.lower())
            # drop special characters, digits and anything else outside a-z
            text = re.sub(r'[^a-z\s]', '', text)
            # collapse runs of whitespace into a single space
            text = re.sub(r'\s+', ' ', text)
            # discard one-character words
            words = [word for word in text.split() if len(word) > 1]
            # add the start and end markers
            rewritten.append('start ' + ' '.join(words) + ' end')
        # slice assignment keeps the same list object, i.e. still in place
        captions[:] = rewritten

# Run the cleaning step: without this call the captions never receive the
# 'start' / 'end' markers. clean_text rewrites the caption lists in place, so
# `mapping` holds cleaned captions from here on. Reload `mapping` from
# mappings.pkl before re-running this cell, otherwise the markers are added a
# second time.
clean_text(mapping)

# flatten every cleaned caption into one corpus
cleaned_corpus = [caption for captions in mapping.values() for caption in captions]

# tokenize the text
t = Tokenizer()
t.fit_on_texts(cleaned_corpus)

# one slot more than the number of known words (0 is the padding value)
vocab_size = len(t.word_index) + 1
# longest caption, counted in whitespace-separated words
maxlen = max(len(caption.split()) for caption in cleaned_corpus)

with open('tokenizer.pkl','wb') as pkl_file:
    pickle.dump(t, pkl_file)

# Tokenize the captions: image id -> list of integer sequences, each padded
# at the end to `maxlen`
embedded_mapping = {}
for key, captions in mapping.items():
    for caption in captions:
        tokenized_caption = t.texts_to_sequences([caption])[0]
        # [0] unwraps the single row returned for the one-element batch
        padded_tokenized_caption = pad_sequences([tokenized_caption], maxlen=maxlen, padding='post')[0]
        # create the list on first use, then store the padded caption
        embedded_mapping.setdefault(key, []).append(padded_tokenized_caption)

with open('./processed_captions.pkl','wb') as pkl_file:
    pickle.dump(embedded_mapping, pkl_file)

# keep the two sizes the model definition needs next to the captions
caption_prop = {'vocab size': vocab_size, 'max length': maxlen}
with open('./captions_properties.pkl','wb') as pkl_file:
    pickle.dump(caption_prop, pkl_file)
#======================================

In [11]:
# Load the cached tokenizer, padded captions and caption properties. Each file
# is closed as soon as it has been read.
# pickle can execute arbitrary code while loading: only open files you created.
with open('tokenizer.pkl','rb') as pkl_file:
    t = pickle.load(pkl_file)
with open('./processed_captions.pkl','rb') as pkl_file:
    embedded_mapping = pickle.load(pkl_file)
with open('./captions_properties.pkl','rb') as pkl_file:
    caption_prop = pickle.load(pkl_file)
vocab_size = caption_prop['vocab size']
maxlen = caption_prop['max length']

In [13]:
# Sequence generation: every image id that has processed captions
image_ids = list(embedded_mapping)

# create data generator to get data in batch
def data_generator(data_keys, embedded_mapping, features, max_length, vocab_size, batch_size):
    """Endlessly yield training batches built from ``batch_size`` images each.

    For every key, each padded caption sequence is expanded into one sample
    per next-word prediction: the input is the sequence prefix ``seq[:i]``
    (re-padded at the end to ``max_length``) and the target is ``seq[i]``
    one-hot encoded over ``vocab_size`` classes. Expansion of a caption stops
    at its first 0, which is where padding starts.

    Args:
        data_keys: keys (image ids) to cycle over; indexes both
            ``embedded_mapping`` and ``features``.
        embedded_mapping: key -> list of padded integer sequences.
        features: key -> feature array; row 0 is used for every sample.
        max_length: length the input prefixes are padded to.
        vocab_size: number of classes for the one-hot targets.
        batch_size: number of keys (not samples) per yielded batch.

    Yields:
        ``((X1, X2), y)`` as numpy arrays: image features, padded input
        prefixes and one-hot targets.

    Raises:
        ValueError: if ``data_keys`` is empty or ``batch_size`` is below 1.
            Either would otherwise make the generator loop forever without
            yielding anything.
    """
    # materialise once so that a one-shot iterable can be cycled repeatedly
    data_keys = list(data_keys)
    if not data_keys:
        raise ValueError('data_keys must contain at least one key')
    if batch_size < 1:
        raise ValueError('batch_size must be at least 1')

    X1, X2, y = [], [], []
    n = 0
    while True:
        for key in data_keys:
            n += 1
            #process each caption
            for seq in embedded_mapping[key]:
                #split the sequence in X,y pairs
                for i in range(1, len(seq)):
                    #stop when padding starts
                    if seq[i] == 0:
                        break
                    #prefix is the input, the following token is the target
                    input_seq, output_seq = seq[:i], seq[i]
                    #pad input sequence
                    input_seq = pad_sequences([input_seq], maxlen=max_length, padding='post')[0]
                    #one-hot encode the target token
                    output_seq = to_categorical(output_seq, num_classes=vocab_size)
                    #store the sample
                    X1.append(features[key][0])
                    X2.append(input_seq)
                    y.append(output_seq)
            if n == batch_size:
                # inputs are yielded as a tuple, targets as a separate array
                yield (np.array(X1), np.array(X2)), np.array(y)
                X1, X2, y = [], [], []
                n = 0


In [15]:
#Model creation
# Two-input network: an image-feature branch and a caption-prefix branch are
# summed and fed to a softmax over the vocabulary (next-word prediction).
#encoder model
#image feature layer
# 2048-wide input -- presumably the InceptionV3 feature vector; confirm it
# matches the arrays stored in `features`
inputs1 = Input(shape=(2048,))
fe1 = Dropout(0.4)(inputs1)
fe2 = Dense(256,activation='relu')(fe1)

#sequence feature layer
# integer token sequence of length maxlen; mask_zero=True makes the embedding
# treat index 0 as padding
inputs2 = Input(shape=(maxlen,))
se1 = Embedding(vocab_size,256,mask_zero=True)(inputs2)
se2 = Dropout(0.4)(se1)
se3 = LSTM(256)(se2)

#decoder model
# both branches end in 256 units, so they can be added element-wise
decoder1 = add([fe2,se3])
decoder2 = Dense(256,activation='relu')(decoder1)
# one probability per vocabulary index
outputs = Dense(vocab_size,activation='softmax')(decoder2)

# NOTE: this rebinds `model`, which the feature-extraction cell also assigns
model = Model(inputs=[inputs1,inputs2], outputs=outputs)
# categorical cross-entropy pairs with one-hot encoded targets
model.compile(loss='categorical_crossentropy', optimizer='adam')

# train/test split: the first 80% of the image ids (in dictionary order) are
# used for training, the remainder for testing
image_ids = list(embedded_mapping)
split = int(0.8 * len(image_ids))
train, test = image_ids[:split], image_ids[split:]

#train the model
def train_model(model, epoch, batch_size=64):
    """Fit ``model`` on the training images for ``epoch`` passes.

    Reads the notebook-level names ``train``, ``embedded_mapping``,
    ``features``, ``maxlen`` and ``vocab_size``.

    Args:
        model: compiled Keras model to train (updated in place by ``fit``).
        epoch: number of passes over the training image ids.
        batch_size: number of images per generator batch. Defaults to 64,
            the value that used to be hard-coded.

    Returns:
        None.
    """
    # whole batches per pass; a trailing partial batch is not consumed
    steps = len(train) // batch_size

    for _ in range(epoch):
        # a fresh generator per pass, so every pass starts at the first image
        generator = data_generator(train, embedded_mapping, features, maxlen, vocab_size, batch_size)
        # fit for a single epoch at a time
        model.fit(generator, epochs=1, steps_per_epoch=steps, verbose=1)

# one initial training epoch, then write the model to disk
train_model(model, epoch=1)
model.save('model.keras')

[1m101/101[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1250s[0m 12s/step - loss: 6.4276


In [None]:
 # run this code to retrain model till 13 epochs for getting .56 Bleu score
# Two further stages of 6 epochs each. Every stage reloads the previous
# checkpoint, trains, and saves under the next checkpoint name.
for src, dst in (("model.keras", 'model_7.keras'), ("model_7.keras", 'model_13.keras')):
    model = load_model(src)
    train_model(model, 6)
    model.save(dst)
# 'model_13.keras' is the final model

#### Proceed with testing and hyperparameter tuning iterations

## Testing code 

In [None]:
def idx_to_word(integer, tokenizer):
    """Return the word whose tokenizer index equals ``integer``, else None.

    Args:
        integer: vocabulary index to look up.
        tokenizer: object exposing ``word_index`` (word -> index) and,
            optionally, ``index_word`` (index -> word).

    Returns:
        The matching word, or None when no word has that index.
    """
    # Prefer the reverse map when the tokenizer has one: a single dictionary
    # lookup instead of scanning the whole vocabulary for every decoded word.
    index_word = getattr(tokenizer, 'index_word', None)
    if index_word:
        return index_word.get(integer)
    # fallback for tokenizer-like objects that only expose word_index
    for word, index in tokenizer.word_index.items():
        if index == integer:
            return word
    return None

#Generate caption for an image
def predict_caption(model, image, tokenizer, max_length):
    """Greedily decode a caption for one image.

    Starting from the text ``'start'``, the current text is tokenised, padded
    at the end to ``max_length`` and passed to ``model`` together with
    ``image``. The highest-scoring index is turned back into a word and
    appended. Decoding stops after ``max_length`` words, when the predicted
    index has no word, or once ``'end'`` has been appended.

    Args:
        model: trained captioning model; ``predict`` is called with
            ``[image, sequence]``.
        image: image input for the model, passed through unchanged.
        tokenizer: fitted tokenizer used for encoding and decoding.
        max_length: padded sequence length and maximum number of words.

    Returns:
        The generated text, including the leading ``'start'`` and, when it
        was produced, the trailing ``'end'``.
    """
    #add start tag
    in_text = 'start'
    #iterate over the max length of sequence
    for _ in range(max_length):
        #encode the text generated so far
        sequence = tokenizer.texts_to_sequences([in_text])[0]
        #pad the sequence
        sequence = pad_sequences([sequence], maxlen=max_length, padding='post')
        #predict next word with the model that was passed in (previously an
        #undefined global was used and the `model` argument was ignored)
        yhat = model.predict([image, sequence], verbose=0)
        #index with highest probability, converted back to a word
        word = idx_to_word(np.argmax(yhat), tokenizer)
        #stop if word not found
        if word is None:
            break
        #append word as input for generating next word
        in_text += ' ' + word
        #stop if we have reached the end tag
        if word == 'end':
            break
    return in_text

from nltk.translate.bleu_score import corpus_bleu

# `loaded_model` was never defined in the notebook. Load the checkpoint that
# the training cell marks as the final model.
# NOTE(review): confirm this is the checkpoint you want to evaluate.
loaded_model = load_model('model_13.keras')

# validate with test data
# NOTE(review): the references come from `mapping`. mappings.pkl is written
# before cleaning, so if `mapping` was reloaded from it, confirm the cleaning
# step was applied; otherwise the references lack the start/end markers that
# every prediction carries.
actual, predicted = [], []

for key in test:
    #get the actual captions
    captions = mapping[key]
    #predict the caption for image
    y_pred = predict_caption(loaded_model, features[key], t, maxlen)
    #references: one word list per caption; hypothesis: one word list
    actual.append([caption.split() for caption in captions])
    predicted.append(y_pred.split())

#calculate BLEU score
print('BLEU 1 : %f' % corpus_bleu(actual, predicted, weights=(1.0, 0, 0, 0)))
print('BLEU 2 : %f' % corpus_bleu(actual, predicted, weights=(0.5, 0.5, 0, 0)))
