# Búsqueda semántica inspirada en CLIP
Selecciona las imágenes que se 'aproximan semánticamente' a un texto de búsqueda

In [1]:
pip install -q -U tensorflow-hub tensorflow-text tensorflow-addons

[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m6.5/6.5 MB[0m [31m19.9 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m612.1/612.1 kB[0m [31m29.2 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m524.1/524.1 MB[0m [31m2.5 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.7/1.7 MB[0m [31m84.0 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m5.6/5.6 MB[0m [31m82.0 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m440.8/440.8 kB[0m [31m35.6 MB/s[0m eta [36m0:00:00[0m
[?25h[31mERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
pydantic 2.2.1 requires typing-extensions>=4.6.1, but you have typing-extensions 4.5.0 which i

Bibliotecas que se utilizan en este programa

In [5]:
import os
import collections
import json
import numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
import tensorflow_hub as hub
import tensorflow_text as text
import tensorflow_addons as tfa
import matplotlib.pyplot as plt
import matplotlib.image as mpimg
from tqdm import tqdm

# Suppressing tf.hub warnings
tf.get_logger().setLevel("ERROR")


TensorFlow Addons (TFA) has ended development and introduction of new features.
TFA has entered a minimal maintenance and release mode until a planned end of life in May 2024.
Please modify downstream libraries to take dependencies from other repositories in our TensorFlow community (e.g. Keras, Keras-CV, and Keras-NLP). 

For more information see: https://github.com/tensorflow/addons/issues/2807 



We will use the MS-COCO dataset to train our dual encoder model. MS-COCO contains over 82,000 images, each of which has at least 5 different caption annotations. The dataset is usually used for image captioning tasks, but we can repurpose the image-caption pairs to train our dual encoder model for image search.

Download and extract the data

First, let's download the dataset, which consists of two compressed folders: one with images, and the other—with associated image captions. Note that the compressed images folder is 13GB in size.

In [4]:
root_dir = "datasets"
annotations_dir = os.path.join(root_dir, "annotations")
images_dir = os.path.join(root_dir, "train2014")
tfrecords_dir = os.path.join(root_dir, "tfrecords")
annotation_file = os.path.join(annotations_dir, "captions_train2014.json")

# Download caption annotation files
if not os.path.exists(annotations_dir):
    annotation_zip = tf.keras.utils.get_file(
        "captions.zip",
        cache_dir=os.path.abspath("."),
        origin="http://images.cocodataset.org/annotations/annotations_trainval2014.zip",
        extract=True,
    )
    os.remove(annotation_zip)

# Download image files
if not os.path.exists(images_dir):
    image_zip = tf.keras.utils.get_file(
        "train2014.zip",
        cache_dir=os.path.abspath("."),
        origin="http://images.cocodataset.org/zips/train2014.zip",
        extract=True,
    )
    os.remove(image_zip)

print("Dataset is downloaded and extracted successfully.")

with open(annotation_file, "r") as f:
    annotations = json.load(f)["annotations"]

image_path_to_caption = collections.defaultdict(list)
for element in annotations:
    caption = f"{element['caption'].lower().rstrip('.')}"
    image_path = images_dir + "/COCO_train2014_" + "%012d.jpg" % (element["image_id"])
    image_path_to_caption[image_path].append(caption)

image_paths = list(image_path_to_caption.keys())
print(f"Number of images: {len(image_paths)}")

Dataset is downloaded and extracted successfully.
Number of images: 82783


<h3>Process and save the data to TFRecord files</h3><br>
You can change the sample_size parameter to control many image-caption pairs will be used for training the dual encoder model. In this example we set train_size to 30,000 images, which is about 35% of the dataset. We use 2 captions for each image, thus producing 60,000 image-caption pairs. The size of the training set affects the quality of the produced encoders, but more examples would lead to longer training time.
Esta porción de código nos importa menos para los objetivos de la asignatura. Se encuentra en el ámbito de 'Programación para Ciencia de Datos".



In [5]:
train_size = 30000
valid_size = 5000
captions_per_image = 2
images_per_file = 2000

train_image_paths = image_paths[:train_size]
num_train_files = int(np.ceil(train_size / images_per_file))
train_files_prefix = os.path.join(tfrecords_dir, "train")

valid_image_paths = image_paths[-valid_size:]
num_valid_files = int(np.ceil(valid_size / images_per_file))
valid_files_prefix = os.path.join(tfrecords_dir, "valid")

tf.io.gfile.makedirs(tfrecords_dir)


def bytes_feature(value):
    return tf.train.Feature(bytes_list=tf.train.BytesList(value=[value]))


def create_example(image_path, caption):
    feature = {
        "caption": bytes_feature(caption.encode()),
        "raw_image": bytes_feature(tf.io.read_file(image_path).numpy()),
    }
    return tf.train.Example(features=tf.train.Features(feature=feature))


def write_tfrecords(file_name, image_paths):
    caption_list = []
    image_path_list = []
    for image_path in image_paths:
        captions = image_path_to_caption[image_path][:captions_per_image]
        caption_list.extend(captions)
        image_path_list.extend([image_path] * len(captions))

    with tf.io.TFRecordWriter(file_name) as writer:
        for example_idx in range(len(image_path_list)):
            example = create_example(
                image_path_list[example_idx], caption_list[example_idx]
            )
            writer.write(example.SerializeToString())
    return example_idx + 1


def write_data(image_paths, num_files, files_prefix):
    example_counter = 0
    for file_idx in tqdm(range(num_files)):
        file_name = files_prefix + "-%02d.tfrecord" % (file_idx)
        start_idx = images_per_file * file_idx
        end_idx = start_idx + images_per_file
        example_counter += write_tfrecords(file_name, image_paths[start_idx:end_idx])
    return example_counter


train_example_count = write_data(train_image_paths, num_train_files, train_files_prefix)
print(f"{train_example_count} training examples were written to tfrecord files.")

valid_example_count = write_data(valid_image_paths, num_valid_files, valid_files_prefix)
print(f"{valid_example_count} evaluation examples were written to tfrecord files.")

100%|██████████| 15/15 [03:42<00:00, 14.84s/it]


60000 training examples were written to tfrecord files.


100%|██████████| 3/3 [00:34<00:00, 11.45s/it]

10000 evaluation examples were written to tfrecord files.





Create *tf.data.Dataset* for training and evaluation

In [6]:
feature_description = {
    "caption": tf.io.FixedLenFeature([], tf.string),
    "raw_image": tf.io.FixedLenFeature([], tf.string),
}


def read_example(example):
    features = tf.io.parse_single_example(example, feature_description)
    raw_image = features.pop("raw_image")
    features["image"] = tf.image.resize(
        tf.image.decode_jpeg(raw_image, channels=3), size=(299, 299)
    )
    return features


def get_dataset(file_pattern, batch_size):

    return (
        tf.data.TFRecordDataset(tf.data.Dataset.list_files(file_pattern))
        .map(
            read_example,
            num_parallel_calls=tf.data.AUTOTUNE,
            deterministic=False,
        )
        .shuffle(batch_size * 10)
        .prefetch(buffer_size=tf.data.AUTOTUNE)
        .batch(batch_size)
    )

El modelo que vamos a crear es un *Dual Encoder*. El primer encoder recoge imágenes, mientras que el segundo encoder recoge los textos asociados a cada imagen. El resultado de ambos encoders va a ser comparado; para hacer esa comparación eficiente y adecuadamente, el formato de salida de ambos encoders debe coincidir, por ejemplo para realizar el producto escalar de ambos vectores. La función *project_embeddings* sirve para insertar unas capas como última etapa de cada uno de los dos decoders (imágenes y texto). Haciendo coincidir el tamaño de la última capa densa de ambos decoders, sus salidas serán comparables. <br>
La línea 4 de la función crea una capa densa de tamaño *projection_dims* que recoge las entradas que se pasan como argumento a la función en su parámetro 'embeddings'. Este argumento será la salida del visual encoder, en un caso, y del text encoder en el otro. La profundidad del modelo que crea la función se establece en el argumento que se le pasa al parámetro 'num_projection_layers'. (línea 5). A mayor complejidad de la salida del encoder, mayor profundidad en este modelo. Cada capa de proyección incorpora una función de activación GeLu, una capa densa (de tamaño *projection_dims*), un dropout para regularizar, y una conexión que facilite que no se pierda la información de la capa anterior ('x') y que disminuya el efecto indeseado de 'vanishing gradient'. Finalmente se realiza una normalización.  
<img src=CLIP.jpg width=80%></img>

In [1]:
def project_embeddings(
    embeddings, num_projection_layers, projection_dims, dropout_rate
):
    projected_embeddings = layers.Dense(units=projection_dims)(embeddings)
    for _ in range(num_projection_layers):
        x = tf.nn.gelu(projected_embeddings)
        x = layers.Dense(projection_dims)(x)
        x = layers.Dropout(dropout_rate)(x)
        x = layers.Add()([projected_embeddings, x])
        projected_embeddings = layers.LayerNormalization()(x)
    return projected_embeddings

La función *create_vision_encoder* crea el encoder de las imágenes. Se toma el model *xception* como base de codificación (líneas 5 a 7) y se habilita su entrenamiento (líneas 9 y 10). Esto permite un adecuado fine tunning, pero a costa de un tiempo de procesamiento grande. El modelo completo recoge imágenes RGB de 299 x 299 (línea 12), las transforma al formato del modelo xception (línea 14), las traspasa al modelo xception (línea 16) y, finalmente, injecta su salida al modelo que hemos creado en la celda anterior (líneas 18 a 20), para asegurarnos de que su salida coincidirá en shape con la salida del decoder de texto. Esta función devuelve el modelo creado (línea 22).

In [2]:
def create_vision_encoder(
    num_projection_layers, projection_dims, dropout_rate, trainable=False
):
    # Load the pre-trained Xception model to be used as the base encoder.
    xception = keras.applications.Xception(
        include_top=False, weights="imagenet", pooling="avg"
    )
    # Set the trainability of the base encoder.
    for layer in xception.layers:
        layer.trainable = trainable
    # Receive the images as inputs.
    inputs = layers.Input(shape=(299, 299, 3), name="image_input")
    # Preprocess the input image.
    xception_input = tf.keras.applications.xception.preprocess_input(inputs)
    # Generate the embeddings for the images using the xception model.
    embeddings = xception(xception_input)
    # Project the embeddings produced by the model.
    outputs = project_embeddings(
        embeddings, num_projection_layers, projection_dims, dropout_rate
    )
    # Create the vision encoder model.
    return keras.Model(inputs, outputs, name="vision_encoder")

La función *create_text_encoder* tiene la misma funcionalidad que *create_vision_encoder*, pero en este caso crea un embedding de los textos asociados a cada imagen, en lugar de un embedding de cada imagen. Mientras que en la función de visión usábamos el modelo xception, en el decoder de texto vamos a emplear BERT (líneas 5 a 13). Al igual que en la función anterior, aquí también habilitamos el entrenamiento de bert (línea 15). El resto de esta función es análogo (en texto) a la función anterior. Nótese que ambos encoders finalizan con el mismo modelo (creado con la función *project_embeddings*).

In [3]:
def create_text_encoder(
    num_projection_layers, projection_dims, dropout_rate, trainable=False
):
    # Load the BERT preprocessing module.
    preprocess = hub.KerasLayer(
        "https://tfhub.dev/tensorflow/bert_en_uncased_preprocess/2",
        name="text_preprocessing",
    )
    # Load the pre-trained BERT model to be used as the base encoder.
    bert = hub.KerasLayer(
    "https://tfhub.dev/tensorflow/small_bert/bert_en_uncased_L-4_H-512_A-8/1",
    name = "bert",
    )
    # Set the trainability of the base encoder.
    bert.trainable = trainable
    # Receive the text as inputs.
    inputs = layers.Input(shape=(), dtype=tf.string, name="text_input")
    # Preprocess the text.
    bert_inputs = preprocess(inputs)
    # Generate embeddings for the preprocessed text using the BERT model.
    embeddings = bert(bert_inputs)["pooled_output"]
    # Project the embeddings produced by the model.
    outputs = project_embeddings(
        embeddings, num_projection_layers, projection_dims, dropout_rate
    )
    # Create the text encoder model.
    return keras.Model(inputs, outputs, name="text_encoder")

El núcleo de nuestro programa se implementa en la clase *DualEncoder*, que crea un modelo (hereda de keras.model, en la línea 1). En su constructor (línea 2) lo más relevante es que se le pasa el encoder de imágenes y el encoder de textos. Cuando se crea una instancia de la clase se invoca al método *call* que asigna a una GPU diferente cada encoder (dado el procesamiento requerido, se asume la existencia de dos GPUs)<br>
La parte de entrenamiento de cada batch se procesa en la función *train_step*, que como es habitual invoca a *call* (vía *self*, en la línea 56). En nuestro caso, recibiendo los embeddings de texto y de imagen (*caption_embeddings, image_embeddings*); con estos embeddings se calcula la función de coste (línea 57) que se explicará después. De la manera estándard, a partir del loss se obtienen los gradientes (línea 59) y se actualizan los parámetros de la red neuronal (línea 60). La parte de testeo de cada batch se procesa en la función *test_step*, que realiza unas acciones similares a la función anterior, pero sin la parte de actualización de parámetros. <br><br>
En este punto solo queda por explicar la manera en la que se obtiene el valor de *loss* a partir de ambos embeddings (el de la imagen y el de sus textos asociados). La función *compute_loss* (línea 24) realiza esta tarea. Para entender esta función hay que tener en cuenta que estamos realizando un aprendizaje contrastivo, y que las imágenes y textos de cada batch se cruzan para crear las muestras. Nuestra predicción se obtiene haciendo el producto escalar del embedding de la imagen con el embedding de los textos (líneas 26 a 29). Cuando el embedding de la imagen coincida con el embedding de los textos, se espera un valor alto. Esta predicción debe ser acorde con las imágenes y textos de la muestra. Si las imágenes y los textos son similares y nuestra prediccion dice que son similares, el loss mejora (o si las imágenes y textos no son similares y nuestra predicción indica que no lo son). Las imágenes las comparamos entre sí en las líneas 31 a 33; los textos en las líneas 35 a 37. En las líneas 39 a 41 simplemente promediamos los resultados anteriores. Finalmente, en las líneas 43 a 45, comparamos los valores reales (targets) con los predichos (logits). La función *crossentropy* de la línea 43 se encarga de proporcionar un valor bajo cuando los valores reales y las predicciones coinciden (baja entropía). Nota: *from_logits=True* simplemente informa de que los datos no estan normalizados probabilísticamente (no se les ha aplicado previamente la función softmax).


In [6]:
class DualEncoder(keras.Model):
    def __init__(self, text_encoder, image_encoder, temperature=1.0, **kwargs):
        super().__init__(**kwargs)
        self.text_encoder = text_encoder
        self.image_encoder = image_encoder
        self.temperature = temperature
        self.loss_tracker = keras.metrics.Mean(name="loss")

    @property
    def metrics(self):
        return [self.loss_tracker]

    def call(self, features, training=False):
        # Place each encoder on a separate GPU (if available).
        # TF will fallback on available devices if there are fewer than 2 GPUs.
        with tf.device("/gpu:0"):
            # Get the embeddings for the captions.
            caption_embeddings = text_encoder(features["caption"], training=training)
        with tf.device("/gpu:1"):
            # Get the embeddings for the images.
            image_embeddings = vision_encoder(features["image"], training=training)
        return caption_embeddings, image_embeddings

    def compute_loss(self, caption_embeddings, image_embeddings):
        # logits[i][j] is the dot_similarity(caption_i, image_j).
        logits = (
            tf.matmul(caption_embeddings, image_embeddings, transpose_b=True)
            / self.temperature
        )
        # images_similarity[i][j] is the dot_similarity(image_i, image_j).
        images_similarity = tf.matmul(
            image_embeddings, image_embeddings, transpose_b=True
        )
        # captions_similarity[i][j] is the dot_similarity(caption_i, caption_j).
        captions_similarity = tf.matmul(
            caption_embeddings, caption_embeddings, transpose_b=True
        )
        # targets[i][j] = avarage dot_similarity(caption_i, caption_j) and dot_similarity(image_i, image_j).
        targets = keras.activations.softmax(
            (captions_similarity + images_similarity) / (2 * self.temperature)
        )
        # Compute the loss for the captions using crossentropy
        captions_loss = keras.losses.categorical_crossentropy(
            y_true=targets, y_pred=logits, from_logits=True
        )
        # Compute the loss for the images using crossentropy
        images_loss = keras.losses.categorical_crossentropy(
            y_true=tf.transpose(targets), y_pred=tf.transpose(logits), from_logits=True
        )
        # Return the mean of the loss over the batch.
        return (captions_loss + images_loss) / 2

    def train_step(self, features):
        with tf.GradientTape() as tape:
            # Forward pass
            caption_embeddings, image_embeddings = self(features, training=True)
            loss = self.compute_loss(caption_embeddings, image_embeddings)
        # Backward pass
        gradients = tape.gradient(loss, self.trainable_variables)
        self.optimizer.apply_gradients(zip(gradients, self.trainable_variables))
        # Monitor loss
        self.loss_tracker.update_state(loss)
        return {"loss": self.loss_tracker.result()}

    def test_step(self, features):
        caption_embeddings, image_embeddings = self(features, training=False)
        loss = self.compute_loss(caption_embeddings, image_embeddings)
        self.loss_tracker.update_state(loss)
        return {"loss": self.loss_tracker.result()}

En este texto creamos el encoder dual (el modelo completo) en la línea 10, e indicamos los parámetros de compilación del mismo (líneas 11 a 13). Al modelo *DualEncoder* (línea 10) hay que proporcionarle el encoder de vision y el de texto (constructor de la celda anterior (línea 2)). El *vision_encoder* se crea en las líneas 4 a 6, mientras que el *text_encoder* se creea en las líneas 7 a 9, empleando las funciones definidas anteriormente.

In [7]:
num_epochs = 5  # In practice, train for at least 30 epochs
batch_size = 256

vision_encoder = create_vision_encoder(
    num_projection_layers=1, projection_dims=256, dropout_rate=0.1
)
text_encoder = create_text_encoder(
    num_projection_layers=1, projection_dims=256, dropout_rate=0.1
)
dual_encoder = DualEncoder(text_encoder, vision_encoder, temperature=0.05)
dual_encoder.compile(
    optimizer=tfa.optimizers.AdamW(learning_rate=0.001, weight_decay=0.001)
)


Aquí se prepara todo para la ejecución del modelo (*fit*, en la línea 15). Se crean las variables *train_dataset* y *valid_dataset* con las muestras de entrenamiento y de testeo (líneas 5 y 6), y se establecen dos callabacks: *ReduceLROnPlateau* (líneas 8 a 10), e *EarlyStopping* (líneas 12 a 14).

In [8]:
print(f"Number of GPUs: {len(tf.config.list_physical_devices('GPU'))}")
print(f"Number of examples (caption-image pairs): {train_example_count}")
print(f"Batch size: {batch_size}")
print(f"Steps per epoch: {int(np.ceil(train_example_count / batch_size))}")
train_dataset = get_dataset(os.path.join(tfrecords_dir, "train-*.tfrecord"), batch_size)
valid_dataset = get_dataset(os.path.join(tfrecords_dir, "valid-*.tfrecord"), batch_size)
# Create a learning rate scheduler callback.
reduce_lr = keras.callbacks.ReduceLROnPlateau(
    monitor="val_loss", factor=0.2, patience=3
)
# Create an early stopping callback.
early_stopping = tf.keras.callbacks.EarlyStopping(
    monitor="val_loss", patience=5, restore_best_weights=True
)
history = dual_encoder.fit(
    train_dataset,
    epochs=num_epochs,
    validation_data=valid_dataset,
    callbacks=[reduce_lr, early_stopping],
)
print("Training completed. Saving vision and text encoders...")
vision_encoder.save("vision_encoder")
text_encoder.save("text_encoder")
print("Models are saved.")

Number of GPUs: 0


NameError: ignored

Sacamos una gráfica de la evolución de la función de coste en entrenamiento y en testeo.

In [None]:
plt.plot(history.history["loss"])
plt.plot(history.history["val_loss"])
plt.ylabel("Loss")
plt.xlabel("Epoch")
plt.legend(["train", "valid"], loc="upper right")
plt.show()

Aquí generamos los embeddings de todas las imágenes (líneas 13 a 16), y los almacenamos en *image_embeddings*. Para obtener embeddings basta con hacer el feed forward (*predict*) en el modelo *vision_encoder* ya entrenado (línea 13).

In [None]:
print("Loading vision and text encoders...")
vision_encoder = keras.models.load_model("vision_encoder")
text_encoder = keras.models.load_model("text_encoder")
print("Models are loaded.")


def read_image(image_path):
    image_array = tf.image.decode_jpeg(tf.io.read_file(image_path), channels=3)
    return tf.image.resize(image_array, (299, 299))


print(f"Generating embeddings for {len(image_paths)} images...")
image_embeddings = vision_encoder.predict(
    tf.data.Dataset.from_tensor_slices(image_paths).map(read_image).batch(batch_size),
    verbose=1,
)
print(f"Image embeddings shape: {image_embeddings.shape}.")

La función *find_matches* encuentra las 'k' imágenes cuyos embeddings se aproximan más al embedding del texto de query. Para ello hay que obtener el embedding del texto haciendo uso de nuestro modelo *text_encoder* (línea 3). Tras una fase de normalización (líneas 6 y 7), se obtiene un vector que almacena los valores de similaridad entre el embedding del texto de query y cada uno de los embeddings de las imágenes disponibles (línea 9). En la línea 11 se extraen los índices de las 'k' imágenes cuyos embeddings se acercan más al del texto. Finalmente se devuelve una lista con todas esas imágenes (línea 13).

In [None]:
def find_matches(image_embeddings, queries, k=9, normalize=True):
    # Get the embedding for the query.
    query_embedding = text_encoder(tf.convert_to_tensor(queries))
    # Normalize the query and the image embeddings.
    if normalize:
        image_embeddings = tf.math.l2_normalize(image_embeddings, axis=1)
        query_embedding = tf.math.l2_normalize(query_embedding, axis=1)
    # Compute the dot product between the query and the image embeddings.
    dot_similarity = tf.matmul(query_embedding, image_embeddings, transpose_b=True)
    # Retrieve top k indices.
    results = tf.math.top_k(dot_similarity, k).indices.numpy()
    # Return matching image paths.
    return [[image_paths[idx] for idx in indices] for indices in results]

Finalmente, utilizamos la función anterior (línea 2) con una query cualquiera (línea 1)  y dibujamos el resultado.

In [None]:
query = "a family standing next to the ocean on a sandy beach with a surf board"
matches = find_matches(image_embeddings, [query], normalize=True)[0]

plt.figure(figsize=(20, 20))
for i in range(9):
    ax = plt.subplot(3, 3, i + 1)
    plt.imshow(mpimg.imread(matches[i]))
    plt.axis("off")