In [2]:




from __future__ import annotations

from dataclasses import dataclass
from math import erf, pi, sqrt
from typing import Dict, List, Tuple


SUPPLIER_SPECS = {
    "BBM": {
        "moisture_pct": 4.2,
        "fine_extract_db_pct": 82.0,
        "wort_pH": 5.98,
        "diastatic_power_WK": 342.1,
        "total_protein_pct": 10.12,
        "wort_colour_EBC": 3.8,
    },
    "COFCO": {
        "moisture_pct": 4.4,
        "fine_extract_db_pct": 81.8,
        "wort_pH": 5.93,
        "diastatic_power_WK": 317.4,
        "total_protein_pct": 11.1,
        "wort_colour_EBC": 4.0,
    },
    "Malteurop": {
        "moisture_pct": 4.3,
        "fine_extract_db_pct": 81.2,
        "wort_pH": 5.97,
        "diastatic_power_WK": 336.9,
        "total_protein_pct": 10.5,
        "wort_colour_EBC": 3.8,
    },
}


@dataclass(frozen=True)
class Material:
    """Bulk properties used in both Beverloo flow and geometric layer-height conversion."""
    rho_bulk_kg_m3: float
    grain_diameter_m: float


@dataclass(frozen=True)
class Silo:
    """Simple cylindrical silo geometry in SI units."""
    body_diameter_m: float
    outlet_diameter_m: float

    @property
    def cross_section_area_m2(self) -> float:
        return pi * (self.body_diameter_m / 2.0) ** 2


@dataclass(frozen=True)
class BeverlooParams:
    """Beverloo discharge constants."""
    C: float = 0.58
    k: float = 1.4
    g_m_s2: float = 9.81


@dataclass(frozen=True)
class Lot:
    """One lot layer with known supplier parameters and known loaded mass."""
    name: str
    supplier: str
    mass_kg: float
    params: Dict[str, float]


@dataclass(frozen=True)
class Result:
    discharge_mass_kg: float
    discharge_time_s: float
    mass_flow_rate_kg_s: float
    sigma_m: float
    lot_mass_kg: Dict[str, float]
    lot_pct: Dict[str, float]
    blended_params: Dict[str, float]


def normal_cdf(x: float) -> float:
    return 0.5 * (1.0 + erf(x / sqrt(2.0)))


def beverloo_mass_flow_rate_kg_s(
    silo: Silo, material: Material, bev: BeverlooParams
) -> float:
    """
    Stage 1: Beverloo equation ("clock").
    m_dot = C * rho_bulk * sqrt(g) * (D - k*d)^(5/2)
    """
    effective_diameter = silo.outlet_diameter_m - bev.k * material.grain_diameter_m
    if effective_diameter <= 0.0:
        raise ValueError("Invalid Beverloo geometry: D - k*d must be > 0.")
    return (
        bev.C
        * material.rho_bulk_kg_m3
        * sqrt(bev.g_m_s2)
        * (effective_diameter ** 2.5)
    )


def build_lot_intervals_m(
    lots: List[Lot], material: Material, silo: Silo
) -> Tuple[List[Tuple[Lot, float, float]], float]:
    """
    Stage 2 prep: Convert lot masses to vertical layer intervals [z0, z1] in meters.
    z=0 at outlet (bottom), increasing upward.
    """
    intervals: List[Tuple[Lot, float, float]] = []
    z = 0.0
    for lot in lots:
        height = lot.mass_kg / (material.rho_bulk_kg_m3 * silo.cross_section_area_m2)
        z0, z1 = z, z + height
        intervals.append((lot, z0, z1))
        z = z1
    return intervals, z


def layer_probabilities(
    z_front_m: float,
    sigma_m: float,
    intervals: List[Tuple[Lot, float, float]],
    total_height_m: float,
) -> Dict[str, float]:
    """
    Stage 2 core (Benyamine-style sampler):
    At each time, discharge samples material around z_front using Gaussian kernel.
    Probability for a lot = integral of kernel over that lot interval.
    Kernel is conditioned to [0, H] so probabilities sum to ~1 over in-silo material.
    """
    if sigma_m <= 0.0:
        raise ValueError("sigma_m must be > 0.")

    denom = normal_cdf((total_height_m - z_front_m) / sigma_m) - normal_cdf(
        (0.0 - z_front_m) / sigma_m
    )
    if denom <= 1e-15:
        return {lot.name: 0.0 for lot, _, _ in intervals}

    probs: Dict[str, float] = {}
    for lot, z0, z1 in intervals:
        p = normal_cdf((z1 - z_front_m) / sigma_m) - normal_cdf((z0 - z_front_m) / sigma_m)
        probs[lot.name] = max(0.0, p / denom)

    s = sum(probs.values())
    if s > 0:
        for k in probs:
            probs[k] /= s
    return probs


def estimate_lot_discharge_masses_kg(
    lots: List[Lot],
    discharge_mass_kg: float,
    material: Material,
    silo: Silo,
    m_dot_kg_s: float,
    sigma_m: float,
    steps: int = 2000,
) -> Dict[str, float]:
    """
    Stage 2 integration over time:
    - z_front(t) = m_removed(t) / (rho_bulk * A)
    - At each time midpoint, compute lot probabilities from Gaussian-over-layers.
    - Integrate dm = m_dot*dt across time to get x1, x2, x3.
    """
    total_mass = sum(l.mass_kg for l in lots)
    if discharge_mass_kg <= 0:
        raise ValueError("discharge_mass_kg must be > 0.")
    if discharge_mass_kg > total_mass:
        raise ValueError("discharge_mass_kg cannot exceed total silo mass.")
    if m_dot_kg_s <= 0:
        raise ValueError("m_dot_kg_s must be > 0.")

    intervals, total_height_m = build_lot_intervals_m(lots, material, silo)

    t_end = discharge_mass_kg / m_dot_kg_s
    dt = t_end / steps
    dm = m_dot_kg_s * dt  # equals discharge_mass_kg / steps
    lot_mass = {lot.name: 0.0 for lot in lots}

    for i in range(steps):
        t_mid = (i + 0.5) * dt
        m_removed = min(discharge_mass_kg, m_dot_kg_s * t_mid)
        z_front = m_removed / (material.rho_bulk_kg_m3 * silo.cross_section_area_m2)

        probs = layer_probabilities(z_front, sigma_m, intervals, total_height_m)
        for lot_name, p in probs.items():
            lot_mass[lot_name] += dm * p

    # Small numerical correction so sum(x_i) equals target discharge exactly.
    s = sum(lot_mass.values())
    if s > 0:
        scale = discharge_mass_kg / s
        for k in lot_mass:
            lot_mass[k] *= scale

    return lot_mass


def blend_parameters(
    lots: List[Lot],
    lot_mass_kg: Dict[str, float],
) -> Dict[str, float]:
    """Stage 3: Weighted average of malt quality parameters by estimated discharged masses."""
    if not lots:
        return {}

    param_keys = list(lots[0].params.keys())
    total = sum(lot_mass_kg.values())
    if total <= 0:
        raise ValueError("Total estimated lot mass must be > 0.")

    by_name = {l.name: l for l in lots}
    blended: Dict[str, float] = {}
    for p in param_keys:
        num = 0.0
        for lot_name, mass in lot_mass_kg.items():
            num += mass * by_name[lot_name].params[p]
        blended[p] = num / total
    return blended


def auto_adjust_sigma_for_multi_lot(
    lots: List[Lot],
    discharge_mass_kg: float,
    material: Material,
    silo: Silo,
    m_dot_kg_s: float,
    initial_mixing_zone_width_m: float,
    min_lot_mass_kg: float = 0.5,
    max_iterations: int = 12,
) -> Tuple[float, Dict[str, float]]:
    """
    Helper to guarantee at least two meaningful non-zero lot contributions.
    Deterministic rule: widen mixing zone by fixed factor until criterion is met.
    sigma is derived from mixing zone width via sigma = width / 2.
    """
    width = max(initial_mixing_zone_width_m, 1e-4)
    growth = 1.35

    final_sigma = width / 2.0
    final_masses = {}

    for _ in range(max_iterations):
        sigma = width / 2.0
        masses = estimate_lot_discharge_masses_kg(
            lots=lots,
            discharge_mass_kg=discharge_mass_kg,
            material=material,
            silo=silo,
            m_dot_kg_s=m_dot_kg_s,
            sigma_m=sigma,
        )
        contributors = sum(1 for v in masses.values() if v >= min_lot_mass_kg)

        final_sigma, final_masses = sigma, masses
        if contributors >= 2:
            break
        width *= growth

    return final_sigma, final_masses


def run_model(
    lots: List[Lot],
    discharge_mass_kg: float,
    silo: Silo,
    material: Material,
    bev: BeverlooParams,
    initial_mixing_zone_width_m: float,
) -> Result:
    m_dot = beverloo_mass_flow_rate_kg_s(silo, material, bev)
    discharge_time_s = discharge_mass_kg / m_dot

    sigma_m, lot_mass_kg = auto_adjust_sigma_for_multi_lot(
        lots=lots,
        discharge_mass_kg=discharge_mass_kg,
        material=material,
        silo=silo,
        m_dot_kg_s=m_dot,
        initial_mixing_zone_width_m=initial_mixing_zone_width_m,
    )

    total = sum(lot_mass_kg.values())
    lot_pct = {k: (100.0 * v / total if total > 0 else 0.0) for k, v in lot_mass_kg.items()}
    blended = blend_parameters(lots, lot_mass_kg)

    return Result(
        discharge_mass_kg=discharge_mass_kg,
        discharge_time_s=discharge_time_s,
        mass_flow_rate_kg_s=m_dot,
        sigma_m=sigma_m,
        lot_mass_kg=lot_mass_kg,
        lot_pct=lot_pct,
        blended_params=blended,
    )


def main() -> None:
    # Example setup with three layered lots from the required suppliers.
    # Order is bottom -> top exactly as requested.
    lots = [
        Lot(name="Lot1", supplier="BBM", mass_kg=1500.0, params=SUPPLIER_SPECS["BBM"]),
        Lot(name="Lot2", supplier="COFCO", mass_kg=1000.0, params=SUPPLIER_SPECS["COFCO"]),
        Lot(name="Lot3", supplier="Malteurop", mass_kg=1200.0, params=SUPPLIER_SPECS["Malteurop"]),
    ]

    # SI units only.
    material = Material(
        rho_bulk_kg_m3=610.0,     # typical malt bulk density
        grain_diameter_m=0.004,   # 4 mm characteristic grain diameter
    )
    silo = Silo(
        body_diameter_m=3.0,      # cylindrical body
        outlet_diameter_m=0.20,   # outlet opening
    )
    bev = BeverlooParams(C=0.58, k=1.4, g_m_s2=9.81)

    # Chosen so discharge can include at least two lots after sigma adjustment if needed.
    discharge_mass_kg = 2000.0
    initial_mixing_zone_width_m = 0.10

    result = run_model(
        lots=lots,
        discharge_mass_kg=discharge_mass_kg,
        silo=silo,
        material=material,
        bev=bev,
        initial_mixing_zone_width_m=initial_mixing_zone_width_m,
    )

    print(f"Discharge mass: {result.discharge_mass_kg:.1f} kg")
    print(f"Estimated mass flow rate (Beverloo): {result.mass_flow_rate_kg_s:.3f} kg/s")
    print(f"Estimated discharge time: {result.discharge_time_s:.1f} s")
    print(f"Sampler sigma used: {result.sigma_m:.4f} m")
    print("Estimated composition (by lot):")
    for lot in lots:
        m = result.lot_mass_kg[lot.name]
        p = result.lot_pct[lot.name]
        print(f"  {lot.name} ({lot.supplier}): {m:.2f} kg ({p:.2f}%)")

    print("Blended malt parameters:")
    for k, v in result.blended_params.items():
        print(f"  {k}: {v:.4f}")


if __name__ == "__main__":
    main()


Discharge mass: 2000.0 kg
Estimated mass flow rate (Beverloo): 18.464 kg/s
Estimated discharge time: 108.3 s
Sampler sigma used: 0.0500 m
Estimated composition (by lot):
  Lot1 (BBM): 1499.25 kg (74.96%)
  Lot2 (COFCO): 500.00 kg (25.00%)
  Lot3 (Malteurop): 0.75 kg (0.04%)
Blended malt parameters:
  moisture_pct: 4.2500
  fine_extract_db_pct: 81.9497
  wort_pH: 5.9675
  diastatic_power_WK: 335.9231
  total_protein_pct: 10.3651
  wort_colour_EBC: 3.8500


In [4]:
#Consider three silos and multiple lots

In [6]:
pip install pandas numpy matplotlib

Collecting pandas
  Using cached pandas-3.0.0-cp314-cp314-win_amd64.whl.metadata (19 kB)
Collecting numpy
  Using cached numpy-2.4.2-cp314-cp314-win_amd64.whl.metadata (6.6 kB)
Collecting matplotlib
  Downloading matplotlib-3.10.8-cp314-cp314-win_amd64.whl.metadata (52 kB)
Collecting tzdata (from pandas)
  Using cached tzdata-2025.3-py2.py3-none-any.whl.metadata (1.4 kB)
Collecting contourpy>=1.0.1 (from matplotlib)
  Downloading contourpy-1.3.3-cp314-cp314-win_amd64.whl.metadata (5.5 kB)
Collecting cycler>=0.10 (from matplotlib)
  Downloading cycler-0.12.1-py3-none-any.whl.metadata (3.8 kB)
Collecting fonttools>=4.22.0 (from matplotlib)
  Downloading fonttools-4.61.1-cp314-cp314-win_amd64.whl.metadata (116 kB)
Collecting kiwisolver>=1.3.1 (from matplotlib)
  Downloading kiwisolver-1.4.9-cp314-cp314-win_amd64.whl.metadata (6.4 kB)
Collecting pillow>=8 (from matplotlib)
  Downloading pillow-12.1.1-cp314-cp314-win_amd64.whl.metadata (9.0 kB)
Collecting pyparsing>=3 (from matplotlib)
  Do


[notice] A new release of pip is available: 25.3 -> 26.0.1
[notice] To update, run: python.exe -m pip install --upgrade pip


In [1]:

from __future__ import annotations

from dataclasses import dataclass
from math import erf, pi, sqrt
from typing import Dict, List, Tuple
import warnings

import pandas as pd


@dataclass(frozen=True)
class Material:
    rho_bulk_kg_m3: float
    grain_diameter_m: float


@dataclass(frozen=True)
class BeverlooParams:
    C: float = 0.58
    k: float = 1.4
    g_m_s2: float = 9.81


@dataclass(frozen=True)
class Silo:
    silo_id: str
    capacity_kg: float
    body_diameter_m: float
    outlet_diameter_m: float
    initial_mass_kg: float = 0.0

    @property
    def cross_section_area_m2(self) -> float:
        return pi * (self.body_diameter_m / 2.0) ** 2


def normal_cdf(x: float) -> float:
    return 0.5 * (1.0 + erf(x / sqrt(2.0)))


def _build_silo_map(df_silos: pd.DataFrame) -> Dict[str, Silo]:
    required = {"silo_id", "capacity_kg", "body_diameter_m", "outlet_diameter_m"}
    missing = required - set(df_silos.columns)
    if missing:
        raise ValueError(f"df_silos missing columns: {missing}")

    if "initial_mass_kg" not in df_silos.columns:
        df_silos = df_silos.copy()
        df_silos["initial_mass_kg"] = 0.0

    silos: Dict[str, Silo] = {}
    for _, r in df_silos.iterrows():
        silos[str(r["silo_id"])] = Silo(
            silo_id=str(r["silo_id"]),
            capacity_kg=float(r["capacity_kg"]),
            body_diameter_m=float(r["body_diameter_m"]),
            outlet_diameter_m=float(r["outlet_diameter_m"]),
            initial_mass_kg=float(r["initial_mass_kg"]),
        )
    return silos


def _validate_suppliers(df_layers: pd.DataFrame, df_suppliers: pd.DataFrame) -> None:
    if "supplier" not in df_suppliers.columns:
        raise ValueError("df_suppliers must include 'supplier' column.")
    missing = set(df_layers["supplier"].astype(str).unique()) - set(df_suppliers["supplier"].astype(str).unique())
    if missing:
        raise ValueError(f"Suppliers in df_layers not found in df_suppliers: {missing}")


def build_intervals_from_df_layers(
    silo_id: str,
    df_layers: pd.DataFrame,
    silo: Silo,
    material: Material,
) -> Tuple[pd.DataFrame, float]:
    required = {"silo_id", "layer_index", "lot_id", "supplier", "segment_mass_kg"}
    missing = required - set(df_layers.columns)
    if missing:
        raise ValueError(f"df_layers missing columns: {missing}")

    d = df_layers[df_layers["silo_id"].astype(str) == str(silo_id)].copy()
    if d.empty:
        raise ValueError(f"No layers found for silo_id={silo_id}")

    d["layer_index"] = d["layer_index"].astype(int)
    d = d.sort_values(["layer_index"], kind="mergesort").reset_index(drop=True)

    expected = list(range(1, len(d) + 1))
    actual = d["layer_index"].tolist()
    if actual != expected:
        raise ValueError(
            f"layer_index for silo {silo_id} must be contiguous bottom->top 1..N. Got {actual}"
        )

    total_mass = float(d["segment_mass_kg"].sum())
    if total_mass > silo.capacity_kg + 1e-9:
        warnings.warn(
            f"Silo {silo_id}: segment mass sum ({total_mass:.2f}) exceeds capacity ({silo.capacity_kg:.2f})."
        )

    area = silo.cross_section_area_m2
    z_cursor = 0.0
    z0_list: List[float] = []
    z1_list: List[float] = []

    for m in d["segment_mass_kg"].astype(float):
        if m < 0:
            raise ValueError(f"Silo {silo_id}: negative segment_mass_kg found.")
        h = m / (material.rho_bulk_kg_m3 * area)
        z0_list.append(z_cursor)
        z_cursor += h
        z1_list.append(z_cursor)

    d["z0_m"] = z0_list
    d["z1_m"] = z1_list
    return d, z_cursor


def layer_probabilities(
    z_front_m: float,
    sigma_m: float,
    intervals_df: pd.DataFrame,
    total_height_m: float,
) -> pd.Series:
    if sigma_m <= 0:
        raise ValueError("sigma_m must be > 0.")

    denom = normal_cdf((total_height_m - z_front_m) / sigma_m) - normal_cdf((0.0 - z_front_m) / sigma_m)
    if denom <= 1e-15:
        p = pd.Series(0.0, index=intervals_df.index)
        return p

    p_raw = (
        intervals_df.apply(lambda r: normal_cdf((r["z1_m"] - z_front_m) / sigma_m), axis=1)
        - intervals_df.apply(lambda r: normal_cdf((r["z0_m"] - z_front_m) / sigma_m), axis=1)
    ) / denom
    p_raw = p_raw.clip(lower=0.0)
    s = float(p_raw.sum())
    return (p_raw / s) if s > 0 else pd.Series(0.0, index=intervals_df.index)


def beverloo_mass_flow_rate_kg_s(silo: Silo, material: Material, bev: BeverlooParams) -> float:
    deff = silo.outlet_diameter_m - bev.k * material.grain_diameter_m
    if deff <= 0:
        raise ValueError(
            f"Silo {silo.silo_id}: invalid Beverloo term D-k*d <= 0 "
            f"({silo.outlet_diameter_m:.6f} - {bev.k:.6f}*{material.grain_diameter_m:.6f})."
        )
    return bev.C * material.rho_bulk_kg_m3 * sqrt(bev.g_m_s2) * (deff ** 2.5)


def _resolve_discharge_mass_kg(silo_id: str, df_discharge: pd.DataFrame, total_mass_kg: float) -> float:
    row = df_discharge[df_discharge["silo_id"].astype(str) == str(silo_id)]
    if row.empty:
        raise ValueError(f"No discharge row found for silo_id={silo_id}")
    row = row.iloc[0]

    has_mass = "discharge_mass_kg" in row.index and pd.notna(row["discharge_mass_kg"])
    has_frac = "discharge_fraction" in row.index and pd.notna(row["discharge_fraction"])

    if has_mass:
        m = float(row["discharge_mass_kg"])
    elif has_frac:
        m = float(row["discharge_fraction"]) * total_mass_kg
    else:
        raise ValueError(f"Silo {silo_id}: provide discharge_mass_kg or discharge_fraction.")

    if m < 0:
        raise ValueError(f"Silo {silo_id}: discharge mass cannot be negative.")
    if m > total_mass_kg + 1e-9:
        raise ValueError(
            f"Silo {silo_id}: discharge_mass_kg ({m:.2f}) exceeds total mass in silo ({total_mass_kg:.2f})."
        )
    return m


def estimate_discharge_contrib_for_silo(
    silo: Silo,
    df_layers: pd.DataFrame,
    df_discharge: pd.DataFrame,
    material: Material,
    bev: BeverlooParams,
    sigma_m: float,
    steps: int = 2000,
    auto_adjust: bool = False,
    min_nonzero_mass_kg: float = 1e-3,
) -> Dict[str, object]:
    intervals_df, total_height_m = build_intervals_from_df_layers(silo.silo_id, df_layers, silo, material)
    total_mass_kg = float(intervals_df["segment_mass_kg"].sum())
    discharge_mass_kg = _resolve_discharge_mass_kg(silo.silo_id, df_discharge, total_mass_kg)

    m_dot = beverloo_mass_flow_rate_kg_s(silo, material, bev)
    discharge_time_s = discharge_mass_kg / m_dot if m_dot > 0 else 0.0

    def _simulate(use_sigma_m: float) -> pd.DataFrame:
        seg = intervals_df.copy()
        seg["discharged_mass_kg"] = 0.0

        if discharge_mass_kg == 0:
            return seg

        dt = discharge_time_s / steps
        dm = m_dot * dt  # equals discharge_mass_kg / steps
        area = silo.cross_section_area_m2

        for i in range(steps):
            t_mid = (i + 0.5) * dt
            m_removed = min(discharge_mass_kg, m_dot * t_mid)
            z_front = m_removed / (material.rho_bulk_kg_m3 * area)

            p = layer_probabilities(z_front, use_sigma_m, seg, total_height_m)
            seg["discharged_mass_kg"] += dm * p.values

        s = float(seg["discharged_mass_kg"].sum())
        if s > 0:
            seg["discharged_mass_kg"] *= discharge_mass_kg / s
        return seg

    used_sigma_m = sigma_m
    seg_contrib = _simulate(used_sigma_m)

    if auto_adjust:
        grow = 1.35
        for _ in range(12):
            lot_nonzero = (seg_contrib.groupby("lot_id")["discharged_mass_kg"].sum() > min_nonzero_mass_kg).sum()
            if lot_nonzero >= 2:
                break
            used_sigma_m *= grow
            seg_contrib = _simulate(used_sigma_m)

    lot_contrib = (
        seg_contrib.groupby(["silo_id", "lot_id", "supplier"], as_index=False)["discharged_mass_kg"]
        .sum()
        .sort_values(["silo_id", "lot_id"])
        .reset_index(drop=True)
    )

    return {
        "silo_id": silo.silo_id,
        "discharged_mass_kg": discharge_mass_kg,
        "mass_flow_rate_kg_s": m_dot,
        "discharge_time_s": discharge_time_s,
        "sigma_m": used_sigma_m,
        "df_segment_contrib": seg_contrib[
            ["silo_id", "layer_index", "lot_id", "supplier", "segment_mass_kg", "discharged_mass_kg"]
        ].copy(),
        "df_lot_contrib": lot_contrib,
    }


def blend_params_from_contrib(df_contrib: pd.DataFrame, df_suppliers: pd.DataFrame) -> Dict[str, float]:
    required = {"supplier", "discharged_mass_kg"}
    missing = required - set(df_contrib.columns)
    if missing:
        raise ValueError(f"df_contrib missing columns: {missing}")

    param_cols = [c for c in df_suppliers.columns if c != "supplier"]
    if not param_cols:
        raise ValueError("df_suppliers must have at least one parameter column besides 'supplier'.")

    merged = df_contrib.merge(df_suppliers, on="supplier", how="left")
    if merged[param_cols].isna().any().any():
        raise ValueError("Some suppliers in contributions do not have complete specs in df_suppliers.")

    total_mass = float(merged["discharged_mass_kg"].sum())
    if total_mass <= 0:
        return {p: float("nan") for p in param_cols}

    out: Dict[str, float] = {}
    w = merged["discharged_mass_kg"].astype(float)
    for p in param_cols:
        out[p] = float((w * merged[p].astype(float)).sum() / total_mass)
    return out


def run_three_silo_blend(
    df_silos: pd.DataFrame,
    df_layers: pd.DataFrame,
    df_suppliers: pd.DataFrame,
    df_discharge: pd.DataFrame,
    material: Material,
    bev: BeverlooParams,
    sigma_m: float,
    steps: int = 2000,
    auto_adjust: bool = False,
) -> Dict[str, object]:
    if "silo_id" not in df_discharge.columns:
        raise ValueError("df_discharge must include 'silo_id' column.")
    _validate_suppliers(df_layers, df_suppliers)

    silos = _build_silo_map(df_silos)
    if len(silos) != 3:
        warnings.warn(f"Expected 3 silos; got {len(silos)}. Running on provided silos.")

    per_silo_results: Dict[str, Dict[str, object]] = {}
    all_segment_contrib = []
    all_lot_contrib = []
    total_discharged = 0.0

    for silo_id, silo in silos.items():
        res = estimate_discharge_contrib_for_silo(
            silo=silo,
            df_layers=df_layers,
            df_discharge=df_discharge,
            material=material,
            bev=bev,
            sigma_m=sigma_m,
            steps=steps,
            auto_adjust=auto_adjust,
        )
        per_silo_results[silo_id] = res
        per_silo_results[silo_id]["blended_params_per_silo"] = blend_params_from_contrib(
            res["df_lot_contrib"], df_suppliers
        )

        all_segment_contrib.append(res["df_segment_contrib"])
        all_lot_contrib.append(res["df_lot_contrib"])
        total_discharged += float(res["discharged_mass_kg"])

    df_segment_contrib_all = pd.concat(all_segment_contrib, ignore_index=True)
    df_lot_contrib_all = pd.concat(all_lot_contrib, ignore_index=True)
    total_blended_params = blend_params_from_contrib(df_lot_contrib_all, df_suppliers)

    return {
        "per_silo": per_silo_results,
        "df_segment_contrib_all": df_segment_contrib_all,
        "df_lot_contrib_all": df_lot_contrib_all,
        "total_discharged_mass_kg": total_discharged,
        "total_blended_params": total_blended_params,
    }


def main() -> None:
    # Example supplier specs.
    df_suppliers = pd.DataFrame(
        [
            {
                "supplier": "BBM",
                "moisture_pct": 4.2,
                "fine_extract_db_pct": 82.0,
                "wort_pH": 5.98,
                "diastatic_power_WK": 342.1,
                "total_protein_pct": 10.12,
                "wort_colour_EBC": 3.8,
            },
            {
                "supplier": "COFCO",
                "moisture_pct": 4.4,
                "fine_extract_db_pct": 81.8,
                "wort_pH": 5.93,
                "diastatic_power_WK": 317.4,
                "total_protein_pct": 11.1,
                "wort_colour_EBC": 4.0,
            },
            {
                "supplier": "Malteurop",
                "moisture_pct": 4.3,
                "fine_extract_db_pct": 81.2,
                "wort_pH": 5.97,
                "diastatic_power_WK": 336.9,
                "total_protein_pct": 10.5,
                "wort_colour_EBC": 3.8,
            },
        ]
    )

    # Incoming lots (one lot split across two silos in df_layers below).
    df_lots = pd.DataFrame(
        [
            {"lot_id": "L1001", "supplier": "BBM", "mass_kg": 2600.0},
            {"lot_id": "L1002", "supplier": "COFCO", "mass_kg": 2200.0},
            {"lot_id": "L1003", "supplier": "Malteurop", "mass_kg": 2400.0},
        ]
    )
    _ = df_lots  # kept for context; model uses df_layers as realized segment structure.

    df_silos = pd.DataFrame(
        [
            {"silo_id": "S1", "capacity_kg": 4000.0, "body_diameter_m": 3.0, "outlet_diameter_m": 0.20},
            {"silo_id": "S2", "capacity_kg": 4000.0, "body_diameter_m": 3.2, "outlet_diameter_m": 0.20},
            {"silo_id": "S3", "capacity_kg": 4000.0, "body_diameter_m": 3.1, "outlet_diameter_m": 0.21},
        ]
    )

    # Layered segments after filling (bottom->top), includes partial lot split across silos.
    df_layers = pd.DataFrame(
        [
            # S1
            {"silo_id": "S1", "layer_index": 1, "lot_id": "L1001", "supplier": "BBM", "segment_mass_kg": 1200.0},
            {"silo_id": "S1", "layer_index": 2, "lot_id": "L1002", "supplier": "COFCO", "segment_mass_kg": 900.0},
            {"silo_id": "S1", "layer_index": 3, "lot_id": "L1003", "supplier": "Malteurop", "segment_mass_kg": 700.0},
            # S2 (L1001 split here too)
            {"silo_id": "S2", "layer_index": 1, "lot_id": "L1001", "supplier": "BBM", "segment_mass_kg": 1400.0},
            {"silo_id": "S2", "layer_index": 2, "lot_id": "L1003", "supplier": "Malteurop", "segment_mass_kg": 1000.0},
            {"silo_id": "S2", "layer_index": 3, "lot_id": "L1002", "supplier": "COFCO", "segment_mass_kg": 600.0},
            # S3
            {"silo_id": "S3", "layer_index": 1, "lot_id": "L1002", "supplier": "COFCO", "segment_mass_kg": 700.0},
            {"silo_id": "S3", "layer_index": 2, "lot_id": "L1003", "supplier": "Malteurop", "segment_mass_kg": 700.0},
        ]
    )

    # Per-silo discharge: can use mass or fraction.
    df_discharge = pd.DataFrame(
        [
            {"silo_id": "S1", "discharge_mass_kg": 1600.0},
            {"silo_id": "S2", "discharge_fraction": 0.50},
            {"silo_id": "S3", "discharge_mass_kg": 800.0},
        ]
    )

    material = Material(rho_bulk_kg_m3=610.0, grain_diameter_m=0.004)
    bev = BeverlooParams(C=0.58, k=1.4, g_m_s2=9.81)

    result = run_three_silo_blend(
        df_silos=df_silos,
        df_layers=df_layers,
        df_suppliers=df_suppliers,
        df_discharge=df_discharge,
        material=material,
        bev=bev,
        sigma_m=0.12,
        steps=2000,
        auto_adjust=True,
    )

    for silo_id, r in result["per_silo"].items():
        print(f"\n=== Silo {silo_id} ===")
        print(f"discharged_mass_kg: {r['discharged_mass_kg']:.3f}")
        print(f"mass_flow_rate_kg_s: {r['mass_flow_rate_kg_s']:.3f}")
        print(f"discharge_time_s: {r['discharge_time_s']:.3f}")
        print(f"sigma_m used: {r['sigma_m']:.4f}")
        print("Lot contributions (kg):")
        print(r["df_lot_contrib"][["lot_id", "supplier", "discharged_mass_kg"]].to_string(index=False))
        print("Blended params per silo:")
        for k, v in r["blended_params_per_silo"].items():
            print(f"  {k}: {v:.4f}")

    print("\n=== TOTAL COLLECTED BLEND ===")
    print(f"total_discharged_mass_kg: {result['total_discharged_mass_kg']:.3f}")
    for k, v in result["total_blended_params"].items():
        print(f"{k}: {v:.4f}")


if __name__ == "__main__":
    main()



=== Silo S1 ===
discharged_mass_kg: 1600.000
mass_flow_rate_kg_s: 18.464
discharge_time_s: 86.654
sigma_m used: 0.1200
Lot contributions (kg):
lot_id  supplier  discharged_mass_kg
 L1001       BBM         1119.990509
 L1002     COFCO          435.248848
 L1003 Malteurop           44.760643
Blended params per silo:
  moisture_pct: 4.2572
  fine_extract_db_pct: 81.9232
  wort_pH: 5.9661
  diastatic_power_WK: 335.2354
  total_protein_pct: 10.3972
  wort_colour_EBC: 3.8544

=== Silo S2 ===
discharged_mass_kg: 1500.000
mass_flow_rate_kg_s: 18.464
discharge_time_s: 81.238
sigma_m used: 0.1200
Lot contributions (kg):
lot_id  supplier  discharged_mass_kg
 L1001       BBM         1196.893143
 L1002     COFCO           15.540446
 L1003 Malteurop          287.566410
Blended params per silo:
  moisture_pct: 4.2212
  fine_extract_db_pct: 81.8446
  wort_pH: 5.9776
  diastatic_power_WK: 340.8472
  total_protein_pct: 10.2030
  wort_colour_EBC: 3.8021

=== Silo S3 ===
discharged_mass_kg: 800.000
mass_