# Multi-Agent Restaurant Menu Analytics System
 
## Overview
 This notebook demonstrates **parallel agent processing** with data validation. Multiple specialized agents analyze different aspects of restaurant menu data simultaneously; their outputs are then cleaned, standardized, and validated for quality assurance.

 <div align="center">
<img src="lesson_4.png" alt="Alt text" width="650"/>
</div>
 
 ### Key Concepts Covered:
 1. **Parallel Agent Execution**: Multiple agents process data simultaneously using `asyncio.gather()`
 2. **Data Loading Agent**: A `CSVLoader` agent is defined for file I/O, although the pipeline below reads the CSV directly with pandas
 3. **Domain-Specific Analyzers**: Agents with narrow analytical focus
 4. **Output Standardization**: Cleaning agent formats raw outputs into consistent structure
 5. **Quality Validation**: Checker agent validates completeness and accuracy
 6. **Pipeline Architecture**: Data flows through distinct processing stages

 ## 1. Setup and Configuration

In [1]:
import os
import asyncio
import pandas as pd
from dotenv import load_dotenv

from semantic_kernel import Kernel
from semantic_kernel.agents import ChatCompletionAgent
from semantic_kernel.connectors.ai.open_ai import AzureChatCompletion

 ## 2. Define Agent Instructions
 
 Each agent has precise instructions defining its role, behavior, and output format.
 This makes agent behavior more consistent and predictable.

 ### 2.1 CSV Loader Agent

In [2]:
CSV_Loader_Name = "CSVLoader"
CSV_Loader_Instructions = """
    You are a CSV Loader Agent.
    Your role is to read menu data from a CSV file, extract its contents, and return it as a clean, comma-separated string.
    You do not perform analysis — only data loading and formatting.
    Keep the output concise and ready for downstream analysis.
"""

 ### 2.2 Main Dish Analyzer Agent

In [3]:
Main_Dish_Analyzer_Name = "MainDishAnalyzer"
Main_Dish_Analyzer_Instructions = """
    AI Agent Persona: Main Dish Analytics Assistant
    Role: A specialized assistant focused exclusively on analyzing main dish menu items and calculating descriptive statistics.
    Behavior: The agent does not answer questions outside the scope of main dish analysis.
    Response Style: Always provide calculated results clearly and concisely.
    
    Agent Instructions:
    From the restaurant menu dataset provided, extract the MAIN DISHES and analyze these items only.
    Main dishes include: pasta, steak, chicken, fish, burgers, pizza, risotto, lamb, pork, seafood entrees.
    
    Calculate descriptive statistics including:
    - Mean, median, standard deviation, minimum, and maximum for price
    - Mean, median, standard deviation for calories (if available)
    - Count of items in this category
    
    Ensure all calculations are based on cleaned data (after removing any anomalies or outliers).
    Present the results in a clear, structured format for immediate interpretation.
    
    Descriptive statistics MUST be presented in JSON format, with two tables:
    1. Price Statistics Table
    2. Nutritional Statistics Table (calories, etc.)
    Add clear titles to the JSON tables.
"""

 ### 2.3 Beverage Analyzer Agent

In [4]:
Beverage_Analyzer_Name = "BeverageAnalyzer"
Beverage_Analyzer_Instructions = """
    AI Agent Persona: Beverage Analytics Assistant
    Role: A specialized assistant focused exclusively on analyzing beverage menu items and calculating descriptive statistics.
    Behavior: The agent does not answer questions outside the scope of beverage analysis.
    Response Style: Always provide calculated results clearly and concisely.
    
    Agent Instructions:
    From the restaurant menu dataset provided, extract the BEVERAGES and analyze these items only.
    Beverages include: coffee, tea, juice, soda, wine, beer, cocktails, smoothies, milkshakes, water.
    
    Calculate descriptive statistics including:
    - Mean, median, standard deviation, minimum, and maximum for price
    - Mean, median, standard deviation for calories (if available)
    - Count of items in this category
    
    Ensure all calculations are based on cleaned data (after removing any anomalies or outliers).
    Present the results in a clear, structured format for immediate interpretation.
    
    Descriptive statistics MUST be presented in JSON format, with two tables:
    1. Price Statistics Table
    2. Nutritional Statistics Table (calories, etc.)
    Add clear titles to the JSON tables.
"""

 ### 2.4 Output Cleaning Agent

In [5]:
Clean_Output_Agent_Name = "CleanOutputAgent"
Clean_Output_Agent_Instructions = """
    AI Agent Persona: Output Cleaning Specialist
    Role: To process and sanitize the raw outputs from analysis agents.
    Behavior: You do not perform new analysis — you only extract and format existing results.
    Response Style: Always output in a clean, minimal, structured format.

    Cleaning Tasks:
    1. From each analyzer's output, extract:
       - The list of identified menu items
       - The JSON table for price statistics
       - The JSON table for nutritional statistics
    2. Remove any unrelated text, explanations, commentary, or markdown formatting.
    3. Present the cleaned data in the following standardized JSON structure:

    {
        "MainDishes": {
            "IdentifiedItems": [...],
            "PriceStatistics": {...},
            "NutritionalStatistics": {...}
        },
        "Beverages": {
            "IdentifiedItems": [...],
            "PriceStatistics": {...},
            "NutritionalStatistics": {...}
        }
    }

    Output ONLY valid JSON. No explanations, no markdown, no additional text.
"""

 ### 2.5 Analysis Validation Agent

In [27]:
Analysis_Checker_Name = "AnalysisChecker"
Analysis_Checker_Instructions = """
    AI Agent Persona: Data Analysis Validation Auditor
    Role: A specialized agent responsible for verifying that analytics tasks are completed correctly by other agents.
    Behavior: The agent does not perform analysis itself but evaluates the completeness and accuracy of other agents' outputs.
    Response Style: Always provide a clear, structured validation report or approval.

    Validation Tasks:
    1. Verify Main Dish Analysis:
       - Main dish items are identified
       - Two JSON tables are present: one for price statistics and one for nutritional statistics
       - All required statistics are present (mean, median, std, min, max)

    2. Verify Beverage Analysis:
       - Beverage items are identified
       - Two JSON tables are present: one for price statistics and one for nutritional statistics
       - All required statistics are present (mean, median, std, min, max)

    Decision Logic:
    - If BOTH analyses meet ALL criteria → output: "APPROVED: All analyses complete and valid."
    - If EITHER analysis fails ANY check → output a detailed error message specifying:
      * Which category failed (Main Dishes or Beverages)
      * Which specific requirement was not met
      * What needs to be corrected
"""

 ## 3. Load Environment and Initialize Services

In [7]:
load_dotenv()
api_key = os.getenv("AZURE_OPENAI_KEY")
url = os.getenv("URL")
api_version = "2024-12-01-preview"

# Create kernel
kernel = Kernel()

# Configure Azure OpenAI service
chat_service = AzureChatCompletion(
    deployment_name="none", 
    api_key=api_key,
    base_url=url,
    api_version=api_version
)

# Register service with kernel
kernel.add_service(chat_service)
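
`os.getenv` returns `None` for an unset variable, so a missing key or URL only surfaces later, inside the first agent call. A minimal sketch of an up-front check, assuming the same two variable names used above; `require_env` is a hypothetical helper, not part of Semantic Kernel:

```python
import os


def require_env(*names: str) -> dict[str, str]:
    """Return the requested environment variables, failing fast if any is unset or empty."""
    missing = [name for name in names if not os.getenv(name)]
    if missing:
        raise RuntimeError(f"Missing environment variables: {', '.join(missing)}")
    return {name: os.environ[name] for name in names}


# Usage (after load_dotenv()):
# settings = require_env("AZURE_OPENAI_KEY", "URL")
# api_key, url = settings["AZURE_OPENAI_KEY"], settings["URL"]
```

Failing at configuration time tends to give a clearer error than an authentication failure raised from inside an agent invocation.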

 ## 4. Instantiate All Agents

In [8]:
agent_csv_loader = ChatCompletionAgent(
    service=chat_service,
    name=CSV_Loader_Name,
    instructions=CSV_Loader_Instructions,
)

agent_main_dish_analyzer = ChatCompletionAgent(
    service=chat_service,
    name=Main_Dish_Analyzer_Name,
    instructions=Main_Dish_Analyzer_Instructions,
)

agent_beverage_analyzer = ChatCompletionAgent(
    service=chat_service,
    name=Beverage_Analyzer_Name,
    instructions=Beverage_Analyzer_Instructions,
)

agent_clean_output = ChatCompletionAgent(
    service=chat_service,
    name=Clean_Output_Agent_Name,
    instructions=Clean_Output_Agent_Instructions,
)

agent_checker = ChatCompletionAgent(
    service=chat_service,
    name=Analysis_Checker_Name,
    instructions=Analysis_Checker_Instructions,
)

 ## 5. Helper Functions

  ### 5.1 Agent Execution Helper

In [9]:
async def run_agent(agent, task_input):
    """
    Executes an agent and collects all output messages.
    
    Args:
        agent: The ChatCompletionAgent to invoke
        task_input: The input prompt/data for the agent
    
    Returns:
        List of message objects from the agent
    """
    outputs = []
    async for message in agent.invoke(task_input):
        outputs.append(message)
    return outputs
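
`run_agent` only relies on `agent.invoke(...)` being an async iterator, so it can be exercised without any API call. A small sketch with a hypothetical `StubAgent` and `StubMessage` (neither is a Semantic Kernel class) standing in for a real agent:

```python
import asyncio
from dataclasses import dataclass


@dataclass
class StubMessage:
    """Hypothetical stand-in for a response item exposing `.content`."""
    content: str


class StubAgent:
    """Hypothetical agent whose invoke() yields canned messages."""

    def __init__(self, replies):
        self.replies = replies

    async def invoke(self, task_input):
        for reply in self.replies:
            await asyncio.sleep(0)  # yield control, as a network call would
            yield StubMessage(content=f"{reply}: {task_input}")


async def run_agent(agent, task_input):
    # Same logic as the helper above: drain the async iterator into a list.
    outputs = []
    async for message in agent.invoke(task_input):
        outputs.append(message)
    return outputs


# In a notebook cell, use "await run_agent(...)" instead of asyncio.run(...).
outputs = asyncio.run(run_agent(StubAgent(["echo"]), "hello"))
print(outputs[0].content)  # echo: hello
```

A stub like this makes it possible to test the pipeline's control flow separately from model behavior.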

 ### 5.2 CSV Loading Function

In [10]:
def load_csv_file(file_path):
    """
    Loads a CSV file and converts it to a flat comma-separated string.
    
    Args:
        file_path: Path to the CSV file
    
    Returns:
        String representation of CSV data
    """
    df = pd.read_csv(file_path)
    # Flatten the dataframe and join as comma-separated string
    flat_data = ", ".join(map(str, df.values.flatten()))
    return flat_data
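
Note that `df.values.flatten()` discards the column headers, so the analyzers have to infer what each field means from its position. One possible alternative, sketched here with the standard `csv` module, labels every value with its column name. The column names in the sample are assumptions for illustration; the actual header of `restaurant_menu.csv` is not shown in this notebook.

```python
import csv
import io


def rows_with_headers(csv_text: str) -> str:
    """Render each CSV row as 'column=value' pairs, one row per line."""
    reader = csv.DictReader(io.StringIO(csv_text))
    lines = []
    for row in reader:
        lines.append(", ".join(f"{column}={value}" for column, value in row.items()))
    return "\n".join(lines)


# Hypothetical sample; these column names are assumed, not taken from the real file.
sample = "item_id,name,category,price\nMENU5506,Spaghetti Carbonara,pasta,17.46\n"
print(rows_with_headers(sample))
# item_id=MENU5506, name=Spaghetti Carbonara, category=pasta, price=17.46
```

The labeled form uses more tokens per row, so it trades prompt size for less ambiguity.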

 ## 6. Parallel Analysis Function
 
 The key step is `asyncio.gather()`, which runs multiple agents concurrently.
 Because the two analyzer calls are independent and spend most of their time waiting on the network, analysis time drops from roughly the sum of both calls to roughly the longer of the two.

In [28]:
async def parallel_analysis(task_input: str):
    """
    Runs multiple analyzer agents in parallel using asyncio.gather().
    
    This approach is much faster than sequential execution:
    - Sequential: Time = T1 + T2
    - Parallel: Time ≈ max(T1, T2)
    
    Args:
        task_input: The data to be analyzed
    
    Returns:
        Dictionary with results from both analyzers
    """
    # Execute both agents simultaneously
    results = await asyncio.gather(
        run_agent(agent_main_dish_analyzer, task_input),
        run_agent(agent_beverage_analyzer, task_input)
    )
    
    # Package results into structured dictionary
    merged_output = {
        "MainDishAnalyzer": results[0][0].content,
        "BeverageAnalyzer": results[1][0].content
    }
    return merged_output
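
Indexing `results[0]` and `results[1]` is safe because `asyncio.gather()` returns results in the order the awaitables were passed, not the order in which they finish. A short sketch with a hypothetical `fake_agent` coroutine (a sleep-based stand-in, not a real agent):

```python
import asyncio


async def fake_agent(name: str, delay: float) -> str:
    """Hypothetical stand-in for an agent call that takes `delay` seconds."""
    await asyncio.sleep(delay)
    return name


async def demo():
    # The first task is slower, yet it still occupies index 0 of the result list.
    return await asyncio.gather(
        fake_agent("MainDishAnalyzer", 0.2),
        fake_agent("BeverageAnalyzer", 0.05),
    )


# In a notebook cell, use "await demo()" instead of asyncio.run(...).
results = asyncio.run(demo())
print(results)  # ['MainDishAnalyzer', 'BeverageAnalyzer']
```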

 ## 7. Main Processing Pipeline
 
 This demonstrates the complete **four-stage pipeline**:
 1. **Load**: Read data from CSV
 2. **Analyze**: Parallel processing by specialized agents
 3. **Clean**: Standardize outputs into consistent format
 4. **Validate**: Quality check by validation agent

In [29]:
async def main():
    """
    Orchestrates the complete restaurant menu analytics pipeline.
    
    Pipeline Stages:
    Stage 1: Data Loading
    Stage 2: Parallel Analysis (Main Dishes + Beverages)
    Stage 3: Output Cleaning and Standardization
    Stage 4: Validation and Quality Assurance
    """
    
    print("=" * 70)
    print("RESTAURANT MENU ANALYTICS SYSTEM")
    print("=" * 70)
    
    # ==================== STAGE 1: DATA LOADING ====================
    print("\nSTAGE 1: Loading CSV Data")
    print("-" * 70)
    
    csv_path = "restaurant_menu.csv"  # Change to your CSV file path
    csv_data = load_csv_file(csv_path)
    print("CSV data loaded successfully")
    print(f"Data preview: {csv_data[:200]}...\n")
    
    # ==================== STAGE 2: PARALLEL ANALYSIS ====================
    print(" STAGE 2: Running Parallel Analysis")
    print("-" * 70)
    print("MainDishAnalyzer and BeverageAnalyzer executing simultaneously...")
    
    raw_results = await parallel_analysis(f"Analyze this restaurant menu data: {csv_data}")
    
    print("\nAnalysis complete!")
    print("\nRAW ANALYZER OUTPUTS:")
    print("-" * 70)
    for analyzer, output in raw_results.items():
        print(f"\n[{analyzer}]")
        print(output)
    
    # ==================== STAGE 3: OUTPUT CLEANING ====================
    print("\n\nSTAGE 3: Cleaning and Standardizing Outputs")
    print("-" * 70)
    
    clean_input = f"Clean the following outputs: {raw_results}"
    clean_result = await run_agent(agent_clean_output, clean_input)
    cleaned_output = clean_result[0].content
    
    print("Output cleaned and standardized")
    print("\nCLEANED OUTPUT:")
    print("-" * 70)
    print(cleaned_output)
    
    # ==================== STAGE 4: VALIDATION ====================
    print("\n\nSTAGE 4: Validating Analysis Quality")
    print("-" * 70)
    
    checker_result = await run_agent(
        agent_checker, 
        f"Check this cleaned output for completeness and accuracy: {cleaned_output}"
    )
    
    print("VALIDATION RESULT:")
    print("-" * 70)
    print(checker_result[0].content)
    
    print("\n" + "=" * 70)
    print("PIPELINE COMPLETE")
    print("=" * 70)
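
`main()` prints the checker's report but does not act on it. If later steps should run only on approved output, the verdict can be turned into a boolean. A minimal sketch, assuming the checker follows its instructions and begins an approval with `APPROVED`; `is_approved` is a hypothetical helper:

```python
def is_approved(report: str) -> bool:
    """True when the checker's report begins with the approval marker."""
    return report.strip().upper().startswith("APPROVED")


print(is_approved("APPROVED: All analyses complete and valid."))  # True
print(is_approved("Main Dishes: nutritional statistics table is missing."))  # False
```

Because the report is model-generated text, the prefix match is a convention rather than a guarantee; anything that does not match is treated as a failure.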

 ## 8. Run the Pipeline

In [30]:
await main()

RESTAURANT MENU ANALYTICS SYSTEM

STAGE 1: Loading CSV Data
----------------------------------------------------------------------
CSV data loaded successfully
Data preview: MENU5506, Spaghetti Carbonara, pasta, 17.46, 720, True, MENU2679, Fettuccine Alfredo, pasta, 15.17, 680, True, MENU2424, Penne Arrabbiata, pasta, 15.52, 620, False, MENU1488, Lasagna Bolognese, pasta,...

 STAGE 2: Running Parallel Analysis
----------------------------------------------------------------------
MainDishAnalyzer and BeverageAnalyzer executing simultaneously...

Analysis complete!

RAW ANALYZER OUTPUTS:
----------------------------------------------------------------------

[MainDishAnalyzer]
{
  "Price Statistics Table": {
    "Title": "Main Dish Price Statistics",
    "Mean": 22.26,
    "Median": 18.70,
    "StandardDeviation": 8.11,
    "Minimum": 12.65,
    "Maximum": 42.40,
    "Count": 32
  },
  "Nutritional Statistics Table": {
    "Title": "Main Dish Nutrition (Calories) Statistics",
    "Mean

 ## Key Takeaways
 
 ### 1. **Parallel Processing with asyncio.gather()**
 The most important concept in this notebook:
 ```python
 results = await asyncio.gather(
     run_agent(agent1, task_input),
     run_agent(agent2, task_input)
 )
 ```
 
 **Benefits:**
 - **Speed**: Both agents run concurrently instead of one after the other
 - **Scalability**: More agents can be added to `gather()` as extra arguments
 - **Cost**: The same number of API calls as sequential execution, so cost is unchanged while wall-clock time drops
 
 **When to Use:**
 - Agents don't depend on each other's outputs
 - Tasks can be performed independently
 - You want to minimize total execution time
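
 When many agents are passed to `gather()`, every request starts at once, which can run into provider rate limits. One common way to cap concurrency is an `asyncio.Semaphore`. The sketch below uses a hypothetical `fake_agent_call` in place of `run_agent`, and the limit of 2 is an arbitrary illustration:

```python
import asyncio


async def fake_agent_call(name: str) -> str:
    """Hypothetical stand-in for run_agent(agent, task_input)."""
    await asyncio.sleep(0.05)
    return f"{name} done"


async def gather_limited(names, limit: int = 2):
    """Run all calls concurrently, but never more than `limit` at a time."""
    semaphore = asyncio.Semaphore(limit)

    async def guarded(name):
        async with semaphore:
            return await fake_agent_call(name)

    return await asyncio.gather(*(guarded(name) for name in names))


# In a notebook cell, use "await gather_limited(...)" instead of asyncio.run(...).
results = asyncio.run(gather_limited(["MainDish", "Beverage", "Dessert"]))
print(results)  # ['MainDish done', 'Beverage done', 'Dessert done']
```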
 
 ### 2. **Four-Stage Pipeline Architecture**
 
 | Stage | Component | Purpose |
 |-------|-----------|---------|
 | 1. Load | `load_csv_file()` (the `CSVLoader` agent is defined but not invoked) | Import data from file |
 | 2. Analyze | MainDishAnalyzer + BeverageAnalyzer | Parallel analysis |
 | 3. Clean | CleanOutputAgent | Standardize format |
 | 4. Validate | AnalysisChecker | Quality assurance |
 
 This creates a **quality-controlled data pipeline** where:
 - Each stage has a single responsibility
 - Outputs are progressively refined
 - Final output is validated for completeness
 
 ### 3. **Separation of Concerns**
 Notice how agents are highly specialized:
 - **Loaders** only load, don't analyze
 - **Analyzers** only analyze, don't format
 - **Cleaners** only format, don't analyze or validate
 - **Validators** only check, don't modify
 
 This makes the system:
 - Easier to test (test each stage independently)
 - Easier to debug (isolate failures to specific stages)
 - Easier to modify (change one agent without affecting others)
 
 ### 4. **Output Standardization**
 The CleanOutputAgent is instructed to produce:
 - A consistent JSON structure across runs
 - Output free of unnecessary explanatory text
 - A machine-readable format for downstream systems
 - A validation-ready format
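
 Even with an "Output ONLY valid JSON" instruction, models sometimes wrap the reply in a Markdown code fence. A defensive sketch that strips such a fence before parsing; `parse_agent_json` is a hypothetical helper, and the sample reply is made up for illustration:

```python
import json
import re

FENCE = "`" * 3  # a Markdown code fence marker


def parse_agent_json(text: str) -> dict:
    """Parse JSON from an agent reply, tolerating a surrounding Markdown code fence."""
    cleaned = text.strip()
    match = re.fullmatch(rf"{FENCE}(?:json)?\s*(.*?)\s*{FENCE}", cleaned, flags=re.DOTALL)
    if match:
        cleaned = match.group(1)
    return json.loads(cleaned)  # raises json.JSONDecodeError on invalid JSON


# Hypothetical reply wrapped in a fence.
reply = FENCE + 'json\n{"MainDishes": {"IdentifiedItems": ["Lasagna Bolognese"]}}\n' + FENCE
print(parse_agent_json(reply)["MainDishes"]["IdentifiedItems"])  # ['Lasagna Bolognese']
```

 Parsing the reply in code also gives downstream systems a dictionary rather than a string to work with.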
 
 ### 5. **Quality Validation Pattern**
 The AnalysisChecker implements a **quality gate**:
 - Verifies completeness (all required fields present)
 - Checks correctness (data in expected format)
 - Provides actionable feedback on failures
 - Acts as final quality control before output delivery
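
 An LLM-based checker can itself misjudge, so a deterministic check is a cheap complement. A sketch that verifies the standardized structure in code, assuming the key names from the CleanOutputAgent template above; `find_structure_problems` is a hypothetical helper, and it checks only for the presence of sections, not the statistics inside them:

```python
import json

REQUIRED_CATEGORIES = ("MainDishes", "Beverages")
REQUIRED_SECTIONS = ("IdentifiedItems", "PriceStatistics", "NutritionalStatistics")


def find_structure_problems(cleaned_json: str) -> list[str]:
    """Return human-readable problems; an empty list means the structure is complete."""
    try:
        data = json.loads(cleaned_json)
    except json.JSONDecodeError as error:
        return [f"Not valid JSON: {error.msg}"]
    if not isinstance(data, dict):
        return ["Top-level JSON value is not an object"]

    problems = []
    for category in REQUIRED_CATEGORIES:
        sections = data.get(category)
        if not isinstance(sections, dict):
            problems.append(f"Missing category: {category}")
            continue
        for section in REQUIRED_SECTIONS:
            if section not in sections:
                problems.append(f"{category} is missing {section}")
    return problems


# Hypothetical incomplete output used only to exercise the check.
sample = '{"MainDishes": {"IdentifiedItems": [], "PriceStatistics": {}}, "Beverages": {}}'
for problem in find_structure_problems(sample):
    print(problem)
```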
 
 ### 6. **Performance Comparison**
 
 **Sequential Processing:**
 ```
 Time = Load + Analyze1 + Analyze2 + Clean + Validate
 Time = 2s + 8s + 8s + 3s + 2s = 23s
 ```
 
 **Parallel Processing (this approach):**
 ```
 Time = Load + max(Analyze1, Analyze2) + Clean + Validate
 Time = 2s + max(8s, 8s) + 3s + 2s = 15s
 Time saved: 8s, about 35% less wall-clock time (roughly a 1.5x speedup)
 ```
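
 The arithmetic above can be checked empirically with sleep-based stand-ins. The sketch below scales the 8-second analyses from the example down to 0.2 seconds; `fake_analysis` is an illustrative stub, and real agent latencies vary from call to call:

```python
import asyncio
import time


async def fake_analysis(seconds: float) -> None:
    """Hypothetical stand-in for one analyzer call."""
    await asyncio.sleep(seconds)


async def compare(seconds: float = 0.2):
    # Sequential: the second call starts only after the first finishes.
    start = time.perf_counter()
    await fake_analysis(seconds)
    await fake_analysis(seconds)
    sequential = time.perf_counter() - start

    # Concurrent: both calls wait at the same time.
    start = time.perf_counter()
    await asyncio.gather(fake_analysis(seconds), fake_analysis(seconds))
    parallel = time.perf_counter() - start
    return sequential, parallel


# In a notebook cell, use "await compare()" instead of asyncio.run(...).
sequential, parallel = asyncio.run(compare())
print(f"sequential: {sequential:.2f}s, parallel: {parallel:.2f}s")
```

 The concurrent run should take about half as long as the sequential one, matching `max(T1, T2)` versus `T1 + T2` for two equal-length calls.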
 
 ### 7. **Production Enhancements**
 
 **Error Handling:**
 ```python
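 results = await asyncio.gather(
     run_agent(agent1, task_input),
     run_agent(agent2, task_input),
     return_exceptions=True,  # Failed calls are returned as exception objects, not raised
 )
 # With return_exceptions=True a try/except around gather() would not see agent errors,
 # so inspect each result instead.
 for result in results:
     if isinstance(result, Exception):
         handle_error(result)  # handle_error is a placeholder for your own handler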
 ```
 
 **Retry Logic:**
 ```python
 async def run_agent_with_retry(agent, task_input, max_retries=3):
     for attempt in range(max_retries):
         try:
             return await run_agent(agent, task_input)
         except Exception:
             if attempt == max_retries - 1:
                 raise  # Out of attempts: surface the last error
             await asyncio.sleep(2 ** attempt)  # Exponential backoff: 1s, 2s, ...
 ```
 
 **Progress Tracking:**
 ```python
 from tqdm.asyncio import tqdm
 
 results = await tqdm.gather(
     run_agent(agent1, task_input),
     run_agent(agent2, task_input),
     desc="Analyzing"
 )
 ```
 
 **Caching:**
 ```python
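 from functools import lru_cache
 import pandas as pd

 @lru_cache(maxsize=100)
 def load_csv_file(file_path):
     # Cache the flattened string per path to avoid repeated file I/O.
     # Note: changes to the file on disk are not detected once a path is cached.
     df = pd.read_csv(file_path)
     return ", ".join(map(str, df.values.flatten()))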
 ```

