In [None]:
import os
import string

import cv2
import matplotlib.pyplot as plt
import nltk
import numpy as np
import pandas as pd
import tensorflow as tf
from keras.preprocessing import image
from keras.preprocessing.sequence import pad_sequences
from keras.utils import to_categorical
from nltk.translate.bleu_score import sentence_bleu, corpus_bleu
from PIL import Image
from tensorflow.keras.applications.inception_v3 import InceptionV3
from tensorflow.keras.applications.inception_v3 import preprocess_input
from tensorflow.keras.layers import Dense, Dropout, Flatten, Conv2D, MaxPooling2D, Embedding, Input, LSTM
from tensorflow.keras.layers import Add
from tensorflow.keras.models import Model
from tensorflow.keras.models import Sequential

In [None]:
%matplotlib inline

## Generating tokens
Mapping each image name to its list of captions.

In [1]:
filename = "/kaggle/input/flickr8k/Flickr_Data/Flickr_Data/Flickr_TextData/Flickr8k.token.txt"
# Read all caption lines; the context manager closes the handle even if reading fails.
with open(filename,"r") as file:
    doc = file.readlines()

FileNotFoundError: [Errno 2] No such file or directory: '/Flickr_Data/Flickr_TextData/Flickr8k.token.txt'

In [None]:
# descriptions: image_id -> list of the (lower-cased) captions read for that image
descriptions = {}
for raw_line in doc:
    fields = raw_line.split("\t")
    # the first field is cut at its first '.', leaving the bare image id
    image_key = fields[0].partition(".")[0]
    caption_text = fields[1].strip('\n').lower()
    descriptions.setdefault(image_key, []).append(caption_text)
descriptions

## Data cleaning
1. removing punctuation
2. removing single-character and non-alphabetic tokens
3. converting to lower case (already done while parsing the captions above)

In [None]:
# translation table that deletes every punctuation character
table = str.maketrans('', '', string.punctuation)

# Clean every caption in place: strip punctuation, then keep only purely
# alphabetic tokens that are longer than one character.
for caption_list in descriptions.values():
    for position, raw_caption in enumerate(caption_list):
        stripped_tokens = (token.translate(table) for token in raw_caption.split())
        kept_tokens = [token for token in stripped_tokens if len(token) > 1 and token.isalpha()]
        caption_list[position] = " ".join(kept_tokens)


## Preparing Train captions

In [None]:
# <============ Preparing list of image ids of a dataset split ===============>
def load_data(filename, image_folder_path='/kaggle/input/flickr8k/Flickr_Data/Flickr_Data/Images/'):
    """Return the image ids listed in `filename` whose image file exists.

    Parameters
    ----------
    filename : str
        Text file with one image file name per line.
    image_folder_path : str, optional
        Folder that is searched for ``<image_id>.jpg``. Defaults to the
        folder the notebook has always used.

    Returns
    -------
    list of str
        Image ids (the part of each line before its first '.'), in file
        order, restricted to ids whose ``.jpg`` exists in the image folder.
    """
    dataset = []
    # the context manager guarantees the handle is closed
    with open(filename, 'r') as file:
        for line in file:
            # lines keep their trailing newline, so strip before testing for blanks
            line = line.strip()
            if not line:
                continue
            image_id = line.split('.')[0]
            image_path = os.path.join(image_folder_path, image_id + '.jpg')
            if os.path.isfile(image_path):
                dataset.append(image_id)
    return dataset
 
# <================== mapping dataset images to their captions ============>
def map_captions(all_captions, dataset):
    '''
    Wrap the captions of the given images in 'startseq' / 'endseq' markers.

    parameter 1: all_captions - dict of image id -> list of captions
                 (the 'descriptions' dictionary created earlier)
    parameter 2: dataset - iterable of image ids (train, dev or test)

    Returns a dict of image id -> list of wrapped captions. An image whose
    caption list is empty gets no entry; an id missing from all_captions
    raises KeyError.
    '''
    dataset_desc = {}
    for img_id in dataset:
        for raw_caption in all_captions[img_id]:
            words = raw_caption.split()
            wrapped = 'startseq ' + ' '.join(words) + ' endseq'
            dataset_desc.setdefault(img_id, []).append(wrapped)
    return dataset_desc

# train_descriptions: train image id -> captions wrapped in startseq / endseq
train_images_file_name = "/kaggle/input/flickr8k/Flickr_Data/Flickr_Data/Flickr_TextData/Flickr_8k.trainImages.txt"
train_dataset = load_data(train_images_file_name)
train_descriptions = map_captions(descriptions, train_dataset)

# flat list holding every training caption
train_captions = [
    caption
    for image_captions in train_descriptions.values()
    for caption in image_captions
]

# word -> number of occurrences over all training captions
word_counts = {}
for caption in train_captions:
    for word in caption.split(' '):
        if word in word_counts:
            word_counts[word] += 1
        else:
            word_counts[word] = 1

# bar plot of the word frequencies
x = word_counts.keys()
y = word_counts.values()
plt.bar(x, y)
#plt.show()


In [None]:
# vocabulary: words that occur at least 10 times in the training captions
vocab = [word for word, count in word_counts.items() if count >= 10]

# lookups in both directions; a word's index is its position in `vocab`
w_to_ix = {word: position for position, word in enumerate(vocab)}
ix_to_w = dict(enumerate(vocab))
print(len(w_to_ix))
print(len(vocab))

In [None]:
# word count of the longest training caption
max_length = max(map(len, map(str.split, train_captions)))
max_length

## InceptionV3 model

In [None]:
# InceptionV3 with its ImageNet weights
model = InceptionV3(weights='imagenet')
# Feature extractor: same input as InceptionV3, but it stops at the
# second-to-last layer, i.e. the final (output) layer is dropped.
model_CNN = Model(inputs=model.input, outputs=model.layers[-2].output)
model_CNN.summary()

## Get image feature vector
The input is the id of an image; the output is its feature vector from the truncated InceptionV3 model.

In [None]:
def get_image_feature_vector(image_id):
    """Encode one image with the truncated InceptionV3 model (`model_CNN`).

    Parameters
    ----------
    image_id : str
        Image id, i.e. the file name without the '.jpg' extension.

    Returns
    -------
    numpy.ndarray
        1-D feature vector: the single-row prediction of `model_CNN`
        reshaped from (1, n) to (n,).
    """
    # path of image
    image_folder_path = '/kaggle/input/flickr8k/Flickr_Data/Flickr_Data/Images/'
    image_path = image_folder_path + image_id + '.jpg'
    # load the image at the 299x299 size fed to the inception model
    img = image.load_img(image_path, target_size=(299, 299))
    # convert image to a 3-dimensional numpy array and add a leading batch axis
    img_arr = image.img_to_array(img)
    img_arr = np.expand_dims(img_arr, axis=0)

    # preprocess_input() from the inception module is the only scaling applied.
    # The former `imag_arr = img_arr/255.0` line wrote to a misspelled, never-read
    # variable; it is removed rather than "fixed" so the pixels are not scaled twice.
    img_arr = preprocess_input(img_arr)

    image_feature_vector = model_CNN.predict(img_arr) # Get the encoding vector for the image
    # drop the batch axis: (1, n) -> (n,)
    image_feature_vector = np.reshape(image_feature_vector, image_feature_vector.shape[1])
    return image_feature_vector
    

## Data Generator

In [None]:
# data generator, intended to be used in a call to model.fit_generator()
def data_generator(train_descriptions, w_to_ix, vocab, max_length, batch_size):
    '''
    Endlessly yield training batches for the caption model.

    Parameter 1: Dict with key as image_id and value as list of captions associated with image
    Parameter 2: word-to-index conversion dict
    Parameter 3: vocabulary
    Parameter 4: max_length of a sentences
    Parameter 5: batch size of data generated, counted in IMAGES: one batch
                 holds every (partial caption -> next word) sample of
                 `batch_size` images.

    Yields ([image_vectors, caption_sequences], output_sequences) as numpy
    arrays, one row per sample.

    Raises ValueError when there is no training data or batch_size < 1,
    because the loop below would otherwise spin forever without yielding.
    '''
    if not train_descriptions:
        raise ValueError('train_descriptions is empty')
    if batch_size < 1:
        raise ValueError('batch_size must be at least 1')

    # hoisted loop invariants: O(1) membership test and the one-hot width
    vocab_words = set(vocab)
    num_classes = len(vocab)

    image_vectors, caption_sequences, output_sequences = [], [], []
    n = 0  # images accumulated in the current batch (carries over between passes)
    # loop for ever over images
    while True:
        for image_id, captions_list in train_descriptions.items():
            n += 1
            # retrieve the image vector
            image_feature_vector = get_image_feature_vector(image_id)

            for caption in captions_list:
                # encode the sequence, skipping out-of-vocabulary words
                caption_sequence = [w_to_ix[word] for word in caption.split(' ') if word in vocab_words]
                # split one sequence into multiple X, y pairs
                for i in range(1, len(caption_sequence)):
                    # input is the first i words, output is word i
                    input_sequence, output_sequence = caption_sequence[:i], caption_sequence[i]
                    # pad input sequence
                    input_sequence = pad_sequences([input_sequence], maxlen=max_length)[0]
                    # one-hot encode output word
                    output_sequence = to_categorical([output_sequence], num_classes=num_classes)[0]

                    image_vectors.append(image_feature_vector)
                    caption_sequences.append(input_sequence)
                    output_sequences.append(output_sequence)

            # yield the batch data
            if n == batch_size:
                yield ([np.array(image_vectors), np.array(caption_sequences)], np.array(output_sequences))
                image_vectors, caption_sequences, output_sequences = [], [], []
                n = 0

## RNN model

In [None]:
# Caption model with two inputs (image feature vector, partial caption) and a
# softmax over the vocabulary as output.
# NOTE(review): the layer creation order is presumably tied to the saved weights
# file that is loaded later — keep the statement order unchanged.
vocab_size = len(vocab)
# image feature extractor model
# input: one feature vector of length 2048 per image
inputs_image = Input(shape=(2048,))
fe1 = Dropout(0.5)(inputs_image)
fe2 = Dense(256, activation='relu')(fe1)

# partial caption sequence model
# input: a sequence of max_length word indexes
inputs_caption = Input(shape=(max_length,))
# each word index is embedded into 200 dimensions
seq1 = Embedding(vocab_size, 200)(inputs_caption)
seq2 = Dropout(0.5)(seq1)
seq3 = LSTM(256)(seq2)

# decoder (feed forward) model
# both branches end in 256 units, so they can be summed element-wise
decoder1 = Add()([fe2, seq3])
decoder2 = Dense(256, activation='relu')(decoder1)
# one softmax output unit per vocabulary word
outputs = Dense(vocab_size, activation='softmax')(decoder2)

# merge the two input models
model_RNN = Model(inputs=[inputs_image, inputs_caption], outputs=outputs)
model_RNN.summary()

## Training

In [None]:
# Path of the saved weights file for the caption model (passed to model_RNN.load_weights).
model_weights = '/kaggle/input/iitb-caption-weight/model_RNN_v2_0.0001.h5'

In [None]:
# Restore the weights stored at `model_weights` into the caption model, then
# configure it with categorical cross-entropy loss and the Adam optimizer.
model_RNN.load_weights(model_weights)
model_RNN.compile(loss='categorical_crossentropy', optimizer='adam')

In [None]:
# Training hyper-parameters.
epochs = 10
# data_generator counts a batch in images, not in individual samples
batch_size_train = 16 #16 * tpu_strategy.num_replicas_in_sync
# number of whole batches in the training set (the remainder is dropped by //)
steps_train = len(train_descriptions)//batch_size_train

In [None]:
# Training code, kept disabled inside a string literal: nothing below is executed,
# the cell only evaluates (and displays) the string. The notebook loads saved
# weights from `model_weights` instead of training.
'''
train_generator = data_generator(train_descriptions, w_to_ix,vocab, max_length, batch_size_train)    
model_RNN.fit_generator(train_generator, epochs=10, steps_per_epoch=steps_train )
model_RNN.save('final_model')
model_RNN.save_weights('model_RNN.h5')
'''

## Prediction Techniques

### Greedy Search

In [None]:
def greedySearch(image_id):
    """Generate a caption by always taking the most probable next word.

    Starts from 'startseq' and appends the arg-max word of `model_RNN` until
    'endseq' is produced or `max_length` words have been added. The returned
    string still contains the 'startseq' (and, if reached, 'endseq') markers.
    """
    features = get_image_feature_vector(image_id).reshape(1, -1)
    caption = 'startseq'
    for _ in range(max_length):
        # encode the caption so far, ignoring words without an index
        known_ids = [w_to_ix[token] for token in caption.split(' ') if token in w_to_ix]
        padded = pad_sequences([known_ids], maxlen=max_length)
        probabilities = model_RNN.predict([features, padded], verbose=0)
        next_word = ix_to_w[np.argmax(probabilities)]
        caption = caption + ' ' + next_word
        if next_word == 'endseq':
            break

    return caption

### Beam Search

In [None]:
# keeping the k most probable partial captions at each iteration
def beamSearch(image_id,k):
    """Generate a caption for an image with beam search of width k.

    Parameters
    ----------
    image_id : str
        Image id understood by get_image_feature_vector.
    k : int
        Beam width (>= 1): number of partial captions kept after each step
        and number of next words tried per partial caption.

    Returns
    -------
    str
        The highest-scoring caption found, including the 'startseq' marker
        and, if it was reached, the 'endseq' marker.

    Raises
    ------
    ValueError
        If k is smaller than 1.
    """
    if k < 1:
        raise ValueError('beam width k must be at least 1')

    image_feature_vector = get_image_feature_vector(image_id).reshape(1,-1)

    # Each beam is [caption, score]; score is the sum of log-probabilities of
    # its words (same ranking as the product of probabilities, without
    # underflow). A single start beam avoids k identical duplicates.
    search_space = [['startseq', 0.0]]

    for _ in range(max_length):
        # up to k*k candidates: k continuations for each live beam
        all_candidates = []
        for caption, score in search_space:
            words = caption.split(' ')

            # a finished caption is carried over unchanged, keeping its score
            if words[-1] == 'endseq':
                all_candidates.append([caption, score])
                continue

            sequence = [w_to_ix[w] for w in words if w in w_to_ix]
            sequence = pad_sequences([sequence], maxlen=max_length)
            # first (only) row of the prediction: one probability per vocabulary word
            predicted_vector = model_RNN.predict([image_feature_vector,sequence], verbose=0)[0]
            # indexes of the k most probable next words
            for index in np.argsort(predicted_vector)[-k:]:
                index = int(index)
                # the tiny constant guards against log(0)
                word_score = float(np.log(predicted_vector[index] + 1e-12))
                all_candidates.append([caption + ' ' + ix_to_w[index], score + word_score])

        # keep the k BEST candidates (highest score first)
        search_space = sorted(all_candidates, key=lambda cand: cand[1], reverse=True)[:k]

        # stop once every kept caption has ended
        if all(cand[0].split(' ')[-1] == 'endseq' for cand in search_space):
            break

    # search_space is sorted best-first
    return search_space[0][0]

## Validating Model using BLEU score

In [None]:
def get_validation_bleu_score(greedy=True,k=1):
    """Caption every validation image and score the captions with BLEU.

    Parameters
    ----------
    greedy : bool
        True -> greedySearch, False -> beamSearch with beam width `k`.
    k : int
        Beam width, only used when `greedy` is False.

    Returns
    -------
    list of float
        Sentence-level BLEU score of every validation image. The corpus-level
        BLEU score is printed.
    """
    def _tokens(caption):
        # BLEU works on token lists (a raw string would be scored character by
        # character); the artificial sequence markers are not part of the caption.
        return [w for w in caption.split() if w not in ('startseq', 'endseq')]

    #val_descriptions is dict with image_id as key and list of captions as value
    val_images_file_name = "/kaggle/input/flickr8k/Flickr_Data/Flickr_Data/Flickr_TextData/Flickr_8k.devImages.txt"
    val_dataset = load_data(val_images_file_name)
    val_descriptions = map_captions(descriptions, val_dataset)

    # dict with image_id as key and predicted caption as value
    val_predictions = {}
    for image_id in val_descriptions:
        if greedy:
            val_predictions[image_id] = greedySearch(image_id)
        else:
            val_predictions[image_id] = beamSearch(image_id,k)

    # candidates : tokenised predicted caption for each image
    # references : list of tokenised reference captions for each image
    candidates = [_tokens(val_predictions[image_id]) for image_id in val_predictions]
    references = [[_tokens(ref) for ref in val_descriptions[image_id]] for image_id in val_predictions]

    val_bleu_score = corpus_bleu(references,candidates)
    print('val bleu score is --', val_bleu_score)

    bleu_score_arr = [
        sentence_bleu(reference, candidate)
        for candidate, reference in zip(candidates, references)
    ]
    return bleu_score_arr

## Testing 

In [None]:
def get_test_bleu_score(greedy=True,k=1):
    """Caption every test image and score the captions with BLEU.

    Parameters
    ----------
    greedy : bool
        True -> greedySearch, False -> beamSearch with beam width `k`.
    k : int
        Beam width, only used when `greedy` is False.

    Returns
    -------
    tuple (bleu_score_arr, dict_3_4, dict_4_5)
        bleu_score_arr : list of the sentence-level BLEU score of every test image.
        dict_3_4, dict_4_5 : image_id -> [predicted caption, reference captions,
        sentence BLEU] for images scoring in [0.3, 0.4) and [0.4, 0.5).
        The corpus-level BLEU score is printed.
    """
    def _tokens(caption):
        # BLEU works on token lists (a raw string would be scored character by
        # character); the artificial sequence markers are not part of the caption.
        return [w for w in caption.split() if w not in ('startseq', 'endseq')]

    #test_descriptions is dict with image_id as key and list of captions as value
    test_images_file_name = "/kaggle/input/flickr8k/Flickr_Data/Flickr_Data/Flickr_TextData/Flickr_8k.testImages.txt"
    test_dataset = load_data(test_images_file_name)
    test_descriptions = map_captions(descriptions, test_dataset)

    # dict with image_id as key and predicted caption as value
    test_predictions = {}
    for image_id in test_descriptions:
        if greedy:
            test_predictions[image_id] = greedySearch(image_id)
        else:
            test_predictions[image_id] = beamSearch(image_id,k)

    image_ids = list(test_predictions)
    # candidates : tokenised predicted caption for each image
    # references : list of tokenised reference captions for each image
    candidates = [_tokens(test_predictions[image_id]) for image_id in image_ids]
    references = [[_tokens(ref) for ref in test_descriptions[image_id]] for image_id in image_ids]

    test_bleu_score = corpus_bleu(references,candidates)
    print('test bleu score is --', test_bleu_score)

    bleu_score_arr = []
    # dict_i_j has the images with bleu score between 0.i (inclusive) and 0.j (exclusive)
    dict_3_4 , dict_4_5 = {} , {}
    for image_id, candidate, reference in zip(image_ids, candidates, references):
        sent_bleu = sentence_bleu(reference,candidate)
        bleu_score_arr.append(sent_bleu)
        # store the captions as the original strings so they print readably
        dict_value = [test_predictions[image_id], test_descriptions[image_id], sent_bleu]

        if 0.3 <= sent_bleu < 0.4:
            dict_3_4[image_id]=dict_value
        elif 0.4 <= sent_bleu < 0.5:
            dict_4_5[image_id]=dict_value

    return bleu_score_arr , dict_3_4 , dict_4_5

# Analysis of bleu score

In [None]:
###histogram of test bleu score using greedy search
greedy_test_scores, greedy_test_dict_3_4, greedy_test_dict_4_5 = get_test_bleu_score()
bins = [0,0.1,0.2,0.3,0.4,0.5,0.6,0.7,0.8,0.9,1.0]
plt.figure(figsize=[10,8])
plt.hist(greedy_test_scores,bins)
plt.xlabel('Test Bleu Scores')
plt.ylabel('Count')
plt.title('Bleu Scores on Test Data using Greedy Search')
plt.savefig('Test_Greedy.jpg')
plt.show()

In [None]:
# Show every image of the 0.4-0.5 bucket with its id, predicted caption,
# first reference caption and sentence BLEU score.
for image_id, entry in greedy_test_dict_4_5.items():
    image_folder_path = '/kaggle/input/flickr8k/Flickr_Data/Flickr_Data/Images/'
    image_path = image_folder_path + image_id + '.jpg'
    img = image.load_img(image_path, target_size=(299, 299))
    plt.imshow(img)
    plt.show()
    print(image_id)
    print(entry[0] )
    print(entry[1][0] )
    print(entry[2] )
    print()

In [None]:
###histogram of validation bleu score using greedy search
greedy_val_scores = get_validation_bleu_score()
bins = [0,0.1,0.2,0.3,0.4,0.5,0.6,0.7,0.8,0.9,1.0]
plt.figure(figsize=[10,8])
plt.hist(greedy_val_scores,bins)
plt.xlabel('Validation Bleu Scores')
plt.ylabel('Count')
plt.title('Bleu Scores on Validation Data using Greedy Search')
plt.savefig('Validation_Greedy.jpg')
plt.show()

In [None]:
###histogram of test bleu score using beam search
scores = []
for k in range(2,5):
    score = get_test_bleu_score(False,k)
    scores.append(score)
bins = [0,0.1,0.2,0.3,0.4,0.5,0.6,0.7,0.8,0.9,1.0]
plt.figure(figsize=[10,8])
plt.hist(scores,bins)
plt.xlabel('Test Bleu Scores')
plt.ylabel('Count')
plt.title('Bleu Scores on Test Data using Beam Search')
plt.legend(['k=2','k=3','k=4'])
plt.savefig('Test_Beam.jpg')
plt.show()   

In [None]:
###histogram of validation bleu score using beam search
scores = []
for k in range(2,4):
    score = get_validation_bleu_score(False,k)
    scores.append(score)
bins = [0,0.1,0.2,0.3,0.4,0.5,0.6,0.7,0.8,0.9,1.0]
plt.figure(figsize=[10,8])
plt.hist(scores,bins)
plt.xlabel('Validation Bleu Scores')
plt.ylabel('Count')
plt.title('Bleu Scores on Validation Data using Beam Search')
plt.legend(['k=2','k=3'])
plt.savefig('Validation_Beam.jpg')
plt.show() 

##-------------------------------------------------------FINISH--------------------------------------------------------------------------