In [0]:
from os import listdir
import string
from pickle import dump
from numpy import argmax, array
from pickle import load
from itertools import islice

from keras.applications.vgg16 import VGG16
from keras.applications.vgg19 import VGG19
from keras.applications.resnet50 import ResNet50
from keras.applications.inception_v3 import InceptionV3
from keras.applications.inception_resnet_v2 import InceptionResNetV2

from keras.preprocessing.image import load_img, img_to_array
from keras.applications.vgg16 import preprocess_input
from keras.models import Model
from keras.preprocessing.text import Tokenizer
from keras.preprocessing.sequence import pad_sequences
from keras.models import load_model
from keras.layers import Input, GlobalAveragePooling2D, Dense, Dropout, LSTM, Embedding
from keras.utils import to_categorical, plot_model
from keras.layers.merge import add
from keras.callbacks import ModelCheckpoint
from nltk.translate.bleu_score import corpus_bleu

In [0]:
# feature extraction
def feature_extraction(directory):
	"""Extract a VGG16 feature vector for every image file in `directory`.

	Args:
		directory: path of a folder whose entries are all loadable images.

	Returns:
		dict mapping the image id (file name up to the first '.') to the
		array returned by `model.predict` for that image (one row per image).
	"""
	base_model = VGG16()
	# Take the output of the second-to-last layer (the layer feeding the
	# classifier) explicitly. `model.layers.pop()` does not rewire the graph in
	# current Keras releases, so relying on it would silently yield the
	# classification output instead of the 4096-d vector that
	# `training_model` declares as its image input.
	# Other backbones imported by this notebook can be substituted here, but
	# the feature size must then match the caption model's image input.
	model = Model(inputs=base_model.inputs, outputs=base_model.layers[-2].output)

	features = dict()
	for name in listdir(directory):
		# VGG16 expects 224x224 RGB input.
		img = load_img(directory + '/' + name, target_size=(224, 224))
		img = img_to_array(img)
		# Add the leading batch dimension expected by `predict`.
		img = img.reshape((1, img.shape[0], img.shape[1], img.shape[2]))
		img_id = name.split('.')[0]
		features[img_id] = model.predict(preprocess_input(img), verbose=0)
	return features


In [0]:
# extracting features and dumping the features into a pickle file
directory = 'Flicker8k_Dataset'
features = feature_extraction(directory)
print('Number of Features Extracted:', len(features))
# Use a context manager so the pickle is flushed and the handle is closed
# even if serialization fails part-way through.
with open('features.pkl', 'wb') as features_file:
	dump(features, features_file)

In [0]:
#extracting and cleaning up the descriptions
def load_file(name):
	"""Return the entire contents of the text file `name` as one string.

	The file is opened in text mode with the platform default encoding and is
	closed even when reading raises.
	"""
	with open(name, 'r') as f:
		return f.read()


def extract_descriptions(file):
	"""Group raw caption lines by image id.

	Each usable line has the form "<image file>[#n] <caption words...>".
	The image id is the first token up to its first '.'.

	Args:
		file: full text of the token file (not a path).

	Returns:
		dict mapping image id -> list of caption strings, in file order.
	"""
	d_map = {}
	for r in file.split('\n'):
		word = r.split()
		# Skip empty / one-character lines, and also whitespace-only lines,
		# which produce no tokens and would otherwise fail on word[0].
		if len(r) < 2 or not word:
			continue
		img_id = word[0].split('.')[0]
		d_map.setdefault(img_id, []).append(' '.join(word[1:]))
	return d_map

def get_vocab(text):
	"""Return the set of distinct whitespace-separated words across all captions.

	`text` maps an image id to a list of caption strings.
	"""
	vocabulary = set()
	for image_id in text.keys():
		for caption in text[image_id]:
			vocabulary.update(caption.split())
	return vocabulary

def proc_descriptions(text):
	"""Normalize every caption in `text` in place.

	Each word is lower-cased and stripped of punctuation first; only then are
	one-character and non-alphabetic tokens dropped. Filtering before
	stripping would discard words such as "dog." or "t-shirt" entirely and
	leave the punctuation step with nothing to do.

	Args:
		text: dict mapping image id -> list of caption strings. The lists are
			modified in place; nothing is returned.
	"""
	strip_punct = str.maketrans('', '', string.punctuation)
	for desc_list in text.values():
		for idx, raw in enumerate(desc_list):
			cleaned = (w.lower().translate(strip_punct) for w in raw.split())
			desc_list[idx] = ' '.join(w for w in cleaned if len(w) > 1 and w.isalpha())


def save_desc(text, name):
	"""Write captions to `name`, one "<image id> <caption>" line per caption.

	Lines are joined with '\\n' and no trailing newline is written. The file
	is closed even if writing fails.

	Args:
		text: dict mapping image id -> list of caption strings.
		name: destination path; an existing file is overwritten.
	"""
	rows = [k + ' ' + caption for k, captions in text.items() for caption in captions]
	with open(name, 'w') as f:
		f.write('\n'.join(rows))

In [0]:
tokenfile = 'Flickr8k_text/Flickr8k.token.txt'
# Read the raw caption file into a single string.
tokens = load_file(tokenfile)
# Group the captions by image id (dict: image id -> list of captions).
descriptions = extract_descriptions(tokens)
print('Loaded Descriptions:', len(descriptions))
# Clean the captions; this mutates `descriptions` in place.
proc_descriptions(descriptions)
# Collect the set of distinct words remaining after cleaning.
vocab = get_vocab(descriptions)
print('Size of Vocabulary:', len(vocab))
# Persist the cleaned captions; later cells reload them from 'descriptions.txt'.
save_desc(descriptions, 'descriptions.txt')

In [0]:
# view description sample
def desc_sample(n, items):
    """Return the first `n` elements of the iterable `items` as a list."""
    head = islice(items, n)
    return list(head)
# Peek at the first (image id, captions) pair; as the cell's last expression it is displayed.
desc_sample(1, descriptions.items())

In [0]:
# load the dataset and then creating list of descriptions
def dataset_load(name):
	"""Return the set of image ids listed in the split file `name`.

	Every non-empty line contributes its text up to the first '.'.
	"""
	rows = load_file(name).split('\n')
	return {row.split('.')[0] for row in rows if len(row) >= 1}

def desc_load(name, dataset):
	"""Load the cleaned captions of the images in `dataset`.

	Each row of the file is "<image id> <caption words...>". Captions are
	wrapped in the 'capstart' / 'capend' markers used by the caption model.

	Args:
		name: path of the descriptions file.
		dataset: collection of image ids to keep.

	Returns:
		dict mapping image id -> list of wrapped caption strings.
	"""
	captions = {}
	for row in load_file(name).split('\n'):
		words = row.split()
		# Blank rows (e.g. from a trailing newline) carry no id; skip them
		# instead of failing on words[0].
		if not words:
			continue
		img_id, desc_words = words[0], words[1:]
		if img_id in dataset:
			captions.setdefault(img_id, []).append('capstart ' + ' '.join(desc_words) + ' capend')
	return captions

def img_desc(name, dataset):
	"""Load pickled image features and keep only the ids in `dataset`.

	Args:
		name: path of a pickle file holding a mapping image id -> features.
		dataset: iterable of image ids to select.

	Returns:
		dict with one entry per id in `dataset`.

	Raises:
		KeyError: if an id in `dataset` is missing from the pickle.
	"""
	# NOTE: unpickling executes arbitrary code; only load files this notebook produced.
	with open(name, 'rb') as f:
		all_features = load(f)
	return {data: all_features[data] for data in dataset}

def list_desc(captions):
	"""Flatten a dict of image id -> caption list into a single list of captions."""
	return [caption for image_id in captions.keys() for caption in captions[image_id]]

def tokenize(captions):
	"""Fit a Keras `Tokenizer` on every caption in `captions` and return it."""
	all_captions = list_desc(captions)
	fitted = Tokenizer()
	fitted.fit_on_texts(all_captions)
	return fitted

def max_length(captions):
	"""Return the word count of the longest caption in `captions`.

	NOTE: several notebook cells rebind the module-level name `max_length`
	to the integer this function returns.
	"""
	word_counts = [len(caption.split()) for caption in list_desc(captions)]
	return max(word_counts)

In [0]:
def caption_generator(token, max_length, captions, images, word_count):
	"""Expand captions into (image features, padded prefix, next word) samples.

	For a caption encoded as w0..wn, one sample is produced per split point
	i in 1..n: the prefix w0..w(i-1) padded to `max_length`, and w(i) one-hot
	encoded over `word_count` classes.

	Args:
		token: fitted tokenizer providing `texts_to_sequences`.
		max_length: length every input prefix is padded to.
		captions: dict mapping image id -> list of caption strings.
		images: dict mapping image id -> feature array; row 0 is used.
		word_count: number of classes for the one-hot target.

	Returns:
		Three arrays: image features, padded prefixes, one-hot targets.
	"""
	photo_inputs, text_inputs, targets = [], [], []
	for image_id, caption_list in captions.items():
		for caption in caption_list:
			encoded = token.texts_to_sequences([caption])[0]
			for split in range(1, len(encoded)):
				prefix = pad_sequences([encoded[:split]], maxlen=max_length)[0]
				target = to_categorical([encoded[split]], num_classes=word_count)[0]
				photo_inputs.append(images[image_id][0])
				text_inputs.append(prefix)
				targets.append(target)
	return array(photo_inputs), array(text_inputs), array(targets)

def training_model(word_count, max_length):
	"""Build and compile the two-input caption model.

	One branch takes a 4096-d image feature vector, the other a word-index
	sequence of length `max_length`; both are reduced to 256 units, summed,
	and decoded into a softmax over `word_count` words. A diagram of the
	model is also written to 'model.png'.
	"""
	# Image-feature branch.
	photo_input = Input(shape=(4096,))
	photo_dropped = Dropout(0.5)(photo_input)
	photo_dense = Dense(256, activation='relu')(photo_dropped)

	# Caption-prefix branch; index 0 is treated as padding (mask_zero=True).
	text_input = Input(shape=(max_length,))
	text_embedded = Embedding(word_count, 256, mask_zero=True)(text_input)
	text_dropped = Dropout(0.5)(text_embedded)
	text_encoded = LSTM(256)(text_dropped)

	# Decoder: sum both 256-d branches, then predict the next word.
	merged = add([photo_dense, text_encoded])
	hidden = Dense(256, activation='relu')(merged)
	outputs = Dense(word_count, activation='softmax')(hidden)

	model = Model(inputs=[photo_input, text_input], outputs=outputs)
	model.compile(loss='categorical_crossentropy', optimizer='adam')
	plot_model(model, to_file='model.png', show_shapes=True)
	return model

In [0]:
# training
filename = 'Flickr8k_text/Flickr_8k.trainImages.txt'
train = dataset_load(filename)
print('Training Dataset:', len(train))
train_desc = desc_load('descriptions.txt', train)
print('Training Descriptions:', len(train_desc))
train_features = img_desc('features.pkl', train)
print('Training Images: ', len(train_features))
token = tokenize(train_desc)
# +1 because tokenizer word indices start at 1; index 0 is the padding value.
word_count = len(token.word_index) + 1
print('Total number of Vocabulary words:', word_count)
# The rest of the notebook uses the name `max_length` for this integer, which
# hides the helper function of the same name. Compute the value directly
# rather than calling that helper, so this cell still works when re-run after
# the name has already been rebound to an int.
max_length = max(len(caption.split()) for caption in list_desc(train_desc))
X1train, X2train, ytrain = caption_generator(token, max_length, train_desc, train_features, word_count)

# development
filename = 'Flickr8k_text/Flickr_8k.devImages.txt'
dev = dataset_load(filename)
print('Development Dataset:', len(dev))
dev_desc = desc_load('descriptions.txt', dev)
print('Development Descriptions:', len(dev_desc))
dev_features = img_desc('features.pkl', dev)
print('Development Images:', len(dev_features))
# The dev split is encoded with the tokenizer and length fitted on the training split.
X1dev, X2dev, ydev = caption_generator(token, max_length, dev_desc, dev_features, word_count)

# Fit the model on the training set, validating on the development set.
model = training_model(word_count, max_length)
# Only epochs that improve val_loss write a checkpoint file.
filepath = 'model-ep{epoch:03d}.h5'
checkpoint = ModelCheckpoint(filepath, monitor='val_loss', verbose=2, save_best_only=True, mode='min')
history = model.fit([X1train, X2train], ytrain, epochs=10, verbose=1, callbacks=[checkpoint], validation_data=([X1dev, X2dev], ydev))
# Persist the per-epoch metrics; close the file deterministically.
with open('history.pkl', 'wb') as history_file:
	dump(history.history, history_file)

In [0]:
# Reload the saved training metrics; the context manager closes the file.
with open('history.pkl', 'rb') as history_file:
	history = load(history_file)
# Displayed as the cell output: validation loss per epoch.
history['val_loss']

In [0]:
def tokenize_word(result, token):
	"""Return the word whose index in `token.word_index` equals `result`.

	Returns None when no word has that index.
	"""
	matches = (tok for tok, value in token.word_index.items() if value == result)
	return next(matches, None)

# image caption generator
def final_caption_generator(model, token, image, max_length):
	"""Greedily generate a caption for `image`, one word at a time.

	Starting from 'capstart', the highest-scoring next word is appended until
	'capend' is produced, the predicted index has no word, or `max_length`
	steps have run. The returned string includes the 'capstart' marker and,
	if it was generated, 'capend'.
	"""
	words = ['capstart']
	for _ in range(max_length):
		encoded = token.texts_to_sequences([' '.join(words)])[0]
		padded = pad_sequences([encoded], maxlen=max_length)
		best_index = argmax(model.predict([image, padded], verbose=0))
		next_word = tokenize_word(best_index, token)
		if next_word is None:
			break
		words.append(next_word)
		if next_word == 'capend':
			break
	return ' '.join(words)

# get BLEU Scores
def bleu_scores(model, caption, images, token, max_length):
	"""Generate a caption per image and print corpus BLEU-1..4 against the references.

	Args:
		model: trained caption model passed to `final_caption_generator`.
		caption: dict mapping image id -> list of reference caption strings.
		images: dict mapping image id -> image features.
		token: fitted tokenizer.
		max_length: maximum caption length used for padding / generation.
	"""
	# NOTE(review): both references and predictions keep the 'capstart' /
	# 'capend' markers, which count as matching n-grams — confirm this is intended.
	actual, predicted = [], []
	for key, value in caption.items():
		pred = final_caption_generator(model, token, images[key], max_length)
		actual.append([val.split() for val in value])
		predicted.append(pred.split())
	print('BLEU-1:', corpus_bleu(actual, predicted, weights=(1.0, 0, 0, 0)))
	print('BLEU-2:', corpus_bleu(actual, predicted, weights=(0.5, 0.5, 0, 0)))
	# Uniform weights over 1- to 3-grams must sum to 1; (0.3, 0.3, 0.3) sums to 0.9.
	print('BLEU-3:', corpus_bleu(actual, predicted, weights=(1.0 / 3, 1.0 / 3, 1.0 / 3, 0)))
	print('BLEU-4:', corpus_bleu(actual, predicted, weights=(0.25, 0.25, 0.25, 0.25)))

In [0]:
# again create token for training
filename = 'Flickr8k_text/Flickr_8k.trainImages.txt'
train = dataset_load(filename)
print('Training Dataset:', len(train))
train_desc = desc_load('descriptions.txt', train)
print('Training Descriptions:', len(train_desc))
token = tokenize(train_desc)
# +1 because tokenizer word indices start at 1; index 0 is the padding value.
word_count = len(token.word_index) + 1
print('Total number of Vocabulary words:', word_count)
# `max_length` may already be bound to an int by an earlier cell, so calling it
# as a function would raise TypeError. Compute the longest caption directly.
max_length = max(len(caption.split()) for caption in list_desc(train_desc))

In [0]:
#testing
# Image ids that make up the held-out test split.
filename = 'Flickr8k_text/Flickr_8k.testImages.txt'
test = dataset_load(filename)
print('Testing Dataset:', len(test))
# Reference captions for the test images, wrapped in capstart/capend markers.
test_desc = desc_load('descriptions.txt', test)
print('Testing Descriptions:', len(test_desc))
# Pre-computed image features for the test images.
test_features = img_desc('features.pkl', test)
print('Testing Images:', len(test_features))

In [0]:
#getting bleu scores for the best model
# NOTE(review): the checkpoint name is hard-coded. Checkpoints are written as
# 'model-ep{epoch:03d}.h5' with save_best_only=True, so this file only exists if
# epoch 2 improved val_loss — confirm it is the best checkpoint of your run.
model = load_model('model-ep002.h5')
# `max_length` must be the integer caption length here, not the helper function.
bleu_scores(model, test_desc, test_features, token, max_length)

In [0]:
# generating captions for one test image
filename = 'Flickr8k_text/Flickr_8k.trainImages.txt'
train = dataset_load(filename)
print('Training Dataset:', len(train))
train_desc = desc_load('descriptions.txt', train)
print('Training Descriptions:', len(train_desc))
token = tokenize(train_desc)
# Save the tokenizer; the context manager flushes and closes the file.
with open('token.pkl', 'wb') as token_file:
	dump(token, token_file)

In [0]:
# extracting features from one test image
def extract_features_of_one(name):
	"""Extract the VGG16 feature vector for the single image file `name`.

	Args:
		name: path of the image to load.

	Returns:
		The array returned by `model.predict` for a batch holding that one image.
	"""
	base_model = VGG16()
	# Take the output of the second-to-last layer (the layer feeding the
	# classifier) explicitly. `model.layers.pop()` does not rewire the graph in
	# current Keras releases, so relying on it would silently yield the
	# classification output instead of the 4096-d vector that
	# `training_model` declares as its image input.
	model = Model(inputs=base_model.inputs, outputs=base_model.layers[-2].output)

	# VGG16 expects 224x224 RGB input.
	img = img_to_array(load_img(name, target_size=(224, 224)))
	# Add the leading batch dimension expected by `predict`.
	img = img.reshape((1, img.shape[0], img.shape[1], img.shape[2]))
	img = preprocess_input(img)
	return model.predict(img, verbose=0)

In [0]:
# Reload the saved tokenizer; the context manager closes the file.
with open('token.pkl', 'rb') as token_file:
	token = load(token_file)
# `max_length` may already be bound to an int by an earlier cell, so calling it
# as a function would raise TypeError. Compute the longest caption directly.
max_length = max(len(caption.split()) for caption in list_desc(train_desc))
model = load_model('model-ep002.h5')

In [0]:
# load and prepare the photograph
image = extract_features_of_one('test_image.jpg')
description = final_caption_generator(model, token, image, max_length)
# Remove only the actual start/end markers. Blindly cutting the first and last
# tokens drops a real word when generation stopped without 'capend', and
# fails or prints a marker when the caption is empty.
caption_words = description.split()
if caption_words and caption_words[0] == 'capstart':
	caption_words = caption_words[1:]
if caption_words and caption_words[-1] == 'capend':
	caption_words = caption_words[:-1]
print(' '.join(caption_words))