In [None]:
import numpy as np
from PIL import Image
import os
import string
import shutil
from keras.utils import to_categorical, plot_model

from pickle import dump, load # pickle serializes (dump) and deserializes (load) Python objects
from keras.applications.xception import Xception, preprocess_input
from keras.applications.vgg16 import VGG16, preprocess_input
from keras.preprocessing.image import load_img, img_to_array
from keras.preprocessing.text import Tokenizer
from keras.preprocessing.sequence import pad_sequences
from keras.utils import to_categorical
#from keras.layers.merge import add
from keras.layers import Input, Dense, LSTM, Embedding, Dropout, add
from keras.models import Model, load_model
from tqdm import tqdm_notebook as tqdm


In [None]:
# Put the Kaggle API token where the `kaggle` CLI looks for it.
os.makedirs('/root/.kaggle', exist_ok=True)
# Only move the token while it is still in the working directory, so that
# re-running this cell after a successful first run does not raise.
if os.path.exists('kaggle.json'):
  shutil.move('kaggle.json', '/root/.kaggle/kaggle.json')
# File modes are octal: 0o600 = read/write for the owner only. The decimal
# literal 600 equals 0o1130, which does not even grant the owner read access.
os.chmod('/root/.kaggle/kaggle.json', 0o600)

In [None]:
# Download the adityajn105/flickr8k dataset with the Kaggle CLI (shell command).
!kaggle datasets download -d adityajn105/flickr8k

In [None]:
import zipfile

# Unpack the downloaded archive into the local `flickr8k/` directory.
with zipfile.ZipFile('flickr8k.zip', mode='r') as archive:
  archive.extractall('flickr8k')

In [None]:
# Show the top-level entries of the extracted dataset directory.
os.listdir('flickr8k')

In [None]:
# Build the image feature extractor: instantiate VGG16, then re-wire it so the
# output is taken from the second-to-last layer rather than the final layer.
model = VGG16()
model = Model(inputs=model.inputs, outputs=model.layers[-2].output)
# summary() prints the table itself and returns None; wrapping it in print()
# emitted a stray "None" line after the table.
model.summary()



# **Extract the image features**

In [None]:
# Run every file in the image folder through the feature extractor and collect
# the results in a dict keyed by image ID (file name up to its first '.').
features = {}
directory = 'flickr8k/Images'
for file_name in tqdm(os.listdir(directory)):
  # Load the image resized to 224x224 and convert it to an array.
  pixels = img_to_array(load_img(f"{directory}/{file_name}", target_size=(224, 224)))
  # Prepend a batch axis of size 1, then apply the model's input preprocessing.
  batch = preprocess_input(pixels[np.newaxis, ...])
  features[file_name.split('.')[0]] = model.predict(batch, verbose=0)

In [None]:
# Spot-check the stored feature entry for one image ID.
features['1000268201_693b08cb0e']

In [None]:
# Persist the extracted features so the slow extraction loop need not be re-run.
# The target directory is created first because it may not exist on this host,
# and the file is opened in a `with` block so the handle is flushed and closed
# deterministically (previously an anonymous open() handle was never closed).
os.makedirs('/kaggle', exist_ok=True)
with open(os.path.join('/kaggle', 'features.pkl'), 'wb') as f:
  dump(features, f)

In [None]:
# Reload the cached feature dictionary from disk.
# NOTE: unpickling can execute arbitrary code; only load files this notebook wrote.
with open('/kaggle/features.pkl', mode='rb') as pickle_file:
  features = load(pickle_file)

In [None]:
# Read the caption file into one string, skipping its first line.
with open('flickr8k/captions.txt', mode='r') as caption_file:
  next(caption_file)  # first line is discarded (presumably a column header — confirm)
  capt_doc = caption_file.read()

In [None]:
# Size of the caption text in characters.
len(capt_doc)

In [None]:
# Preview only the beginning of the caption text; echoing the whole string
# would flood the cell output and bloat the saved notebook.
capt_doc[:500]

In [None]:
# Group captions by image ID: mapping[image_id] -> list of caption strings.
mapping = {}
for row in tqdm(capt_doc.split('\n')):
  # Skip blank / one-character lines (e.g. the trailing empty line).
  if len(row) < 2:
    continue
  # First comma-separated field is the image file name; the rest is the caption,
  # which may itself have been split on commas.
  file_name, *caption_parts = row.split(',')
  # Key on the file name up to its first '.'; re-join caption pieces with spaces.
  mapping.setdefault(file_name.split('.')[0], []).append(' '.join(caption_parts))


In [None]:
# Number of distinct image IDs that have at least one caption.
len(mapping)

# **Preprocessing text data**

In [None]:
def clean(mapping):
  """Normalise every caption in ``mapping`` in place.

  Each caption is lower-cased, stripped of every character that is not an
  ASCII letter or whitespace, collapsed to single spaces, filtered of
  one-character words, and wrapped in ``startseq`` / ``endseq`` markers.

  Args:
    mapping: dict of image ID -> list of caption strings. The lists are
      modified in place.

  Returns:
    None.
  """
  import re  # local import so this cell does not depend on a separate import cell

  for captions in mapping.values():
    for i, cap in enumerate(captions):
      cap = cap.lower()
      # str.replace() treats its argument as a literal string, so the previous
      # replace('[^A-Za-z]', '') never matched anything; a real regex is needed.
      # Whitespace is kept here so that words stay separated.
      cap = re.sub(r'[^a-z\s]', '', cap)
      # collapse runs of whitespace into a single space
      cap = re.sub(r'\s+', ' ', cap)
      # drop one-character words and add start and end tags to the caption
      words = [word for word in cap.split() if len(word) > 1]
      captions[i] = 'startseq ' + ' '.join(words) + ' endseq'


In [None]:
# Captions for one image before cleaning.
mapping['1000268201_693b08cb0e']

In [None]:
# Clean all captions in place. Run this cell only once per fresh `mapping`:
# clean() prepends 'startseq ' and appends ' endseq' unconditionally, so a
# second run wraps the captions again.
clean(mapping)

In [None]:
# The same image's captions after cleaning.
mapping['1000268201_693b08cb0e']

In [None]:
# Flatten the per-image caption lists into one list, in mapping order.
all_captions = [text for image_key in mapping for text in mapping[image_key]]

In [None]:
# Total number of captions across all images.
len(all_captions)

In [None]:
# Preview the first ten cleaned captions.
all_captions[:10]

Processing of text data

In [None]:
#tokenize the text
tokenizer = Tokenizer()
tokenizer.fit_on_texts(all_captions)
vocab_size = len(tokenizer.word_index)+1

In [None]:
# Preview the first 20 vocabulary entries; echoing the whole word index
# floods the cell output with thousands of lines.
dict(list(tokenizer.word_index.items())[:20])

In [None]:
# Vocabulary size used for the embedding and softmax output layers.
vocab_size

In [None]:
#get maximum length of caption available
max_len = max(len(cap.split()) for cap in all_captions)

In [None]:
# Display the maximum caption length.
max_len

Train Test Split

In [None]:
# Split image IDs 90% / 10% into train and test sets.
# NOTE: the split is positional (no shuffling): the first 90% of IDs in
# mapping order are used for training and the remainder for testing.
img_id = list(mapping.keys())
split = int(len(img_id)*0.90)
train = img_id[:split]
test = img_id[split:]

In [None]:
# Number of training image IDs, followed by the total number of image IDs.
print(split)
print(len(img_id))

In [None]:
def data_generator(data_keys, mapping, features, tokenizer, max_len, vocab_size, batch_size):
  """Yield training batches forever, cycling over ``data_keys``.

  Every caption is expanded into (prefix -> next word) pairs: for a caption
  encoded as ``seq``, each ``i`` in ``1..len(seq)-1`` gives the input
  ``seq[:i]`` (padded to ``max_len``) and the target ``seq[i]`` (one-hot over
  ``vocab_size``). Batches are counted in such pairs, not in images.

  Args:
    data_keys: iterable of image IDs to draw samples from.
    mapping: dict of image ID -> list of caption strings.
    features: dict of image ID -> feature array; ``features[key][0]`` is used
      as the image input for every pair built from that image.
    tokenizer: object providing ``texts_to_sequences``.
    max_len: length every input sequence is padded to.
    vocab_size: number of classes for the one-hot target.
    batch_size: number of pairs per yielded batch.

  Yields:
    ``[X1, X2], y`` where ``X1`` holds the image features, ``X2`` the padded
    input sequences and ``y`` the one-hot targets. The last batch of each pass
    over ``data_keys`` may contain fewer than ``batch_size`` pairs.
  """
  # The per-sample debug print() calls (captions, sequences, full feature
  # vectors) were removed: they ran inside the innermost loop on every training
  # sample, flooding the notebook output and slowing training down.
  while 1:
    X1, X2, y = list(), list(), list()
    n = 0
    for key in data_keys:
      for caption in mapping[key]:
        # encode the caption as a list of word indices
        seq = tokenizer.texts_to_sequences([caption])[0]
        for i in range(1, len(seq)):
          # split into input prefix and next-word target
          in_seq, out_seq = seq[:i], seq[i]
          in_seq = pad_sequences([in_seq], maxlen=max_len)[0]
          # one-hot encode the target word
          out_seq = to_categorical([out_seq], num_classes=vocab_size)[0]
          X1.append(features[key][0])
          X2.append(in_seq)
          y.append(out_seq)
          n += 1
          if n == batch_size:
            yield [np.array(X1), np.array(X2)], np.array(y)
            X1, X2, y = list(), list(), list()
            n = 0
    # flush the final, possibly smaller, batch of this pass
    if n > 0:
      yield [np.array(X1), np.array(X2)], np.array(y)


Model Creation

In [None]:
#encoder model
#image feature layers
input1 = Input(shape=(4096,))
fe1 = Dropout(0.4)(input1)
fe2 = Dense(256, activation='relu')(fe1)
# sequence feature layers
input2 = Input(shape=(max_len,))
se1 = Embedding(vocab_size, 256, mask_zero=True)(input2)
se2 = Dropout(0.4)(se1)
se3 = LSTM(256)(se2)
#decoder model
decoder1 = add([fe2, se3])
decoder2 = Dense(256, activation='relu')(decoder1)
outputs = Dense(vocab_size, activation='softmax')(decoder2)

model = Model(inputs=[input1, input2], outputs=outputs)
model.compile(loss='categorical_crossentropy', optimizer='adam')
plot_model(model, show_shapes=True)

In [None]:
# Force every stored feature array to shape (1, 4096), matching the 4096-wide
# image input of the caption model.
for image_key, vector in features.items():
    features[image_key] = vector.reshape((1, 4096))

In [None]:
# Summary of the quantities that drive training.
print('Dataset: ', len(train))  # number of training image IDs
print('Descriptions: mapping=', len(mapping))  # number of image IDs with captions (train + test)
# NOTE(review): the label says "train", but len(features) counts every image
# with extracted features, not only the training ones.
print('Photos: train=', len(features))
print('Vocabulary Size:', vocab_size)
print('Description Length: ', max_len)

In [None]:
#train the model
epoch = 20
batch_size = 32
steps = len(train)//batch_size
for i in range(epoch):
  generator = data_generator(train, mapping, features, tokenizer, max_len, vocab_size, batch_size)
  model.fit(generator, epochs=1, steps_per_epoch=steps, verbose=1)

In [None]:
# Save the trained caption model.
# NOTE(review): the path is absolute, so the file is written to the filesystem
# root, not the notebook's working directory — confirm this is intended.
model.save('/best_model.h5')

Generate Captions for images

In [None]:
def idx_to_word(integer, tokenizer):
  """Return the word whose index in ``tokenizer.word_index`` equals ``integer``.

  Scans the word index in order and returns the first match, or ``None`` when
  no word has that index.
  """
  matches = (word for word, index in tokenizer.word_index.items() if index == integer)
  return next(matches, None)

**Convert the predicted index from model into a word**

In [None]:
def predict_caption(model, image, tokenizer, max_len):
  """Greedily generate a caption for ``image``, one word at a time.

  Starting from ``'startseq'``, the text so far is encoded and padded to
  ``max_len``, the model predicts a distribution for the next word, and the
  arg-max word is appended. Generation stops after ``max_len`` words, when
  the predicted index has no word, or once ``'endseq'`` has been appended.

  Returns:
    The generated text, including the leading ``startseq`` and, if it was
    reached, the trailing ``endseq``.
  """
  words = ['startseq']
  for _ in range(max_len):
    # encode and pad everything generated so far
    encoded = tokenizer.texts_to_sequences([' '.join(words)])[0]
    padded = pad_sequences([encoded], max_len)
    # pick the most probable next word
    probabilities = model.predict([image, padded], verbose=0)
    next_word = idx_to_word(np.argmax(probabilities), tokenizer)
    # an index with no word ends generation without appending anything
    if next_word is None:
      break
    words.append(next_word)
    # the end tag is kept in the output, then generation stops
    if next_word == 'endseq':
      break
  return ' '.join(words)

**Model Validation**

In [None]:
#now evaluting the data using BLEU score
from nltk.translate.bleu_score import corpus_bleu
#validate the test data
actual, predicted = list(), list()
if len(test) == 0:
  print("the test set is empty!")
else:
  for key in tqdm(test):
    #get actual captions
    captions = mapping.get(key, [])
    if not captions:
      print(f"No captions found for key: {key}")
      continue
  y_pred = predict_caption(model, features[key], tokenizer, max_len)
  #split into words
  actual_caption = [caption.split() for caption in captions]
  y_pred = y_pred.split()
  #append into list
  actual.append(actual_caption)
  predicted.append(y_pred)


In [None]:
# Number of test images for which references and a prediction were collected.
len(actual)

In [None]:
# Corpus-level BLEU over the collected references and predictions:
# BLEU-1 puts all weight on unigrams; BLEU-2 splits it evenly between unigrams and bigrams.
print("BLEU-1: %f" % corpus_bleu(actual, predicted, weights=(1.0, 0, 0, 0)))
print("BLEU-2: %f" % corpus_bleu(actual, predicted, weights=(0.5, 0.5, 0, 0)))



In [None]:
from PIL import Image
import matplotlib.pyplot as plt
def generate_caption(img_name):
  """Print the reference and predicted captions for one image and display it.

  Args:
    img_name: image file name inside ``flickr8k/Images``,
      e.g. ``"1001773457_577c3a7d70.jpg"``.

  Relies on the notebook globals ``mapping``, ``features``, ``model``,
  ``tokenizer`` and ``max_len``.
  """
  # image ID is the file name up to its first '.'
  image_id = img_name.split('.')[0]
  picture = Image.open(os.path.join("flickr8k/Images", img_name))
  references = mapping[image_id]
  print("-----------------Actual-----------------")
  for reference in references:
    print(reference)
  prediction = predict_caption(model, features[image_id], tokenizer, max_len)
  print('-------------Predicted--------------')
  print(prediction)
  plt.imshow(picture)

In [None]:
# Example 1: compare reference and predicted captions for one image.
generate_caption("1001773457_577c3a7d70.jpg")

In [None]:
# Example 2: compare reference and predicted captions for one image.
generate_caption("1002674143_1b742ab4b8.jpg")


In [None]:
# Example 3: compare reference and predicted captions for one image.
generate_caption("101669240_b2d3e7f17b.jpg")
