# EYE FOR BLIND
This notebook will be used to prepare the capstone project 'Eye for Blind'

In [None]:
#Import all the required libraries

import warnings
warnings.filterwarnings("ignore")

import glob
import pandas as pd
import matplotlib.pyplot as plt
import matplotlib.image as mpimg
import os
import zipfile
from PIL import Image
import random

# Date and Time 
import datetime,time

# Data manipulation
import numpy as np
import pandas as pd
import collections, random, re
from collections import Counter
import operator

# tensorflow , KERAS Libraries
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers, Model
from tensorflow.keras.preprocessing.image import load_img, img_to_array 
from tensorflow.keras.utils import plot_model
from keras.preprocessing.text import text_to_word_sequence
from tqdm import tqdm
from keras.utils.vis_utils import plot_model


# Model building 
from sklearn.model_selection import train_test_split


In [None]:
## extract the data if not present
if os.path.isdir('/content/Flickr8K/'):
  print("Flickr8K already present")
else:
  # The context manager closes the archive even when extraction fails part-way.
  with zipfile.ZipFile('/content/drive/MyDrive/Flickr8K.zip', 'r') as zf:
    zf.extractall('/content/Flickr8K/')

Let's read the dataset

## Data understanding
1.Import the dataset and read the images & captions into two separate variables

2.Visualise both the images & text present in the dataset

3.Create word-to-index and index-to-word mappings.

4.Create a dataframe which summarizes the image, path & captions as a dataframe

5.Visualise the top 30 occurring words in the captions

6.Create a list which contains all the captions & path


In [None]:
## Locations of the extracted image directory and of the captions file
images_path='/content/Flickr8K/Images'

text_path= '/content/Flickr8K/captions.txt'

In [None]:
# Collect the path of every .jpg file directly inside images_path.
# NOTE(review): the pattern contains no `**`, so recursive=True does not widen the match.
all_imgs = glob.glob(images_path + '/*.jpg',recursive=True)
print("The total images present in the dataset: {}".format(len(all_imgs)))

In [None]:
# Import the dataset and read the text file into a separate variable
def load_doc(filename):
    """Read a text file and return its lines.

    Args:
        filename: Path of the text file to read.

    Returns:
        A list with one string per line; each string keeps its line ending.
    """
    # The `with` block closes the file on exit, so no explicit close() is needed.
    with open(filename) as f:
        return f.readlines()

In [None]:
## read every line of the captions file
doc = load_doc(text_path)

## drop the first line, which is the header row of the captions file
del doc[0]

print(doc[:3])

In [None]:
#Visualise both the images & text present in the dataset

def Visualise_image_text(image):
    """Show one image and print every caption line that mentions its file name.

    Args:
        image: Path of the image file to display. Captions are looked up in
            the notebook-level `doc` list using the file's base name.
    """
    # Use the path that was passed in; previously the argument was ignored
    # and the first entry of all_imgs was always shown.
    filename = os.path.basename(image)
    plt.figure(figsize=(6,4))
    plt.imshow(mpimg.imread(image))

    for line in doc:
        if filename in str(line):
            # Skip the file name and the single separator character after it.
            print(line[len(filename)+1:])

In [None]:
# Plot the first image found by glob together with its captions
Visualise_image_text(all_imgs[0])

Create a dataframe which summarizes the image, path & captions as a dataframe

Each image id has 5 captions associated with it therefore the total dataset should have 40455 samples.

In [None]:
# Compare the number of image files with the number of caption lines
print("No of Images:" , len(all_imgs))
print("No of Captions:" , len(doc))

In [None]:
# Pickle file used to cache the image/caption dataframe between runs
dataframe_file = "dataframe_file.pkl"

In [None]:
### save and read the dataframe that stores image id , path and captions so that we don't have to rerun the model everytime.

def read_dataframe(filename):
  """Load a previously pickled dataframe from `filename` and return it."""
  return pd.read_pickle(filename)

def save_dataframe(filename,dataframe):
  """Pickle `dataframe` to `filename` so later runs can reload it."""
  pd.to_pickle(dataframe, filename)

In [None]:
all_img_id= []#store all the image id here
all_img_vector= []#store all the image path here
annotations= [] #store all the captions here

## if df file is present then read from there else create df file
if os.path.isfile(dataframe_file):
    df = read_dataframe(dataframe_file)
    all_img_id= df.ID
    all_img_vector= df.Path
    annotations= df.Captions
else: ## create the dataframe and save it for future use
    ## Group the caption lines by image file name in a single pass over `doc`
    ## instead of rescanning every caption line for every image.
    known_names = {os.path.basename(path) for path in all_imgs}
    name_lengths = sorted({len(name) for name in known_names})
    captions_by_file = collections.defaultdict(list)
    for line in doc:
        line = str(line)
        for length in name_lengths:
            ## a caption line starts with the file name followed by one separator character
            if line[:length] in known_names:
                captions_by_file[line[:length]].append(line[length+1:].strip())

    ## Emit rows image by image, keeping the captions in file order.
    for filePath in all_imgs:
        fileName = os.path.basename(filePath)
        for caption in captions_by_file.get(fileName, []):
            annotations.append(caption)
            all_img_id.append(fileName)
            all_img_vector.append(filePath)
    df = pd.DataFrame(list(zip(all_img_id, all_img_vector,annotations)),columns =['ID','Path', 'Captions'])
    save_dataframe(dataframe_file,df)

df.head(10)

In [None]:
## sort the dataframe in place by image id
## NOTE(review): the index is not reset, so rows keep their pre-sort labels
df.sort_values(by="ID",inplace=True)

In [None]:
## check the shape of the final dataframe (rows, columns)
df.shape

In [None]:
# Build the vocabulary: a plain dict that maps each caption word to its frequency
filter_chars = '!"#$%&()*+,-./:;<=>?@[\\]^_`{|}~\t\n'
vocabulary = {}

for lines in df.Captions:
  ## keras splits the caption into lower-cased words with the filter characters removed
  tokens = text_to_word_sequence(lines, filters=filter_chars, lower=True, split=' ')
  for word in tokens:
      ## first sighting starts at 0, every sighting adds one
      vocabulary[word] = vocabulary.get(word, 0) + 1

## number of distinct words found in the captions
val_count = len(vocabulary)
val_count

In [None]:
## tabulate the vocabulary as (word, count) rows ordered from most to least frequent
df_word = (
    pd.DataFrame.from_dict(vocabulary, orient='index')
    .sort_values(by=[0], ascending=False)
    .reset_index()
    .rename(columns={'index': 'word', 0: 'count'})
)

In [None]:
## function to Visualise words and count
def show_top_words(index,words,count):
    """Draw a bar chart of word frequencies.

    Args:
        index: Tick positions on the x axis, one per word.
        words: Words to show, used both as bar positions and tick labels.
        count: Frequency of each word, in the same order as `words`.
    """
    label_size = 20
    heading = "The top "+ str(len(index)) + " most frequently appearing words"

    plt.figure(figsize=(20,3))
    plt.bar(words, count, color='maroon', width=0.4)
    plt.xlabel("Words", fontsize=label_size)
    plt.ylabel("Word Count", rotation=90, fontsize=label_size)
    plt.xticks(index, words, rotation=90, fontsize=label_size)
    plt.title(heading, fontsize=label_size)
    plt.show()

In [None]:
# Visualise the 30 most frequent words in the captions.
# df_word is already sorted by descending count, so the first 30 rows are the top 30.
words = list(df_word[0:30].word)
count =list(df_word['count'][0:30])
show_top_words(list(range(0,30)),words,count)

In [None]:
## Wrap every caption in <start> ... <end> tags.
## Captions that already carry both tags are left untouched, so re-running this cell
## does not wrap them a second time.
df['Captions']=df.Captions.apply(
    lambda x : x if x.startswith("<start> ") and x.endswith(" <end>") else f"<start> {x} <end>")

In [None]:
## display the updated captions
df.head(5)

In [None]:
# Series holding every tagged caption
annotations = df['Captions']

# Longest caption, measured in whitespace-separated tokens
max_length = df.Captions.map(lambda caption: len(caption.split())).max()

# One path per caption row, plus the de-duplicated set of image paths
all_img_path = df.Path
unique_img = all_img_path.unique()

print("Total captions present in the dataset: "+ str(len(annotations)))
print("Total images present in the dataset: " + str(len(all_img_path)))
print("Total Unique images present in the dataset: " + str(len(unique_img)))
print("Max words of a sentence is :",max_length)

In [None]:
## function to plot image and text side by side
def plot_image_captions(Pathlist,captionsList,fig,count=2,npix=299,nimg=2):
    """Draw one image and its captions side by side on `fig`.

    Args:
        Pathlist: Path of the image file to load.
        captionsList: Captions to write next to the image, one per line.
        fig: Matplotlib figure that receives the two subplots.
        count: 1-based subplot slot for the image; the captions use count + 1.
        npix: Height and width the image is resized to when loaded.
        nimg: Number of subplot rows in the figure grid (two columns per row).
    """
    # Read from the parameters. The earlier version used the notebook globals
    # `Path` and `captions`, so it only worked when the caller used those names.
    # target_size is (height, width); the channel count is not part of it.
    image_load = load_img(Pathlist,target_size=(npix,npix))
    ax = fig.add_subplot(nimg,2,count,xticks=[],yticks=[])
    ax.imshow(image_load)

    # The caption panel sits in the slot right after the image.
    count +=1
    ax = fig.add_subplot(nimg,2,count)
    ax.axis('off')
    ax.plot()
    ax.set_xlim(0,1)
    ax.set_ylim(0,len(captionsList))
    for i, caption in enumerate(captionsList):
        ax.text(0,i,caption,fontsize=10)

In [None]:
## Show the images referenced by the first 20 dataframe rows, each next to its captions
fig = plt.figure(figsize=(10,20))
## `count` is the 1-based subplot slot of the next image; the caption panel takes
## the slot after it, hence the step of 2 per image
count = 1
    
for Path in df[:20].Path.unique():
    ## all captions stored for this image path
    captions = list(df["Captions"].loc[df.Path== Path].values)
    plot_image_captions(Path,captions,fig,count,299,5)
    count +=2
plt.show()

## Pre-Processing the captions
1.Create the tokenized vectors by tokenizing the captions, for example by splitting them on spaces & other filters. 
This gives us a vocabulary of all of the unique words in the data. Keep the total vocabulary to the top 5,000 words to save memory.

2.Replace all other words with the unknown token "UNK" .

3.Create word-to-index and index-to-word mappings.

4.Pad all sequences to be the same length as the longest one.

In [None]:
# create the tokenizer function
def tokenize_captions(top_freq_words,captions):
    """Fit a Keras tokenizer on the captions and register a padding token.

    Args:
        top_freq_words: Value passed to the tokenizer as `num_words`.
        captions: Iterable of caption strings to fit on.

    Returns:
        The fitted tokenizer, with 'PAD' mapped to index 0 in both
        `word_index` and `index_word`.
    """
    # The backslash is escaped explicitly; the former '\]' was an invalid
    # escape sequence that newer Python versions warn about.
    # '<' and '>' are not filtered, so the <start>/<end> tags survive as tokens.
    special_chars = '!"#$%&()*+.,-/:;=?@[\\]^_`{|}~ '
    tokenizer = tf.keras.preprocessing.text.Tokenizer(num_words=top_freq_words,
                                                  oov_token="UNK",
                                                  filters=special_chars,
                                                  lower=True, split=' ', char_level=False)
    tokenizer.fit_on_texts(captions)

    # Adding PAD to tokenizer list on index 0
    tokenizer.word_index['PAD'] = 0
    tokenizer.index_word[0] = 'PAD'

    return tokenizer

In [None]:
# Fit the tokenizer on the tagged captions and convert each caption to a list of word indices
top_freq_words = 5000
tokenizer = tokenize_captions(top_freq_words,df['Captions'])
cap_seqs = tokenizer.texts_to_sequences(df['Captions'])

In [None]:
# Peek at the first five integer-encoded captions
cap_seqs[:5]

In [None]:
# Create word-to-index and index-to-word mappings functions.
def show_word_2_index(word):
    """Print the index the notebook-level tokenizer assigns to `word`."""
    word_id = tokenizer.word_index[word]
    print("Word = {}, index = {}".format(word, word_id))

def show_index_2_word(index):
    """Print the word the notebook-level tokenizer stores at `index`."""
    token = tokenizer.index_word[index]
    print("Index = {}, Word = {}".format(index, token))

In [None]:
### show a few word-to-index and index-to-word lookups, including the special tokens
          
print("------Word to Index------")
show_word_2_index("PAD")
show_word_2_index("UNK")
show_word_2_index("<start>")
show_word_2_index("<end>")

print('\n')

print("-------Index to Word--------")
show_index_2_word(2)
show_index_2_word(1500)
show_index_2_word(3000)
show_index_2_word(4999)

In [None]:
# Tabulate the tokenizer's word counts and plot the 30 most frequent words after text processing

word_count = (
    pd.DataFrame.from_dict(tokenizer.word_counts, orient='index')
    .sort_values(by=[0], ascending=False)
)

# first 30 rows = the 30 highest counts; the index holds the words, column 0 the counts
words = list(word_count.index[:30])
count = list(word_count[0].iloc[:30])
show_top_words(list(range(0,30)),words,count)

In [None]:
# Pad every caption sequence with trailing zeros and store the result in cap_vector

# No maxlen is passed, so pad_sequences pads to the longest sequence in cap_seqs
cap_vector = tf.keras.preprocessing.sequence.pad_sequences(cap_seqs, padding='post')

print("The shape of Caption vector is :" + str(cap_vector.shape))
print(cap_vector[:5])

## Pre-processing the images

1.Resize them into the shape of (299, 299)

2.Normalize the image within the range of -1 to 1, such that it is in the correct format for InceptionV3. 

In [None]:
## InceptionV3 is used for transfer learning in the encoder, so images are resized to its 299x299 input size
image_shape = (299, 299)

In [None]:
def preprocess_image(image_path):
    """Load a JPEG and prepare it for InceptionV3.

    Args:
        image_path: Path of the JPEG file.

    Returns:
        A tuple (image, image_path): the decoded 3-channel image resized to the
        notebook-level `image_shape` and passed through InceptionV3's
        preprocess_input, together with the path that was given.
    """
    raw_bytes = tf.io.read_file(image_path)
    decoded = tf.image.decode_jpeg(raw_bytes, channels=3)
    resized = tf.image.resize(decoded, image_shape)
    ## normalise into the value range InceptionV3 expects
    return tf.keras.applications.inception_v3.preprocess_input(resized), image_path

In [None]:
## checking the function with one image 
preprocessed_image = preprocess_image(all_img_vector[0])
print("Image Shape :", preprocessed_image[0].shape)
print("\n")
print("Image Vector Values after normalize :\n\n",preprocessed_image)
print("\n Display Image after processing:\n")
## Reuse the tensor computed above instead of preprocessing the file a second time.
## InceptionV3 preprocessing leaves values in [-1, 1]; imshow clips float data outside
## [0, 1], so map the values back into that range before displaying.
plt.imshow((preprocessed_image[0] + 1.0) / 2.0)

## Create the train & test data 
1.Combine both images & captions to create the train & test dataset using tf.data.Dataset API. Create the train-test split using 80-20 ratio & random state = 42

2.Make sure you have done Shuffle and batch while building the dataset

3.The shape of each image in the dataset after building should be (batch_size, 299, 299, 3)

4.The shape of each caption in the dataset after building should be(batch_size, max_len)


In [None]:
# Number of samples per batch, used by every tf.data pipeline in this notebook
BATCH_SIZE = 512

In [None]:
# Build a batched dataset of (preprocessed image, path) pairs, one entry per unique image
# NOTE(review): this rebinds `images_path`, which earlier held the image directory string,
# to a list of file paths; re-running the glob cell afterwards would fail.
images_path = sorted(set(all_img_vector)) ## taking only the unique paths
image_dataset = tf.data.Dataset.from_tensor_slices(images_path)
image_dataset = image_dataset.map(preprocess_image , num_parallel_calls=tf.data.experimental.AUTOTUNE).batch(BATCH_SIZE)

In [None]:
### 80/20 train-test split of image paths and caption vectors
### NOTE(review): the split is made per caption row, so captions of one image can land in
### both the train and the test set; confirm this is acceptable for the reported test loss.
image_train, image_test, caption_train, caption_test = train_test_split(df.Path,
                                                                        cap_vector,
                                                                        test_size=0.2,
                                                                        random_state=42)

In [None]:
## check the size of each train and test set
print("Training data for images: " + str(len(image_train)))
print("Testing data for images: " + str(len(image_test)))
print("Training data for Captions: " + str(len(caption_train)))
print("Testing data for Captions: " + str(len(caption_test)))

## Load the pretrained Imagenet weights of Inception net V3

1.To save the memory (RAM) from getting exhausted, extract the features of the image using the last layer of the pre-trained model. Including this as part of training will lead to higher computational time.

2.The shape of the output of this layer is 8x8x2048. 

3.Use a function to extract the features of each image in the train & test dataset such that the shape of each image should be (batch_size, 8*8, 2048)



In [None]:
## build the transfer-learning model that extracts features from the images
## include_top=False drops the classification head of InceptionV3
image_model = tf.keras.applications.InceptionV3(include_top=False,weights='imagenet')

## the input is InceptionV3's own input; the images were preprocessed to match it
new_input = image_model.input

## the output is the last layer of the headless network, not a dense or softmax layer
hidden_layer = image_model.layers[-1].output 

image_features_extract_model = tf.keras.Model(inputs=new_input, outputs=hidden_layer)
image_features_extract_model.summary()

In [None]:
### file used to cache the extracted image features between runs
img_feature_dict_filename = 'img_feature_dict.npy'

In [None]:
### save and read the feature directory so that we don't have to rerun the model everytime.

def read_img_features(filename):
  """Load a feature dictionary that was written with np.save.

  Args:
      filename: Path of the .npy file.

  Returns:
      The dictionary stored in the file.
  """
  # np.save wraps a dict in a 0-d object array; .item() unwraps it again.
  # allow_pickle is now a real boolean: the former string 'TRUE' only worked
  # because any non-empty string is truthy.
  # NOTE: unpickling can execute arbitrary code; only load files this notebook wrote.
  return np.load(filename, allow_pickle=True).item()

def save_img_features(filename,feature_dict):
  """Write `feature_dict` to `filename` in NumPy's .npy format."""
  np.save(file=filename, arr=feature_dict)

In [None]:
## map every image path to the feature array extracted from that image
image_feature_dict = {}

## reuse the cached feature file when it exists, otherwise compute the features and cache them
if os.path.isfile(img_feature_dict_filename):
  ## load the cached image features
  image_feature_dict = read_img_features(img_feature_dict_filename)

else:
  for image,path in tqdm(image_dataset):
      ## run one batch of preprocessed images through the feature extractor
      batch_features = image_features_extract_model(image)
      ## flatten the two spatial axes into one: (batch, rows, cols, channels) -> (batch, rows*cols, channels)
      batch_features = tf.reshape(batch_features,(batch_features.shape[0], -1, batch_features.shape[3]))
      for batch_f, p in zip(batch_features, path):
        path_of_feature = p.numpy().decode("utf-8")
        ## the image path is the key, the feature array is the value
        image_feature_dict[path_of_feature] =  batch_f.numpy()  

  ## save the image features for reuse
  save_img_features(img_feature_dict_filename,image_feature_dict)
    
    

In [None]:
### function to get the img features and captions together
def map_function(image_name,caption):
    """Pair a caption with the cached features of its image.

    Args:
        image_name: Image path as bytes; decoded as UTF-8 to form the lookup key.
        caption: Caption value, returned unchanged.

    Returns:
        A tuple (features, caption), where features comes from the
        notebook-level `image_feature_dict`.
    """
    feature_key = image_name.decode('utf-8')
    return image_feature_dict[feature_key], caption

In [None]:
### function to create a dataset having image feature vector and corresponding caption vector , using autotune and batch fetch
BUFFER_SIZE = 3000
def generate_dataset(images_data, captions_data):
    """Build a shuffled, batched dataset of (image features, caption) pairs.

    Args:
        images_data: Image paths, aligned element by element with `captions_data`.
        captions_data: Caption vectors.

    Returns:
        A tf.data.Dataset that shuffles with BUFFER_SIZE, replaces each path by
        its cached features through `map_function`, batches with BATCH_SIZE and
        prefetches.
    """
    def _attach_features(img_path, cap):
        ## map_function is plain Python, so it is wrapped in tf.numpy_function
        return tf.numpy_function(map_function, [img_path, cap], [tf.float32, tf.int32])

    paired = tf.data.Dataset.from_tensor_slices((images_data, captions_data))
    shuffled = paired.shuffle(BUFFER_SIZE)
    featured = shuffled.map(_attach_features,
                            num_parallel_calls=tf.data.experimental.AUTOTUNE)
    batched = featured.batch(BATCH_SIZE)
    ## let tf.data pick the prefetch depth
    return batched.prefetch(buffer_size=tf.data.experimental.AUTOTUNE)

In [None]:
### create the shuffled, batched train and test datasets
train_dataset=generate_dataset(image_train,caption_train)
test_dataset=generate_dataset(image_test,caption_test)

In [None]:
## pull one batch from the training set and check the shapes of the image features and captions
sample_img_batch, sample_cap_batch = next(iter(train_dataset))
print(sample_img_batch.shape)  # expected (batch_size, 8*8, 2048)
print(sample_cap_batch.shape) # (batch_size, padded caption length)

## Model Building
1.Set the parameters

2.Build the Encoder, Attention model & Decoder

In [None]:
# Width of the encoder output and of the word embeddings
embedding_dim = 256 
# Number of GRU units, also used as the width of the attention projections
units = 512
# Size of the embedding table and of the decoder's output layer
vocab_size = 5001

### Encoder

In [None]:
## this will take the features vectors already created via Inception V3 model above and reduce the dimension as per embedding vector shape 
## so as to keep both feature and caption vector in same shape for Attention model
class Encoder(Model):
    """Project pre-extracted image features down to the embedding width.

    Args:
        embed_dim: Output width of the dense projection.
    """

    def __init__(self,embed_dim):
        super().__init__()
        ## dense projection with relu activation
        self.fc = layers.Dense(embed_dim, activation="relu")
        ## NOTE(review): this dropout layer is created but call() never applies it
        self.dropout = layers.Dropout(0.4)

    def call(self, features):
        """Return `features` passed through the dense projection."""
        return self.fc(features)

In [None]:
## creating object of the Encoder model
## instantiate the encoder with the shared embedding width
encoder=Encoder(embedding_dim)

### Attention model

In [None]:
## this custom model will take the feature vector from encoding model and 
## hidden vector from the decoding RNN model and create the context vector based on attention weights calculated 

class Attention_model(Model):
    """Additive attention over the encoded image features.

    Args:
        units: Width of the two projections that are summed to form the score.
    """

    def __init__(self, units):
        super().__init__()
        self.units = units
        self.W1 = layers.Dense(units)  ## projects the encoder features
        self.W2 = layers.Dense(units)  ## projects the decoder hidden state
        self.V = layers.Dense(1)       ## reduces each position's score to one value

    def call(self, features, hidden):
        """Return (context_vector, attention_weights) for one decoding step.

        `features` comes from the encoder and `hidden` from the decoder; the
        weights are a softmax over axis 1 of `features`.
        """
        ## insert an axis at position 1 so the hidden state broadcasts against the features
        query = tf.expand_dims(hidden, 1)
        projected = self.W1(features) + self.W2(query)
        score = keras.activations.tanh(projected)
        ## normalise the per-position scores along axis 1
        attention_weights = keras.activations.softmax(self.V(score), axis=1)
        ## weighted sum of the features along axis 1
        context_vector = tf.reduce_sum(attention_weights * features, axis=1)
        return context_vector, attention_weights

### Decoder

In [None]:
class Decoder(Model):
    """Attention-based GRU decoder that predicts the next caption word.

    Args:
        embed_dim: Width of the word embeddings.
        units: Number of GRU units; also the width of the attention projections.
        vocab_size: Size of the embedding table and of the output layer.
    """

    def __init__(self, embed_dim, units, vocab_size):
        super(Decoder, self).__init__()
        self.units=units ## number of decoder units
        self.attention = Attention_model(self.units) ## attention over the encoder features
        self.embed = layers.Embedding(vocab_size, embed_dim,mask_zero=False) ## word-index -> embedding lookup
        self.gru = tf.keras.layers.GRU(self.units,return_sequences=True,return_state=True,recurrent_initializer='glorot_uniform') ## recurrent layer that predicts the words one at a time
        self.d1 = layers.Dense(self.units) ## hidden dense layer after the GRU
        self.d2 = layers.Dense(vocab_size) ## output layer: one score per vocabulary entry
        self.dropout = layers.Dropout(0.4) ## dropout applied to the word embeddings


    def call(self,x,features, hidden, training=None):
        """Run one decoding step.

        Args:
            x: Word indices fed to the decoder for this step.
            features: Encoder output for the batch.
            hidden: Previous hidden state; it is used by the attention only,
                the GRU itself is called without an initial state.
            training: Forwarded to the dropout layer. None (the default) lets
                Keras decide, which matches the previous behaviour; pass True
                in a custom training loop to activate dropout.

        Returns:
            (output, state, attention_weights), where output holds one row of
            vocabulary scores per input position.
        """
        context_vector, attention_weights = self.attention(features, hidden) ## context vector from the attention model
        embed = self.dropout(self.embed(x), training=training) ## dropout on the word embeddings
        mask = self.embed.compute_mask(x) ## the embedding is built with mask_zero=False, so no padding mask is produced here
        embed =  tf.concat([tf.expand_dims(context_vector, 1), embed], axis=-1) ## join the context vector with the embeddings along the last axis
        output,state = self.gru(embed,mask=mask) ## run the GRU over the joined vector
        output = self.d1(output) ## hidden dense layer
        output = tf.reshape(output, (-1, output.shape[2])) ## merge the batch and time axes
        output = self.d2(output) ## project to vocabulary scores
        return output,state, attention_weights

    def init_state(self, batch_size):
        """Return an all-zero hidden state of shape (batch_size, units)."""
        return tf.zeros((batch_size, self.units))

In [None]:
## creating the decoder object
## instantiate the decoder with the shared hyperparameters
decoder=Decoder(embedding_dim, units, vocab_size)

In [None]:
## run the models once on the sample batch and check their output shapes
features=encoder(sample_img_batch) ## encode the sample image features

hidden = decoder.init_state(batch_size=sample_cap_batch.shape[0]) ## all-zero initial hidden state
dec_input = tf.expand_dims([tokenizer.word_index['<start>']] * sample_cap_batch.shape[0], 1) ## one <start> token per sample

predictions, hidden_out, attention_weights= decoder(dec_input, features, hidden) ## one decoding step
print('Feature shape from Encoder: {}'.format(features.shape)) #(batch, 8*8, embed_dim)
print('Predcitions shape from Decoder: {}'.format(predictions.shape)) #(batch,vocab_size)
print('Attention weights shape from Decoder: {}'.format(attention_weights.shape)) #(batch, 8*8, 1)

## Model training & optimization
1.Set the optimizer & loss object

2.Create your checkpoint path

3.Create your training & testing step functions

4.Create your loss function for the test dataset

In [None]:
## define the optimizer and the loss object
optimizer = tf.keras.optimizers.Adam(learning_rate=0.0001)
##optimizer = tf.keras.optimizers.SGD(learning_rate=0.001, momentum=0.9, nesterov=True)

## reduction='none' keeps one loss value per sample so that padding can be masked out afterwards
loss_object = keras.losses.SparseCategoricalCrossentropy(from_logits=True, reduction='none')

In [None]:
## creating the custom loss function which handles the padded values while calculating the mean loss

def loss_function(real, pred):
    """Mean cross-entropy in which padded targets contribute zero loss.

    Args:
        real: Target word indices; index 0 marks padding.
        pred: Predicted logits for each target.

    Returns:
        The mean over all per-sample losses, with the padded ones set to zero.
    """
    per_sample = loss_object(real, pred)
    ## 1 where the target is a real word, 0 where it is padding
    keep = tf.cast(tf.math.not_equal(real, 0), dtype=per_sample.dtype)
    return tf.reduce_mean(per_sample * keep)

In [None]:
## checkpoint object that tracks the encoder, decoder and optimizer during training
checkpoint_path = "/content/checkpoints/"
ckpt = tf.train.Checkpoint(encoder=encoder,
                           decoder=decoder,
                           optimizer = optimizer)
## keep only the two most recent checkpoints on disk
ckpt_manager = tf.train.CheckpointManager(ckpt, checkpoint_path, max_to_keep=2)

In [None]:
## if training was interrupted, pick up the most recent checkpoint when the notebook is run again
## start_epoch is read from the checkpoint file name and stays 0 when no checkpoint exists
start_epoch = 0

if ckpt_manager.latest_checkpoint:
    start_epoch = int(ckpt_manager.latest_checkpoint.split('-')[-1])
    ## load the saved encoder/decoder/optimizer state; previously the checkpoint was
    ## detected but never restored, so training always restarted from fresh weights
    ckpt.restore(ckpt_manager.latest_checkpoint)

In [None]:
## using tensorflow functions decorator in order to turn plain Python code into graph for faster computation

@tf.function
def train_step(img_tensor, target):
    """Run one teacher-forced training step and update the model weights.

    Args:
        img_tensor: Batch of image features fed to the encoder.
        target: Batch of padded caption vectors; column 0 is skipped because
            the decoder is seeded with the <start> token.

    Returns:
        (loss, avg_loss): the loss summed over the caption positions and that
        sum divided by the caption length.
    """
    loss = 0 ## running sum of the per-position losses
    hidden = decoder.init_state(batch_size=target.shape[0]) ## all-zero initial hidden state
    dec_input = tf.expand_dims([tokenizer.word_index['<start>']] * target.shape[0], 1) ## one <start> token per sample

    ## record the forward pass so the gradients can be computed afterwards
    with tf.GradientTape() as tape:
        ## training=True puts the models in training mode so their dropout layers are
        ## active; without the flag Keras runs them in inference mode in a custom loop
        features = encoder(img_tensor, training=True)
        for i in range(1, target.shape[1]): ## one decoding step per caption position
            predictions, hidden, _ = decoder(dec_input, features, hidden, training=True)
            loss += loss_function(target[:, i], predictions) ## compare with the actual word
            dec_input = tf.expand_dims(target[:, i], 1) ## teacher forcing: feed the actual word next

    avg_loss = (loss/int(target.shape[1])) ## loss averaged over the caption length
    trainable_variables = encoder.trainable_variables + decoder.trainable_variables
    gradients = tape.gradient(loss, trainable_variables) ## derivative of the summed loss
    optimizer.apply_gradients(zip(gradients, trainable_variables))
    return loss, avg_loss

In [None]:
## using tensorflow functions decorator in order to turn plain Python code into graph for faster computation

@tf.function
def test_step(img_tensor, target):
    """Compute the teacher-forced loss of one batch without updating weights.

    Args:
        img_tensor: Batch of image features fed to the encoder.
        target: Batch of padded caption vectors.

    Returns:
        (summed_loss, mean_loss): the loss summed over the caption positions
        and that sum divided by the caption length.
    """
    batch_size, seq_len = target.shape[0], target.shape[1]

    ## seed the decoder with one <start> token per sample and a zero hidden state
    start_id = tokenizer.word_index['<start>']
    dec_input = tf.expand_dims([start_id] * batch_size, 1)
    hidden = decoder.init_state(batch_size=batch_size)

    features = encoder(img_tensor)

    summed_loss = 0
    for step in range(1, seq_len):
        predictions, hidden, _ = decoder(dec_input, features, hidden)
        summed_loss += loss_function(target[:, step], predictions)
        ## feed the actual word as the next decoder input
        dec_input = tf.expand_dims(target[:, step], 1)

    return summed_loss, summed_loss / int(seq_len)

In [None]:
## function to calculate the average test data loss
def test_loss_cal(test_dataset):
    """Return the mean of the per-batch average losses over `test_dataset`.

    Args:
        test_dataset: Iterable of (img_tensor, target) batches.
    """
    ## test_step returns (summed loss, average loss); only the average is needed
    batch_means = [test_step(img_tensor, target)[1]
                   for img_tensor, target in test_dataset]
    return sum(batch_means) / len(batch_means)

In [None]:
## per-epoch train and test losses, collected for the loss plot
loss_plot = []
test_loss_plot = []

In [None]:
## run the training: per epoch, compute the train loss and the test loss, checkpoint on improvement

EPOCHS = 200 ## maximum number of epochs
stop_epoch_count = 0 ## consecutive epochs without a test-loss improvement

best_test_loss=100 ## starting value that the first measured test loss is compared against

for epoch in tqdm(range(0, EPOCHS)):
    start = time.time()
    total_avg_batch_loss = 0
    train_num_steps = 0
    for (batch, (img_tensor, target)) in enumerate(train_dataset):
        total_loss, avg_loss = train_step(img_tensor, target) ## summed and average loss of this batch
        total_avg_batch_loss += avg_loss ## accumulate the per-batch averages
        train_num_steps = train_num_steps + 1

    avg_train_loss=total_avg_batch_loss / train_num_steps ## mean of the per-batch averages
    loss_plot.append(avg_train_loss) ## keep the training loss for the plot
    test_loss = test_loss_cal(test_dataset) ## loss on the test data
    test_loss_plot.append(test_loss) ## keep the test loss for the plot
    
    print ('For epoch: {}, the train loss is {:.3f}, & test loss is {:.3f}'.format(epoch+1,avg_train_loss,test_loss))
    print ('Time taken for 1 epoch {} sec\n'.format(time.time() - start))
    
    ## save a checkpoint whenever the test loss, rounded to 3 decimals, beats the best value so far
    if round(float(test_loss),3) < best_test_loss :
      print('Test loss has been reduced from %.3f to %.3f thus saving the checkpoint \n\n' % (best_test_loss, test_loss))
      best_test_loss = round(float(test_loss),3)
      stop_epoch_count = 0 ## reset the counter once a checkpoint is saved
      ## save the tracked objects to a checkpoint file
      ckpt_manager.save()
    else:
      stop_epoch_count = stop_epoch_count + 1 ## one more epoch without improvement
      print("No Test Loss improvement in this Epoch\n")
      if stop_epoch_count >= 5: ## early stopping after 5 epochs in a row without improvement
        print("No Improvement in Test Loss , hence we have reached the Global Minima. Stopping Epoch run.")
        break 

In [None]:
## plot the per-epoch average loss of the training and test datasets
plt.figure(figsize=(8,6))
plt.plot(loss_plot,label='Traning Loss')
plt.plot(test_loss_plot,label='Testing Loss')
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.title('Loss Plot')
plt.legend()
plt.show()

## Model Evaluation
1.Define your evaluation function using greedy search

2.Define your evaluation function using beam search ( optional)

3.Test it on a sample data using BLEU score

### Greedy Search

In [None]:
def evaluate(image, max_length=39):
    """Generate a caption for one image with greedy search.

    Args:
        image: Path of the image file to caption.
        max_length: Maximum number of words to generate.

    Returns:
        (result, attention_plot, predictions): the generated words (ending
        with '<end>' when the model produced it), the attention weights with
        one row per generated word, and the decoder output of the last step.
    """
    hidden = decoder.init_state(batch_size=1) ## zero hidden state for a single sample

    temp_input = tf.expand_dims(preprocess_image(image)[0], 0) ## preprocess the image and add a batch axis
    img_tensor_val = image_features_extract_model(temp_input) ## extract features with the pretrained model
    img_tensor_val = tf.reshape(img_tensor_val, (img_tensor_val.shape[0], -1, img_tensor_val.shape[3])) ## flatten the spatial axes

    attention_plot = np.zeros((max_length, img_tensor_val.shape[1])) ## one row of attention weights per generated word

    features = encoder(img_tensor_val) ## encode the image features

    dec_input = tf.expand_dims([tokenizer.word_index['<start>']], 0) ## seed the decoder with <start>
    result = []

    for i in range(max_length):
        predictions, hidden, attention_weights = decoder(dec_input, features, hidden)

        attention_plot[i] = tf.reshape(attention_weights, (-1, )).numpy() ## store this step's attention weights

        predicted_id = tf.argmax(predictions[0]).numpy() ## greedy choice: the highest-scoring word
        result.append(tokenizer.index_word[predicted_id])

        if tokenizer.index_word[predicted_id] == '<end>': ## stop at the end tag
            break

        dec_input = tf.expand_dims([predicted_id], 0) ## feed the predicted word back in

    ## Trim the unused rows on every exit path; previously the early return on
    ## '<end>' handed back the full zero-padded matrix.
    attention_plot = attention_plot[:len(result), :]
    return result, attention_plot,predictions

### Beam Search

In [None]:
def beam_evaluate(image, beam_index = 3):
    """Generate a caption for ``image`` with beam search.

    Up to ``beam_index`` partial captions (hypotheses) are kept at every step.
    Each hypothesis carries its own token list, cumulative score, decoder
    hidden state and attention rows, and is extended from *its own* last token.
    Hypotheses that already produced ``<end>`` are carried forward unchanged.
    The search stops when the best-scoring hypothesis ends with ``<end>`` or
    when the length limit is reached.

    Args:
        image: Image reference accepted by ``preprocess_image`` (the callers in
            this notebook pass a file path).
        beam_index: Number of hypotheses kept after every step, and number of
            continuations considered per hypothesis.

    Returns:
        tuple: ``(final_caption, attention_plot)`` where ``final_caption`` is the
        best caption as a space-joined string without the ``<start>``/``<end>``
        markers, and ``attention_plot`` is a NumPy array with one row of
        flattened attention weights per word of ``final_caption``.
    """
    max_length = 39  # NOTE(review): presumably the longest training caption -- confirm
    start_id = tokenizer.word_index['<start>']
    ## look the id up instead of hard-coding it; it depends on how the tokenizer was fitted
    end_id = tokenizer.word_index['<end>']

    ## preprocess the image and add a batch axis before feature extraction
    temp_input = tf.expand_dims(preprocess_image(image)[0], 0)
    img_tensor_val = image_features_extract_model(temp_input)
    ## collapse the two middle axes: (batch, d1, d2, channels) -> (batch, d1*d2, channels)
    img_tensor_val = tf.reshape(img_tensor_val, (img_tensor_val.shape[0], -1, img_tensor_val.shape[3]))

    features = encoder(img_tensor_val)

    ## hypothesis layout: [token ids, cumulative score, decoder hidden state, attention rows]
    beams = [[[start_id], 0.0, decoder.init_state(batch_size=1), []]]

    ## token lists include <start>, so at most max_length - 1 tokens are generated
    for _step in range(max_length - 1):
        candidates = []
        for tokens, score, hidden, attn_rows in beams:
            if tokens[-1] == end_id:
                ## finished hypothesis: keep competing, do not extend
                candidates.append([tokens, score, hidden, attn_rows])
                continue

            ## extend this hypothesis from its own last token and its own hidden state
            dec_input = tf.expand_dims([tokens[-1]], 0)
            predictions, next_hidden, attention_weights = decoder(dec_input, features, hidden)
            attn_row = tf.reshape(attention_weights, (-1, )).numpy()

            step_scores = np.asarray(predictions[0])
            top_ids = np.argsort(step_scores)[-beam_index:]

            for w in top_ids:
                ## NOTE(review): np.log assumes the decoder returns probabilities; if it
                ## returns raw logits this yields NaN for negative values -- confirm and
                ## switch to a log-softmax of the logits if so.
                candidates.append([tokens + [int(w)],
                                   score + np.log(step_scores[w]),
                                   next_hidden,
                                   attn_rows + [attn_row]])

        ## ascending sort: the best hypothesis ends up last
        beams = sorted(candidates, key=lambda beam: beam[1])[-beam_index:]

        if beams[-1][0][-1] == end_id:
            break

    best_tokens, best_attn = beams[-1][0], beams[-1][3]

    ## convert ids to words, skipping the leading <start> and stopping at <end>
    final_caption = []
    for token_id in best_tokens[1:]:
        word = tokenizer.index_word[token_id]
        if word == '<end>':
            break
        final_caption.append(word)

    ## row k holds the attention used when word k of the best caption was predicted
    attention_plot = np.zeros((len(final_caption), img_tensor_val.shape[1]))
    for row, attn_row in enumerate(best_attn[:len(final_caption)]):
        attention_plot[row] = attn_row

    final_caption = ' '.join(final_caption)
    return final_caption ,attention_plot

In [None]:
def plot_attmap(caption, weights, image):
    """Draw the image once per caption word with that word's attention map on top.

    Args:
        caption: Sequence of predicted words; one subplot is drawn per entry.
        weights: Indexable by word position; ``weights[k]`` must contain 64
            values because it is reshaped to an 8x8 map.
        image: Path (or file object) accepted by ``PIL.Image.open``.

    Returns:
        None. The figure is shown with ``plt.show()``.
    """
    fig = plt.figure(figsize=(10, 10))
    ## read the pixels and release the file handle straight away
    with Image.open(image) as img_file:
        temp_img = np.array(img_file)

    len_cap = len(caption)
    ## Square grid that always has room for every word. The former len_cap//2 grid
    ## was too small for 1, 2, 3 or 5 words (0x0 for a single word) and made
    ## add_subplot raise; the layout is unchanged wherever len_cap//2 was enough.
    grid_size = max(len_cap // 2, int(np.ceil(np.sqrt(len_cap))))

    for cap in range(len_cap):
        ## attention weights of this word as an 8x8 map
        weights_img = np.reshape(weights[cap], (8,8))
        ## upscale the map to image_shape (defined elsewhere in the notebook)
        weights_img = np.array(Image.fromarray(weights_img).resize(image_shape, Image.LANCZOS))

        ax = fig.add_subplot(grid_size, grid_size, cap+1)
        ax.set_title(caption[cap], fontsize=15)

        img=ax.imshow(temp_img) ## the actual image

        ## semi-transparent heat map stretched over the same extent as the image
        ax.imshow(weights_img, cmap='gist_heat', alpha=0.6,extent=img.get_extent())

        ax.axis('off')
    plt.subplots_adjust(hspace=0.2, wspace=0.2)
    plt.show()

In [None]:
from nltk.translate.bleu_score import sentence_bleu

In [None]:
## fuction to remove start end and unk tags from the captions predicted
def filt_text(text):
    """Strip the ``<start>``, ``UNK`` and ``<end>`` marker tokens from a caption.

    Args:
        text: Caption string; tokens are separated by whitespace.

    Returns:
        str: The remaining tokens joined by single spaces (empty string when
        nothing is left).
    """
    filt = {'<start>', 'UNK', '<end>'}
    ## Build a new sequence instead of removing from the list being iterated:
    ## in-place removal skipped elements and left repeated markers behind.
    return ' '.join(word for word in text.split() if word not in filt)

In [None]:
## Pick a random row of the dataframe and use its image as the test image
rid = np.random.randint(0,len(df))
print("Random Test Image ID :" , rid)
test_image = df.Path[rid]

## every ground-truth caption stored for that image path
real_captions = df.Captions[df.Path == test_image]

## Greedy search: predicted words, per-word attention weights and last decoder output
greedy_result, greedy_attention_plot,pred_test = evaluate(test_image)

## Beam search: predicted caption string and its attention weights
beam_result, beam_attention_plot = beam_evaluate(test_image)

## Build the greedy caption by dropping only the <end> marker. Cutting off the last
## word unconditionally lost a real word whenever decoding stopped at the length
## limit, and kept a lone '<end>' when it was the only prediction.
greedy_pred_caption = ' '.join(word for word in greedy_result if word != '<end>')
greedy_candidate = greedy_pred_caption.split() ## greedy search word list
beam_candidate = beam_result.split() ## beam search word list

filtered_real_captions = [] ## cleaned reference captions, printed further down

greedy_bleu_score = 0
beam_bleu_score = 0

for real_caption in real_captions :
  ## remove the <start>, <end> and UNK markers from the reference caption
  real_caption=filt_text(real_caption)

  filtered_real_captions.append(real_caption)

  ## sentence_bleu expects a list of tokenised references; one reference is scored at a time
  reference = [real_caption.split()]

  ## accumulate the 4-gram BLEU of each candidate against this single reference
  greedy_bleu_score = greedy_bleu_score + sentence_bleu(reference, greedy_candidate, weights=(0.25, 0.25, 0.25, 0.25))
  beam_bleu_score = beam_bleu_score + sentence_bleu(reference, beam_candidate, weights=(0.25, 0.25, 0.25, 0.25))

## average over all reference captions of the image, reported as a percentage
print("\nMean BELU score for Greedy Search: " , round(((greedy_bleu_score / len(real_captions))*100),3))
print("\nMean BELU score for Beam Search: " , round(((beam_bleu_score / len(real_captions))*100),3))

print('\nGreedy Search Prediction Caption:', greedy_pred_caption)
print('\nBeam Search Prediction Caption:', beam_result)

## attention map per predicted word (greedy search) to see which image region drove each word
print('\nGreedy Search Plot:')
plot_attmap(greedy_result, greedy_attention_plot, test_image)

print('\nReal Captions:')
for caption in filtered_real_captions :
  print(caption)

## last expression of the cell: display the test image itself
Image.open(test_image)

## Installing and Importing the Google Text to Speech API

In [None]:
! pip install gTTS

In [None]:
# Import the required module for text to speech conversion
from gtts import gTTS

## Final Function to predict Text & Audio from any input image

In [None]:
def EyeForBlind(imagepath):
  """Caption an image, show it, and save the greedy caption as speech.

  Both the greedy and the beam search captions are printed; only the greedy
  caption is converted to audio and written to ``Image_to_Sound.mp3`` in the
  working directory.

  Args:
      imagepath: Path of the image to describe.

  Returns:
      None.
  """
  greedy_words, _greedy_attention, _greedy_predictions = evaluate(imagepath) ## Greedy Search
  beam_pred, _beam_attention = beam_evaluate(imagepath) ## Beam Search

  ## Drop only the <end> marker. Cutting the last word off unconditionally lost a real
  ## word when decoding stopped at the length limit and kept a lone '<end>'.
  pred_caption = ' '.join(word for word in greedy_words if word != '<end>')

  print('\nImage to Text Caption via Greedy Search :', pred_caption)
  print('\nImage to Text Caption via Beam Search :', beam_pred)
  print('\n')

  ## display the test image
  plt.figure(figsize=(10,8))
  plt.imshow(mpimg.imread(imagepath))

  ## nothing to speak when the model emitted <end> right away; gTTS is expected to
  ## reject empty text, so stop before calling it
  if not pred_caption:
    return

  # Language in which you want to convert
  language = 'en'

  # getting the sound from gTTS by passing the predicted caption
  soundobj = gTTS(text=pred_caption, lang=language, slow=False)

  # Saving the converted audio in a mp3 file
  soundobj.save("Image_to_Sound.mp3")

  # NOTE(review): this asks the shell to *execute* the mp3 file, which does not play
  # it; in a notebook IPython.display.Audio("Image_to_Sound.mp3") is the usual way.
  os.system("./Image_to_Sound.mp3")
  

In [None]:
## Check the model prediction on an uploaded image (expects "test.jpg" in the working directory)
EyeForBlind("test.jpg")