In [None]:
## import libraries
import numpy as np 
import pandas as pd 
import os 
import pickle 
import re 
import string
from keras.applications.vgg16 import VGG16 , preprocess_input
from keras.preprocessing.image import load_img ,img_to_array 
from keras.preprocessing.text import Tokenizer 
from keras.preprocessing.sequence import pad_sequences
from keras.models import Model
from keras.utils import to_categorical
from keras.layers import LSTM ,Input ,Embedding ,Dropout ,Dense
from keras.layers.merge import  add
from keras.utils.vis_utils import plot_model

In [None]:
### extract features from images
## take a directory of images, return a dictionary mapping image_id -> feature
def extract_features(directory):
  """Run every image file in ``directory`` through VGG16 and collect the features.

  Args:
    directory: path of a folder containing the image files.

  Returns:
    dict mapping image id (the file name up to its first '.') to the array
    returned by ``model.predict`` for that image.
  """
  base_model=VGG16()
  # Take the output of the second-to-last layer instead of calling
  # ``model.layers.pop()``: on current Keras ``layers`` is a copy, so popping it
  # leaves the classifier in place and ``layers[-1]`` would yield class scores
  # rather than the 4096-wide features the rest of the notebook reshapes to.
  model=Model(inputs=base_model.inputs,outputs=base_model.layers[-2].output)
  features={}
  for name in os.listdir(directory):
    file_name=os.path.join(directory,name)
    # skip sub-directories and other non-file entries, which load_img cannot open
    if not os.path.isfile(file_name):
      continue
    img=load_img(file_name,target_size=(224,224))
    img_array=img_to_array(img)
    # add a leading batch axis of size 1
    img_array=img_array.reshape((1,img_array.shape[0],img_array.shape[1],img_array.shape[2]))
    processed_img=preprocess_input(img_array)
    feature=model.predict(processed_img,verbose=0)
    # same id rule as load_description / load_set: text before the first '.'
    image_id=name.split('.')[0]
    features[image_id]=feature
  return features


In [None]:
# Mount Google Drive at /content/drive (Colab only); the dataset paths used below live under it
from google.colab import drive
drive.mount('/content/drive')

In [None]:
# Load one photo from the dataset folder and show it as the cell output
img=load_img('/content/drive/My Drive/Flicker8k_Dataset/369360998_ba56fb436f.jpg')
img

In [None]:
# extract features from all images
directory = '/content/drive/My Drive/Flicker8k_Dataset'
features = extract_features(directory)
print('Extracted Features: %d' % len(features))
# the features dictionary is written to disk in the following cell

In [None]:
# Persist the extracted features; the ``with`` block closes the file so the
# pickle is fully flushed before later cells read it back
with open('features.pkl', 'wb') as features_file:
  pickle.dump(features, features_file)

In [None]:
## load text file
def load_doc(doc):
  """Return the whole content of the text file at path ``doc`` as a single string."""
  with open(doc,'r') as handle:
    return handle.read()

In [None]:
### load descriptions
## return a mapping image_id -> list of descriptions for this image
def load_description(doc):
  """Parse tab-separated caption text into a dict of image id -> captions.

  Each usable line looks like ``<image file>#<n>\\t<caption>``. The image id is
  the part of the first field before its first '.'.

  Args:
    doc: full text of the caption file.

  Returns:
    dict mapping image id to the list of its captions, in file order.
  """
  mapping={}
  for line in doc.split('\n'):
    tokens=line.split('\t')
    # Skip blank lines and lines without a tab: they carry no caption, and
    # indexing tokens[1] on them would raise IndexError
    if len(line)<2 or len(tokens)<2:
      continue
    image_id,desc=tokens[0],tokens[1]
    image_id=image_id.split('.')[0]
    mapping.setdefault(image_id,[]).append(desc)
  return mapping


In [None]:
# Read the raw Flickr8k caption file and group its captions by image id
doc=load_doc('/content/drive/My Drive/Flickr8k/Flickr8k.token.txt')
descriptions=load_description(doc)

In [None]:
# Inspect the captions loaded for one image id
descriptions['2081446176_f97dc76951']

In [None]:
### clean descriptions
def clean_descriptions(description):
  """Normalise every caption in ``description`` in place.

  Each word is lower-cased and stripped of punctuation; words that end up
  shorter than two characters or that are not purely alphabetic are dropped.
  The caption lists are modified directly and nothing is returned.
  """
  punctuation_pattern=re.compile('[%s]' %re.escape(string.punctuation))
  for captions in description.values():
    for position,caption in enumerate(captions):
      kept=[]
      for raw_word in caption.split():
        word=punctuation_pattern.sub('',raw_word.lower())
        if len(word)>1 and word.isalpha():
          kept.append(word)
      captions[position]=' '.join(kept)

In [None]:
# Clean all captions; clean_descriptions rewrites the lists inside `descriptions` and returns nothing
clean_descriptions(descriptions)

In [None]:
### load set
## return the set of image identifiers listed in a file
def load_set(file_name):
  """Read ``file_name`` (one image file name per line) and return the image ids.

  The id of a line is the text before its first '.'; empty lines are ignored.
  """
  rows=load_doc(file_name).split('\n')
  return {row.split('.')[0] for row in rows if len(row)>=1}


In [None]:
def save_descriptions(descriptions,file_name):
  """Write all captions to ``file_name``, one ``<image_id> <caption>`` per line.

  Args:
    descriptions: dict mapping image id to a list of caption strings.
    file_name: path of the text file to create or overwrite.

  The file is written exactly once, after every line has been collected, so an
  empty ``descriptions`` still produces an (empty) file.
  """
  lines=[
    img_id+' '+desc
    for img_id,desc_list in descriptions.items()
    for desc in desc_list
  ]
  with open(file_name,'w') as f:
    f.write('\n'.join(lines))

In [None]:
# Write the cleaned captions to 'descriptions.txt' in the working directory
save_descriptions(descriptions, 'descriptions.txt')

In [None]:
### load clean descriptions
## wrap every description in 'startseq' ... 'endseq' markers
def load_clean_descriptions(filename,dataset):
  """Load the saved captions for the image ids in ``dataset``.

  Args:
    filename: path of a file with one ``<image_id> <caption words...>`` per line.
    dataset: iterable of the image ids to keep.

  Returns:
    dict mapping each kept image id to its captions, each rewritten as
    ``'startseq <caption> endseq'``.
  """
  doc=load_doc(filename)
  # a set gives constant-time membership tests even when a list slice is passed in
  wanted=set(dataset)
  descriptions={}
  for line in doc.split('\n'):
    tokens=line.split()
    # blank lines (e.g. a trailing newline) have no id to read
    if not tokens:
      continue
    image_id,desc=tokens[0],tokens[1:]
    if image_id not in wanted:
      continue
    descriptions.setdefault(image_id,[]).append('startseq '+' '.join(desc)+' endseq')
  return descriptions


In [None]:
### get features of the data set
def load_photo_features(filename,dataset):
  """Load the pickled feature dictionary and keep only the ids in ``dataset``.

  Args:
    filename: path of the pickle file holding a dict of image id -> feature.
    dataset: iterable of image ids to select.

  Returns:
    dict with one entry per id in ``dataset``.

  Raises:
    KeyError: if an id in ``dataset`` has no entry in the pickled dictionary.
  """
  # NOTE: pickle.load can execute arbitrary code; only open files you created yourself
  with open(filename,'rb') as f:
    all_feature=pickle.load(f)
  feature={k:all_feature[k] for k in dataset}
  return feature

In [None]:
### convert descriptions to lines
def to_lines(discriptions):
  """Flatten a dict of image id -> caption list into one list of captions.

  Captions keep the dictionary's iteration order and, within an image, their
  list order.
  """
  all_desc=[]
  # extend() replaces a list comprehension that was run only for its side effects
  for desc_list in discriptions.values():
    all_desc.extend(desc_list)
  return all_desc

In [None]:
### create tokenizer
def create_tokenizer(descriptions):
  """Fit a new ``Tokenizer`` on every caption in ``descriptions`` and return it."""
  corpus=to_lines(descriptions)
  fitted=Tokenizer()
  fitted.fit_on_texts(corpus)
  return fitted


In [None]:
def max_length(descriptions):
  """Return the number of words in the longest caption of ``descriptions``."""
  word_counts=[len(caption.split()) for caption in to_lines(descriptions)]
  return max(word_counts)

In [None]:
### create sequences: image feature, input word sequence, next word


def create_sequences(tokenizer,max_length,descriptions,image_feature):
  """Expand every caption into (photo feature, word prefix, next word) samples.

  For a caption encoded as ids ``t0..tn`` one sample is produced per split
  point ``i`` in ``1..n``: the prefix ``t0..t(i-1)`` padded to ``max_length``
  and the id ``ti`` one-hot encoded over the vocabulary.

  Returns:
    Three arrays with one row per sample: photo features (each reshaped to
    4096 values), padded prefixes, and one-hot next-word targets.
  """
  vocab_size=len(tokenizer.word_index) +1
  photo_rows,context_rows,target_rows=[],[],[]
  for img_id,captions in descriptions.items():
    for caption in captions:
      encoded=tokenizer.texts_to_sequences([caption])[0]
      for split_at in range(1,len(encoded)):
        prefix=encoded[:split_at]
        next_word=encoded[split_at]
        padded_prefix=pad_sequences([prefix],maxlen=max_length)[0]
        one_hot=to_categorical([next_word],num_classes=vocab_size)[0]
        # the stored feature is flattened to a single 4096-value row
        photo_rows.append(image_feature[img_id].reshape((4096,)))
        context_rows.append(padded_prefix)
        target_rows.append(one_hot)

  return np.array(photo_rows),np.array(context_rows),np.array(target_rows)

In [None]:
#### define model
def define_model(vocab_size, max_length):
  """Build and compile the two-input captioning network.

  Args:
    vocab_size: size of the embedding input and of the softmax output.
    max_length: length of the padded word-id sequences fed to the second input.

  Returns:
    A compiled Keras ``Model`` taking ``[photo_features, word_ids]`` and
    producing a softmax over the vocabulary (categorical cross-entropy, Adam).
  """
  # photo branch: 4096 features -> dropout -> 256-unit dense
  photo_input=Input(shape=(4096,))
  photo_dropped=Dropout(.5)(photo_input)
  photo_encoded=Dense(256,activation='relu')(photo_dropped)

  # text branch: word ids -> embedding (id 0 masked) -> dropout -> LSTM
  words_input=Input(shape=(max_length,))
  words_embedded=Embedding(vocab_size,256,mask_zero=True)(words_input)
  words_dropped=Dropout(.5)(words_embedded)
  words_encoded=LSTM(256)(words_dropped)

  # decoder: element-wise sum of both 256-wide encodings -> dense -> softmax
  merged=add([photo_encoded,words_encoded])
  hidden=Dense(256,activation='relu')(merged)
  outputs=Dense(vocab_size,activation='softmax')(hidden)

  captioner=Model(inputs=[photo_input,words_input],outputs=outputs )
  captioner.compile(loss='categorical_crossentropy', optimizer='adam')
  return captioner

In [None]:
#### load training data set
train=load_set('/content/drive/My Drive/Flickr8k/Flickr_8k.trainImages.txt')
# load_set returns a set, and the iteration order of a set of strings can differ
# between kernel sessions; sorting makes the [:3000] subset below reproducible
train=sorted(train)
train_descriptions= load_clean_descriptions('/content/descriptions.txt',train[:3000])
train_features = load_photo_features('/content/features.pkl', train[:3000])
tokenizer = create_tokenizer(train_descriptions)
vocab_size = len(tokenizer.word_index) + 1
print('Vocabulary Size: %d' % vocab_size)
# fixed maximum sequence length used for padding (hard-coded, not computed here)
# NOTE: this assignment rebinds the name `max_length`, which until now referred
# to the max_length() helper function defined earlier
max_length =34 
print('Description Length: %d' % max_length)


In [None]:
# Expand the training captions into (photo feature, padded word prefix, one-hot next word) arrays
X1train, X2train, ytrain = create_sequences(tokenizer, max_length, train_descriptions, train_features)

In [None]:
# Shape of the training photo-feature array: one 4096-value row per generated sample
X1train.shape

In [None]:
# load the held-out set (read from the Flickr8k *dev* image list)
filename = '/content/drive/My Drive/Flickr8k/Flickr_8k.devImages.txt'
test = load_set(filename)
# sort the ids: set iteration order can differ between kernel sessions, which
# would make the [:1000] subset below change from run to run
test=sorted(test)
print('Dataset: %d' % len(test))
# descriptions
test_descriptions = load_clean_descriptions('descriptions.txt', test[:1000])
print('Descriptions: test=%d' % len(test_descriptions))
# photo features
test_features = load_photo_features('/content/features.pkl', test[:1000])
print('Photos: test=%d' % len(test_features))
# prepare sequences (encoded with the tokenizer fitted on the training captions)
X1test, X2test, ytest = create_sequences(tokenizer, max_length, test_descriptions, test_features)

In [None]:
# Shape of the held-out photo-feature array: one 4096-value row per generated sample
X1test.shape

In [None]:
# Build and compile the captioning model for this vocabulary size and sequence length
model = define_model(vocab_size, max_length)

In [None]:
# Checkpoint callback: writes 'model.h5' only when the monitored validation loss reaches a new minimum
from keras.callbacks import ModelCheckpoint
checkpoint = ModelCheckpoint('model.h5', monitor='val_loss', verbose=1, save_best_only=True, mode='min')

In [None]:
# Train for 20 epochs; the held-out arrays serve as validation data, whose loss the checkpoint monitors
model.fit([X1train, X2train], ytrain, epochs=20, verbose=2, callbacks=[checkpoint], validation_data=([X1test, X2test], ytest))

In [None]:
### map int to word
def word_for_id(integer,tokenizier):
  """Return the first word whose index in ``tokenizier.word_index`` equals ``integer``.

  Returns ``None`` when no word carries that index.
  """
  matches=(word for word,index in tokenizier.word_index.items() if index==integer)
  return next(matches,None)



In [None]:
### generate sequence
def generate_desc(model, tokenizer, photo, max_length):
  """Generate a caption for ``photo`` one word at a time (greedy decoding).

  Starting from ``'startseq'``, the text so far is encoded, padded to
  ``max_length`` and passed to the model together with ``photo``; the
  highest-scoring word is appended. Generation stops after ``max_length``
  words, when the predicted id maps to no word, or once ``'endseq'`` is produced.

  Returns:
    The generated text, including the leading 'startseq' and, if it was
    predicted, the trailing 'endseq'.
  """
  in_text='startseq'
  for i in range(max_length):
    encodded=tokenizer.texts_to_sequences([in_text])[0]
    padded_encodded=pad_sequences([encodded],maxlen=max_length)
    prediction=model.predict([photo,padded_encodded],verbose=0)
    prediction=np.argmax(prediction)
    word=word_for_id(prediction,tokenizer)
    if word is None :
      break
    # append (not overwrite) so the next prediction sees the whole caption so far
    in_text+= ' '+word
    if word== 'endseq' :
      break
  return  in_text


In [None]:
### remove startseq and endseq from sequence
def cleanup_summary(summary):
  """Strip the 'startseq ' / ' endseq' markers from a generated caption.

  Everything up to and including the first ``'startseq '`` is removed, then
  everything from the first ``' endseq'`` onwards. Text without a marker is
  left untouched on that side.
  """
  start_marker='startseq '
  index=summary.find(start_marker)
  if index >-1:
    # cut relative to where the marker was found, not a fixed number of characters
    summary=summary[index+len(start_marker):]
  index=summary.find(' endseq')
  if index >-1:
    summary=summary[:index]
  return  summary




In [None]:
def evaluate_model(model, descriptions, photos, tokenizer, max_length):
  """Caption every image in ``descriptions`` and print corpus BLEU-1..4.

  For each image id a caption is generated from ``photos[key]``, the
  start/end markers are stripped from both the prediction and the reference
  captions, and all are split into word lists. Four ``corpus_bleu`` scores are
  then printed using the weight tuples listed below; nothing is returned.
  """
  actual=[]
  predicted=[]
  # step over the whole set
  for key, desc_list in descriptions.items():
    caption=cleanup_summary(generate_desc(model, tokenizer, photos[key], max_length))
    actual.append([cleanup_summary(reference).split() for reference in desc_list])
    predicted.append(caption.split())
  # report label paired with the n-gram weights handed to corpus_bleu
  reports=(
    ('BLEU-1: %f', (1.0, 0, 0, 0)),
    ('BLEU-2: %f', (0.5, 0.5, 0, 0)),
    ('BLEU-3: %f', (0.3, 0.3, 0.3, 0)),
    ('BLEU-4: %f', (0.25, 0.25, 0.25, 0.25)),
  )
  for template, weights in reports:
    print(template % corpus_bleu(actual, predicted, weights=weights))

In [None]:
# evaluate_model looks up corpus_bleu when it runs, so the import has to happen before this call
from nltk.translate.bleu_score import corpus_bleu
evaluate_model(model, test_descriptions, test_features, tokenizer, max_length)

In [None]:
# Persist the fitted tokenizer; the ``with`` block closes the file so the
# pickle is fully written to disk
with open('tokenizer.pkl', 'wb') as tokenizer_file:
  pickle.dump(tokenizer, tokenizer_file)

In [None]:
# extract features from a single photo
def extract_features(filename):
  """Return the VGG16 feature array for the image file at ``filename``.

  NOTE: this shares its name with the directory-based ``extract_features``
  defined earlier in the notebook and replaces it once this cell has run.

  Args:
    filename: path of one image file.

  Returns:
    The array returned by ``model.predict`` for the preprocessed image.
  """
  # load the model
  base_model = VGG16()
  # re-structure the model: take the second-to-last layer's output directly.
  # ``model.layers.pop()`` does not remove the classifier on current Keras, so
  # ``layers[-1]`` would give class scores instead of the 4096-wide features
  # the rest of the notebook works with.
  model = Model(inputs=base_model.inputs, outputs=base_model.layers[-2].output)
  # load the photo
  image = load_img(filename, target_size=(224, 224))
  # convert the image pixels to a numpy array
  image = img_to_array(image)
  # reshape data for the model (adds a leading batch axis of size 1)
  image = image.reshape((1, image.shape[0], image.shape[1], image.shape[2]))
  # prepare the image for the VGG model
  image = preprocess_input(image)
  # get features
  feature = model.predict(image, verbose=0)
  return feature


In [None]:
# generate a description for an image
def generate_desc(model, tokenizer, photo, max_length):
  """Greedily decode a caption for ``photo``.

  The caption starts as 'startseq'. At most ``max_length`` times, the current
  caption is encoded and padded, the model scores the next word, and the
  best-scoring word is appended. Decoding ends early when the predicted id has
  no word or when 'endseq' has been appended.

  Returns:
    The caption as a space-separated string, markers included.
  """
  words = ['startseq']
  for _step in range(max_length):
    # integer-encode and pad the caption built so far
    encoded = tokenizer.texts_to_sequences([' '.join(words)])[0]
    padded = pad_sequences([encoded], maxlen=max_length)
    # pick the id with the highest predicted score
    next_id = np.argmax(model.predict([photo,padded], verbose=0))
    next_word = word_for_id(next_id, tokenizer)
    # an id without a word ends the caption
    if next_word is None:
      break
    words.append(next_word)
    # the end marker ends the caption
    if next_word == 'endseq':
      break
  return ' '.join(words)

In [None]:
# Extract VGG16 features for one example photo (uses the single-file extract_features defined above)
photo = extract_features('/content/sample_data/example.jpg')

In [None]:
# Check the shape of the extracted feature array
photo.shape

In [None]:
# Generate a caption for the example photo with the trained model
description = generate_desc(model, tokenizer, photo, max_length)

In [None]:
# Show the raw generated text, still including the startseq/endseq markers
description

In [None]:
# Strip the startseq/endseq markers and print the final caption
description = cleanup_summary(description)
print(description)