In [None]:
import pandas as pd
import re
from transformers import CLIPTokenizerFast, CLIPProcessor, CLIPModel
import torch
from PIL import Image
import PIL
import numpy as np

## Load Model

In [None]:
# initializing the CLIP model
from huggingface_hub import snapshot_download

# Choose the compute device: prefer CUDA, then Apple MPS, otherwise the CPU.
if torch.cuda.is_available():
    device = "cuda"
elif torch.backends.mps.is_available():
    device = "mps"
else:
    device = "cpu"

model_id = "openai/clip-vit-base-patch32"

# Load the tokenizer (text prompts), the processor (image preprocessing) and
# the model itself from the same checkpoint, then place the model on `device`.
tokenizer = CLIPTokenizerFast.from_pretrained(model_id)
processor = CLIPProcessor.from_pretrained(model_id)
model = CLIPModel.from_pretrained(model_id)
model = model.to(device)

# Look up where the checkpoint snapshot lives on the local disk.
local_model_path = snapshot_download(repo_id=model_id)

print(f"Model Local Path: {local_model_path}")

## Generate Embeddings

In [11]:
import os

def get_image_paths(directory):
    """Map every image file directly inside ``directory`` to its path.

    Only regular files whose extension (compared case-insensitively) is one of
    .jpg, .jpeg, .png, .gif or .bmp are kept; sub-directories are not searched.

    Parameters:
        directory (str): Folder to scan.

    Returns:
        dict: ``{file_name: os.path.join(directory, file_name)}``.
    """
    image_suffixes = {'.jpg', '.jpeg', '.png', '.gif', '.bmp'}

    def _is_image_file(name):
        # Must be a regular file AND carry one of the accepted suffixes.
        if not os.path.isfile(os.path.join(directory, name)):
            return False
        _, suffix = os.path.splitext(name)
        return suffix.lower() in image_suffixes

    return {
        name: os.path.join(directory, name)
        for name in os.listdir(directory)
        if _is_image_file(name)
    }


# Folder (relative to the working directory) that holds the product images.
directory = "images"
image_paths_dict = get_image_paths(directory=directory)


In [20]:
import json

# Serialize the file-name -> path mapping to paths_dict.json
with open("paths_dict.json", 'w') as json_file:
    json_file.write(json.dumps(image_paths_dict))

In [None]:
# Show the whole file-name -> path mapping returned by get_image_paths.
image_paths_dict

In [None]:
# Number of image files that were found.
len(image_paths_dict)

## Utils

In [15]:

def encode_text(prompt):
  """Embed a text prompt with the CLIP text encoder.

  Parameters:
      prompt (str or list of str): Text to embed.

  Returns:
      torch.Tensor: Output of ``model.get_text_features``. It stays on
      ``device``, so call ``.cpu()`` before converting it to NumPy.
  """
  # Create transformer-readable tokens ("pt" = return PyTorch tensors).
  # Padding lets a list of prompts with different lengths be batched, and
  # truncation keeps over-long prompts within the tokenizer's maximum length.
  inputs = tokenizer(prompt, padding=True, truncation=True, return_tensors="pt")

  # The model was moved to `device`; its inputs have to live on the same
  # device, otherwise the forward pass fails on CUDA/MPS.
  inputs = inputs.to(device)

  # Inference only: skip autograd bookkeeping.
  with torch.no_grad():
    text_emb = model.get_text_features(**inputs)

  return text_emb


def get_image_embd(path):
  """Embed one image file with the CLIP image encoder.

  Parameters:
      path (str): Location of the image file.

  Returns:
      torch.Tensor: Output of ``model.get_image_features``. It stays on
      ``device``, so call ``.cpu()`` before converting it to NumPy.
  """
  # Open through a context manager so the file handle is released as soon as
  # preprocessing is done; otherwise embedding a large folder leaks one
  # handle per image.
  with Image.open(path) as image:
    pixel_values = processor(
        text=None,
        images=image,
        return_tensors='pt'
    )['pixel_values'].to(device)

  # Inference only: skip autograd bookkeeping.
  with torch.no_grad():
    img_emb = model.get_image_features(pixel_values)

  return img_emb

In [16]:
# Build one record per image: its file name and its embedding vector.
db_rows_list = []
for file, path in image_paths_dict.items():

    # .cpu() is required before .numpy(): tensors that live on CUDA/MPS cannot
    # be converted to NumPy directly. [0] takes the first row of the result.
    embeddings = get_image_embd(path).detach().cpu().numpy()[0]

    db_rows_list.append({
        'product_name': file,
        'image_embed': embeddings})


In [58]:
vector_storage = pd.DataFrame(db_rows_list)

# Store each embedding as a comma-separated string of its values.
# np.array2string is avoided on purpose: it elides vectors longer than its
# print threshold (1000 by default) with "..." and rounds values to the print
# precision, both of which silently corrupt the saved vectors.
vector_storage['image_embed'] = vector_storage['image_embed'].apply(
    lambda x: ','.join(repr(value) for value in x.tolist())
)

# Save DataFrame to CSV
vector_storage.to_csv('vectors.csv', index=False)

## Utils for Search (Top K matching)

In [64]:
import numpy as np
from sklearn.metrics.pairwise import cosine_similarity

def top_k_cosine_similarity(df, input_vector, k):
    """
    Rank the stored embeddings by cosine similarity to a query vector.

    The input DataFrame is left untouched; scores are attached to a copy.

    Parameters:
        df (pd.DataFrame): DataFrame containing product names and image embeddings.
                           Schema: ['product_name', 'image_embed']
        input_vector (list or np.array): Input vector for comparison.
        k (int): Number of top matches to return.

    Returns:
        pd.DataFrame: The k best rows (original index preserved) with columns
                      ['product_name', 'similarity'], most similar first.
                      Empty when ``df`` has no rows.
    """
    # Nothing to rank: np.vstack would raise on an empty sequence, so return an
    # empty result with the expected columns instead.
    if len(df) == 0:
        return df[['product_name']].assign(similarity=np.array([], dtype=float))

    # cosine_similarity expects 2D inputs: one query row ...
    query = np.asarray(input_vector).reshape(1, -1)

    # ... and one row per stored embedding.
    embeddings = np.vstack(df['image_embed'].to_numpy())

    similarities = cosine_similarity(query, embeddings).ravel()

    # Attach the scores to a new frame rather than writing a 'similarity'
    # column into the caller's DataFrame.
    scored = df[['product_name']].assign(similarity=similarities)

    return scored.nlargest(k, 'similarity')

In [63]:
# `ast` is needed by literal_eval below; without this import the cell raises
# NameError on a fresh kernel.
import ast

df = pd.read_csv('vectors.csv')

# Convert string representation of vectors back to numpy arrays
# (the stored string has no surrounding brackets, so wrap it before parsing).
df['image_embed'] = df['image_embed'].apply(lambda x: np.array(ast.literal_eval(f'[{x}]')))


In [None]:
# Preview the first two rows of the reloaded vector table.
df.head(2)

In [None]:

# Embed the text query. .cpu() is required before .numpy() because tensors on
# CUDA/MPS cannot be converted to NumPy directly; [0] takes the first row.
text_querry = encode_text(prompt="I am lookin for boots").detach().cpu().numpy()[0]
results = top_k_cosine_similarity(df=df, input_vector=text_querry, k=10)

results

In [None]:

from IPython.display import display


def display_results(file_names):
    """Print each file name and render the matching image inline.

    Parameters:
        file_names (iterable of str): Keys of ``image_paths_dict`` to show.

    Raises:
        KeyError: If a name is not present in ``image_paths_dict``.
    """
    for file_name in file_names:
        print(file_name)
        # Render inside the context manager so the image data can still be read,
        # then release the file handle instead of leaking one per result.
        with Image.open(image_paths_dict[file_name]) as im:
            display(im)
# Render the images of the top matches, best match first.
display_results(results['product_name'].tolist())