In [1]:
from pydantic import BaseModel, Field
from typing import List, Dict, Any, Optional
import asyncio
import json
import random
from collections import defaultdict
from agentfield import AIConfig, Agent

# ============================================================================
# GLOBAL CONCURRENCY CONTROL
# ============================================================================
# One semaphore shared by every phase that calls the model (scenario analysis,
# factor graph, entity generation, decisions, aggregation) so the total number
# of in-flight AI calls never exceeds this cap.
MAX_CONCURRENT_CALLS: int = 20
concurrency_semaphore = asyncio.Semaphore(value=MAX_CONCURRENT_CALLS)

In [2]:
import os

# Agent node through which every AI call in this notebook is made.
# Credentials are read from the environment instead of being hardcoded: a key
# written into a notebook ends up in version control and in shared copies.
# NOTE(review): the key that was previously hardcoded here should be treated
# as exposed and rotated.
app = Agent(
    node_id="simulation-engine",
    agentfield_server=os.getenv("AGENTFIELD_SERVER", "http://localhost:8080"),
    ai_config=AIConfig(
        model=os.getenv("AI_MODEL", "openrouter/deepseek/deepseek-v3.1-terminus"),
        api_key=os.getenv("OPENROUTER_API_KEY"),
    ),
)

In [3]:
class ScenarioAnalysis(BaseModel):
    """Simple schema for scenario decomposition"""

    # Structured output of decompose_scenario (passed to app.ai as `schema=`).
    # NOTE(review): pydantic presumably exposes the class docstring and Field
    # descriptions in the JSON schema given to the model, so rewording them
    # may change what the model produces.

    # Kind of actor being simulated, e.g. "customer".
    entity_type: str = Field(
        description="Type of entity being simulated (e.g., 'customer', 'voter', 'employee')"
    )
    # Free-form label; the prompt suggests binary_choice / multi_option /
    # continuous_value, but no enum is enforced here.
    decision_type: str = Field(
        description="Type of decision (e.g., 'binary_choice', 'multi_option', 'continuous_value')"
    )
    # Options later offered to each simulated entity.
    decision_options: List[str] = Field(
        description="List of possible decisions/outcomes"
    )
    # Narrative analysis reused verbatim in the factor-graph prompt.
    analysis: str = Field(
        description="Detailed analysis of the scenario including key factors, causal relationships, and what matters"
    )
    # May come back empty; decompose_scenario then fills in keyword-based
    # defaults. pydantic copies mutable defaults per instance, so `default=[]`
    # is not shared between instances.
    key_attributes: List[str] = Field(
        default=[],
        description="Top 5-7 attributes that matter most for this decision (identified from analysis)",
    )


def _default_key_attributes(scenario: str) -> List[str]:
    """Keyword-based fallback for key attributes when the model returns none.

    Args:
        scenario: Free-text scenario description.

    Returns:
        Five attribute names, chosen by simple keyword matching on the
        lower-cased scenario: a pricing set ("price"/"cost"), a retention set
        ("upgrade"/"switch"), or a generic set otherwise.
    """
    text = scenario.lower()
    if "price" in text or "cost" in text:
        return [
            "price_sensitivity",
            "income",
            "budget_constraint",
            "perceived_value",
            "alternatives",
        ]
    if "upgrade" in text or "switch" in text:
        return [
            "loyalty",
            "tenure",
            "satisfaction",
            "alternatives",
            "switching_cost",
        ]
    # Generic defaults
    return [
        "loyalty",
        "satisfaction",
        "alternatives",
        "tenure",
        "value",
    ]


async def decompose_scenario(
    scenario: str, context: Optional[List[str]] = None
) -> ScenarioAnalysis:
    """Analyze the scenario to decide what is being simulated.

    Makes one AI call (throttled by the global semaphore) that returns the
    entity type, decision type, decision options, a narrative analysis and
    the key attributes.

    Args:
        scenario: Free-text description of the situation to simulate.
        context: Optional extra facts, rendered as a bulleted list in the
            prompt. ``None`` (the default) and ``[]`` both mean "no context".

    Returns:
        The parsed ScenarioAnalysis. If the model left ``key_attributes``
        empty, a keyword-based default list is filled in.
    """
    context_str = (
        "\n".join(f"- {c}" for c in context)
        if context
        else "No additional context provided."
    )

    prompt = f"""You are analyzing a simulation scenario to understand what needs to be modeled.

SCENARIO:
{scenario}

CONTEXT:
{context_str}

TASK:
Analyze this scenario deeply and provide:

1. entity_type: What type of entity/person are we simulating? (e.g., "customer", "voter", "employee", "consumer")

2. decision_type: What kind of decision are they making?
   - "binary_choice" (yes/no, stay/leave)
   - "multi_option" (choose from several options)
   - "continuous_value" (a number or amount)

3. decision_options: List all possible decisions/outcomes the entity could make. Be specific and exhaustive.

4. analysis: Write a comprehensive analysis (3-4 paragraphs) covering:
   - What are the key factors that would influence this decision?
   - What causal relationships exist? (e.g., "income affects price sensitivity")
   - What attributes of the entity would matter most?
   - What psychological, economic, or social dynamics are at play?
   - Are there different segments/archetypes of entities we should consider?
   - What hidden variables or second-order effects might exist?

Be thorough in your analysis - this will guide the entire simulation.

5. key_attributes: Identify the top 5-7 attributes that will MOST influence this decision.
   These should be the most predictive factors. Examples: income, price_sensitivity, tenure, loyalty, alternatives.
   Return as a list of attribute names (e.g., ["price_sensitivity", "income", "tenure", "loyalty", "alternatives"])."""

    async with concurrency_semaphore:  # Global concurrency control
        result = await app.ai(prompt, schema=ScenarioAnalysis)

    # The model may omit key_attributes; downstream prompts rely on them.
    if not result.key_attributes:
        result.key_attributes = _default_key_attributes(scenario)

    return result

In [4]:
class FactorGraph(BaseModel):
    """Schema for the causal attribute graph"""

    # Structured output of generate_factor_graph (passed to app.ai as `schema=`).
    # NOTE(review): the docstring and descriptions presumably reach the model via
    # the JSON schema; reword with care.

    # attribute name -> human-readable description. These keys are the
    # attribute names that generated entities are asked to fill in.
    attributes: Dict[str, str] = Field(
        description="Dictionary of attribute_name: description. Each attribute that matters for this entity."
    )
    # Prose description of correlations/causal links; embedded in entity prompts.
    attribute_graph: str = Field(
        description="Detailed description of how attributes relate to each other and to the decision, including correlations, dependencies, and causal chains"
    )
    # Prose sampling guidance; embedded in entity prompts.
    sampling_strategy: str = Field(
        description="Description of how to sample these attributes to get realistic, diverse entities"
    )


async def generate_factor_graph(
    scenario: str, scenario_analysis: ScenarioAnalysis, context: List[str] = []
) -> FactorGraph:
    """Ask the model which attributes matter for the entity and how they relate.

    Args:
        scenario: Free-text scenario description.
        scenario_analysis: Output of the decomposition step. Its entity type,
            decision type, options and analysis text are embedded in the prompt.
        context: Optional extra facts, rendered as a bulleted list (not mutated).

    Returns:
        The FactorGraph parsed from the model's structured response.
    """
    if context:
        context_text = "\n".join(f"- {item}" for item in context)
    else:
        context_text = "No additional context provided."

    prompt = f"""You are designing the factor graph for a simulation.

SCENARIO:
{scenario}

CONTEXT:
{context_text}

PREVIOUS ANALYSIS:
Entity Type: {scenario_analysis.entity_type}
Decision Type: {scenario_analysis.decision_type}
Possible Decisions: {', '.join(scenario_analysis.decision_options)}

Key Insights from Analysis:
{scenario_analysis.analysis}

TASK:
Design the factor graph that defines what attributes each {scenario_analysis.entity_type} should have.

1. attributes: Create a dictionary of all relevant attributes. For each attribute, provide a clear description.
   Include attributes across these categories:
   - Demographic (age, location, income, etc.)
   - Behavioral (usage patterns, preferences, history)
   - Psychographic (values, attitudes, personality traits)
   - Contextual (external factors, constraints, alternatives available)

   Keep attribute names simple and lowercase (e.g., "age", "income_level", "price_sensitivity")
   Make descriptions clear and specific.

2. attribute_graph: Write a detailed explanation (2-3 paragraphs) of:
   - How attributes influence each other (correlations and dependencies)
   - How attributes influence the final decision
   - What are strong vs weak predictors
   - Any interaction effects (e.g., "age matters more for low-income entities")
   - Which attributes cluster together to form natural segments

3. sampling_strategy: Describe how to sample these attributes to create realistic entities:
   - What are typical ranges/distributions for each attribute?
   - Which attributes are correlated and should be sampled together?
   - Are there natural segments/archetypes we should ensure are represented?
   - What makes a "realistic" vs "unrealistic" combination of attributes?

Be specific and detailed - this defines the entire simulation space."""

    # Every model call shares the global semaphore to cap total concurrency.
    async with concurrency_semaphore:
        return await app.ai(prompt, schema=FactorGraph)

In [5]:
class EntityProfile(BaseModel):
    """Schema for a single entity's attributes"""

    # One synthetic entity. In the visible code these are built locally by
    # generate_entity_batch_optimized, not requested from the model directly.

    # Sequential id of the form "E_000123" (start_id + position, zero-padded).
    entity_id: str
    # attribute name -> value as returned by the model. Values are untyped
    # (numbers, strings, booleans...).
    attributes: Dict[str, Any] = Field(
        description="Dictionary of attribute_name: value for this entity"
    )
    # Short human-readable description shown to the model during decisions.
    profile_summary: str = Field(
        description="2-3 sentence human-readable summary of who this entity is"
    )


class EntityBatch(BaseModel):
    """Schema for generating multiple entities at once"""

    # NOTE(review): not referenced in the visible part of the notebook;
    # generate_entity_batch_optimized defines its own per-call MiniBatchSchema
    # with the same shape instead.

    # Each dict maps attribute name -> value for one entity.
    entities: List[Dict[str, Any]] = Field(
        description="List of entity attribute dictionaries"
    )

In [6]:
async def generate_entity_batch_optimized(
    start_id: int,
    batch_size: int,
    scenario_analysis: ScenarioAnalysis,
    factor_graph: FactorGraph,
    exploration_ratio: float = 0.1,
    entities_per_call: int = 5,
) -> List[EntityProfile]:
    """Generate ``batch_size`` entities using several small AI calls in parallel.

    Each call asks for up to ``entities_per_call`` entities. That costs fewer
    tokens per entity than one call each, and gives better quality than one
    huge call. The calls run concurrently and are throttled by the global
    semaphore.

    Args:
        start_id: Offset used to build sequential entity ids ("E_000123").
        batch_size: Number of entities wanted from this batch.
        scenario_analysis: Supplies the entity type used in prompts/summaries.
        factor_graph: Attributes, relationships and sampling guidance.
        exploration_ratio: Fraction of the batch (by position) generated in
            "exploration" mode, i.e. with edge-case attribute combinations.
        entities_per_call: Maximum number of entities requested per AI call.

    Returns:
        At most ``batch_size`` profiles whose ids are unique and lie in
        ``[start_id, start_id + batch_size)``. The list may be shorter if the
        model returns fewer entities than requested.

    Raises:
        ValueError: If ``entities_per_call`` is less than 1.
    """
    if entities_per_call < 1:
        raise ValueError("entities_per_call must be >= 1")

    num_calls = (batch_size + entities_per_call - 1) // entities_per_call
    # Mini-batches whose first index is below this cutoff run in exploration mode.
    exploration_cutoff = int(batch_size * exploration_ratio)

    async def generate_mini_batch(call_num: int) -> List[EntityProfile]:
        start = call_num * entities_per_call
        count = min(entities_per_call, batch_size - start)

        if start < exploration_cutoff:
            mode_instruction = """EXPLORATION MODE: Generate entities with unusual or edge-case attributes.
Sample from distribution tails or create surprising but realistic combinations."""
        else:
            mode_instruction = """STANDARD MODE: Generate typical, realistic entities following
normal distributions and common attribute combinations."""

        prompt = f"""Generate {count} synthetic {scenario_analysis.entity_type} entities for simulation.

AVAILABLE ATTRIBUTES:
{json.dumps(factor_graph.attributes, indent=2)}

ATTRIBUTE RELATIONSHIPS:
{factor_graph.attribute_graph}

SAMPLING GUIDANCE:
{factor_graph.sampling_strategy}

{mode_instruction}

TASK:
Generate exactly {count} diverse entities. For each entity, create:
- A complete set of attributes (all attributes from the list above)
- Values that are realistic and internally consistent
- Follow correlations and dependencies described
- Ensure diversity across the {count} entities

Return a list of {count} dictionaries, where each dictionary contains:
- All attribute names as keys
- Appropriate values (numbers, strings, booleans as needed)

Make entities feel realistic and distinct from each other."""

        # Per-call schema so the description states the exact count requested.
        class MiniBatchSchema(BaseModel):
            entities: List[Dict[str, Any]] = Field(
                description=f"List of exactly {count} entity attribute dictionaries"
            )

        async with concurrency_semaphore:  # Global concurrency control
            result = await app.ai(prompt, schema=MiniBatchSchema)

        # Ids for this mini-batch occupy [start, start + count). The model can
        # return more entities than asked for, and those extra ids would collide
        # with the next mini-batch's ids, so keep only the first `count`.
        profiles = []
        for i, entity_attrs in enumerate(result.entities[:count]):
            entity_id = f"E_{start_id + start + i:06d}"

            # Cheap local summary (first 5 attributes) instead of another AI call.
            attrs_str = ", ".join(
                [f"{k}={v}" for k, v in list(entity_attrs.items())[:5]]
            )
            summary = f"{scenario_analysis.entity_type.title()} with {attrs_str}..."

            profiles.append(
                EntityProfile(
                    entity_id=entity_id,
                    attributes=entity_attrs,
                    profile_summary=summary,
                )
            )

        return profiles

    # Run all mini-batch calls concurrently; gather preserves call order.
    results = await asyncio.gather(*(generate_mini_batch(i) for i in range(num_calls)))

    all_entities = [profile for batch in results for profile in batch]
    return all_entities[:batch_size]  # Trim to exact size


# Keep the old function name for backward compatibility
async def generate_entity_batch(
    start_id: int,
    batch_size: int,
    scenario_analysis: ScenarioAnalysis,
    factor_graph: FactorGraph,
    exploration_ratio: float = 0.1,
) -> List[EntityProfile]:
    """Backward-compatible alias for generate_entity_batch_optimized.

    Forwards every argument unchanged and returns its result.
    """
    profiles = await generate_entity_batch_optimized(
        start_id=start_id,
        batch_size=batch_size,
        scenario_analysis=scenario_analysis,
        factor_graph=factor_graph,
        exploration_ratio=exploration_ratio,
    )
    return profiles

In [7]:
class EntityDecision(BaseModel):
    """Schema for entity's decision - simplified to avoid JSON parsing issues"""

    # Structured output of simulate_entity_decision_safe.
    # NOTE(review): the docstring and descriptions presumably reach the model via
    # the JSON schema; reword with care.

    # Overwritten with the real entity id after the model responds, so the
    # model's own value is not trusted.
    entity_id: str
    # Expected to be one of ScenarioAnalysis.decision_options. This is not
    # validated here, so free-text variants can appear as separate outcomes.
    decision: str = Field(
        description="The chosen decision/action from the available options"
    )
    # NOTE(review): the 0.0-1.0 range is only described, not enforced (no ge/le).
    confidence: float = Field(description="Confidence in this decision, 0.0 to 1.0")
    key_factor: str = Field(
        description="Single most important attribute that influenced this decision (max 50 words)"
    )
    trade_off: str = Field(description="Main trade-off considered (max 50 words)")
    # Optional free-text explanation; defaults to "" rather than None.
    reasoning: Optional[str] = Field(
        default="",
        description="Brief explanation (1-2 sentences, max 100 words) - optional",
    )


async def simulate_entity_decision_safe(
    entity: EntityProfile,
    scenario: str,
    scenario_analysis: ScenarioAnalysis,
    context: List[str] = [],
) -> Optional[EntityDecision]:
    """Ask the model how a single entity responds to the scenario, never raising.

    Only a few attributes are shown, to keep the prompt and the model's JSON
    small:
    - the scenario's key attributes (first 7) that this entity actually has; or
    - if none of them match the entity's attribute names, the entity's first
      5 attributes.

    Args:
        entity: The simulated entity (summary + attributes).
        scenario: Free-text scenario description.
        scenario_analysis: Supplies entity type, decision options and key attributes.
        context: Optional extra facts, rendered as a bulleted list (not mutated).

    Returns:
        The model's decision, with ``entity_id`` forced to ``entity.entity_id``.
        If any exception occurs, a warning is printed and a placeholder decision
        is returned instead:
        - decision: the first option, or "unknown" if there are no options;
        - confidence: 0.0.
        The body never actually returns None.
    """
    async with concurrency_semaphore:  # Global concurrency control
        try:
            context_str = "\n".join([f"- {c}" for c in context]) if context else ""

            # key_attributes come from the scenario analysis, while entity
            # attribute names come from the factor graph, so the two may not
            # overlap. Keep only the key attributes this entity has. If none
            # match, fall back to its first 5 attributes so the prompt never
            # lists zero attributes.
            key_attrs = [
                k
                for k in scenario_analysis.key_attributes[:7]
                if k in entity.attributes
            ]
            if not key_attrs:
                key_attrs = list(entity.attributes.keys())[:5]

            key_attributes_str = "\n".join(
                f"  ‚Ä¢ {k}: {entity.attributes[k]}" for k in key_attrs
            )

            context_section = (
                f"ADDITIONAL CONTEXT:\n{context_str}\n\n" if context else ""
            )

            prompt = f"""You are simulating the decision-making of a specific {scenario_analysis.entity_type}.

WHO YOU ARE:
{entity.profile_summary}

KEY ATTRIBUTES (most relevant for this decision):
{key_attributes_str}

SCENARIO YOU'RE FACING:
{scenario}

{context_section}AVAILABLE DECISIONS:
{', '.join(scenario_analysis.decision_options)}

TASK:
Based on who you are, decide how you would respond to this scenario.

1. decision: Choose one option from the available decisions list.

2. confidence: Rate confidence 0.0-1.0. How certain are you?

3. key_factor: What single attribute influenced this decision most? (max 50 words)

4. trade_off: What was the main trade-off you considered? (max 50 words)

5. reasoning: Optional brief explanation (1-2 sentences, max 100 words).

Be concise and realistic."""

            result = await app.ai(prompt, schema=EntityDecision)
            # Don't trust the model to echo the id back correctly.
            result.entity_id = entity.entity_id
            return result

        except Exception as e:
            print(f"‚ö†Ô∏è  Failed entity {entity.entity_id}: {str(e)[:100]}")
            # Best-effort placeholder so callers always get a decision.
            # NOTE(review): aggregation counts this as a real vote for the first
            # option. It can only be told apart by confidence == 0.0 and the
            # key_factor text.
            return EntityDecision(
                entity_id=entity.entity_id,
                decision=scenario_analysis.decision_options[0]
                if scenario_analysis.decision_options
                else "unknown",
                confidence=0.0,
                key_factor="Error during decision generation",
                trade_off="Unable to evaluate",
                reasoning="Failed to generate decision",
            )


# Keep old function name for backward compatibility
async def simulate_entity_decision(
    entity: EntityProfile,
    scenario: str,
    scenario_analysis: ScenarioAnalysis,
    context: List[str] = [],
) -> EntityDecision:
    """Strict counterpart of simulate_entity_decision_safe.

    Delegates to the safe version. If that returns None, raises ValueError
    naming the entity; otherwise returns the decision unchanged.
    """
    decision = await simulate_entity_decision_safe(
        entity, scenario, scenario_analysis, context
    )
    if decision is not None:
        return decision
    raise ValueError(f"Failed to generate decision for {entity.entity_id}")

In [8]:
async def simulate_batch_decisions(
    entities: List[EntityProfile],
    scenario: str,
    scenario_analysis: ScenarioAnalysis,
    context: List[str] = [],
    parallel_batch_size: int = 20,  # Reduced from 50 to avoid overload
) -> List[EntityDecision]:
    """Simulate decisions for all entities, in sequential chunks of parallel calls.

    How each chunk of ``parallel_batch_size`` entities is processed:
    - all calls in the chunk are awaited together with
      ``return_exceptions=True``, so one failure cannot abort the chunk;
    - anything that is not an EntityDecision (exceptions, None) is dropped,
      and exceptions are logged;
    - a 0.5 s pause follows every chunk except the last, to ease rate limits.
    Individual calls are still capped by the global semaphore.

    Returns:
        The successful decisions, in entity order.
    """
    collected: List[EntityDecision] = []
    total_entities = len(entities)
    batch_total = (total_entities + parallel_batch_size - 1) // parallel_batch_size

    for batch_index, offset in enumerate(
        range(0, total_entities, parallel_batch_size)
    ):
        chunk = entities[offset : offset + parallel_batch_size]

        outcomes = await asyncio.gather(
            *(
                simulate_entity_decision_safe(
                    member, scenario, scenario_analysis, context
                )
                for member in chunk
            ),
            return_exceptions=True,
        )

        for outcome in outcomes:
            if isinstance(outcome, EntityDecision):
                collected.append(outcome)
            elif isinstance(outcome, Exception):
                print(f"‚ö†Ô∏è  Exception in batch: {str(outcome)[:100]}")

        print(
            f"   Batch {batch_index + 1}/{batch_total}: Completed {len(collected)}/{total_entities} decisions..."
        )

        is_last_chunk = offset + parallel_batch_size >= total_entities
        if not is_last_chunk:
            await asyncio.sleep(0.5)

    print(
        f"   ‚úÖ Successfully generated {len(collected)}/{total_entities} decisions"
    )
    return collected

In [9]:
class SimulationInsights(BaseModel):
    """Schema for final simulation results"""

    # Structured output of aggregate_and_analyze.
    # NOTE(review): the docstring and descriptions presumably reach the model via
    # the JSON schema; reword with care.

    # decision option -> share of entities. aggregate_and_analyze overwrites the
    # model's value with its own computed counts / total, i.e. fractions in
    # [0, 1] rather than percentages, despite the description below.
    outcome_distribution: Dict[str, float] = Field(
        description="Percentage for each decision option"
    )
    key_insight: str = Field(
        description="One sentence summary of the most important finding"
    )
    detailed_analysis: str = Field(
        description="Comprehensive analysis (4-5 paragraphs) covering: overall patterns, segment differences, causal drivers, surprising findings, and implications"
    )
    segment_patterns: str = Field(
        description="Description of how different types of entities decided differently, organized by meaningful segments"
    )
    causal_drivers: str = Field(
        description="Analysis of which attributes most strongly predicted decisions, with specific examples and correlations"
    )


def _summarize_attribute_distributions(
    attribute_names: List[str], entities: List[EntityProfile]
) -> Dict[str, Dict[str, Any]]:
    """Top-5 value frequencies per attribute, formatted as prompt-ready strings."""
    summaries: Dict[str, Dict[str, Any]] = {}
    for attr_name in attribute_names:
        # Entities that have the attribute at all (the value may still be None).
        attr_values = [
            e.attributes.get(attr_name) for e in entities if attr_name in e.attributes
        ]
        total_with_attr = len(attr_values)
        if total_with_attr == 0:
            continue

        value_counts: Dict[str, int] = defaultdict(int)
        for val in attr_values:
            if val is not None:
                # Stringify so unhashable / mixed-type values can be counted.
                value_counts[str(val)] += 1

        # Stable sort: ties keep first-seen order.
        top_values = sorted(value_counts.items(), key=lambda kv: kv[1], reverse=True)[:5]
        summaries[attr_name] = {
            "distribution": ", ".join(
                f"{val} ({count}/{total_with_attr}, {count*100/total_with_attr:.1f}%)"
                for val, count in top_values
            ),
            "total": total_with_attr,
        }
    return summaries


def _strongest_attribute_decision_patterns(
    attribute_names: List[str], pairs: List[tuple]
) -> Dict[str, str]:
    """For each attribute, describe its most frequent (value, decision) combination."""
    counts: Dict[str, Dict[tuple, int]] = defaultdict(lambda: defaultdict(int))
    for entity, decision in pairs:
        for attr_name, attr_value in entity.attributes.items():
            if attr_value is not None:
                counts[attr_name][(str(attr_value), decision.decision)] += 1

    patterns: Dict[str, str] = {}
    for attr_name in attribute_names:
        attr_counts = counts.get(attr_name)
        if not attr_counts:
            continue
        (attr_val, decision_value), count = max(
            attr_counts.items(), key=lambda kv: kv[1]
        )
        total_for_attr = sum(attr_counts.values())
        patterns[attr_name] = (
            f"When {attr_name}={attr_val}: {count}/{total_for_attr} chose '{decision_value}' "
            f"({count*100/total_for_attr:.1f}%)"
        )
    return patterns


def _sample_representative_examples(
    pairs: List[tuple], max_examples: int = 30
) -> List[Dict[str, Any]]:
    """Randomly sample up to ~max_examples (entity, decision) pairs, stratified by decision.

    Each decision gets at least 3 samples (or all of its pairs if it has
    fewer), so the total can exceed ``max_examples`` when there are many
    distinct decisions.
    """
    by_decision: Dict[str, List[tuple]] = defaultdict(list)
    for entity, decision in pairs:
        by_decision[decision.decision].append((entity, decision))
    if not by_decision:
        return []

    sample_size = min(max_examples, len(pairs))
    per_decision = max(3, sample_size // len(by_decision))

    examples: List[Dict[str, Any]] = []
    for group in by_decision.values():
        for entity, decision in random.sample(group, min(per_decision, len(group))):
            examples.append(
                {
                    "attributes": entity.attributes,
                    "decision": decision.decision,
                    "key_factor": decision.key_factor,
                    "trade_off": decision.trade_off,
                    "reasoning": decision.reasoning or "",
                    "confidence": decision.confidence,
                }
            )
    return examples


def _largest_segment_examples(
    segment_attrs: List[str], pairs: List[tuple], max_segments: int = 10
) -> List[Dict[str, Any]]:
    """Group pairs by their values on segment_attrs; describe the largest groups."""
    if not segment_attrs:
        return []

    groups: Dict[tuple, List[tuple]] = defaultdict(list)
    for entity, decision in pairs:
        key = tuple(
            str(entity.attributes.get(attr, "unknown")) for attr in segment_attrs
        )
        groups[key].append((entity, decision))

    # Largest segments first (stable, so ties keep first-seen order).
    largest = sorted(groups.items(), key=lambda kv: len(kv[1]), reverse=True)[
        :max_segments
    ]

    segments: List[Dict[str, Any]] = []
    for key, group in largest:
        example_entity, example_decision = group[0]
        segments.append(
            {
                "segment": ", ".join(
                    f"{k}={v}" for k, v in zip(segment_attrs, key)
                ),
                "count": len(group),
                "example_decision": example_decision.decision,
                "example_attributes": example_entity.attributes,
            }
        )
    return segments


async def aggregate_and_analyze(
    scenario: str,
    scenario_analysis: ScenarioAnalysis,
    factor_graph: FactorGraph,
    entities: List[EntityProfile],
    decisions: List[EntityDecision],
    context: List[str] = [],
) -> SimulationInsights:
    """Summarize simulation results locally, then ask the model to interpret them.

    Statistics are pre-computed without AI:
    - outcome shares and average confidence per decision;
    - per-attribute value distributions;
    - the strongest attribute/decision pattern per attribute;
    - a decision-stratified random sample of about 30 examples;
    - the 10 largest segments.
    Only these summaries, not the raw data, are sent in one AI call.

    Decisions are joined to entities by ``entity_id``, never by position, so
    filtered or reordered decision lists are handled correctly. Decisions
    whose id matches no entity still count toward the outcome distribution,
    but are left out of the attribute-based summaries.

    Args:
        scenario: Free-text scenario description.
        scenario_analysis: Supplies entity type and key attributes.
        factor_graph: Defines the tracked attribute names.
        entities: All simulated entities.
        decisions: Decisions to analyze; must be non-empty.
        context: Optional extra facts, rendered as a bulleted list (not mutated).

    Returns:
        The model's insights. ``outcome_distribution`` is replaced with the
        locally computed fractions (counts / total).

    Raises:
        ValueError: If ``decisions`` is empty.
    """
    if not decisions:
        raise ValueError("aggregate_and_analyze requires at least one decision")

    # Basic statistics (no AI needed)
    total = len(decisions)
    decision_counts: Dict[str, int] = {}
    confidence_by_decision: Dict[str, List[float]] = {}
    for d in decisions:
        decision_counts[d.decision] = decision_counts.get(d.decision, 0) + 1
        confidence_by_decision.setdefault(d.decision, []).append(d.confidence)

    outcome_dist = {k: v / total for k, v in decision_counts.items()}
    avg_confidence = {k: sum(v) / len(v) for k, v in confidence_by_decision.items()}

    # Pair each decision with its entity by id. zip(entities, decisions) would
    # misalign everything after the first dropped or reordered decision.
    entity_by_id = {e.entity_id: e for e in entities}
    pairs = [
        (entity_by_id[d.entity_id], d) for d in decisions if d.entity_id in entity_by_id
    ]

    attribute_names = list(factor_graph.attributes.keys())
    attribute_summaries = _summarize_attribute_distributions(attribute_names, entities)
    attribute_decision_patterns = _strongest_attribute_decision_patterns(
        attribute_names, pairs
    )
    sampled_examples = _sample_representative_examples(pairs)

    # Segment on the scenario's key attributes when the factor graph tracks
    # them; otherwise fall back to the first 3 factor-graph attributes.
    segment_attrs = [
        a for a in scenario_analysis.key_attributes if a in factor_graph.attributes
    ][:3]
    if not segment_attrs:
        segment_attrs = attribute_names[:3]
    segment_examples = _largest_segment_examples(segment_attrs, pairs)

    context_str = "\n".join([f"- {c}" for c in context]) if context else ""

    # Serialize summaries for the prompt. default=str guards against attribute
    # values that are not natively JSON-serializable.
    attribute_summaries_str = json.dumps(attribute_summaries, indent=2)
    attribute_patterns_str = "\n".join(
        [f"  ‚Ä¢ {k}: {v}" for k, v in list(attribute_decision_patterns.items())[:10]]
    )
    segment_summaries_str = json.dumps(segment_examples, indent=2, default=str)
    sampled_examples_str = json.dumps(sampled_examples, indent=2, default=str)

    context_block = "CONTEXT:\n" + context_str + "\n\n" if context else ""

    prompt = f"""Analyze simulation results from {total} {scenario_analysis.entity_type} entities.

SCENARIO:
{scenario}

{context_block}ATTRIBUTES TRACKED:
{', '.join(factor_graph.attributes.keys())}

OUTCOME DISTRIBUTION (from {total} entities):
{json.dumps(outcome_dist, indent=2)}

AVERAGE CONFIDENCE BY DECISION:
{json.dumps(avg_confidence, indent=2)}

ATTRIBUTE DISTRIBUTIONS (summary of value frequencies):
{attribute_summaries_str}

STRONGEST ATTRIBUTE-DECISION PATTERNS:
{attribute_patterns_str}

SEGMENT SUMMARIES (grouped by key attributes):
{segment_summaries_str}

REPRESENTATIVE EXAMPLES ({len(sampled_examples)} of {total} entities):
{sampled_examples_str}

TASK:
Analyze these results and provide insights:

1. outcome_distribution: Return this exact dictionary: {outcome_dist}

2. key_insight: ONE sentence capturing the most important finding.

3. detailed_analysis: 4-5 paragraphs covering:
   - Overall pattern and dominant outcome
   - Distinct segments and their behaviors
   - Key drivers (which attributes predicted decisions)
   - Surprising or counterintuitive findings
   - Implications and recommendations

4. segment_patterns: 2-3 paragraphs analyzing how different entity types decided:
   - Group entities by meaningful attribute combinations
   - Describe each segment's typical decision and why
   - Note certainty levels by segment
   - Identify interesting edge cases

5. causal_drivers: 2-3 paragraphs on attribute influence:
   - Which attributes most strongly influenced decisions
   - Specific examples from the data
   - Interaction effects between attributes
   - Rank drivers by importance

Be specific and reference the summarized data provided."""

    async with concurrency_semaphore:  # Global concurrency control
        result = await app.ai(prompt, schema=SimulationInsights)

    # Override with our precise computed values
    result.outcome_distribution = outcome_dist

    return result

In [10]:
# ============================================================================
# MAIN ORCHESTRATOR (FIXED FOR SCALABILITY)
# ============================================================================


async def run_simulation(
    scenario: str,
    population_size: int,
    context: List[str] = [],
    parallel_batch_size: int = 20,  # Reduced to avoid overload (respects global MAX_CONCURRENT_CALLS)
    exploration_ratio: float = 0.1,
) -> Dict[str, Any]:
    """
    üîß FIXED: Scalable orchestrator with proper batching at each phase.

    Handles large N by:
    1. Generating entities in optimized batches (5 per AI call)
    2. Simulating decisions in parallel batches (50 concurrent)
    3. Sampling data for analysis (max 30 examples to AI)
    """
    print(f"üöÄ Starting simulation: {population_size} entities")

    # Phase 1: Understand scenario (single call, always fast)
    print("\nüìã Phase 1: Analyzing scenario...")
    scenario_analysis = await decompose_scenario(scenario, context)
    print(f"   Entity type: {scenario_analysis.entity_type}")
    print(f"   Decision type: {scenario_analysis.decision_type}")
    print(f"   Options: {scenario_analysis.decision_options}")

    # Phase 2: Build factor graph (single call, always fast)
    print("\nüï∏Ô∏è  Phase 2: Building factor graph...")
    factor_graph = await generate_factor_graph(scenario, scenario_analysis, context)
    print(f"   Tracking {len(factor_graph.attributes)} attributes")

    # Phase 3: Generate entities in optimized batches
    print(f"\nüë• Phase 3: Generating {population_size} entities...")

    # üîß Generate in smart batches (5 entities per AI call, parallelize calls)
    entities_per_batch = (
        100  # Process 100 entities at a time (20 parallel AI calls of 5 each)
    )
    all_entities = []

    num_batches = (population_size + entities_per_batch - 1) // entities_per_batch
    for batch_num in range(num_batches):
        start_id = batch_num * entities_per_batch
        batch_size = min(entities_per_batch, population_size - start_id)

        print(
            f"   Batch {batch_num + 1}/{num_batches}: Generating {batch_size} entities..."
        )
        entities = await generate_entity_batch_optimized(
            start_id, batch_size, scenario_analysis, factor_graph, exploration_ratio
        )
        all_entities.extend(entities)

    print(f"   ‚úÖ Generated {len(all_entities)} entities")

    # Phase 4: Simulate decisions in controlled parallel batches
    print("\nüéØ Phase 4: Simulating decisions...")
    all_decisions = await simulate_batch_decisions(
        all_entities,
        scenario,
        scenario_analysis,
        context,
        parallel_batch_size=parallel_batch_size,
    )

    print(f"   ‚úÖ Simulated {len(all_decisions)} decisions")

    # Phase 5: Aggregate with sampled data
    print("\nüìä Phase 5: Aggregating results and generating insights...")
    insights = await aggregate_and_analyze(
        scenario, scenario_analysis, factor_graph, all_entities, all_decisions, context
    )

    print("\n‚ú® Simulation complete!")
    print(f"\nüéØ KEY INSIGHT: {insights.key_insight}")
    print("\nüìà OUTCOME DISTRIBUTION:")
    for decision, pct in insights.outcome_distribution.items():
        print(f"   {decision}: {pct*100:.1f}%")

    return {
        "scenario": scenario,
        "context": context,
        "population_size": population_size,
        "scenario_analysis": scenario_analysis,
        "factor_graph": factor_graph,
        "entities": all_entities,
        "decisions": all_decisions,
        "insights": insights,
    }

In [13]:
# The pricing scenario to simulate: the free tier is retired in favour of a
# $29/month starter plan.
scenario = (
    "\n"
    "Our SaaS product currently has a free tier that 50% of our users are on.\n"
    "We're considering removing the free tier and offering a $29/month starter plan instead.\n"
    "How will existing free users react?\n"
)

# Background facts handed to every phase of the pipeline.
context = [
    "Average free user has been with us 8 months",
    "Current paid conversion rate from free is 3%",
    "Two main competitors (CompetitorA and CompetitorB) offer free tiers",
    "Our product is primarily used by small businesses and freelancers",
]

# Full end-to-end run, kept disabled because it issues many AI calls.
# Uncomment to execute; the global concurrency_semaphore is intended to cap
# how many AI calls are in flight at once (MAX_CONCURRENT_CALLS).
# result = await run_simulation(
#     scenario=scenario,
#     population_size=5000,
#     context=context,
#     parallel_batch_size=50,
#     exploration_ratio=0.1
# )

In [14]:
# Step 1: Decompose the scenario
scenario_analysis = await decompose_scenario(scenario, context)

In [15]:
# Show the decomposition: entity, decision shape, options and the free-text analysis.
analysis_report = [
    "Scenario Analysis:",
    f"Entity Type: {scenario_analysis.entity_type}",
    f"Decision Type: {scenario_analysis.decision_type}",
    f"Decision Options: {scenario_analysis.decision_options}",
    f"\nAnalysis:\n{scenario_analysis.analysis}",
]
print("\n".join(analysis_report))

Scenario Analysis:
Entity Type: customer
Decision Type: multi_option
Decision Options: ['Convert to $29/month paid plan', 'Downgrade usage or stop using entirely', "Switch to CompetitorA's free tier", "Switch to CompetitorB's free tier", 'Seek an alternative free solution not mentioned', 'Attempt to negotiate or seek a discount']

Analysis:
The primary decision for free users revolves around perceived value and alternatives when faced with a new paywall. Key factors include the economic sensitivity of small businesses and freelancers, the perceived utility of the product, and the availability of competitors' free tiers. Causal relationships exist where longer tenure increases perceived value and switching costs, while higher dependence on the product for daily operations reduces price sensitivity. Psychological dynamics include loss aversion (reacting more strongly to losing free access) and the endowment effect (valuing something more because they already have it). Economic factors in

In [16]:
# Step 2: Generate factor graph
# Builds the set of attributes tracked for each entity (printed in the next cell)
# from the raw scenario text, its decomposition, and the shared context list.
factor_graph = await generate_factor_graph(scenario, scenario_analysis, context)

In [17]:
# Print each part of the factor graph under its own heading.
for heading, body in (
    ("Factor Graph Attributes:", json.dumps(factor_graph.attributes, indent=2)),
    ("\nAttribute Graph:", factor_graph.attribute_graph),
    ("\nSampling Strategy:", factor_graph.sampling_strategy),
):
    print(heading)
    print(body)

Factor Graph Attributes:
{
  "age": "Age of the primary user/decision-maker for the business",
  "business_size": "Number of employees in the user's business (e.g., '1' for freelancer, '2-10' for small team)",
  "business_revenue": "Annual revenue of the user's business, a proxy for budget",
  "tenure_months": "Number of months the user has been a customer",
  "usage_frequency": "How often the user actively uses the product (e.g., 'daily', 'weekly', 'monthly')",
  "feature_dependency": "Extent to which the user relies on specific, high-value features unavailable in free competitors' tiers",
  "perceived_value": "User's subjective assessment of the product's worth to their business operations",
  "price_sensitivity": "Degree to which the user's decision is influenced by cost changes",
  "loyalty_sentiment": "Affective attachment to the brand, measured by willingness to recommend",
  "awareness_of_alternatives": "User's knowledge of competitor offerings (CompetitorA, CompetitorB, others)

In [18]:
# Step 3: Generate a small batch of entities (for testing)
# Adjust batch_size as needed
batch_size = 100
entities = await generate_entity_batch(
    start_id=0,
    batch_size=batch_size,
    scenario_analysis=scenario_analysis,
    factor_graph=factor_graph,
    exploration_ratio=0.1,
)

print(f"Generated {len(entities)} entities")

Generated 100 entities


In [19]:
# Inspect the first generated entity: id, one-line profile and full attribute dict.
print("Sample Entity:")
sample_entity = entities[0]
print(
    f"Entity ID: {sample_entity.entity_id}\n"
    f"Profile Summary: {sample_entity.profile_summary}\n"
    "\nAttributes:\n" + json.dumps(sample_entity.attributes, indent=2)
)

Sample Entity:
Entity ID: E_000000
Profile Summary: Customer with age=29, business_size=1, business_revenue=45000, tenure_months=14, usage_frequency=daily...

Attributes:
{
  "age": 29,
  "business_size": "1",
  "business_revenue": 45000,
  "tenure_months": 14,
  "usage_frequency": "daily",
  "feature_dependency": "extreme",
  "perceived_value": "essential",
  "price_sensitivity": "medium",
  "loyalty_sentiment": "very_high",
  "awareness_of_alternatives": "high",
  "switching_costs_perceived": "very_high",
  "dependence_on_product_for_operations": "critical",
  "freelancer_status": true,
  "income_level": "medium",
  "location": "Portland, OR",
  "decision_attitude": "accepting"
}


In [20]:
# Step 4: Simulate decisions for all entities
decisions = await simulate_batch_decisions(
    entities=entities,
    scenario=scenario,
    scenario_analysis=scenario_analysis,
    context=context,
)

print(f"Simulated {len(decisions)} decisions")

   Batch 1/5: Completed 20/100 decisions...

[1;31mGive Feedback / Get Help: https://github.com/BerriAI/litellm/issues/new[0m
LiteLLM.Info: If you need to debug this error, use `litellm._turn_on_debug()'.

⚠️  Failed entity E_000032: 'Exception' object has no attribute 'request'

[1;31mGive Feedback / Get Help: https://github.com/BerriAI/litellm/issues/new[0m
LiteLLM.Info: If you need to debug this error, use `litellm._turn_on_debug()'.

⚠️  Failed entity E_000020: 'Exception' object has no attribute 'request'

[1;31mGive Feedback / Get Help: https://github.com/BerriAI/litellm/issues/new[0m
LiteLLM.Info: If you need to debug this error, use `litellm._turn_on_debug()'.

⚠️  Failed entity E_000022: 'Exception' object has no attribute 'request'
   Batch 2/5: Completed 40/100 decisions...

[1;31mGive Feedback / Get Help: https://github.com/BerriAI/litellm/issues/new[0m
LiteLLM.Info: If you need to debug this error, use `litellm._turn_on_debug()'.

⚠️  Failed entity 

In [21]:
# Inspect the first simulated decision alongside its confidence and reasoning.
print("Sample Decision:")
sample_decision = decisions[0]
print(
    "\n".join(
        (
            f"Entity ID: {sample_decision.entity_id}",
            f"Decision: {sample_decision.decision}",
            f"Confidence: {sample_decision.confidence}",
            f"Reasoning: {sample_decision.reasoning}",
        )
    )
)

Sample Decision:
Entity ID: E_000000
Decision: Switch to CompetitorA's free tier
Confidence: 0.7
Reasoning: As a price-sensitive small business owner, a new $29/month charge is significant. Since I use the product daily but know free options exist, switching is the most logical financial move.


In [22]:
# Tally how many entities chose each option (first-seen order), then print
# each option's count and share of all decisions.
decision_counts = {}
for d in decisions:
    decision_counts.setdefault(d.decision, 0)
    decision_counts[d.decision] += 1

total_decisions = len(decisions)
print("Decision Distribution:")
for decision, count in decision_counts.items():
    percentage = count / total_decisions * 100
    print("  {}: {} ({:.1f}%)".format(decision, count, percentage))

Decision Distribution:
  Switch to CompetitorA's free tier: 49 (49.0%)
  Convert to $29/month paid plan: 33 (33.0%)
  Seek an alternative free solution not mentioned: 4 (4.0%)
  Attempt to negotiate or seek a discount: 7 (7.0%)
  Switch to CompetitorB's free tier: 7 (7.0%)


In [23]:
# Step 5: Aggregate and analyze results
insights = await aggregate_and_analyze(
    scenario=scenario,
    scenario_analysis=scenario_analysis,
    factor_graph=factor_graph,
    entities=entities,
    decisions=decisions,
    context=context,
)

In [24]:
# Pretty-print every section of the insights object under a ruled banner.
rule = "=" * 60


def _print_banner(title: str, leading_blank: bool = True) -> None:
    """Print `title` framed by two 60-character '=' rules, optionally after a blank line."""
    print(("\n" if leading_blank else "") + rule)
    print(title)
    print(rule)


_print_banner("KEY INSIGHT:", leading_blank=False)
print(insights.key_insight)

_print_banner("OUTCOME DISTRIBUTION:")
for decision, pct in insights.outcome_distribution.items():
    print(f"  {decision}: {pct*100:.1f}%")

for section_title, section_text in (
    ("DETAILED ANALYSIS:", insights.detailed_analysis),
    ("SEGMENT PATTERNS:", insights.segment_patterns),
    ("CAUSAL DRIVERS:", insights.causal_drivers),
):
    _print_banner(section_title)
    print(section_text)

KEY INSIGHT:
Nearly half of free users will immediately defect to a known competitor's free tier upon removal of the free plan, but a core third who perceive high value will convert to paid, highlighting a sharp divide driven by price sensitivity and awareness of alternatives.

OUTCOME DISTRIBUTION:
  Switch to CompetitorA's free tier: 49.0%
  Convert to $29/month paid plan: 33.0%
  Seek an alternative free solution not mentioned: 4.0%
  Attempt to negotiate or seek a discount: 7.0%
  Switch to CompetitorB's free tier: 7.0%

DETAILED ANALYSIS:
The overall pattern reveals a clear bifurcation in user response, with 49% opting to switch to CompetitorA's free tier and 33% converting to the paid plan, indicating that the user base is sharply divided between cost-sensitive defectors and value-perceiving retainers. This split highlights the significant risk of removing the free tier, as nearly half the user base has immediate, known free alternatives, but also a substantial core sees enough v