## **Import Libraries/Dataset**

**Import packages**

In [1]:
import string
import numpy as np
import pandas as pd
from pickle import load,dump

import matplotlib.pyplot as plt
import sys, time, os, warnings
warnings.filterwarnings("ignore")
import re

import keras
import tensorflow as tf
from tqdm import tqdm
from nltk.translate.bleu_score import sentence_bleu

from keras.preprocessing.sequence import pad_sequences
#from tensorflow.keras.utils import to_categorical
#from tensorflow.keras.utils import plot_model
from keras.models import Model
#from keras.layers import Input
from keras.layers import Dense, BatchNormalization,GRU,Embedding,Dropout
# from keras.layers import GRU
# from keras.layers import Embedding
# from keras.layers import Dropout
#from keras.layers.merge import add
#from keras.callbacks import ModelCheckpoint
from keras.preprocessing.image import load_img
from keras.preprocessing.text import Tokenizer
from keras.applications.vgg16 import VGG16, preprocess_input
from keras.regularizers import l2

from sklearn.utils import shuffle
from sklearn.model_selection import train_test_split
from sklearn.utils import shuffle
import os.path
import zipfile

**Mount Google Drive to load and store data**

In [2]:
# Colab-only step: mount Google Drive at /content/drive so the paths
# configured in the next cell (dataset, cached features) can be read/written.
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [3]:
# Central configuration for the notebook.
# BASE_PATH already ends with "/", so the two paths built with a leading
# "/data/..." contain a double slash.
BASE_PATH = '/content/drive/MyDrive/group43/dl/assignment2/'
IMAGES_FOLDER =  BASE_PATH + "Flicker8k_Dataset/"
FEATURE_FILE = BASE_PATH + "/data/features.pkl"
# Marker file: its existence tells the feature-extraction cell to skip work.
FEATURE_EXTRACTED = BASE_PATH + "/data/saved/features.extracted.id"
IMAGE_SUFFIX= ".jpg"
# NOTE(review): load_image() below resizes to (224, 224); this constant is not
# referenced in the cells visible here.
IMAGE_SIZE = (299, 299)
# NOTE(review): VOCAB_SIZE .. FF_DIM are not referenced in the cells visible
# here; the model cell defines its own embedding_dim / units / vocab_size.
VOCAB_SIZE = 10000
SEQ_LENGTH = 20
EMBED_DIM = 512
UNIT_DIM = 256
NUM_HEADS = 2
FF_DIM = 512
# BATCH_SIZE is assigned again (same value) in the hyper-parameter cell.
BATCH_SIZE = 64
EPOCHS = 30
AUTOTUNE = tf.data.AUTOTUNE
# Passed to the Adam optimizer.
LEARNING_RATE = 0.00001

**Importing Dataset**

In [None]:
# Names of all files in the image folder (not used again in the visible cells).
jpgs = os.listdir(IMAGES_FOLDER)
#fileName = "set_3"
pickle_file = BASE_PATH+ "set_3.pkl"
# SECURITY: unpickling executes arbitrary code embedded in the file; only load
# pickles that come from a trusted source.
# `df` is iterated row by row in the next cell, where each item is split on a
# tab character.
df = pd.read_pickle(pickle_file, compression='infer')
print("Size of the dataset",len(df))

## **Data Visualization and augmentation**

**Loading the data**

In [None]:
# Every usable row has the form "<filename>#<index>\t<caption>"; rows that do
# not contain a tab are ignored.  Captions are lower-cased on the way in.
datatxt = [
    fields[0].split("#") + [fields[1].lower()]
    for fields in (line.split('\t') for line in df)
    if len(fields) != 1
]

data = pd.DataFrame(datatxt, columns=["filename", "index", "caption"])
data = data.reindex(columns=['index', 'filename', 'caption'])

# Drop the one entry whose filename carries a stray ".1" suffix.
keep_rows = data.filename != '2258277193_586949ec62.jpg.1'
data = data[keep_rows]
uni_filenames = np.unique(data.filename.values)

data.head()

**Plot of two samples and their captions**

In [None]:
# Show two sample images (entries 12 and 13 of the unique filenames) next to
# their captions.  The subplot grid has `npic` rows and 2 columns; only the
# first four cells are filled because two images are plotted.
npic = 5
npix = 224
target_size = (npix,npix,3)
count = 1

fig = plt.figure(figsize=(10,20))
for jpgfnm in uni_filenames[12:14]:
   # IMAGES_FOLDER already ends with "/", so this path contains "//".
   filename = IMAGES_FOLDER + '/' + jpgfnm
   captions = list(data["caption"].loc[data["filename"]==jpgfnm].values)
   image_load = load_img(filename, target_size=target_size)
   # Left cell: the image itself, without axis ticks.
   ax = fig.add_subplot(npic,2,count,xticks=[],yticks=[])
   ax.imshow(image_load)
   count += 1

   # Right cell: an empty, axis-less panel used as a canvas for the captions,
   # one caption per unit of height.
   ax = fig.add_subplot(npic,2,count)
   plt.axis('off')
   ax.plot()
   ax.set_xlim(0,1)
   ax.set_ylim(0,len(captions))
   for i, caption in enumerate(captions):
       ax.text(0,i,caption,fontsize=20,color="blue")
   count += 1
plt.show()

**Preprocessing the data**

In [None]:
# Flat list of every whitespace-separated token across all raw captions;
# the printed figure is the number of distinct tokens.
vocabulary = [token for txt in data.caption.values for token in txt.split()]
print('Vocabulary Size: %d' % len(set(vocabulary)))

def remove_punctuation(text_original):
    """Return ``text_original`` with every ASCII punctuation character removed.

    The previous implementation passed ``string.punctuation`` directly to
    ``str.translate``.  ``translate`` treats its argument as a table indexed
    by code point, so punctuation was never stripped and control characters
    (code points below 32, e.g. tab or newline) were replaced by punctuation
    marks.  A proper deletion table built with ``str.maketrans`` fixes both.

    Args:
        text_original: The caption text to clean.

    Returns:
        The text without any character from ``string.punctuation``.
    """
    deletion_table = str.maketrans('', '', string.punctuation)
    return text_original.translate(deletion_table)

def remove_single_character(text):
    """Drop every whitespace-separated word that is a single character long.

    Each surviving word is prefixed with one space, so a non-empty result
    starts with a space; an input without surviving words yields ``""``.
    """
    return "".join(" " + token for token in text.split() if len(token) > 1)

def remove_numeric(text):
    """Keep only the words made up entirely of alphabetic characters.

    Words containing digits or any other non-letter are dropped.  Each kept
    word is prefixed with one space, so a non-empty result starts with a
    space.
    """
    alphabetic_words = filter(str.isalpha, text.split())
    return "".join(" " + token for token in alphabetic_words)

def text_clean(text_original):
    """Run the caption through the three cleaning steps, in order:
    punctuation removal, single-character-word removal, non-alphabetic-word
    removal."""
    return remove_numeric(
        remove_single_character(
            remove_punctuation(text_original)
        )
    )

# Clean every caption in a single pass.  The previous row-by-row
# `data["caption"].iloc[i] = ...` was a chained assignment: it writes through
# an intermediate Series, which pandas only propagates to `data` in some
# configurations (and not at all under copy-on-write).
data = data.assign(caption=data["caption"].map(text_clean))

clean_vocabulary = []
for txt in data.caption.values:
    clean_vocabulary.extend(txt.split())
print('Clean Vocabulary Size: %d' % len(set(clean_vocabulary)))

# Wrap every caption in explicit sequence delimiters for the decoder.
all_captions = ['<start> ' + caption + ' <end>'
                for caption in data["caption"].astype(str)]

all_captions[:10]

# One full image path per caption row (an image appears once per caption).
all_img_name_vector = [IMAGES_FOLDER + annot for annot in data["filename"]]

all_img_name_vector[:10]

words = [txt.split() for txt in data["caption"].astype(str)]

# Sort the distinct words so that word ids are identical on every run;
# iterating a bare `set` of strings gives a different order per process.
unique = sorted({token for caption_words in words for token in caption_words})
word_index = {word: i for i, word in enumerate(unique)}
index_word = {i: word for i, word in enumerate(unique)}

# Captions re-expressed as lists of word ids.
partial_captions = [
    [word_index[txt] for txt in text.split()]
    for text in data["caption"].astype(str)
]



In [None]:
def data_limiter(num, total_captions, all_img_name_vector):
    """Shuffle captions and image paths together (fixed seed) and keep the
    first ``num`` pairs.

    Returns:
        ``(captions, image_paths)``, each truncated to at most ``num`` items.
    """
    shuffled_captions, shuffled_paths = shuffle(
        total_captions, all_img_name_vector, random_state=1)
    return shuffled_captions[:num], shuffled_paths[:num]

train_captions, img_name_vector = data_limiter(8000, all_captions, all_img_name_vector)

## **Model Building**

**Use Pretrained VGG-16 model trained on ImageNet dataset for image feature extraction.**

In [None]:
def load_image(image_path):
    """Read a JPEG from ``image_path`` and prepare it for VGG16.

    The file is decoded as a 3-channel image, resized to 224x224 and passed
    through the VGG16 ``preprocess_input``.

    Returns:
        ``(image_tensor, image_path)`` -- the path is passed through so the
        caller can tell which file each tensor came from.
    """
    raw_bytes = tf.io.read_file(image_path)
    decoded = tf.image.decode_jpeg(raw_bytes, channels=3)
    resized = tf.image.resize(decoded, (224, 224))
    return preprocess_input(resized), image_path

# VGG16 with ImageNet weights and without its classification head
# (include_top=False), wrapped in a Model that maps the network input to the
# output of its last remaining layer.  The extraction cell below reshapes that
# output to (batch, -1, channels).
image_model = tf.keras.applications.VGG16(include_top=False, weights='imagenet')
new_input = image_model.input
hidden_layer = image_model.layers[-1].output
image_features_extract_model = Model(new_input, hidden_layer)

image_features_extract_model.summary()

In [None]:
# De-duplicate the image paths (each image occurs once per caption) and keep
# them sorted, then build a batched dataset of preprocessed images.
encode_train = sorted(set(img_name_vector))
image_dataset = (
    tf.data.Dataset.from_tensor_slices(encode_train)
    .map(load_image, num_parallel_calls=tf.data.experimental.AUTOTUNE)
    .batch(64)
)

In [None]:
%%time
# Extract VGG16 features once and cache them as one .npy file per image.
# The marker file FEATURE_EXTRACTED records that the cache is complete, so
# later runs skip this expensive loop.
if not os.path.isfile(FEATURE_EXTRACTED):
    feature_dir = BASE_PATH + "data/saved/"
    # np.save and the marker write below both fail if the folder is missing.
    os.makedirs(feature_dir, exist_ok=True)

    for img, path in tqdm(image_dataset):
        batch_features = image_features_extract_model(img)
        # Collapse the two spatial axes into one: (batch, locations, channels).
        batch_features = tf.reshape(
            batch_features,
            (batch_features.shape[0], -1, batch_features.shape[3]))

        for bf, p in zip(batch_features, path):
            path_of_feature = p.numpy().decode("utf-8")
            # np.save appends ".npy", giving "<image name>.jpg.npy";
            # map_func() relies on exactly this naming.
            fileNamePath = feature_dir + os.path.basename(path_of_feature)
            np.save(fileNamePath, bf.numpy())

    # Written only after every batch succeeded; the context manager guarantees
    # the handle is closed even if the write raises.
    with open(FEATURE_EXTRACTED, "w") as marker:
        marker.write("Feature Extracted")



**Tokenization of data**

In [None]:
# Word-level tokenizer.  "<" and ">" are deliberately absent from `filters`
# so the "<start>" / "<end>" markers survive as tokens; unknown words map to
# "<unk>".  The backslash is written as "\\" -- the former "\]" was an invalid
# escape sequence that only worked by accident (same runtime string).
tokenizer = tf.keras.preprocessing.text.Tokenizer(
    oov_token="<unk>",
    filters='!"#$%&()*+.,-/:;=?@[\\]^_`{|}~ ')

tokenizer.fit_on_texts(train_captions)

# Reserve id 0 for padding so it can be told apart from real words.
tokenizer.word_index['<pad>'] = 0
tokenizer.index_word[0] = '<pad>'

# Captions as integer id sequences (previously computed twice), then
# right-padded with 0 to the length of the longest caption.
train_seqs = tokenizer.texts_to_sequences(train_captions)
cap_vector = tf.keras.preprocessing.sequence.pad_sequences(train_seqs, padding='post')


In [None]:
def calc_max_length(tensor):
    """Return the length of the longest sequence in ``tensor``.

    Raises ``ValueError`` when ``tensor`` is empty.
    """
    return max(map(len, tensor))
# Longest tokenized caption; evaluate() uses it as the decoding step limit.
max_length = calc_max_length(train_seqs)

def calc_min_length(tensor):
    """Return the length of the shortest sequence in ``tensor``.

    Raises ``ValueError`` when ``tensor`` is empty.
    """
    return min(map(len, tensor))
# Shortest tokenized caption (the count includes the <start>/<end> markers
# added to every caption).
min_length = calc_min_length(train_seqs)

print('Max Length of any caption : Min Length of any caption = '+ str(max_length) +" : "+str(min_length))

In [None]:
# 80/20 split with a fixed seed.
# NOTE(review): the split is per caption row, and an image has several
# captions, so the same image can presumably land in both splits -- confirm
# whether a per-image split is wanted.
img_name_train, img_name_val, cap_train, cap_val = train_test_split(img_name_vector,cap_vector, test_size=0.2, random_state=0)

In [None]:
# Training hyper-parameters.  BATCH_SIZE repeats the value from the config cell.
BATCH_SIZE = 64
# Size of the shuffle buffer used by the training dataset.
BUFFER_SIZE = 1000
embedding_dim = 256
units = 512
# +1 on top of the tokenizer's word_index size (which already contains <pad>).
vocab_size = len(tokenizer.word_index) + 1
num_steps = len(img_name_train) // BATCH_SIZE
# Channel count of the cached image features.
# NOTE(review): presumably VGG16's final 512 channels -- confirm.
features_shape = 512
# Number of attention locations per image; evaluate() allocates its attention
# map with this width.
# NOTE(review): presumably the 7x7 VGG16 feature grid -- confirm.
attention_features_shape = 49

In [None]:
def map_func(img_name, cap):
    """Load the cached feature array that belongs to ``img_name``.

    ``img_name`` arrives as bytes (it is called through
    ``tf.numpy_function``); the features are read from
    ``<BASE_PATH>data/saved/<image file name>.npy``.

    Returns:
        ``(feature_array, cap)`` with the caption passed through unchanged.
    """
    image_file = os.path.basename(img_name.decode('utf-8'))
    feature_path = BASE_PATH + "data/saved/" + image_file + '.npy'
    return np.load(feature_path), cap



In [None]:
def _load_cached_features(item1, item2):
    """Bridge into Python so map_func can np.load the cached features."""
    return tf.numpy_function(map_func, [item1, item2], [tf.float32, tf.int32])

# (image path, padded caption) pairs -> (cached features, caption), loaded in
# parallel, then shuffled, batched and prefetched.
train_dataset = (
    tf.data.Dataset.from_tensor_slices((img_name_train, cap_train))
    .map(_load_cached_features, num_parallel_calls=tf.data.experimental.AUTOTUNE)
    .shuffle(BUFFER_SIZE)
    .batch(BATCH_SIZE)
    .prefetch(buffer_size=tf.data.experimental.AUTOTUNE)
)


# test_dataset = tf.data.Dataset.from_tensor_slices((img_name_val, cap_val))
# # Use map to load the numpy files in parallel
# test_dataset = test_dataset.map(lambda item1, item2: tf.numpy_function(
#         map_func, [item1, item2], [tf.float32, tf.int32]),
#          num_parallel_calls=tf.data.experimental.AUTOTUNE)

# test_dataset = test_dataset.shuffle(BUFFER_SIZE).batch(BATCH_SIZE)
# test_dataset = test_dataset.prefetch(buffer_size=tf.data.experimental.AUTOTUNE)

In [None]:
class Encoder(tf.keras.Model):
    """Projects pre-extracted image features through one dense layer + ReLU."""

    def __init__(self, embedding_dim):
        super().__init__()
        # Applied to the last axis, so the output's last axis is embedding_dim.
        self.fc = tf.keras.layers.Dense(embedding_dim)
        # Created but not applied in call().
        self.dropout = tf.keras.layers.Dropout(0.5)

    def call(self, x):
        """Return ``relu(fc(x))``."""
        return tf.nn.relu(self.fc(x))

class RNN_Decoder(tf.keras.Model):
    """Attention decoder: scores the image features against the previous
    hidden state, then runs the attended context plus the current token
    through three stacked GRUs to produce vocabulary logits."""

    def __init__(self, embedding_dim, units, vocab_size):
        """
        Args:
            embedding_dim: Size of the token embedding.
            units: Width of the GRUs, the attention layers and ``fc1``.
            vocab_size: Number of output logits (and embedding rows).
        """
        super().__init__()
        self.units = units
        self.embedding = tf.keras.layers.Embedding(vocab_size, embedding_dim)
        # Only the first GRU carries L2 regularizers.
        self.gru_1 = GRU(self.units,
                         return_sequences=True,
                         return_state=True,
                         recurrent_initializer='glorot_uniform',
                         kernel_regularizer=l2(0.01),
                         bias_regularizer=l2(0.01))
        self.gru_2 = GRU(self.units,
                         return_sequences=True,
                         return_state=True,
                         recurrent_initializer='glorot_uniform')
        self.gru_3 = GRU(self.units,
                         return_sequences=True,
                         return_state=True,
                         recurrent_initializer='glorot_uniform')
        self.fc1 = Dense(self.units)

        self.dropout = Dropout(0.5)
        self.batchnormalization = BatchNormalization(
            axis=-1, momentum=0.99, epsilon=0.001, center=True, scale=True,
            beta_initializer='zeros', gamma_initializer='ones',
            moving_mean_initializer='zeros',
            moving_variance_initializer='ones',
            beta_regularizer=None, gamma_regularizer=None,
            beta_constraint=None, gamma_constraint=None)

        self.fc2 = Dense(vocab_size)

        # Additive attention: score = V(tanh(U(features) + W(hidden))).
        self.Uattn = Dense(units)
        self.Wattn = Dense(units)
        self.Vattn = Dense(1)

    def call(self, x, features, hidden, training=None):
        """Decode one step.

        Args:
            x: Current input token ids.
                NOTE(review): assumed shape (batch, 1) -- confirm.
            features: Encoder output.
                NOTE(review): assumed (batch, locations, embedding_dim).
            hidden: Previous decoder state, (batch, units) as produced by
                ``reset_state``; used for the attention scores only.
            training: Forwarded to Dropout and BatchNormalization.  Previously
                these layers never received a training flag, so in a custom
                training loop they ran in inference mode.  ``None`` keeps the
                former behaviour (Keras decides from the call context).

        Returns:
            ``(logits, state, attention_weights)`` where ``logits`` has
            ``vocab_size`` entries on its last axis, ``state`` is the final
            state of ``gru_3`` and ``attention_weights`` has one weight per
            feature location.
        """
        # Insert a length-1 axis so hidden broadcasts against the locations.
        hidden_with_time_axis = tf.expand_dims(hidden, 1)

        # One scalar score per location (last axis is 1 because of Vattn).
        score = self.Vattn(tf.nn.tanh(
            self.Uattn(features) + self.Wattn(hidden_with_time_axis)))

        # Normalise the scores over the location axis.
        attention_weights = tf.nn.softmax(score, axis=1)

        # Weighted sum of the features over the location axis.
        context_vector = tf.reduce_sum(attention_weights * features, axis=1)

        x = self.embedding(x)

        # Concatenate context and token embedding along the last axis.
        x = tf.concat([tf.expand_dims(context_vector, 1), x], axis=-1)

        # NOTE(review): gru_1 starts from its default initial state; `hidden`
        # is not passed in as an initial state.  Each later GRU is seeded with
        # the previous GRU's final state.
        output, state = self.gru_1(x)
        output, state = self.gru_2(output, state)
        output, state = self.gru_3(output, state)

        x = self.fc1(output)

        # Merge the batch and time axes: (batch * steps, units).
        x = tf.reshape(x, (-1, x.shape[2]))

        x = self.dropout(x, training=training)
        x = self.batchnormalization(x, training=training)

        # Project to vocabulary logits: (batch * steps, vocab_size).
        x = self.fc2(x)

        return x, state, attention_weights

    def reset_state(self, batch_size):
        """Return an all-zero hidden state of shape (batch_size, units)."""
        return tf.zeros((batch_size, self.units))

In [None]:
# Instantiate both halves of the captioning model with the hyper-parameters
# defined above.
encoder = Encoder(embedding_dim)
decoder = RNN_Decoder(embedding_dim, units, vocab_size)


In [None]:
optimizer = tf.keras.optimizers.Adam(learning_rate=LEARNING_RATE)
# from_logits=True: the decoder's final Dense layer has no activation.
# reduction='none': the loss is returned unreduced, so the caller decides how
# to aggregate it.
loss_object = tf.keras.losses.SparseCategoricalCrossentropy(
   from_logits=True, reduction='none')

# Running metrics; train_step() updates them and the epoch loop reads and
# resets them.
train_loss = tf.keras.metrics.Mean(name='train_loss')
train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name='train_accuracy')

# test_loss = tf.keras.metrics.Mean(name='test_loss')
# test_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name='test_accuracy')

## **Model Training**

In [None]:
# Per-epoch history used by the plotting cell.  Only the two train_* lists
# are appended to; the test_* code is commented out in this notebook.
train_loss_plot = []
test_loss_plot = []
train_accuracy_plot=[]
test_accuracy_plot=[]

@tf.function
def train_step(img_tensor, target):
    """Run one teacher-forced optimisation step on a batch.

    Fixes relative to the previous version:
      * The loss is accumulated over all time steps.  It used to be
        overwritten on every iteration, so gradients came only from the last
        position of the padded captions.
      * Padding positions (token id 0) are masked out of the loss and of both
        metrics.
      * The <start> column is sized from the actual batch, not from the global
        BATCH_SIZE, so a smaller final batch no longer breaks the step.
      * The decoder is called with training=True so its Dropout and
        BatchNormalization layers actually train.

    Args:
        img_tensor: Batch of cached image features.
        target: Batch of padded caption id sequences, (batch, steps).

    Side effects:
        Updates ``train_loss`` / ``train_accuracy`` and applies one optimizer
        update to the encoder and decoder variables.
    """
    batch_size = target.shape[0]

    # The hidden state is reset for each batch because the captions of
    # different images are unrelated.
    hidden = decoder.reset_state(batch_size=batch_size)
    dec_input = tf.expand_dims([tokenizer.word_index['<start>']] * batch_size, 1)

    loss = 0.0
    token_count = 0.0

    with tf.GradientTape() as tape:
        features = encoder(img_tensor)
        for i in range(1, target.shape[1]):
            predictions, hidden, _ = decoder(dec_input, features, hidden,
                                             training=True)
            step_loss = loss_object(target[:, i], predictions)
            # 1.0 for real tokens, 0.0 for padding.
            mask = tf.cast(tf.not_equal(target[:, i], 0), step_loss.dtype)

            loss += tf.reduce_sum(step_loss * mask)
            token_count += tf.reduce_sum(mask)

            train_loss(step_loss, sample_weight=mask)
            train_accuracy(target[:, i], predictions, sample_weight=mask)

            # Teacher forcing: feed the ground-truth token as the next input.
            dec_input = tf.expand_dims(target[:, i], 1)

        # Mean loss per real token; the guard avoids dividing by zero for an
        # all-padding batch.
        loss = loss / tf.maximum(token_count, 1.0)

    trainable_variables = encoder.trainable_variables + decoder.trainable_variables
    gradients = tape.gradient(loss, trainable_variables)
    optimizer.apply_gradients(zip(gradients, trainable_variables))

In [None]:
# @tf.function
# def test_step(images, labels):
#   # training=False is only needed if there are layers with different
#   # behavior during training versus inference (e.g. Dropout).
#   hidden = decoder.reset_state(batch_size=target.shape[0])
#   dec_input = tf.expand_dims([tokenizer.word_index['<start>']] * BATCH_SIZE, 1)

#   with tf.GradientTape() as tape:
#      features = encoder(img_tensor)
#      for i in range(1, target.shape[1]):
#          # passing the features through the decoder
#          predictions, hidden, _ = decoder(dec_input, features, hidden)
#          loss = loss_object(target[:, i], predictions)
#          test_loss(loss)
#          test_accuracy(target[:, i], predictions)
#          # using teacher forcing
#          dec_input = tf.expand_dims(target[:, i], 1)


In [None]:
from tensorflow.keras.preprocessing.sequence import pad_sequences

# Train for exactly EPOCHS epochs.  The loop used to start at 1, which ran one
# epoch fewer than configured and labelled the first epoch "Epoch 2".  The
# resets of test_loss / test_accuracy were removed: those metrics are
# commented out above, so the calls raised NameError on a fresh kernel.
for epoch in range(EPOCHS):
    train_loss.reset_states()
    train_accuracy.reset_states()
    start = time.time()

    for img_tensor, target in train_dataset:
        train_step(img_tensor, target)

    # Record the epoch averages for the plotting cell (accuracy in percent).
    train_loss_plot.append(train_loss.result().numpy())
    train_accuracy_plot.append(train_accuracy.result().numpy() * 100)

    print(
        f'Epoch {epoch + 1}, '
        f'Loss: {train_loss.result()}, '
        f'Accuracy: {train_accuracy.result() * 100} '
    )
    print('Time taken for 1 epoch {} sec\n'.format(time.time() - start))

In [None]:

# plt.plot(train_loss_plot)
# plt.xlabel('Epochs')
# plt.ylabel('Loss')
# plt.title('Loss Plot')
# plt.show()

# plot the losses
# Plot the training history.  Loss and accuracy get separate panels:
# accuracy is stored in percent, so on a shared axis labelled "Loss" the loss
# curve was flattened and the accuracy curve mislabelled.
plt.style.use("ggplot")
epoch_numbers = np.arange(1, len(train_loss_plot) + 1)

fig, (loss_ax, accuracy_ax) = plt.subplots(1, 2, figsize=(12, 4))

loss_ax.plot(epoch_numbers, train_loss_plot, label="Training Loss")
loss_ax.set_xlabel("Epoch #")
loss_ax.set_ylabel("Loss")
loss_ax.set_title("Training Loss")
loss_ax.legend(loc="upper right")

accuracy_ax.plot(epoch_numbers, train_accuracy_plot, label="Training Accuracy")
accuracy_ax.set_xlabel("Epoch #")
accuracy_ax.set_ylabel("Accuracy (%)")
accuracy_ax.set_title("Training Accuracy")
accuracy_ax.legend(loc="lower right")

fig.tight_layout()
plt.show()

### **Model Evaluation**

In [None]:
def evaluate(image):
    """Greedily decode a caption for the image stored at path ``image``.

    Decoding stops at the first ``<end>`` token or after ``max_length`` steps.

    Args:
        image: Path of a JPEG file.

    Returns:
        ``(result, attention_plot)``: the predicted tokens (including a final
        ``<end>`` when one was produced) and an array with one row of
        attention weights per predicted token.  The array is now trimmed on
        every path; previously an ``<end>`` prediction returned all
        ``max_length`` rows.
    """
    attention_plot = np.zeros((max_length, attention_features_shape))

    hidden = decoder.reset_state(batch_size=1)

    # Same preprocessing and feature reshape as in the extraction cell.
    temp_input = tf.expand_dims(load_image(image)[0], 0)
    img_tensor_val = image_features_extract_model(temp_input)
    img_tensor_val = tf.reshape(
        img_tensor_val,
        (img_tensor_val.shape[0], -1, img_tensor_val.shape[3]))

    extracted_features = encoder(img_tensor_val)
    dec_input = tf.expand_dims([tokenizer.word_index['<start>']], 0)
    result = []

    for i in range(max_length):
        predictions, hidden, attention_weights = decoder(
            dec_input, extracted_features, hidden)
        attention_plot[i] = tf.reshape(attention_weights, (-1, )).numpy()

        # Greedy choice: the highest-scoring vocabulary id.
        predicted_id = int(tf.argmax(predictions[0]).numpy())
        # vocab_size is one larger than the tokenizer's index, so the top id
        # has no word; fall back to <unk> instead of raising KeyError.
        word = tokenizer.index_word.get(predicted_id, '<unk>')
        result.append(word)

        if word == '<end>':
            break

        # Feed the prediction back in as the next input token.
        dec_input = tf.expand_dims([predicted_id], 0)

    return result, attention_plot[:len(result), :]

In [None]:
def plot_attention(image, result, attention_plot):
    """Overlay each predicted word's attention map on the image.

    Fixes relative to the previous version:
      * The image is read with ``load_img`` (already imported); the name
        ``Image`` was never imported, so the function raised NameError.
      * Each attention row is reshaped to its own square grid rather than
        force-resized to 8x8, which repeated values when the row holds 49
        weights.
      * The subplot grid is large enough for any number of words; the old
        ``n//2 x n//2`` grid was too small (or empty) for most lengths.

    Args:
        image: Path of the image file.
        result: Predicted tokens, one per row of ``attention_plot``.
        attention_plot: Array with one row of attention weights per token.
    """
    len_result = len(result)
    if len_result == 0:
        # Nothing to draw.
        return

    temp_image = np.array(load_img(image))
    fig = plt.figure(figsize=(10, 10))
    # Smallest square grid that holds every word.
    grid_size = int(np.ceil(np.sqrt(len_result)))

    for l in range(len_result):
        weights = np.asarray(attention_plot[l])
        side = max(int(np.sqrt(weights.size)), 1)
        if side * side == weights.size:
            temp_att = weights.reshape(side, side)
        else:
            # Not a perfect square: fall back to np.resize, which truncates
            # or repeats values to fit.
            temp_att = np.resize(weights, (side, side))

        ax = fig.add_subplot(grid_size, grid_size, l + 1)
        ax.set_title(result[l])
        img = ax.imshow(temp_image)
        # Stretch the attention grid over the full image extent.
        ax.imshow(temp_att, cmap='gray', alpha=0.6, extent=img.get_extent())

    plt.tight_layout()
    plt.show()

In [None]:
# captions on the validation set
# Caption one hard-coded image and score it against a hand-written reference.
image = IMAGES_FOLDER + '2319175397_3e586cfaf8.jpg'
real_caption = 'Two white dogs are playing in the snow'

result, attention_plot = evaluate(image)

# Strip the special tokens for scoring and display.  The former loops removed
# items from `result` while iterating over it (which skips elements) and
# called .remove on a str; the last word was also dropped whether or not it
# was <end>.
special_tokens = {'<start>', '<end>', '<unk>'}
candidate = [token for token in result if token not in special_tokens]

# Training captions were lower-cased, so lower-case the reference as well;
# otherwise "Two" can never match a predicted "two".
reference = [real_caption.lower().split()]

score = sentence_bleu(reference, candidate)
print(f"BLEU score: {score*100}")

result_final = ' '.join(candidate)
print ('Real Caption:', real_caption)
print ('Prediction Caption:', result_final)

# Pass the unfiltered tokens so each word lines up with its attention row.
plot_attention(image, result, attention_plot)