<a href="https://colab.research.google.com/github/jsvillalbat/image-to-image-similarity/blob/main/image_retrieval.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Proyecto de Busqueda de productos por Fotos TUL
En este cuaderno se realiza una prueba de concepto para un buscador de productos por medio de imagenes. La idea es que el usuario tome una imagen de un producto y el usuario recibe imagenes de productos similares que se encuentran en el catalogo de TUL. Este proyecto es basado en este repositorio público de Github [Repositorio Artificio](https://github.com/ankonzoid/artificio/tree/master)

Instalar las dependencias y librerias necesarias para el proyecto

In [None]:
import os
import numpy as np
import tensorflow as tf
import skimage.io
from multiprocessing import Pool
from sklearn.neighbors import NearestNeighbors
from skimage.transform import resize

## Cargar imagenes de entrenamiento y test almacenadas de Google Drive

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
absolute_path = "/content/drive/MyDrive/Colab Notebooks/Image Retrieval"
train_dataset_path = absolute_path + "/train_1"
test_dataset_path = absolute_path + "/test_1"
output_folder_path = absolute_path + "/output_1"
# Crear la carpeta donde estarán los resultados si no existe, en Google Drive
if not os.path.exists(output_folder_path):
    os.makedirs(output_folder_path)

Funciones para el cargue de imagenes en memoria

In [None]:
# Leer una imagen
def read_img(filePath):
    return skimage.io.imread(filePath, as_gray=False)

# Leer imagenes con extensiones iguales desde un directorio especifico
def read_imgs_dir(dirPath, extensions, parallel=True):
    args = [os.path.join(dirPath, filename)
            for filename in os.listdir(dirPath)
            if any(filename.lower().endswith(ext) for ext in extensions)]
    if parallel:
        pool = Pool()
        imgs = pool.map(read_img, args)
        pool.close()
        pool.join()
    else:
        imgs = [read_img(arg) for arg in args]
    return imgs

# Guardar imagen en un archivo
def save_img(filePath, img):
    skimage.io.imsave(filePath, img)

In [None]:
# Read images
extensions = [".jpg", ".jpeg"]
print("Leyendo imagenes de entrenamiento en: '{}'...".format(train_dataset_path))
imgs_train = read_imgs_dir(train_dataset_path, extensions)
print("Leyendo imagenes de test en: '{}'...".format(test_dataset_path))
imgs_test = read_imgs_dir(test_dataset_path, extensions)
shape_img = imgs_train[0].shape
print("Tamaño de la imagen = {}".format(shape_img))

Leyendo imagenes de entrenamiento en: '/content/drive/MyDrive/Colab Notebooks/Image Retrieval/train_1'...
Leyendo imagenes de test en: '/content/drive/MyDrive/Colab Notebooks/Image Retrieval/test_1'...
Tamaño de la imagen = (91, 162, 3)


## Preprocesamiento de imagenes

Funciones para el procesamiento de imagenes, todas las imagenes deben tener el mismo tamaño y tiene que ser normalizadas para un mejor performance del modelo

In [None]:
# Clase para realizar la transformacion de las imagenes y ponerlas en el mismo formato
class ImageTransformer(object):

    def __init__(self, shape_resize):
        self.shape_resize = shape_resize

    def __call__(self, img):
        img_transformed = self.resize_img(img, self.shape_resize)
        img_transformed = self.normalize_img(img_transformed)
        return img_transformed

    # Normalize image data [0, 255] -> [0.0, 1.0]
    def normalize_img(self, img):
        return img / 255.

    # Resize de la imagen
    def resize_img(self, img, shape_resized):
        img_resized = resize(img, shape_resized,
                            anti_aliasing=True,
                            preserve_range=True)
        assert img_resized.shape == shape_resized
        return img_resized

    # Aplanar imagen
    def flatten_img(self, img):
        return img.flatten("C")

In [None]:
# Apicar las transformaciones a multiples imagenes
def apply_transformer(imgs, transformer, parallel=True):
    if parallel:
        pool = Pool()
        imgs_transform = pool.map(transformer, [img for img in imgs])
        pool.close()
        pool.join()
    else:
        imgs_transform = [transformer(img) for img in imgs]
    return imgs_transform

## Carga de modelo pre-entrenado

En este notebook se realiza el proceso de recuperación de imagenes (image retrieval) usando aprendizaje por transferencia sobre un clasificador de imagenes pre-entrenado llamado VGG19 [Documentación Modelo VGG19](https://www.tensorflow.org/api_docs/python/tf/keras/applications/vgg19/VGG19)

In [None]:
# Run mode: (autoencoder -> simpleAE, convAE) or (transfer learning -> vgg19)
modelName = "vgg19"  # try: "simpleAE", "convAE", "vgg19"
trainModel = True
parallel = True  # use multicore processing


if modelName in ["vgg19"]:

    # Load pre-trained VGG19 model + higher level layers
    print("Loading VGG19 pre-trained model...")
    model = tf.keras.applications.VGG19(weights='imagenet', include_top=False,
                                        input_shape=shape_img)
    model.summary()

    shape_img_resize = tuple([int(x) for x in model.input.shape[1:]])
    input_shape_model = tuple([int(x) for x in model.input.shape[1:]])
    output_shape_model = tuple([int(x) for x in model.output.shape[1:]])
    n_epochs = None

else:
    raise Exception("Invalid modelName!")

# Print some model info
print("input_shape_model = {}".format(input_shape_model))
print("output_shape_model = {}".format(output_shape_model))

Se aplican las transformaciones necesarias a las imagenes de entrenamiento y test

In [None]:
transformer = ImageTransformer(shape_img_resize)
print("Aplicar transformacion a las imagenes de entrenamiento ...")
imgs_train_transformed = apply_transformer(imgs_train, transformer, parallel=True)
print("Aplicar transformación a las imagenes de test ...")
imgs_test_transformed = apply_transformer(imgs_test, transformer, parallel=parallel)

Aplicar transformacion a las imagenes de entrenamiento ...
Aplicar transformación a las imagenes de test ...


In [None]:
# Convert images to numpy array
X_train = np.array(imgs_train_transformed).reshape((-1,) + input_shape_model)
X_test = np.array(imgs_test_transformed).reshape((-1,) + input_shape_model)
print(" -> X_train.shape = {}".format(X_train.shape))
print(" -> X_test.shape = {}".format(X_test.shape))

 -> X_train.shape = (25, 91, 162, 3)
 -> X_test.shape = (5, 91, 162, 3)


In [None]:
model.compile(loss="binary_crossentropy", optimizer="adam")

In [None]:
# Crear los embedings de las imagenes de entrenamiento
print("Inferencia de los embeddings de las imagenes usando el modelo pre-entrenado ...")
E_train = model.predict(X_train)
E_train_flatten = E_train.reshape((-1, np.prod(output_shape_model)))
E_test = model.predict(X_test)
E_test_flatten = E_test.reshape((-1, np.prod(output_shape_model)))
print(" -> E_train.shape = {}".format(E_train.shape))
print(" -> E_test.shape = {}".format(E_test.shape))
print(" -> E_train_flatten.shape = {}".format(E_train_flatten.shape))
print(" -> E_test_flatten.shape = {}".format(E_test_flatten.shape))

Inferencia de los embeddings de las imagenes usando el modelo pre-entrenado ...
 -> E_train.shape = (25, 2, 5, 512)
 -> E_test.shape = (5, 2, 5, 512)
 -> E_train_flatten.shape = (25, 5120)
 -> E_test_flatten.shape = (5, 5120)


## Funciones de utilidad para graficas de resultados

Funciones de utilidad para ver los resultados visualmente, con una imagen del ranking de similitud y una imagen para ver los clusters de imagenes y como se agrupan.

In [None]:
import matplotlib.pyplot as plt
from matplotlib import offsetbox
from matplotlib.offsetbox import OffsetImage, AnnotationBbox
from sklearn import manifold


# Graficar una imagen
def plot_img(img, range=[0, 255]):
    plt.imshow(img, vmin=range[0], vmax=range[1])
    plt.xlabel("xpixels")
    plt.ylabel("ypixels")
    plt.tight_layout()
    plt.show()
    plt.close()

# Graficar imagenes en 2 filas: Arriba es la del usuario, y abajo es la respuesta del modelo
def plot_query_retrieval(img_query, imgs_retrieval, outFile):
    n_retrieval = len(imgs_retrieval)
    fig = plt.figure(figsize=(2*n_retrieval, 4))
    fig.suptitle("Image Retrieval (k={})".format(n_retrieval), fontsize=25)

    # Plot query image
    ax = plt.subplot(2, n_retrieval, 0 + 1)
    plt.imshow(img_query)
    ax.get_xaxis().set_visible(False)
    ax.get_yaxis().set_visible(False)
    for axis in ['top', 'bottom', 'left', 'right']:
        ax.spines[axis].set_linewidth(4)  # increase border thickness
        ax.spines[axis].set_color('black')  # set to black
    ax.set_title("query",  fontsize=14)  # set subplot title

    # Plot retrieval images
    for i, img in enumerate(imgs_retrieval):
        ax = plt.subplot(2, n_retrieval, n_retrieval + i + 1)
        plt.imshow(img)
        ax.get_xaxis().set_visible(False)
        ax.get_yaxis().set_visible(False)
        for axis in ['top', 'bottom', 'left', 'right']:
            ax.spines[axis].set_linewidth(1)  # set border thickness
            ax.spines[axis].set_color('black')  # set to black
        ax.set_title("Rank #%d" % (i+1), fontsize=14)  # set subplot title

    if outFile is None:
        plt.show()
    else:
        plt.savefig(outFile, bbox_inches='tight')
    plt.close()

# Grafica de distribución de los vecinos t-SNE
def plot_tsne(X, imgs, outFile):

    def imscatter(x, y, images, ax=None, zoom=1.0):
        if ax is None:
            ax = plt.gca()
        x, y = np.atleast_1d(x, y)
        artists = []
        for x0, y0, img0 in zip(x, y, images):
            im = OffsetImage(img0, zoom=zoom)
            ab = AnnotationBbox(im, (x0, y0), xycoords='data', frameon=True)
            artists.append(ax.add_artist(ab))
        ax.update_datalim(np.column_stack([x, y]))
        ax.autoscale()
        return artists

    def plot_embedding(X, imgs, title=None):
        x_min, x_max = np.min(X, 0), np.max(X, 0)
        X = (X - x_min) / (x_max - x_min)

        plt.figure()
        ax = plt.subplot(111)
        for i in range(X.shape[0]):
            plt.text(X[i, 0], X[i, 1], ".", fontdict={'weight': 'bold', 'size': 9})
        if hasattr(offsetbox, 'AnnotationBbox'):
            imscatter(X[:,0], X[:,1], imgs, zoom=0.3, ax=ax)

        plt.xticks([]), plt.yticks([])
        if title is not None:
            plt.title(title, fontsize=18)

    tsne = manifold.TSNE(n_components=2, init='pca', random_state=0)
    X_tsne = tsne.fit_transform(X)
    plot_embedding(X_tsne, imgs, "t-SNE embeddings")
    if outFile is None:
        plt.show()
    else:
        plt.savefig(outFile, bbox_inches='tight')
    plt.close()

## Evaluacion con las imagenes de test y traer los 5 vecinos más cercanos

Se utiliza el algoritmo de vecino más cercano (Nearest Neighbors) para agrupar las imagenes más similares

In [None]:
# Entrenar modelo kNN sobre las imagenes de entrenamiento
print("Se hace el fit de un modelo de vecino más cercanos usando las imagenes de entrenamiento ...")
knn = NearestNeighbors(n_neighbors=5, metric="cosine")
knn.fit(E_train_flatten)


# Se realiza la recuperación de las imagenes de test (imagenes tomadas por el usuario)
print("Realizando la recuperación de las imagenes de test ...")
for i, emb_flatten in enumerate(E_test_flatten):
    _, indices = knn.kneighbors([emb_flatten]) # Encontrar los k vecinos mas cercanos en el entrenamiento
    img_query = imgs_test[i] # Imagen de test i
    imgs_retrieval = [imgs_train[idx] for idx in indices.flatten()] # Imagenes más similares

    # Guardar rankeo de imagenes en la carpeta /output
    outFile = os.path.join(output_folder_path, "{}_retrieval_{}.png".format(modelName, i))
    plot_query_retrieval(img_query, imgs_retrieval, outFile)

Se hace el fit de un modelo de vecino más cercanos usando las imagenes de entrenamiento ...
Realizando la recuperación de las imagenes de test ...


In [None]:
# Graficar de la visualización t-SNE (Grafica de vecinos entre los embeddings)
print("Visualizacion t-SNE sobre las imagenes de entrenamiento ...")
outFile = os.path.join(output_folder_path, "{}_tsne.png".format(modelName))
plot_tsne(E_train_flatten, imgs_train, outFile)