In [None]:
# === Cell 1

import os, re, json, logging, hashlib, difflib
from pathlib import Path
from typing import Dict, List, Tuple, Optional
import pandas as pd
import numpy as np
from openai import OpenAI, RateLimitError
import backoff
# change by stephen2
# === GPT Model Selection ===
# Default model used by all GPT calls (Stage 1 & 2).
# You can change this to "gpt-4o" or another snapshot if desired.
OPENAI_MODEL = "gpt-4o-mini"
# Also exported through the environment: _call_gpt reads OPENAI_MODEL via os.getenv.
os.environ["OPENAI_MODEL"] = OPENAI_MODEL
print(f"[config] GPT model set to: {OPENAI_MODEL}")

# === Initialize OpenAI client ===
# OpenAI() raises when no API key is available, which would abort this cell even
# for runs that never reach a GPT stage. Build the client only when the key is
# present; otherwise warn and leave `client` as None so the failure surfaces at
# the first actual GPT call instead of at configuration time.
if os.getenv("OPENAI_API_KEY"):
    client = OpenAI()
else:
    print("[warning] OPENAI_API_KEY not found — GPT calls will fail unless it’s set.")
    client = None

# ---- Locations and run settings ----
# Folder that holds this script. A notebook kernel defines no __file__, in which
# case the current working directory is used instead.
try:
    base_dir = Path(__file__).parent
except NameError:
    base_dir = Path.cwd()

# ---- SET input_filename TO THE NAME OF THE PROCUREMENT SPREADSHEET YOU ARE ASSESSING ----
input_filename = base_dir.joinpath("input.csv")
# Stem of the input file; used as the base name for outputs.
base_filename = input_filename.stem
output_dir = base_dir.joinpath("output")

# Input worksheet layout (see the File Preparation section of the documentation).
# One of: "auto" | "type01" | "type02" | "type03"
file_type = "auto"

# START_MODE selects whether the whole notebook runs or the process resumes part
# way through. For example, after reviewing and editing the ingredients and/or
# category output file, restart from the edited file with "after_ingredients"
# or "after_categories".
# One of: "full" | "after_ingredients" | "after_categories"
START_MODE = "full"

# Optionally adjustable: used to drop ingredients that make up only a very minor
# percent of total ingredient weight.
CATEGORY_PERCENT_MIN = 2.1

# factors.csv holds the conversion factors from weight to impacts.
# foodcategories.json holds prior ingredient:category combinations that can be
# reused, avoiding an AI call when the ingredient is already present.
factors_filename = base_dir.joinpath("factors.csv")
categories_filename = base_dir.joinpath("foodcategories.json")

# Session epoch for overwrite policy (v100g)
import time

# Stamp the session start only once: when this cell is re-run in the same
# namespace, an already-recorded timestamp is kept rather than replaced.
if globals().get("_V100_SESSION_START") is None:
    _V100_SESSION_START = time.time()
    
# === Cell 1: Purpose: Set core filenames, modes, and global switches; verify required assets.
# Inputs:
#   - input_filename (user-specified CSV file), factors.csv (for ALLOWED_CATEGORIES), foodcategories.json.
#   - constants and dictionaries (UOM_TO_LBS, CATEGORY_WEIGHT_MULTIPLIERS, etc.)
# Outputs:
#   - Global variables: BASE, START_MODE, CATEGORY_SOURCE, USE_CATEGORY_WEIGHT_MULTIPLIERS, etc.
#   - In-memory globals (e.g., ALLOWED_CATEGORIES, FOODCATS) used by later cells.
#   - Logging configuration and printout of active settings.
# Key processes:
# • Imports core libraries and configures logging (INFO by default).
# • Reads/sets: input_filename, file_type ('auto'|'type01'|'type02'|'type03'), and base_filename.
# • Sets high-level run control START_MODE ('full' | 'after_ingredients' | 'after_categories').
# • Loads foodcategories.json (exact name → category map) preserving insertion order.
# • Loads factors.csv and composes ALLOWED_CATEGORIES (the “closed set” used by GPT).
#
# Notes:
# • For OpenAI to be used later, the environment variable OPENAI_API_KEY must be set before running Cell 5/6.
# • Changing input_filename or START_MODE here controls what the final Runner (Cell 9) will do.

# Root logger: INFO level with timestamped records. The "httpx" logger is raised
# to WARNING so its INFO-level records are not emitted.
logging.basicConfig(format="%(asctime)s [%(levelname)s] %(message)s", level=logging.INFO)
logging.getLogger("httpx").setLevel(logging.WARNING)

# some different files being used for input_filename in testing
# "hh100g.csv"
# "HH1.csv"
# "testfoods80.csv"
# "usfoods.csv"
# "in1b_fpoint02.csv"      # 213k
# "in2b_usfoods02.csv"     # 226k
# "in2b_usfoods26.csv"     # 15k
# "in3_jpaige01.csv"       # 7k
# "in4b_jpolep01.csv"      # 19k
# "in5b_ncsea01.csv"       # 15k
# "in6b_jamix01.csv"       # 23k
# "baldor100a.csv"         # 40k
# "HH112in.csv"
# baldor103f.csv

# Units in which the factors are expressed (only used for output conversions in
# Stage 3). Change the value to "kg_co2" if your co2 factors are per kg.
FACTOR_UNITS = dict(co2="lbs_co2")

def _load_foodcategories(json_path: Path) -> Dict[str, str]:
    if not Path(json_path).exists():
        logging.warning("No foodcategories.json found — using empty mapping.")
        return {}
    with open(json_path, "r", encoding="utf-8") as f:
        data = json.load(f)
    mapping = {}
    if isinstance(data, dict):
        for k,v in data.items():
            mapping[str(k).strip().lower()] = str(v).strip()
    elif isinstance(data, list):
        for rule in data:
            if isinstance(rule, dict) and 'from' in rule and 'to' in rule:
                cat = str(rule['to']).strip()
                for src in rule['from']:
                    mapping[str(src).strip().lower()] = cat
    return mapping

# Exact-name -> category lookup loaded from foodcategories.json (empty when absent).
FOODCATS = _load_foodcategories(categories_filename)

# Category list comes from factors.csv; the file and its 'category' column are mandatory.
if not Path(factors_filename).exists():
    raise FileNotFoundError(f"factors file not found: {factors_filename}")
_factors_df = pd.read_csv(factors_filename, encoding="utf-8")
if "category" not in _factors_df.columns:
    raise ValueError("factors.csv must include a 'category' column")
# Distinct, whitespace-trimmed category labels (missing values dropped).
FACTORS_CATEGORY_SET = {
    str(label).strip()
    for label in _factors_df["category"].dropna().unique()
}

# Sorted view of the same labels; designed to help ensure that the food
# categories selected for ingredients match factors present in factors.csv.
ALLOWED_CATEGORIES = sorted(FACTORS_CATEGORY_SET)

# --- Configuration Toggles and Data ---
# Set this to True when the worksheet does NOT show real weights in lbs for
# products whose quantities are expressed as fluid volumes (fl oz, pint, quart,
# gallon, ml, liter). The multipliers below convert from the default (weight of
# water) to the weight of products that may be lighter or heavier than water.
USE_CATEGORY_WEIGHT_MULTIPLIERS = False

# Approximate specific gravity per food category: the ratio of the food's
# density to the density of water (1.0).
CATEGORY_WEIGHT_MULTIPLIERS = dict([
    ("skim milk", 1.034),            # heavier due to solids, less fat
    ("dairy milk", 1.030),           # slightly lighter than skim milk
    ("buttermilk", 1.032),           # similar to skim milk
    ("soy milk", 1.020),
    ("oat milk", 1.015),
    ("almond milk", 1.010),
    ("rice milk", 1.008),
    ("cream", 0.995),                # lighter than water due to high fat content
    ("olive oil", 0.915),            # all pure oils are less dense than water
    ("palm oil", 0.890),
    ("rapeseed/canola oil", 0.915),
    ("soybean oil", 0.920),
    ("sunflower oil", 0.918),
    ("vegetable oils", 0.925),       # general range for mixed oils
])

print("Updated multipliers to reflect Specific Gravity (Weight relative to Water = 1.0).")

In [None]:
# === Cell 2: Utilities ===
# Purpose: Provide shared helper functions used throughout the FoodImpacts workflow.
# Includes:
#   - safe_read_csv: fault-tolerant CSV loading with encoding detection
#   - parse_qty_unit: extract numeric quantity and unit of measure from product strings
#   - to_lbs: convert quantity-unit pairs to weight in pounds
#   - norm_text: text normalization helper for category matching
# Inputs: function arguments (varies)
# Outputs: function return values; no direct file writes.
# Behavior:
#   Supports robust data ingestion, normalization, and measurement conversion.

# Multiplier from one unit of measure to pounds. Keys are upper-case unit
# labels; volume units are converted using the weight of water.
UOM_TO_LBS = {
    # --- Weight/Mass Units ---
    "LB": 1.0, "LBS": 1.0, "POUND": 1.0, "POUNDS": 1.0,
    "OZ": 1/16.0, "OUNCE": 1/16.0, "OUNCES": 1/16.0,
    "KG": 2.20462262, "KGS": 2.20462262, "KILOGRAM": 2.20462262, "KILOGRAMS": 2.20462262,
    "G": 0.00220462, "GRAM": 0.00220462, "GRAMS": 0.00220462,

    # --- US Volume Units (Water Density Conversion to LBS) ---
    # 1 US Gallon of water = 8.345 LBS
    "GAL": 8.345, "GALLON": 8.345, "GALLONS": 8.345,
    # 1 US Quart of water = 2.086 LBS (8.345 / 4)
    "QT": 2.086, "QUART": 2.086, "QUARTS": 2.086,
    # 1 US Pint of water = 1.043 LBS (8.345 / 8). Pints are listed as a
    # supported volume unit in the configuration notes, so they need a factor.
    "PT": 1.043, "PINT": 1.043, "PINTS": 1.043,
    # 1 US Fluid Ounce of water = 0.065 LBS (8.345 / 128)
    "FLUID OZ": 0.065, "FLOZ": 0.065, "FL OZ": 0.065,
    "FLUID OUNCE": 0.065, "FLUID OUNCES": 0.065,

    # --- Metric Volume Units (Water Density Conversion to LBS) ---
    # 1 Liter of water = 1 KG = 2.2046 LBS
    "L": 2.20462262, "LITER": 2.20462262, "LITERS": 2.20462262,
    # 1 Milliliter of water = 1 Gram = 0.00220462 LBS
    "ML": 0.00220462, "MILLILITER": 0.00220462, "MILLILITERS": 0.00220462
}

def to_lbs(qty: float, uom: str):
    """Convert ``qty`` expressed in unit ``uom`` to pounds.

    The unit label is stripped and upper-cased before it is looked up in
    UOM_TO_LBS. Returns None when either argument is missing (per pd.isna) or
    when the table has no usable factor for the unit.
    """
    if pd.isna(qty) or pd.isna(uom):
        return None
    factor = UOM_TO_LBS.get(str(uom).strip().upper())
    if factor is None or np.isnan(factor):
        return None
    return float(qty) * factor

def parse_qty_unit(text: str):
    if not isinstance(text, str): return (None, None)
    m = re.search(r"(?P<qty>\d+(?:[\.,]\d+)?)\s*(?P<unit>[A-Za-z]+)\b", text)
    if not m: return (None, None)
    try:
        qty = float(m.group('qty').replace(',', '.'))
    except Exception:
        qty = None
    unit = m.group('unit').upper()
    return qty, unit

def norm_text(s: str) -> str:
    """Normalise free text for matching.

    Lower-cases the input (falsy input becomes ""), replaces every character
    other than a-z, 0-9, whitespace, '/' and '-' with a space, turns hyphens
    into spaces, and collapses runs of whitespace to single spaces.
    """
    lowered = str(s or "").lower()
    kept = re.sub(r"[^a-z0-9\s/\-]", " ", lowered)
    return " ".join(kept.replace("-", " ").split())

def _normalize_colnames(cols):
    out = []
    for c in cols:
        c2 = str(c).replace("\u00A0"," ").strip().lower()
        c2 = re.sub(r"[^\w]+", "_", c2)
        c2 = re.sub(r"_+", "_", c2).strip("_")
        out.append(c2)
    return out

def _try_read_csv(path):
    for enc in ("utf-8-sig","utf-8","latin1"):
        try:
            return pd.read_csv(path, encoding=enc)
        except Exception:
            continue
    return pd.read_csv(path)

def safe_read_csv(path):
    """Read a CSV via _try_read_csv and normalise its column names.

    Raises:
        FileNotFoundError: if ``path`` does not exist.
    """
    if not Path(path).exists():
        raise FileNotFoundError(path)
    frame = _try_read_csv(path)
    frame.columns = _normalize_colnames(frame.columns)
    return frame

def save_csv(df, path: str):
    """Write ``df`` to ``path`` as UTF-8 CSV without the index and log the row count."""
    df.to_csv(path, encoding="utf-8", index=False)
    row_count = len(df)
    logging.info(f"Wrote: {path} ({row_count:,} rows)")

def load_if_exists(path: str):
    """Return the CSV at ``path`` loaded through safe_read_csv, or None if it is absent."""
    candidate = Path(path)
    if not candidate.exists():
        return None
    logging.info(f"Loading existing file: {candidate}")
    return safe_read_csv(candidate)

def stable_hash(text: str) -> str:
    """Return the first 16 hex characters of the SHA-256 digest of ``text`` (UTF-8 encoded)."""
    digest = hashlib.sha256(text.encode("utf-8"))
    return digest.hexdigest()[:16]

def fuzzy_choice(text: str, choices: List[str], cutoff: float = 0.86):
    """Return the entry of ``choices`` most similar to ``text``.

    Similarity is difflib.SequenceMatcher's ratio. On ties the earliest choice
    wins. Returns None when ``choices`` is empty, when nothing scores above
    zero, or when the best score is below ``cutoff``.
    """
    if not choices:
        return None
    scored = [(difflib.SequenceMatcher(None, text, option).ratio(), option)
              for option in choices]
    # max() keeps the first of equal maxima, so the earliest choice wins ties.
    top_score, top_option = max(scored, key=lambda pair: pair[0])
    if top_score > 0.0 and top_score >= cutoff:
        return top_option
    return None

# === Diagnostics: micro logger (safe to live in Utilities cell) ===
import time, os
from contextlib import contextmanager

DEBUG_TIMING = True  # flip to False to silence all timing output

# psutil is optional; we'll degrade gracefully if it's not available
try:
    import psutil
    _PROC = psutil.Process(os.getpid())
    def _mem_mb():
        try:
            return _PROC.memory_info().rss / (1024*1024)
        except Exception:
            return float('nan')
except Exception:
    def _mem_mb():
        return float('nan')

@contextmanager
def log_time(label: str):
    """Context manager that prints elapsed time and memory change for a block.

    Example:
        with log_time("Stage 3: edible-yield"):
            ...  # code to measure

    When DEBUG_TIMING is false the wrapped code runs with no output. Otherwise
    a start line is printed on entry and a completion line on exit, including
    when the wrapped code raises.
    """
    if DEBUG_TIMING:
        started = time.perf_counter()
        rss_before = _mem_mb()
        print(f"[timing] ▶ {label} ... (RSS {rss_before:,.1f} MB)")
        try:
            yield
        finally:
            elapsed = time.perf_counter() - started
            rss_after = _mem_mb()
            print(f"[timing] ✔ {label} in {elapsed:,.3f}s (ΔRSS {rss_after - rss_before:+.1f} MB)")
    else:
        yield

# === GPT bridge used by Stage 1 & Stage 2 ===

def _call_gpt(messages, model=None, temperature=0):
    """Send chat ``messages`` to GPT and return the reply text.

    Args:
        messages: [{"role": "system"/"user"/"assistant", "content": "..."}]
        model: Model name; defaults to the OPENAI_MODEL environment variable,
            then "gpt-4o-mini".
        temperature: Sampling temperature passed through to the API.

    Returns:
        The content string of the first choice.

    Raises:
        RuntimeError: if no helper alias is defined and OPENAI_API_KEY is unset.
        Any error raised by the OpenAI client is propagated unchanged.
    """
    # 1) Reuse an existing helper if the notebook already defines one.
    for alias in ("call_gpt", "gpt_chat", "openai_chat", "chat_gpt", "gpt_call"):
        helper = globals().get(alias)
        if callable(helper):
            return helper(messages, model=model, temperature=temperature)

    # 2) Minimal OpenAI Chat fallback
    import os
    api_key = os.getenv("OPENAI_API_KEY")
    if not api_key:
        raise RuntimeError("OPENAI_API_KEY not set and no existing GPT helper found.")
    use_model = model or os.getenv("OPENAI_MODEL", "gpt-4o-mini")

    # Choose the interface by what the installed package offers. Only a failed
    # import selects the legacy path: API errors from the modern client (rate
    # limit, auth, bad request) must reach the caller rather than be replaced
    # by whatever the legacy call raises.
    try:
        from openai import OpenAI
    except ImportError:
        # Older openai package interface
        import openai
        openai.api_key = api_key
        resp = openai.ChatCompletion.create(
            model=use_model,
            messages=messages,
            temperature=temperature
        )
        return resp["choices"][0]["message"]["content"]

    # Newer OpenAI client
    resp = OpenAI(api_key=api_key).chat.completions.create(
        model=use_model,
        messages=messages,
        temperature=temperature
    )
    return resp.choices[0].message.content



In [None]:
# === Cell 3: Load & Standardize (Enhanced Version) ===
# Purpose: Detect vendor format (type01 / type02 / type03) and normalize data to a common schema.
# Inputs:
#   - input_filename: raw procurement CSV
#   - file_type: explicit ('type01'/'type02'/'type03') or 'auto' for autodetection
# Dependencies:
#   - Utilities from Cell 2 (safe_read_csv, parse_qty_unit, to_lbs)
# Outputs:
#   - standardized DataFrame with columns:
#       ['product', 'description', 'weight_lbs', 'qty' (if available)] !!!!!
# Behavior:
#   - Reads raw file using safe_read_csv()
#   - If file_type='auto', uses comprehensive detection strategies
#   - Dispatches to handle_type01/02/03 to standardize column names and compute weights
#   - Returns a DataFrame ready for ingredient extraction (Stage 1).

import pandas as pd
import numpy as np
import logging
import re
from typing import Optional

def detect_vendor_fingerprint(df: pd.DataFrame) -> Optional[str]:
    """Detect a vendor format from characteristic column headers.

    Each format has a list of header signatures; the first format (checked in
    the order type01, type02, type03) with at least two matching signatures is
    returned. Only column names are inspected, never cell values.

    Headers are compared both as written and in a canonical form where every
    run of non-alphanumeric characters is a single space, so "Qty Sold CW" and
    "qty_sold_cw" (as produced by safe_read_csv's column normalisation) both
    match the same signature.

    Args:
        df: Frame whose column labels are examined.

    Returns:
        "type01", "type02" or "type03", or None when no format reaches two
        matching signatures.
    """
    def _canon(text) -> str:
        return re.sub(r'[^a-z0-9]+', ' ', str(text).lower()).strip()

    cols_str = ' '.join(str(c).lower() for c in df.columns)
    canon_names = [_canon(c) for c in df.columns]
    canon_str = ' '.join(canon_names)

    def has(phrase: str) -> bool:
        """True if the phrase occurs in the headers, raw or canonicalised."""
        return phrase in cols_str or _canon(phrase) in canon_str

    # "item #" canonicalises to just "item", which as a substring would also hit
    # "item description"; require a column that is exactly "item" instead.
    has_item_number = 'item #' in cols_str or 'item' in canon_names

    vendor_signatures = {
        "type01": [
            has('quantity (base unit)'),
            has('qty (base unit)'),
            bool(re.search(r'quantity.*base.*unit', cols_str)),
        ],
        "type02": [
            has('net wght shipped'),
            has('total weight') and has_item_number,
            has('qty shipped') and has('total weight'),
            has('item description') and not has('qty sold'),
        ],
        "type03": [
            has('qty sold cw'),
            has('stock unit'),
            has('item description') and has('stock unit'),
        ],
    }

    for vendor_type, hits in vendor_signatures.items():
        matches = sum(hits)
        if matches >= 2:  # Need at least 2 matching signatures
            logging.info(f"Vendor fingerprint detected: {vendor_type} ({matches} signatures matched)")
            return vendor_type

    return None

def statistical_type_detection(df: pd.DataFrame) -> str:
    """Score the frame's content against each format and return the best guess.

    Used when column names are ambiguous. Each of "type01", "type02" and
    "type03" accumulates points from regex matches on the first column (plus
    the second column for type01, and column-level checks for type02). The
    highest score wins, with ties going to the earlier type. An empty frame or
    an all-missing first column yields "type01".
    """
    if len(df) == 0:
        logging.warning("Empty dataframe, defaulting to type01")
        return "type01"

    # First column is treated as the product/description text.
    first_col = df.iloc[:, 0].dropna().astype(str)
    if len(first_col) == 0:
        logging.warning("No data in first column, defaulting to type01")
        return "type01"

    scores = {"type01": 0, "type02": 0, "type03": 0}
    multi_column = len(df.columns) > 1

    # --- type01: cells that are just "<number> <UNIT>", e.g. "80.000 OZ" ---
    qty_unit_re = r'^\d+\.?\d*\s+[A-Z]{2,}$'
    scores["type01"] += first_col.str.match(qty_unit_re, case=False).sum() * 3
    if multi_column:
        second_col = df.iloc[:, 1].dropna().astype(str)
        scores["type01"] += second_col.str.match(qty_unit_re, case=False).sum() * 3

    # --- type02: clean product names next to a weight column and a numeric column ---
    clean_products = first_col.str.match(r'^[A-Za-z][A-Za-z\s\-\'\"&,\.]+$')
    has_weight_col = any(
        'weight' in str(c).lower() or 'wght' in str(c).lower() for c in df.columns
    )

    def _mostly_numeric(col) -> bool:
        # More than 70% of the first ten non-missing values parse as numbers.
        sample = df[col].dropna().head(10)
        parsed = pd.to_numeric(sample, errors='coerce').notna().sum()
        return parsed / max(len(sample), 1) > 0.7

    has_numeric_col = multi_column and any(_mostly_numeric(col) for col in df.columns[1:])

    if has_weight_col and has_numeric_col:
        scores["type02"] += clean_products.sum() * 2
        scores["type02"] += 5  # Bonus for having weight column

    # --- type03: weights embedded in the description ---
    # e.g. "CHEDDAR CLOTHBOUND AGED CABOT 30 LB MC", "BLUE BAYLEY HAZEN BLUE 7 LB JASPER HILL"
    embedded_patterns = (
        r'\d+\s*LB\s+',  # "30 LB MC"
        r'\d+X\d+\s*(?:OZ|LB)',  # "12X12 OZ"
        r'\d+/\d+\s*(?:LB|OZ|GAL)',  # "6/2.5 LB"
        r'\d+\s*(?:LB|OZ|GAL|QT|PT)\s+[A-Z]',  # "5 LB CABOT"
        r'[A-Z]+\s+\d+\s*(?:LB|OZ)\s+[A-Z]',  # "TUB 5 LB CABOT"
    )
    for pattern in embedded_patterns:
        hits = first_col.str.contains(pattern, case=False, regex=True).sum()
        scores["type03"] += hits * 2

    # Case/carton notations common in type03.
    scores["type03"] += first_col.str.contains(
        r'\d+/\d+|\d+X\d+|\d+\s*(?:CS|CTN|CASE|EACH|EA)', case=False
    ).sum()

    # Bonus when more than 30% of names mention a unit word.
    unit_mentions = first_col.str.contains(r'\b(?:LB|OZ|GAL|EACH|EA|CTN|CS)\b', case=False).sum()
    if unit_mentions > len(first_col) * 0.3:
        scores["type03"] += 3

    best_type = max(scores, key=scores.get)
    max_score = scores[best_type]
    confidence = max_score / max(len(first_col), 1)

    # Log all scores for debugging
    logging.debug(f"Type detection scores: {scores}")

    if confidence < 0.2 and max_score < 3:
        logging.warning(f"Low confidence detection: {best_type} with score {max_score} (confidence: {confidence:.2f})")
    else:
        logging.info(f"Statistical detection: {best_type} with score {max_score} (confidence: {confidence:.2f})")

    return best_type

def validate_type_detection(df: pd.DataFrame, file_type: str) -> bool:
    """Check a detected type by running its handler on the first rows.

    A copy of up to ten leading rows is passed to the handler for
    ``file_type``. The type is accepted when the result has a 'weight_lbs'
    column and the share of non-missing weights reaches 0.3 (0.2 for
    "type03"). Unknown types and any exception raised along the way count as
    a failed validation.
    """
    try:
        if file_type not in ("type01", "type02", "type03"):
            return False

        # Work on a small copy so the caller's frame is not modified.
        sample_df = df.head(min(10, len(df))).copy()
        handlers = {
            "type01": handle_type01,
            "type02": handle_type02,
            "type03": handle_type03,
        }
        result = handlers[file_type](sample_df)

        if 'weight_lbs' not in result.columns:
            return False

        valid_ratio = result['weight_lbs'].notna().sum() / max(len(result), 1)

        # Type03 might have lower valid ratio due to complex parsing
        threshold = 0.2 if file_type == "type03" else 0.3
        return valid_ratio >= threshold

    except Exception as e:
        logging.debug(f"Validation failed for {file_type}: {str(e)}")
        return False

def autodetect_type_improved(df: pd.DataFrame) -> str:
    """Detect the file type from column names backed by sampled cell values.

    Checks run in order and the first one that succeeds decides:
      1. type01 - a column whose name matches a quantity/base-unit pattern and
         whose sampled values look like "<number> <unit>".
      2. type02 - a weight column with mostly numeric values, together with a
         quantity column or no unit column.
      3. type03 - weight/pack notations embedded in the product text.
    Otherwise the decision is delegated to statistical_type_detection.

    Raises:
        ValueError: if the frame has no columns.
    """
    if len(df.columns) == 0:
        logging.error("Auto-detect: Input file has no columns")
        raise ValueError("Auto-detect failed: CSV has no columns")

    # Lower-cased, trimmed labels and a lookup back to the original labels.
    cols_lower = [str(c).lower().strip() for c in df.columns]
    cols_map = dict(zip(cols_lower, df.columns))

    # === Type01 Detection ===
    # Values such as "80.000 OZ", "4.400 LB", "84.000 lb"
    qty_unit_pattern = r'^\d+\.?\d*\s+[A-Za-z]{1,10}$'
    for pattern in (r'quantity.*base.*unit', r'qty.*base', r'product.*weight'):
        named = next((col for col in cols_lower if re.search(pattern, col)), None)
        if named is None:
            continue
        target_col = cols_map[named]
        if not (target_col and target_col in df.columns):
            continue
        sample = df[target_col].dropna().head(20).astype(str)
        if len(sample) == 0:
            continue
        matches = sample.str.match(qty_unit_pattern, case=False)
        if matches.sum() > len(sample) * 0.3:
            logging.info("Auto-detect: Type01 - found quantity+unit combined format in column data")
            return "type01"

    # === Type02 Detection ===
    weight_cols = [cols_map[c] for c in cols_lower if 'weight' in c or 'wght' in c]
    has_qty_col = any('qty' in col or 'quantity' in col for col in cols_lower)
    has_unit_col = any('unit' in col or 'uom' in col for col in cols_lower)

    # Type02 typically has a numeric weight column and a separate quantity.
    if weight_cols and weight_cols[0] in df.columns:
        sample_weights = df[weight_cols[0]].dropna().head(20)
        if len(sample_weights) > 0:
            numeric_count = pd.to_numeric(sample_weights, errors='coerce').notna().sum()
            numeric_ratio = numeric_count / len(sample_weights)
            # A qty column, or the absence of a unit column, points to type02.
            if numeric_ratio > 0.7 and (has_qty_col or not has_unit_col):
                logging.info("Auto-detect: Type02 - separate numeric weight column detected")
                return "type02"

    # === Type03 Detection ===
    # Product text column: the first column naming 'product', else
    # 'description', else 'item'; failing all of those, the first column.
    candidates = [
        cols_map[col]
        for priority_term in ('product', 'description', 'item')
        for col in cols_lower
        if priority_term in col
    ]
    product_col = candidates[0] if candidates else df.columns[0]

    if product_col in df.columns:
        sample_products = df[product_col].dropna().head(30).astype(str)

        if len(sample_products) > 0:
            # Embedded weight notations characteristic of type03
            embedded_patterns = (
                r'\d+\s*LB\b',  # "30 LB"
                r'\d+X\d+\s*(?:OZ|LB)',  # "12X12 OZ"
                r'\d+/\d+\s*(?:LB|OZ|GAL)',  # "6/2.5 LB"
                r'\b\d+\s*(?:LB|OZ|GAL|QT|PT|EA|EACH|CTN|CS)\b',  # Various units embedded
            )
            # Hits are summed over all patterns, so one row can count more than once.
            pattern_matches = sum(
                sample_products.str.contains(pattern, case=False, regex=True).sum()
                for pattern in embedded_patterns
            )
            match_ratio = pattern_matches / len(sample_products)
            if match_ratio > 0.4:
                logging.info(f"Auto-detect: Type03 - weight embedded in product description (match ratio: {match_ratio:.2f})")
                return "type03"

    # No clear match from column analysis: fall back to statistical detection.
    return statistical_type_detection(df)

def autodetect_type(df: pd.DataFrame) -> str:
    """Determine the file type using several strategies with validation.

    Order of attempts:
      1. An explicit "type01"/"type02"/"type03" marker anywhere in the first
         column (case-insensitive) is returned immediately.
      2. A vendor fingerprint, if it passes validation.
      3. The result of autodetect_type_improved, if it passes validation.
      4. The first other type that passes validation.
      5. Whatever statistical_type_detection returns.
    """
    known_types = ("type01", "type02", "type03")

    # Strategy 1: explicit type marker in the first column
    if len(df) > 0 and len(df.columns) > 0:
        first_col = df.iloc[:, 0].astype(str).str.lower()
        for type_name in known_types:
            if first_col.str.contains(type_name, regex=False).any():
                logging.info(f"Explicit type marker found: {type_name}")
                return type_name

    # Strategy 2: vendor fingerprinting, accepted only if it validates
    vendor_type = detect_vendor_fingerprint(df)
    if vendor_type:
        if validate_type_detection(df, vendor_type):
            return vendor_type
        logging.warning(f"Vendor fingerprint {vendor_type} failed validation, continuing detection")

    # Strategy 3: pattern matching, then Strategy 4: validation by sample parsing
    detected_type = autodetect_type_improved(df)
    if validate_type_detection(df, detected_type):
        logging.info(f"Validated type detection: {detected_type}")
        return detected_type

    # Validation failed: try the remaining types in order
    logging.warning(f"Primary detection {detected_type} failed validation, trying alternatives")
    for alt_type in known_types:
        if alt_type == detected_type:
            continue
        if validate_type_detection(df, alt_type):
            logging.info(f"Alternative type {alt_type} passed validation")
            return alt_type

    # Nothing validated: use statistical detection as the final answer
    final_type = statistical_type_detection(df)
    logging.info(f"Using statistical detection fallback: {final_type}")
    return final_type

def handle_type01(df: pd.DataFrame) -> pd.DataFrame:
    """Handle type01 format: quantity and unit combined in one column.

    The quantity column is chosen in this order: a column named
    "product_weight" / "product weight" (case-insensitive), the first column
    whose name contains both "quantity" and "base", or the first non-leading
    column where more than 30% of the sampled values look like
    "<number> <unit>". If none is found, quantity and unit are parsed from the
    product text instead.

    NOTE: ``df`` is modified in place (columns such as 'product', 'qty', 'uom',
    'weight_lbs' and 'description' are added); pass a copy to keep the
    original untouched.

    Args:
        df: Raw frame. When it has no 'product' column, the first column is
            used as the product text.

    Returns:
        A frame with columns ['product', 'description', 'weight_lbs', 'qty'].
    """
    # str() keeps this working for non-string column labels.
    cols = {str(c).lower(): c for c in df.columns}

    # Ensure we have a product column
    if "product" not in df.columns:
        df["product"] = df.iloc[:, 0].astype(str)

    # Look for the quantity/weight column
    pw = cols.get("product_weight") or cols.get("product weight")
    if pw is None:
        pw = next(
            (c for c in df.columns
             if "quantity" in str(c).lower() and "base" in str(c).lower()),
            None,
        )
    if pw is None:
        # Fallback: look for a column with quantity+unit patterns
        for col in df.columns[1:]:  # Skip first column (likely product name)
            sample = df[col].dropna().head(10).astype(str)
            if sample.str.match(r'^\d+\.?\d*\s+[A-Za-z]+$', case=False).sum() > len(sample) * 0.3:
                pw = col
                break

    if pw is not None and pw in df.columns:
        # Split "<qty> <unit>" on the first run of whitespace
        parts = df[pw].astype(str).str.strip().str.split(r'\s+', n=1, expand=True)
        if parts.shape[1] == 2:
            df["qty"] = pd.to_numeric(parts[0].str.replace(",", "", regex=False), errors="coerce")
            df["uom"] = parts[1]
            df["weight_lbs"] = [to_lbs(q, u) if pd.notna(q) and pd.notna(u) else np.nan
                               for q, u in zip(df["qty"], df["uom"])]
        elif parts.shape[1] == 1:
            # No unit present anywhere: keep the numbers, weights stay unknown
            df["qty"] = pd.to_numeric(parts[0], errors="coerce")
            df["weight_lbs"] = np.nan
        else:
            # Splitting produced no columns (e.g. an empty frame)
            df["qty"] = np.nan
            df["weight_lbs"] = np.nan
    else:
        # Extract from product column as fallback. Built without zip(*...) so
        # a frame with no rows does not fail on tuple unpacking.
        parsed = df["product"].map(parse_qty_unit)
        df["qty"] = [pair[0] for pair in parsed]
        df["uom"] = [pair[1] for pair in parsed]
        df["weight_lbs"] = [to_lbs(q, u) if q and u else np.nan
                           for q, u in zip(df["qty"], df["uom"])]

    df["description"] = df["product"]
    base = ["product", "description", "weight_lbs"]
    if "qty" in df.columns:
        base.append("qty")
    return df[base]

def handle_type02(df: pd.DataFrame) -> pd.DataFrame:
    """Handle type02 format: separate weight column (pounds unless a unit is given).

    Column names are matched in a canonical form (lower-case, every run of
    non-alphanumeric characters treated as one space), so "Total Weight",
    "total_weight" and "total-weight" are all recognised. This keeps the
    lookups working both on raw vendor headers and on the snake_case headers
    produced by safe_read_csv.

    Weight is resolved in this order:
      1. weight column x unit factor (or taken as pounds when there is no
         usable unit column);
      2. quantity and unit columns converted with to_lbs;
      3. quantity/unit parsed from the product text.

    NOTE: ``df`` is modified in place ('product', 'qty', 'weight_lbs' and
    'description' columns are added); pass a copy to keep the original
    untouched.

    Returns:
        A frame with columns ['product', 'description', 'weight_lbs'] plus
        'qty' when a quantity column was found.
    """
    def _canon(label) -> str:
        return re.sub(r"[^a-z0-9]+", " ", str(label).lower()).strip()

    cols = {_canon(c): c for c in df.columns}

    def _first(*names):
        """Return the first existing column among the canonical names, else None."""
        for name in names:
            col = cols.get(name)
            if col:
                return col
        return None

    # Find product column (falls back to the first column)
    prod_col = _first("product", "item description", "description") or df.columns[0]
    df["product"] = df[prod_col].astype(str)

    # Find weight, unit and quantity columns
    prod_wt_col = _first("product weight", "total weight", "weight", "net wght shipped")
    unit_col = _first("unit", "uom", "unit of measure", "unit of measurement")
    qty_col = _first("qty", "quantity", "qty shipped")

    if qty_col:
        df["qty"] = pd.to_numeric(df[qty_col], errors="coerce")

    # Calculate weight in pounds
    if prod_wt_col:
        wt_numeric = pd.to_numeric(df[prod_wt_col], errors="coerce")

        if unit_col and df[unit_col].notna().any():
            # Units present: scale by the per-unit factor. Labels are stripped
            # as well as upper-cased, matching what to_lbs does, so " lb" works.
            # Pound units have factor 1.0, so those weights pass through as-is.
            unit_series = df[unit_col].astype(str).str.strip().str.upper()
            factors = unit_series.map(lambda u: UOM_TO_LBS.get(u, np.nan))
            weight_lbs = wt_numeric * factors
        else:
            # Assume weights are in pounds (common for type02)
            weight_lbs = wt_numeric
    elif qty_col and unit_col:
        # Calculate from qty and unit
        qty = pd.to_numeric(df[qty_col], errors="coerce")
        uom = df[unit_col].astype(str)
        weight_lbs = pd.Series(
            [to_lbs(q, u) if (pd.notna(q) and pd.notna(u)) else np.nan
             for q, u in zip(qty, uom)],
            index=df.index, dtype="float64")
    else:
        # Last resort: parse the product description. Iterating the mapped
        # pairs directly (instead of zip(*...)) also works for an empty frame.
        parsed = df["product"].map(parse_qty_unit)
        weight_lbs = pd.Series(
            [to_lbs(qi, ui) if qi and ui else np.nan for qi, ui in parsed],
            index=df.index, dtype="float64")

    df["weight_lbs"] = weight_lbs
    df["description"] = df["product"]

    base = ["product", "description", "weight_lbs"]
    if "qty" in df.columns:
        base.append("qty")
    return df[base]

def handle_type03(df: pd.DataFrame) -> pd.DataFrame:
    """Standardize a type03 sheet, where weight info is embedded in the product text.

    The frame passed in is modified in place (columns ``product``, ``qty``,
    ``weight_lbs``, ``description`` and, when a quantity column exists,
    ``qty_sold`` are added or overwritten).

    Args:
        df: Raw vendor data. Column lookup is by exact lower-cased header name.

    Returns:
        A selection of ``df`` with columns ``product``, ``description``,
        ``weight_lbs`` and ``qty``.

    NOTE(review): ``parse_qty_unit`` and ``to_lbs`` are defined outside this
    block; the unpacking below assumes ``parse_qty_unit`` returns a
    ``(quantity, unit)`` pair for every row — confirm.
    """
    # Map lower-cased header -> original header for case-insensitive lookups.
    cols = {c.lower(): c for c in df.columns}

    # Find product column (falls back to the first column of the sheet)
    prod_col = (cols.get("product") or cols.get("item description") or
                cols.get("description") or df.columns[0])
    df["product"] = df[prod_col].astype(str)

    # Look for a quantity sold column (common in Baldor format)
    qty_col = cols.get("qty sold cw") or cols.get("qty") or cols.get("quantity")
    if qty_col and qty_col in df.columns:
        df["qty_sold"] = pd.to_numeric(df[qty_col], errors="coerce")

    # Extract weight from product description.
    # zip(*...) transposes the per-row pairs into two tuples; with an empty
    # frame there is nothing to unpack and this line raises ValueError.
    q, u = zip(*df["product"].map(parse_qty_unit))
    # This overwrites any pre-existing "qty" column (its values were already
    # captured in "qty_sold" above when it was chosen as qty_col).
    df["qty"] = pd.to_numeric(pd.Series(q, index=df.index), errors="coerce")

    # Calculate weight in pounds.
    # The truthiness test means a parsed quantity of 0 (or an empty unit) is
    # treated the same as "nothing parsed" and yields NaN.
    maybe = [to_lbs(qi, ui) if qi and ui else np.nan for qi, ui in zip(q, u)]
    df["weight_lbs"] = pd.Series(maybe, index=df.index)

    # If we have qty_sold and a unit column, we might be able to calculate total weight
    if "qty_sold" in df.columns:
        unit_col = cols.get("stock unit") or cols.get("unit")
        if unit_col and unit_col in df.columns:
            # For items sold by weight units (LB, OZ, etc.), qty_sold is treated as an
            # amount expressed in that unit and converted to pounds.
            unit_series = df[unit_col].astype(str).str.upper()

            # Create a mask for weight-based units
            weight_units = ["LB", "LBS", "OZ", "KG", "G", "POUND", "POUNDS", "OUNCE", "OUNCES"]
            is_weight_unit = unit_series.isin(weight_units)

            # For weight units, calculate total weight. A successful conversion
            # overrides the weight parsed from the description; rows whose
            # qty_sold or converted weight is NaN keep the parsed value.
            for idx in df[is_weight_unit].index:
                unit = unit_series[idx]
                qty_sold = df.loc[idx, "qty_sold"]
                if pd.notna(qty_sold):
                    total_weight = to_lbs(qty_sold, unit)
                    if pd.notna(total_weight):
                        df.loc[idx, "weight_lbs"] = total_weight

        # Use qty_sold as qty if we don't have qty from parsing
        # (only when parsing produced no quantity for ANY row).
        if df["qty"].isna().all() and df["qty_sold"].notna().any():
            df["qty"] = df["qty_sold"]

    df["description"] = df["product"]
    base = ["product", "description", "weight_lbs", "qty"]
    return df[base]

def load_and_standardize(path: str, file_type: str = "auto") -> pd.DataFrame:
    """
    Main function to load and standardize procurement data

    Args:
        path: Path to the CSV file
        file_type: One of 'auto', 'type01', 'type02', 'type03'

    Returns:
        Standardized DataFrame with columns: product, description, weight_lbs, qty (optional)

    Raises:
        ValueError: If ``file_type`` is not one of the accepted values, or if
            auto-detection yields a type this function has no handler for.
    """
    # Load the raw data
    raw = safe_read_csv(path)

    # Determine the file type
    ft = file_type
    if ft == "auto":
        ft = autodetect_type(raw)
        logging.info(f"Auto-detected file_type: {ft}")
    elif ft not in ["type01", "type02", "type03"]:
        raise ValueError("file_type must be one of {'auto','type01','type02','type03'}")

    # Process according to detected/specified type
    if ft == "type01":
        std = handle_type01(raw)
    elif ft == "type02":
        std = handle_type02(raw)
    elif ft == "type03":
        std = handle_type03(raw)
    else:
        # Only reachable when autodetect_type returns an unexpected label.
        raise ValueError(f"Unexpected file_type: {ft}")

    # Log summary statistics
    total_rows = len(std)
    valid_weights = int(std["weight_lbs"].notna().sum())

    # An empty result has no meaningful extraction rate; computing one would
    # divide by zero (NaN percentage plus a NumPy RuntimeWarning).
    if total_rows == 0:
        logging.warning("Standardization produced 0 rows. Check the input file.")
        return std

    valid_ratio = valid_weights / total_rows
    logging.info(f"Standardization complete: {valid_weights}/{total_rows} rows with valid weights ({valid_ratio*100:.1f}%)")

    if valid_ratio < 0.3:
        logging.warning("Low weight extraction rate. Consider checking the file format or type detection.")

    return std

In [None]:
# === Cell 4 — AI Ingredient Extraction ===
# Purpose: Define GPT-based function(s) that infer ingredient lists and estimated proportions for each product.
# Includes: Guard lists (protected items, vegan/meatless handling, fish terms) and exact category helpers.
# Inputs:
#   - product text string(s)
#   - optional explicit_ingredients (for compatibility with non-AI modes)
# Dependencies:
#   - Utilities from Cell 2 (_norm, safe_json, etc.)
#   - Config flags from Cell 1 (PROTECTED_ITEMS, VEGAN_GUARD)
# Outputs:
#   - List[Dict[str, float]] per product, each containing {"ingredient": <name>, "percent": <float>}
#   - Non-decomposition rules (DO_NOT_DECOMPOSE) and exact category resolvers used by Stages 1 & 2.
# Behavior:
#   - Calls GPT via ai_extract_ingredients() with structured prompts.
#   - Applies post-processing:
#       • normalize capitalization and spacing
#       • protect compound food names (e.g., “ice cream”, “almond milk”)
#       • enforce vegan/meatless exclusion filters.

import re
from typing import List, Dict, Optional
from pathlib import Path
import pandas as pd

def _norm(x: str) -> str:
    """Normalize ``x`` by delegating to the notebook's ``norm_text`` helper.

    Every lookup key in this cell goes through this wrapper so that keys are
    normalized the same way as the rest of the notebook.
    NOTE(review): ``norm_text`` is defined outside this cell; its exact rules
    are not visible here.
    """
    return norm_text(x)

def _canon_key(s: str) -> str:
    """
    Canonical key for STRICT equality:
      - lowercase
      - replace any non-alphanumeric with a single space ((),/,&,commas, hyphens, etc.)
      - collapse whitespace, strip
    NOTE: This is still equality-based (no substring matching).
    """
    s = str(s or "").lower()
    s = re.sub(r"[^0-9a-z]+", " ", s)     # punctuation/paren → space
    s = re.sub(r"\s+", " ", s).strip()
    return s

# ------------------ Load factors categories (source of truth) ------------------
# Prefer the notebook's safer loader; fall back to plain pandas if it fails
# for any reason (including not being defined).
try:
    _fac_df = safe_read_csv(factors_filename)  # prefer your safer loader
except Exception:
    _fac_df = pd.read_csv(factors_filename)

# Category labels come from the "category" column when present, otherwise
# from the first column of the file.
_fac_raw = _fac_df["category"] if "category" in _fac_df.columns else _fac_df.iloc[:, 0]
_fac_series = _fac_raw.astype(str)

# Drop missing cells BEFORE converting to text: astype(str) turns NaN into the
# literal string "nan", which would otherwise become a bogus allowed category.
ALLOWED_CATEGORIES: List[str] = [
    label.strip()
    for label in _fac_raw.dropna().astype(str).tolist()
    if label.strip()
]

# Canonical maps keyed by strict canonical equality (no substrings)
CANON_CAT_BY_CANON: Dict[str, str] = {}
EXACT_ALLOWED_CATEGORIES: set[str] = set()
for cat in ALLOWED_CATEGORIES:
    k = _canon_key(cat)
    # Skip labels that canonicalize to nothing and the "unknown" placeholder.
    if not k or k == "unknown":
        continue
    EXACT_ALLOWED_CATEGORIES.add(k)
    # Preserve original label as the canonical output; the first label seen
    # for a given canonical key wins.
    CANON_CAT_BY_CANON.setdefault(k, cat)

# ------------------ Category map (from your foodcategories.json) ------------------
def load_category_map(json_path: Path) -> Dict[str, str]:
    """Return normalized-name -> category map loaded earlier (FOODCATS).

    ``json_path`` is accepted but never read: the function returns the
    module-level ``FOODCATS`` object itself (no copy is made).
    NOTE(review): ``FOODCATS`` is defined outside this cell — presumably
    loaded from foodcategories.json earlier; confirm.
    """
    return FOODCATS

# The path argument is passed for signature compatibility only; it is ignored.
CATEGORY_MAP = load_category_map(categories_filename)

# ------------------ Protections used by later cells ------------------
# Names (normalized with _norm) that are kept as a single item.
DO_NOT_DECOMPOSE = {
    _norm(protected)
    for protected in (
        # Core dairy intact items:
        "milk", "dairy milk", "skim milk", "concentrated milk", "buttermilk",
        "cheese", "yogurt", "low fat yogurt", "cream", "sour cream", "butter", "ghee",
        "ice cream", "whey powder", "milk powder",
        # Plant milks intact as single items:
        "almond milk", "oat milk", "soy milk", "rice milk", "coconut milk", "pea milk",
    )
}

# Minimal fish list so downstream checks won’t fail; include milkfish explicitly.
FISH_SPECIES = {
    _norm(species)
    for species in (
        "salmon", "trout", "tilapia", "catfish", "bass", "cod", "pollock", "haddock", "mahi",
        "tuna", "sardine", "anchovy", "halibut", "carp", "redfish", "snapper", "milkfish",
    )
}

# Normalized names of animal-derived ingredients (dairy, egg, honey, gelatin, meat, fish).
ANIMAL_INGREDIENTS = {
    _norm(animal_item)
    for animal_item in (
        "milk", "dairy milk", "cheese", "butter", "cream", "yogurt", "whey", "casein", "ghee", "buttermilk", "lactose",
        "egg", "egg whites", "egg yolk", "honey", "gelatin", "dairy cheese",
        "beef", "pork", "chicken", "turkey", "lamb", "bacon", "sausage",
        "fish", "salmon", "tuna", "shrimp", "tilapia", "trout", "catfish", "bass", "redfish", "milkfish",
    )
}

# ------------------ Prioritized list = EXACT categories only (for prompts, if used) ------------------
# Original factors labels for every exact canonical key, de-duplicated and sorted.
PRIORITIZED_INGREDIENTS: List[str] = sorted(
    set(CANON_CAT_BY_CANON[canon] for canon in EXACT_ALLOWED_CATEGORIES)
)

# ------------------ Exact-only resolver (canonical equality, no substrings) ------------------
def resolve_exact_category(product_text: str) -> Optional[str]:
    """
    Resolve a product name to a factors category by STRICT canonical equality.

    Lookup order:
      1) canon(product) is itself a canonical factors category
         -> return that category's original label.
      2) norm(product) is a key of CATEGORY_MAP and the mapped value
         canonicalizes to a factors category -> return that category's label.
      3) Otherwise -> None.

    Notes:
      - Matching is whole-key equality only: no token splitting and no
        partial/substring/phrase matching.
      - 'milkfish' never equals 'milk'.
      - 'barley (beer)' and 'barley beer' share one canonical key, so they match.
      - Multi-word categories such as 'low fat yogurt' stay distinct from 'yogurt'.
    """
    canon_name = _canon_key(product_text)
    if not canon_name:
        return None

    # (1) The product text is itself a factors category.
    if canon_name in CANON_CAT_BY_CANON:
        return CANON_CAT_BY_CANON[canon_name]

    # (2) The product text is a known key in CATEGORY_MAP; the mapped value is
    #     accepted only when it canonicalizes to a real factors category.
    lookup_key = _norm(product_text)
    if lookup_key not in CATEGORY_MAP:
        return None

    mapped_canon = _canon_key(CATEGORY_MAP[lookup_key])
    if mapped_canon not in CANON_CAT_BY_CANON:
        return None
    return CANON_CAT_BY_CANON[mapped_canon]

# One-line INFO summary of the sizes of the lookup structures built in this cell.
logging.info(
    "[Cell 4] Exact-only canonical mode: %d categories; PRIORITIZED_INGREDIENTS=%d; DO_NOT_DECOMPOSE=%d",
    len(EXACT_ALLOWED_CATEGORIES), len(PRIORITIZED_INGREDIENTS), len(DO_NOT_DECOMPOSE)
)

# --- Adapter for legacy calls expecting `categorize_name_from_map` ---
def categorize_name_from_map(name: str) -> Optional[str]:
    """Legacy-named adapter: return ``resolve_exact_category(name)`` unchanged."""
    # Use the exact-only canonical resolver
    return resolve_exact_category(name)


In [None]:
# === Cell 5 — Stage 1 (type03) Runner ===
# Purpose: Execute AI ingredient extraction for type03 vendor files.
# Inputs:
#   - standardized DataFrame from Cell 3 (type03 schema)
#   - external item_weights.json (optional overrides)
#   - ai_extract_ingredients() from Cell 4
# Outputs:
#   - ingredients_{BASE}.csv (one row per product–ingredient pair, with estimated weights)
# Behavior:
#   - Merges item_weights overrides if present.
#   - Applies GPT extraction per product description.
#   - Normalizes percentages to sum ≈ 100 %.
#   - Logs progress and total ingredient count.
# Overwrites existing ingredients_{BASE}.csv if START_MODE='full'; skips otherwise.

import os, json, re, concurrent.futures as cf
from pathlib import Path
from typing import Any, Dict, List, Optional
import numpy as np
import pandas as pd

# === Guard flag ===
# True only when the configured ``file_type`` is literally "type03"
# (compared case-insensitively).
# NOTE(review): with file_type = "auto" this stays False even when the sheet
# would be auto-detected as type03 — confirm that is intended.
_run_cell5 = False
try:
    _run_cell5 = str(file_type).lower() == "type03"
except NameError:
    # file_type has not been defined in this session: keep the cell disabled.
    _run_cell5 = False

# --- Logger ---
# Keep a log_time that already exists in the session; otherwise install a
# small print-based stand-in.
try:
    log_time
except NameError:
    import time
    from contextlib import contextmanager

    @contextmanager
    def log_time(label: str):
        """Print a start line, run the ``with`` body, then print the elapsed seconds.

        The closing line is printed even when the body raises.
        """
        started_at = time.perf_counter()
        print(f"[timing] ▶ {label} ...")
        try:
            yield
        finally:
            elapsed = time.perf_counter() - started_at
            print(f"[timing] ✓ {label} in {elapsed:,.3f}s")

# Master switch for the weight-calculation trace output below.
DEBUG_WEIGHT_CALC = True

def debug_print(msg: str, row_sheet: Optional[int] = None):
    """Print ``msg`` when DEBUG_WEIGHT_CALC is on, prefixed with the row number if given.

    Args:
        msg: Text to print.
        row_sheet: Optional row number; any value other than None (including 0)
            adds a ``[Row N]`` prefix.
    """
    if not DEBUG_WEIGHT_CALC:
        return
    print(msg if row_sheet is None else f"[Row {row_sheet}] {msg}")

# ============================================================================
# WEIGHT EXTRACTION WITH ALL FIXES (v110b)
# ============================================================================

# Regex pattern -> per-item weight in pounds, consulted by estimate_ct_weight().
ITEM_WEIGHTS = {}

def _item_weight_pattern(item_name) -> str:
    """Build the word-bounded, optionally-plural regex for one item name.

    The pattern is case-insensitive so that capitalised names in the JSON file
    still match the lower-cased product text. A name that does not compile as
    a regular expression is treated as literal text instead of being stored
    as a pattern that would raise every time it is used.
    """
    name = str(item_name)
    pattern = rf'(?i)\b{name}e?s?\b'
    try:
        re.compile(pattern)
    except re.error:
        pattern = rf'(?i)\b{re.escape(name)}e?s?\b'
    return pattern

def _set_item_weights(new_weights: dict) -> None:
    """Replace the contents of ITEM_WEIGHTS in place (existing references stay valid)."""
    ITEM_WEIGHTS.clear()
    ITEM_WEIGHTS.update(new_weights)

def load_item_weights():
    """Populate ITEM_WEIGHTS from ``item_weights.json`` in the working directory.

    The file is expected to hold a JSON object mapping item name -> weight in
    pounds. Behaviour by case:
      - file missing: a small built-in default table is used;
      - file unreadable / not a JSON object: a single 'apple' default is used;
      - entry with a non-numeric weight: that entry is skipped with a warning.

    Returns:
        None. The result is stored in the module-level ITEM_WEIGHTS dict.
    """
    weights_file = Path("item_weights.json")
    if not weights_file.exists():
        print(f"[Warning] {weights_file} not found. Using minimal defaults.")
        _set_item_weights({
            r'\bwatermelons?\b': 15.0,
            r'\bcantaloupes?\b': 3.0,
            r'\bhoneydewe?s?\b': 5.0,
            r'\bpineapples?\b': 4.0,
            r'\bapples?\b': 0.33,
        })
        return
    try:
        with open(weights_file, 'r', encoding='utf-8') as f:
            data = json.load(f)
        loaded = {}
        for item_name, weight in data.items():
            # Store weights as floats so `count * weight` cannot fail later
            # on a value such as the string "0.33".
            try:
                weight_lbs = float(weight)
            except (TypeError, ValueError):
                print(f"[Warning] Skipping {item_name!r}: weight {weight!r} is not a number.")
                continue
            loaded[_item_weight_pattern(item_name)] = weight_lbs
    except Exception as e:
        # Best effort: any read/parse problem falls back to a minimal table.
        print(f"[Error] Failed to load {weights_file}: {e}")
        _set_item_weights({r'\bapples?\b': 0.33})
        return
    _set_item_weights(loaded)
    print(f"[Config] Loaded {len(ITEM_WEIGHTS)} item weights from {weights_file}")

load_item_weights()

# Multipliers that convert one unit of the keyed weight unit into pounds.
WEIGHT_UNITS = {"LB": 1.0, "LBS": 1.0, "OZ": 1.0/16.0, "KG": 2.20462262, "G": 0.00220462262}
# Pounds per can, keyed by the number that follows '#' in "<n>X#<size>" pack codes.
CAN_WEIGHTS = {10: 6.5, 5: 3.5, 2: 1.5}
# Pounds per bushel, keyed by a lower-case product keyword.
BUSHEL_WEIGHTS = {
    'lima bean': 30, 'bean': 30, 'beet': 50, 'cabbage': 50, 'carrot': 50,
    'cucumber': 59, 'eggplant': 34, 'okra': 30, 'onion': 52, 'parsnip': 50,
    'potato': 60, 'rutabaga': 53, 'spinach': 23, 'greens': 23, 'squash': 40,
    'sweet potato': 52, 'tomato': 55, 'turnip': 55, 'apple': 48,
    'blueberry': 42, 'cantaloupe': 50, 'peach': 50, 'pear': 50, 'plum': 50,
    'pepper': 28, 'corn': 56,
}

def get_bushel_weight(product_text: str) -> float:
    """Return the pounds-per-bushel figure for the product named in ``product_text``.

    Every BUSHEL_WEIGHTS keyword contained in the lower-cased text is a
    candidate and the longest one wins, so a specific keyword such as
    'sweet potato' is preferred over the shorter 'potato' it contains.
    Keywords of equal length are resolved in table order.

    Args:
        product_text: Product description (any case).

    Returns:
        The matching table weight, or 35 when no keyword is found.
    """
    text_lower = product_text.lower()
    candidates = [name for name in BUSHEL_WEIGHTS if name in text_lower]
    if not candidates:
        return 35
    # max() keeps the first of several equally long keywords (table order).
    return BUSHEL_WEIGHTS[max(candidates, key=len)]

def get_liquid_density_per_liter(product_text: str) -> float:
    """Return the per-liter weight factor for the liquid named in ``product_text``.

    The lower-cased text is checked against the keyword rules below in order;
    the first rule that matches decides the value. Text matching no rule gets
    the same value as the water/tonic rule.
    """
    lowered = product_text.lower()
    keyword_rules = (
        (r'\b(water|tonic)\b', 2.20462),
        (r'\b(juice|cider)\b', 2.25),
        (r'\b(oil)\b', 2.03),
        (r'\b(vinegar|wine)\b', 2.20),
        (r'\b(honey|syrup)\b', 3.31),
    )
    for keyword_pattern, per_liter in keyword_rules:
        if re.search(keyword_pattern, lowered):
            return per_liter
    return 2.20462

def get_liquid_density(product_text: str) -> float:
    """Return the weight factor used by the GAL/QT/PT rules for the named liquid.

    The lower-cased text is checked against the keyword rules below in order;
    the first rule that matches decides the value, and 8.34 is returned when
    none does.
    """
    lowered = product_text.lower()
    keyword_rules = (
        (r'\b(cider|juice|lemonade)\b', 8.6),
        (r'\b(olive oil|oil)\b', 7.5),
        (r'\b(vinegar|wine)\b', 8.4),
        (r'\b(honey|syrup)\b', 12.0),
    )
    for keyword_pattern, factor in keyword_rules:
        if re.search(keyword_pattern, lowered):
            return factor
    return 8.34

def _parse_fraction(num_str):
    if '/' in num_str:
        parts = num_str.split('/')
        if len(parts) == 2:
            try:
                return float(parts[0]) / float(parts[1])
            except:
                return float(num_str.replace('/', ''))
    return float(num_str)

def extract_weight_from_text(product_text: str, row_sheet: Optional[int] = None) -> tuple[Optional[float], str]:
    """Parse a weight in pounds out of a free-text product description.

    Rule groups are tried in a fixed priority order and the first hit wins:
      1. bushel quantities (mixed number, half, simple fraction, whole or
         decimal; unit before or after the number), scaled by
         get_bushel_weight();
      2. case markers ("1/2 CS", "CS") using typical produce case weights;
      3. weight ranges such as "10-12 lb" (the midpoint is used);
      4. "<n> CT <w> LB/KG/OZ" (the stated weight is used, the count is not);
      5. the general pack/size rules (DOZ, NxM OZ, L, ML, PT, QT, GAL, LB,
         KG, G, OZ, BUNCH, CT, PC).

    Args:
        product_text: Raw product description.
        row_sheet: Optional spreadsheet row number, used only in debug output.

    Returns:
        ``(weight_lbs, source)`` where ``source`` is "weight_parsed",
        "weight_range" or "ct_total_weight"; ``(None, "unknown")`` when the
        text is empty or no rule produced a weight.
    """
    if not product_text:
        return None, "unknown"

    product_upper = product_text.upper()
    bushel_wt = get_bushel_weight(product_text)

    # ---- BUSHEL PATTERNS (support both "1-1/9 BU" and "BU 1-1/9" formats) ----
    # Each rule: (regex, match -> number of bushels, label for the debug line).
    # Order matters: mixed numbers must be tried before plain fractions, and
    # fractions before whole numbers, or the shorter form would match first.
    def _mixed(m):
        return int(m.group(1)) + int(m.group(2)) / int(m.group(3))

    def _half(m):
        return 0.5

    def _fraction(m):
        return int(m.group(1)) / int(m.group(2))

    def _whole(m):
        return float(m.group(1))

    bushel_rules = [
        (r'(?:BU|BSH|BUSH)\b\s*(\d+)-(\d+)/(\d+)', _mixed, "unit-first mixed"),
        (r'(\d+)-(\d+)/(\d+)\s*(?:BU|BSH|BUSH)\b', _mixed, "mixed"),
        (r'(\d+)\s+(\d+)/(\d+)\s*(?:BU|BSH|BUSH)\b', _mixed, "space"),
        (r'(?:BU|BSH|BUSH)\b\s*(?:1/2|½)', _half, "unit-first half"),
        (r'(?:1/2|½)\s*(?:BU|BSH|BUSH)\b', _half, "half"),
        (r'(?:BU|BSH|BUSH)\b\s*(\d+)/(\d+)', _fraction, "unit-first fraction"),
        (r'(\d+)/(\d+)\s*(?:BU|BSH|BUSH)\b', _fraction, "fraction"),
        (r'(?:BU|BSH|BUSH)\b\s*(\d+(?:\.\d+)?)', _whole, "unit-first whole"),
        (r'\b(\d+(?:\.\d+)?)\s*(?:BU|BSH|BUSH)\b', _whole, "whole"),
    ]
    for pattern, to_bushels, label in bushel_rules:
        bushel_match = re.search(pattern, product_upper)
        if not bushel_match:
            continue
        try:
            bushels = to_bushels(bushel_match)
        except ZeroDivisionError:
            # Malformed fraction such as "1/0 BU": skip this rule instead of
            # aborting the whole row with an uncaught exception.
            debug_print(f"'{product_text}' bushel ({label}) has a zero denominator; skipped", row_sheet)
            continue
        result = bushels * bushel_wt
        debug_print(f"'{product_text}' matched bushel ({label}) → {result:.4f} lbs", row_sheet)
        return round(result, 4), "weight_parsed"

    # ---- CASE PATTERNS (CS = case, typical produce case weights) ----
    case_weights = {
        'berry': 12, 'berries': 12, 'strawberry': 12, 'strawberries': 12,
        'blueberry': 12, 'blueberries': 12, 'raspberry': 12, 'raspberries': 12,
        'blackberry': 12, 'blackberries': 12,
        'lettuce': 24, 'cabbage': 50, 'broccoli': 20, 'cauliflower': 20,
        'pepper': 25, 'peppers': 25, 'tomato': 25, 'tomatoes': 25,
        'cucumber': 24, 'cucumbers': 24, 'squash': 20, 'zucchini': 20,
        'onion': 50, 'onions': 50, 'carrot': 50, 'carrots': 50,
        'celery': 55, 'scallion': 15, 'scallions': 15, 'greens': 20,
        'okra': 15, 'watercress': 4,
    }

    # Match "1/2 CS" or "CS" patterns (the half-case form is checked first).
    half_case = re.search(r'(?:1/2|½)\s*CS\b', product_upper)
    if half_case or re.search(r'\bCS\b', product_upper):
        text_lower = product_text.lower()
        # First keyword (in table order) found in the text; 25 lbs by default.
        case_wt = next((wt for name, wt in case_weights.items() if name in text_lower), 25)
        if half_case:
            result = case_wt * 0.5
            debug_print(f"'{product_text}' matched half-case → {result:.4f} lbs", row_sheet)
        else:
            result = case_wt
            debug_print(f"'{product_text}' matched full case → {result:.4f} lbs", row_sheet)
        return round(result, 4), "weight_parsed"

    # ---- WEIGHT RANGES ("10-12 lb": use the midpoint; result is not rounded) ----
    range_match = re.search(r'(\d+(?:\.\d+)?)\s*-\s*(\d+(?:\.\d+)?)\s+(lb|lbs|oz|kg|g)\b', product_text, flags=re.I)
    if range_match:
        midpoint = (float(range_match.group(1)) + float(range_match.group(2))) / 2
        unit = range_match.group(3).upper()
        if unit in WEIGHT_UNITS:
            result = midpoint * WEIGHT_UNITS[unit]
            debug_print(f"'{product_text}' matched weight range → {result:.4f} lbs", row_sheet)
            return result, "weight_range"

    # ---- CT + WEIGHT ("<n> CT <w> LB": the stated weight is used as-is) ----
    ct_weight_patterns = [
        (r'\b(\d+)\s*CT\s+(\d+(?:\.\d+)?)\s*LBS?\b', lambda m: float(m.group(2))),
        (r'\b(\d+)\s*CT\s+(\d+(?:\.\d+)?)\s*KG\b', lambda m: float(m.group(2)) * 2.20462262),
        (r'\b(\d+)\s*CT\s+(\d+(?:\.\d+)?)\s*OZ\b', lambda m: float(m.group(2)) / 16),
    ]
    for pattern, calc_func in ct_weight_patterns:
        match = re.search(pattern, product_upper)
        if match:
            try:
                result = calc_func(match)
                if result and result > 0:
                    debug_print(f"'{product_text}' matched CT+weight → {result:.4f} lbs", row_sheet)
                    return round(result, 4), "ct_total_weight"
            except Exception as exc:
                # Best effort: report the failure and try the next rule.
                debug_print(f"'{product_text}' CT+weight rule {pattern!r} failed: {exc}", row_sheet)
                continue

    # ---- GENERAL PATTERNS (first rule yielding a positive weight wins) ----
    LB_TO_KG = 0.45359237  # kilograms per pound; dividing kg by it gives lbs
    patterns = [
        (r'\b(\d+(?:\.\d+)?)\s*DOZ\b', lambda m: float(m.group(1)) * 1.5),
        (r'(\d+)X#(\d+)\b', lambda m: int(m.group(1)) * CAN_WEIGHTS.get(int(m.group(2)), 6.5)),
        (r'(\d+)X(\d*\.?\d+)\s*OZ\b', lambda m: int(m.group(1)) * float(m.group(2)) / 16),
        (r'(\d+)X(\d*\.?\d+)\s*LBS?\b', lambda m: int(m.group(1)) * float(m.group(2))),
        (r'(\d+)X(\d*\.?\d+)\s*KG\b', lambda m: int(m.group(1)) * float(m.group(2)) / LB_TO_KG),
        (r'(\d+)X(\d*\.?\d+)\s*GR?\b', lambda m: int(m.group(1)) * float(m.group(2)) / 453.592),
        (r'(\d+)X(\d*\.?\d+)\s*L(?:T|TR?)?\b', lambda m: int(m.group(1)) * float(m.group(2)) * get_liquid_density_per_liter(product_text)),
        (r'\b(\d+(?:\.\d+)?)\s*L(?:T|TR?)?\b', lambda m: float(m.group(1)) * get_liquid_density_per_liter(product_text)),
        (r'(\d+)X(\d*\.?\d+)\s*ML\b', lambda m: int(m.group(1)) * float(m.group(2)) / 1000 * get_liquid_density_per_liter(product_text)),
        (r'\b(\d+(?:\.\d+)?)\s*ML\b', lambda m: float(m.group(1)) / 1000 * get_liquid_density_per_liter(product_text)),
        (r'(\d+)X(\d*\.?\d+)\s*PT\b', lambda m: int(m.group(1)) * float(m.group(2)) * 1.0),
        # The lookahead keeps "1 PT (12)" for the dedicated "PT (n)" rule
        # below; without it that rule could never be reached. This mirrors
        # the lookahead on the plain QT rule.
        (r'\b(\d+(?:\.\d+)?)\s*PT\b(?!\s*\()', lambda m: float(m.group(1)) * 1.0),
        (r'\b(\d+(?:\.\d+)?)\s*OZ\s*\((\d+)\)', lambda m: float(m.group(1)) * int(m.group(2)) / 16),
        (r'\b(\d+(?:\.\d+)?)\s*QT\s*\((\d+)\)', lambda m: float(m.group(1)) * int(m.group(2)) * get_liquid_density(product_text) * 0.25),
        (r'\b(\d+(?:\.\d+)?)\s*PT\s*\((\d+)\)', lambda m: float(m.group(1)) * int(m.group(2)) * get_liquid_density(product_text) * 0.125),
        (r'(\d+)X(?:1/2|½)\s*GAL\b', lambda m: int(m.group(1)) * 0.5 * get_liquid_density(product_text)),
        (r'(\d+)X(\d*\.?\d+(?:/\d+)?)\s*GAL\b', lambda m: int(m.group(1)) * _parse_fraction(m.group(2)) * get_liquid_density(product_text)),
        (r'\b(\d+(?:\.\d+)?(?:/\d+)?)\s*GAL\b', lambda m: _parse_fraction(m.group(1)) * get_liquid_density(product_text)),
        (r'(\d+)X(\d*\.?\d+(?:/\d+)?)\s*QT\b', lambda m: int(m.group(1)) * _parse_fraction(m.group(2)) * get_liquid_density(product_text) * 0.25),
        (r'\b(\d+(?:\.\d+)?(?:/\d+)?)\s*QT\b(?!\s*\()', lambda m: _parse_fraction(m.group(1)) * get_liquid_density(product_text) * 0.25),
        (r'(?:^|\s)(\d*\.?\d+)\s*LBS?\b', lambda m: float(m.group(1))),
        (r'\b(\d+)\s*LBS?\b(?!\s*MC|\s*ATL)', lambda m: float(m.group(1))),
        (r'\b(\d+(?:\.\d+)?)\s*KG\b', lambda m: float(m.group(1)) / LB_TO_KG),
        (r'\b(\d+(?:\.\d+)?)\s*GR?\b', lambda m: float(m.group(1)) / 453.592),
        (r'\b(\d+(?:\.\d+)?)\s*OZ\b', lambda m: float(m.group(1)) / 16),
        (r'\b(\d+)\s*BUNCH(?:ES)?\b', lambda m: int(m.group(1)) * 0.25),
        (r'\b(\d+(?:\.\d+)?)\s*CT\b', lambda m: estimate_ct_weight(float(m.group(1)), product_text, row_sheet)),
        (r'\b(\d+(?:\.\d+)?)\s*P(?:CS?|IECES?)\b', lambda m: estimate_piece_weight(float(m.group(1)), product_text, row_sheet)),
    ]

    for pattern, calc_func in patterns:
        match = re.search(pattern, product_upper)
        if match:
            try:
                result = calc_func(match)
                if result and result > 0:
                    debug_print(f"'{product_text}' matched → {result:.4f} lbs", row_sheet)
                    return round(result, 4), "weight_parsed"
            except Exception as exc:
                # Best effort: a failing rule (bad fraction, bad item-weight
                # entry, ...) is reported and the next rule is tried.
                debug_print(f"'{product_text}' rule {pattern!r} failed: {exc}", row_sheet)
                continue

    return None, "unknown"

def estimate_ct_weight(count: float, product_text: str, row_sheet: Optional[int] = None) -> float:
    """Estimate the total weight in pounds of ``count`` items described by ``product_text``.

    The first ITEM_WEIGHTS pattern found in the lower-cased text supplies the
    per-item weight (and a debug line is printed). Without such a match a
    generic per-item weight is used: 0.1 for sheet/paper items, 0.16 for
    small/mini items, 0.25 otherwise.

    Args:
        count: Number of items.
        product_text: Product description.
        row_sheet: Optional row number for the debug line.

    Returns:
        ``count`` multiplied by the chosen per-item weight.
    """
    lowered = product_text.lower()

    for item_regex, unit_lbs in ITEM_WEIGHTS.items():
        if not re.search(item_regex, lowered):
            continue
        estimated = count * unit_lbs
        debug_print(f"CT: {count} × {unit_lbs} = {estimated} lbs", row_sheet)
        return estimated

    # No configured item matched: fall back to a generic per-item weight.
    if re.search(r'\bsheets?\b|\bpapers?\b', lowered):
        generic_lbs = 0.1
    elif re.search(r'\bsmalls?\b|\bminis?\b', lowered):
        generic_lbs = 0.16
    else:
        generic_lbs = 0.25
    return count * generic_lbs

def estimate_piece_weight(count: float, product_text: str, row_sheet: Optional[int] = None) -> float:
    """Estimate the total weight in pounds of ``count`` pieces of a product.

    The lower-cased text is checked against the keyword rules below in order;
    the first matching rule supplies the per-piece weight, and 2.0 lbs is
    used when none matches.

    Args:
        count: Number of pieces.
        product_text: Product description.
        row_sheet: Accepted for signature parity with estimate_ct_weight;
            not used here.

    Returns:
        ``count`` multiplied by the chosen per-piece weight.
    """
    text_lower = product_text.lower()
    per_piece_rules = (
        (r'\bwatermelons?\b|\bpumpkins?\b', 15.0),
        # 'honeydewe?s?' also accepts the plural "honeydews", which the
        # previous pattern ('honeydewe?') did not match.
        (r'\bpineapples?\b|\bcantaloupes?\b|\bhoneydewe?s?\b', 3.0),
        (r'\bturkeys?\b|\bducks?\b', 12.0),
        (r'\bchickens?\b|\bfish\b', 4.0),
    )
    for keyword_pattern, unit_lbs in per_piece_rules:
        if re.search(keyword_pattern, text_lower):
            return count * unit_lbs
    return count * 2.0

# ============================================================================
# AI INTEGRATION
# ============================================================================

def require_gpt():
    """Raise RuntimeError unless a callable named ``_call_gpt`` exists at module level."""
    gpt_caller = globals().get("_call_gpt")
    if not callable(gpt_caller):
        raise RuntimeError("[Cell 5] _call_gpt is not defined.")

def analyze_product_with_ai(product_text: str, estimated_weight=None, row_sheet: Optional[int] = None):
    """Ask GPT for the main ingredients of a product and their percentage shares.

    Args:
        product_text: Product description sent to the model.
        estimated_weight: Optional total weight (lbs) added to the prompt as a hint.
        row_sheet: Accepted for signature parity with the other helpers; not used here.

    Returns:
        A list of ``{"ingredient": <lower-case name>, "percent": <float>}``
        dicts. Shares are rescaled to total 100 when they are off by more
        than 0.1. If the reply is empty, unparsable or yields no usable item,
        the result of get_fallback_ingredients() is returned instead.

    Raises:
        RuntimeError: From require_gpt() when ``_call_gpt`` is unavailable.
    """
    require_gpt()
    weight_hint = f" (estimated total weight: {estimated_weight} lbs)" if estimated_weight else ""
    prompt = f"""Analyze this food item: {product_text}{weight_hint}

Rules:
1. Single products (cheese, butter, milk) → 100% of that food
2. Prepared foods → break into main ingredients
3. Think like a chef, not a chemist

Return JSON: {{"items": [{{"ingredient": "name", "percent": number}}]}}"""

    chat = [
        {"role": "system", "content": "You identify main food ingredients in products as they appear in recipes."},
        {"role": "user", "content": prompt}
    ]
    try:
        reply = str(_call_gpt(messages=chat)).strip()
        # Use the outermost {...} span when the model wraps the JSON in extra text.
        embedded = re.search(r'\{.*\}', reply, re.DOTALL)
        payload = json.loads(embedded.group(0) if embedded else reply)

        raw_items = payload.get("items", [])
        if not raw_items:
            return get_fallback_ingredients(product_text)

        kept = []
        for entry in raw_items:
            name = str(entry.get("ingredient", "")).strip().lower()
            if name in ("", "null"):
                continue
            share = float(entry.get("percent", 0))
            if share > 0:
                kept.append({"ingredient": name, "percent": share})

        if not kept:
            return get_fallback_ingredients(product_text)

        share_total = sum(entry["percent"] for entry in kept)
        if abs(share_total - 100) > 0.1:
            # Rescale so the shares add up to 100.
            for entry in kept:
                entry["percent"] = (entry["percent"] / share_total) * 100
        return kept
    except Exception:
        # Any failure in the call or in parsing falls back to keyword heuristics.
        return get_fallback_ingredients(product_text)

def get_fallback_ingredients(product_text: str):
    """Return a single 100% ingredient guessed from the product text without AI.

    The staple keywords are tested in the fixed order below (whole-word match
    on the lower-cased text) and the first one present is used. Otherwise the
    first lower-case word of three or more letters is used, or "food product"
    when the text has no such word.
    """
    lowered = product_text.lower()
    staples = (
        'butter', 'cheese', 'milk', 'cream', 'yogurt', 'bread',
        'chicken', 'beef', 'pork', 'fish', 'oil', 'flour',
    )
    chosen = next(
        (staple for staple in staples if re.search(rf'\b({staple})\b', lowered)),
        None,
    )
    if chosen is None:
        first_word = re.search(r'\b[a-z]{3,}\b', lowered)
        chosen = first_word.group(0) if first_word else "food product"
    return [{"ingredient": chosen, "percent": 100.0}]

def estimate_weight_with_ai(product_text: str, qty: float, row_sheet: Optional[int] = None) -> Optional[float]:
    """Estimate a product's total weight in pounds when no weight could be parsed.

    Resolution order:
      1. hard-coded per-item weights for watermelon, cantaloupe and pineapple
         (multiplied by ``qty``; no model call is made);
      2. a GPT estimate, accepted when the first number in the reply lies in
         the range 0.1 to 10000;
      3. a default of 3.0 lbs per unit of ``qty``.

    Args:
        product_text: Product description.
        qty: Quantity purchased.
        row_sheet: Optional row number for debug output.

    Returns:
        The estimated total weight in pounds.
    """
    text_lower = product_text.lower()

    # (1) Known items: fixed per-item weight, checked in this order.
    for keyword, unit_lbs in (('watermelon', 12.0), ('cantaloupe', 3.0), ('pineapple', 4.0)):
        if keyword in text_lower:
            result = unit_lbs * qty
            debug_print(f"'{product_text}' AI estimate ({keyword}) → {result:.4f} lbs", row_sheet)
            return result

    # (2) Ask the model. Failures are best-effort and fall through to the
    # default, but only ordinary errors are absorbed: KeyboardInterrupt and
    # SystemExit propagate so a long run can still be interrupted.
    try:
        require_gpt()
        prompt = f"""Estimate the total weight in pounds for this produce/food item.

Product: {product_text}
Quantity: {qty}

Context:
- CS = case (full case of produce, typically 15-50 lbs depending on item)
- 1/2 CS = half case
- BU/BUSH = bushel (typically 30-60 lbs depending on item)
- Berries in cases typically weigh 10-15 lbs per case
- Leafy greens in cases typically weigh 20-30 lbs per case
- Root vegetables in cases typically weigh 40-60 lbs per case
- Dense vegetables (peppers, tomatoes) in cases typically weigh 20-30 lbs per case

Think step by step:
1. Identify the product type
2. Identify the packaging (case, half-case, bushel, count, weight unit)
3. Estimate reasonable weight per unit
4. Calculate total weight

Provide only the final numeric answer in pounds (e.g., "24.5")"""
        response = _call_gpt(messages=[
            {"role": "system", "content": "You are an expert in produce wholesale and food service packaging. You understand industry abbreviations like CS (case), BU (bushel), and typical weights for produce items in commercial foodservice."},
            {"role": "user", "content": prompt}
        ])

        weight_match = re.search(r'(\d+(?:\.\d+)?)', str(response))
        if weight_match:
            weight = float(weight_match.group(1))
            # Reject implausible answers; they fall through to the default.
            if 0.1 <= weight <= 10000:
                result = round(weight, 4)
                debug_print(f"'{product_text}' AI estimate (GPT) → {result:.4f} lbs", row_sheet)
                return result
    except Exception as exc:
        debug_print(f"'{product_text}' GPT weight estimate failed: {exc}", row_sheet)

    # (3) Last resort: flat per-unit default.
    result = 3.0 * qty
    debug_print(f"'{product_text}' AI estimate (fallback default) → {result:.4f} lbs", row_sheet)
    return result

# ============================================================================
# MAIN PROCESSING
# ============================================================================

def process_single_product(index: int, product_text: str, df_raw: pd.DataFrame, qty_col: Optional[str]):
    """Resolve quantity and total weight for one input row and extract its ingredients.

    Args:
        index: Row label of the product in ``df_raw``.
        product_text: Raw product description for that row.
        df_raw: Input table; only read here to look up the quantity cell.
        qty_col: Name of the quantity column, or None when the file has none.

    Returns:
        None when ``product_text`` is unusable (not a non-empty string, shorter
        than 3 characters after stripping, or one of the marker values
        'type01'/'type02'/'type03'/'nan'/'null'); otherwise a dict with keys
        ``index``, ``product``, ``qty``, ``total_weight``, ``items`` and
        ``source_type``.
    """
    row_offset = int(os.getenv("FI_ROW_OFFSET", "2"))
    row_sheet = index + row_offset

    # Quantity defaults to 1.0 when the column/cell is missing or unparsable,
    # and parsed values are floored at 1.0.
    qty_final = 1.0
    try:
        if qty_col and qty_col in df_raw.columns:
            qty_value = df_raw.loc[index, qty_col]
            if pd.notna(qty_value) and qty_value != '':
                qty_final = max(float(qty_value), 1.0)
    except Exception:
        # Best-effort parse. `except Exception` (rather than a bare except) lets
        # KeyboardInterrupt / SystemExit propagate so a run can be cancelled.
        qty_final = 1.0

    if not product_text or not isinstance(product_text, str):
        return None
    product_text_clean = str(product_text).strip()
    if len(product_text_clean) < 3 or product_text_clean.lower() in ['type01', 'type02', 'type03', 'nan', 'null']:
        return None

    # Prefer a weight parsed from the description; fall back to the AI estimate,
    # which already accounts for the quantity.
    per_unit_weight, source_type = extract_weight_from_text(product_text_clean, row_sheet)
    if per_unit_weight:
        total_weight = round(per_unit_weight * qty_final, 4)
    else:
        debug_print("No weight pattern matched, estimating via AI", row_sheet)
        total_weight = estimate_weight_with_ai(product_text_clean, qty_final, row_sheet)
        source_type = "ai_estimated"

    items = analyze_product_with_ai(product_text_clean, total_weight, row_sheet)

    return {
        "index": index, "product": product_text_clean, "qty": qty_final,
        "total_weight": total_weight, "items": items, "source_type": source_type
    }

def run_stage_1_type03_gpt_base(input_filename: str, base_name: str) -> pd.DataFrame:
    """Run Stage 1 ingredient extraction for a type03 file and write the result CSV.

    Reads ``input_filename`` as CSV, drops rows whose first or second column
    holds a file-type marker ("type01"/"type02"/"type03"), runs
    ``process_single_product`` for every non-blank product on a thread pool,
    and writes one row per (product, ingredient) to
    ``output_dir / ingredients_{base_name}.csv``.

    Environment:
        FI_MAX_WORKERS: thread pool size (default 4).
        FI_ROW_OFFSET: added to the row index to form ``row_sheet`` (default 2).

    Args:
        input_filename: Path of the input CSV.
        base_name: Base name used in the output file name.

    Returns:
        The ingredient table that was written (empty, but with its columns,
        when no product produced a result).
    """
    output_columns = [
        "row_sheet", "product", "qty", "product_weight_lbs", "ingredient",
        "percent", "ingredient_weight_lbs", "extract_origin", "source_uom_type",
    ]

    with log_time("Loading input data"):
        df_raw = pd.read_csv(input_filename)
        unwanted = {"type01", "type02", "type03"}
        # Keep a row unless its first or second column is one of the markers.
        keep = pd.Series(True, index=df_raw.index)
        for col in df_raw.columns[:2]:
            cells = df_raw[col]
            is_marker = cells.notna() & cells.astype(str).str.strip().str.lower().isin(unwanted)
            keep &= ~is_marker
        df_raw = df_raw[keep].reset_index(drop=True)

    columns = {c.lower().strip(): c for c in df_raw.columns}
    product_col = columns.get("product") or columns.get("description") or df_raw.columns[0]
    qty_col = columns.get("qty")

    product_series = df_raw[product_col].astype(str)
    max_workers = int(os.getenv("FI_MAX_WORKERS", "4"))
    row_offset = int(os.getenv("FI_ROW_OFFSET", "2"))

    results = []
    with log_time(f"Processing {len(product_series)} products"):
        with cf.ThreadPoolExecutor(max_workers=max_workers) as executor:
            futures = {executor.submit(process_single_product, i, text, df_raw, qty_col): (i, text)
                      for i, text in product_series.items() if pd.notna(text) and str(text).strip()}
            for future in cf.as_completed(futures):
                result = future.result()
                if result:
                    results.append(result)

    output_rows = []
    for result in results:
        total_weight = result["total_weight"]
        for item in result["items"] or []:
            # Tolerate a missing / None / non-string ingredient name.
            ingredient = str(item.get("ingredient") or "").strip() or "food product"
            percent = item.get("percent", 100.0)
            if total_weight and percent:
                ingredient_weight = round((percent * total_weight / 100), 4)
            else:
                ingredient_weight = None
            output_rows.append({
                "row_sheet": result["index"] + row_offset,
                "product": result["product"],
                "qty": result["qty"],
                "product_weight_lbs": total_weight,
                "ingredient": ingredient,
                "percent": round(percent, 2) if percent else None,
                "ingredient_weight_lbs": ingredient_weight,
                "extract_origin": "ai_analysis",
                "source_uom_type": result["source_type"]
            })

    with log_time("Creating output file"):
        # Explicit columns so an empty run still writes a header row that
        # pd.read_csv can load again.
        df_output = pd.DataFrame(output_rows, columns=output_columns)
        if not df_output.empty:
            # Stable sort keeps each product's ingredients in extraction order.
            df_output = df_output.sort_values("row_sheet", kind="mergesort")
            for col in ["product_weight_lbs", "percent", "ingredient_weight_lbs"]:
                df_output[col] = df_output[col].round(2)
        # Make sure the target folder exists before writing.
        output_dir.mkdir(parents=True, exist_ok=True)
        output_path = output_dir / f"ingredients_{base_name}.csv"
        df_output.to_csv(output_path, index=False, encoding="utf-8")
        print(f"[Stage 1] Wrote {len(df_output)} rows to {output_path}")
    return df_output

# Record when this cell first ran in the current kernel session. Re-running the
# cell must not move the timestamp, so only set it when the name is undefined.
try:
    _SESSION_START_TIME  # already defined by an earlier run: keep that value
except NameError:
    import time
    _SESSION_START_TIME = time.time()  # first run: seconds since the epoch

def run_stage_1_type03_gpt(*args, **kwargs):
    """Session-cached wrapper around ``run_stage_1_type03_gpt_base``.

    If ``ingredients_{base_name}.csv`` already exists and was modified at or
    after ``_SESSION_START_TIME``, that file is loaded and returned instead of
    re-running Stage 1. Any problem reading a cached file falls back to a full
    run.

    Takes the same arguments as ``run_stage_1_type03_gpt_base``. The base name
    is read from the ``base_name`` keyword (the legacy ``base`` key is still
    checked) or from the second positional argument.
    """
    base_name = (
        kwargs.get("base_name")
        or kwargs.get("base")
        or (args[1] if len(args) >= 2 else None)
    )
    if base_name:
        file_name = f"ingredients_{base_name}.csv"
        # Stage 1 writes under output_dir; the bare relative path is the
        # location the earlier version of this check looked at.
        for candidate in (output_dir / file_name, Path(file_name)):
            try:
                if candidate.exists() and candidate.stat().st_mtime >= _SESSION_START_TIME:
                    return pd.read_csv(candidate)
            except Exception:
                # Unreadable or malformed cache file: ignore it and carry on.
                continue
    return run_stage_1_type03_gpt_base(*args, **kwargs)

# Completion banner for Cell 5 (three lines on stdout).
print("\n".join((
    "=== CELL 5 v110b COMPLETE ===",
    "✓ All fixes applied",
    "✓ Ready to process",
)))

In [None]:
# === Cell 6 — Stage 1 (General) Runner ===
# Purpose: Execute AI ingredient extraction for type01 and type02 vendor formats.
# Reads the procurement table, extracts MAJOR ingredients (+ percents), writes ingredients_{BASE}.csv
# Provides log_time timings.
# Inputs:
#   - standardized DataFrame from Cell 3
#   - ai_extract_ingredients() from Cell 4
# Outputs:
#   - ingredients_{BASE}.csv
# Behavior:
#   - Loops through all products, calling GPT for ingredient prediction.
#   - Applies vegan/meatless safeguards and normalization.
#   - Writes results to disk.
# Notes:
#   - This cell mirrors Cell 5 but omits item_weights.json dependency.
#   - Overwrites existing ingredients file if START_MODE='full'.

from __future__ import annotations
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
import concurrent.futures as cf
import pandas as pd
import numpy as np
import json, re, os


if file_type == 'type03':
    print("Skipping this cell - type03 uses different processing")
    # You could also just use a pass statement or return if it's in a function
else:

    # ----------------------------
    # Timing helper (no-op fallback)
    # ----------------------------
    # Use log_time if an earlier cell already defined it; otherwise install a
    # stand-in so that `with log_time(...)` blocks in this cell still run.
    try:
        log_time
    except NameError:
        from contextlib import contextmanager
        @contextmanager
        def log_time(_label: str):
            """No-op replacement: accepts a label and yields without timing anything."""
            yield
    
    # ----------------------------
    # Constants / configuration
    # ----------------------------
    # Label phrases that mark a product as vegan / vegetarian / non-dairy.
    VEGAN_TERMS = (
        "vegan",
        "plant-based",
        "plant based",
        "meatless",
        "vegetarian",
        "dairy-free",
        "dairy free",
        "non-dairy",
        "nondairy",
    )

    # Animal-derived ingredient name -> plant-based counterpart name.
    VEGAN_INGREDIENT_REPLACEMENTS = {
        # Dairy products
        "yogurt": "non-dairy yogurt",
        "milk": "plant milk",
        "cheese": "vegan cheese",
        "cream": "plant-based cream",
        "butter": "vegan butter",
        "sour cream": "vegan sour cream",
        "cream cheese": "vegan cream cheese",
        "ice cream": "non-dairy ice cream",

        # Meat products (for vegan meat substitutes)
        "chicken": "plant-based chicken",
        "beef": "plant-based beef",
        "pork": "plant-based pork",
        "turkey": "plant-based turkey",
        "sausage": "plant-based sausage",
        "bacon": "plant-based bacon",

        # Other animal products
        "egg": "egg substitute",
        "gelatin": "plant-based gelling agent",
        "honey": "plant-based sweetener",
    }
    
    # Substrings that identify an animal-derived ingredient, grouped by kind.
    ANIMAL_INGREDIENT_TOKENS = set().union(
        # dairy
        (
            "milk", "dairy milk", "cow milk", "goat milk", "sheep milk",
            "cheese", "whey", "casein", "lactose",
            "butter", "ghee", "cream", "yogurt", "buttermilk",
            "skim milk", "whole milk", "milk powder", "whey powder", "sour cream",
        ),
        # eggs & honey & gelatin
        ("egg", "eggs", "egg white", "egg whites", "egg yolk", "honey", "gelatin"),
        # terrestrial meats
        (
            "beef", "pork", "chicken", "turkey", "lamb", "mutton",
            "bacon", "sausage", "venison", "veal", "ham",
        ),
        # fish & seafood
        (
            "fish", "salmon", "tuna", "cod", "haddock", "pollock",
            "anchovy", "sardine", "halibut", "mahi",
            "catfish", "tilapia", "trout", "bass", "redfish", "snapper",
            "shrimp", "prawn", "crab", "lobster", "oyster", "clam", "mussel",
        ),
    )

    # Fish species tokens used when annotating ingredients with a fish source.
    FISH_SPECIES = {
        "fish", "salmon", "tuna", "cod", "haddock", "pollock",
        "anchovy", "sardine", "halibut", "mahi",
        "catfish", "tilapia", "trout", "bass", "redfish", "snapper",
        "hake", "sole", "flounder", "mackerel", "herring",
        "anchovies", "sardines",
    }
    
    # Labels that are never decomposed: a product whose name exactly matches one
    # of the notebook's ALLOWED_CATEGORIES (when defined) is kept intact.
    DO_NOT_DECOMPOSE = set(globals().get("ALLOWED_CATEGORIES", []))

    # Multi-word (or otherwise atomic) ingredient names that must not be split
    # into their component words.
    PROTECTED_INGREDIENTS = {
        # plant milks
        "almond milk", "soy milk", "oat milk", "rice milk",
        "pea milk", "coconut milk", "cashew milk",
        # atomic staples
        "peanut butter", "ice cream", "tofu", "tempeh", "seitan",
        # vegan cheese
        "vegan cheese alternative", "vegan mozzarella cheese alternative",
        # non-dairy yogurts
        "non-dairy yogurt", "coconut yogurt", "almond yogurt",
        "soy yogurt", "cashew yogurt",
        # other plant-based substitutes
        "plant-based cream", "vegan butter",
        "plant-based chicken", "plant-based beef", "plant-based pork",
        "vegan sausage",
    }
    
    # Single-item detectors: (compiled pattern, staple name) pairs, tried in
    # order, used to collapse obvious single-ingredient staples.
    SINGLE_ITEM_DETECTORS: List[Tuple[re.Pattern,str]] = [
        (re.compile(pattern, re.I), staple)
        for pattern, staple in (
            (r"\bwater\b", "water"),
            (r"\bsugar\b", "sugar"),
            (r"\bsalt\b", "salt"),
            (r"\bflour\b", "flour"),
            (r"\bbutter\b", "butter"),
            (r"\bcream\b", "cream"),
            (r"\bmilk\b", "milk"),
            (r"\byogurt\b", "yogurt"),
            (r"\b(oil|olive oil|canola oil|vegetable oil)\b", "oil"),
            (r"\brice\b", "rice"),
            (r"\boats?\b", "oats"),
            (r"\bpasta\b|\bspaghetti\b|\bpenne\b", "pasta"),
        )
    ]

    # "in water" / "with water" / "water pack(ed)": water as packing medium,
    # which must not collapse the product to plain "water".
    PACKAGING_WATER_CONTEXT_RE = re.compile(
        r"\b(in|with)\s+water\b|\bwater\s*pack(ed)?\b",
        flags=re.I,
    )

    # "half & half" / "half and half".
    HALF_AND_HALF_RE = re.compile(
        r"\bhalf\s*&\s*half\b|\bhalf\s+and\s+half\b",
        flags=re.I,
    )

    # Slash-separated numbers such as 75/25, with an optional third part.
    RATIO_RE = re.compile(
        r'(\d{1,3})\s*/\s*(\d{1,3})(?:\s*/\s*(\d{1,3}))?'
    )
    
    # Percent threshold below which ingredients are dropped after
    # normalization; taken from the notebook-level CATEGORY_PERCENT_MIN setting.
    MIN_KEEP_PCT = CATEGORY_PERCENT_MIN

    # Default composition used for vegan meat substitutes when GPT returns a
    # weak or unknown answer.
    VEGAN_MEAT_DEFAULT = [
        {"ingredient": component, "percent": share}
        for component, share in (
            ("legumes", 30.0),
            ("wheat", 30.0),
            ("other vegetables", 30.0),
            ("vegetable oil", 10.0),
        )
    ]
    
    # ----------------------------
    # Helpers
    # ----------------------------
    def _norm(s: str) -> str:
        try:
            return norm_text(s)  # use your notebook's normalizer if present
        except Exception:
            t = str(s).strip().lower()
            return re.sub(r"\s+", " ", t)

    def _is_blank_product(s: Any) -> bool:
        if s is None:
            return True
        t = str(s).strip()
        if t == "":
            return True
        tl = t.lower()
        return tl in ("nan", "none")
    
    def is_vegan_labeled(name: str) -> bool:
        """Return True when the normalized name contains any phrase from VEGAN_TERMS."""
        normalized = _norm(name)
        for term in VEGAN_TERMS:
            if term in normalized:
                return True
        return False
    
    def contains_animal_token(name_lc: str) -> bool:
        """Return True when any ANIMAL_INGREDIENT_TOKENS entry occurs as a substring of ``name_lc``."""
        for token in ANIMAL_INGREDIENT_TOKENS:
            if token in name_lc:
                return True
        return False
    
    def is_allowed_plant_variant(name_lc: str) -> bool:
        """Return True when ``name_lc`` contains a PROTECTED_INGREDIENTS entry (e.g. an explicit plant milk)."""
        return any(protected in name_lc for protected in PROTECTED_INGREDIENTS)
    
    def sanitize_vegan_dairy_naming(s: str) -> str:
        """Map a vegan/plant mozzarella name to the generic vegan alternative label.

        Any other input is returned unchanged.
        """
        lowered = s.lower()
        if "mozzarella" not in lowered:
            return s
        if "vegan" in lowered or "plant" in lowered:
            # never hand back a bare dairy cheese name for a vegan product
            return "vegan mozzarella cheese alternative"
        return s
    
    def is_protected(ing: str) -> bool:
        """Return True when the lower-cased ingredient contains a PROTECTED_INGREDIENTS entry."""
        lowered = ing.lower()
        for phrase in PROTECTED_INGREDIENTS:
            if phrase in lowered:
                return True
        return False
    
    def detect_bone_in(text: str) -> bool:
        """Return True when the normalized text mentions bone-in / "with bone(s)"."""
        bone_in_pattern = r"\bbone[-\s]?in\b|\bwith bones?\b|\bbone[-\s]?in\s+skin[-\s]?on\b"
        return re.search(bone_in_pattern, _norm(text)) is not None
    
    def detect_fish_source(text: str) -> Optional[str]:
        """Classify the normalized text as "wild-caught" or "farmed", or return None.

        The wild pattern is checked first, so text matching both is reported as
        "wild-caught".
        """
        normalized = _norm(text)
        source_patterns = (
            (r"\bwild\s*[- ]?\s*caught\b|\bwild\b", "wild-caught"),
            (r"\bfarm(ed|[- ]raised)?\b|\baquaculture\b|\braised\b", "farmed"),
        )
        for pattern, label in source_patterns:
            if re.search(pattern, normalized):
                return label
        return None
    
    def annotate_fish_items(items: List[Dict[str,Any]], product_text: str) -> List[Dict[str,Any]]:
        """Append "(wild-caught)" / "(farmed)" to fish ingredients when the product states a source.

        Returns ``items`` itself when no source is detected in ``product_text``.
        Otherwise returns a new list in which each item that names a
        FISH_SPECIES token (as a whole word) is replaced by a copy with the
        source appended; sauces and already-annotated items pass through.
        """
        source = detect_fish_source(product_text)
        if not source:
            return items

        annotated = []
        for entry in items:
            label = str(entry.get("ingredient",""))
            lowered = label.lower()
            needs_tag = (
                "sauce" not in lowered           # skip sauces
                and "(farmed" not in lowered     # skip if already annotated
                and "(wild" not in lowered
                and any(
                    re.search(rf"\b{re.escape(species)}\b", lowered)
                    for species in FISH_SPECIES
                )
            )
            if needs_tag:
                entry = dict(entry)  # copy so the caller's dict is not modified
                entry["ingredient"] = f"{label} ({source})"
            annotated.append(entry)
        return annotated
    
    def canonical_single_ingredient(title: str) -> Optional[str]:
        """Return the single canonical ingredient for ``title``, or None to defer to the LLM.

        Rules, in order:
          1. Vegan-labeled titles are never collapsed (returns None).
          2. A title whose normalized form equals a DO_NOT_DECOMPOSE label is
             returned as-is (stripped).
          3. A title containing a protected compound name ("almond milk",
             "peanut butter", "ice cream", ...) is not collapsed: it contains a
             staple word but is not that staple (returns None).
          4. Otherwise the first matching SINGLE_ITEM_DETECTORS staple is
             returned; "water" is skipped when it only appears as packing
             context ("in water", "water packed").
        """
        t = title or ""
        if is_vegan_labeled(t):
            return None
        # Respect do-not-decompose if name exactly equals an allowed category
        n = _norm(t)
        if n in {_norm(x) for x in DO_NOT_DECOMPOSE}:
            return t.strip()
        # Protected compounds would otherwise hit \bmilk\b / \bbutter\b /
        # \bcream\b below and be reported as 100% of the dairy staple.
        if is_protected(t) or is_protected(n):
            return None
        # Otherwise single-ingredient staples
        for rx, name in SINGLE_ITEM_DETECTORS:
            if rx.search(t):
                # Never collapse to water if it's clearly just packaging context
                if name == 'water' and PACKAGING_WATER_CONTEXT_RE.search(t):
                    continue
                return name
        return None
    
    def normalize_percents(items: List[Dict[str,Any]], product_text: str) -> List[Dict[str,Any]]:
        """Return items whose known percents are rescaled to sum to 100.

        If there are exactly two items and the product text contains a
        two-part blend ratio that sums to 100 (e.g. "75/25"), those numbers are
        assigned to the two items in order. Slash-separated numbers that do not
        sum to 100 (pack sizes such as "6/10", counts such as "21/25") are not
        treated as blend ratios.

        Otherwise every non-None percent is scaled by ``100 / total``; items
        with a None percent keep None. ``items`` is returned unchanged when no
        percent is known or the total is not positive. Returned dicts contain
        only the ``ingredient`` and ``percent`` keys.
        """
        if len(items) == 2:
            # Scan every N/M occurrence so a leading pack size does not hide a
            # later genuine ratio.
            for match in RATIO_RE.finditer(product_text or ""):
                first, second = int(match.group(1)), int(match.group(2))
                if first + second == 100:
                    return [
                        {"ingredient": items[0]["ingredient"], "percent": float(first)},
                        {"ingredient": items[1]["ingredient"], "percent": float(second)},
                    ]

        # Otherwise rescale whatever percents are known.
        known = [it.get("percent") for it in items if it.get("percent") is not None]
        if not known:
            return items
        total = sum(known)
        if total <= 0:
            return items
        scale = 100.0 / total
        return [
            {
                "ingredient": it["ingredient"],
                "percent": None if it.get("percent") is None else it.get("percent") * scale,
            }
            for it in items
        ]
    
    def drop_minors(items: List[Dict[str,Any]]) -> List[Dict[str,Any]]:
        """Keep items whose percent is unknown (None) or at least MIN_KEEP_PCT.

        If that would remove every item, the original list is returned instead.
        """
        kept = [
            entry for entry in items
            if entry.get("percent") is None or entry.get("percent") >= MIN_KEEP_PCT
        ]
        return kept or items  # never drop everything
    
    def looks_like_vegan_meat(text: str) -> bool:
        """Return True for vegan-labeled text that also names a meat or meat-style form."""
        if not is_vegan_labeled(text):
            return False
        meat_form_pattern = r"\b(chicken|beef|pork|turkey|sausage|nugget|tender|patty|burger|meatball|ground|crumbles|hot\s*dog|rib|steak|cutlet)\b"
        return re.search(meat_form_pattern, _norm(text)) is not None
    
    def vegan_meat_fallback_needed(items: List[Dict[str,Any]]) -> bool:
        """Return True when the extracted items are too weak to use for a vegan meat product.

        That is: no items at all, a single item with a generic name such as
        "patty", or every item named "unknown".
        """
        if not items:
            return True
        generic_names = {"patty","burger","protein","meat substitute","meatless patty","tender","nugget"}
        only_generic = len(items) == 1 and _norm(items[0]["ingredient"]) in generic_names
        if only_generic:
            return True
        return all(_norm(entry["ingredient"]) == "unknown" for entry in items)




    # --------------------------------------------------------------------
# --- Vegan safeguard patch (Option A) ---
# Provides a minimal, safe definition of enhanced_vegan_filter_and_convert()
# and guards its invocation to prevent "error"/"unknown" failures for vegan items.
# --------------------------------------------------------------------

    def enhanced_vegan_filter_and_convert(items, product_text):
        """
        Simple vegan/meatless filter.

        - Removes animal-derived ingredients (meat, fish, eggs, dairy, honey, gelatin),
          matched as substrings of the lower-cased ingredient name.
        - Keeps ingredients that are explicitly plant-based even though their name
          contains an animal word: vegan-labeled names ("vegan cheese alternative",
          "plant-based chicken"), the vegan names in VEGAN_INGREDIENT_REPLACEMENTS
          ("plant milk", "egg substitute"), and protected plant compounds
          ("almond milk", "peanut butter").
        - Keeps all other plant-based or neutral ingredients.

        Args:
            items: list of {"ingredient": ..., "percent": ...} dicts (or None/empty).
            product_text: product name; currently unused, kept for the call signature.

        Returns:
            The cleaned list; an empty list if ``items`` is empty or everything
            was removed.
        """
        if not items:
            return []

        animal_terms = (
            "beef","pork","chicken","turkey","lamb","fish","salmon","tuna","shrimp",
            "egg","cheese","milk","butter","cream","yogurt","honey","gelatin"
        )
        vegan_names = set(VEGAN_INGREDIENT_REPLACEMENTS.values())
        # Protected entries that are themselves animal products (they appear as
        # keys of the replacement map, e.g. "ice cream") must not grant an exemption.
        plant_protected = PROTECTED_INGREDIENTS - set(VEGAN_INGREDIENT_REPLACEMENTS)

        safe_items = []
        for it in items:
            ing = str(it.get("ingredient","")).lower()
            is_plant_substitute = (
                ing in vegan_names
                or is_vegan_labeled(ing)
                or any(p in ing for p in plant_protected)
            )
            # skip clear animal items, but never the plant substitutes above
            # NOTE(review): substring matching still drops names such as
            # "eggplant" or "butternut squash" - confirm whether that matters.
            if not is_plant_substitute and any(tok in ing for tok in animal_terms):
                continue
            safe_items.append(it)

        return safe_items
    
    
    # --- Wrap vegan-filter invocation safely ---
    _old_ai_extract_ingredients = None
    if 'ai_extract_ingredients' in globals():
        _old_ai_extract_ingredients = ai_extract_ingredients  # preserve old symbol if rerun

    #----------------------------
    # ----------------------------
    # ai_extract_ingredients (returns items, origin)
    # ----------------------------
    def ai_extract_ingredients(product_text: str, max_ings: int = 8) -> Tuple[List[Dict[str, Any]], str]:
        """
        Return (items, origin) where items = [{"ingredient": str, "percent": float|None}, ...]
        origin in {"rule_halfhalf","rule_single","rule_fallback_vegan_meat","gpt1","error","skip"}
        """
        if _is_blank_product(product_text):
            return [], "skip"
    
        product_text = (product_text or "").strip()
        vegan_flag = is_vegan_labeled(product_text)
    
        # EXACT RULE: Half & Half is 50/50 milk/cream (ignore butterfat % range)
        if not vegan_flag and HALF_AND_HALF_RE.search(product_text):
            return [{"ingredient": "milk", "percent": 50.0},
                    {"ingredient": "cream", "percent": 50.0}], "rule_halfhalf"
    
        # Do-not-decompose (exact category match) OR single staples
        single = canonical_single_ingredient(product_text)
        if single:
            return [{"ingredient": single, "percent": 100.0}], "rule_single"
    
        # --- Build the LLM prompt (your established guidance) ---
        base_rules = [
            "Return ingredients for the product below as a JSON object with key `items`.",
            "Each item must be {\"ingredient\": <string>, \"percent\": <number or null>}.",
            f"Limit to at most {max_ings} items.",
            "Report ONLY major ingredients by mass. OMIT micro-ingredients likely under "
            f"{MIN_KEEP_PCT}% (e.g., salt, acids, leaveners, stabilizers, spices, flavors), unless the product has only 1–2 ingredients.",
            "If the product name specifies blend ratios (e.g., 75/25), reflect those percents exactly.",
            "If unsure of exact amounts, provide realistic approximate percents for MAJOR ingredients; do NOT evenly split many items.",
            "Do NOT decompose protected items; keep them intact (e.g., 'almond milk', 'ice cream', 'peanut butter', 'tofu', 'tempeh', 'seitan').",
            "IF the product name denotes a single-ingredient staple (milk, cream, butter, yogurt, a single specific cheese, plain oil, sugar, flour, salt, rice, oats, pasta, water, vinegar), output EXACTLY one item at 100 and do NOT list additives.",
            "IF the product name includes 'half & half' or 'half and half', output exactly two items: 'milk' 50% and 'cream' 50%.",
            # === added to reduce unknowns ===
            "If the product clearly is a non-food item (e.g. napkins, paper towels, cups, containers, foil, plastic wrap), return one item {\"ingredient\": \"unknown\", \"percent\": 100}.  Otherwise, never return 'unknown'—instead infer the most typical major ingredients for that food product.",
        ]
        vegan_rules = [
            "IMPORTANT: The product is labeled vegan/plant-based/vegetarian/non-dairy.",
            "Do NOT include any animal-derived ingredients (meat, fish/seafood, eggs, dairy, gelatin, honey).",
            "If a 'milk' is implied, interpret as a plant milk and name explicitly (e.g., 'almond milk').",
            "Avoid naming dairy cheeses directly; if needed, use 'vegan cheese alternative' terminology.",
            "IF the product name indicates vegan or imitation cheese and GPT would otherwise return 'unknown', "
            "infer a plausible base composition such as water (~45%), vegetable oil (~23%), potato starch (~22%), "
            "and legumes (~10%), or similar plant-based ingredients that form vegan cheese.",
        ] if vegan_flag else []
    
        protected_note = "Protected examples:\n" + ", ".join(sorted(PROTECTED_INGREDIENTS))
        system_msg = (
            "You extract food ingredients as structured JSON for downstream life-cycle analysis. "
            "Prioritize MAJOR ingredients and realistic percents; omit micro-ingredients under the threshold."
        )
        user_msg = "\n".join(base_rules + vegan_rules + [
            "", protected_note, "", "Product text:", product_text, "",
            "Respond ONLY with JSON like: {\"items\":[{\"ingredient\":\"...\",\"percent\":12.3}, ...]}",
        ])
    
        gpt_called = True
        try:
            # NOTE: uses the notebook's existing _call_gpt defined earlier
            raw = _call_gpt(
                messages=[{"role": "system", "content": system_msg},
                          {"role": "user", "content": user_msg}]
            )
        except Exception:
            raw = ""
            gpt_called = False
    
        # Parse JSON
        items: List[Dict[str, Any]] = []
        if raw:
            try:
                obj = json.loads(raw)
                cand = obj.get("items", [])
                if isinstance(cand, list):
                    for it in cand:
                        ing = str(it.get("ingredient", "")).strip()
                        if not ing:
                            continue
                        pct = it.get("percent", None)
                        try:
                            pct = float(pct) if pct is not None else None
                        except Exception:
                            pct = None
                        # Snap protected phrases
                        if is_protected(ing):
                            for prot in PROTECTED_INGREDIENTS:
                                if prot in ing.lower():
                                    ing = prot
                                    break
                        items.append({"ingredient": ing, "percent": pct})
            except Exception:
                items = []
    
        origin = "gpt1" if gpt_called else "error"

        def _is_all_unknown(lst):
            return (not lst) or all(_norm(it.get("ingredient","")) == "unknown" for it in lst)
        
        if _is_all_unknown(items):
            try:
                fb_user = (
                    "Return realistic MAJOR ingredients (avoid 'unknown'). "
                    f"Limit to 5 items, omit micro-ingredients under {MIN_KEEP_PCT}%. "
                    "If the product is vegan/plant-based/imitation, use typical substitutes. "
                    f"Product text: {product_text}\n"
                    'Respond ONLY as JSON like {"items":[{"ingredient":"...","percent":12.3}, ...]}'
                )
                raw2 = _call_gpt(messages=[
                    {"role": "system", "content": "You infer typical ingredient compositions for institutional foods."},
                    {"role": "user", "content": fb_user}
                ])
                try:
                    obj2 = json.loads(raw2)
                except Exception:
                    m = re.search(r"\{[\s\S]*\}", raw2 or "")
                    obj2 = json.loads(m.group(0)) if m else {}
                cand2 = obj2.get("items", [])
                new_items = []
                if isinstance(cand2, list):
                    for it in cand2:
                        ing = str(it.get("ingredient","")).strip()
                        if not ing:
                            continue
                        pct = it.get("percent", None)
                        try:
                            pct = float(pct) if pct is not None else None
                        except Exception:
                            pct = None
                        # snap protected phrases
                        if is_protected(ing):
                            for prot in PROTECTED_INGREDIENTS:
                                if prot in ing.lower():
                                    ing = prot
                                    break
                        new_items.append({"ingredient": ing, "percent": pct})
                if new_items and not _is_all_unknown(new_items):
                    items = new_items
                    origin = "gpt_fallback"
            except Exception:
                # keep prior items/origin if fallback fails
                pass


        # === added fallback to avoid "unknown" ===
        def _is_all_unknown(lst):
            return all(_norm(it["ingredient"]) == "unknown" for it in lst)

        if (not items) or _is_all_unknown(items):
            try:
                fallback_prompt = f"""
You are a food product analyst estimating typical compositions for institutional foods.

The following product name was too vague for prior analysis:
"{product_text}"

Task:
Infer the most typical MAJOR ingredients and their approximate percentages by mass.
Avoid "unknown" or "other". If the product name is generic (e.g., 'Chicken Patty', 'Fish Stick'),
use realistic, generic ingredients (e.g., 'chicken', 'breading', 'oil').
Limit to 5 ingredients.

Respond ONLY as JSON:
{{"items":[{{"ingredient":"...","percent":...}},...]}}
"""
                raw2 = _call_gpt(
                    messages=[
                        {"role": "system", "content": "You estimate typical ingredient compositions for common institutional food products."},
                        {"role": "user", "content": fallback_prompt}
                    ]
                )
                obj2 = json.loads(raw2)
                cand2 = obj2.get("items", [])
                if isinstance(cand2, list) and cand2:
                    items = []
                    for it in cand2:
                        ing = str(it.get("ingredient", "")).strip()
                        pct = it.get("percent", None)
                        try:
                            pct = float(pct) if pct is not None else None
                        except Exception:
                            pct = None
                        if ing:
                            items.append({"ingredient": ing, "percent": pct})
                    origin = "gpt_fallback"
            except Exception:
                pass
        # === end added fallback ===
    
        # Vegan post-filter & naming sanitizer
        if vegan_flag and items:
            try:
                items = enhanced_vegan_filter_and_convert(items, product_text)
            except Exception as e:
                print(f"[warn] vegan filter failed for: {product_text[:80]} ({e})")
            if not items and looks_like_vegan_meat(product_text):
                return VEGAN_MEAT_DEFAULT.copy(), "rule_fallback_vegan_meat"
    
        # Vegan meat fallback if GPT weak/unknown for vegan-meatlike item
        if looks_like_vegan_meat(product_text) and vegan_meat_fallback_needed(items):
            return VEGAN_MEAT_DEFAULT.copy(), "rule_fallback_vegan_meat"
    
        # Normalize, annotate fish, and drop minors
        items = normalize_percents(items, product_text)
        items = annotate_fish_items(items, product_text)
        items = drop_minors(items)

        if not items and vegan_flag and "cheese" in product_text.lower():
            return [
                {"ingredient": "water", "percent": 45.0},
                {"ingredient": "vegetable oil", "percent": 23.0},
                {"ingredient": "potato starch", "percent": 22.0},
                {"ingredient": "legumes", "percent": 10.0},
            ], "rule_fallback_vegan_cheese"

        if not items:
            return [{"ingredient": "unknown", "percent": 100.0}], origin
        return items, origin
    
    # ----------------------------
    # Weight extraction helper
    # ----------------------------
    def _extract_row_weights(src_row: pd.Series) -> Tuple[Optional[float], Optional[float]]:
        """Return (product_weight_lbs, weight_kg) if present on the source row."""
        p_lbs = None
        wkg = None
        for cand in ("ingredient_weight_lbs","product_weight_lbs","weight_lbs"):
            if cand in src_row.index and pd.notna(src_row[cand]):
                try:
                    p_lbs = float(src_row[cand])
                    break
                except Exception:
                    pass
        if ("weight_kg" in src_row.index) and pd.notna(src_row["weight_kg"]):
            try:
                wkg = float(src_row["weight_kg"])
            except Exception:
                pass
        return p_lbs, wkg
    
    # ----------------------------
    # Stage 1 runner (with log_time)
    # ----------------------------
    def _find_product_col(df: pd.DataFrame) -> str:
        """Pick the column that holds product names.

        Returns the first column whose normalized header is one of the
        recognised product-name labels; when none matches, the first column
        of ``df`` is returned instead.
        """
        recognised = ("product","product name","item","description","name")
        not_found = object()
        found = next((col for col in df.columns if _norm(col) in recognised), not_found)
        return df.columns[0] if found is not_found else found
    
    def run_stage_1(input_filename: str, file_type: str, base: str) -> pd.DataFrame:
        """Run Stage 1: extract ingredients per product and write the ingredients CSV.

        Loads the input through ``load_and_standardize`` (falling back to a plain
        ``pd.read_csv``), drops type-marker rows and blank product rows, calls
        ``ai_extract_ingredients`` for each remaining product on a thread pool,
        and expands every product into one output row per ingredient.

        Args:
            input_filename: Path of the input file to load.
            file_type: Selector forwarded to ``load_and_standardize``.
            base: Base name used to build ``ingredients_{base}.csv`` under ``output_dir``.

        Returns:
            The per-ingredient DataFrame, sorted by ``row_sheet`` and without
            the internal ``__row`` column.

        Environment variables:
            FI_MAX_WORKERS (default 8): thread-pool size.
            FI_MAX_INGS (default 8): ``max_ings`` passed to ``ai_extract_ingredients``.
            FI_ROW_OFFSET (default 2): added to the row index to form ``row_sheet``.
        """
        # NOTE(review): this function uses `cf` and `Any`, which are not imported
        # in the visible part of the file — confirm an earlier cell provides them.
        with log_time("S1.0 load + standardize"):
            try:
                std = load_and_standardize(input_filename, file_type)
                df = std
                prod_col = "product"
                series = df[prod_col].astype(str)
            except Exception:
                # Best-effort fallback: any failure above (including a missing
                # "product" column) leads to reading the file as a plain CSV.
                df = pd.read_csv(input_filename)
                prod_col = _find_product_col(df)
                series = df[prod_col].astype(str)
   
        # Filter out unwanted type markers before processing
        unwanted = {"type01", "type02", "type03"}
        
        # Create masks for first two columns (typically product and description)
        # NOTE(review): the else-branches build masks on a default RangeIndex; they
        # only align with `df` if its index is also 0..n-1 — confirm.
        if len(df.columns) > 0:
            mask1 = ~(df[df.columns[0]].notna() & 
                     df[df.columns[0]].astype(str).str.strip().str.lower().isin(unwanted))
        else:
            mask1 = pd.Series([True] * len(df))
        
        if len(df.columns) > 1:
            mask2 = ~(df[df.columns[1]].notna() & 
                     df[df.columns[1]].astype(str).str.strip().str.lower().isin(unwanted))
        else:
            mask2 = pd.Series([True] * len(df))
        
        # Apply both masks
        # NOTE(review): reset_index renumbers the surviving rows, so `row_sheet`
        # (index + ROW_OFFSET) shifts when a marker row is dropped — confirm intended.
        df = df[mask1 & mask2].reset_index(drop=True)
        series = df[prod_col].astype(str)

        # === added: skip NaN/blank product rows BEFORE submitting to GPT ===
        # The index is NOT reset here, so the labels in `series` still address rows of `df`.
        keep_mask = df[prod_col].notna() & df[prod_col].astype(str).str.strip().ne("")
        df = df.loc[keep_mask]
        series = df[prod_col].astype(str)
        # === end added ===
        # --- Skip empty, blank, or obvious non-values before submitting to GPT ---
        # After astype(str) missing values read as "nan"/"None", hence the string check.
        mask_valid = (
            series.notna()
            & series.str.strip().ne("")
            & ~series.str.lower().isin(["nan", "none"])
        )
        series = series[mask_valid]
        
        # Settings
        max_workers = int(os.getenv("FI_MAX_WORKERS", "8"))
        max_ings = int(os.getenv("FI_MAX_INGS", "8"))
        ROW_OFFSET = int(os.getenv("FI_ROW_OFFSET", "2"))  # header row=1, first product=2
    
        def _one(idx_val: int, text: str) -> Tuple[List[Dict[str, Any]], str]:
            """Extract ingredients for one product; never raises (origin "error" on failure)."""
            try:
                return ai_extract_ingredients(text, max_ings=max_ings)
            except Exception:
                return [{"ingredient": "unknown", "percent": 100.0}], "error"
    
        rows: List[Dict[str, Any]] = []
    
        with log_time(f"S1.1 submit {len(series)} items to pool (max_workers={max_workers})"):
            with cf.ThreadPoolExecutor(max_workers=max_workers) as ex:
                # Map each future back to its (index label, product text).
                futures = {
                    ex.submit(_one, i, t): (i, t)
                    for i, t in series.items()
                    if (pd.notna(t) and str(t).strip())
                }
    
                with log_time("S1.2 collect results from pool"):
                    # Results arrive in completion order; the final frame is re-sorted by row_sheet.
                    for fut in cf.as_completed(futures):
                        i, t = futures[fut]
                        items, origin = fut.result()
    
                        src_row = df.loc[i] if i in df.index else pd.Series(dtype=object)
                        p_lbs, wkg = _extract_row_weights(src_row)
                        bone_in = detect_bone_in(str(t))
                        qty_val = None
                        if "qty" in src_row.index and pd.notna(src_row["qty"]):
                            qty_val = src_row["qty"]
    
                        # Prefer standardized product name if available
                        prod_name = (str(src_row["product"])
                                     if ("product" in src_row.index and pd.notna(src_row["product"]))
                                     else str(t))
    
                        # Fields shared by every ingredient row of this product.
                        base_fields = {
                            "__row": i,
                            "row_sheet": int(i) + ROW_OFFSET,
                            "product": prod_name,
                            "extract_origin": origin,
#                            "bone_in": bool(bone_in),
                            # Stored as the string "True" or an empty string rather than a bool.
                            "bone_in": "True" if bone_in else "",
                        }
                        if qty_val is not None:
                            base_fields["qty"] = qty_val
                        if p_lbs is not None:
                            base_fields["product_weight_lbs"] = p_lbs
                        if wkg is not None:
                            base_fields["weight_kg"] = wkg
    
                        # One output record per extracted ingredient.
                        for it in items:
                            rec = dict(base_fields)
                            rec["ingredient"] = it["ingredient"]
#                            rec["percent"] = it.get("percent", None)
                            val = it.get("percent", None)
                            rec["percent"] = round(val, 2) if isinstance(val, (int, float)) else val
                            rows.append(rec)


        with log_time("S1.3 build DataFrame"):
            df_ing = pd.DataFrame(rows)
    
            # Ensure row_sheet exists (fallback to __row + offset)
            if "row_sheet" not in df_ing.columns:
                if "__row" in df_ing.columns:
                    df_ing["row_sheet"] = df_ing["__row"].astype(int) + ROW_OFFSET
                else:
                    df_ing = df_ing.reset_index(drop=False).rename(columns={"index": "__row"})
                    df_ing["row_sheet"] = df_ing["__row"].astype(int) + ROW_OFFSET
    
            # Calculate ingredient_weight_lbs and format decimal columns
            if 'percent' in df_ing.columns and 'product_weight_lbs' in df_ing.columns:
                df_ing['ingredient_weight_lbs'] = df_ing['percent'] * df_ing['product_weight_lbs'] / 100
                
                # Round to 2 decimal places
                df_ing['percent'] = df_ing['percent'].round(2)
                df_ing['ingredient_weight_lbs'] = df_ing['ingredient_weight_lbs'].round(2)
                df_ing['product_weight_lbs'] = df_ing['product_weight_lbs'].round(2)
    
            # Column order (keep one 'product' column; hide __row in file) - add ingredient_weight_lbs
# optional add qty (remove next line)           preferred = ["row_sheet","product","qty","product_weight_lbs","ingredient","percent","ingredient_weight_lbs","bone_in","extract_origin"]
            preferred = ["row_sheet","product","product_weight_lbs","ingredient","percent","ingredient_weight_lbs","extract_origin","bone_in"]
            ordered = [c for c in preferred if c in df_ing.columns]
            # Append the remaining columns after the preferred ones, leaving out "__row".
            ordered += [c for c in df_ing.columns if c not in set(ordered + ["__row"])]
            df_ing = df_ing[ordered].sort_values("row_sheet", kind="stable")
    
        with log_time("S1.4 write ingredients CSV"):
            out_path = output_dir / Path(f"ingredients_{base}.csv")
            # "__row" was already excluded above; errors="ignore" makes this drop a no-op.
            df_ing.drop(columns=["__row"], errors="ignore").to_csv(out_path, index=False)
            print(f"[Stage 1] Wrote {len(df_ing)} rows to {out_path} (submitted {len(series)} products)")
    
        return df_ing


In [None]:
# === Cell 7 — Stage 2: Categorization ===
# Purpose: Map each ingredient to a standardized food category using a mix of deterministic and AI-assisted logic.
# Inputs:
#   - ingredients_{BASE}.csv (from Stage 1), factors.csv (to derive ALLOWED_CATEGORIES if not provided)
#   - foodcategories.json (category dictionary)
# Dependencies:
#   - AI fallback function (optional)
#   - normalization utilities (_norm)
# Outputs:
#   - categories_{BASE}.csv — categorized ingredient data
#   - updated foodcategories.json (optional append)
# Behavior:
#   - Performs exact and fuzzy matching.
#   - Calls GPT for unmatched ingredients if ALLOW_AI_FALLBACK is True.
#   - Logs all newly learned ingredient–category pairs for review.
# Notes:
#   - Overwrites existing categories_{BASE}.csv if START_MODE='full' or 'after_ingredients'.
#   - Skips if START_MODE='after_categories'.


from __future__ import annotations
from pathlib import Path
from typing import Dict, List, Optional, Tuple
import pandas as pd
import numpy as np
import json, re, os
from difflib import SequenceMatcher

# --- Optional timing (no-op if you didn't add it in Utilities) ---
# Provide a do-nothing ``log_time`` context manager when no earlier cell defined one.
if "log_time" not in globals():
    from contextlib import contextmanager

    @contextmanager
    def log_time(_):
        """No-op stand-in: accepts a label and times nothing."""
        yield

# ----------------------------
# Config & helpers
# ----------------------------
# Substrings that, when found in a normalized product name, switch on the
# "no dairy categories" guard.
# NOTE(review): "vegetarian" is listed here, so vegetarian products are also
# barred from dairy categories — confirm that is intended.
VEGAN_TERMS = (
    "vegan",
    "plant-based",
    "plant based",
    "meatless",
    "vegetarian",
    "dairy-free",
    "dairy free",
    "non-dairy",
    "nondairy",
)

# Category labels treated as dairy by the vegan guard.
DAIRY_CATEGORIES = {
    "dairy milk",
    "skim milk",
    "whole milk",
    "concentrated milk",
    "milk powder",
    "whey powder",
    "butter",
    "ghee",
    "cream",
    "sour cream",
    "ice cream",
    "cheese",
    "dairy cheese",
    "yogurt",
    "low fat yogurt",
    "buttermilk",
    "lactose powder",
}

# Plant names recognised by the "<plant> milk" heuristic.
PLANT_MILK_WORDS = {
    "almond",
    "soy",
    "oat",
    "rice",
    "pea",
    "coconut",
    "cashew",
    "hemp",
    "hazelnut",
    "macadamia",
}

# Minimum similarity ratio for a fuzzy match; kept high to avoid wrong hits.
FUZZY_CUTOFF = 0.92

def _norm(s: str) -> str:
    """Use your norm_text if defined, else lowercase/strip/collapse spaces."""
    try:
        return norm_text(s)
    except Exception:
        t = str(s).strip().lower()
        return re.sub(r"\s+", " ", t)

def _load_allowed_categories(factors_csv: Optional[str] = None) -> Tuple[List[str], Dict[str,str]]:
    """Return the allowed category labels and a normalized-key lookup for them.

    Uses the global ``ALLOWED_CATEGORIES`` when it exists and can be turned
    into a list. Otherwise reads the ``category`` column (or, if absent, the
    first column) of the factors CSV, taken from ``factors_csv``, the global
    ``factors_filename`` or ``"factors.csv"``, in that order.

    Returns:
        ``(labels, lookup)`` where ``lookup`` maps ``_norm(label)`` to the
        label; a later label wins when two normalize to the same key.
    """
    labels = None
    if "ALLOWED_CATEGORIES" in globals():
        try:
            labels = list(ALLOWED_CATEGORIES)
        except Exception:
            labels = None

    if labels is None:
        source = factors_csv or globals().get("factors_filename") or "factors.csv"
        table = pd.read_csv(source)
        if "category" not in table.columns:
            table = table.rename(columns={table.columns[0]: "category"})
        labels = table["category"].dropna().astype(str).tolist()

    lookup = {}
    for label in labels:
        lookup[_norm(label)] = label
    return labels, lookup

def _vegan_hint(product_key: str) -> bool:
    """Tell whether the product key contains any of the ``VEGAN_TERMS`` substrings."""
    for term in VEGAN_TERMS:
        if term in product_key:
            return True
    return False

def _is_dairy(cat_display: str) -> bool:
    """Tell whether a category label normalizes to one of the ``DAIRY_CATEGORIES``."""
    wanted = _norm(cat_display)
    dairy_keys = set()
    for name in DAIRY_CATEGORIES:
        dairy_keys.add(_norm(name))
    return wanted in dairy_keys

def _milk_heuristic(ingredient_key: str, allowed_key2disp: Dict[str,str]) -> Optional[str]:
    """If ingredient looks like '<plant> milk' and plant is known, map to that milk category if allowed."""
    m = re.search(r"\b([a-z][a-z ]+)\s+milk\b", ingredient_key)
    if not m:
        return None
    plant = m.group(1).strip().split()[-1]
    if plant in PLANT_MILK_WORDS:
        key = _norm(f"{plant} milk")
        return allowed_key2disp.get(key)
    return None

def _fuzzy_best(label_norm: str, allowed_norm: List[str]) -> Tuple[Optional[str], float]:
    best = 0.0
    hit = None
    for cand in allowed_norm:
        r = SequenceMatcher(None, label_norm, cand).ratio()
        if r > best:
            best = r; hit = cand
    return hit, best

# ---- Minimal JSON dict cache (ingredient -> category), append-only ----
def _load_foodcats_map(path: Path) -> Dict[str, str]:
    """Load simple dict {ingredient -> category}. If missing/invalid, return {}. Keys normalized."""
    if not path.exists():
        return {}
    try:
        data = json.loads(path.read_text(encoding="utf-8"))
        if not isinstance(data, dict):
            return {}
        out = {}
        for k, v in data.items():
            nk = _norm(k)
            if nk and str(v).strip():
                out.setdefault(nk, str(v).strip())  # keep first; do not overwrite
        return out
    except Exception:
        return {}

def _append_foodcats_map(path: Path, new_rows: pd.DataFrame) -> int:
    """Append-only: add new {ingredient_key -> category}. Never overwrites existing keys.
       Skips empty categories and 'unknown'. Returns the number of keys appended."""
    if new_rows is None or new_rows.empty:
        return 0
    cur = _load_foodcats_map(path)
    before = len(cur)
    for _, r in new_rows.iterrows():
        ik = _norm(r.get("ingredient_key", ""))
        cat = str(r.get("category", "")).strip()
        if not ik or not cat or cat.lower() == "unknown":
            continue
        if ik not in cur:
            cur[ik] = cat
    added = len(cur) - before
    if added > 0:
        path.write_text(json.dumps(cur, ensure_ascii=False, indent=2), encoding="utf-8")
    return added


# ---- Per-run CSV cache for (ingredient_key, product_key) ----
def _load_run_cache(path: Path) -> pd.DataFrame:
    if path.exists():
        df = pd.read_csv(path)
        need = ["ingredient_key","product_key","category","match_method"]
        for c in need:
            if c not in df.columns:
                df[c] = ""
        return df[need].drop_duplicates()
    return pd.DataFrame(columns=["ingredient_key","product_key","category","match_method"])

def _update_run_cache(path: Path, rows: pd.DataFrame) -> None:
    if rows is None or rows.empty:
        return
    cur = _load_run_cache(path)
    merged = pd.concat([cur, rows[["ingredient_key","product_key","category","match_method"]]], ignore_index=True)
    merged = merged.drop_duplicates(subset=["ingredient_key","product_key"], keep="last")
    merged.to_csv(path, index=False)

# ---- Strict GPT categorizer (must return exactly one allowed label) ----
def _call_gpt_categorize(ingredient: str, product: str, allowed: List[str], vegan_guard: bool) -> Optional[str]:
    """Requires `_call_gpt` (defined in Utilities/Cell 5). Accepts only an allowed label."""
    if "_call_gpt" not in globals():
        return None
    used = [c for c in allowed if not (_is_dairy(c) and vegan_guard)] or allowed
    if not used:
        return None
    allowed_map = {c.lower(): c for c in used}

    system_msg = (
        "You classify one ingredient into exactly one category from a provided list. "
        "Reply with ONLY the chosen category string—no extra words."
    )
    guard_msg = (
        "Do NOT choose any dairy categories (milk, cheese, butter, cream, yogurt, whey, casein); "
        "the product indicates vegan/plant-based/non-dairy."
        if vegan_guard else ""
    )
    user_msg = (
        "Choose exactly ONE category from the list that best matches the ingredient.\n"
        "Rules:\n"
        "  • Reply with ONLY the chosen category string (character-for-character).\n"
        "  • If none is perfect, choose the closest reasonable option from the list.\n"
        f"  • {guard_msg}\n\n"
        f"Ingredient: {ingredient}\n"
        f"Product context: {product}\n\n"
        "Allowed categories:\n" + "\n".join(f"- {c}" for c in used)
    )
    try:
        out = _call_gpt(
            messages=[{"role":"system","content":system_msg},
                      {"role":"user","content":user_msg}],
            temperature=0
        )
    except Exception:
        return None

    if not out:
        return None
    reply = out.strip().strip('"').strip("'")
    return allowed_map.get(reply.lower())  # None if not exact allowed label

# ----------------------------
# Main: Stage 2
# ----------------------------
def run_stage_2_base(base: str, factors_csv: Optional[str] = None):
    """Run Stage 2: assign a category to every ingredient row and write the categories CSV.

    Reads ``ingredients_{base}.csv`` from ``output_dir`` and fills ``category``
    and ``match_method`` through successive layers, each touching only rows
    that are still uncategorized:

      1. exact lookup in the optional global ``CATEGORY_MAP``
      2. optional global ``resolve_exact_category``
      3. ``foodcategories.json`` (ingredient-only dictionary)
      4. plant-milk heuristic
      5. fuzzy match against the allowed categories (``FUZZY_CUTOFF``, vegan guard)
      6. per-run CSV cache keyed by (ingredient, product)
      7. GPT, constrained to the allowed categories

    Anything still unresolved is labelled "unknown".

    Args:
        base: Base name used for the input, output and per-run cache file names.
        factors_csv: Optional factors CSV path forwarded to ``_load_allowed_categories``.

    Returns:
        The DataFrame written to ``categories_{base}.csv``.

    Side effects:
        Writes ``categories_{base}.csv``, may append to ``foodcategories.json``,
        and creates then deletes ``category_cache_{base}.csv``.
    """
    with log_time("S2.0 load ingredients"):
        ing_path = output_dir / Path(f"ingredients_{base}.csv")
        df = pd.read_csv(ing_path)

        # Ensure 'product' and 'row_sheet'
        if "product" not in df.columns and "product_text" in df.columns:
            df = df.rename(columns={"product_text":"product"})
        if "product" not in df.columns:
            # last resort fallback to first column
            df["product"] = df.iloc[:,0].astype(str)
        df["product"] = df["product"].astype(str)

        if "row_sheet" not in df.columns:
            df = df.reset_index(drop=False).rename(columns={"index":"__row"})
            row_offset = int(os.getenv("FI_ROW_OFFSET","2"))
            df["row_sheet"] = df["__row"].astype(int) + row_offset

        # Normalize once
        df["ingredient"] = df["ingredient"].astype(str)
        df["ingredient_key"] = df["ingredient"].map(_norm)
        df["product_key"]    = df["product"].map(_norm)

        print(f"[S2] rows={len(df):,}  unique (ingredient,product)={df[['ingredient_key','product_key']].drop_duplicates().shape[0]:,}")

    with log_time("S2.1 allowed & maps"):
        allowed, allowed_key2disp = _load_allowed_categories(factors_csv)
        allowed_norm = list(allowed_key2disp.keys())
        # Optional curated exact map
        global_map = {}
        if "CATEGORY_MAP" in globals():
            try:
                global_map = {_norm(k): v for k, v in dict(CATEGORY_MAP).items()}
            except Exception:
                pass

    with log_time("S2.2 load caches"):
        # Both cache paths are relative, so they resolve against the current
        # working directory rather than output_dir.
        json_cache_path = Path("foodcategories.json")
        json_map = _load_foodcats_map(json_cache_path)  # {ingredient_key -> category}
        csv_cache_path  = Path(f"category_cache_{base}.csv")
        run_cache = _load_run_cache(csv_cache_path)
        appended_json_total = 0

    # Work copy
    df_cat = df.copy()
    df_cat["category"] = pd.NA
    df_cat["match_method"] = pd.NA

    # Layer 1: exact CATEGORY_MAP (curated)
    with log_time("S2.3 exact CATEGORY_MAP"):
        if global_map:
            hit = df_cat["ingredient_key"].map(global_map)
            m = hit.notna()
            df_cat.loc[m, "category"] = hit[m]
            df_cat.loc[m, "match_method"] = "map_exact"

    # Layer 2: resolve_exact_category (if provided), on unique unknowns
    with log_time("S2.3b resolve_exact_category"):
        if "resolve_exact_category" in globals():
            mask = df_cat["category"].isna()
            uniq = df_cat.loc[mask, "ingredient_key"].drop_duplicates()
            if not uniq.empty:
                # Resolve each distinct key once, then broadcast back onto the rows.
                res = {k: resolve_exact_category(k) for k in uniq}
                hit = df_cat["ingredient_key"].map(res)
                m = mask & hit.notna()
                df_cat.loc[m, "category"] = hit[m]
                df_cat.loc[m, "match_method"] = "map_resolve"

    # Layer 3: foodcategories.json dict (ingredient-only)
    with log_time("S2.4 json cache (ingredient only)"):
        if json_map:
            mask = df_cat["category"].isna()
            if mask.any():
                hit = df_cat.loc[mask, "ingredient_key"].map(json_map)
                m = hit.notna()
                idx = hit.index[m]
                df_cat.loc[idx, "category"] = hit.loc[idx]
                df_cat.loc[idx, "match_method"] = "cache_json"

    # Layer 4: plant milk heuristic
    with log_time("S2.5 milk heuristic"):
        mask = df_cat["category"].isna()
        if mask.any():
            cand = df_cat.loc[mask, "ingredient_key"].map(lambda k: _milk_heuristic(k, allowed_key2disp))
            m = cand.notna()
            idx = cand.index[m]
            df_cat.loc[idx, "category"] = cand.loc[idx]
            df_cat.loc[idx, "match_method"] = "milk_heuristic"

    # Layer 5: fuzzy to allowed (high cutoff), with vegan guard (dedup pairs)
    with log_time("S2.6 fuzzy (unique pairs)"):
        mask = df_cat["category"].isna()
        fuzzy_df = pd.DataFrame(columns=["ingredient_key","product_key","category","match_method"])
        if mask.any():
            pairs = df_cat.loc[mask, ["ingredient_key","product_key"]].drop_duplicates()
            # Evaluate the vegan hint once per distinct product.
            vegan_map = {pk: _vegan_hint(pk) for pk in pairs["product_key"].unique()}
            rows = []
            for _, r in pairs.iterrows():
                ik, pk = r["ingredient_key"], r["product_key"]
                best_norm, score = _fuzzy_best(ik, allowed_norm)
                if best_norm and score >= FUZZY_CUTOFF:
                    cat = allowed_key2disp[best_norm]
                    # Vegan guard: never accept a dairy category for a vegan-hinted product.
                    if vegan_map.get(pk, False) and _is_dairy(cat):
                        continue
                    rows.append({"ingredient_key": ik, "product_key": pk, "category": cat, "match_method": "fuzzy"})
            if rows:
                fuzzy_df = pd.DataFrame(rows)
                # Left-merge the hits back; only rows that are still empty get filled.
                df_cat = df_cat.merge(fuzzy_df, on=["ingredient_key","product_key"], how="left", suffixes=("","_fzy"))
                fill = df_cat["category"].isna() & df_cat["category_fzy"].notna()
                df_cat.loc[fill, "category"] = df_cat.loc[fill, "category_fzy"]
                df_cat.loc[fill, "match_method"] = "fuzzy"
                df_cat.drop(columns=[c for c in ("category_fzy","match_method_fzy") if c in df_cat.columns], inplace=True)
                # Persist fuzzy hits to the simple dict (ingredient-only)
                add_fuzzy = (fuzzy_df[["ingredient_key","category"]]
                             .drop_duplicates("ingredient_key"))
                appended_json_total += _append_foodcats_map(json_cache_path, add_fuzzy)


    # Layer 6: per-run CSV cache
    with log_time("S2.7 run CSV cache"):
        if not run_cache.empty:
            # NOTE(review): the boolean masks below align `cache_hit` with `df_cat`
            # by index label; this assumes df_cat has a default 0..n-1 index and the
            # cache has no duplicate (ingredient_key, product_key) pairs — confirm.
            cache_hit = df_cat[["ingredient_key","product_key"]].merge(run_cache, on=["ingredient_key","product_key"], how="left")
            has = df_cat["category"].isna() & cache_hit["category"].notna()
            df_cat.loc[has, "category"] = cache_hit.loc[has, "category"]
            df_cat.loc[has, "match_method"] = cache_hit.loc[has, "match_method"].fillna("gpt_cached")

    # Layer 7: GPT fallback (dedup pairs; strict to allowed)
    with log_time("S2.8 GPT (unique unknown pairs; constrained)"):
        need = df_cat["category"].isna() | df_cat["category"].eq("unknown")
        if need.any():
            if "_call_gpt" not in globals():
                print("[S2.8] WARNING: _call_gpt not defined; skipping GPT fallback.")
                gpt_df = pd.DataFrame(columns=["ingredient_key","product_key","category","match_method"])
            else:
                pairs = df_cat.loc[need, ["ingredient_key","product_key"]].drop_duplicates()
                allowed_list = list(allowed_key2disp.values())
    
                # One GPT call per distinct (ingredient, product) pair; pairs with
                # no accepted answer are simply left out.
                rows = []
                for _, r in pairs.iterrows():
                    ik, pk = r["ingredient_key"], r["product_key"]
                    cat = _call_gpt_categorize(ik, pk, allowed_list, vegan_guard=_vegan_hint(pk))
                    if cat:
                        rows.append({
                            "ingredient_key": ik,
                            "product_key": pk,
                            "category": cat,
                            "match_method": "gpt"
                        })

                # Collect the accepted GPT answers into a frame.
                gpt_df = pd.DataFrame(rows, columns=["ingredient_key","product_key","category","match_method"])
                
                # --- After GPT: persist to dict cache (append-only), skipping 'unknown' ---
                if not gpt_df.empty:
                    _update_run_cache(csv_cache_path, gpt_df)  # also record in the per-run CSV cache
                    add_gpt = (gpt_df[gpt_df["category"].astype(str).str.lower() != "unknown"]
                               [["ingredient_key","category"]]
                               .drop_duplicates("ingredient_key"))
                    # Count how many keys were appended; the NameError branch only
                    # matters if the counter were not defined.
                    try:
                        appended_json_total += _append_foodcats_map(json_cache_path, add_gpt)
                    except NameError:
                        _append_foodcats_map(json_cache_path, add_gpt)
                # -------------------------------------------------------------------------
                
                # Merge the GPT answers back onto rows that are still empty or "unknown".
                if not gpt_df.empty:
                    df_cat = df_cat.merge(
                        gpt_df, on=["ingredient_key","product_key"], how="left", suffixes=("","_gpt")
                    )
                    fill = (df_cat["category"].isna() | df_cat["category"].eq("unknown")) & df_cat["category_gpt"].notna()
                    df_cat.loc[fill, "category"] = df_cat.loc[fill, "category_gpt"]
                    df_cat.loc[fill, "match_method"] = "gpt"
                    df_cat.drop(columns=[c for c in ("category_gpt","match_method_gpt") if c in df_cat.columns], inplace=True)

    # Finalize unknowns
    with log_time("S2.9 finalize unknowns"):
        still = df_cat["category"].isna()
        df_cat.loc[still, "category"] = "unknown"
        df_cat.loc[still, "match_method"] = df_cat.loc[still, "match_method"].fillna("unknown")

    # ----------------- Outputs (preserve row_sheet; sorted) -----------------
    with log_time("S2.A write outputs"):

        # Categories file: keep key columns up front; keep the rest as-is
        front = [c for c in ["row_sheet","product","qty","ingredient","percent","category","match_method"] if c in df_cat.columns]
        # The helper key columns are internal and are not written out.
        rest  = [c for c in df_cat.columns if c not in front and c not in ("ingredient_key","product_key")]
        df_out = df_cat[front + rest].copy()
        if "row_sheet" in df_out.columns:
            df_out["row_sheet"] = pd.to_numeric(df_out["row_sheet"], errors="coerce")
            df_out = df_out.sort_values(["row_sheet","product","ingredient"], kind="stable")
        out_path = output_dir / Path(f"categories_{base}.csv")
        df_out.to_csv(out_path, index=False)

    with log_time("S2.B summary"):
        print(f"[Stage 2] Wrote: {out_path.name} ({len(df_out):,} rows)")
        if 'appended_json_total' in locals():
            print(f"[S2.cache] foodcategories.json appended {appended_json_total} new key(s) this run.")

    with log_time("S2.Z cleanup"):
        # The per-run cache is removed at the end of a completed run; failure to
        # delete it is reported but does not fail the stage.
        try:
            if csv_cache_path.exists():
                csv_cache_path.unlink()
                print(f"[S2.cache] Deleted per-run cache {csv_cache_path.name}")
        except Exception as e:
            print(f"[S2.cache] WARN: could not delete {csv_cache_path.name}: {e}")

    return df_out

# --- merged categories column order for run_stage_2 (v100f) ---
from pathlib import Path
import pandas as pd
def run_stage_2(*args, **kwargs):
    """Run Stage 2 and move ``category`` to sit right after ``ingredient``.

    All arguments are forwarded to ``run_stage_2_base``. When both columns are
    present the frame is reordered, and when a base name can be determined
    (keyword ``base`` or ``base_filename``, else the first positional argument)
    the reordered frame is written to ``categories_{base}.csv`` in
    ``output_dir``. Failures while reordering or writing are printed and do
    not propagate.

    Returns:
        The (possibly reordered) categories DataFrame.
    """
    df = run_stage_2_base(*args, **kwargs)
    try:
        columns = list(df.columns)
        if "ingredient" in columns and "category" in columns:
            reordered = []
            for name in columns:
                if name == "category":
                    # Re-inserted right behind "ingredient" instead.
                    continue
                reordered.append(name)
                if name == "ingredient":
                    reordered.append("category")
            df = df[reordered]

        base = kwargs.get("base") or kwargs.get("base_filename")
        if base is None and args:
            base = args[0]
        if base:
            target = output_dir / Path(f"categories_{base}.csv")
            df.to_csv(target, index=False)
    except Exception as e:
        print(f"[v100f] Could not reorder/persist categories output: {e}")
    return df


In [None]:
# === Cell 8 — Stage 3: Impact Calculation ===
# Purpose: Quantify environmental and animal-welfare impacts for each categorized ingredient.
# Inputs:
#   - categories_{BASE}.csv (Stage 2 output)
#   - factors.csv (impact conversion factors)
#   - optional CATEGORY_WEIGHT_MULTIPLIERS (from config)
# Outputs:
#   - impacts_metric_{BASE}.csv — impacts per kg, m², L, etc.
#   - impacts_lbs_{BASE}.csv — impacts per lb, ft², gal, etc.
# Behavior:
#   - Joins category data with factors table.
#   - Applies unit conversions and category multipliers.
#   - Aggregates impacts by product and category.
# Notes:
#   - Requires that categories_{BASE}.csv exists (raises FileNotFoundError otherwise).
#   - Respects USE_CATEGORY_WEIGHT_MULTIPLIERS flag.

from pathlib import Path
import pandas as pd
import numpy as np
import re  # (added) for species pattern matching in bone-in shim

# ---- Category weight multipliers (toggle + map) ----
# Multiplies weights by a category-specific factor in addition to any existing logic.
# set in Cell 1 not here: USE_CATEGORY_WEIGHT_MULTIPLIERS = True or set False to disable

# Keys are matched using the same normalization your cell uses (via _norm at call-time).

# Pick up the toggle and the multiplier map if an earlier cell defined them;
# otherwise multipliers are disabled and the map is empty.
USE_CATEGORY_WEIGHT_MULTIPLIERS = bool(
    globals().get("USE_CATEGORY_WEIGHT_MULTIPLIERS", False)
)
CATEGORY_WEIGHT_MULTIPLIERS = globals().get(
    "CATEGORY_WEIGHT_MULTIPLIERS",
    {},
)

def _mult_for_category(cat: str) -> float:
    if not USE_CATEGORY_WEIGHT_MULTIPLIERS:
        return 1.0
    try:
        key = _norm(cat)  # uses your notebook normalizer if present
    except Exception:
        key = " ".join(str(cat).strip().lower().split())
    return CATEGORY_WEIGHT_MULTIPLIERS.get(key, 1.0)


def calculate_impacts(df_cat: pd.DataFrame, factors_csv: str, base: str):
    """
    Aggregate ingredient weights per category and convert them into impact totals.

    Inputs:
      df_cat: categories_{BASE}.csv already loaded into a DataFrame. Needs a
              'category' column and a weight source (ingredient_weight_lbs, or
              weight_lbs, or product_weight_lbs together with percent/pct);
              these names are matched case-insensitively. An optional
              'bone_in' column switches on the edible-yield reduction for
              meat/fish categories.
      factors_csv: path to factors.csv
      base: BASE string for file naming

    Outputs:
      Writes impacts_metric_{BASE}.csv and impacts_lbs_{BASE}.csv into output_dir
      Returns (df_lbs_out, df_metric_out) exactly as written to disk (formatted strings)

    Raises:
      KeyError: when df_cat has no 'category' column or no usable weight column.
    """
    LB_TO_KG = 0.45359237
    KG_TO_LB = 1.0 / LB_TO_KG
    LITERS_TO_GAL = 0.264172
    SQM_TO_SQFT = 10.7639
    SQFT_PER_ACRE = 43560.0
    GRAMS_PER_LB = 453.59237

    # ---- Helpers ----
    def _norm(x: str) -> str:
        try:
            return norm_text(x)  # use your notebook's normalizer if present
        except Exception:
            return " ".join(str(x).strip().lower().split())

    def _species_rx(*terms):
        # The alternatives are grouped so BOTH word boundaries apply to every
        # term; an optional plural suffix keeps "sardines"/"cows" matching.
        return re.compile(r"\b(?:" + "|".join(terms) + r")(?:e?s)?\b", re.I)

    def _round_numeric_only(df: pd.DataFrame, decimals: int = 6) -> pd.DataFrame:
        """Return a copy of df with numeric columns rounded; non-numeric columns unchanged."""
        df_rounded = df.copy()
        numeric_cols = df_rounded.select_dtypes(include=["number"]).columns
        df_rounded[numeric_cols] = df_rounded[numeric_cols].round(decimals)
        return df_rounded

    def _apply_fmt_strict(df: pd.DataFrame, spec: dict) -> pd.DataFrame:
        """Render the columns named in spec as fixed-decimal strings ('' for missing)."""
        formatted = df.copy()
        for col, fmt in spec.items():
            if col in formatted.columns:
                vals = pd.to_numeric(formatted[col], errors="coerce")
                formatted[col] = vals.map(lambda x: "" if pd.isna(x) else fmt.format(x)).astype("object")
        return formatted

    # ---- Load/normalize factors ----
    fac = pd.read_csv(factors_csv)
    if "category" not in fac.columns:
        # No explicit header: treat the first column as the category name.
        fac = fac.rename(columns={fac.columns[0]: "category"})
    # Coerce factor columns to numeric, create missing as zeros
    need_cols = ["kcal","co2","carbon_opp","d_lives","t_lives","water","water_us","land_metric","land_acres","eutro"]
    for c in need_cols:
        if c not in fac.columns:
            fac[c] = 0.0
        fac[c] = pd.to_numeric(fac[c], errors="coerce").fillna(0.0)

    # Build normalized keys for merge
    fac = fac.copy()
    fac["__category_key"] = fac["category"].astype(str).map(_norm)
    # Two factor rows with the same normalized key would duplicate the category
    # in the merge below and double-count it in the totals; keep the first.
    dup_rows = fac["__category_key"].duplicated(keep="first")
    if dup_rows.any():
        dup_names = ", ".join(sorted(set(fac.loc[dup_rows, "__category_key"].astype(str))))
        print(f"[impacts] warning: factors file has repeated categories; keeping the first row for: {dup_names}")
        fac = fac.loc[~dup_rows].copy()

    # ---- Normalize df_cat columns and weights ----
    cats = df_cat.copy()
    cols = {str(c).lower(): c for c in cats.columns}

    if "category" not in cols:
        raise KeyError("categories file is missing a 'category' column.")
    cat_col = cols["category"]
    cats["__category_key"] = cats[cat_col].astype(str).map(_norm)

    # weight in lbs priority: ingredient_weight_lbs > weight_lbs > product_weight_lbs * percent
    def _colnum(name):
        return pd.to_numeric(cats[name], errors="coerce") if name in cats.columns else None

    wlbs = None
    if "ingredient_weight_lbs" in cols:
        wlbs = _colnum(cols["ingredient_weight_lbs"])
    if (wlbs is None or wlbs.isna().all()) and "weight_lbs" in cols:
        wlbs = _colnum(cols["weight_lbs"])
    if wlbs is None or wlbs.isna().all():
        pct_col = cols.get("percent", cols.get("pct"))
        if "product_weight_lbs" in cols and pct_col is not None:
            p_wlbs = _colnum(cols["product_weight_lbs"])
            pct = pd.to_numeric(cats[pct_col], errors="coerce")
            wlbs = p_wlbs * (pct / 100.0)

    if wlbs is None:
        raise KeyError("No usable weight_lbs found (need ingredient_weight_lbs OR weight_lbs OR product_weight_lbs+percent).")

    # ======= BEGIN bone-in edible-weight shim =======
    # Ensure the flag exists (matched case-insensitively, like the other columns)
    bone_col = cols.get("bone_in")
    if bone_col is None:
        bone_col = "bone_in"
        cats[bone_col] = False

    # The flag may arrive as bool, number or text depending on how the CSV was
    # edited, so compare on a lowercase string form against the accepted tokens.
    TRUE_TOKENS = {"true", "t", "yes", "y", "1", "1.0"}
    bone = cats[bone_col].astype(str).str.strip().str.lower().isin(TRUE_TOKENS)

    # Species yields: (species, bone_in) -> edible fraction of purchased weight
    EDIBLE_YIELD_DEFAULTS = {
        ("chicken", False): 1,   ("chicken", True): 0.7708,
        ("turkey",  False): 1,   ("turkey",  True): 0.7708,
        ("beef",    False): 1,   ("beef",    True): 0.8667,
        ("pork",    False): 1,   ("pork",    True): 0.8784,
        ("lamb",    False): 1,   ("lamb",    True): 0.6643,
        ("fish",    False): 1,   ("fish",    True): 0.57,  # whole/specimen fish often ~0.5–0.65 edible
    }
    # Order matters: the first pattern that matches a category wins.
    _SPECIES_PATTERNS = [
        ("chicken", _species_rx("chicken")),
        ("turkey",  _species_rx("turkey")),
        ("beef",    _species_rx("beef", "bovine", "cow")),
        ("pork",    _species_rx("pork", "swine")),
        ("lamb",    _species_rx("lamb", "mutton")),
        ("fish",    _species_rx(
            "fish", "salmon", "tuna", "cod", "haddock", "pollock", "anchovy", "anchovies",
            "sardine", "halibut", "mahi", "catfish", "tilapia", "trout", "bass", "redfish",
            "snapper", "hake", "sole", "flounder", "mackerel", "herring")),
    ]

    # Assign first-matching species according to the order in _SPECIES_PATTERNS
    cat_txt = cats[cat_col].astype(str).str.lower()
    species = pd.Series(pd.NA, index=cats.index, dtype="object")
    for sp, rx in _SPECIES_PATTERNS:
        hit = cat_txt.str.contains(rx, na=False).astype(bool)
        species.loc[species.isna() & hit] = sp

    # Build yield maps from the single source of truth (no hardcoded numbers here)
    boneless_map = {sp: y for (sp, b), y in EDIBLE_YIELD_DEFAULTS.items() if not b}
    bone_map     = {sp: y for (sp, b), y in EDIBLE_YIELD_DEFAULTS.items() if b}

    yield_boneless = species.map(boneless_map).fillna(1.0)
    yield_bone     = species.map(bone_map)

    # Choose bone-in yield when bone=True, otherwise boneless; default to 1.0
    edible_yield = yield_boneless.where(~bone, yield_bone).fillna(yield_boneless)

    edible_lbs = pd.to_numeric(wlbs, errors="coerce").fillna(0.0) * edible_yield.fillna(1.0)
    edible_kg  = edible_lbs * LB_TO_KG
    # ======= END bone-in shim =======

    # Store per-row edible weights under the internal names used below
    cats["__weight_lbs"] = edible_lbs
    cats["__weight_kg"]  = edible_kg

    # ---- Apply category multipliers to produce/extend effective weights ----
    # Effective-weight columns already present in the input are used as the
    # starting point; otherwise the edible weights computed above are.
    base_lbs = cats["__weight_lbs_eff"] if "__weight_lbs_eff" in cats.columns else cats["__weight_lbs"]
    base_kg  = cats["__weight_kg_eff"]  if "__weight_kg_eff"  in cats.columns else cats["__weight_kg"]

    # Per-row category multiplier, read from the same column used for grouping
    cat_mult = cats[cat_col].astype(str).map(_mult_for_category)

    cats["__weight_lbs_eff"] = pd.to_numeric(base_lbs, errors="coerce").fillna(0.0) * cat_mult
    cats["__weight_kg_eff"]  = pd.to_numeric(base_kg,  errors="coerce").fillna(0.0) * cat_mult

    # ---- DEBUG: are edible weights changing? ----
    try:
        base_sum = float(pd.to_numeric(wlbs, errors="coerce").fillna(0).sum())
    except Exception:
        base_sum = float("nan")
    edible_sum = float(pd.to_numeric(cats["__weight_lbs"], errors="coerce").fillna(0).sum())
    print(f"[debug] base lbs sum={base_sum:,.2f}   edible lbs sum={edible_sum:,.2f}   delta={edible_sum - base_sum:,.2f}")

    # Check a few chicken rows; the count reports the flag exactly as it was applied above
    chix_mask = cats[cat_col].astype(str).str.contains("chicken", case=False, regex=True, na=False)
    chix = cats[chix_mask]
    print(f"[debug] chicken rows={len(chix)}   bone_in True count={int(bone[chix_mask].sum())}")
    print(chix[[cat_col, bone_col, "__weight_lbs"]].head(8).to_string(index=False))

    # ---- Group by category ----
    grp = (
        cats.groupby("__category_key", as_index=False)
            .agg(total_weight_lbs=("__weight_lbs_eff", "sum"),
                 total_weight_kg=("__weight_kg_eff",  "sum"))
    )

    # ---- Merge display names (majority label per key) ----
    disp = (
        cats.assign(__category_disp=cats[cat_col].astype(str))
            .groupby("__category_key")["__category_disp"]
            .agg(lambda s: s.value_counts(dropna=False).idxmax())
            .reset_index()
    )
    out = grp.merge(disp, on="__category_key", how="left")
    out.rename(columns={"__category_disp":"category"}, inplace=True)

    # ---- Merge factors ----
    # Categories without a factor row keep NaN impacts (written as blanks); say so.
    unmatched = ~out["__category_key"].isin(fac["__category_key"])
    if unmatched.any():
        missing_names = ", ".join(sorted(out.loc[unmatched, "category"].astype(str)))
        print(f"[impacts] warning: no factors found for: {missing_names} (impact columns left blank)")
    out = out.merge(fac, on="__category_key", how="left", suffixes=("","_f"))

    # ---- Compute impacts (unit aware; see your factor definitions) ----
    # Metric-side (per kg factors)
    out["kcal"]          = out["kcal"] * out["total_weight_kg"]
    out["co2_kg"]        = out["co2"] * out["total_weight_kg"]
    out["carbon_opp_kg"] = out["carbon_opp"] * out["total_weight_kg"]
    out["water_liters"]  = out["water"] * out["total_weight_kg"]
    out["land_sqm"]      = out["land_metric"] * out["total_weight_kg"]
    out["eutro_g"]       = out["eutro"] * out["total_weight_kg"]

    # Imperial-side composites
    out["kcal_lbs"]       = out["kcal"]  # same scalar value, separate column for selection
    out["co2_lbs"]        = out["co2_kg"] * KG_TO_LB
    out["carbon_opp_lbs"] = out["carbon_opp_kg"] * KG_TO_LB
    # water: prefer water_us (gal/lb) on rows where it is present (>0), else liters→gal.
    # The choice is made per row so a category lacking a US factor is converted
    # from its metric value instead of being reported as zero.
    has_us_water = out["water_us"] > 0
    out["water_gal"] = np.where(has_us_water,
                                out["water_us"] * out["total_weight_lbs"],
                                out["water_liters"] * LITERS_TO_GAL)
    # land: prefer land_acres (acres/lb) on rows where it is present (>0), else sqm→sqft
    has_acres = out["land_acres"] > 0
    out["land_sqft"] = np.where(has_acres,
                                out["land_acres"] * out["total_weight_lbs"] * SQFT_PER_ACRE,
                                out["land_sqm"] * SQM_TO_SQFT)
    out["eutro_lb"] = out["eutro_g"] / GRAMS_PER_LB

    # Lives (per lb factors)
    out["d_lives_imp"] = out["d_lives"] * out["total_weight_lbs"]
    out["t_lives_imp"] = out["t_lives"] * out["total_weight_lbs"]

    # ---- Build outputs (pre-format numeric frames) ----
    metric_cols = ["category","total_weight_kg","kcal","co2_kg","carbon_opp_kg","d_lives","t_lives","water_liters","land_sqm","eutro_g"]
    lbs_cols    = ["category","total_weight_lbs","kcal","co2_lbs","carbon_opp_lbs","d_lives","t_lives","water_gal","land_sqft","eutro_lb"]

    metric = out[[
        "category","total_weight_kg","kcal","co2_kg","carbon_opp_kg","d_lives_imp","t_lives_imp","water_liters","land_sqm","eutro_g"
    ]].rename(columns={"d_lives_imp":"d_lives","t_lives_imp":"t_lives"}).copy()

    lbs = out[[
        "category","total_weight_lbs","kcal_lbs","co2_lbs","carbon_opp_lbs","d_lives_imp","t_lives_imp","water_gal","land_sqft","eutro_lb"
    ]].rename(columns={"kcal_lbs":"kcal","d_lives_imp":"d_lives","t_lives_imp":"t_lives"}).copy()

    # ---- Add totals (Dairy & All) BEFORE formatting ----
    # Dairy set (normalized)
    DAIRY_CATEGORIES = {
        "butter","dairy cheese","cream","dairy milk","buttermilk","ice cream","low fat yogurt",
        "milk powder","yogurt","concentrated milk","ghee","lactose powder","skim milk","whey powder"
    }
    # metric and lbs share out's index, so one mask serves both frames.
    is_dairy = out["__category_key"].isin(DAIRY_CATEGORIES)

    def _append_totals(frame: pd.DataFrame, value_cols: list) -> pd.DataFrame:
        """Return frame followed by its 'TOTAL — Dairy' and 'TOTAL — All' rows."""
        total_rows = [
            {"category": label, **subset[value_cols].sum(skipna=True).to_dict()}
            for label, subset in (("TOTAL — Dairy", frame[is_dairy]), ("TOTAL — All", frame))
        ]
        return pd.concat([frame, pd.DataFrame(total_rows)], ignore_index=True)

    metric = _append_totals(metric, metric_cols[1:])
    lbs    = _append_totals(lbs,    lbs_cols[1:])

    metric = _round_numeric_only(metric, 6)
    lbs    = _round_numeric_only(lbs,    6)

    # ---- Strict formatting (write numbers as strings with exact decimals) ----
    metric_fmt = {
        "total_weight_kg": "{:.0f}",
        "kcal":            "{:.0f}",
        "co2_kg":          "{:.0f}",
        "carbon_opp_kg":   "{:.0f}",
        "d_lives":         "{:.1f}",
        "t_lives":         "{:.1f}",
        "water_liters":    "{:.0f}",
        "land_sqm":        "{:.0f}",
        "eutro_g":         "{:.0f}",
    }
    lbs_fmt = {
        "total_weight_lbs": "{:.0f}",
        "kcal":             "{:.0f}",
        "co2_lbs":          "{:.0f}",
        "carbon_opp_lbs":   "{:.0f}",
        "d_lives":          "{:.1f}",
        "t_lives":          "{:.1f}",
        "water_gal":        "{:.0f}",
        "land_sqft":        "{:.0f}",
        "eutro_lb":         "{:.0f}",
    }

    metric_out = _apply_fmt_strict(metric[metric_cols], metric_fmt)
    lbs_out    = _apply_fmt_strict(lbs[lbs_cols],       lbs_fmt)

    # Make sure the destination exists so the writes cannot fail on a missing folder.
    Path(output_dir).mkdir(parents=True, exist_ok=True)
    metric_out.to_csv(output_dir / f"impacts_metric_{base}.csv", index=False, encoding="utf-8")
    lbs_out.to_csv(output_dir / f"impacts_lbs_{base}.csv", index=False, encoding="utf-8")
    print(f"[impacts] wrote impacts_metric_{base}.csv and impacts_lbs_{base}.csv")

    return lbs_out, metric_out


# --- If your runner calls this function, keep it as-is ---
def run_stage_3(base_filename: str, factors_filename: str):
    """Stage 3 entry point: read categories_{base_filename}.csv from output_dir and compute impacts.

    The loaded frame, the factors path and the base name are handed to
    calculate_impacts, whose return value is passed straight back.
    """
    categories_csv = output_dir / Path(f"categories_{base_filename}.csv")
    return calculate_impacts(pd.read_csv(categories_csv), factors_filename, base_filename)


In [None]:
# === Cell 9 — Runner / Workflow Controller ===
# Purpose: Orchestrate execution of all pipeline stages based on START_MODE.
# Inputs:
#   - input_filename and configuration from Cell 1
#   - intermediate files (ingredients_, categories_)
# Dependencies:
#   - run_stage_1, run_stage_2, run_stage_3 functions
# Outputs:
#   - Console/log summary of executed stages
# Behavior:
#   - START_MODE='full' → runs all stages sequentially
#   - START_MODE='after_ingredients' → skips Stage 1
#   - START_MODE='after_categories' → skips Stage 1 and 2
#   - Uses file_type to route to correct Stage 1 function
# Error Handling:
#   - Raises clear messages if required input files or functions are missing.

from pathlib import Path
import sys
import pandas as pd
import inspect  # for safe routing to type03 function

def _die(msg: str):
    """Report *msg* with an [ERROR] prefix, then stop the cell by raising SystemExit."""
    error_line = f"[ERROR] {msg}"
    print(error_line)
    raise SystemExit("Fix errors above and re-run this cell.")

# --- Make sure BASE and base_filename are both available
# input_filename must come from the config cell; base_filename is derived
# from it only when the config cell did not already define one.
if "input_filename" not in globals():
    _die("`input_filename` is not defined in the config cell.")
if "base_filename" not in globals():
    base_filename = Path(input_filename).stem
BASE = Path(input_filename).stem

# --- Resolve file_type up front ('auto' is settled here, before Stage 1 routing)
if "file_type" not in globals():
    _die("'file_type' is not defined. Run the config/setup cells first.")
_ft_lower = str(file_type).lower()

if _ft_lower == "auto":
    # Auto-detection depends on the two helpers defined in Cell 3.
    if not ({"safe_read_csv", "autodetect_type"} <= globals().keys()):
        _die("Cannot resolve file_type='auto' because safe_read_csv/autodetect_type are undefined. Run Cell 3.")
    _raw_for_detect = safe_read_csv(input_filename)
    file_type = autodetect_type(_raw_for_detect)
    print(f"[runner] file_type resolved from 'auto' → '{file_type}'")

# --- Provide robust fallbacks for run_stage_1 / run_stage_2 / run_stage_3
if 'run_stage_1' not in globals():
    def run_stage_1(input_filename: str, file_type: str, base_filename: str):
        """Fallback Stage 1: standardize the input file, then extract its ingredients.

        Only defined when no earlier cell supplied run_stage_1. Returns the
        DataFrame produced by extract_ingredients.

        NOTE(review): the comment previously placed here described a v100g
        policy (skip Stage 1 when ingredients_{BASE}.csv already exists in the
        same kernel session, and preserve a 'qty' column on disk). Nothing in
        this body implements that; confirm whether extract_ingredients does.
        """
        prerequisites = (
            ("load_and_standardize",
             "`load_and_standardize` is not defined. Ensure the standardization cell ran."),
            ("extract_ingredients",
             "`extract_ingredients` is not defined. Ensure the ingredient extraction cell ran."),
        )
        # Both helpers are verified before either one is called.
        for helper_name, complaint in prerequisites:
            if helper_name not in globals():
                _die(complaint)
        standardized = load_and_standardize(input_filename, file_type)
        return extract_ingredients(standardized, base_filename)

if 'run_stage_2' not in globals():
    def run_stage_2(base: str):
        """Fallback Stage 2: load ingredients_{base}.csv from output_dir and categorize it.

        Only defined when no earlier cell supplied run_stage_2. Stops via _die
        when the ingredients file or categorize_ingredients is missing;
        otherwise returns the result of categorize_ingredients.
        """
        ingredients_csv = output_dir / Path(f"ingredients_{base}.csv")
        if not ingredients_csv.exists():
            _die(f"`{ingredients_csv.name}` not found. Run Stage 1 first or check `base_filename`/`BASE`.")
        ingredients = pd.read_csv(ingredients_csv)
        if 'categorize_ingredients' not in globals():
            _die("`categorize_ingredients` is not defined. Ensure the categorization cell ran.")
        return categorize_ingredients(ingredients, base)

if 'run_stage_3' not in globals():
    def run_stage_3(base: str, factors_csv: str):
        """Fallback Stage 3: load categories_{base}.csv from output_dir and compute impacts.

        Only defined when no earlier cell supplied run_stage_3. Stops via _die
        when the categories file or calculate_impacts is missing; otherwise
        returns the result of calculate_impacts.
        """
        categories_csv = output_dir / Path(f"categories_{base}.csv")
        if not categories_csv.exists():
            _die(f"`{categories_csv.name}` not found. Run Stage 2 first or check `base_filename`/`BASE`.")
        categories = pd.read_csv(categories_csv)
        if 'calculate_impacts' not in globals():
            _die("`calculate_impacts` is not defined. Ensure the impacts cell ran.")
        return calculate_impacts(categories, factors_csv, base)

# --- Strict Stage 1 router (type03 requires Cell 5's GPT path)
def run_stage_1_router(input_filename: str, file_type: str, base_filename: str):
    """Send Stage 1 to the type03 GPT path or to the standard path.

    Any file_type other than 'type03' (compared case-insensitively; None counts
    as empty) goes to run_stage_1. For 'type03', run_stage_1_type03_gpt is
    looked up in the notebook globals and called with three arguments when its
    signature accepts at least three parameters, otherwise with
    (input_filename, base_filename).
    """
    if (file_type or "").lower() != "type03":
        print("[runner] Stage 1: using standard path.")
        return run_stage_1(input_filename, file_type, base_filename)

    if 'run_stage_1_type03_gpt' not in globals():
        _die("type03 file but Cell 5 function not available. Run Cell 5 first.")
    gpt_stage = globals()['run_stage_1_type03_gpt']
    try:
        arity = len(inspect.signature(gpt_stage).parameters)
    except Exception:
        # Signature could not be inspected: fall back to the two-argument call.
        arity = 0
    print("[runner] Stage 1: using type03 GPT path (Cell 5).")
    if arity >= 3:
        return gpt_stage(input_filename, file_type, base_filename)
    return gpt_stage(input_filename, base_filename)

# --- Banner: echo the effective configuration before anything runs
print("FoodImpacts — runner")
print(f"  input_filename: {Path(input_filename).resolve()}")
print(f"  file_type: {file_type}")
print(f"  base_filename: {base_filename}")
print(f"  CATEGORY_SOURCE: {globals().get('CATEGORY_SOURCE', '(unspecified)')}")
print(f"  START_MODE: {START_MODE}")

# --- Execute requested start mode
# The modes are cumulative: 'full' runs stages 1-3, 'after_ingredients' runs
# stages 2-3 and 'after_categories' runs stage 3 only. Anything else stops here.
if START_MODE not in ("full", "after_ingredients", "after_categories"):
    _die("START_MODE must be 'full', 'after_ingredients', or 'after_categories'")

if START_MODE == "full":
    print("[runner] Stage 1: force recompute ingredients (will overwrite any existing ingredients file).")
    df_ing = run_stage_1_router(input_filename, file_type, base_filename)

if START_MODE != "after_categories":
    df_cat = run_stage_2(base_filename)

df_imp_lbs, df_imp_metric = run_stage_3(base_filename, factors_filename)

print("\n[runner] Done.")