<a href="https://colab.research.google.com/github/mamunm/iamge_caption_generator/blob/main/notebooks/Flicker8_keras.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
from pickle import load, dump
import os 
from keras.applications.vgg16 import VGG16
from keras.preprocessing.text import Tokenizer
from keras.preprocessing.sequence import pad_sequences
from keras.utils import to_categorical
from keras.utils import plot_model
from keras.models import Model
from keras.layers import Input
from keras.layers import Dense
from keras.layers import LSTM
from keras.layers import Embedding
from keras.layers import Dropout
from keras.layers.merge import add
from keras.callbacks import ModelCheckpoint
from keras.preprocessing.sequence import pad_sequences
from keras.models import load_model
from nltk.translate.bleu_score import corpus_bleu
from keras.preprocessing.image import load_img
from keras.preprocessing.image import img_to_array
from keras.applications.vgg16 import preprocess_input
from numpy import array
import numpy as np
from tqdm import tqdm

In [None]:
def load_doc(filename):
    """Read the text file at ``filename`` and return its full contents as one string."""
    with open(filename, 'r') as handle:
        return handle.read()

def load_set(filename):
    """Load a list of photo identifiers, one per line, as a set.

    Each non-empty line of the file is cut at its first '.', so a line such as
    ``name.jpg`` contributes the identifier ``name``. Empty lines are ignored
    and duplicates collapse because the result is a set.
    """
    return {
        entry.split('.')[0]
        for entry in load_doc(filename).split('\n')
        if len(entry) >= 1
    }

def load_clean_descriptions(filename, dataset):
    """Load the cleaned captions for the photos listed in ``dataset``.

    Every line of the file is split on whitespace: the first token is the image
    id and the remaining tokens are the caption words. Captions whose image id
    is in ``dataset`` are wrapped as ``'startseq ... endseq'`` and grouped per
    image.

    Args:
        filename: path to the descriptions text file.
        dataset: container of image ids to keep (membership is tested with ``in``).

    Returns:
        dict mapping image id -> list of wrapped caption strings.
    """
    descriptions = dict()
    for line in load_doc(filename).split('\n'):
        tokens = line.split()
        if not tokens:
            # Blank line (e.g. a trailing newline at end of file): there is no
            # image id to read, and tokens[0] would raise IndexError.
            continue
        image_id, image_desc = tokens[0], tokens[1:]
        if image_id not in dataset:
            continue
        desc = 'startseq ' + ' '.join(image_desc) + ' endseq'
        descriptions.setdefault(image_id, list()).append(desc)
    return descriptions

def load_photo_features(filename, dataset):
    """Load the pickled photo features and keep only the ids in ``dataset``.

    Args:
        filename: path to a pickle file holding a mapping of image id -> feature.
        dataset: iterable of image ids to select.

    Returns:
        dict mapping each id in ``dataset`` to its stored feature.

    Raises:
        KeyError: if an id in ``dataset`` is missing from the pickled mapping.
    """
    # The context manager closes the file handle; the previous
    # ``load(open(...))`` left it open until garbage collection.
    with open(filename, 'rb') as handle:
        # NOTE: unpickling can execute arbitrary code -- only load feature
        # files that you generated yourself.
        all_features = load(handle)
    return {k: all_features[k] for k in dataset}



In [None]:
# Training split (6K images): photo ids, their cleaned captions, and the
# photo features stored in features.pkl.
filename = os.path.join('drive/MyDrive/image_captioning_data/Flickr8K',
                        'Flickr8k_text/Flickr_8k.trainImages.txt')
train = load_set(filename)
print(f'Dataset: {len(train)}')

train_descriptions = load_clean_descriptions(
    os.path.join('drive/MyDrive/image_captioning_data/Flickr8K', 'descriptions.txt'),
    train)
print(f'Descriptions: train={len(train_descriptions)}')

train_features = load_photo_features(
    os.path.join('drive/MyDrive/image_captioning_data/Flickr8K', 'features.pkl'),
    train)
print(f'Photos: train={len(train_features)}')

Dataset: 6000
Descriptions: train=6000
Photos: train=6000


In [None]:
# Held-out split used for validation and BLEU evaluation.
# Note: although the variables are called "test", the ids are read from the
# dev image list (Flickr_8k.devImages.txt).
filename = os.path.join('drive/MyDrive/image_captioning_data/Flickr8K',
                        'Flickr8k_text/Flickr_8k.devImages.txt')
test = load_set(filename)
print(f'Dataset: {len(test)}')

test_descriptions = load_clean_descriptions(
    os.path.join('drive/MyDrive/image_captioning_data/Flickr8K', 'descriptions.txt'),
    test)
print(f'Descriptions: test={len(test_descriptions)}')

test_features = load_photo_features(
    os.path.join('drive/MyDrive/image_captioning_data/Flickr8K', 'features.pkl'),
    test)
print(f'Photos: test={len(test_features)}')

Dataset: 1000
Descriptions: test=1000
Photos: test=1000


In [None]:
def to_lines(descriptions):
    """Flatten a dict of per-image caption lists into one list of captions.

    Args:
        descriptions: dict mapping image id -> list of caption strings.

    Returns:
        A new list with every caption, in the dict's iteration order.
    """
    all_desc = list()
    for captions in descriptions.values():
        # extend() replaces the former list comprehension that was executed
        # only for its append() side effect and built a throwaway list.
        all_desc.extend(captions)
    return all_desc

def create_tokenizer(descriptions):
    """Create a Keras ``Tokenizer`` and fit it on every caption in ``descriptions``.

    ``descriptions`` is a dict of image id -> list of caption strings; the
    captions are flattened with ``to_lines`` before fitting.
    """
    fitted = Tokenizer()
    fitted.fit_on_texts(to_lines(descriptions))
    return fitted

In [None]:
def create_sequences(tokenizer, max_length, descriptions, photos, vocab_size):
    """Expand every caption into (photo, partial caption) -> next-word samples.

    A caption encoded as ``[w0, w1, ..., wn]`` yields one sample per split
    point ``i`` in ``1..n``: the input text is ``[w0..w(i-1)]`` padded to
    ``max_length`` and the target is ``wi`` one-hot encoded over
    ``vocab_size`` classes. The photo input is ``photos[image_id][0]``.

    Returns:
        Three numpy arrays: photo inputs, padded text inputs, one-hot targets.
    """
    photo_inputs, text_inputs, targets = [], [], []
    for image_id, captions in descriptions.items():
        for caption in captions:
            encoded = tokenizer.texts_to_sequences([caption])[0]
            for split_at in range(1, len(encoded)):
                prefix = pad_sequences([encoded[:split_at]], maxlen=max_length)[0]
                next_word = to_categorical([encoded[split_at]], num_classes=vocab_size)[0]
                photo_inputs.append(photos[image_id][0])
                text_inputs.append(prefix)
                targets.append(next_word)
    return array(photo_inputs), array(text_inputs), array(targets)

In [None]:
def max_length(descriptions):
    """Return the number of whitespace-separated words in the longest caption."""
    word_counts = [len(caption.split()) for caption in to_lines(descriptions)]
    return max(word_counts)

In [None]:
# prepare tokenizer
tokenizer = create_tokenizer(train_descriptions)
# +1 because Keras word indices start at 1; index 0 is reserved (padding)
vocab_size = len(tokenizer.word_index) + 1
print(f'Vocabulary Size: {vocab_size}')
# determine the maximum sequence length
# Computed inline instead of calling the max_length() helper: the result is
# stored under the same name `max_length`, which replaces the helper, so
# calling it here would raise "'int' object is not callable" as soon as this
# cell is run a second time.
max_length = max(len(d.split()) for d in to_lines(train_descriptions))
print(f'Description Length: {max_length}')

Vocabulary Size: 7579
Description Length: 34


In [None]:
# Build the training arrays: photo inputs, padded caption prefixes, next-word targets
X1train, X2train, ytrain = create_sequences(
    tokenizer, max_length, train_descriptions, train_features, vocab_size)

In [None]:
# Build the held-out arrays the same way, reusing the tokenizer fitted on the training captions
X1test, X2test, ytest = create_sequences(
    tokenizer, max_length, test_descriptions, test_features, vocab_size)

In [None]:
def define_model(vocab_size, max_length):
    """Build and compile the captioning network.

    Two branches are combined by element-wise addition:
      * photo branch: a 4096-wide input -> Dropout(0.5) -> Dense(256, relu)
      * text branch: a ``max_length``-long word-index input ->
        Embedding(vocab_size, 256, mask_zero=True) -> Dropout(0.5) -> LSTM(256)
    The sum feeds Dense(256, relu) followed by a softmax over ``vocab_size``.

    Side effects: prints the layer summary and writes an architecture diagram
    to ``model.png`` inside the Flickr8K data folder.

    Returns:
        The compiled model (categorical cross-entropy loss, Adam optimizer).
    """
    # photo feature branch
    inputs1 = Input(shape=(4096,))
    fe1 = Dropout(0.5)(inputs1)
    fe2 = Dense(256, activation='relu')(fe1)
    # caption prefix branch
    inputs2 = Input(shape=(max_length,))
    se1 = Embedding(vocab_size, 256, mask_zero=True)(inputs2)
    se2 = Dropout(0.5)(se1)
    se3 = LSTM(256)(se2)
    # decoder: merge both branches and predict the next word
    decoder1 = add([fe2, se3])
    decoder2 = Dense(256, activation='relu')(decoder1)
    outputs = Dense(vocab_size, activation='softmax')(decoder2)
    model = Model(inputs=[inputs1, inputs2], outputs=outputs)
    model.compile(loss='categorical_crossentropy', optimizer='adam')
    # summary() prints the table itself and returns None, so wrapping it in
    # print() only appended a stray "None" line to the output.
    model.summary()
    plot_model(model, to_file=os.path.join(
        'drive/MyDrive/image_captioning_data/Flickr8K',
        'model.png'), show_shapes=True)
    return model

In [None]:
# Instantiate the captioning model for this vocabulary and caption length
model = define_model(vocab_size=vocab_size, max_length=max_length)

Model: "model"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
input_2 (InputLayer)            [(None, 34)]         0                                            
__________________________________________________________________________________________________
input_1 (InputLayer)            [(None, 4096)]       0                                            
__________________________________________________________________________________________________
embedding (Embedding)           (None, 34, 256)      1940224     input_2[0][0]                    
__________________________________________________________________________________________________
dropout (Dropout)               (None, 4096)         0           input_1[0][0]                    
______________________________________________________________________________________________

In [None]:
# Checkpoint callback: monitors val_loss (lower is better) and keeps only the
# best model; epoch, loss and val_loss are embedded in the file name.
filepath = os.path.join('drive/MyDrive/image_captioning_data/Flickr8K',
                        'model-ep{epoch:03d}-loss{loss:.3f}-val_loss{val_loss:.3f}.h5')
checkpoint = ModelCheckpoint(filepath, monitor='val_loss', verbose=1,
                             save_best_only=True, mode='min')

In [None]:
# Train for 20 epochs; the held-out arrays are passed as validation data so the
# checkpoint callback has a val_loss to monitor.
model.fit(
    [X1train, X2train], ytrain,
    epochs=20, verbose=2, callbacks=[checkpoint],
    validation_data=([X1test, X2test], ytest))

Epoch 1/20


KeyboardInterrupt: ignored

In [None]:
# Restore a saved checkpoint (epoch 5 according to its file name)
model = load_model(os.path.join(
    'drive/MyDrive/image_captioning_data/Flickr8K',
    'model-ep005-loss3.506-val_loss3.910.h5'))

In [None]:
def word_for_id(integer, tokenizer):
    """Map a word index back to its word.

    Args:
        integer: word index to look up (a plain int or a numpy integer).
        tokenizer: object exposing ``word_index`` (word -> index) and,
            optionally, ``index_word`` (index -> word).

    Returns:
        The matching word, or ``None`` if the index is not in the vocabulary.
    """
    # Prefer the tokenizer's reverse map when it is available: a dict lookup
    # avoids scanning the whole vocabulary for every generated word.
    index_word = getattr(tokenizer, 'index_word', None)
    if index_word:
        return index_word.get(integer)
    # Fallback for tokenizer objects that carry no reverse map.
    for word, index in tokenizer.word_index.items():
        if index == integer:
            return word
    return None
    
def generate_desc(model, tokenizer, photo, max_length):
    """Greedily decode a caption for ``photo`` (beam search is a possible later upgrade).

    Starting from ``'startseq'``, the caption so far is encoded and padded to
    ``max_length``, the model predicts the next word, and the argmax word is
    appended. Decoding stops after ``max_length`` steps, when ``'endseq'`` is
    produced, or when the predicted index has no word.

    Returns:
        The caption string, including the leading ``'startseq'`` (and the
        trailing ``'endseq'`` if it was generated).
    """
    caption = 'startseq'
    for _ in range(max_length):
        encoded = tokenizer.texts_to_sequences([caption])[0]
        padded = pad_sequences([encoded], maxlen=max_length)
        probabilities = model.predict([photo, padded], verbose=0)
        next_word = word_for_id(np.argmax(probabilities), tokenizer)
        if next_word is None:
            break
        caption = caption + ' ' + next_word
        if next_word == 'endseq':
            break
    return caption

In [None]:
def evaluate_model(model, descriptions, photos, tokenizer, max_length):
    """Print corpus-level BLEU-1 to BLEU-4 for the model's generated captions.

    For every image in ``descriptions`` a caption is generated with
    ``generate_desc`` and compared against all of that image's reference
    captions, each split on whitespace.

    Args:
        model: trained captioning model passed through to ``generate_desc``.
        descriptions: dict mapping image id -> list of reference captions.
        photos: mapping of image id -> photo feature for the model.
        tokenizer: tokenizer used to encode and decode captions.
        max_length: maximum caption length used for padding and decoding.
    """
    actual, predicted = list(), list()
    for key, desc_list in descriptions.items():
        yhat = generate_desc(model, tokenizer, photos[key], max_length)
        actual.append([d.split() for d in desc_list])
        predicted.append(yhat.split())
    # Cumulative n-gram weights; each tuple sums to 1. BLEU-3 previously used
    # (0.3, 0.3, 0.3, 0), which sums to 0.9 and skews the reported score.
    bleu_weights = (
        ('BLEU-1', (1.0, 0, 0, 0)),
        ('BLEU-2', (0.5, 0.5, 0, 0)),
        ('BLEU-3', (1.0 / 3, 1.0 / 3, 1.0 / 3, 0)),
        ('BLEU-4', (0.25, 0.25, 0.25, 0.25)),
    )
    for label, weights in bleu_weights:
        print('%s: %f' % (label, corpus_bleu(actual, predicted, weights=weights)))
 

In [None]:
# Score the loaded model on the held-out captions (prints BLEU-1..4)
evaluate_model(model=model,
               descriptions=test_descriptions,
               photos=test_features,
               tokenizer=tokenizer,
               max_length=max_length)

BLEU-1: 0.520917
BLEU-2: 0.264242
BLEU-3: 0.177287
BLEU-4: 0.079928


In [None]:
# save the tokenizer
# The with-block closes (and therefore flushes) the file; the previous
# dump(..., open(...)) never closed the handle it opened for writing.
with open(os.path.join('drive/MyDrive/image_captioning_data/Flickr8K',
                       'tokenizer.pkl'), 'wb') as tokenizer_file:
    dump(tokenizer, tokenizer_file)

In [None]:
def _vgg16_feature_model():
    """Return VGG16 cut off at its second-to-last layer, building it only once."""
    cached = getattr(_vgg16_feature_model, 'cached', None)
    if cached is None:
        base = VGG16()
        cached = Model(inputs=base.inputs, outputs=base.layers[-2].output)
        # Kept on the function object so later calls reuse the loaded network.
        _vgg16_feature_model.cached = cached
    return cached


def extract_features(filename):
    """Extract VGG16 features for a single image file.

    The image is loaded at 224x224, converted to an array, given a leading
    batch axis of size 1, run through VGG16's ``preprocess_input`` and fed to
    VGG16 truncated at its second-to-last layer.

    The truncated network is built on first use and reused afterwards instead
    of being re-created for every image.

    Args:
        filename: path to the image file.

    Returns:
        The prediction array for the one-image batch (presumably shape
        (1, 4096), matching the captioning model's photo input -- verify).
    """
    image = load_img(filename, target_size=(224, 224))
    image = img_to_array(image)
    image = image.reshape((1, *image.shape))  # add the batch dimension
    image = preprocess_input(image)
    return _vgg16_feature_model().predict(image, verbose=0)

In [None]:
# Extract VGG16 features for the first example photograph
photo = extract_features(
    os.path.join('drive/MyDrive/image_captioning_data/Flickr8K', 'example.jpg'))

In [None]:
# Caption the example photograph with the loaded model
description = generate_desc(model=model,
                            tokenizer=tokenizer,
                            photo=photo,
                            max_length=max_length)
print(description)

startseq dog is running through the water endseq


In [None]:
# Second example: extract the photo features ...
photo = extract_features(
    os.path.join('drive/MyDrive/image_captioning_data/Flickr8K', 'ex_1.jpg'))

# ... and caption the photo with the loaded model
description = generate_desc(model=model,
                            tokenizer=tokenizer,
                            photo=photo,
                            max_length=max_length)
print(description)


startseq man in red shirt is standing on the sidewalk endseq


TODO

1. LSTM, RNN, attention
2. Beam search
3. word2vec, GloVe, ELMo
4. VGG16, InceptionV3
5. Keras, PyTorch, JAX