# **Version 2**
----
Model with image encoder and language encoder.

The image encoder consumes features extracted from a pre-trained model.

The language encoder is built from GRUs and an attention layer applied to the partial text of the caption.

The two sets of features are then merged to predict the next word of the caption.

#### Importing the necessary libraries

In [None]:
# Mount Google Drive at /content/drive (uses the google.colab helper).
from google.colab import drive
drive.mount('/content/drive')

In [None]:
import pickle
import numpy as np
import os
import cv2
import string
from collections import Counter
from PIL import Image
import string
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.preprocessing.image import load_img
from tensorflow.keras.preprocessing.image import img_to_array
from sklearn.model_selection import train_test_split
from tensorflow.keras.layers import Input, Add, Conv2D, MaxPooling2D
from tensorflow.keras.layers import Dense, Concatenate, Flatten
from tensorflow.keras.layers import LSTM, Bidirectional, GRU
from tensorflow.keras.layers import Embedding
from tensorflow.keras.layers import Dropout
from tensorflow.keras.models import Model
from tensorflow.keras.applications import ResNet50V2, VGG16
from tensorflow.keras.preprocessing.image import load_img, img_to_array
from tensorflow.keras import backend as K
from tensorflow.keras import initializers, regularizers, constraints
from tensorflow.keras.layers import Layer
from tensorflow.keras.optimizers import Adam
import random
import json

In [None]:
!pip3 install pipreqsnb
!pipreqsnb --savepath 'requirements.txt' '/content/drive/My Drive/Colab Notebooks/ImageCaptioning_V2.ipynb'

Collecting pipreqsnb
  Downloading https://files.pythonhosted.org/packages/e7/99/1fd7d0ce621dd5491e9e0086cdd84223a9729fe9ae76202758e2e444c70f/pipreqsnb-0.2.2.tar.gz
Collecting pipreqs
  Downloading https://files.pythonhosted.org/packages/9b/83/b1560948400a07ec094a15c2f64587b70e1a5ab5f7b375ba902fcab5b6c3/pipreqs-0.4.10-py2.py3-none-any.whl
Collecting yarg
  Downloading https://files.pythonhosted.org/packages/8b/90/89a2ff242ccab6a24fbab18dbbabc67c51a6f0ed01f9a0f41689dc177419/yarg-0.1.9-py2.py3-none-any.whl
Building wheels for collected packages: pipreqsnb
  Building wheel for pipreqsnb (setup.py) ... [?25l[?25hdone
  Created wheel for pipreqsnb: filename=pipreqsnb-0.2.2-cp36-none-any.whl size=3989 sha256=141293f7c3d3f25ac1f13c36f8e9b40bf9defdc4e8e13b8dbbb0ba90243eeb02
  Stored in directory: /root/.cache/pip/wheels/d5/48/eb/c365e598808484772b5285721af9252665e29a20dbae98182a
Successfully built pipreqsnb
Installing collected packages: yarg, pipreqs, pipreqsnb
Successfully installed pipreq

#### Loading the annotations file, pre-processing it

In [None]:
# Base directory on the mounted drive; the annotations file, image archive and
# model output paths used in this notebook are all built from it.
main_dir = '/content/drive/My Drive/ImageCaptioning/'

In [None]:
#### LOADING ANNOTATIONS FILE, PREPROCESSING IT AND SAVING IT AS 'DESCRIPTIONS.TXT'
def load_doc(filename):
    """Return the full text content of ``filename``.

    Args:
        filename: path of the text file to read.

    Returns:
        The file content as a single string.

    Raises:
        OSError: if the file cannot be opened or read.
    """
    # The context manager closes the handle even when read() raises.
    with open(filename, 'r') as file:
        return file.read()

# extract descriptions for images
def load_descriptions(doc):
    """Build an ``image id -> description`` mapping from tab-separated text.

    Each line is expected to look like ``<file name>\\t<description...>``.
    The image id is the file name up to its first ``.``; any further
    tab-separated fields are joined with single spaces to form the
    description. Lines shorter than two characters are ignored, and only the
    first description seen for an id is kept.

    Args:
        doc: the whole annotations text.

    Returns:
        dict mapping image id to its description string.
    """
    mapping = {}
    for raw_line in doc.split('\n'):
        # The length check is made on the raw, unstripped line.
        if len(raw_line) < 2:
            continue
        fields = raw_line.strip().split('\t')
        image_id = fields[0].split('.')[0]
        # setdefault leaves an already-recorded description untouched.
        mapping.setdefault(image_id, ' '.join(fields[1:]))
    return mapping

def clean_descriptions(descriptions):
    """Normalise every description in ``descriptions`` in place.

    Each description is split on whitespace; every word is lower-cased and
    stripped of ``string.punctuation`` characters, words left with a single
    character (or none) are dropped, and the rest are re-joined with spaces.

    Args:
        descriptions: dict of image id -> description; its values are
            overwritten.

    Returns:
        None.
    """
    strip_punct = str.maketrans('', '', string.punctuation)
    for key in descriptions:
        normalised = (
            word.lower().translate(strip_punct)
            for word in descriptions[key].split()
        )
        descriptions[key] = ' '.join(word for word in normalised if len(word) > 1)

# save descriptions to file, one per line
def save_doc(descriptions, filename):
    """Write ``descriptions`` to ``filename``, one ``id<TAB>description`` per line.

    Lines are separated by ``\\n`` with no trailing newline.

    Args:
        descriptions: dict of image id -> description string.
        filename: path of the file to create or overwrite.

    Raises:
        OSError: if the file cannot be opened or written.
    """
    lines = [key + '\t' + desc for key, desc in descriptions.items()]
    # The context manager closes the handle even when write() raises.
    with open(filename, 'w') as file:
        file.write('\n'.join(lines))

# Read the raw annotations file from the drive folder.
filename = main_dir + 'annotations_ajio_v4_full(2).txt'
doc = load_doc(filename)
print('Finished loading', filename)
# One description per image id (the first one found in the file).
descriptions = load_descriptions(doc)
print('Loaded: %d ' % len(descriptions))
# Cleans the descriptions in place: lower-case, punctuation removed, one-letter words dropped.
clean_descriptions(descriptions)
print("Finished cleaning descriptions")
# Distinct tokens over all cleaned descriptions (no frequency threshold applied here).
all_tokens = ' '.join(descriptions.values()).split()
vocabulary = set(all_tokens)
print('Vocabulary Size: %d' % len(vocabulary))
# Saving the cleaned descriptions is currently disabled.
#save_doc(descriptions, main_dir+'descriptions_v2.txt')

#### Shuffling and splitting the dataset into training, validation and test sets

In [None]:
# Shuffle all ids, then cut them into 90% training / 5% validation / 5% test.
product_ids = list(descriptions)
random.shuffle(product_ids)

train_end = int(0.9 * len(product_ids))
val_end = int(0.95 * len(product_ids))
train_product_ids = product_ids[:train_end]
val_product_ids = product_ids[train_end:val_end]
test_product_ids = product_ids[val_end:]

# Each split is a copy and is shuffled again, in train / val / test order.
for split in (train_product_ids, val_product_ids, test_product_ids):
    random.shuffle(split)

# Sizes of the full id list followed by the three splits.
for split in (product_ids, train_product_ids, val_product_ids, test_product_ids):
    print(len(split))

#### Pre-processing to find vocab_size and max_caption_length

In [None]:
### PREPROCESSING CAPTIONS FOR TRAINING
def load_captions(descriptions,train_product_ids):
    """Return the captions of the selected images, wrapped in start/end tokens.

    Args:
        descriptions: dict of image id -> description string.
        train_product_ids: iterable of the image ids to keep (any split, not
            only training).

    Returns:
        List of ``'startseq <description> endseq'`` strings, in the iteration
        order of ``descriptions``.
    """
    # Build the lookup once: testing membership against a list for every
    # image is quadratic, and a one-shot iterable would be consumed mid-loop.
    wanted_ids = set(train_product_ids)
    return [
        'startseq ' + desc + ' endseq'
        for image_id, desc in descriptions.items()
        if image_id in wanted_ids
    ]

# Caption lists per split; each caption is wrapped in 'startseq' / 'endseq'.
train_captions = load_captions(descriptions,train_product_ids)
val_captions = load_captions(descriptions, val_product_ids)
test_captions = load_captions(descriptions, test_product_ids)

In [None]:
### WORD TO INDEX DICTIONARY

# Every token of every caption (validation, training, then test), in order.
corpus = [
    token
    for caption in val_captions + train_captions + test_captions
    for token in caption.split()
]

# Token frequencies; only tokens seen more than once enter the vocabulary.
hash_map = Counter(corpus)
vocab = [token for token, count in hash_map.items() if count > 1]

print('Number of original tokens',len(hash_map))
print('Number of tokens after threshold',len(vocab))

# Indices start at 1; index 0 is left free for the zero padding.
word_to_index = {token: position for position, token in enumerate(vocab, start=1)}
index_to_word = {position: token for token, position in word_to_index.items()}

# One extra slot for the padding index 0.
vocab_size = len(index_to_word) + 1

print(len(index_to_word))

## MAX LENGTH OF CAPTIONS

def max_len_caption(all_train_captions):
    """Print and return the largest whitespace-token count among the captions.

    Args:
        all_train_captions: iterable of caption strings.

    Returns:
        The maximum number of whitespace-separated tokens in any caption,
        or 0 when there are no captions.
    """
    token_counts = [len(caption.split()) for caption in all_train_captions]
    longest = max(token_counts, default=0)
    print('Maximum length of caption= ',longest)
    return longest

# Longest caption over all three splits, counted with the added startseq/endseq tokens.
max_length_caption = max_len_caption(train_captions+val_captions+test_captions)

#### Extracting the image folder

In [None]:
## UNZIPPING IMAGES
from zipfile import ZipFile

filename= main_dir+'images_v4_full(2) (1).zip'
print(filename)
# The context manager closes the archive when extraction ends. The handle is
# deliberately not bound to the name `zip`, which would shadow the builtin
# for every later cell.
# extractall() with no argument unpacks into the current working directory.
with ZipFile(filename) as archive:
    archive.extractall()

import os
# NOTE(review): the absolute path assumes the working directory is /content — confirm.
print(len(os.listdir('/content/images_v4_full(2)')))

#### Extracting the image features and storing them in a dictionary

In [None]:
from tensorflow.keras.applications.inception_v3 import InceptionV3, preprocess_input, decode_predictions

def load_img_features(product_ids):
    """Extract one InceptionV3 feature array per product image.

    For every id the file ``<image_dir><id>.jpg`` is loaded at 299x299,
    passed through ``preprocess_input`` and a headless, max-pooled
    InceptionV3, and the prediction is stored reshaped to ``(2, 2, 512)``.
    Images that cannot be read (``OSError``) are reported and skipped.

    Args:
        product_ids: iterable of image ids (file names without ``.jpg``).

    Returns:
        ``(features, product_ids_new)``: a dict mapping id -> feature array,
        and the list of ids whose image was processed, in input order.
    """
    features = dict()
    product_ids_new = []
    model = InceptionV3(include_top=False, pooling='max')
    image_dir = '/content/images_v4_full(2)/'

    for j, product_id in enumerate(product_ids):
        if j % 100 == 0:
            print(j)  # progress indicator
        image_name = image_dir + product_id + '.jpg'
        try:
            # target_size is (height, width); the channel count is not part of it.
            image = load_img(image_name, target_size=(299, 299))
            image = img_to_array(image)
        except OSError:
            print("Error with file", image_name)
            continue

        # Add the leading batch axis expected by predict().
        image = image.reshape((1,) + image.shape)
        image = preprocess_input(image)
        feature = model.predict(image, verbose=0)
        product_ids_new.append(product_id)
        # The reshape needs exactly 2*2*512 = 2048 values per image.
        # NOTE(review): the model cell declares Input(shape=(512,)) for the
        # image branch — confirm which feature shape is intended.
        features[product_id] = feature.reshape(2, 2, 512)

    print("Loaded", len(features.keys()) ,"number of features" )
    # Report on the last image that actually loaded. The loop variable may be
    # undefined (empty input) or point at an image that failed.
    if product_ids_new:
        last_loaded = product_ids_new[-1]
        print(features[last_loaded].shape)
        print(type(features[last_loaded]))
    return features, product_ids_new


In [None]:
# Extract features for the training and validation images. The id lists are
# replaced by the ids returned alongside the features.
train_features, train_product_ids = load_img_features(train_product_ids)
val_features, val_product_ids = load_img_features(val_product_ids)

#### Loading the captions dictionary

Create separate caption dictionaries for each data split, wrapping every caption in the `startseq` and `endseq` tokens.

In [None]:
def load_captions_dict(descriptions,train_product_ids):
    """Return ``image id -> caption`` for the selected images.

    Args:
        descriptions: dict of image id -> description string.
        train_product_ids: iterable of the image ids to keep (any split, not
            only training).

    Returns:
        dict mapping each selected id to ``'startseq <description> endseq'``,
        in the iteration order of ``descriptions``.
    """
    # Build the lookup once: testing membership against a list for every
    # image is quadratic, and a one-shot iterable would be consumed mid-loop.
    wanted_ids = set(train_product_ids)
    return {
        image_id: 'startseq ' + desc + ' endseq'
        for image_id, desc in descriptions.items()
        if image_id in wanted_ids
    }

# Rebinds the *_captions names from lists to id -> caption dicts.
# train/val ids were replaced by load_img_features above; test_product_ids
# still holds the ids from the original split at this point.
train_captions = load_captions_dict(descriptions,train_product_ids)
val_captions = load_captions_dict(descriptions, val_product_ids)
test_captions = load_captions_dict(descriptions, test_product_ids)

#### Defining the Attention Layer

In [None]:
class Attention(Layer):
    """Attention pooling over the time axis of a 3-D input.

    For an input of shape ``(batch, step_dim, features)`` the layer scores
    every timestep with a learned vector ``W`` (plus an optional per-step
    bias), squashes the scores with ``tanh``, normalises their exponentials
    over the time axis and returns the weighted sum of the timesteps, shape
    ``(batch, features)``.

    Args:
        step_dim: number of timesteps of the input; used to reshape the scores.
        W_regularizer, b_regularizer: regularizer identifiers for ``W`` / ``b``.
        W_constraint, b_constraint: constraint identifiers for ``W`` / ``b``.
        bias: whether to add the per-timestep bias.
        **kwargs: forwarded to ``Layer.__init__``.
    """

    def __init__(self, step_dim,
                 W_regularizer=None, b_regularizer=None,
                 W_constraint=None, b_constraint=None,
                 bias=True, **kwargs):
        # Initialise the base Layer before assigning any attribute, so that
        # nothing set here can be reset by the base constructor.
        super(Attention, self).__init__(**kwargs)

        self.supports_masking = True
        self.init = initializers.get('glorot_uniform')

        self.W_regularizer = regularizers.get(W_regularizer)
        self.b_regularizer = regularizers.get(b_regularizer)

        self.W_constraint = constraints.get(W_constraint)
        self.b_constraint = constraints.get(b_constraint)

        self.bias = bias
        self.step_dim = step_dim
        self.features_dim = 0  # set in build() from the input shape

    def get_config(self):
        """Return the constructor arguments needed to rebuild the layer."""
        config = super().get_config().copy()
        config.update({
            # Serialised forms (or None) instead of live objects, so that the
            # config holds plain data; __init__ resolves them again through
            # regularizers.get / constraints.get.
            'W_regularizer': regularizers.serialize(self.W_regularizer),
            'b_regularizer': regularizers.serialize(self.b_regularizer),
            'W_constraint': constraints.serialize(self.W_constraint),
            'b_constraint': constraints.serialize(self.b_constraint),
            'bias': self.bias,
            'step_dim': self.step_dim,
        })
        return config

    def build(self, input_shape):
        """Create ``W`` (one weight per feature) and ``b`` (one per timestep)."""
        assert len(input_shape) == 3

        self.W = self.add_weight(shape=(input_shape[-1],),
                                 initializer=self.init,
                                 name='{}_W'.format(self.name),
                                 regularizer=self.W_regularizer,
                                 constraint=self.W_constraint)
        self.features_dim = input_shape[-1]

        if self.bias:
            self.b = self.add_weight(shape=(input_shape[1],),
                                     initializer='zeros',
                                     name='{}_b'.format(self.name),
                                     regularizer=self.b_regularizer,
                                     constraint=self.b_constraint)
        else:
            self.b = None

        self.built = True

    def compute_mask(self, input, input_mask=None):
        # The time axis is reduced away, so no mask is passed downstream.
        return None

    def call(self, x, mask=None):
        """Return the attention-weighted sum of ``x`` over the time axis."""
        features_dim = self.features_dim
        step_dim = self.step_dim

        # Score of each timestep: dot product of its features with W,
        # reshaped back to (batch, step_dim).
        eij = K.reshape(
            K.dot(K.reshape(x, (-1, features_dim)),
                  K.reshape(self.W, (features_dim, 1))),
            (-1, step_dim))

        if self.bias:
            eij += self.b

        eij = K.tanh(eij)
        a = K.exp(eij)

        # Zero the weight of masked timesteps before normalising, so masked
        # (padded) positions do not contribute to the weighted sum.
        if mask is not None:
            a *= K.cast(mask, K.floatx())

        # epsilon guards against a zero denominator.
        a /= K.cast(K.sum(a, axis=1, keepdims=True) + K.epsilon(), K.floatx())

        a = K.expand_dims(a)
        weighted_input = x * a
        return K.sum(weighted_input, axis=1)

    def compute_output_shape(self, input_shape):
        return input_shape[0],  self.features_dim

#### Defining the model

In [None]:
# Image branch: dropout, then a 256-unit ReLU projection of the image features.
# NOTE(review): this input is declared as (512,), while load_img_features
# stores features reshaped to (2, 2, 512) — confirm which shape is intended.
ImageEncoderInput = Input(shape=(512,))
ImageEncoder = Dropout(0.35)(ImageEncoderInput)
ImageEncoder = Dense(256, activation='relu')(ImageEncoder)

# Language Encoder
# Padded word indices -> embeddings (index 0 is masked) -> bidirectional GRU
# returning every timestep -> attention pooling over the timesteps.
LanguageEncoderInput = Input(shape=(max_length_caption,))
LanguageEncoder = Embedding(vocab_size, 128, mask_zero=True)(LanguageEncoderInput)
LanguageEncoder = Dropout(0.35)(LanguageEncoder)
LanguageEncoder = Bidirectional(GRU(128, return_sequences=True, dropout=0.25,recurrent_dropout=0.25))(LanguageEncoder) 
LanguageEncoder = Attention(max_length_caption)(LanguageEncoder)

# Decoder
# Sum of the two branches, a hidden layer, then a softmax over the vocabulary.
Decoder = Add()([ImageEncoder, LanguageEncoder])
Decoder = Dense(500, activation='relu')(Decoder)
FinalDecoder = Dense(vocab_size, activation='softmax')(Decoder)

# NOTE(review): the model has a single output, while data_generator can yield
# targets keyed 'decoder_output' and 'classifier_output' — confirm they match.
model = Model(inputs=[ImageEncoderInput, LanguageEncoderInput], outputs=FinalDecoder)
model.compile(loss='categorical_crossentropy', optimizer=Adam(lr=5e-4,decay=1e-5))
model.summary()

#### Plotting the model

In [None]:
# Import plot_model from tensorflow.keras, the same package every other Keras
# import in this notebook uses, rather than from the standalone keras package.
from tensorflow.keras.utils import plot_model
plot_model(model)

#### Defining the custom data generator

In [None]:
def data_generator(descriptions, wordtoix, photos, categories, max_length, num_photos_per_batch):
    """Yield training batches forever.

    Every caption is expanded into (partial caption -> next word) pairs: for
    a caption of k known words, k-1 samples are produced, each combining the
    image features with the padded prefix as input and the one-hot next word
    as target. A batch is yielded after ``num_photos_per_batch`` captions
    have been expanded.

    Reads the module-level ``vocab_size`` for the one-hot width.

    Args:
        descriptions: dict of image id -> caption string.
        wordtoix: dict of word -> integer index; unknown words are skipped.
        photos: dict of image id -> feature array.
        categories: dict of image id -> classifier target, or ``None`` for
            models without a classifier output.
        max_length: length the partial captions are padded to.
        num_photos_per_batch: number of captions expanded per batch.

    Yields:
        ``([image_features, partial_captions], targets)`` where ``targets``
        is the next-word array when ``categories`` is ``None``, otherwise a
        dict with keys ``'decoder_output'`` and ``'classifier_output'``.
    """
    with_categories = categories is not None

    X1, X2, y1, y2 = list(), list(), list(), list()
    n = 0
    # loop for ever over images
    while True:
        for key, desc in descriptions.items():
            # Resolve every lookup up front: a missing key must skip the
            # caption before anything is appended, otherwise the input and
            # target lists end up with different lengths.
            try:
                photo = photos[key]
                category = categories[key] if with_categories else None
            except KeyError:
                continue

            # Count only captions that are used. Counting skipped ones could
            # step over the batch size and stop batches from being yielded.
            n += 1

            # encode the sequence
            seq = [wordtoix[word] for word in desc.split(' ') if word in wordtoix]

            # split one sequence into multiple X, y pairs
            for i in range(1, len(seq)):
                in_seq = pad_sequences([seq[:i]], maxlen=max_length, dtype='float64')[0]
                out_seq = to_categorical([seq[i]], num_classes=vocab_size)[0]

                X1.append(photo)
                X2.append(in_seq)
                y1.append(out_seq)
                if with_categories:
                    y2.append(category)

            # yield the batch data
            if n >= num_photos_per_batch:
                inputs = [np.array(X1, dtype='float64'),
                          np.array(X2, dtype='float64')]
                decoder_targets = np.array(y1, dtype='float64')
                if with_categories:
                    yield (inputs, {'decoder_output': decoder_targets,
                                    'classifier_output': np.array(y2, dtype='float64')})
                else:
                    yield (inputs, decoder_targets)

                X1, X2, y1, y2 = list(), list(), list(), list()
                n = 0
			

#### Finally! Training the model.

In [None]:
epochs = 50
# Number of captions expanded into samples for each generator batch.
number_pics_per_batch = 128
# Batches per epoch so that one epoch covers the training captions once.
steps = len(train_captions)//number_pics_per_batch
# Not used by any of the later cells shown in this notebook.
temp = main_dir

In [None]:
# NOTE(review): train_categories and val_categories are not defined in any
# cell of this notebook that is visible here — confirm where they come from.
generator = data_generator(train_captions,  word_to_index, train_features ,train_categories, max_length_caption, number_pics_per_batch)
val_generator = data_generator(val_captions, word_to_index, val_features, val_categories, max_length_caption, number_pics_per_batch)

In [None]:
output_dir = main_dir + 'model2/'

from tensorflow.keras.callbacks import ReduceLROnPlateau

# Halve the learning rate when val_loss has not improved for 5 epochs.
reduce_lr = ReduceLROnPlateau(monitor='val_loss', patience=5, verbose=1,min_lr=1e-7, factor = 0.5)

# Creates the directory and any missing parents; no error if it already exists.
os.makedirs(output_dir, exist_ok=True)

# Model.fit accepts Python generators directly; fit_generator is deprecated.
# At least one validation step, so that val_loss exists for ReduceLROnPlateau
# even when the validation split is smaller than one batch.
history = model.fit(generator,
                    validation_data=val_generator,
                    validation_steps=max(1, len(val_captions)//number_pics_per_batch),
                    epochs=epochs,
                    steps_per_epoch=steps,
                    verbose=1,
                    callbacks=[reduce_lr])

#### Saving the model

In [None]:
# Save the trained model as a single HDF5 file in the output directory.
model.save(output_dir+'V2(1).h5')

#### Plotting the loss curves

In [None]:
import matplotlib.pyplot as plt
print(history.history.keys())

# (history key, plot title, y-axis label) for every loss curve that may exist.
# The per-output entries are only recorded for a model with outputs named
# 'decoder_output' / 'classifier_output'; missing keys are skipped instead of
# raising KeyError.
loss_plots = [
    ('loss', 'model total loss', 'loss'),
    ('decoder_output_loss', 'model decoder loss', 'decoder loss'),
    ('classifier_output_loss', 'model classifier loss', 'classifier loss'),
]

for loss_key, plot_title, y_label in loss_plots:
    if loss_key not in history.history:
        continue

    print(loss_key)
    legend_labels = ['train']
    plt.plot(history.history[loss_key])
    # The validation curve is drawn only when it was recorded.
    if 'val_' + loss_key in history.history:
        plt.plot(history.history['val_' + loss_key])
        legend_labels.append('val')
    plt.title(plot_title)
    plt.ylabel(y_label)
    plt.xlabel('epoch')
    plt.legend(legend_labels, loc='upper right')
    plt.show()


#### Saving the corresponding mapping data in **JSON**

In [None]:
# Persist both vocabulary mappings next to the model.
# JSON object keys are always strings, so the integer keys of index_to_word
# are written as strings and must be converted back with int() after loading.
data = {'word_to_index':word_to_index,
        'index_to_word':index_to_word}
# The context manager closes the file even when json.dump raises.
with open(output_dir+'V2(1).json','w') as f:
    json.dump(data,f)

#### Loading the saved model

In [None]:
from tensorflow.keras.models import load_model
output_dir = main_dir + 'model2/'

from tensorflow.keras.utils import CustomObjectScope
from tensorflow.keras.initializers import glorot_uniform

# The custom Attention class is supplied through custom_objects so that
# load_model can rebuild the layer; 'GlorotUniform' is additionally made
# resolvable through the enclosing scope.
with CustomObjectScope({'GlorotUniform': glorot_uniform()}):
        model = load_model(output_dir+'V2(1).h5', custom_objects={'Attention':Attention})

#### Loading the test data

In [None]:
# Extract features for the test images; test_product_ids is replaced by the
# ids returned alongside the features.
test_features, test_product_ids = load_img_features(test_product_ids)

#### Testing the model

In [None]:
import cv2
import matplotlib.pyplot as plt
from google.colab.patches import cv2_imshow

def greedySearch(photo):
    """Generate a caption for one image by greedy decoding.

    Starting from ``'startseq'``, the image features and the padded partial
    caption are fed to the module-level ``model``; the highest-scoring word is
    appended and the step repeats. Decoding stops at ``'endseq'``, at an
    index that has no word (such as the padding index 0), or after
    ``max_length_caption`` steps.

    Reads the module-level ``model``, ``word_to_index``, ``index_to_word``
    and ``max_length_caption``.

    Args:
        photo: image features for a single image, with a leading batch axis
            of size one, in the shape the model's image input expects.

    Returns:
        The generated caption without the start and end markers.
    """
    words = ['startseq']
    for _ in range(max_length_caption):
        sequence = [word_to_index[w] for w in words if w in word_to_index]
        sequence = pad_sequences([sequence], maxlen=max_length_caption)
        yhat = model.predict([photo, sequence], verbose=0)
        # predict() returns one row of scores per sample; take the best index
        # of the single row as a plain int so it can be used as a dict key.
        best_index = int(np.argmax(yhat[0]))
        word = index_to_word.get(best_index)
        if word is None or word == 'endseq':
            break
        words.append(word)
    # Only the leading 'startseq' is dropped; 'endseq' is never appended, so a
    # caption that reaches the length limit keeps its last real word.
    return ' '.join(words[1:])

print(len(test_product_ids))
# Index of the test sample to inspect; must be smaller than the count printed above.
z=173
pic= test_product_ids[z]
print(pic)
# NOTE(review): the features are passed as (1, 2, 2, 512), while the model cell
# declares the image input as (512,) — confirm which shape is intended.
image = test_features[pic].reshape(1,2,2,512)
# NOTE(review): relative path, unlike the absolute '/content/images_v4_full(2)/'
# used elsewhere — presumably relies on the working directory being /content.
cv2_imshow(cv2.imread('images_v4_full(2)/'+pic+'.jpg'))
result = greedySearch(image)

print("Actual output:",test_captions[pic])
print("Predicted output:",result)