Image retrieval based on the crowd-sourced Hindi captions.

This model only supports Hindi captions.

# **Loading the dataset**

In [None]:
import os
from google.colab import drive
from PIL import Image
from numpy import asarray
import random
import numpy as np

In [None]:
# Install tensorflow-text (imported below as `tensorflow_text`); presumably required by the
# TF Hub preprocessing model loaded later - confirm.
# NOTE(review): the version is unpinned (the recorded run resolved to 2.15.0); consider pinning it
# and using `%pip` so the install targets the kernel's environment.
!pip install tensorflow-text

Collecting tensorflow-text
  Downloading tensorflow_text-2.15.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (5.2 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m5.2/5.2 MB[0m [31m23.1 MB/s[0m eta [36m0:00:00[0m
Collecting tensorflow<2.16,>=2.15.0 (from tensorflow-text)
  Downloading tensorflow-2.15.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (475.2 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m475.2/475.2 MB[0m [31m3.7 MB/s[0m eta [36m0:00:00[0m
Collecting tensorboard<2.16,>=2.15 (from tensorflow<2.16,>=2.15.0->tensorflow-text)
  Downloading tensorboard-2.15.1-py3-none-any.whl (5.5 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m5.5/5.5 MB[0m [31m99.5 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting tensorflow-estimator<2.16,>=2.15.0 (from tensorflow<2.16,>=2.15.0->tensorflow-text)
  Downloading tensorflow_estimator-2.15.0-py2.py3-none-any.whl (441 kB)
[2K     [90m━━━━━━━━━━━━━━━━━

In [None]:
import tensorflow as tf
from tensorflow import keras
from keras import layers
from keras import applications
import tensorflow_hub as hub
import tensorflow_text as text

In [None]:
# Seed every random number generator the notebook draws from so runs are repeatable.
seed = 21
random.seed(seed)  # the stdlib `random` module is used to sample negative captions
np.random.seed(seed)
tf.random.set_seed(seed)

In [None]:
# Mount Google Drive at /content/drive; the caption file and image folder paths used below live under it.
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
caption_path = '/content/drive/MyDrive/CSCI567/Flickr8k_text/Flickr-Hindi.txt'

# Read with an explicit encoding (the captions are Devanagari text) and let the
# context manager close the file once it has been read.
with open(caption_path, encoding="utf-8") as f:
  lines = f.readlines()

captions = [] # stores entries in format - label(image name), caption
for line in lines:
  # Drop the line terminator so it does not end up inside the caption text.
  parts = line.rstrip("\r\n").split("\t")
  if len(parts) < 2:
    continue  # blank or malformed line: there is no caption field to read
  # The image name is the part of the first field that precedes '#'.
  captions.append([parts[0].split("#")[0], parts[1]])

captions = np.array(captions)

In [None]:
def column(arr, i):
    """Return a list holding element `i` of every row in `arr`."""
    picked = []
    for row in arr:
        picked.append(row[i])
    return picked

In [None]:
# Count the caption rows, then collect the distinct image names they refer to.
image_names = column(captions, 0)
print(len(image_names))
unique_images = set(image_names)
print(len(unique_images))

2472
500


In [None]:
image_directory = '/content/drive/MyDrive/CSCI567/Flickr8k_Dataset/Flicker8k_Dataset'

# One candidate path per image name from the caption file; existence on disk is checked later.
image_paths_temp = []
for filename in unique_images:
  image_paths_temp.append(os.path.join(image_directory, filename))

In [None]:
# Number of candidate paths (one per unique image name).
print(len(image_paths_temp))

500


In [None]:
# Keep only the candidate paths that point at a file actually present in the dataset folder.
image_paths = [candidate for candidate in image_paths_temp if os.path.isfile(candidate)]

In [None]:
# Number of candidate paths that exist on disk as files.
print(len(image_paths))

229


In [None]:
images = []  # entries of [image file name, 64x64 RGB pixel array]

for path in image_paths:
  # The context manager closes the underlying file handle. convert("RGB") forces
  # three channels, so a grayscale/RGBA/palette file cannot yield an array whose
  # shape differs from the (64, 64, 3) the model input expects.
  with Image.open(path) as image_file:
    image = image_file.convert("RGB").resize((64,64))
  data = asarray(image)
  # The label is the bare file name, matching the names stored in the caption rows.
  image_label = os.path.basename(path)
  images.append([image_label,data])

In [None]:
# TEST-TRAIN SPLIT
# Permute row indices with a dedicated generator seeded from `seed` instead of
# shuffling `captions` in place: re-running this cell then reproduces the same
# 80/20 split and `captions` itself keeps its original order.
# NOTE(review): the split is per caption row, not per image, so captions of the
# same image can land in both the training and the test set.
split_rng = np.random.RandomState(seed)
shuffled_captions = captions[split_rng.permutation(len(captions))]
split = int(len(shuffled_captions)*0.8)
training_captions, test_captions = shuffled_captions[:split,:], shuffled_captions[split:,:]
print(len(training_captions), len(test_captions))

1977 495


**Creating triplets**

In [None]:
def triplet_generator(images, captions, num_triplets_per_caption):
    """Build (image, matching caption, non-matching caption) training triplets.

    Args:
        images: iterable of [label, pixel data] entries.
        captions: sequence of [label, caption text] rows.
        num_triplets_per_caption: number of triplets to emit for every caption
            whose image is available; each one gets its own randomly drawn
            negative caption.

    Returns:
        Tuple ``(imgs, positives, negatives, dummy_labels)`` of equal length:
        the images as a float32 array, the matching captions, the sampled
        non-matching captions, and an array of zeros.

    Raises:
        ValueError: if a negative caption is required but every caption row
            carries the same label, so no negative can ever be drawn.
    """
    # Index the images once instead of rescanning the whole list per caption.
    # As in a linear scan that keeps the last hit, later duplicates win.
    image_by_label = {img[0]: img[1] for img in images}
    distinct_labels = {caption[0] for caption in captions}

    imgs = []
    positives = []
    negatives = []
    # Each caption and its image are paired with random captions of other images.
    for caption in captions:
        label, positive = caption[0], caption[1]
        if label not in image_by_label:
            continue  # caption refers to an image that was not loaded
        image = np.array(image_by_label[label])

        for _ in range(num_triplets_per_caption):
            if len(distinct_labels) < 2:
                # Rejection sampling below could never succeed.
                raise ValueError(
                    "cannot sample a negative caption: all captions share one label")
            # Rejection-sample until a caption of a different image comes up.
            while True:
                chosen_caption = random.choice(captions)
                if chosen_caption[0] != label:
                    negative = chosen_caption[1]
                    break
            imgs.append(image)
            positives.append(positive)
            negatives.append(negative)

    print(f"Dataset size: {len(imgs)}")
    dummy_labels = np.array([0] * len(imgs))
    return np.array(imgs, dtype='float32'), np.array(positives), np.array(negatives), dummy_labels


In [None]:
# Build the training triplets: 3 randomly drawn negative captions per training caption.
imgs, positives, negatives, dummy_labels = triplet_generator(images, training_captions, 3)

Dataset size: 2718


In [None]:
# Shape of the positive-caption array returned by triplet_generator.
print(np.shape(positives))

(2718,)


In [None]:
# Peek at the first positive caption as a sanity check.
print(positives[0])

कुत्ता जंपिंग खेल चल रहा है. 



# **Model**

In [None]:
# TRIPLET LOSS- referred to https://github.com/prabhnoor0212/Kaggle-Recognizing-faces-in-the-wild/blob/master/kinship_triplet_loss.ipynb

def triplet_loss(y_pred, alpha = 0.5):
    """Margin-based triplet loss on (anchor, positive, negative) embeddings.

    Args:
        y_pred: indexable whose items 0, 1 and 2 are the anchor, positive and
            negative embeddings.
        alpha: margin added to (positive distance - negative distance) before
            the result is clamped at zero.

    Returns:
        Mean of max(d(anchor, positive) - d(anchor, negative) + alpha, 0),
        where d is the squared difference summed along the last axis.
    """
    anchor = y_pred[0]
    positive = y_pred[1]
    negative = y_pred[2]

    def squared_distance(other):
        # Squared Euclidean distance from the anchor, reduced over the last axis.
        return tf.reduce_sum(tf.square(tf.subtract(anchor, other)), axis=-1)

    margin_violation = tf.add(
        tf.subtract(squared_distance(positive), squared_distance(negative)),
        alpha,
    )
    return tf.reduce_mean(tf.maximum(margin_violation, 0.0))

In [None]:
# Image base
# ResNet50 with ImageNet weights and no classification head; pooling='avg' makes it
# return one pooled feature vector per 64x64x3 image.
# NOTE(review): no ResNet50 `preprocess_input` call is visible anywhere in this notebook,
# so images appear to reach the network as raw pixel values - confirm this is intended.
img_base = applications.ResNet50(weights = 'imagenet', include_top=False, input_shape= (64,64,3), pooling='avg')

# Text base

# Handles of the MuRIL encoder and its matching preprocessing model.
tfhub_handle_encoder = "https://www.kaggle.com/models/google/muril/frameworks/TensorFlow2/variations/muril/versions/1"
tfhub_handle_preprocess = "https://kaggle.com/models/google/muril/frameworks/TensorFlow2/variations/preprocess/versions/1"

# Load the preprocessing model once and wrap its tokenize and bert_pack_inputs
# functions as Keras layers so they can be used inside the model graph.
preprocessor = hub.load(tfhub_handle_preprocess)
tokenize = hub.KerasLayer(preprocessor.tokenize)
# Length of the packed token sequence passed to the encoder.
# NOTE(review): presumably captions longer than this are truncated - confirm 8 is enough.
seq_length = 8
bert_pack_inputs = hub.KerasLayer(
    preprocessor.bert_pack_inputs,
    arguments=dict(seq_length=seq_length))

# Raw caption strings -> tokenize -> pack into the encoder's input dict (single text segment).
bert_preprocess_model = layers.Lambda(lambda x: bert_pack_inputs([tokenize(x)]))
# The encoder layer is shared by the positive and negative caption branches.
bert_model = hub.KerasLayer(tfhub_handle_encoder, name="text_bert_output")

Downloading data from https://storage.googleapis.com/tensorflow/keras-applications/resnet/resnet50_weights_tf_dim_ordering_tf_kernels_notop.h5


In [None]:
# MAIN MODEL ARCHITECTURE
# Three inputs (image, matching caption, non-matching caption) are mapped into a
# shared n_hidden-dimensional space; both captions go through the same text layers.

img_inputs = layers.Input(shape=(64,64,3), dtype="float32", name="input_image")
positive_text_inputs = layers.Input(shape=(), dtype=tf.string, name="positive_input_captions")
negative_text_inputs = layers.Input(shape=(), dtype=tf.string, name="negative_input_captions")

# positive text embedding: mean of the encoder's per-token outputs
text_preprocessed_p = bert_preprocess_model(positive_text_inputs)
bert_embedded_p = bert_model(text_preprocessed_p)
embedded_p = tf.reduce_mean(bert_embedded_p['sequence_output'], axis=1)

# negative text embedding: same layers, hence same weights, as the positive branch
text_preprocessed_n = bert_preprocess_model(negative_text_inputs)
bert_embedded_n = bert_model(text_preprocessed_n)
embedded_n = tf.reduce_mean(bert_embedded_n['sequence_output'], axis=1)

# image embedding
image_embedding = img_base(img_inputs)

n_hidden = 12
# Normalize every embedding row to unit length on its own. The explicit axis is
# essential: without it l2_normalize divides the whole batch tensor by a single
# shared norm, so an embedding would change with the size and content of its
# batch (training batches vs. the single-item batches used at evaluation).
l2_norm_layer = layers.Lambda(lambda x: tf.math.l2_normalize(x, axis=-1))

# Projection head for captions.
text_dense_layer = keras.Sequential([
  layers.Dense(n_hidden, activation="relu", name="text_dense_1"),
  layers.Dense(n_hidden, name="text_dense_2"),
  l2_norm_layer,
])

# Projection head for images.
image_dense_layer = keras.Sequential([
  layers.Dense(n_hidden, activation="relu", name="image_dense_1"),
  layers.Dense(n_hidden, name="image_dense_2"),
  l2_norm_layer,
])


positive_emb = text_dense_layer(embedded_p)

negative_emb = text_dense_layer(embedded_n)

image_emb = image_dense_layer(image_embedding)

In [None]:
# Inputs: [image, positive caption, negative caption];
# outputs: [image embedding, positive caption embedding, negative caption embedding].
MODEL = keras.Model([img_inputs, positive_text_inputs, negative_text_inputs], [image_emb, positive_emb, negative_emb])

# summary() prints the table itself and returns None, so it is not wrapped in print().
MODEL.summary()

Model: "model"
__________________________________________________________________________________________________
 Layer (type)                Output Shape                 Param #   Connected to                  
 positive_input_captions (I  [(None,)]                    0         []                            
 nputLayer)                                                                                       
                                                                                                  
 negative_input_captions (I  [(None,)]                    0         []                            
 nputLayer)                                                                                       
                                                                                                  
 lambda (Lambda)             {'input_word_ids': (None,    0         ['positive_input_captions[0][0
                             8),                                    ]',                       

In [None]:
epochs = 8
optimizer=tf.keras.optimizers.AdamW(learning_rate=0.0002)
# Use a shuffle buffer as large as the dataset: the triplets are stored in caption
# order (consecutive triplets share an image), and a smaller buffer only mixes
# neighbouring samples instead of drawing uniformly from the whole set.
dataset = tf.data.Dataset.from_tensor_slices((imgs, positives, negatives)).shuffle(buffer_size=len(imgs)).batch(64)
for epoch in range(epochs):
    print(f"\nStart of epoch {epoch}")

    for step, (imgs_batch, positives_batch, negatives_batch) in enumerate(dataset):
        # Record the forward pass so the loss can be differentiated w.r.t. the weights.
        with tf.GradientTape() as tape:
            output = MODEL([imgs_batch, positives_batch, negatives_batch], training=True)
            # output is [image embeddings, positive embeddings, negative embeddings]
            loss = triplet_loss(output)
        grads = tape.gradient(loss, MODEL.trainable_weights)
        optimizer.apply_gradients(zip(grads, MODEL.trainable_weights))

        print(f"Training loss (for 1 batch) at step {step}: {float(loss):.4f}")


Start of epoch 0
Training loss (for 1 batch) at step 0: 0.4998
Training loss (for 1 batch) at step 1: 0.4998
Training loss (for 1 batch) at step 2: 0.4996
Training loss (for 1 batch) at step 3: 0.4996
Training loss (for 1 batch) at step 4: 0.4995
Training loss (for 1 batch) at step 5: 0.4995
Training loss (for 1 batch) at step 6: 0.4998
Training loss (for 1 batch) at step 7: 0.4997
Training loss (for 1 batch) at step 8: 0.4996
Training loss (for 1 batch) at step 9: 0.4995
Training loss (for 1 batch) at step 10: 0.4998
Training loss (for 1 batch) at step 11: 0.4996
Training loss (for 1 batch) at step 12: 0.4992
Training loss (for 1 batch) at step 13: 0.4992
Training loss (for 1 batch) at step 14: 0.4989
Training loss (for 1 batch) at step 15: 0.4998
Training loss (for 1 batch) at step 16: 0.4985
Training loss (for 1 batch) at step 17: 0.4996
Training loss (for 1 batch) at step 18: 0.4996
Training loss (for 1 batch) at step 19: 0.4992
Training loss (for 1 batch) at step 20: 0.4991
Train

In [None]:
#MODEL.save("hindi-6184.keras")

# **Evaluation**

In [None]:
eval_captions = test_captions

# Distinct image names referenced by the held-out captions.
eval_images = {row[0] for row in eval_captions}
print(len(eval_images))

329


In [None]:
# Index the loaded images by file name so each lookup is a single dict access
# instead of a scan over the whole image list.
image_data_by_label = {img[0]: img[1] for img in images}

# given all the test image file names, set aside those which actually have a corresponding image
valid_images = [label for label in eval_images if label in image_data_by_label] # list of valid labels
retrieval_set = [image_data_by_label[label] for label in valid_images] # list of images, same order as valid_images
print(len(retrieval_set))

# list of those captions which have a corresponding image in valid_images
valid_label_set = set(valid_images)  # set membership instead of a list scan per caption
valid_queries = [cap for cap in eval_captions if cap[0] in valid_label_set]

retrieval_set = np.array(retrieval_set, dtype='float32')

154


In [None]:
# Number of test captions whose image is present in the retrieval set.
print(len(valid_queries))

### **Image Embedding**

In [None]:
image_embeddings = []
# The model always takes all three inputs; only the image output is kept here,
# so the two caption inputs are filled with a placeholder string.
dummy_caps = np.array(['dummy'])
for img in retrieval_set:
  # Add a leading batch axis; named so the built-in input() is not shadowed.
  image_batch = img.reshape(-1, 64, 64, 3)
  output = MODEL([image_batch, dummy_caps, dummy_caps], training=False)
  image_embeddings.append(output[0])  # output[0] is the image embedding

### **Text Embedding, Top K retrieved Images**

For all valid test queries

In [None]:
k = 50
total_queries = len(valid_queries)
correct_output = 0

# Any image works as the image input here - only the text embedding of the query is used.
placeholder_image = imgs[0].reshape(-1, 64, 64, 3)

for entry in valid_queries:
  label = entry[0]
  query = np.array([entry[1]])

  output = MODEL([placeholder_image, query, query], training=False)
  query_embedding = output[1]

  # Euclidean distance from the query to every image embedding in the retrieval set.
  distances = [np.linalg.norm(img_emb - query_embedding) for img_emb in image_embeddings]

  # Labels of the k closest images; the query counts as a hit if its own image is among them.
  top_k = np.argsort(distances)[:k]
  retrieved_images = [valid_images[i] for i in top_k]
  if label in retrieved_images:
    correct_output += 1

In [None]:
# Share of test queries whose own image appeared among the top-k retrieved images.
# Guard the division: with no valid queries there is nothing to score.
accuracy = correct_output/total_queries if total_queries else 0.0
print('This model was successfully able to retrieve a relevant image', accuracy*100, '% of the time')

This model was successfully able to retrieve a relevant image 61.33333333333333 % of the time


* 20 hidden, num_triplets_per_caption= 3 :

1 epoch - 33.02%, 3 epochs - 61.84%
* 20 hidden, num_triplets_per_caption= 5 :

5 epochs - 57.45 %


* 12 hidden, num_triplets_per_caption= 3 :

3 epochs - 44.29%

* 12 hidden, num_triplets_per_caption= 3, LR = 0.0002 :

4 epochs - 66.66%, 8 epochs - 61.83%