In [None]:
from sentence_transformers import SentenceTransformer
# from sentence_transformers import util
import time
import pandas as pd
import torch
from torch import nn
import random
from  functions import get_IDF_weights


In [2]:
# Probe CUDA once and reuse the answer for both the device choice and the report.
_has_cuda = torch.cuda.is_available()

# Run on the GPU when one is usable, otherwise fall back to the CPU.
device = torch.device("cuda" if _has_cuda else "cpu")

# The second field is the GPU model name, or None when running on the CPU.
print(f"USING DEVICE: {device} / {torch.cuda.get_device_name() if _has_cuda else None}")

USING DEVICE: cuda / NVIDIA GeForce RTX 5060


In [3]:
# Load dataset / model / hyperparameters

# Sentence encoder used to embed individual note names, moved to the selected device.
model = SentenceTransformer("all-MiniLM-L6-v2").to(device)

# Perfume table: ';'-separated, rows that fail to parse are skipped.
# NOTE(review): 'unicode_escape' interprets backslash escapes in the file; if the
# CSV is really UTF-8, accented characters may be decoded incorrectly — confirm.
dataset = pd.read_csv(
    "fra_cleaned.csv",
    sep=";",
    encoding="unicode_escape",
    on_bad_lines="skip",
)

# Per-note weights produced by the project helper, plus a view of its note names.
IDF_weight = get_IDF_weights(dataset)
key = IDF_weight.keys()

# 3x3 weights applied element-wise to the (query layer, database layer) similarity
# matrix; layers are stacked in Top, Middle, Base order where embeddings are built.
layer_weights = torch.tensor(
    [
        [0.8, 0.25, 0.1],
        [0.25, 1.0, 0.35],
        [0.1, 0.35, 1.2],
    ],
    device=device,
)
# Rescale so that all nine entries sum to 1.
layer_weights = layer_weights / layer_weights.sum()

iteration = 100

In [4]:
class PreProcess:
    """Turn comma-separated perfume notes into weighted note embeddings.

    Every note layer of a perfume ("Top", "Middle", "Base" in the dataset) is
    represented by a single vector: the weight-normalised sum of the embeddings
    of the notes listed in that layer.
    """

    def __init__(self, model, device, dataset, IDF_weights = None):
        """Store the encoder, the data and the per-note weights.

        Args:
            model: Encoder exposing ``encode(text, convert_to_tensor=True, device=...)``
                and returning one 1-D tensor per note.
            device: Torch device on which weights and the stacked database live.
            dataset: DataFrame with "Perfume", "Top", "Middle" and "Base" columns.
            IDF_weights: Mapping ``note -> weight``. When omitted every note gets
                weight 1.0, which makes each layer vector a plain mean.
        """
        self.model = model
        self.dataset = dataset
        self.device = device
        self.note_embedding_cache = {}  # note string -> embedding tensor
        if IDF_weights is None:
            # Self-contained default: a uniform weight for *any* note, instead of
            # depending on a notebook-level variable that may not exist yet.
            from collections import defaultdict
            IDF_weights = defaultdict(lambda: 1.0)
        self.IDF_weight = IDF_weights

    def get_note_embedding(self, note):
        """Return the embedding of ``note``, encoding it only on first use."""
        if note not in self.note_embedding_cache:
            self.note_embedding_cache[note] = self.model.encode(
                note, convert_to_tensor=True, device=self.device
            )
        return self.note_embedding_cache[note]

    def process_notes(self, note_dict, IDF_weights = None):
        """Embed each layer of ``note_dict`` as a weighted average of its notes.

        Args:
            note_dict: Mapping ``layer name -> "note, note, ..."``.
            IDF_weights: Mapping ``note -> weight``; defaults to the weights
                given at construction time.

        Returns:
            Tensor of shape ``(number of layers, embedding dim)``, rows in the
            iteration order of ``note_dict``.

        Raises:
            KeyError: If a note is missing from a weights mapping that does not
                supply defaults.
        """
        if IDF_weights is None:
            IDF_weights = self.IDF_weight

        layer_vectors = []
        for layer_text in note_dict.values():
            notes = [n.strip() for n in layer_text.split(",")]  # Remove extra spaces
            weight = torch.tensor([IDF_weights[w] for w in notes]).unsqueeze(1)  # (n_notes, 1)
            weight = weight / torch.sum(weight)  # Normalize so the weights sum to 1
            weight = weight.to(self.device)
            embeddings = torch.stack([self.get_note_embedding(n) for n in notes])
            layer_vectors.append(torch.sum(embeddings * weight, dim=0))

        return torch.vstack(layer_vectors)

    def get_emb_dict(self):
        """Embed every perfume of the dataset.

        Rows are read positionally, so the DataFrame index does not have to be
        a fresh 0..n-1 range.

        Returns:
            Dict ``perfume name -> (layers, dim) tensor``. Rows sharing a name
            collapse into one entry: the key keeps the position of the first
            row, the value comes from the last one.
        """
        data = self.dataset
        emb_DB = {}
        for name, top, middle, base in zip(
            data["Perfume"], data["Top"], data["Middle"], data["Base"]
        ):
            layers = {"Top": top, "Middle": middle, "Base": base}
            emb_DB[name] = self.process_notes(layers, self.IDF_weight)
        return emb_DB

    def dict_to_tensor(self, emb_DB):
        """Split an embedding dict into parallel names and one stacked tensor.

        Returns:
            ``(names, embeddings)`` where ``names[i]`` labels ``embeddings[i]``
            and ``embeddings`` has shape ``(n perfumes, layers, dim)`` on
            ``self.device``.
        """
        names = list(emb_DB.keys())
        embeddings = torch.stack(list(emb_DB.values())).to(self.device)
        return names, embeddings

# Embed the whole dataset once, up front, so queries only need a similarity pass.
preprocessor = PreProcess(
    model=model,
    device=device,
    dataset=dataset,
    IDF_weights=IDF_weight,
)
emb_DB = preprocessor.get_emb_dict()

# Parallel structures: names[i] is the perfume whose layer vectors are DB_batch[i].
names, DB_batch = preprocessor.dict_to_tensor(emb_DB)

In [5]:
class Perfume_Recommender(PreProcess):
    """Rank the perfumes of a pre-embedded database by note similarity to a query.

    A query and every database entry are stacks of per-layer vectors. For each
    database entry the cosine similarity between every query layer and every
    database layer is computed, multiplied element-wise by ``layer_weights``
    and summed into one score.
    """

    def __init__(self, dataset, model, device, layer_weights, DB_batch,
                 IDF_weights=None, perfume_names=None):
        """
        Args:
            dataset: DataFrame with "Perfume", "Brand", "Top", "Middle", "Base".
            model: Note encoder (see ``PreProcess``).
            device: Torch device used for the query tensors.
            layer_weights: Tensor broadcastable against the per-entry
                (query layer, database layer) similarity matrix.
            DB_batch: Tensor ``(n perfumes, layers, dim)`` of database embeddings.
            IDF_weights: Mapping ``note -> weight``. When omitted, the
                notebook-level ``IDF_weight`` is used so existing call sites
                keep working; prefer passing it explicitly.
            perfume_names: Sequence where item ``i`` names ``DB_batch[i]``. When
                omitted, the notebook-level ``names`` is used for the same
                backward-compatibility reason.
        """
        if IDF_weights is None:
            IDF_weights = IDF_weight
        # Let the base class own the shared state (model, dataset, device,
        # embedding cache, weights) instead of duplicating its assignments.
        super().__init__(model, device, dataset, IDF_weights=IDF_weights)
        self.layer_weights = layer_weights
        self.DB_batch = DB_batch
        self.names = list(perfume_names) if perfume_names is not None else names

    def timer(func):
        """Decorator: print how long each call to ``func`` took."""
        from functools import wraps

        @wraps(func)  # keep the wrapped method's name and docstring
        def wrapper(*args, **kwargs):
            # perf_counter is monotonic, so the duration cannot go negative
            # if the system clock is adjusted mid-call.
            start_time = time.perf_counter()
            result = func(*args, **kwargs)
            end_time = time.perf_counter()
            print(f"Duration: {end_time - start_time:.4f} seconds")
            return result
        return wrapper

    def _top_matches(self, query_notes, n_recommendations):
        """Score the whole database against ``query_notes``; return the best hits.

        One more hit than requested is fetched because the display step drops
        either the query perfume itself or the last hit. ``k`` is capped at the
        database size so small databases do not make ``torch.topk`` raise.

        Returns:
            ``(top_scores, top_indices)``, both 1-D tensors, best first.
        """
        query_emb = self.process_notes(query_notes, self.IDF_weight)

        q_norm = nn.functional.normalize(query_emb, p=2, dim=1)  # (layers, dim)
        db_norm = nn.functional.normalize(self.DB_batch, p=2, dim=2)  # (n, layers, dim)

        # Cosine similarity of every query layer i with every database layer j.
        similarity_map = torch.einsum('id, bjd -> bij', q_norm, db_norm)  # (n, layers, layers)
        scores = (similarity_map * self.layer_weights).sum(dim=(1, 2))

        k = min(n_recommendations + 1, scores.shape[0])
        top_scores, top_indices = torch.topk(scores, k=k)
        return top_scores, top_indices

    @timer
    def recommend_idx(self, query_idx = "random", n_recommendations=5):
        """Recommend perfumes similar to the dataset row labelled ``query_idx``.

        Args:
            query_idx: Index label of the query row, or ``"random"`` to draw one
                uniformly from ``0 .. len(dataset) - 1``.
            n_recommendations: Number of recommendations wanted.

        Returns:
            ``(target_notes, target_name, top_scores, top_indices)``; the two
            tensors hold up to ``n_recommendations + 1`` hits, best first.
        """
        if query_idx == "random":
            query_idx = random.randint(0, len(self.dataset) - 1)

        target_name = self.dataset.at[query_idx, "Perfume"]
        target_notes = {"Top": self.dataset.at[query_idx, "Top"],
                        "Middle": self.dataset.at[query_idx, "Middle"],
                        "Base": self.dataset.at[query_idx, "Base"]}
        top_scores, top_indices = self._top_matches(target_notes, n_recommendations)
        return target_notes, target_name, top_scores, top_indices

    def recommend(self, query_notes, n_recommendations=5):
        """Recommend perfumes for a free-form ``layer -> "note, note"`` query.

        Returns:
            ``(query_notes, top_scores, top_indices)``; the two tensors hold up
            to ``n_recommendations + 1`` hits, best first.
        """
        top_scores, top_indices = self._top_matches(query_notes, n_recommendations)
        return query_notes, top_scores, top_indices

    def _first_value(self, perfume, column, default):
        """Return ``column`` of the first row named ``perfume``, else ``default``.

        NOTE(review): several rows may share a perfume name (e.g. different
        brands); only the first one is shown — confirm this is acceptable.
        """
        matches = self.dataset.loc[self.dataset['Perfume'].str.strip() == perfume, column]
        return matches.values[0] if len(matches) else default

    def display_recommendations(self, target_notes, target_name, top_scores, top_indices, note_comparison=True):
        """Print the ranked recommendations and, optionally, a note comparison.

        Args:
            target_notes: ``layer -> notes`` mapping of the query.
            target_name: Name of the query perfume (any label for custom queries).
            top_scores: 1-D tensor of similarity scores, best first.
            top_indices: 1-D tensor of database positions matching ``top_scores``.
            note_comparison: Also print the notes of the best hit and the query.
        """
        # Keep every name paired with its own score so that removing the query
        # perfume cannot shift the scores printed next to the remaining names.
        ranked = [(self.names[i], score)
                  for i, score in zip(top_indices.tolist(), top_scores.tolist())]

        for pos, (perfume, _) in enumerate(ranked):
            if perfume == target_name:
                del ranked[pos]  # the query perfume matched itself
                break
        else:
            ranked = ranked[:-1]  # custom query: drop the spare extra hit

        target_brand = self._first_value(target_name, 'Brand', 'Unknown Brand')

        print("***" * 10, "results", "***" * 10)
        print(f">>Original Perfume: {target_name} @ {target_brand}")
        if not ranked:
            print(">>No similar perfumes found.")
            return

        best_name = ranked[0][0]
        best_brand = self._first_value(best_name, 'Brand', 'Unknown Brand')
        print(f">>most similar perfume: {best_name} @ {best_brand}")
        print("---" * 10)
        print(f">>Top {len(ranked)} similar perfumes:")
        for rank, (perfume, score) in enumerate(ranked, start=1):
            print(f"NO.{rank} {perfume} / Similarity Score: {score:.2f}")
        print()
        if note_comparison:
            print("---" * 10,"\n")
            print(f">>Most Similar Perfume except Original Perfume: {best_name}")
            for layer in ("Top", "Middle", "Base"):
                print(f"{layer} Notes: {self._first_value(best_name, layer, 'Unknown')}")
            print("---" * 10)
            print(f">>Original Perfume: {target_name}")
            for k, v in target_notes.items():
                print(f"{k} Notes: {v}")
            print("---" * 10)



In [6]:
if __name__ == "__main__":

    # Recommender backed by the embedding database built earlier in the notebook.
    recommender = Perfume_Recommender(
        dataset=dataset,
        model=model,
        device=device,
        layer_weights=layer_weights,
        DB_batch=DB_batch,
    )

    # Free-form query: comma-separated notes for each layer of the pyramid.
    query_notes = {
        "Top": "bergamot, lemon, orange",
        "Middle": "rose, jasmine, lily of the valley",
        "Base": "musk, amber, vanilla",
    }

    # Fetch the best matches, then print them alongside the query's own notes.
    query_notes, top_scores, top_indices = recommender.recommend(
        query_notes=query_notes,
        n_recommendations=5,
    )
    recommender.display_recommendations(
        query_notes,
        "Custom Query",
        top_scores,
        top_indices,
        note_comparison=True,
    )
  



****************************** results ******************************
>>Original Perfume: Custom Query @ Unknown Brand
>>most similar perfume: velvet-imari @ avon
------------------------------
>>Top 5 similar perfumes:
NO.1 velvet-imari / Similarity Score: 0.79
NO.2 little-black-dress-party / Similarity Score: 0.77
NO.3 emilie-parfum / Similarity Score: 0.77
NO.4 vf-bloom / Similarity Score: 0.77
NO.5 noir-endurance / Similarity Score: 0.76

------------------------------ 

>>Most Similar Perfume except Original Perfume: velvet-imari
Top Notes: bergamot, mandarin orange
Middle Notes: rose, jasmine, lily-of-the-valley
Base Notes: amber, musk, vanilla
------------------------------
>>Original Perfume: Custom Query
Top Notes: bergamot, lemon, orange
Middle Notes: rose, jasmine, lily of the valley
Base Notes: musk, amber, vanilla
------------------------------
