# Processor

I'll use this processor to obtain only the data I am interested in.

In [1]:
import pandas as pd
import numpy as np
from pathlib import Path
import logging
from typing import Dict, List, Optional
import json
from dotenv import load_dotenv
import os


In [21]:
class FoundationFoodProcessor:
    """
    Processor to extract and prepare foundation foods from the USDA dataset.

    The pipeline (see ``process_all``) reads the CSV files found under
    ``data_path``, keeps the rows of ``food.csv`` whose ``data_type`` is
    ``foundation_food``, attaches their nutrient records and writes one JSON
    profile per food plus a CSV summary into the parent of ``data_path``.
    """

    # Nutrient names that may carry the energy value, most preferred first.
    ENERGY_PRIORITIES = (
        'Energy (Atwater General Factors)',
        'Energy (Atwater Specific Factors)',
        'Energy',
    )

    # Thermochemical calorie: 1 kcal = 4.184 kJ.
    KJ_PER_KCAL = 4.184

    def __init__(self, data_path: str):
        """
        Args:
            data_path: Directory that contains the USDA CSV files.
        """
        self.data_path = data_path
        self.logger = self._setup_logger()

    def _setup_logger(self):
        """Enable INFO-level logging and return the logger for this module."""
        logging.basicConfig(level=logging.INFO)
        return logging.getLogger(__name__)

    def _csv_path(self, file_name: str) -> Path:
        """Return the location of ``file_name`` inside the data directory."""
        # Path joining works for both str and Path inputs and does not care
        # about a trailing separator on data_path.
        return Path(self.data_path) / file_name

    @staticmethod
    def _clean_scalar(value):
        """
        Turn a pandas/NumPy cell into a value ``json`` can write faithfully.

        Missing values become ``None``, NumPy scalars become their Python
        equivalent and integral floats (e.g. ``11.0``, which is what an id
        column turns into once it contains a NaN) become ``int``. Anything
        else is returned unchanged.
        """
        if value is None or value is pd.NaT or value is pd.NA:
            return None
        if isinstance(value, np.generic):
            value = value.item()
        if isinstance(value, float):
            if value != value:  # NaN is the only float not equal to itself
                return None
            if value.is_integer():
                return int(value)
        return value

    @staticmethod
    def _json_default(value):
        """
        Fallback serializer for ``json.dump``.

        NumPy scalars are written as the matching Python number instead of
        being stringified; every other unknown type falls back to ``str``.
        """
        if isinstance(value, np.generic):
            return value.item()
        return str(value)

    def _energy_in_kcal(self, nutrients: Dict) -> Optional[tuple]:
        """
        Find the preferred energy entry and express it in kcal.

        Args:
            nutrients: Mapping of nutrient name to a dict with at least the
                keys ``amount`` and ``unit``.

        Returns:
            ``(kcal, source_kj)`` where ``source_kj`` is the original kJ
            amount when a conversion took place and ``None`` when the entry
            was already in kcal. ``None`` when no usable energy entry exists.
        """
        for energy_type in self.ENERGY_PRIORITIES:
            entry = nutrients.get(energy_type)
            if entry is None:
                continue
            # str() keeps a missing (NaN) unit from raising on .upper().
            unit = str(entry['unit']).upper()
            if unit == 'KCAL':
                return entry['amount'], None
            if unit == 'KJ':
                kcal = round(entry['amount'] / self.KJ_PER_KCAL, 2)
                return kcal, entry['amount']
            # Any other unit: try the next candidate name.
        return None

    def _collect_nutrients(self, food_nutrients: Optional[pd.DataFrame]) -> tuple:
        """
        Build the nutrient mapping and the text lines for a single food.

        Args:
            food_nutrients: Nutrient rows of one food (columns ``name``,
                ``amount``, ``unit_name``, ``nutrient_id``) or ``None`` when
                the food has no nutrient rows.

        Returns:
            ``(nutrients_dict, nutrients_text)``. Rows without a name or with
            a missing / non-positive amount are left out of both.
        """
        nutrients_dict = {}
        nutrients_text = []
        if food_nutrients is None:
            return nutrients_dict, nutrients_text

        # Iterating the columns (instead of .loc per index label) works with
        # any index, including a non-unique one.
        rows = zip(
            food_nutrients['name'],
            food_nutrients['amount'],
            food_nutrients['unit_name'],
            food_nutrients['nutrient_id'],
        )
        for nutrient_name, raw_amount, unit, nutrient_id in rows:
            if pd.isna(nutrient_name) or pd.isna(raw_amount) or raw_amount <= 0:
                continue

            amount = round(float(raw_amount), 2)
            nutrients_text.append(f"{nutrient_name}: {amount} {unit}")

            # Two rows can share a name while using different units (the
            # energy lookup handles both kcal and kJ for that reason). Do not
            # let a kJ row replace a kcal row already stored under that name.
            previous = nutrients_dict.get(nutrient_name)
            if (
                previous is not None
                and str(previous['unit']).upper() == 'KCAL'
                and str(unit).upper() == 'KJ'
            ):
                continue

            nutrients_dict[nutrient_name] = {
                'amount': amount,
                'unit': unit,
                'nutrient_id': self._clean_scalar(nutrient_id),
            }

        return nutrients_dict, nutrients_text

    def load_foundation_foods(self) -> pd.DataFrame:
        """
        Load the foundation foods from the USDA dataset.

        Returns:
            The ``foundation_food`` rows of ``food.csv`` left-joined with
            ``foundation_food.csv`` on ``fdc_id``.
        """
        self.logger.info(f"Loading foundation foods from {self.data_path}")

        # Load main files
        food_df = pd.read_csv(self._csv_path('food.csv'))
        foundation_df = pd.read_csv(self._csv_path('foundation_food.csv'))

        # Filter foundation foods from the main food file
        foundation_from_food = food_df[food_df['data_type'] == 'foundation_food'].copy()

        # Merge foundation foods with foundation_food.csv
        foundation_foods = foundation_from_food.merge(
            foundation_df,
            on='fdc_id',
            how='left'
        )

        self.logger.info(f"Found {len(foundation_foods)} foundation foods")
        return foundation_foods

    def load_nutritional_data(self, foundation_foods: pd.DataFrame) -> pd.DataFrame:
        """
        Load all nutritional data for the foundation foods.

        Args:
            foundation_foods: Frame with an ``fdc_id`` column.

        Returns:
            The rows of ``food_nutrient.csv`` that belong to those foods,
            enriched with the nutrient ``name`` and ``unit_name`` taken from
            ``nutrient.csv``.
        """
        self.logger.info("Loading nutritional data...")

        # Ids of the foundation foods
        fdc_ids = foundation_foods['fdc_id'].tolist()

        # The file is read in a single pass (no chunking) and filtered right
        # after, so the complete table is held in memory for a moment.
        nutrients_df = pd.read_csv(
            self._csv_path('food_nutrient.csv'),
            dtype={'amount': 'float64'},
            low_memory=False
        )

        # Filter only nutrients for foundation foods
        foundation_nutrients = nutrients_df[
            nutrients_df['fdc_id'].isin(fdc_ids)
        ].copy()

        # Load nutrient information
        nutrient_info = pd.read_csv(self._csv_path('nutrient.csv'))

        # Enrich with nutrient names
        foundation_nutrients = foundation_nutrients.merge(
            nutrient_info[['id', 'name', 'unit_name']],
            left_on='nutrient_id',
            right_on='id',
            how='left'
        )

        self.logger.info(f"Loaded {len(foundation_nutrients)} nutritional records")
        return foundation_nutrients

    def create_food_profile(self, foundation_foods: pd.DataFrame,
                           nutritional_data: pd.DataFrame) -> List[Dict]:
        """
        Create a food profile for each foundation food.

        Args:
            foundation_foods: One row per food; needs ``fdc_id`` and
                ``description``, optionally ``food_category_id`` and
                ``publication_date``.
            nutritional_data: Nutrient rows with the columns ``fdc_id``,
                ``name``, ``amount``, ``unit_name`` and ``nutrient_id``.

        Returns:
            One dict per food with its identifiers, calories in kcal (0 when
            no energy entry is available), nutrient mapping, rich description
            and a summary of the first ten nutrient lines.
        """
        self.logger.info("Creating food profiles...")

        # Group the nutrient rows once instead of scanning the whole frame
        # again for every food.
        nutrients_by_food = {
            key: group
            for key, group in nutritional_data.groupby('fdc_id', sort=False)
        }

        profiles = []
        for _, food in foundation_foods.iterrows():
            fdc_id = food['fdc_id']

            nutrients_dict, nutrients_text = self._collect_nutrients(
                nutrients_by_food.get(fdc_id)
            )

            energy = self._energy_in_kcal(nutrients_dict)
            calories_amount = energy[0] if energy is not None else 0

            # Create rich description for vectorization
            description = self._create_rich_description(food, nutrients_dict)

            profiles.append({
                'fdc_id': self._clean_scalar(fdc_id),
                'description': food['description'],
                'calories': calories_amount,
                'calories_unit': 'kcal',
                # Missing ids/dates become None so the JSON output never
                # contains a bare NaN token.
                'food_category_id': self._clean_scalar(food.get('food_category_id')),
                'publication_date': self._clean_scalar(food.get('publication_date')),
                'nutrients': nutrients_dict,
                'rich_description': description,
                'nutrients_summary': '; '.join(nutrients_text[:10])
            })

        self.logger.info(f"Creados {len(profiles)} perfiles de alimentos")
        return profiles

    def _create_rich_description(self, food: pd.Series, nutrients: Dict) -> str:
        """
        Create a rich description for vectorization that includes
        nutritional properties.

        Args:
            food: Food row; only ``description`` is read.
            nutrients: Mapping of nutrient name to ``amount`` / ``unit``.

        Returns:
            Sentence-like sections (macronutrients, energy in kcal, up to
            three vitamins, up to three minerals, diabetes-relevant values)
            joined with ``". "``. Sections without data are omitted.
        """
        def labelled(name: str, label: Optional[str] = None) -> str:
            entry = nutrients[name]
            return f"{label or name}: {entry['amount']}{entry['unit']}"

        # Important nutrients for diabetic and balanced diets. The value is
        # the label to show instead of the nutrient name (None = keep name).
        carb_nutrients = {
            'Carbohydrate, by difference': 'Carbohidratos',
            'Sugars, total including NLEA': None,
            'Fiber, total dietary': None,
        }
        protein_nutrients = ['Protein']
        fat_nutrients = {
            'Total lipid (fat)': 'Grasas totales',
            'Fatty acids, total saturated': None,
            'Fatty acids, total monounsaturated': None,
        }
        mineral_keywords = ('Calcium', 'Iron', 'Potassium', 'Sodium', 'Magnesium')

        names = [n for n in nutrients if isinstance(n, str)]
        vitamin_nutrients = [n for n in names if 'Vitamin' in n]
        mineral_nutrients = [n for n in names if any(m in n for m in mineral_keywords)]

        # Build rich description
        rich_parts = [f"Food: {food['description']}"]

        # Macronutrient information
        carb_info = [labelled(n, label) for n, label in carb_nutrients.items() if n in nutrients]
        if carb_info:
            rich_parts.append(f"Carbohidratos: {', '.join(carb_info)}")

        protein_info = [
            f"{nutrients[n]['amount']}{nutrients[n]['unit']}"
            for n in protein_nutrients if n in nutrients
        ]
        if protein_info:
            rich_parts.append(f"Proteínas: {', '.join(protein_info)}")

        fat_info = [labelled(n, label) for n, label in fat_nutrients.items() if n in nutrients]
        if fat_info:
            rich_parts.append(f"Grasas: {', '.join(fat_info)}")

        # Energy is always reported in kcal; kJ values are converted.
        energy = self._energy_in_kcal(nutrients)
        if energy is not None:
            kcal_amount, source_kj = energy
            if source_kj is None:
                rich_parts.append(f"Energía: {kcal_amount} kcal")
            else:
                rich_parts.append(
                    f"Energía: {kcal_amount} kcal (convertido de {source_kj} kJ)"
                )

        # Vitamins
        if vitamin_nutrients:
            vitamin_info = [labelled(n) for n in vitamin_nutrients[:3]]
            rich_parts.append(f"Vitaminas: {', '.join(vitamin_info)}")

        # Minerals
        if mineral_nutrients:
            mineral_info = [labelled(n) for n in mineral_nutrients[:3]]
            rich_parts.append(f"Minerales: {', '.join(mineral_info)}")

        # Relevant information for diabetic diets
        diabetes_relevant = []
        if 'Carbohydrate, by difference' in nutrients:
            diabetes_relevant.append(f"Carbohidratos totales: {nutrients['Carbohydrate, by difference']['amount']}g")
        if 'Sugars, total including NLEA' in nutrients:
            diabetes_relevant.append(f"Azúcares: {nutrients['Sugars, total including NLEA']['amount']}g")
        if 'Fiber, total dietary' in nutrients:
            diabetes_relevant.append(f"Fibra: {nutrients['Fiber, total dietary']['amount']}g")

        if diabetes_relevant:
            rich_parts.append(f"Relevante para diabetes: {', '.join(diabetes_relevant)}")

        return ". ".join(rich_parts)

    def add_food_categories(self, profiles: List[Dict]) -> List[Dict]:
        """
        Add food category information.

        Profiles whose ``food_category_id`` is found in ``food_category.csv``
        get a ``food_category`` entry and have the category prepended to
        their ``rich_description``. The profiles are modified in place.

        Args:
            profiles: Profiles as produced by ``create_food_profile``.

        Returns:
            The same list. When ``food_category.csv`` is missing a warning is
            logged and the profiles are returned untouched.
        """
        try:
            categories_df = pd.read_csv(self._csv_path('food_category.csv'))
        except FileNotFoundError:
            self.logger.warning("Archivo food_category.csv no encontrado")
            return profiles

        categories_dict = dict(zip(categories_df['id'], categories_df['description']))

        for profile in profiles:
            category_id = profile.get('food_category_id')
            if not category_id or category_id not in categories_dict:
                continue
            category = categories_dict[category_id]
            profile['food_category'] = category
            # Add category to rich description
            profile['rich_description'] = f"Categoría: {category}. " + profile['rich_description']

        return profiles

    def save_processed_data(self, profiles: List[Dict], output_path: str = "foundation_foods_processed.json"):
        """
        Save processed data in JSON format and write a CSV summary.

        Both files are written to the parent directory of ``data_path``; the
        summary is always named ``foundation_foods_summary.csv``.

        Args:
            profiles: Profiles as produced by ``create_food_profile``.
            output_path: File name of the JSON output.

        Returns:
            Path of the JSON file.
        """
        output_dir = Path(self.data_path).parent
        output_file = output_dir / output_path

        with open(output_file, 'w', encoding='utf-8') as f:
            json.dump(profiles, f, indent=2, ensure_ascii=False, default=self._json_default)

        self.logger.info(f"Datos guardados en {output_file}")

        # Also create CSV for easy analysis
        csv_data = [
            {
                'fdc_id': profile['fdc_id'],
                'description': profile['description'],
                'calories': profile.get('calories', 0),
                'rich_description': profile['rich_description'],
                'nutrients_count': len(profile['nutrients']),
                'has_carbs': 'Carbohydrate, by difference' in profile['nutrients'],
                'has_protein': 'Protein' in profile['nutrients'],
                # Any of the energy variants counts, matching the names the
                # calorie lookup accepts.
                'has_energy': any(
                    name in profile['nutrients'] for name in self.ENERGY_PRIORITIES
                ),
            }
            for profile in profiles
        ]

        csv_df = pd.DataFrame(csv_data)
        csv_file = output_dir / "foundation_foods_summary.csv"
        csv_df.to_csv(csv_file, index=False)
        self.logger.info(f"Resumen guardado en {csv_file}")

        return output_file

    def process_all(self) -> str:
        """
        Run the whole processing pipeline: load foods, load nutrients, build
        the profiles and save them.

        Returns:
            Path of the JSON output file, as a string.
        """
        self.logger.info("Iniciando procesamiento completo...")

        # Load foundation foods
        foundation_foods = self.load_foundation_foods()

        # Load nutritional data
        nutritional_data = self.load_nutritional_data(foundation_foods)

        # Create profiles
        profiles = self.create_food_profile(foundation_foods, nutritional_data)

        # NOTE(review): add_food_categories() is not called here, so the saved
        # profiles carry no category names - confirm whether that is intended.

        # Save data
        output_file = self.save_processed_data(profiles)

        self.logger.info("Procesamiento completado exitosamente!")
        return str(output_file)

    def get_summary_stats(self, profiles: List[Dict]) -> Dict:
        """
        Obtain summary statistics of the processed data.

        Args:
            profiles: Profiles as produced by ``create_food_profile``.

        Returns:
            Dict with the number of foods, the total number of nutrient
            entries, the average number of entries per food (0 for an empty
            list) and the ten most frequent nutrient names with their counts.
        """
        total_foods = len(profiles)
        total_nutrients = sum(len(p['nutrients']) for p in profiles)
        avg_nutrients_per_food = total_nutrients / total_foods if total_foods > 0 else 0

        # Most common nutrients
        all_nutrients = []
        for p in profiles:
            all_nutrients.extend(p['nutrients'].keys())

        nutrient_counts = pd.Series(all_nutrients, dtype=object).value_counts()

        return {
            'total_foundation_foods': total_foods,
            'total_nutrient_records': total_nutrients,
            'avg_nutrients_per_food': round(avg_nutrients_per_food, 2),
            'most_common_nutrients': nutrient_counts.head(10).to_dict()
        }


In [22]:
# Execute the processing
if __name__ == "__main__":
    # Read USDA_DATA_PATH from the environment (a .env file is honoured).
    load_dotenv()
    data_path = os.getenv("USDA_DATA_PATH")

    # Fail early with an actionable message instead of falling back to an
    # empty path, which only surfaces later as a missing food.csv.
    if not data_path:
        raise RuntimeError(
            "USDA_DATA_PATH is not set; point it at the directory that "
            "contains the USDA CSV files (e.g. in a .env file)."
        )

    processor = FoundationFoodProcessor(data_path)

    # Process all data
    output_file = processor.process_all()

    print("\nProcessing completed!")
    print(f"Output file: {output_file}")


INFO:__main__:Iniciando procesamiento completo...
INFO:__main__:Loading foundation foods from C:/Users/Usuario/Documents/FoodData_Central_csv_2025-04-24/FoodData_Central_csv_2025-04-24
INFO:__main__:Found 411 foundation foods
INFO:__main__:Loading nutritional data...
INFO:__main__:Loaded 19239 nutritional records
INFO:__main__:Creating food profiles...
INFO:__main__:Creados 411 perfiles de alimentos
INFO:__main__:Datos guardados en C:\Users\Usuario\Documents\FoodData_Central_csv_2025-04-24\foundation_foods_processed.json
INFO:__main__:Resumen guardado en C:\Users\Usuario\Documents\FoodData_Central_csv_2025-04-24\foundation_foods_summary.csv
INFO:__main__:Procesamiento completado exitosamente!



Processing completed!
Output file: C:\Users\Usuario\Documents\FoodData_Central_csv_2025-04-24\foundation_foods_processed.json
