<a href="https://colab.research.google.com/github/pradraju/re/blob/main/ingredients_brain.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# %pip (rather than !pip) installs into the environment of the running kernel.
%pip install -q openai google-genai aiohttp pydantic requests



DeepSeek client initialized successfully.
✅ DeepSeek working: Hello from DeepSeek
Call count: 1


In [None]:
import json
import asyncio
from typing import List, Tuple, Dict, Any, Optional
from datetime import datetime
import os
from pathlib import Path
from openai import AsyncOpenAI

class DeepSeekService:
    """Async wrapper around DeepSeek's OpenAI-compatible chat-completions API.

    Attributes:
        client: Client object used for requests, or ``None`` when no API key
            was found or client construction failed.
        call_count: Number of ``invoke_async`` calls that returned a response.
        feedback_enabled: Set to ``True`` on construction; not read by this class.
    """

    def __init__(self, client=None):
        """Create the service.

        Args:
            client: Optional pre-built client (for example a test double).
                When provided, no API key lookup is performed.
        """
        self.client = client
        self.call_count = 0
        self.feedback_enabled = True

        if self.client:
            print("DeepSeekService initialized with provided client for testing.")
            return

        try:
            api_key = self._read_api_key('DEEPSEEK_API_KEY')
            if not api_key:
                print("DEEPSEEK_API_KEY not found in Colab secrets or environment variables.")
            else:
                self.client = AsyncOpenAI(
                    api_key=api_key, base_url="https://api.deepseek.com"
                )
                print("DeepSeek client initialized successfully.")
        except Exception as e:
            # Best-effort setup: report the problem and leave `client` as None so
            # callers can check `service.client` before using the service.
            print(f"Failed to initialize DeepSeek client: {e}")

    @staticmethod
    def _read_api_key(name: str):
        """Return the key from Colab secrets, or from the environment outside Colab."""
        try:
            from google.colab import userdata
        except ImportError:
            # Not running inside Colab: fall back to a plain environment variable.
            return os.environ.get(name)
        return userdata.get(name)

    async def invoke_async(self, prompt: str) -> str:
        """Send ``prompt`` as a single user message and return the reply text.

        Args:
            prompt: Text sent as the only message of the conversation.

        Returns:
            The content of the first choice, or ``""`` when the API returns no
            text content.

        Raises:
            RuntimeError: If no client is available.
        """
        if not self.client:
            raise RuntimeError("Cannot invoke DeepSeek, client not initialized.")

        resp = await self.client.chat.completions.create(
            model="deepseek-chat",
            messages=[{"role": "user", "content": prompt}],
            temperature=0.2,
        )
        self.call_count += 1
        # `content` is optional in the response schema; normalise None to ""
        # so the declared `str` return type always holds.
        return resp.choices[0].message.content or ""

class OpenAIService:
    """Async wrapper around the OpenAI chat-completions API.

    NOTE: a later cell of this notebook defines a second, synchronous class that
    is also named ``OpenAIService``; once that cell runs, the name refers to the
    later class. Instances created from this class keep working.

    Attributes:
        client: Client object used for requests, or ``None`` when no API key
            was found or client construction failed.
        call_count: Number of ``invoke_async`` calls that returned a response.
        feedback_enabled: Set to ``True`` on construction; not read by this class.
    """

    def __init__(self, client=None):
        """Create the service.

        Args:
            client: Optional pre-built client (for example a test double).
                When provided, no API key lookup is performed.
        """
        self.client = client
        self.call_count = 0
        self.feedback_enabled = True

        if self.client:
            print("OpenAIService initialized with provided client for testing.")
            return

        try:
            api_key = self._read_api_key('OPENAI_API_KEY')
            if not api_key:
                print("OPENAI_API_KEY not found in Colab secrets or environment variables.")
            else:
                self.client = AsyncOpenAI(api_key=api_key)
                print("OpenAI client initialized successfully.")
        except Exception as e:
            # Best-effort setup: report the problem and leave `client` as None.
            print(f"Failed to initialize OpenAI client: {e}")

    @staticmethod
    def _read_api_key(name: str):
        """Return the key from Colab secrets, or from the environment outside Colab."""
        try:
            from google.colab import userdata
        except ImportError:
            # Not running inside Colab: fall back to a plain environment variable.
            return os.environ.get(name)
        return userdata.get(name)

    async def invoke_async(self, prompt: str) -> str:
        """Send ``prompt`` as a single user message and return the reply text.

        Args:
            prompt: Text sent as the only message of the conversation.

        Returns:
            The content of the first choice, or ``""`` when the API returns no
            text content.

        Raises:
            RuntimeError: If no client is available.
        """
        if not self.client:
            raise RuntimeError("Cannot invoke OpenAI, client not initialized.")

        resp = await self.client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[{"role": "user", "content": prompt}],
            temperature=0.2,
        )
        self.call_count += 1
        # `content` is optional in the response schema; normalise None to "".
        return resp.choices[0].message.content or ""

class GeminiService:
    """Async wrapper around the google-genai client.

    Attributes:
        client: Client object used for requests, or ``None`` when no API key
            was found or client construction failed.
        call_count: Number of ``invoke_async`` calls that returned a response.
        feedback_enabled: Set to ``True`` on construction; not read by this class.
    """

    def __init__(self, client=None):
        """Create the service.

        Args:
            client: Optional pre-built client (for example a test double).
                When provided, no API key lookup is performed.
        """
        self.client = client
        self.call_count = 0
        self.feedback_enabled = True

        if self.client:
            print("GeminiService initialized with provided client for testing.")
            return

        try:
            api_key = self._read_api_key('GEMINI_API_KEY')
            if not api_key:
                print("GEMINI_API_KEY not found in Colab secrets or environment variables.")
                return

            # Imported lazily so the class can be defined without google-genai installed.
            from google import genai
            self.client = genai.Client(api_key=api_key)
            print("Gemini client initialized successfully.")
        except Exception as e:
            # Best-effort setup: report the problem and leave `client` as None.
            print(f"Failed to initialize Gemini client: {e}")
            self.client = None

    @staticmethod
    def _read_api_key(name: str):
        """Return the key from Colab secrets, or from the environment outside Colab."""
        try:
            from google.colab import userdata
        except ImportError:
            # Not running inside Colab: fall back to a plain environment variable.
            return os.environ.get(name)
        return userdata.get(name)

    async def invoke_async(self, prompt: str) -> str:
        """Send ``prompt`` to the model and return the response text.

        Args:
            prompt: Text passed as ``contents`` of the request.

        Returns:
            The response's ``text`` attribute, or ``""`` when it is missing or ``None``.

        Raises:
            RuntimeError: If no client is available, or if the request fails
                (the original exception is attached as ``__cause__``).
        """
        if not self.client:
            raise RuntimeError("Cannot invoke Gemini, client not initialized.")

        try:
            resp = await self.client.aio.models.generate_content(
                model="gemini-1.5-flash",
                contents=prompt,
            )
            self.call_count += 1
            # `text` can be absent or None; normalise both to "".
            return getattr(resp, "text", "") or ""
        except Exception as e:
            raise RuntimeError(f"Gemini async invoke failed: {e}") from e

# Smoke-test every LLM wrapper
async def test_services():
    """Instantiate each service, send it a greeting prompt, and collect the working ones.

    Returns:
        Dict mapping a short service key ('deepseek', 'openai', 'gemini') to the
        service instance, containing only services that had a client and answered.
    """
    # Factories are lambdas so that the class lookup happens inside the try
    # block below, exactly where construction errors are reported.
    candidates = (
        ('deepseek', 'DeepSeek', lambda: DeepSeekService()),
        ('openai', 'OpenAI', lambda: OpenAIService()),
        ('gemini', 'Gemini', lambda: GeminiService()),
    )

    working = {}
    for key, label, make_service in candidates:
        try:
            candidate = make_service()
            if candidate.client:
                reply = await candidate.invoke_async(f"Say 'Hello from {label}'")
                working[key] = candidate
                print(f"✅ {label} working: {reply}")
        except Exception as err:
            print(f"❌ {label} failed: {err}")

    print(f"\nAvailable services: {list(working.keys())}")
    return working

# Run the smoke test. Top-level `await` is only valid inside a notebook cell.
# The resulting `services` dict is read later by process_all_ingredients().
services = await test_services()

DeepSeek client initialized successfully.
✅ DeepSeek working: Hello from DeepSeek! 😊
OpenAI client initialized successfully.
✅ OpenAI working: Hello from OpenAI! How can I assist you today?
Gemini client initialized successfully.
✅ Gemini working: Hello from Gemini


Available services: ['deepseek', 'openai', 'gemini']


In [None]:
from openai import OpenAI

class OpenAIService:
    """Synchronous wrapper around the OpenAI chat-completions API.

    NOTE: this definition reuses the name of the async ``OpenAIService`` defined
    in an earlier cell and replaces it for any code executed afterwards. This
    class exposes ``invoke_openai`` only; it has no ``invoke_async`` method.

    Attributes:
        client: Client object used for requests, or ``None`` when no API key
            was found or client construction failed.
        call_count: Number of ``invoke_openai`` calls that returned a response.
    """

    def __init__(self, client=None):
        """Create the service.

        Args:
            client: Optional pre-built client (for example a test double).
                When provided, no API key lookup is performed.
        """
        self.client = client
        self.call_count = 0

        if self.client:
            print("OpenAIService initialized with provided client for testing.")
            return

        try:
            api_key = self._read_api_key('OPENAI_API_KEY')
            if not api_key:
                print("OPENAI_API_KEY not found in Colab secrets or environment variables.")
            else:
                self.client = OpenAI(api_key=api_key)
                print("OpenAI client initialized successfully.")
        except Exception as e:
            # Best-effort setup: report the problem and leave `client` as None.
            print(f"Failed to initialize OpenAI client: {e}")

    @staticmethod
    def _read_api_key(name: str):
        """Return the key from Colab secrets, or from the environment outside Colab."""
        try:
            from google.colab import userdata
        except ImportError:
            # Not running inside Colab: fall back to a plain environment variable.
            return os.environ.get(name)
        return userdata.get(name)

    def invoke_openai(self, prompt):
        """Send ``prompt`` as a single user message and return the reply text.

        Args:
            prompt: Text sent as the only message of the conversation.

        Returns:
            The content of the first choice, or ``""`` when the API returns no
            text content.

        Raises:
            RuntimeError: If no client is available.
        """
        if not self.client:
            raise RuntimeError("Cannot invoke OpenAI, client not initialized.")

        response = self.client.chat.completions.create(
            model="gpt-4o",
            messages=[{"role": "user", "content": prompt}],
            temperature=0.2
        )
        # Keep the counter in step with the other service wrappers.
        self.call_count += 1
        # `content` is optional in the response schema; normalise None to "".
        return response.choices[0].message.content or ""

# Smoke-test the synchronous OpenAIService defined in this cell.
# NOTE: binds the notebook-level names `openai`, `test_prompt` and `result`.
try:
    openai = OpenAIService()

    # Test with a simple prompt
    test_prompt = "Say 'Hello from OpenAI' and nothing else."
    result = openai.invoke_openai(test_prompt)
    print(f"✅ OpenAI working: {result}")

except Exception as e:
    # Reached when the client is missing (invoke_openai raises) or the API call fails.
    print(f"❌ OpenAI setup failed: {e}")

OpenAI client initialized successfully.
✅ OpenAI working: Hello from OpenAI


🔄 Loading ingredients database...
❌ Invalid database entry: 2 validation errors for DatabaseEntry
id
  Field required [type=missing, input_value={'parents': [], 'names': ...rue, 'ecodes': ['E100']}, input_type=dict]
    For further information visit https://errors.pydantic.dev/2.11/v/missing
names
  Input should be a valid string [type=string_type, input_value={'en': ['e100', 'curcumin...: ['e100', '姜黄素']}, input_type=dict]
    For further information visit https://errors.pydantic.dev/2.11/v/string_type
❌ Invalid database entry: 2 validation errors for DatabaseEntry
id
  Field required [type=missing, input_value={'parents': [], 'names': ...rue, 'ecodes': ['E106']}, input_type=dict]
    For further information visit https://errors.pydantic.dev/2.11/v/missing
names
  Input should be a valid string [type=string_type, input_value={'en': ['e106', 'flavin m..., 'sv': ['e106', 'fmn']}, input_type=dict]
    For further information visit https://errors.pydantic.dev/2.11/v/string_type
❌ Invalid 

In [None]:
def load_ingredients_from_file(file_path: str) -> List[Dict[str, Any]]:
    """Load ingredients from an uploaded JSON file.

    The file must contain a JSON array. Each usable entry is an object with an
    ``id`` and a non-empty list under ``names``; everything else is skipped and
    counted.

    Args:
        file_path: Path to the JSON export.

    Returns:
        A list of ``{'id': ..., 'names': [...]}`` dicts, in file order.

    Raises:
        FileNotFoundError: If ``file_path`` does not exist (raised by ``open``).
        json.JSONDecodeError: If the file is not valid JSON.
        ValueError: If the top-level JSON value is not an array.
    """
    print(f"Loading ingredients from {file_path}...")

    with open(file_path, 'r', encoding='utf-8') as f:
        data = json.load(f)

    if not isinstance(data, list):
        raise ValueError(
            f"Expected a JSON array of ingredients in {file_path}, got {type(data).__name__}"
        )

    ingredients = []
    skipped_count = 0

    for item in data:
        # Entries that are not JSON objects cannot carry an id or names; report
        # and skip them instead of failing on `.get`.
        if not isinstance(item, dict):
            print(f"Error parsing ingredient unknown: expected an object, got {type(item).__name__}")
            skipped_count += 1
            continue

        # Skip ingredients whose names are missing, empty, or not a list
        names = item.get('names')
        if not names or not isinstance(names, list):
            skipped_count += 1
            continue

        if 'id' not in item:
            print("Error parsing ingredient unknown: missing 'id'")
            skipped_count += 1
            continue

        # Keep id and all names
        ingredients.append({'id': item['id'], 'names': names})

    print(f"Loaded {len(ingredients)} ingredients, skipped {skipped_count} due to missing names")
    return ingredients

# Load the ingredient export. The JSON file must already exist at this relative
# path; otherwise open() inside the loader raises FileNotFoundError.
ingredients = load_ingredients_from_file('./exported_ingredients.json')
print(f"found ingredients {len(ingredients)}")

Loading ingredients from ./exported_ingredients.json...


FileNotFoundError: [Errno 2] No such file or directory: './exported_ingredients.json'

In [None]:
from pydantic import BaseModel, Field
from typing import Optional

# Pydantic model used to validate each object returned by the LLM
class IngredientDietaryAnalysis(BaseModel):
    """Validated dietary analysis for a single ingredient.

    ``id``, ``name``, ``confidence`` and ``reasoning`` are required. Every diet,
    source and allergen field is optional and defaults to ``None``.
    """
    id: str = Field(..., description="Ingredient ID from database")
    name: str = Field(..., description="Primary ingredient name")

    # Diet compatibility
    keto: Optional[bool] = Field(None, description="Keto diet compatible")
    paleo: Optional[bool] = Field(None, description="Paleo diet compatible")
    whole30: Optional[bool] = Field(None, description="Whole30 diet compatible")
    vegan: Optional[bool] = Field(None, description="Vegan diet compatible")
    vegetarian: Optional[bool] = Field(None, description="Vegetarian diet compatible")

    # Source classification (free-form string; the prompt only suggests values)
    source: Optional[str] = Field(None, description="Primary source type of ingredient")

    # Allergen information
    contains_dairy: Optional[bool] = Field(None, description="Contains dairy products")
    contains_eggs: Optional[bool] = Field(None, description="Contains eggs")
    contains_gluten: Optional[bool] = Field(None, description="Contains gluten")
    contains_meat: Optional[bool] = Field(None, description="Contains meat")
    contains_soy: Optional[bool] = Field(None, description="Contains soy")
    contains_nuts: Optional[bool] = Field(None, description="Contains nuts")
    from_palm_oil: Optional[bool] = Field(None, description="Derived from palm oil")

    # Analysis metadata
    # The documented 0.0-1.0 range is enforced so that out-of-range scores
    # (for example percentages such as 95) are rejected at validation time.
    confidence: float = Field(..., ge=0.0, le=1.0, description="Confidence score (0.0-1.0)")
    reasoning: str = Field(..., description="Brief explanation of the classification")

# Prompt constants
# BASE_PROMPT: task description and classification rules placed at the start of
# every batch prompt built by create_dietary_batch_prompt().
BASE_PROMPT = """You are a food science expert specializing in ingredient dietary classification and allergen analysis.

TASK: Analyze the following batch of food ingredients/additives and provide comprehensive dietary classification for each one.

ANALYSIS REQUIREMENTS:
For each ingredient, analyze and determine:

1. **Diet Compatibility:**
   - keto: Compatible with ketogenic diet (very low carb, high fat)
   - paleo: Compatible with paleolithic diet (no processed foods, grains, legumes, dairy)
   - whole30: Compatible with Whole30 program (no grains, dairy, legumes, sugar, alcohol)
   - vegan: Contains no animal products whatsoever
   - vegetarian: Contains no meat/fish (but may contain dairy/eggs)

2. **Source Classification:**
   Classify the PRIMARY source type. Suggested values: plant, animal, meat, dairy, insect, fruit, vegetable, grain, legume, nut, seed, spice, synthetic, mineral, unknown. You may use other appropriate descriptive terms if none of these fit.

3. **Allergen Analysis:**
   - contains_dairy: Contains milk, cheese, butter, whey, casein, etc.
   - contains_eggs: Contains eggs or egg-derived ingredients
   - contains_gluten: Contains wheat, barley, rye, or gluten-containing grains
   - contains_meat: Contains meat, poultry, fish, or meat-derived ingredients
   - contains_soy: Contains soybeans or soy-derived ingredients
   - contains_nuts: Contains tree nuts or nut-derived ingredients
   - from_palm_oil: Derived from or contains palm oil

4. **Confidence & Reasoning:**
   - Provide confidence score (0.0-1.0) based on certainty of classification
   - Give brief reasoning explaining the classification

IMPORTANT GUIDELINES:
- Use ingredient names for classification
- Consider that additives like E-codes are often synthetic
- Be conservative with allergen classifications (when in doubt, mark as true)
- For diet compatibility, consider processing and source ingredients
- Provide confidence score reflecting your certainty"""

# OUTPUT_FORMAT: response-format instructions placed at the end of every batch
# prompt. The keys listed here match the fields of IngredientDietaryAnalysis.
OUTPUT_FORMAT = """OUTPUT FORMAT:
Return a valid JSON array with objects, each following this exact structure:

[
  {
    "id": "ingredient_id_from_input",
    "name": "ingredient_name",
    "keto": true/false/null,
    "paleo": true/false/null,
    "whole30": true/false/null,
    "vegan": true/false/null,
    "vegetarian": true/false/null,
    "source": "descriptive_source_type_string",
    "contains_dairy": true/false/null,
    "contains_eggs": true/false/null,
    "contains_gluten": true/false/null,
    "contains_meat": true/false/null,
    "contains_soy": true/false/null,
    "contains_nuts": true/false/null,
    "from_palm_oil": true/false/null,
    "confidence": 0.95,
    "reasoning": "Brief explanation of classification"
  }
]

Analyze each ingredient thoroughly and return the complete JSON array."""

def create_dietary_batch_prompt(ingredients: List[Dict]) -> str:
    """Create a batch prompt for dietary analysis of multiple ingredients.

    Args:
        ingredients: JSON-serialisable ingredient dicts; they are embedded
            verbatim in the prompt.

    Returns:
        The full prompt: ``BASE_PROMPT``, the serialised batch, the required
        number of output objects, then ``OUTPUT_FORMAT``.
    """
    batch_count = len(ingredients)

    # ensure_ascii=False keeps non-ASCII ingredient names readable in the prompt
    # instead of expanding every character into a \uXXXX escape sequence.
    batch_json = json.dumps(ingredients, indent=2, ensure_ascii=False)

    prompt = f"""{BASE_PROMPT}

INPUT BATCH DATA:
{batch_json}

The output array must have exactly {batch_count} objects, one for each input ingredient.

{OUTPUT_FORMAT}"""

    return prompt

# Test the prompt builder with two hand-written ingredients.
# NOTE: `test_prompt` is also assigned by the OpenAI smoke-test cell; this rebinds it.
test_ingredients = [
    {"id": "test1", "names": ["salt"]},
    {"id": "test2", "names": ["sugar", "sucrose"]}
]

test_prompt = create_dietary_batch_prompt(test_ingredients)
print(f"Prompt length: {len(test_prompt)} characters")
print("Sample prompt (first 500 chars):")
print(test_prompt[:500] + "...")

Prompt length: 2995 characters
Sample prompt (first 500 chars):
You are a food science expert specializing in ingredient dietary classification and allergen analysis. 

TASK: Analyze the following batch of food ingredients/additives and provide comprehensive dietary classification for each one.

ANALYSIS REQUIREMENTS:
For each ingredient, analyze and determine:

1. **Diet Compatibility:**
   - keto: Compatible with ketogenic diet (very low carb, high fat)
   - paleo: Compatible with paleolithic diet (no processed foods, grains, legumes, dairy)
   - whole30: ...


In [None]:
import random
from typing import Tuple

def parse_llm_response(response_text: str, expected_count: int) -> Tuple[Optional[List["IngredientDietaryAnalysis"]], List[str]]:
    """Parse an LLM response into IngredientDietaryAnalysis objects.

    The response may be bare JSON or JSON wrapped in a Markdown code fence, with
    or without a ``json`` language tag in any letter case.

    Args:
        response_text: Raw text returned by the LLM.
        expected_count: Number of objects the JSON array must contain.

    Returns:
        ``(results, [])`` when every item validates, otherwise ``(None, errors)``
        where ``errors`` describes what went wrong. Parsing is all-or-nothing:
        one invalid item rejects the whole response.
    """
    errors = []

    try:
        # Clean and extract JSON
        cleaned = response_text.strip()
        if cleaned.startswith('```'):
            cleaned = cleaned[3:]
            # Drop an optional language tag directly after the opening fence.
            if cleaned[:4].lower() == 'json':
                cleaned = cleaned[4:]
        if cleaned.endswith('```'):
            cleaned = cleaned[:-3]
        cleaned = cleaned.strip()

        # Parse JSON
        json_data = json.loads(cleaned)

        if not isinstance(json_data, list):
            return None, ["Response is not a JSON array"]

        if len(json_data) != expected_count:
            return None, [f"Expected {expected_count} results, got {len(json_data)}"]

        # Parse each result, collecting every item-level error before giving up
        results = []
        for i, item in enumerate(json_data):
            try:
                results.append(IngredientDietaryAnalysis(**item))
            except Exception as e:
                errors.append(f"Error parsing item {i}: {str(e)}")

        if errors:
            return None, errors

        return results, []

    except json.JSONDecodeError as e:
        return None, [f"JSON parsing error: {str(e)}"]
    except Exception as e:
        # Includes a non-string response_text, which fails on .strip() above
        return None, [f"Unexpected error: {str(e)}"]

class BatchProcessor:
    """Sends ingredient batches to the available LLM services with bounded concurrency.

    Attributes:
        services: Mapping of service name to a service object exposing
            ``invoke_async(prompt)``.
        semaphore: Limits how many batches are processed at the same time.
        batch_size: Stored as given; not read inside this class.
        retry_queue: Initialised empty; not written inside this class.
        completed_batches: Initialised empty; not written inside this class.
        service_names: Service names in rotation order.
    """

    def __init__(self, services: Dict, max_concurrent: int = 5, batch_size: int = 50):
        """Store the services and create the concurrency limiter."""
        self.services = services
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.batch_size = batch_size
        self.retry_queue = []
        self.completed_batches = []
        self.service_names = list(services.keys())
        # Position of the next service to hand out in the rotation.
        self._next_service_index = 0

    def get_next_service(self) -> str:
        """Round-robin service selection.

        Raises:
            RuntimeError: If no services were configured.
        """
        if not self.service_names:
            raise RuntimeError("No LLM services available for batch processing.")
        name = self.service_names[self._next_service_index % len(self.service_names)]
        self._next_service_index += 1
        return name

    def _select_service(self, already_tried: List[str]) -> str:
        """Return the next service in rotation, preferring one not yet tried for this batch."""
        candidate = self.get_next_service()
        # Other batches advance the shared rotation concurrently, so step past
        # names already tried here. With a single service this is a no-op.
        for _ in range(len(self.service_names) - 1):
            if candidate not in already_tried:
                break
            candidate = self.get_next_service()
        return candidate

    async def process_single_batch(self, batch: List[Dict], batch_num: int, retry_count: int = 0) -> Dict:
        """Process a single batch, trying up to two services.

        Args:
            batch: Ingredient dicts; each must contain an ``'id'`` key.
            batch_num: Batch number used in log lines and in the result.
            retry_count: Retry round this call belongs to (0 for the first pass).

        Returns:
            A dict with ``'success'`` True plus ``'results'``, ``'service_used'``
            and ``'attempt'`` when a service produced a parseable response;
            otherwise a dict with ``'success'`` False and ``'service_used'`` None.
        """
        async with self.semaphore:
            prompt = create_dietary_batch_prompt(batch)
            tried = []

            # Try up to 2 different services for this batch
            for attempt in range(2):
                service_name = self._select_service(tried)
                tried.append(service_name)
                service = self.services[service_name]

                try:
                    print(f"  Batch {batch_num} (retry {retry_count}, attempt {attempt+1}): Processing with {service_name}...")

                    response = await service.invoke_async(prompt)
                    results, errors = parse_llm_response(response, len(batch))

                    # `results` is None on failure; an empty list is a valid
                    # result for an empty batch.
                    if results is not None:
                        print(f"  ✅ Batch {batch_num} {service_name}: Successfully processed {len(results)} ingredients")
                        return {
                            'batch_number': batch_num,
                            'batch_size': len(batch),
                            'ingredient_ids': [ing['id'] for ing in batch],
                            'service_used': service_name,
                            'retry_count': retry_count,
                            'attempt': attempt + 1,
                            'success': True,
                            'results': [result.model_dump() for result in results],
                            'timestamp': datetime.now().isoformat()
                        }
                    else:
                        print(f"  ❌ Batch {batch_num} {service_name}: Parsing failed - {errors}")

                except Exception as e:
                    print(f"  ❌ Batch {batch_num} {service_name}: API failed - {str(e)}")

            # All attempts failed
            print(f"  💀 Batch {batch_num}: All attempts failed, adding to retry queue")
            return {
                'batch_number': batch_num,
                'batch_size': len(batch),
                'ingredient_ids': [ing['id'] for ing in batch],
                'service_used': None,
                'retry_count': retry_count,
                'success': False,
                'timestamp': datetime.now().isoformat()
            }

# Cell marker: printed once the definitions in this cell have been executed.
print("✅ Batch processor setup complete")

✅ Batch processor setup complete


In [None]:
async def _run_batch_round(processor, batches, retry_round, successful_batches):
    """Run one round of batches concurrently and return the (batch, batch_num) pairs that failed."""
    label = "Retry batch" if retry_round else "Batch"
    outcomes = await asyncio.gather(
        *(processor.process_single_batch(batch, batch_num, retry_round) for batch, batch_num in batches),
        return_exceptions=True,
    )

    still_failed = []
    for entry, outcome in zip(batches, outcomes):
        # gather() can hand back BaseException instances (e.g. CancelledError),
        # which must not be indexed like a result dict.
        if isinstance(outcome, BaseException):
            print(f"❌ {label} {entry[1]} crashed: {outcome}")
            still_failed.append(entry)
        elif outcome['success']:
            successful_batches.append(outcome)
            print(f"✅ {label} {outcome['batch_number']} completed successfully")
        else:
            still_failed.append(entry)
    return still_failed


async def process_all_ingredients(ingredients: List[Dict], batch_size: int = 50, max_concurrent: int = 5, max_retries: int = 2, llm_services: Optional[Dict] = None):
    """Process all ingredients in batches with retry logic.

    Args:
        ingredients: Ingredient dicts to analyse.
        batch_size: Maximum number of ingredients per batch (must be >= 1).
        max_concurrent: Maximum number of batches in flight at once (must be >= 1).
        max_retries: Number of extra rounds in which failed batches are re-run.
        llm_services: Mapping of service name to service object. When omitted,
            the notebook-level ``services`` variable is used.

    Returns:
        Dict with ``'total_ingredients'``, ``'total_batches'``,
        ``'successful_batches'`` (result dicts from the processor),
        ``'failed_batches'`` (batch number and ingredient count for each batch
        that never succeeded) and ``'processing_timestamp'``.

    Raises:
        ValueError: If ``batch_size`` or ``max_concurrent`` is smaller than 1.
    """
    if batch_size < 1:
        raise ValueError("batch_size must be a positive integer")
    if max_concurrent < 1:
        # A semaphore created with 0 permits would block every batch forever.
        raise ValueError("max_concurrent must be a positive integer")

    active_services = services if llm_services is None else llm_services
    processor = BatchProcessor(active_services, max_concurrent, batch_size)

    # Create initial batches
    total_batches = (len(ingredients) + batch_size - 1) // batch_size
    initial_batches = []

    for start in range(0, len(ingredients), batch_size):
        batch = ingredients[start:start + batch_size]
        batch_num = start // batch_size + 1
        initial_batches.append((batch, batch_num))
        print(f"Created batch {batch_num}/{total_batches} ({len(batch)} ingredients)")

    print(f"\nProcessing {total_batches} batches with max {max_concurrent} concurrent requests...")

    # First pass over every batch (retry round 0)
    successful_batches = []
    failed_batches = await _run_batch_round(processor, initial_batches, 0, successful_batches)

    # Retry failed batches
    retry_round = 1
    while failed_batches and retry_round <= max_retries:
        print(f"\n🔄 Retry round {retry_round}: {len(failed_batches)} failed batches")
        failed_batches = await _run_batch_round(processor, failed_batches, retry_round, successful_batches)
        retry_round += 1

    # Final summary
    print(f"\n📊 PROCESSING SUMMARY:")
    print(f"Total ingredients: {len(ingredients)}")
    print(f"Total batches: {total_batches}")
    print(f"Successful batches: {len(successful_batches)}")
    print(f"Failed batches: {len(failed_batches)}")

    if failed_batches:
        print(f"⚠️  {len(failed_batches)} batches failed after {max_retries} retries")

    return {
        'total_ingredients': len(ingredients),
        'total_batches': total_batches,
        'successful_batches': successful_batches,
        'failed_batches': [{'batch_number': batch_num, 'ingredient_count': len(batch)} for batch, batch_num in failed_batches],
        'processing_timestamp': datetime.now().isoformat()
    }

# Cell marker: printed once process_all_ingredients has been defined.
print("✅ Main execution function ready")

✅ Main execution function ready


In [None]:
# Process all ingredients (top-level `await` is only valid inside a notebook cell).
# max_retries is not passed, so the function's default applies.
results = await process_all_ingredients(ingredients, batch_size=20, max_concurrent=5)

# Extract analyses from successful batches
# (entries in results['failed_batches'] contribute nothing to the saved file)
all_analyses = []
for batch in results['successful_batches']:
    all_analyses.extend(batch['results'])

# Save results; ensure_ascii=False writes non-ASCII text as-is rather than as \u escapes
output_file = "dietary_analysis_results.json"
with open(output_file, 'w', encoding='utf-8') as f:
    json.dump(all_analyses, f, indent=2, ensure_ascii=False)

print(f"💾 Saved {len(all_analyses)} analyses to {output_file}")
if all_analyses:
    # Show one record as a quick sanity check of the saved structure
    print(f"Sample: {all_analyses[0]['name']} - Vegan: {all_analyses[0]['vegan']}")

Created batch 1/283 (20 ingredients)
Created batch 2/283 (20 ingredients)
Created batch 3/283 (20 ingredients)
Created batch 4/283 (20 ingredients)
Created batch 5/283 (20 ingredients)
Created batch 6/283 (20 ingredients)
Created batch 7/283 (20 ingredients)
Created batch 8/283 (20 ingredients)
Created batch 9/283 (20 ingredients)
Created batch 10/283 (20 ingredients)
Created batch 11/283 (20 ingredients)
Created batch 12/283 (20 ingredients)
Created batch 13/283 (20 ingredients)
Created batch 14/283 (20 ingredients)
Created batch 15/283 (20 ingredients)
Created batch 16/283 (20 ingredients)
Created batch 17/283 (20 ingredients)
Created batch 18/283 (20 ingredients)
Created batch 19/283 (20 ingredients)
Created batch 20/283 (20 ingredients)
Created batch 21/283 (20 ingredients)
Created batch 22/283 (20 ingredients)
Created batch 23/283 (20 ingredients)
Created batch 24/283 (20 ingredients)
Created batch 25/283 (20 ingredients)
Created batch 26/283 (20 ingredients)
Created batch 27/283 