In [None]:
import pandas as pd
import string
import random
import numpy as np

Initial Data Loading and Cleaning

In [None]:
# Columns kept in the exported US-only dataset.
selected_columns = [
    'code',
    'product_name',
    'brands',
    'categories_en',
    'labels_en',
    'serving_size',
    'fat_100g',
    'carbohydrates_100g',
    'proteins_100g',
    'ingredients_text',
    'additives_en',
    'labels',
    'traces_en',
    'nutrition-score-fr_100g',
    'nutriscore_score',
    'completeness'
]

# Columns that must be numeric for the filters applied in later cells.
numeric_columns = [
    'fat_100g',
    'carbohydrates_100g',
    'proteins_100g',
    'nutrition-score-fr_100g',
    'nutriscore_score',
    'completeness'
]

# Columns to be read: the exported columns plus the country column, which is
# only needed for the United States filter below.
usecols = selected_columns + ['countries_en']

# Read every column as text first so malformed values cannot break parsing;
# the numeric columns are converted explicitly before the export.
dtype = {column_name: 'str' for column_name in usecols}

# Read the whole tab-separated file, skipping lines that cannot be parsed.
pandas_df = pd.read_csv('dataset.csv', delimiter='\t', dtype=dtype, usecols=usecols, on_bad_lines='skip')

# Filter the DataFrame for entries in the United States (rows without a country are dropped).
us_food_mask = pandas_df['countries_en'].str.contains('United States', na=False)

# .copy() makes the result an independent frame so the column assignments
# below do not write into a slice of pandas_df.
us_food_df = pandas_df.loc[us_food_mask, selected_columns].copy()

# Convert the numeric columns BEFORE writing the CSV so that unparseable values
# become empty cells in the file instead of stray strings that the next cell
# would read back as text.
for col in numeric_columns:
    us_food_df[col] = pd.to_numeric(us_food_df[col], errors='coerce')

# Output the filtered DataFrame to a CSV file
us_food_df.to_csv('us_food_data.csv', index=False)


Cleaning the filtered CSV file 

In [None]:
# Reload the US-only export written by the loading cell; low_memory=False makes
# pandas infer each column's dtype from the whole file rather than chunk by chunk.
food_df = pd.read_csv('us_food_data.csv', low_memory=False)

In [None]:
"""
Cleaning up the dataset

1. Selecting the columns that are relevant to the macro calculator
2. Removing any rows that have no serving size
3. Removing any rows whose carbohydrates, proteins and fat per 100 g do not sum to more than 0

"""

# Keep only rows that have a serving size. The mask is built from the frame
# being filtered (not from a different frame that merely shares its index).
food_df_cleaned = food_df.copy()
food_df_cleaned = food_df_cleaned[food_df_cleaned['serving_size'].notna()]

# Keep only rows whose three macros sum to more than 0 g per 100 g. A missing
# macro makes the sum NaN, and NaN > 0 is False, so those rows are dropped too.
macro_total = (
    food_df_cleaned['carbohydrates_100g']
    + food_df_cleaned['proteins_100g']
    + food_df_cleaned['fat_100g']
)
food_df_cleaned = food_df_cleaned[macro_total > 0].reset_index(drop=True)

# Keep only rows whose nutrition score lies in [-15, 18]; rows without a score
# are dropped as well.
# NOTE(review): the upper bound of 18 is taken as-is from the original code —
# confirm it is the intended cut-off.
# .copy() leaves an independent frame, so the column added in the next cell
# does not trigger SettingWithCopyWarning.
food_df_cleaned = food_df_cleaned[
    food_df_cleaned['nutrition-score-fr_100g'].between(-15, 18)
].copy()


In [None]:
def categorize_diet(fats, carbohydrates, protein):
    """
    Label a food with the diet types its macronutrient split is compatible with.

    Each macro is expressed as a percentage of the combined fat + carbohydrate +
    protein weight, and the percentages are checked against the rules below.

    Args:
        fats (float): Grams of fat per 100 g of the food.
        carbohydrates (float): Grams of carbohydrates per 100 g of the food.
        protein (float): Grams of protein per 100 g of the food.

    Returns:
        list: The matching diet labels in the order 'Balanced', 'Keto',
        'High Protein', 'Low Fat'; ['None'] when nothing matches.

    Rules:
        - Nothing is labelled unless the macros total more than 5 g and
          carbohydrates make up at most 70% of them.
        - 'Balanced': 30-70% carbohydrates, 10-35% protein and 20-40% fat.
        - 'Keto': at least 60% fat and at least as much protein as carbohydrates (by weight).
        - 'High Protein': at least 30% protein.
        - 'Low Fat': less than 20% fat.
    """
    total = fats + carbohydrates + protein

    def percent_of_total(amount):
        # A non-positive (or NaN) total yields 0 instead of dividing.
        return (amount / total) * 100 if total > 0 else 0

    fat_pct = percent_of_total(fats)
    carb_pct = percent_of_total(carbohydrates)
    protein_pct = percent_of_total(protein)

    # Gate: too little macro content, or too carbohydrate-heavy, means no label at all.
    if not (total > 5 and carb_pct <= 70):  # Adjust this threshold as needed
        return ['None']

    rules = (
        ('Balanced', 30 <= carb_pct <= 70 and 10 <= protein_pct <= 35 and 20 <= fat_pct <= 40),
        ('Keto', fat_pct >= 60 and protein >= carbohydrates),
        ('High Protein', protein_pct >= 30),
        ('Low Fat', fat_pct < 20),
    )
    matched = [label for label, satisfied in rules if satisfied]

    return matched if matched else ['None']

# Tag every row with the list of diet types returned by categorize_diet for its
# fat / carbohydrate / protein values per 100 g.
food_df_cleaned['diet_types'] = food_df_cleaned.apply(lambda row: categorize_diet(row['fat_100g'], row['carbohydrates_100g'], row['proteins_100g']), axis=1)

# Keep only the rows that matched at least one diet type; categorize_diet
# returns ['None'] when nothing matched.
food_df_cleaned_diet = food_df_cleaned[food_df_cleaned['diet_types'].apply(lambda x: 'None' not in x)]

In [None]:
import unidecode
import re

def clean_text(text):
    """
    Transliterate text to ASCII and keep only English letters and whitespace.

    Accented characters are first converted to their closest ASCII form, then
    every character that is not a-z, A-Z or whitespace (digits, punctuation,
    commas, hyphens, ...) is removed.

    Args:
        text (string): The text to clean. Non-string values are converted with str().

    Returns:
        string: The cleaned text. Missing values (None or NaN) are returned
        unchanged so they stay missing instead of becoming the literal text 'nan'.
    """
    # NaN is the only float that is not equal to itself; this avoids relying on
    # pandas inside the helper.
    if text is None or (isinstance(text, float) and text != text):
        return text

    ascii_text = unidecode.unidecode(str(text))
    return re.sub(r'[^a-zA-Z\s]', '', ascii_text)

# Work on a copy so the column assignments below do not write into a slice of
# food_df_cleaned.
food_df_cleaned_diet_copy = food_df_cleaned_diet.copy()

# Clean the free-text columns. clean_text removes every character that is not a
# letter or whitespace, so the commas in categories_en are removed here as well.
for column in ['product_name', 'brands', 'categories_en', 'ingredients_text']:
    food_df_cleaned_diet_copy.loc[:, column] = food_df_cleaned_diet_copy[column].apply(clean_text)

# Rebind the working name to the cleaned copy for the following cells.
food_df_cleaned_diet = food_df_cleaned_diet_copy

In [None]:
# Allergen group -> lower-case keywords that indicate the group.
allergen_dict = dict(
    nuts=['almond', 'walnut', 'hazelnut', 'cashew', 'pistachio', 'brazil nut', 'pecan', 'macadamia', 'nut'],
    peanuts=['peanut'],
    dairy=['milk', 'cheese', 'butter', 'yogurt', 'cream', 'lactose', 'whey', 'casein'],
    eggs=['egg'],
    soy=['soy', 'soybean', 'tofu', 'tempeh', 'edamame'],
    wheat=['wheat', 'gluten', 'barley', 'rye', 'bread', 'pasta', 'flour'],
    fish=['fish', 'salmon', 'tuna', 'cod', 'haddock'],
    shellfish=['shrimp', 'crab', 'lobster', 'clam', 'mussel', 'oyster', 'scallop'],
    sesame=['sesame', 'tahini'],
)

# Diet name -> lower-case keywords associated with that diet.
diet_keywords = dict(
    vegan=['plant-based', 'vegan', 'dairy-free', 'egg-free'],
    vegetarian=['vegetarian', 'egg', 'milk', 'cheese', 'yogurt', 'plant-based'],
)


# Text columns plus the diet tags, selected into a separate frame.
# NOTE(review): diet_perf_df is not referenced again in the cells shown here —
# confirm whether it is still needed.
columns = ['product_name', 'categories_en', 'labels_en', 'ingredients_text', 'diet_types']

diet_perf_df = food_df_cleaned_diet[columns]

# Flat list of every allergen keyword that may_contain_allergens searches for.
combined_allergens = [
    # tree nuts
    'almond', 'walnut', 'hazelnut', 'cashew', 'pistachio', 'brazil nut', 'pecan', 'macadamia', 'nut',
    # peanuts
    'peanut',
    # dairy
    'milk', 'cheese', 'butter', 'yogurt', 'cream', 'lactose', 'whey', 'casein',
    # eggs
    'egg',
    # soy
    'soy', 'soybean', 'tofu', 'tempeh', 'edamame',
    # wheat / gluten
    'wheat', 'gluten', 'barley', 'rye', 'bread', 'pasta', 'flour',
    # fish
    'fish', 'salmon', 'tuna', 'cod', 'haddock',
    # shellfish
    'shrimp', 'crab', 'lobster', 'clam', 'mussel', 'oyster', 'scallop',
    # sesame
    'sesame', 'tahini',
]

def may_contain_allergens(product_name, categories, labels, ingredients):
    """
    Report whether any allergen keyword occurs in a product's text fields.

    The four fields are converted to lower-case strings and joined with spaces;
    the result is searched for each entry of combined_allergens as a plain
    substring (so 'nut' also matches inside 'coconut').

    Args:
        product_name (string): The name of the product.
        categories (string): The categories the product belongs to.
        labels (string): The labels associated with the product.
        ingredients (string): The list of ingredients in the product.

    Returns:
        string: 'May contain common allergens' when at least one keyword is
        found, otherwise 'No common allergens detected'.
    """
    lowered_fields = [str(field).lower() for field in (product_name, categories, labels, ingredients)]
    # Empty strings are skipped so they do not add extra spaces to the joined text.
    searchable_text = ' '.join(field for field in lowered_fields if field)

    for allergen in combined_allergens:
        if allergen in searchable_text:
            return 'May contain common allergens'
    return 'No common allergens detected'

# Add an 'allergen_warning' column to a copy of the diet-tagged frame.
# NOTE(review): the next cell starts again from food_df_cleaned_diet, so this
# column is not carried into the later frames shown here — confirm that is intended.
food_df_cleaned_allergens = food_df_cleaned_diet.copy()
food_df_cleaned_allergens['allergen_warning'] = food_df_cleaned_allergens.apply(
    lambda row: may_contain_allergens(
        row['product_name'], row['categories_en'], row['labels_en'], row['ingredients_text']
    ), axis=1
)


In [None]:
def categorize_meal(row):
    """
    Assign a meal category from keywords in a product's name, categories and ingredients.

    The three fields are lower-cased and joined into one text, which is searched
    for keywords as plain substrings. Keyword groups are tried in a fixed order
    and the first group with a hit decides the category:
    'not_meal', then 'snack', then pasta/lentil dishes ('lunch_dinner'), then
    'breakfast'. Products with no hit at all default to 'lunch_dinner'.

    Args:
        row (pandas.Series): A row of the DataFrame containing the product details.
            Any mapping with a .get method works; missing fields count as empty.

    Returns:
        string: 'not_meal', 'snack', 'breakfast' or 'lunch_dinner'.
    """
    searched_fields = ('product_name', 'categories_en', 'ingredients_text')
    haystack = " ".join(str(row.get(field, '')).lower() for field in searched_fields)

    # Order matters: earlier groups win over later ones.
    ordered_rules = (
        ('not_meal', ('canned', 'flour', 'seeds', 'cocoa', 'condiment', 'oil', 'sauce',
                      'dressing', 'condiment', 'spread', 'sauces', 'condiments')),
        ('snack', ('snack', 'nuts', 'cookie', 'chips', 'bar', 'crisps', 'biscuits', 'cookies',
                   'supplements', 'powder', 'beverage', 'drink', 'milk', 'soup', 'dessert')),
        ('lunch_dinner', ('pasta', 'lentil')),
        ('breakfast', ('cereal', 'oatmeal', 'yogurt', 'eggs', 'granola', 'breakfast')),
    )

    for category, keywords in ordered_rules:
        for keyword in keywords:
            if keyword in haystack:
                return category

    return 'lunch_dinner'

# Add a 'meal_category' column to a copy of the diet-tagged frame.
food_df_cleaned_meals = food_df_cleaned_diet.copy()
food_df_cleaned_meals['meal_category'] = food_df_cleaned_meals.apply(categorize_meal, axis=1)

# Preview of the categorisation; this is not the last expression of the cell,
# so its result is discarded rather than displayed.
food_df_cleaned_meals[['product_name', 'categories_en', 'ingredients_text', 'meal_category']].head(100)

# Categories that count as meals; 'not_meal' rows are dropped by the filter below.
food_types = ['lunch_dinner', 'breakfast', 'snack'] 

food_df_cleaned_meals = food_df_cleaned_meals[food_df_cleaned_meals['meal_category'].isin(food_types)]
food_df_cleaned_meals.reset_index(drop=True, inplace=True)


In [None]:
# Lower-cased names and E-numbers of the additives to avoid; check_additives
# looks for each entry as a substring of an additive string.
# NOTE(review): "bva" and "bho" look like typos (possibly abbreviations of the
# neighbouring entries) and "E110" is listed for both "Yellow 6" and "Orange B" —
# confirm against the CSPI list before relying on them.
cspi_avoid_additives = [
    name.lower()
    for name in (
        "Acesulfame potassium",
        "Aloe vera",
        "Aspartame",
        "Azodicarbonamide",
        "Brominated vegetable oil",
        "bva",
        "bho",
        "Butylated hydroxyanisole",
        "Caramel coloring",
        "Cyclamate",
        "Ginkgo biloba",
        "Olestra (olean)",
        "Potassium bromate",
        "Potassium iodate",
        "Propyl gallate",
        "Saccharin",
        "Sodium nitrate",
        "Sucralose",
        "TBHQ",
        "tert-butylhydroquinone",
        "Titanium dioxide",
        "Trans fat",
        # Colourings: common name followed by E-number.
        "Red 40", "E129",
        "Yellow 5", "E102",
        "Yellow 6", "E110",
        "Blue 1", "E133",
        "Blue 2", "E132",
        "Green 3", "E143",
        "Orange B", "E110",
        "Red 3", "E127",
    )
]

def check_additives(additives_str):
    """
    Tell whether an additives string mentions anything on the avoid list.

    The string is split on commas; each piece is stripped and lower-cased, and
    is flagged when any entry of cspi_avoid_additives occurs inside it.

    Args:
        additives_str (string): A comma-separated string of additives, or a
            missing value.

    Returns:
        bool: True if at least one additive matches the avoid list; False
        otherwise, including when the value is missing.
    """
    if pd.isna(additives_str):
        return False

    for raw_entry in additives_str.split(','):
        entry = raw_entry.strip().lower()
        for banned in cspi_avoid_additives:
            if banned in entry:
                return True
    return False

# Keep only the products whose additives string does not mention anything on
# the CSPI avoid list (products without additive data are kept).
food_df_cleaned_additives = food_df_cleaned_meals.copy()

food_df_cleaned_additives = food_df_cleaned_additives[
    ~food_df_cleaned_additives['additives_en'].apply(check_additives).astype(bool)
]

NLP for Data Cleaning

In [None]:
import nltk
nltk.download('stopwords')
from nltk.corpus import stopwords

In [None]:
def clean_categories(cat_str):
    """
    Cleans and processes a comma-separated string of categories.

    Each category is lower-cased, English stopwords are removed, and repeated
    words inside a category are collapsed to their first occurrence.

    Args:
        cat_str (string): A comma-separated string containing category names.

    Returns:
        list: One cleaned string per comma-separated category (an entry is ''
        when all of its words were stopwords). Non-string input gives [].
    """
    if not isinstance(cat_str, str):
        return []

    # Load the stop-word list once and cache it on the function: this helper
    # runs once per row, and re-reading the corpus every time is wasted work.
    stop_words = getattr(clean_categories, '_stop_words', None)
    if stop_words is None:
        stop_words = frozenset(stopwords.words('english'))
        clean_categories._stop_words = stop_words

    cleaned_categories = []
    for category in cat_str.lower().split(','):
        kept_words = (word for word in category.split() if word not in stop_words)
        # dict.fromkeys drops duplicates while keeping first-seen order.
        cleaned_categories.append(' '.join(dict.fromkeys(kept_words)))

    return cleaned_categories

# Add 'cleaned_categories': the list of cleaned category strings produced by
# clean_categories for each row's categories_en value.
food_df_cleaned_cat = food_df_cleaned_additives.copy()
food_df_cleaned_cat['cleaned_categories'] = food_df_cleaned_cat['categories_en'].apply(clean_categories)

In [None]:
from nltk.stem import PorterStemmer
# Shared Porter stemmer instance used by stem_categories.
ps = PorterStemmer()

def stem_categories(categories):
    """
    Stem every word of the given categories with the shared Porter stemmer.

    Args:
        categories (list or string): A list of category strings (joined with
            spaces first) or a single string.

    Returns:
        list: The stemmed words, in their original order.
    """
    text = ' '.join(categories) if isinstance(categories, list) else categories

    stems = []
    for token in text.split():
        stems.append(ps.stem(token))
    return stems

# Drop rows without a categories_en value, then add the stemmed words of each
# row's cleaned categories as 'category_cleaned_stemmed'.
food_df_clean_cat_porter = food_df_cleaned_cat.dropna(subset=['categories_en']).copy()
food_df_clean_cat_porter['category_cleaned_stemmed'] = food_df_clean_cat_porter['cleaned_categories'].apply(stem_categories)

In [None]:
from nltk.stem import WordNetLemmatizer
nltk.download('wordnet')

In [None]:
# Shared WordNet lemmatizer instance used by lemmatize_categories.
lemmatizer = WordNetLemmatizer()

def lemmatize_categories(categories):
    """
    Lemmatize every word of the given categories with the shared WordNet lemmatizer.

    Args:
        categories (list or string): A list of category strings (joined with
            spaces first) or a single string.

    Returns:
        list: The lemmatized words, in their original order.
    """
    text = ' '.join(categories) if isinstance(categories, list) else categories

    lemmas = []
    for token in text.split():
        lemmas.append(lemmatizer.lemmatize(token))
    return lemmas
# Add the lemmatized words of each row's cleaned categories. This column must
# come from lemmatize_categories; applying stem_categories here would simply
# duplicate 'category_cleaned_stemmed'.
food_df_clean_cat_lem = food_df_clean_cat_porter.copy()
food_df_clean_cat_lem['category_cleaned_lemmatizer'] = food_df_clean_cat_lem['cleaned_categories'].apply(lemmatize_categories)

# Both experimental columns are dropped again: the following cells only use
# 'cleaned_categories'.
food_df_clean_cat_lem = food_df_clean_cat_lem.drop(columns=['category_cleaned_lemmatizer', 'category_cleaned_stemmed'])

In [None]:
# Function to convert list of words back into a string
def join_words(word_list):
    """Return the items of word_list joined into one space-separated string."""
    separator = ' '
    return separator.join(word_list)

# From here on `food_df` is rebound to the fully cleaned frame (it no longer
# refers to the raw CSV contents loaded earlier). 'categories_str' is the
# single-string form of 'cleaned_categories' used as input for TF-IDF.
food_df = food_df_clean_cat_lem.copy()
food_df['categories_str'] = food_df['cleaned_categories'].apply(join_words)

In [None]:
def contains_accent_marks(text):
    """
    Tell whether str(text) contains a character in the range U+00C0 to U+024F.

    Args:
        text: The value to inspect; it is converted with str() first.

    Returns:
        bool: True if at least one character in that range is present.
    """
    return re.search(r'[\u00C0-\u024F]', str(text)) is not None

# Drop rows whose product name or ingredients still contain accented characters.
# NOTE(review): both columns were already passed through clean_text, which keeps
# only a-z, A-Z and whitespace, so this filter presumably removes nothing — confirm.
food_df = food_df[~food_df['product_name'].apply(contains_accent_marks) & ~food_df['ingredients_text'].apply(contains_accent_marks)]


Using Machine Learning to Cluster Foods to Groups

In [None]:
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.decomposition import TruncatedSVD
from sklearn.cluster import KMeans
from sklearn.metrics import silhouette_score, davies_bouldin_score, calinski_harabasz_score


# Grid of TF-IDF settings to try: n-gram range, and the maximum / minimum
# document frequency a term may have to be kept in the vocabulary.
ngram_ranges = [(1, 1), (1, 2), (2, 2)]
max_dfs = [0.5, 0.75, 1.0]
min_dfs = [1, 2, 5]

# Silhouette scores lie in [-1, 1], so -1 is beaten by any real score.
best_score = -1
best_config = {}

# For every combination: vectorise the category strings, cluster them into a
# fixed 12 groups, and keep the configuration with the highest silhouette score.
for ngram_range in ngram_ranges:
    for max_df in max_dfs:
        for min_df in min_dfs:
            vectorizer = TfidfVectorizer(ngram_range=ngram_range, max_df=max_df, min_df=min_df)
            X = vectorizer.fit_transform(food_df['categories_str'])
            
            kmeans = KMeans(n_clusters=12, n_init=10, random_state=42)
            cluster_labels = kmeans.fit_predict(X)
            
            # The score is computed on the TF-IDF matrix itself (no SVD at this stage).
            score = silhouette_score(X, cluster_labels)
            
            if score > best_score:
                best_score = score
                best_config = {
                    'ngram_range': ngram_range,
                    'max_df': max_df,
                    'min_df': min_df
                }

print(f"Best Score: {best_score}")
print(f"Best Configuration: {best_config}")


In [None]:
# Rebuild the TF-IDF matrix with the best configuration found by the grid search.
ngram_range = best_config['ngram_range']
max_df = best_config['max_df']
min_df = best_config['min_df']

tfidf = TfidfVectorizer(ngram_range=ngram_range, max_df=max_df, min_df=min_df)
X_tfidf = tfidf.fit_transform(food_df['categories_str'])

# Reduce the TF-IDF matrix to 12 components; X is the reduced matrix that is
# clustered below.
svd = TruncatedSVD(n_components=12, random_state=42)
X = svd.fit_transform(X_tfidf)

# Try k = 5..19 and remember the k with the highest silhouette score.
best_score = -1
best_k = 0
silhouette_scores = []

for k in range(5, 20):  
    kmeans = KMeans(n_clusters=k, random_state=42, n_init=10)
    cluster_labels = kmeans.fit_predict(X)
    
    silhouette_avg = silhouette_score(X, cluster_labels)
    silhouette_scores.append(silhouette_avg)
    
    if silhouette_avg > best_score:
        best_score = silhouette_avg
        best_k = k

# Note: after the loop, `kmeans` and `cluster_labels` hold the fit for the last
# k tried (19), which is not necessarily best_k.
print(f"The best silhouette score is {best_score} for k = {best_k}")

In [None]:

from sklearn.decomposition import TruncatedSVD
from sklearn.metrics import davies_bouldin_score, calinski_harabasz_score

# Number of SVD components (the misspelt name is kept because a later cell reads it).
best_n_componenets = 12
svd = TruncatedSVD(n_components=best_n_componenets, random_state=42)

# Reduce the TF-IDF matrix itself. `X` is already the 12-component SVD output,
# so running the SVD on it again would not reduce anything.
X_reduced = svd.fit_transform(X_tfidf)

# Refit with the selected k: after the search loop `cluster_labels` belongs to
# the last k tried, not to best_k, so the metrics below would describe the
# wrong clustering.
kmeans = KMeans(n_clusters=best_k, random_state=42, n_init=10)
cluster_labels = kmeans.fit_predict(X_reduced)

# Internal quality metrics for the selected clustering.
db_index = davies_bouldin_score(X_reduced, cluster_labels)
ch_score = calinski_harabasz_score(X_reduced, cluster_labels)
silhouette_avg = silhouette_score(X_reduced, cluster_labels)

print(f"Davies-Bouldin Index: {db_index}")
print(f"Calinski-Harabasz Score: {ch_score}")
print(f"The average silhouette_score is : {silhouette_avg}")


In [None]:
from sklearn.pipeline import Pipeline
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.cluster import KMeans

# Best settings found by the TF-IDF grid search and the k search.
best_ngram_range = best_config['ngram_range']
best_max_df = best_config['max_df']
best_min_df = best_config['min_df']
best_n_clusters = best_k

# TF-IDF -> SVD -> KMeans. The KMeans step is required: a pipeline whose last
# step is TruncatedSVD has no predict method, so it cannot produce cluster labels.
pipeline = Pipeline([
    ('tfidf', TfidfVectorizer(ngram_range=best_ngram_range,
                              max_df=best_max_df,
                              min_df=best_min_df)),
    ('svd', TruncatedSVD(n_components=best_n_componenets, random_state=42)),
    ('kmeans', KMeans(n_clusters=best_n_clusters, random_state=42, n_init=10)),
])

# Fit all three steps and get one cluster label per row in a single pass.
cluster_labels = pipeline.fit_predict(food_df['categories_str'])

# food_df came out of a boolean filter; copy it before adding a column so the
# assignment does not write into a slice.
food_df = food_df.copy()
food_df['cluster'] = cluster_labels

# Validate clusters
for cluster in range(best_n_clusters):
    print(f"\nCluster {cluster}:")
    print(food_df[food_df['cluster'] == cluster]['product_name'].head(10))


In [None]:
def display_examples_from_clusters(df, num_examples=10, n_clusters=None):
    """
    Print a few example category strings for each cluster.

    Args:
        df (pandas.DataFrame): Frame with a 'cluster' column and a
            'categories_str' column.
        num_examples (int): Maximum number of examples printed per cluster.
        n_clusters (int, optional): When given, clusters 0 .. n_clusters - 1 are
            listed. When omitted, the cluster ids present in df are listed in
            ascending order, so the function no longer depends on a notebook
            global.

    Returns:
        None. The examples are printed.
    """
    if n_clusters is None:
        cluster_ids = sorted(df['cluster'].unique())
    else:
        cluster_ids = range(n_clusters)

    for cluster_id in cluster_ids:
        print(f"Examples from Cluster {cluster_id}:")
        cluster_data = df[df['cluster'] == cluster_id]['categories_str'].head(num_examples)
        for example in cluster_data:
            print(f" - {example}")
        print("\n")


In [None]:
from collections import Counter

# Work on the three columns needed to describe each cluster.
columns = ['cluster', 'diet_types', 'cleaned_categories']
cluster_label_df = food_df.loc[:, columns].copy()

clusters = cluster_label_df['cluster'].unique()

# cluster id -> the 10 most frequent cleaned category strings in that cluster,
# as (category, count) pairs.
clusters_top_kw = {}

for cluster in clusters:
    in_cluster = cluster_label_df['cluster'] == cluster

    word_count = Counter()
    # Each row holds a list of cleaned category strings; count every entry.
    for categories in cluster_label_df.loc[in_cluster, 'cleaned_categories']:
        word_count.update(categories)

    clusters_top_kw[cluster] = word_count.most_common(10)


Go through the list of top keywords to name each cluster group. The results will change if you change the number of components or the number of cluster groups.

In [None]:
# One row per cluster id (the list position is the cluster id):
# (hand-written cluster title, diet restrictions the cluster is tagged with).
# NOTE(review): the table has exactly 19 rows, while the number of clusters is
# chosen at run time — re-check the titles whenever the clustering changes.
_cluster_table = [
    ("Cheeses and Alternatives", ["Vegetarian"]),                    # 0
    ("Mixed Plant-Based Foods", ["Vegan", "Vegetarian"]),            # 1
    ("Prepared Meats", ["Paleo"]),                                   # 2
    ("Fermented Dairy Desserts", ["Vegetarian"]),                    # 3
    ("Plant-Based Fruits and Vegetables", ["Vegan", "Vegetarian"]),  # 4
    ("Frozen Foods", ["Vegan", "Vegetarian"]),                       # 5
    ("Greek-Style Yogurts", ["Vegetarian"]),                         # 6
    ("Meats", ["Paleo"]),                                            # 7
    ("Cheddar Cheeses", ["Vegetarian"]),                             # 8
    ("Milks, Snacks, and Soups", ["Vegan", "Vegetarian"]),           # 9
    ("Sausages", ["Paleo"]),                                         # 10
    ("Frozen Poultry", ["Paleo"]),                                   # 11
    ("Eggs", ["Paleo"]),                                             # 12
    ("Whole Yogurts", ["Vegetarian"]),                               # 13
    ("Skimmed Milks", ["Vegetarian"]),                               # 14
    ("Nuts and Cereals", ["Vegan", "Vegetarian"]),                   # 15
    ("Smoked Sausages", ["Paleo"]),                                  # 16
    ("Frozen Chicken Products", ["Paleo"]),                          # 17
    ("Frozen Vegetables", ["Vegan", "Vegetarian"]),                  # 18
]

# Titles in cluster-id order.
cluster_titles = [title for title, _ in _cluster_table]

# Title -> diet restrictions.
cluster_to_diet_map = {title: diets for title, diets in _cluster_table}

In [None]:
# Work on a copy so the new columns are added to an independent frame.
food_df = food_df.copy()
# Cluster id -> title, using the position of each title in cluster_titles.
cluster_title_dict = {i: title for i, title in enumerate(cluster_titles)}

# Replace each cluster id with its title; ids without a title are left as they are.
food_df['cluster_labels'] = food_df['cluster'].replace(cluster_title_dict)

food_df.reset_index(drop=True, inplace=True)

# Look up the diet restrictions for each title; unknown labels get an empty list.
food_df['diet_restrictions'] = food_df['cluster_labels'].map(lambda x: cluster_to_diet_map.get(x, []))

def combine_diet_types(row):
    """
    Merge a row's macro-based diet types with its cluster-based diet restrictions.

    Args:
        row (pandas.Series): A row with list-valued 'diet_types' and
            'diet_restrictions' entries. Any mapping with those keys works.

    Returns:
        list: The unique diet labels from both lists plus 'Balanced', sorted
        alphabetically so the result is the same on every run (iterating a set
        of strings has no stable order between interpreter sessions).
    """
    combined_diets = set(row['diet_types']) | set(row['diet_restrictions'])
    # NOTE(review): 'Balanced' is added to every row regardless of its macros,
    # as in the original code — confirm this is intended.
    combined_diets.add('Balanced')
    return sorted(combined_diets)
# Store the merged diet labels of every row in 'final_diet_types'.
food_df['final_diet_types'] = food_df.apply(combine_diet_types, axis=1)

Output results into csv to use again when calculating meal plan

In [None]:
# Write the final frame to disk. to_csv is a DataFrame method (pandas has no
# top-level pd.to_csv) and returns None when given a path, so its result must
# not be assigned back to food_df. index=False matches the earlier export.
# Note: list-valued columns (e.g. final_diet_types) are written as their text
# representation and need to be parsed again when the file is read back.
food_df.to_csv('us_food_data_filtered.csv', index=False)