In [None]:
# TensorFlow is a hard requirement for every cell below, so let an ImportError
# surface right here. The previous try/except swallowed the failure and the
# notebook then died one line later with a confusing NameError on `tf`.
import tensorflow.compat.v2 as tf

# Turn on TF2 behavior explicitly (relevant when the runtime defaults to TF1).
tf.enable_v2_behavior()

# You'll generate plots of attention in order to see which parts of an image
# our model focuses on during captioning
import matplotlib.pyplot as plt

# Scikit-learn includes many helpful utilities
from sklearn.model_selection import train_test_split
from sklearn.utils import shuffle

#from __future__ import absolute_import, division, print_function, unicode_literals
import re
import numpy as np
import os
import time
import json
from glob import glob
from PIL import Image
import pickle
from tqdm.auto import tqdm
import csv
import pandas as pd

import nltk
nltk.download('stopwords')
nltk.download('wordnet')
nltk.download('punkt')


from nltk.stem import WordNetLemmatizer 
from nltk.corpus import stopwords 
from nltk.tokenize import word_tokenize 
from nltk.tokenize.treebank import TreebankWordDetokenizer
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
 


[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Unzipping corpora/stopwords.zip.
[nltk_data] Downloading package wordnet to /root/nltk_data...
[nltk_data]   Unzipping corpora/wordnet.zip.
[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Unzipping tokenizers/punkt.zip.


# Reload Data and Preprocess

In [None]:
# Download caption annotation files (skipped when ./annotations already exists)
annotation_folder = '/annotations/'
if not os.path.exists(os.path.abspath('.') + annotation_folder):
  annotation_zip = tf.keras.utils.get_file('captions.zip',
                                          cache_subdir=os.path.abspath('.'),
                                          origin = 'http://images.cocodataset.org/annotations/annotations_trainval2014.zip',
                                          extract = True)
  annotation_file = os.path.dirname(annotation_zip)+'/annotations/captions_train2014.json'
  # The archive is no longer needed once it has been extracted
  os.remove(annotation_zip)
else:
  # Folder already present: still define annotation_file so that later cells
  # do not depend on the download branch having run (mirrors the PATH logic below)
  annotation_file = os.path.abspath('.') + annotation_folder + 'captions_train2014.json'

# Download image files (skipped when ./train2014 already exists)
image_folder = '/train2014/'
if not os.path.exists(os.path.abspath('.') + image_folder):
  image_zip = tf.keras.utils.get_file('train2014.zip',
                                      cache_subdir=os.path.abspath('.'),
                                      origin = 'http://images.cocodataset.org/zips/train2014.zip',
                                      extract = True)
  PATH = os.path.dirname(image_zip) + image_folder
  # The archive is no longer needed once it has been extracted
  os.remove(image_zip)
else:
  PATH = os.path.abspath('.') + image_folder

Downloading data from http://images.cocodataset.org/annotations/annotations_trainval2014.zip
Downloading data from http://images.cocodataset.org/zips/train2014.zip


In [None]:
# Explicit relative locations of the caption JSON and the image folder.
# NOTE(review): these assignments replace whatever values the download cell
# computed for the same two names — confirm that this override is intended.
annotation_file = './annotations/captions_train2014.json'

PATH = './train2014'

In [None]:
# Load the caption annotations from disk
with open(annotation_file, 'r') as f:
    annotations = json.load(f)

# Build two parallel lists: entry i of each list describes the same annotation.
# Captions are wrapped in <start>/<end> markers; image paths are derived from
# the zero-padded 12-digit image id.
all_captions = [
    '<start> ' + annot['caption'] + ' <end>'
    for annot in annotations['annotations']
]
all_img_name_vector = [
    os.path.join(PATH, 'COCO_train2014_' + '%012d.jpg' % (annot['image_id']))
    for annot in annotations['annotations']
]

# Shuffle both lists with the same permutation (fixed seed for reproducibility)
shuffled_captions, shuffled_img_names = shuffle(all_captions,
                                                all_img_name_vector,
                                                random_state=1)

# Keep only the first 30000 shuffled examples
num_examples = 30000
train_captions = shuffled_captions[:num_examples]
img_name_vector = shuffled_img_names[:num_examples]

In [None]:
def load_image(image_path):
    """Read a JPEG file and prepare it as InceptionV3 input.

    The file is decoded to 3 channels, resized to 299x299 and passed through
    the InceptionV3 `preprocess_input` function.

    Args:
        image_path: Path of the JPEG file to read.

    Returns:
        A `(image, image_path)` tuple; the path is passed through unchanged so
        callers can pair each processed image with its source file.
    """
    raw_bytes = tf.io.read_file(image_path)
    decoded = tf.image.decode_jpeg(raw_bytes, channels=3)
    resized = tf.image.resize(decoded, (299, 299))
    return tf.keras.applications.inception_v3.preprocess_input(resized), image_path

In [None]:
# InceptionV3 with ImageNet weights and without its classification head
image_model = tf.keras.applications.InceptionV3(weights='imagenet',
                                                include_top=False)

# The feature extractor maps the network input to the output of its last layer
new_input, hidden_layer = image_model.input, image_model.layers[-1].output
image_features_extract_model = tf.keras.Model(inputs=new_input,
                                              outputs=hidden_layer)

Downloading data from https://storage.googleapis.com/tensorflow/keras-applications/inception_v3/inception_v3_weights_tf_dim_ordering_tf_kernels_notop.h5


In [None]:
# Each image may appear in several captions; extract features only once per file
encode_train = sorted(set(img_name_vector))

# Feel free to change the batch size according to your system configuration
image_dataset = (
    tf.data.Dataset.from_tensor_slices(encode_train)
    .map(load_image, num_parallel_calls=tf.data.experimental.AUTOTUNE)
    .batch(16)
)

for img, path in tqdm(image_dataset):
    batch_features = image_features_extract_model(img)
    # Flatten the two spatial axes into one, keeping batch and channel axes
    n_images, n_channels = batch_features.shape[0], batch_features.shape[3]
    batch_features = tf.reshape(batch_features, (n_images, -1, n_channels))

    # Store each image's features next to the image, using the image path as
    # the file name handed to np.save
    for bf, p in zip(batch_features, path):
        np.save(p.numpy().decode("utf-8"), bf.numpy())

HBox(children=(FloatProgress(value=0.0, max=1622.0), HTML(value='')))




In [None]:
def calc_max_length(tensor):
    """Return the length of the longest sequence contained in `tensor`.

    Args:
        tensor: Iterable of sized items (e.g. a list of token-id lists).

    Returns:
        The largest `len()` among the items.

    Raises:
        ValueError: If `tensor` is empty.
    """
    return max(map(len, tensor))


# Choose the top 5000 words from the vocabulary
top_k = 5000
# The backslash in the filter set is written as `\\` so the literal no longer
# contains the invalid escape sequence `\]` (a warning on current Python);
# the resulting string value is unchanged.
tokenizer = tf.keras.preprocessing.text.Tokenizer(num_words=top_k,
                                                  oov_token="<unk>",
                                                  filters='!"#$%&()*+.,-/:;=?@[\\]^_`{|}~ ')
tokenizer.fit_on_texts(train_captions)

# Reserve index 0 for the padding token in both lookup directions
tokenizer.word_index['<pad>'] = 0
tokenizer.index_word[0] = '<pad>'

# Create the tokenized vectors
train_seqs = tokenizer.texts_to_sequences(train_captions)

# Pad each vector to the max_length of the captions
# If you do not provide a max_length value, pad_sequences calculates it automatically
cap_vector = tf.keras.preprocessing.sequence.pad_sequences(train_seqs, padding='post')

# Calculates the max_length, which is used to store the attention weights
max_length = calc_max_length(train_seqs)

In [None]:
# Create training and validation sets. Note this is NOT an 80-20 split:
# test_size=0.0333 holds out about 3.3% of the examples for validation.
# random_state is fixed so the same split is reproduced on every run.
img_name_train, img_name_val, cap_train, cap_val = train_test_split(img_name_vector,
                                                                    cap_vector,
                                                                    test_size=0.0333,
                                                                    random_state=0)

# NEW: Work with Captions to Get the Most Similar Images for Your Query

## Reload previous Results
Here you can start testing your model. We provide a baseline that uses the BLEU score to compare two captions.
This is only a first approach, and you are expected to improve on it!

By loading the data and the "all_captions.csv" file, you do not have to retrain the whole image-captioning model.

In [None]:
# Colab-only step: mount Google Drive at /content/drive so that files stored
# there can be read by path. Outside Colab the `google.colab` import fails.
from google.colab import drive
drive.mount('/content/drive')

Go to this URL in a browser: https://accounts.google.com/o/oauth2/auth?client_id=947318989803-6bn6qk8qdgf4n4g3pfee6491hc0brc4i.apps.googleusercontent.com&redirect_uri=urn%3aietf%3awg%3aoauth%3a2.0%3aoob&scope=email%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdocs.test%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdrive%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdrive.photos.readonly%20https%3a%2f%2fwww.googleapis.com%2fauth%2fpeopleapi.readonly&response_type=code

Enter your authorization code:
··········
Mounted at /content/drive


In [None]:
# Load the previously saved true/predicted caption pairs.
# NOTE: this rebinds `all_captions` (a list earlier in the notebook) to a DataFrame.
all_captions = pd.read_csv("/content/drive/My Drive/Colab Notebooks/Google TC/all_captions_30.csv", sep=',') 

# Whitespace-tokenize both caption columns into lists of words
real_captions, pred_captions = (
    [text.split() for text in all_captions[column].tolist()]
    for column in ('true_caption', 'pred_caption')
)

In [None]:
# Sanity check: the two lists should have the same number of captions
len(pred_captions), len(real_captions)

(1000, 1000)

## Use BLEU Score as Similarity Criterion

In [None]:
# NOTE(review): imports placed mid-notebook; `nltk` is already imported in the
# first cell. Consider moving these to the top import cell.
import warnings
import nltk
import cv2
# Silences every warning for the rest of the session, including deprecation
# notices from the libraries used below.
warnings.filterwarnings("ignore")

In [None]:
'''
embeddings_dict = {}
with open("glove.6B.50d.txt", 'r') as f:
    for line in f:
        values = line.split()
        word = values[0]
        vector = np.asarray(values[1:], "float32")
        embeddings_dict[word] = vector
'''

In [None]:
# FUNCTION BY USING TF-IDF

def get_similar_result_with_tf_idf(idx, real_captions, pred_captions):
    """Rank all predicted captions by TF-IDF cosine similarity to one real caption.

    Both captions of each pair are stripped of English stop words, lemmatized
    and detokenized into a sentence. A TF-IDF model is then fitted on that
    two-sentence pair and the cosine similarity of the two rows is the score.

    Args:
        idx: Index into `real_captions` of the query caption.
        real_captions: List of tokenized (list-of-words) reference captions.
        pred_captions: List of tokenized (list-of-words) predicted captions.

    Returns:
        A list of `(candidate_index, score)` tuples covering every entry of
        `pred_captions`, sorted by descending score. Scores are plain floats
        (previously 1x1 arrays); the ordering logic is unchanged.
    """
    stop_words = set(stopwords.words('english'))
    vectorizer = TfidfVectorizer()  # refitted on every caption pair below
    lemmatizer = WordNetLemmatizer()
    detokenizer = TreebankWordDetokenizer()

    def _normalize(tokens):
        # Drop stop words, lemmatize the remaining words, rebuild a sentence
        return detokenizer.detokenize(
            [lemmatizer.lemmatize(w) for w in tokens if w not in stop_words]
        )

    # The query sentence does not depend on the candidate: build it once
    # instead of once per loop iteration.
    sentence_real = _normalize(real_captions[idx])

    cosine_score_list = []
    for idx_2, candidate_tokens in enumerate(pred_captions):
        sentence_pred = _normalize(candidate_tokens)

        try:
            vectors = vectorizer.fit_transform([sentence_real, sentence_pred])
        except ValueError:
            # TfidfVectorizer raises ValueError ("empty vocabulary") when both
            # sentences have no usable tokens; score such a pair as dissimilar
            # instead of aborting the whole ranking.
            cosine_score = 0.0
        else:
            # toarray() yields a regular ndarray; todense() yields np.matrix,
            # which recent scikit-learn versions refuse as input.
            dense = vectors.toarray()
            # Slices keep each row 2-D, as cosine_similarity expects
            cosine_score = float(cosine_similarity(dense[0:1], dense[1:2])[0, 0])

        cosine_score_list.append((idx_2, cosine_score))

    cosine_score_list.sort(key=lambda x: x[1], reverse=True)

    return cosine_score_list

In [None]:
# BLEU WITHOUT STOPWORDS FUNCTION (DO NOT USE IT, BECAUSE IT RETURNS BAD RESULTS)
'''
def get_similar_result_bleu_no_stops(idx, real_captions, pred_captions):
    
    stop_words = set(stopwords.words('english')) 
    lemmatizer = WordNetLemmatizer()
    
    
    b_score_list = []

    for idx_2 in range(len(pred_captions)):
 
        filtered_sentence_real = [w for w in real_captions[idx] if not w in stop_words]
        filtered_sentence_pred = [w for w in pred_captions[idx_2] if not w in stop_words]
  
        b_score = nltk.translate.bleu_score.sentence_bleu([filtered_sentence_real], filtered_sentence_pred)
        b_score_list.append((idx_2, b_score))

    b_score_list.sort(key=lambda x: x[1], reverse=True)
    print(b_score_list)

    return b_score_list
'''

## Create File with Your Submission Results

In [None]:
def create_submission_file(top_k, img_name_val, real_captions, pred_captions):
    """Write the retrieval results to './submission_nostopwords_lemma.csv'.

    The file has a header row `caption,image_list` followed by one row per
    entry of `img_name_val`: the real caption (words joined by spaces) and the
    space-separated indices of the `top_k` best-ranked predicted captions.

    Args:
        top_k: Number of ranked candidate indices to keep per caption.
        img_name_val: Validation image names; only its length is used here,
            to decide how many rows are written.
        real_captions: List of tokenized reference captions.
        pred_captions: List of tokenized predicted captions.
    """
    # newline='' is what the csv module expects for files it writes to;
    # without it, platforms that translate line endings get blank rows.
    with open('./submission_nostopwords_lemma.csv', 'w', newline='') as file:
        writer = csv.writer(file)
        writer.writerow(["caption", "image_list"])

        for idx in tqdm(range(len(img_name_val))):
            ranked = get_similar_result_with_tf_idf(idx, real_captions, pred_captions)

            # Keep only the candidate indices of the best top_k matches
            best_indices = ' '.join(str(cand_idx) for cand_idx, _ in ranked[:top_k])
            writer.writerow([' '.join(real_captions[idx]), best_indices])

In [None]:
# NOTE(review): the two spaCy lines below are disabled leftovers; spaCy is not
# imported in this notebook and lemmatization is done with NLTK's
# WordNetLemmatizer inside get_similar_result_with_tf_idf.
# Initialize spacy 'en' model, keeping only tagger component needed for lemmatization
#nlp = spacy.load('en', disable=['parser', 'ner'])

# Create the submission file. top_k is set to len(img_name_val), so (assuming
# one predicted caption per validation image) every candidate is listed.
create_submission_file(len(img_name_val), img_name_val, real_captions, pred_captions)

HBox(children=(FloatProgress(value=0.0, max=1000.0), HTML(value='')))

## View Some Results

### Show Qualitative Results for a Chosen Index

In [None]:
def show_image(image_fname, new_figure=True):
  """Display an image file with matplotlib.

  Args:
    image_fname: Path of the image file to show.
    new_figure: When True, open a new matplotlib figure first; when False,
      draw into the current figure/axes (e.g. a subplot chosen by the caller).

  Raises:
    FileNotFoundError: If OpenCV cannot read `image_fname`.
  """
  # cv2.imread signals failure by returning None instead of raising, which
  # would otherwise surface as an opaque error inside cvtColor.
  np_img = cv2.imread(image_fname)
  if np_img is None:
    raise FileNotFoundError("Could not read image: {}".format(image_fname))

  if new_figure:
    plt.figure()
  # OpenCV loads pixels in BGR order; matplotlib expects RGB
  np_img = cv2.cvtColor(np_img, cv2.COLOR_BGR2RGB)
  plt.imshow(np_img)

def show_qualitative_results(idx1, top_k=20):
    """Show the query image and the images of its `top_k` best-ranked captions.

    Prints the real and predicted caption for `idx1`, their BLEU score after
    removing stop words and punctuation, and one line per retrieved candidate.
    The query image is drawn in its own figure, the retrieved images in a
    grid with five columns.

    Reads the notebook globals `real_captions`, `pred_captions` and
    `img_name_val`.

    Args:
        idx1: Index of the query caption/image.
        top_k: Number of best-ranked candidates to display (default 20).
    """
    import string  # local import: only needed for the punctuation set below

    b_score_res = get_similar_result_with_tf_idf(idx1, real_captions, pred_captions)

    print("Real capt:", ' '.join(real_captions[idx1]))
    print("Pred capt:", ' '.join(pred_captions[idx1]))

    # `stopword_punct` was referenced but never defined in this notebook.
    # NOTE(review): rebuilt from its name as English stop words plus ASCII
    # punctuation — confirm this matches the original intent.
    stopword_punct = set(stopwords.words('english')) | set(string.punctuation)
    sentence1 = [w for w in real_captions[idx1] if w not in stopword_punct]
    sentence2 = [w for w in pred_captions[idx1] if w not in stopword_punct]
    ss = nltk.translate.bleu_score.sentence_bleu([sentence1], sentence2)
    print("Score with True Predicted caption:", ss)
    print()

    # Query image gets its own figure so it never lands on a stale one
    show_image(img_name_val[idx1], new_figure=True)
    plt.grid(False)
    plt.ioff()
    plt.axis('off')

    # Honour the top_k argument (it was previously ignored in favour of 20)
    top_results = b_score_res[:top_k]
    if not top_results:
        return

    # Five images per row; with the default top_k=20 this is the original
    # 4x5 grid on a 10x7 inch figure.
    n_cols = 5
    n_rows = (len(top_results) + n_cols - 1) // n_cols
    fig = plt.figure(figsize=(10, 1.75 * n_rows))

    for idx2, (idx, sim_val) in enumerate(top_results):
        print(idx, sim_val, ' '.join(pred_captions[idx]))
        plt.subplot(n_rows, n_cols, idx2+1)
        show_image(img_name_val[idx], new_figure=False)
        plt.grid(False)
        plt.ioff()
        plt.axis('off')
        plt.title('{}'.format(idx2+1))

In [None]:
# Inspect the retrieval results for the first validation example
show_qualitative_results(idx1 = 0)

### Show Distribution of the Rank of the Correct Prediction

In [None]:
# For every validation example, record the position at which its own index
# appears in the similarity ranking (0 = ranked first).
all_idx = []
top_k = 1000

for ref_idx in tqdm(range(len(img_name_val))):
    # `get_similar_result_bleu` is not defined in this notebook; use the TF-IDF
    # ranking function that the other cells rely on.
    b_score_res = get_similar_result_with_tf_idf(ref_idx, real_captions, pred_captions)
    list_res = [cand_idx for cand_idx, _ in b_score_res[:top_k]]
    # list.index raises ValueError if ref_idx is not within the first top_k results
    index = list_res.index(ref_idx)
    all_idx.append(index)

n, bins, patches = plt.hist(all_idx, bins=1000)
plt.xlabel('top K')
plt.ylabel('Frequency')

plt.show()