# CSCIS-89 Introduction to Deep Learning

# Final Project: "Image Captioning" 


# Nikolas Papadopoulos

----------------------------------------

References:

- https://www.tensorflow.org/text/guide/word_embeddings
- https://keras.io/examples/nlp/pretrained_word_embeddings/
- https://www.analyticsvidhya.com/blog/2020/11/create-your-own-image-caption-generator-using-keras/
- https://medium.com/@raman.shinde15/image-captioning-with-flickr8k-dataset-bleu-4bcba0b52926


In [None]:
from google.colab import drive

drive.mount('/content/drive')

# Load Flickr8k dataset & GloVe embeddings in Drive

### Flickr8k dataset

In [None]:
!cd "drive/My Drive/IntroDL/Final Project"

In [None]:
!wget -P "drive/My Drive/IntroDL/Final Project" https://github.com/jbrownlee/Datasets/releases/download/Flickr8k/Flickr8k_Dataset.zip

In [None]:
!wget -P "drive/My Drive/IntroDL/Final Project" https://github.com/jbrownlee/Datasets/releases/download/Flickr8k/Flickr8k_text.zip

In [None]:
!mkdir "drive/My Drive/IntroDL/Final Project/Flickr8k_Dataset"
!unzip "drive/My Drive/IntroDL/Final Project/Flickr8k_Dataset.zip" -d "drive/My Drive/IntroDL/Final Project/Flickr8k_Dataset"

In [None]:
!find "drive/My Drive/IntroDL/Final Project/Flickr8k_Dataset/Flicker8k_Dataset" -type f | sed -e 's/.*\.//' | sort | uniq -c | sort -n | grep -Ei '(tiff|bmp|jpeg|jpg|png|gif)$'

   8091 jpg


In [None]:
!mkdir "drive/My Drive/IntroDL/Final Project/Flickr8k_text"
!unzip -qq "drive/My Drive/IntroDL/Final Project/Flickr8k_text.zip" -d "drive/My Drive/IntroDL/Final Project/Flickr8k_text"

In [None]:
!cat "drive/My Drive/IntroDL/Final Project/Flickr8k_text/Flickr8k.lemma.token.txt"

### GloVe embeddings

In [None]:
!wget -P "drive/My Drive/IntroDL/Final Project" http://nlp.stanford.edu/data/glove.6B.zip

In [None]:
!unzip -qq '/content/drive/MyDrive/IntroDL/Final Project/glove.6B.zip' -d '/content/drive/MyDrive/IntroDL/Final Project/GloVe'

# Data Preprocessing

### Import libraries

In [None]:
import numpy as np  
import pandas as pd  
import os
import pickle
import tensorflow as tf
from keras.preprocessing.sequence import pad_sequences
from keras.preprocessing.text import Tokenizer
from tensorflow.keras.layers.experimental.preprocessing import TextVectorization
from keras.preprocessing.image import load_img, img_to_array
from keras.models import Model
from keras.models import load_model
from keras.layers import Flatten, Dense, LSTM, Dropout, Embedding, Activation
from keras.layers import concatenate, BatchNormalization, Input
from keras.layers.merge import add
from tensorflow.keras.utils import to_categorical, plot_model
from keras.applications.inception_v3 import InceptionV3, preprocess_input
import matplotlib.pyplot as plt 
import cv2

from numpy import array
import matplotlib.pyplot as plt
# %matplotlib inline

import string
# import os
# import glob
# from PIL import Image
# from time import time

from keras import Input, layers
from keras import optimizers
from keras.optimizers import Adam
from keras.preprocessing import sequence
from keras.preprocessing import image
from keras.preprocessing.text import Tokenizer
from keras.preprocessing.sequence import pad_sequences
from keras.layers import LSTM, Embedding, Dense, Activation, Flatten, Reshape, Dropout
from keras.layers.wrappers import Bidirectional
from keras.layers.merge import add
from keras.applications.inception_v3 import InceptionV3
from keras.applications.inception_v3 import preprocess_input
from keras.models import Model
from tensorflow.keras.utils import to_categorical

### Define directories

In [None]:
token_path = 'drive/My Drive/IntroDL/Final Project/Flickr8k_text/Flickr8k.token.txt'
train_images_path = 'drive/My Drive/IntroDL/Final Project/Flickr8k_text/Flickr_8k.trainImages.txt'
test_images_path = 'drive/My Drive/IntroDL/Final Project/Flickr8k_text/Flickr_8k.testImages.txt'
images_path = 'drive/My Drive/IntroDL/Final Project/Flickr8k_Dataset/Flicker8k_Dataset/'
glove_path = '/content/drive/MyDrive/IntroDL/Final Project/GloVe/'

### Create vocabulary and text cleaning

In [None]:
'''
Create a dictionary that stores as key the image_id and 
as values the 5 descriptions for each image
'''
def load_description(doc):
  mapping = dict()
  for line in doc.split('\n'):
    token = line.split('\t')
    if len(line) > 2: # remove short descriptions
      image_id = token[0].split('.')[0]
      image_description = token[1].split('.')[0]
      if image_id not in mapping:
        mapping[image_id] = list()
      mapping[image_id].append(image_description)
  return mapping


'''
Clean the text of descriptions by removing special characters 
(eg. symbols and numbers) and uncapitalizing text
'''
def clean_description(desc):
  for key, desc_list in desc.items():
    for i in range(len(desc_list)):
      caption = desc_list[i]
      # remove punctuation
      caption = [ch for ch in caption if ch not in string.punctuation] 
      caption = ''.join(caption)
      caption = caption.split(' ')
      # uncapitalize words
      caption = [word.lower() for word in caption if word.isalpha()] 
      caption = ' '.join(caption)
      desc_list[i] = caption 


'''
Create a vocabulary of all unique words existing in our corpus
'''
def create_vocabulary(desc):
	vocabulary = set()
	for key in desc.keys():
		for line in desc[key]:
			vocabulary.update(line.split())
	return vocabulary

'''
Print images from the dataset using the image id
'''
def print_image(id):
  x = plt.imread(images_path + id + '.jpg')
  plt.imshow(x)
  plt.show()


In [None]:
print('\n **** Plot an example image: **** \n')

print_image('1000268201_693b08cb0e')

print('\n=================\n')

print('\n **** Print a part of the corpus: **** \n')

corpus = open(token_path,'r').read()
print(corpus[:410])

print('\n=================\n')

print('\n **** Print descriptions of an image after the creation of the dictionary: **** \n')

descriptions = load_description(corpus)
for i in descriptions['1000268201_693b08cb0e']:
  print(i)

print('\n=================\n')

print('\n **** Print descriptions of the same image after cleaning the text: **** \n')

clean_description(descriptions)
for i in descriptions['1000268201_693b08cb0e']:
  print(i)

print('\n=================\n')

print('\n **** Print vocabulary size: **** \n')

vocabulary = create_vocabulary(descriptions)
print('We have {} words in our vocabulary'.format(len(vocabulary)))

### Create a list of ids of all train and test images

In [None]:
'''
Create a list of ids of all train and test images and 
then create an empty dictionary and map the images 
to their descriptions using image id as key and 
a list of descriptions as its value.
'''

train_images = open(train_images_path, 'r', encoding = 'utf-8').read().split("\n")
train_images = [image.split('.')[0] for image in train_images] # throw the '.jpg' suffix from each image id
train_images = train_images[:-1] # throw the last element which is an empty string

test_images = open(test_images_path, 'r', encoding = 'utf-8').read().split("\n")
test_images = [image.split('.')[0] for image in test_images] # throw the '.jpg' suffix from each image id
test_images = test_images[:-1] # throw the last element which is an empty string

print('There are {} train images and {} test images'.format(len(train_images), len(test_images)))


def load_clean_descriptions(des, dataset):
    dataset_des = dict()
    for key, des_list in des.items():
        if key  in dataset:
            if key not in dataset_des:
                dataset_des[key] = list()
            for line in des_list:
                desc = 'startseq ' + line + ' endseq' # add unique words at the beginning and end
                dataset_des[key].append(desc)
    return dataset_des


train_descriptions = load_clean_descriptions(descriptions, train_images)
test_descriptions = load_clean_descriptions(descriptions, test_images)

print(train_descriptions['2205328215_3ffc094cde'])
print(test_descriptions['2447284966_d6bbdb4b6e'])

### Preprocess caption before Glove embedding extraction

In [None]:
# list of all training captions
all_train_captions = []
for key, val in train_descriptions.items():
    for caption in val:
        all_train_captions.append(caption)

# find the maximum length of a description in a dataset
max_length = max(len(des.split()) for des in all_train_captions)

In [None]:
print(len(all_train_captions), max_length)

In [None]:
# Consider only words which occur at least 10 times

threshold = 10 # you can change this value according to your need
word_counts = {}
for cap in all_train_captions:
    for word in cap.split(' '):
        word_counts[word] = word_counts.get(word, 0) + 1
  
voc = [word for word in word_counts if word_counts[word] >= threshold]

print('Vocabulary = %d' % (len(voc)))

In [None]:
# word mapping to integers
ixtoword = {}
wordtoix = {}
  
ix = 1
for word in voc:
    wordtoix[word] = ix
    ixtoword[ix] = word
    ix += 1

vocab_size = len(ixtoword) + 1

In [None]:
# find the maximum length of a description in a dataset
max_length = max(len(des.split()) for des in all_train_captions)
print(max_length)

In [None]:
embeddings_index = {}
with open(glove_path + 'glove.6B.200d.txt') as f:
    for line in f:
        word, coefs = line.split(maxsplit=1)
        coefs = np.fromstring(coefs, "f", sep=" ")
        embeddings_index[word] = coefs

print("Found {} word vectors.".format(len(embeddings_index)))

In [None]:
embedding_dim = 200
embedding_matrix = np.zeros((vocab_size, embedding_dim))
for word, i in wordtoix.items():
    emb_vec = embeddings_index.get(word)
    if emb_vec is not None:
        embedding_matrix[i] = emb_vec
print(embedding_matrix.shape)

### Load the feature vector from all images

Load feature vector from pickle files

In [None]:
train_file = open("drive/My Drive/IntroDL/Final Project/encoding_train.pkl", "rb")
encoding_train = pickle.load(train_file)

test_file = open("drive/My Drive/IntroDL/Final Project/encoding_test.pkl", "rb")
encoding_test = pickle.load(test_file)

### Extract the feature vector from all images

Uncomment in case you want to extract feature vectors for an image dataset. Keep in mind that for Flickr8k data, which has 8000 images, feature extraction lasted almost an hour using Colab's GPU

In [None]:
# def preprocess_img(img_path):
#     # inception v3 accept images of size 299 * 299 * 3
#     img = load_img(img_path, target_size = (299, 299))
#     x = img_to_array(img)
#     x = np.expand_dims(x, axis = 0) # Add one more dimension
#     x = preprocess_input(x)
#     return x
  
# def encode(image):
#     image = preprocess_img(images_path + image + '.jpg')
#     vec = model.predict(image)
#     vec = np.reshape(vec, (vec.shape[1]))
#     return vec
  
# base_model = InceptionV3(weights = 'imagenet')
# model = Model(base_model.input, base_model.layers[-2].output)

# # Run the encode function on all train and test images
# # and store the feature vectors in a list
# encoding_train = {}
# for img in train_images:
#   encoding_train[img] = encode(img)

# encoding_test = {}
# for img in test_images:
#     encoding_test[img] = encode(img)

# # save the file 
# pickle.dump(encoding_train, open("drive/My Drive/IntroDL/Final Project/encoding_train.pkl","wb"))
# # save the file 
# pickle.dump(encoding_test, open("drive/My Drive/IntroDL/Final Project/encoding_test.pkl","wb"))

# Define and train models

### Uncomment this section in case you need to train a model.

### Define the Model

We are creating a Merge model where we combine the image vector and the partial caption. Therefore our model will have 3 major steps:

1. Processing the sequence from the text
2. Extracting the feature vector from the image
3. Decoding the output using softmax by concatenating the above two layers

In [None]:
# # Define the model
# input1 = Input(shape = (2048, ))
# feature1 = Dropout(0.2)(input1)
# feature2 = Dense(256, activation = 'relu')(feature1)

# input2 = Input(shape = (max_length, ))
# sequence1 = Embedding(vocab_size, embedding_dim, mask_zero = True)(input2)
# sequence2 = Dropout(0.2)(sequence1)
# sequence3 = LSTM(256)(sequence2)

# decoder1 = add([feature2, sequence3])
# decoder2 = Dense(256, activation = 'relu')(decoder1)
# outputs = Dense(vocab_size, activation = 'softmax')(decoder2)

# model = Model(inputs = [input1, input2], outputs = outputs)
# model.summary()

Model: "model"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
input_2 (InputLayer)            [(None, 37)]         0                                            
__________________________________________________________________________________________________
input_1 (InputLayer)            [(None, 2048)]       0                                            
__________________________________________________________________________________________________
embedding (Embedding)           (None, 37, 200)      330800      input_2[0][0]                    
__________________________________________________________________________________________________
dropout (Dropout)               (None, 2048)         0           input_1[0][0]                    
______________________________________________________________________________________________

### Model training

In [None]:
# model.layers[2].set_weights([embedding_matrix])
# model.layers[2].trainable = False
# model.compile(loss = 'categorical_crossentropy', optimizer = 'adam')

In [None]:
# def data_generator(descriptions, photos, wordtoix, max_length, num_photos_per_batch):
#     X1, X2, y = list(), list(), list()
#     n = 0
#     # loop for ever over images
#     while 1:
#         for key, desc_list in descriptions.items():
#             n += 1
#             # retrieve the photo feature
#             photo = photos[key]
#             for desc in desc_list:
#                 # encode the sequence
#                 seq = [wordtoix[word] for word in desc.split(' ') if word in wordtoix]
#                 # split one sequence into multiple X, y pairs
#                 for i in range(1, len(seq)):
#                     # split into input and output pair
#                     in_seq, out_seq = seq[:i], seq[i]
#                     # pad input sequence
#                     in_seq = pad_sequences([in_seq], maxlen=max_length)[0]
#                     # encode output sequence
#                     out_seq = to_categorical([out_seq], num_classes=vocab_size)[0]
#                     # store
#                     X1.append(photo)
#                     X2.append(in_seq)
#                     y.append(out_seq)

#             if n == num_photos_per_batch:
#                 yield ([array(X1), array(X2)], array(y))
#                 X1, X2, y = list(), list(), list()
#                 n=0

In [None]:
# epochs = 30
# batch_size = 128
# steps = len(train_descriptions) // batch_size
# train_features = encoding_train

# generator = data_generator(train_descriptions, train_features, wordtoix, max_length, batch_size)
# history = model.fit(generator, epochs=epochs, steps_per_epoch=steps, verbose=1)

# model.save('drive/My Drive/IntroDL/Final Project/model_{}.h5'.format(epochs))  # creates a HDF5 file with the model

# Load already trained models

### Load weights from already trained models

In [None]:
epochs = 30
model = load_model('drive/My Drive/IntroDL/Final Project/model_{}.h5'.format(epochs))

In [None]:
history_file = open("drive/My Drive/IntroDL/Final Project/history_model_{}.pkl".format(epochs), "rb")
history = pickle.load(history_file)

loss = history['loss']

epoch_range = range(len(loss))

plt.plot(epoch_range, loss, label='Training loss')
plt.title('Training loss')
plt.legend()

plt.show()

# Generate captions

In [None]:
def greedySearch(photo):
    in_text = 'startseq'
    for i in range(max_length):
        sequence = [wordtoix[w] for w in in_text.split() if w in wordtoix]
        sequence = pad_sequences([sequence], maxlen=max_length)
        yhat = model.predict([photo,sequence], verbose=0)
        yhat = np.argmax(yhat)
        word = ixtoword[yhat]
        in_text += ' ' + word
        if word == 'endseq':
            break

    final = in_text.split()
    final = final[1:-1]
    final = ' '.join(final)
    return final

In [None]:
def beam_search_predictions(image, beam_index = 3):
    start = [wordtoix["startseq"]]
    start_word = [[start, 0.0]]
    while len(start_word[0][0]) < max_length:
        temp = []
        for s in start_word:
            par_caps = sequence.pad_sequences([s[0]], maxlen=max_length, padding='post')
            preds = model.predict([image,par_caps], verbose=0)
            word_preds = np.argsort(preds[0])[-beam_index:]
            # Getting the top <beam_index>(n) predictions and creating a 
            # new list so as to put them via the model again
            for w in word_preds:
                next_cap, prob = s[0][:], s[1]
                next_cap.append(w)
                prob += preds[0][w]
                temp.append([next_cap, prob])
                    
        start_word = temp
        # Sorting according to the probabilities
        start_word = sorted(start_word, reverse=False, key=lambda l: l[1])
        # Getting the top words
        start_word = start_word[-beam_index:]
    
    start_word = start_word[-1][0]
    intermediate_caption = [ixtoword[i] for i in start_word]
    final_caption = []
    
    for i in intermediate_caption:
        if i != 'endseq':
            final_caption.append(i)
        else:
            break

    final_caption = ' '.join(final_caption[1:])
    return final_caption

In [None]:
encoding_test_keys = list(encoding_test.keys())[:5]

In [None]:
encoding_test_keys = list(encoding_test.keys())[:5] # take first 5 keys of encoding_test dictionary

for key in encoding_test_keys:
  image = encoding_test[key].reshape((1,2048))
  x = plt.imread(images_path + key + '.jpg')
  plt.imshow(x)
  plt.show()

  print("\nGreedy Search:", greedySearch(image))
  print("Beam Search, K = 3:", beam_search_predictions(image, beam_index = 3))
  print("Beam Search, K = 5:", beam_search_predictions(image, beam_index = 5))
  print("Beam Search, K = 7:", beam_search_predictions(image, beam_index = 7))
  print("Beam Search, K = 10:", beam_search_predictions(image, beam_index = 10))
  print('\n\n')

# Generate caption for new images

In [None]:
def preprocess_img(img_path):
    # inception v3 accept images of size 299 * 299 * 3
    img = load_img(img_path, target_size = (299, 299))
    x = img_to_array(img)
    x = np.expand_dims(x, axis = 0) # Add one more dimension
    x = preprocess_input(x)
    return x
  
def encode(image, img_path):
    image = preprocess_img(img_path + image + '.jpg')
    vec = inception_model.predict(image)
    vec = np.reshape(vec, (vec.shape[1]))
    return vec
  
base_model = InceptionV3(weights = 'imagenet')
inception_model = Model(base_model.input, base_model.layers[-2].output)

# Run the encode function on new images
# and store the feature vectors in a list
new_images_path = 'drive/My Drive/IntroDL/Final Project/New_images/'
# img = 'giannis-antetokounmpo'
# img = 'antetokounmpo-shoot'
# img = 'street-basketball'
# img = 'nadal'
img = 'boy-crying'
encoding_new_images = {}
encoding_new_images[img] = encode(img, new_images_path)

In [None]:
def greedySearch(photo):
    in_text = 'startseq'
    for i in range(max_length):
        sequence = [wordtoix[w] for w in in_text.split() if w in wordtoix]
        sequence = pad_sequences([sequence], maxlen=max_length)
        yhat = model.predict([photo, sequence], verbose=0)
        yhat = np.argmax(yhat)
        word = ixtoword[yhat]
        in_text += ' ' + word
        if word == 'endseq':
            break

    final = in_text.split()
    final = final[1:-1]
    final = ' '.join(final)
    return final

In [None]:
def beam_search_predictions(image, beam_index = 3):
    start = [wordtoix["startseq"]]
    start_word = [[start, 0.0]]
    while len(start_word[0][0]) < max_length:
        temp = []
        for s in start_word:
            par_caps = sequence.pad_sequences([s[0]], maxlen=max_length, padding='post')
            preds = model.predict([image,par_caps], verbose=0)
            word_preds = np.argsort(preds[0])[-beam_index:]
            # Getting the top <beam_index>(n) predictions and creating a 
            # new list so as to put them via the model again
            for w in word_preds:
                next_cap, prob = s[0][:], s[1]
                next_cap.append(w)
                prob += preds[0][w]
                temp.append([next_cap, prob])
                    
        start_word = temp
        # Sorting according to the probabilities
        start_word = sorted(start_word, reverse=False, key=lambda l: l[1])
        # Getting the top words
        start_word = start_word[-beam_index:]
    
    start_word = start_word[-1][0]
    intermediate_caption = [ixtoword[i] for i in start_word]
    final_caption = []
    
    for i in intermediate_caption:
        if i != 'endseq':
            final_caption.append(i)
        else:
            break

    final_caption = ' '.join(final_caption[1:])
    return final_caption

In [None]:
# pic = 'street-basketball'
# pic = 'giannis-antetokounmpo'
# pic = 'antetokounmpo-shoot'
# pic = 'nadal'
pic = 'boy-crying'
image = encoding_new_images[pic].reshape((1,2048))
x = plt.imread(new_images_path + pic + '.jpg')
plt.imshow(x)
plt.show()

print("\nGreedy Search:", greedySearch(image))
print("Beam Search, K = 3:", beam_search_predictions(image, beam_index = 3))
print("Beam Search, K = 5:", beam_search_predictions(image, beam_index = 5))
print("Beam Search, K = 7:", beam_search_predictions(image, beam_index = 7))
print("Beam Search, K = 10:", beam_search_predictions(image, beam_index = 10))