In [1]:
import os
import pandas as pd
import json
import requests
import time
from tqdm.auto import tqdm
from google import genai
from dotenv import load_dotenv
import pickle
import random

load_dotenv()
GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY")

# Initialize Google Gemini client
client = genai.Client(api_key=GOOGLE_API_KEY)


In [2]:
user_items = pd.read_csv('/Users/vince/Salk/mCC_Analysis/data/compliant_user_items.csv')

In [8]:
# Load JSON file containing food corrections
with open('/Users/vince/Salk/mCC_Analysis/food_corrections.json', 'r') as f:
    food_corrections = json.load(f)

# Create a new column with corrected food names
user_items['corrected_food'] = user_items['parsing_result'].map(food_corrections)


In [10]:
user_items['corrected_food'] = user_items['corrected_food'].str.lower()

In [None]:
from google import genai
from pydantic import BaseModel, Field
from typing import List, Optional
import os
import json
from dotenv import load_dotenv

# Load environment variables
load_dotenv()
GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY")

# Initialize Google Gemini client
client = genai.Client(api_key=GOOGLE_API_KEY)

# Define nested models
class ServingSize(BaseModel):
    grams: float = Field(description="Serving size in grams")
    household: str = Field(description="Household measure (e.g., '1 cup')")

class Macronutrients(BaseModel):
    calories: float = Field(description="Energy in kcal")
    protein: float = Field(description="Protein content in grams")
    fat: float = Field(description="Fat content in grams")
    carbohydrates: float = Field(description="Carbohydrate content in grams")
    fiber: float = Field(description="Dietary fiber in grams")
    sugar: Optional[float] = Field(description="Sugar content in grams")

# Main food info model - no default values
class FoodInfo(BaseModel):
    food_name: str = Field(description="Standardized name of the food item")
    description: str = Field(description="Brief description of what this food is")
    serving_size: ServingSize = Field(description="Standard serving size information")
    macronutrients_per_100g: Macronutrients = Field(description="Nutrient content per 100g")
    taxonomy: List[str] = Field(description="Hierarchical food categories from general to specific")
    ingredients: List[str] = Field(description="Typical ingredients if it's a prepared food")
    variants: List[str] = Field(description="Common variants or alternative names")

def analyze_food_item(food_label):
    """
    Use Gemini to analyze a food item with a strictly enforced response schema
    
    Args:
        food_label: The name of the food item to analyze
        
    Returns:
        FoodInfo object or None if analysis fails
    """
    prompt = f"""
    Analyze this food item in detail: {food_label}
    
    Provide complete nutritional database information:
    - Name and description
    - Standard serving sizes
    - Macronutrients per 100g
    - Hierarchical food categories
    - Ingredients (if applicable)
    - Common variants or alternate names
    """
    
    try:
        # Request analysis from Gemini with schema enforcement
        response = client.models.generate_content(
            model="gemini-2.0-flash",
            contents=prompt,
            config={
                'response_mime_type': 'application/json',
                'response_schema': FoodInfo,
                'system_instruction': 'You are a nutrition database expert. Provide accurate, detailed food information following database standards.',
                'temperature': 0.0
            }
        )
        
        # Return the parsed response (already validated against the schema)
        return response.parsed
    
    except Exception as e:
        print(f"Error analyzing food item '{food_label}': {e}")
        return None

# Example usage with error handling
def get_food_info(food_label):
    result = analyze_food_item(food_label)
    if result:
        print(f"Successfully analyzed: {result.food_name}")
        return result
    else:
        print(f"Failed to analyze: {food_label}")
        return None

# Test with a food item
food_info = get_food_info("balsamic vinegar")
if food_info:
    print(json.dumps(food_info.model_dump(), indent=2))

Successfully analyzed: Balsamic Vinegar
{
  "food_name": "Balsamic Vinegar",
  "description": "A dark, concentrated vinegar produced from white Trebbiano grape juice, aged in wooden barrels.",
  "serving_size": {
    "grams": 15.0,
    "household": "1 tbsp"
  },
  "macronutrients_per_100g": {
    "calories": 88.0,
    "protein": 0.0,
    "fat": 0.0,
    "carbohydrates": 17.0,
    "fiber": 0.0,
    "sugar": 17.0
  },
  "taxonomy": [
    "Condiments",
    "Vinegars",
    "Balsamic Vinegars"
  ],
  "ingredients": [
    "Grape must",
    "Wine vinegar"
  ],
  "variants": [
    "Balsamic Vinegar of Modena",
    "Traditional Balsamic Vinegar",
    "White Balsamic Vinegar"
  ]
}


In [25]:
from google import genai
from pydantic import BaseModel, Field
from typing import List, Optional
import os
import json
import time
import random
from tqdm.auto import tqdm

# Define nested models
class ServingSize(BaseModel):
    grams: float = Field(description="Serving size in grams")
    household: str = Field(description="Household measure (e.g., '1 cup')")

class Macronutrients(BaseModel):
    calories: float = Field(description="Energy in kcal")
    protein: float = Field(description="Protein content in grams")
    fat: float = Field(description="Fat content in grams")
    carbohydrates: float = Field(description="Carbohydrate content in grams")
    fiber: float = Field(description="Dietary fiber in grams")
    sugar: Optional[float] = Field(description="Sugar content in grams")

# Main food info model - no default values
class FoodInfo(BaseModel):
    food_name: str = Field(description="Standardized name of the food item")
    description: str = Field(description="Brief description of what this food is")
    serving_size: ServingSize = Field(description="Standard serving size information")
    macronutrients_per_100g: Macronutrients = Field(description="Nutrient content per 100g")
    taxonomy: List[str] = Field(description="Hierarchical food categories from general to specific")
    ingredients: List[str] = Field(description="Typical ingredients if it's a prepared food")
    variants: List[str] = Field(description="Common variants or alternative names")

def analyze_food_item(food_label, client, output_dir="food_info", base_delay=1.0, max_retries=3):
    """
    Use Gemini to analyze a food item with caching and exponential backoff
    
    Args:
        food_label: The name of the food item to analyze
        client: Initialized Gemini client
        output_dir: Directory to save JSON results
        base_delay: Base delay between retries in seconds
        max_retries: Maximum number of retry attempts
        
    Returns:
        FoodInfo object or None if analysis fails
    """
    # Create output directory if it doesn't exist
    os.makedirs(output_dir, exist_ok=True)
    
    # Create a safe filename
    safe_filename = "".join([c if c.isalnum() else "_" for c in food_label]).lower()
    json_path = os.path.join(output_dir, f"{safe_filename}.json")
    
    # Check if we already have results for this food
    if os.path.exists(json_path):
        try:
            with open(json_path, 'r') as f:
                existing_data = json.load(f)
                # Convert the JSON back to a FoodInfo object
                return FoodInfo(**existing_data)
        except (json.JSONDecodeError, ValueError) as e:
            # If the file is corrupted or invalid, we'll re-analyze
            print(f"Found invalid cached data for '{food_label}', re-analyzing: {e}")
    
    prompt = f"""
    Analyze this food item in detail: {food_label}
    
    Provide complete nutritional database information:
    - Name and description
    - Standard serving sizes (in grams and household measures)
    - Macronutrients per 100g (calories, protein, fat, carbs, fiber, sugar)
    - Hierarchical food categories from general to specific
    - Ingredients (if applicable)
    - Common variants or alternate names
    """
    
    # Try with exponential backoff
    for attempt in range(max_retries + 1):
        try:
            # Request analysis from Gemini with schema enforcement
            response = client.models.generate_content(
                model="gemini-2.0-flash",
                contents=prompt,
                config={
                    'response_mime_type': 'application/json',
                    'response_schema': FoodInfo,
                    'system_instruction': 'You are a nutrition database expert. Provide accurate, detailed food information following database standards.',
                    'temperature': 0.0
                }
            )
            
            # Get the parsed result
            result = response.parsed
            
            # Save the result to a JSON file
            with open(json_path, 'w') as f:
                json.dump(result.model_dump(), f, indent=2)
                
            return result
            
        except Exception as e:
            if attempt < max_retries:
                # Calculate delay with exponential backoff and a bit of jitter
                delay = base_delay * (2 ** attempt) + random.uniform(0, 0.5)
                print(f"Error analyzing '{food_label}' (attempt {attempt+1}/{max_retries+1}): {e}")
                print(f"Retrying in {delay:.1f} seconds...")
                time.sleep(delay)
            else:
                print(f"Failed to analyze '{food_label}' after {max_retries+1} attempts: {e}")
                return None

def process_food_list(food_items, client, output_dir="food_info", batch_size=10):
    """
    Process a list of food items, saving results as JSON files
    
    Args:
        food_items: List of food items to analyze
        client: Initialized Gemini client
        output_dir: Directory to save JSON results
        batch_size: How many items to process before saving a summary
        
    Returns:
        Dictionary mapping food items to their analysis results
    """
    # Create output directory if it doesn't exist
    os.makedirs(output_dir, exist_ok=True)
    
    # Initialize results dictionary
    results = {}
    
    # Check for existing summary file
    summary_path = os.path.join(output_dir, "summary.json")
    if os.path.exists(summary_path):
        try:
            with open(summary_path, 'r') as f:
                # Load just the keys (food names) that we've already processed
                existing_results = json.load(f)
                print(f"Found existing summary with {len(existing_results)} foods")
                
                # Filter out foods that have already been processed
                food_items = [food for food in food_items if food not in existing_results]
                print(f"Remaining foods to process: {len(food_items)}")
        except json.JSONDecodeError:
            print("Error reading summary file, starting fresh")
    
    # Process all food items with progress tracking
    with tqdm(total=len(food_items), desc="Processing foods") as progress:
        for i, food in enumerate(food_items):
            # Analyze the food item
            food_info = analyze_food_item(food, client, output_dir)
            
            if food_info:
                results[food] = food_info.model_dump()
                
            # Update progress
            progress.update(1)
            
            # Save summary periodically
            if (i + 1) % batch_size == 0 or i == len(food_items) - 1:
                # Update summary file with all processed foods
                with open(summary_path, 'w') as f:
                    # Merge with any existing results
                    all_results = {}
                    if os.path.exists(summary_path):
                        try:
                            with open(summary_path, 'r') as existing:
                                all_results = json.load(existing)
                        except json.JSONDecodeError:
                            pass
                    
                    # Update with new results
                    all_results.update(results)
                    json.dump(all_results, f, indent=2)
                
                progress.set_description(f"Processed {i+1}/{len(food_items)} foods")
    
    print(f"Completed processing {len(results)} food items")
    return results

    
# Process a list of foods (could be from your DataFrame)
food_list = user_items['corrected_food'].unique().tolist()

# Process the foods with caching and exponential backoff
results = process_food_list(food_list, client)

Processing foods:   0%|          | 0/2663 [00:00<?, ?it/s]

Error analyzing 'atkins shake' (attempt 1/4): 'NoneType' object has no attribute 'model_dump'
Retrying in 1.2 seconds...
Error analyzing 'atkins shake' (attempt 2/4): 'NoneType' object has no attribute 'model_dump'
Retrying in 2.2 seconds...
Completed processing 2663 food items


In [19]:
food_info = get_food_info("frappucino")
if food_info:
    print(json.dumps(food_info.model_dump(), indent=2))

Successfully analyzed: Frappuccino
{
  "food_name": "Frappuccino",
  "description": "A blended coffee drink, typically made with coffee, ice, milk, and flavored syrups, topped with whipped cream and sauces.",
  "serving_size": {
    "grams": 354.0,
    "household": "12 fl oz (Starbucks Tall)"
  },
  "macronutrients_per_100g": {
    "calories": 82.0,
    "protein": 1.2,
    "fat": 1.9,
    "carbohydrates": 15.0,
    "fiber": 0.0,
    "sugar": 13.0
  },
  "taxonomy": [
    "Beverages",
    "Coffee",
    "Blended Coffee Drinks",
    "Frappuccino"
  ],
  "ingredients": [
    "Ice",
    "Coffee",
    "Milk",
    "Sugar",
    "Flavored Syrup (e.g., vanilla, caramel)",
    "Whipped Cream",
    "Sauce (e.g., caramel, chocolate)"
  ],
  "variants": [
    "Coffee Frappuccino",
    "Caramel Frappuccino",
    "Mocha Frappuccino",
    "Vanilla Bean Frappuccino",
    "Other flavored Frappuccinos"
  ]
}


In [23]:
food_info = get_food_info("egg")
if food_info:
    print(json.dumps(food_info.model_dump(), indent=2))

Successfully analyzed: Egg, Chicken, Whole, Raw
{
  "food_name": "Egg, Chicken, Whole, Raw",
  "description": "A whole, raw chicken egg, commonly used as a food source.",
  "serving_size": {
    "grams": 50.0,
    "household": "1 large egg"
  },
  "macronutrients_per_100g": {
    "calories": 143.0,
    "protein": 12.6,
    "fat": 9.51,
    "carbohydrates": 0.72,
    "fiber": 0.0,
    "sugar": 0.5
  },
  "taxonomy": [
    "Animal Products",
    "Poultry",
    "Eggs"
  ],
  "ingredients": [
    "Egg"
  ],
  "variants": [
    "Chicken Egg",
    "Hen's Egg",
    "Whole Egg"
  ]
}


In [21]:
user_items['corrected_food'].value_counts()

corrected_food
coffee            192480
tea                64324
cheese             55573
egg                55086
salad              54695
                   ...  
realemon               1
buttermint             1
hershey's lite         1
pão de queijo          1
sapodilla              1
Name: count, Length: 2663, dtype: int64

In [None]:

# # To process multiple items:
# def process_foods(food_items):
#     results = {}
#     for food in food_items:
#         results[food] = analyze_food_item(food)
#     return results