In [None]:
from abc import ABC, abstractmethod
import pandas as pd
from typing import List, Dict, Any, Callable
from dataclasses import dataclass, field
from sentence_transformers import SentenceTransformer, util
from functools import lru_cache

class OpenFoodFactsProduct:
    def __init__(self, code: str, df: pd.DataFrame) -> None:
        self.code = code
        self.product = df.loc[df['code'] == code].copy()

    def get_main_category(self) -> str:
        if self.product.empty:
            raise ValueError(f"No product found with code {self.code}")
            
        if self.product["categories_en"].str.contains("beverages").any():
            return "beverage"
        return "solid"

  from tqdm.autonotebook import tqdm, trange


In [2]:
class NutritionalRatingSystem(ABC):
    def __init__(self, name="nutriscore", required_factors=["energy_100g", "energy_kcal_100g", "proteins_100g", "fiber_100g", "saturated-fat_100g", "sugars_100g", "fruits-vegetables-nuts-estimate-from-ingredients_100g ", "salt_100g"]):
        self.name = name,
        self.required_factors = required_factors
    
    @abstractmethod
    def calculate_score(self, product_code):
        pass
    
    @abstractmethod
    def rate(self, product_code):
        pass

In [3]:
class Nutriscore(NutritionalRatingSystem):
    def __init__(self):
        super().__init__()
    

     # Progi punktowe dla różnych kategorii
        self.ENERGY_THRESHOLDS = {
            "solid": [80, 160, 240, 320, 400, 480, 560, 640, 720, 800],
            "beverage": [7.2, 14.3, 21.5, 28.5, 35.9, 43.0, 50.2, 57.4, 64.5, float('inf')]
        }
        
        self.SUGAR_THRESHOLDS = {
            "solid": [4.5, 9, 13.5, 18, 22.5, 27, 31, 36, 40, 45],
            "beverage": [0, 1.5, 3.0, 4.5, 6.0, 7.5, 9.0, 10.5, 12.0, 13.5]
        }
        
        self.SATURATED_FAT_THRESHOLDS = {
            "cooking_fats": [10, 16, 22, 28, 34, 40, 46, 52, 58, 64],
            "default": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
        }
        
        self.SODIUM_THRESHOLDS = [90, 180, 270, 360, 450, 540, 630, 720, 810, 900]
        self.FIBER_THRESHOLDS = [0.7, 1.4, 2.1, 2.8, 3.5]
        self.PROTEIN_THRESHOLDS = [1.6, 3.2, 4.8, 6.4, 8.0]

    def _score_based_on_thresholds(self, value, thresholds, max_score):
        """Pomocnicza metoda do obliczania punktów na podstawie progów"""
        for i, threshold in enumerate(thresholds):
            if value <= threshold:
                return i
        return max_score

    def calculate_score(self, product: OpenFoodFactsProduct):
        if product.empty:
            raise ValueError("Product not found")
        
        product_category = product.get_main_category()
        
        # Punkty negatywne
        negative_points = sum([
            self.score_energy(product["energy_100g"].values[0], product_category),
            self.score_sugars(product["sugars_100g"].values[0], product_category),
            self.score_saturated_fat(product["saturated-fat_100g"].values[0], product_category),
            self.score_sodium(product["salt_100g"].values[0], product_category)
        ])
        
        # Punkty pozytywne
        positive_points = sum([
            self.score_fiber(product["fiber_100g"].values[0]),
            self.score_protein(product["proteins_100g"].values[0]),
            self.score_fruits_vegetables_nuts(
                product["fruits-vegetables-nuts-estimate-from-ingredients_100g"].values[0],
                product_category
            )
        ])
        
        return negative_points - positive_points

    def score_energy(self, energy, product_category):
        thresholds = self.ENERGY_THRESHOLDS[product_category]
        return self._score_based_on_thresholds(energy, thresholds, 10)

    def score_sugars(self, sugars, product_category):
        thresholds = self.SUGAR_THRESHOLDS[product_category]
        return self._score_based_on_thresholds(sugars, thresholds, 10)

    def score_saturated_fat(self, saturated_fat, product_category):
        category = "cooking_fats" if product_category == "cooking_fats" else "default"
        thresholds = self.SATURATED_FAT_THRESHOLDS[category]
        return self._score_based_on_thresholds(saturated_fat, thresholds, 10)

    def score_sodium(self, sodium, product_category=None):
        return self._score_based_on_thresholds(sodium, self.SODIUM_THRESHOLDS, 10)

    def score_fiber(self, fiber):
        return self._score_based_on_thresholds(fiber, self.FIBER_THRESHOLDS, 5)

    def score_protein(self, protein):
        return self._score_based_on_thresholds(protein, self.PROTEIN_THRESHOLDS, 5)

    def score_fruits_vegetables_nuts(self, fruits_vegetables_nuts, product_category):
        if product_category == "beverage":
            if fruits_vegetables_nuts <= 40:
                return 0
            elif fruits_vegetables_nuts <= 60:
                return 2
            elif fruits_vegetables_nuts <= 80:
                return 4
            return 10
        else:
            if fruits_vegetables_nuts <= 40:
                return 0
            elif fruits_vegetables_nuts <= 60:
                return 1
            elif fruits_vegetables_nuts <= 80:
                return 2
            return 5

    def rate(self, product: OpenFoodFactsProduct):
        score = self.calculate_score(product)
        
        RATING_RANGES = {
            "solid": {
                (-float('inf'), -1): "A",
                (-1, 2): "B",
                (2, 10): "C",
                (10, 18): "D",
                (18, float('inf')): "E"
            },
            "beverage": {
                (-float('inf'), 0): "A",
                (0, 1): "B",
                (1, 5): "C",
                (5, 9): "D",
                (9, float('inf')): "E"
            }
        }
        
        ranges = RATING_RANGES[product.get_main_category()]
        for (min_score, max_score), grade in ranges.items():
            if min_score < score <= max_score:
                return grade

In [4]:
class RecommendationFactor:
    def __init__(self, name: str, weight: int, threshold: int, content: List[str], avoid = False) -> None:
        self.name = name
        self.weight = weight
        self.content = content
        self.avoid = avoid
        self.threshold = threshold

# self.nutritional_rating_systems = nutritional_rating_systems
# self.labels_to_avoid = labels_to_avoid
# self.allergens_to_avoid = allergens_to_avoid
# self.additives_to_avoid = additives_to_avoid
# self.traces_to_avoid = traces_to_avoid
# self.ingriedients_to_avoid = ingriedients_to_avoid
# self.ingriedients_preferred = 

In [5]:
@dataclass
class RecommendationStrategy:
    recommendation_factors : List[RecommendationFactor] = field(default_factory=list)
    nutritional_rating_systems : List[NutritionalRatingSystem] = field(default_factory=list)

In [6]:
class CategoriesComparator(ABC):
    @abstractmethod
    def compare(self, product_categories: str, user_categories: str) -> float:
        pass

In [7]:
class SentenceTransformerComparator(CategoriesComparator):
    def __init__(self, model: SentenceTransformer = SentenceTransformer('all-MiniLM-L6-v2')) -> None:
        self.model = model

    def compare(self, product_categories: str, user_categories: str) -> float:
        product_categories_embedding = self.model.encode(product_categories, convert_to_tensor=True)
        user_categories_embedding = self.model.encode(user_categories, convert_to_tensor=True)

        similarity = util.pytorch_cos_sim(product_categories_embedding, user_categories_embedding)

        return similarity.item()

modules.json:   0%|          | 0.00/349 [00:00<?, ?B/s]

To support symlinks on Windows, you either need to activate Developer Mode or to run Python as an administrator. In order to activate developer mode, see this article: https://docs.microsoft.com/en-us/windows/apps/get-started/enable-your-device-for-development


config_sentence_transformers.json:   0%|          | 0.00/116 [00:00<?, ?B/s]

README.md:   0%|          | 0.00/10.7k [00:00<?, ?B/s]

sentence_bert_config.json:   0%|          | 0.00/53.0 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/612 [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/90.9M [00:00<?, ?B/s]

tokenizer_config.json:   0%|          | 0.00/350 [00:00<?, ?B/s]

vocab.txt:   0%|          | 0.00/232k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/466k [00:00<?, ?B/s]

special_tokens_map.json:   0%|          | 0.00/112 [00:00<?, ?B/s]

1_Pooling/config.json:   0%|          | 0.00/190 [00:00<?, ?B/s]

In [None]:
class RecommendationSystem:
    def __init__(self, recommendation_strategy: RecommendationStrategy, categories_comparator: CategoriesComparator = SentenceTransformerComparator()) -> None:
        self.recommendation_strategy = recommendation_strategy
        self.categories_comparator = categories_comparator

    def recommend(self, _df: pd.DataFrame, product: OpenFoodFactsProduct, n=1) -> List[int]:

        _df = _df.copy()

        product_categories = product["categories_en"].values

        filtered_df = (filtered_df
                    .pipe(self.__filter_categories, product_categories)
                    .pipe(self.__exclude_factors))

        recommendations = []
            
        return recommendations



    @lru_cache(maxsize=1000)
    def __compare_categories(self, product_categories, target_categories):
        return self.categories_comparator.compare(product_categories, target_categories)

    def __filter_categories(self, df: pd.DataFrame, product_categories, threshold: float = 0.9) -> pd.DataFrame:
        df['similarity'] = df['categories_en'].apply(
            lambda x: self.__compare_categories(x, product_categories)
        )
        return df[df['similarity'] >= threshold].reset_index(drop=True)
        

    def __exclude_factors(self,df: pd.DataFrame) -> pd.DataFrame:
        
        for factor in self.recommendation_strategy.recommendation_factors:
            if factor.avoid:
                mask = ~df[factor.name].str.contains('|'.join(factor.content), case=False, na=False)
                df = df[mask]
        
        return df
