# Extracción de Embeddings y Clasificación con k-NN usando FAISS y CLIP

En este notebook, vamos a realizar la extracción de embeddings de imágenes utilizando el modelo CLIP y almacenarlos en un índice FAISS para hacer búsquedas rápidas de vecinos más cercanos. Finalmente, implementamos un clasificador k-NN para predecir la clase de una imagen utilizando los vecinos más cercanos.
## Contenido:
- Preprocesamiento y carga de imágenes
- Extracción de embeddings con CLIP
- Almacenamiento de embeddings en FAISS
- Clasificación k-NN basada en los embeddings
- Ejemplo de uso del clasificador k-NN

In [1]:
# importamos las librerías necesarias
import os
import torch
import clip
from PIL import Image
import faiss
import csv
import numpy as np
from tqdm import tqdm
from sklearn.manifold import TSNE
import plotly.express as px
import pandas as pd

### 1. Preprocesamiento de imágenes y modelo CLIP

En esta sección, cargamos y preprocesamos las imágenes utilizando PIL y realizamos la extracción de los embeddings de las imágenes usando el modelo CLIP preentrenado. El modelo CLIP está diseñado para aprender representaciones de imágenes y textos de manera conjunta.

Nos cambiamos a la carpeta anterior, porque luego para estar manipulando las imágenes es mucho más fácil meternos a nivel de la carpeta "data"

In [2]:
os.chdir("../")

In [7]:
# Carga el modelo clip https://github.com/openai/CLIP
device = 'cpu'
clip_model, preprocess = clip.load("ViT-B/32", device=device)

In [8]:
# Inicializa un indice de IndexFlatL2 de faiss sabiendo que el tamaño de los embeddings es de 512
# https://github.com/facebookresearch/faiss/wiki/getting-started#in-python-1
embedding_size = 512
faiss_index = index = faiss.IndexFlatL2(embedding_size)

# Folder containing the dataset
dataset_folder = "./data/"

# CSV to store mapping between embedding ID and class
csv_file = "./faiss/embeddings_mapping.csv"
csv_data = []

#### Construcción del índice FAISS

Iremos recorriendo las carpetas una por una, obteniendo los embeddings y guardandolos en faiss.
Como faiss no es una base de datos vectorial, hay que llevar un csv que mapea el identificador del embedding insertado en faiss con la clase a la que pertence

In [9]:
# Itera sobre las carpetas de las clases
embedding_id = 0
class_folders = [f for f in os.listdir(dataset_folder) if os.path.isdir(os.path.join(dataset_folder, f))]

# Use tqdm to show progress over class folders
for class_folder in tqdm(class_folders, desc="Processing classes"):
    class_path = os.path.join(dataset_folder, class_folder)

    img_files = os.listdir(class_path)

    # Use tqdm to show progress over images in each class folder
    for img_file in tqdm(img_files, desc=f"Processing images in {class_folder}", leave=False):
        img_path = os.path.join(class_path, img_file)

        try:
            # Carga la imagen con PIL
            img = Image.open(img_path)
            # Preprocesa la imagen con CLIP
            img_preprocessed = preprocess(img).unsqueeze(0).to(device)

            # Calcula el embedding de la imagen con CLIP
            with torch.no_grad():
                img_embedding = clip_model.encode_image(img_preprocessed)
            # Añade el embeddings a faiss
            faiss_index.add(img_embedding.cpu().numpy())

            # Add mapping of embedding ID to class in the CSV
            csv_data.append([embedding_id, class_folder])
            embedding_id += 1
        except Exception as e:
            print(f"Error processing {img_path}: {e}")


Processing classes: 100%|██████████| 5/5 [00:35<00:00,  7.12s/it]


In [10]:
# Guarda el indice en esta ruta ./faiss/image_embeddings.index"
faiss.write_index(faiss_index, "./faiss/image_embeddings.index")

# Write CSV file with mappings
with open(csv_file, mode='w', newline='') as f:
    writer = csv.writer(f)
    writer.writerow(['EmbeddingID', 'Class'])  # Header
    writer.writerows(csv_data)

print(f"Finished processing. FAISS index saved as 'image_embeddings.index' and CSV saved as '{csv_file}'.")

Finished processing. FAISS index saved as 'image_embeddings.index' and CSV saved as './faiss/embeddings_mapping.csv'.


#### Exploración de resultados

Una vez los embeddings están obtenidos y guardados en FAISS, vamos a dibujarlos en un gráfico para ver qué pinta tendrían en 2 dimensiones
Para eso primero hay que reducir de 512 a 2 dimensiones, lo haremos con tsne.

In [14]:
# Recupera los embeddings de dentro del índice
all_embeddings = []
for embedding_id in range(faiss_index.ntotal):
    embedding = faiss_index.reconstruct(embedding_id)
    all_embeddings.append(embedding)

all_embeddings = np.array(all_embeddings)

# Reduce a dos dimensiones con TSNE: https://scikit-learn.org/0.16/modules/generated/sklearn.manifold.TSNE.html
tsne = TSNE(n_components=2)
embeddings_2d = tsne.fit_transform(all_embeddings)

In [15]:
# Mapeamos la clase al embedding
csv_file = "./faiss/embeddings_mapping.csv"
class_labels = {}
with open(csv_file, mode='r') as f:
    reader = csv.reader(f)
    next(reader)  # Skip header
    for row in reader:
        embedding_id, class_label = row
        class_labels[int(embedding_id)] = class_label

In [16]:
# Ploteamos los resultados
df = pd.DataFrame(embeddings_2d, columns=["x", "y"])
df['Class'] = [class_labels[i] for i in range(len(all_embeddings))]
fig = px.scatter(df, x="x", y="y", color="Class", title="Image Embeddings")
fig.show()

## Caso de uso, encotramos las fotos más similares

### Similitud Coseno

La **similitud coseno** es una medida de similitud entre dos vectores que mide el coseno del ángulo entre ellos. Se utiliza comúnmente en problemas de recuperación de información y procesamiento del lenguaje natural para medir cuán similares son dos vectores en un espacio de características, independientemente de su magnitud.

La fórmula de la similitud coseno entre dos vectores A y B es:

Similitud Coseno(A, B) = (A · B) / (||A|| * ||B||)


Donde:
- `A · B` es el producto punto entre los vectores A y B.
- `||A||` y `||B||` son las normas (longitudes) de los vectores A y B.

El valor de la similitud coseno varía entre -1 y 1, donde:
- **1** indica que los vectores son idénticos en dirección (máxima similitud),
- **0** indica que los vectores son ortogonales (no tienen similitud),
- **-1** indica que los vectores son opuestos en dirección (máxima disimilitud).


In [18]:
# cargamos el mapeo de todo
id_to_class = {}

with open(csv_file, mode='r') as f:
    reader = csv.reader(f)
    next(reader)  # Skip header
    for row in reader:
        embedding_id = int(row[0])
        class_name = row[1]
        id_to_class[embedding_id] = class_name

In [22]:
# Función para buscar imágenes similares
def search_similar_images(input_image_path, k=5):
    # Cargamos la imagen con PIL
    img = Image.open(input_image_path)

    # La preprocesamos con CLIP
    img_preprocessed = preprocess(img).unsqueeze(0).to(device)

    # Calculamos el embedding con clip
    with torch.no_grad():
        input_embedding = clip_model.encode_image(img_preprocessed)

    # utilizamos la busqueda por similitud de faiss
    distances, indices = faiss_index.search(input_embedding, k)

    # Retrieve the top-k most similar classes and distances
    results = []
    for i, index in enumerate(indices[0]):
        class_name = id_to_class[index]
        distance = distances[0][i]
        results.append((class_name, distance))

    return results

Utilizamos la función con una imagen de prueba

In [32]:
input_image_path = "test_images/00000075.jpg"  # Path to the input image
top_k_results = search_similar_images(input_image_path, k=5)

for i, (class_name, distance) in enumerate(top_k_results):
    print(f"Rank {i+1}: Class: {class_name}, Distance: {distance}")

Rank 1: Class: pikachu, Distance: 29.771406173706055
Rank 2: Class: pikachu, Distance: 30.32343292236328
Rank 3: Class: pikachu, Distance: 34.95173645019531
Rank 4: Class: pikachu, Distance: 36.06517028808594
Rank 5: Class: pikachu, Distance: 36.94024658203125


In [33]:
from collections import Counter

# Function to perform k-NN classification
def knn_classify(input_image_path, k=5):
    # Load and preprocess input image
    img = Image.open(input_image_path)
    img_preprocessed = preprocess(img).unsqueeze(0).to(device)

    # Calculate the embedding for the input image
    with torch.no_grad():
        input_embedding = clip_model.encode_image(img_preprocessed).cpu().numpy().astype(float)

    # Perform similarity search in FAISS index
    distances, indices = faiss_index.search(input_embedding, k)

    # Retrieve the classes of the k nearest neighbors
    nearest_classes = [id_to_class[idx] for idx in indices[0]]

    # Perform majority voting among the top-k classes
    class_counter = Counter(nearest_classes)
    predicted_class = class_counter.most_common(1)[0][0]  # Get the most common class

    return predicted_class, nearest_classes, distances[0]


In [34]:
# Example usage:
input_image_path = "test_images/00000005.png"  # Path to the input image
predicted_class, nearest_classes, distances = knn_classify(input_image_path, k=5)

# Display the results
print(f"Predicted Class: {predicted_class}")
print(f"Nearest Neighbors Classes: {nearest_classes}")
print(f"Distances: {distances}")

Predicted Class: pikachu
Nearest Neighbors Classes: ['pikachu', 'pikachu', 'pikachu', 'pikachu', 'pikachu']
Distances: [5.546635 7.808233 8.264306 8.264927 8.570351]
