# Sample 05: Multi-Agent Orchestration System

This notebook demonstrates a sophisticated multi-agent architecture for building AI-powered agent systems using Microsoft Foundry Local.

## Overview

This sample implements a **multi-agent coordinator** that orchestrates specialized agents:

- 🔍 **Retrieval Agent**: Extracts relevant information from knowledge sources
- 🧠 **Reasoning Agent**: Performs step-by-step analysis and logical reasoning
- ⚡ **Execution Agent**: Creates actionable plans in structured formats
- 🎯 **Coordinator**: Orchestrates the entire agent workflow

## Architecture Pattern

```
User Goal → Coordinator
     ↓
1. Retrieval Agent → Context
     ↓
2. Reasoning Agent → Decision
     ↓
3. Execution Agent → Actions
     ↓
Structured Result
```

## Prerequisites and Setup

Make sure you have Foundry Local running with a capable model:

In [None]:
# Install required packages
!pip install openai foundry-local-sdk

## Import Libraries and Configuration

In [None]:
import os
import json
import time
from typing import Dict, Any, List
from openai import OpenAI

try:
    from foundry_local import FoundryLocalManager
    FOUNDRY_SDK_AVAILABLE = True
    print("✅ Foundry Local SDK is available")
except ImportError:
    FOUNDRY_SDK_AVAILABLE = False
    print("⚠️ Foundry Local SDK not available, will use manual configuration")

# Configuration
MODEL_ALIAS = "phi-4-mini"  # Change to your preferred model
BASE_URL = "http://localhost:8000"
API_KEY = ""

## Foundry Client Setup

Create a shared client for all agents:

In [None]:
class FoundryClient:
    """Shared client for all specialist agents."""
    
    def __init__(self, model_alias: str = MODEL_ALIAS):
        self.client = None
        self.model_name = None
        self.model_alias = model_alias
        self._initialize_client()
    
    def _initialize_client(self):
        """Initialize OpenAI client with Foundry Local or fallback configuration."""
        if FOUNDRY_SDK_AVAILABLE:
            try:
                print(f"🔄 Initializing Foundry Local with model: {self.model_alias}...")
                manager = FoundryLocalManager(self.model_alias)
                model_info = manager.get_model_info(self.model_alias)
                
                self.client = OpenAI(
                    base_url=manager.endpoint,
                    api_key=manager.api_key
                )
                self.model_name = model_info.id
                print(f"✅ Foundry Local SDK initialized with model: {self.model_name}")
                return
            except Exception as e:
                print(f"⚠️ Could not use Foundry SDK ({e}), falling back to manual configuration")
        
        # Fallback to manual configuration
        self.client = OpenAI(
            base_url=f"{BASE_URL}/v1",
            api_key=API_KEY
        )
        self.model_name = self.model_alias
        print(f"🔧 Manual configuration initialized with model: {self.model_name}")
    
    def chat(self, messages: List[Dict[str, str]], max_tokens: int = 300, temperature: float = 0.4) -> str:
        """Send chat completion request to the model."""
        try:
            response = self.client.chat.completions.create(
                model=self.model_name,
                messages=messages,
                max_tokens=max_tokens,
                temperature=temperature
            )
            return response.choices[0].message.content
        except Exception as e:
            return f"Error generating response: {str(e)}"
    
    def check_health(self) -> bool:
        """Check if the client is working properly."""
        try:
            test_response = self.chat(
                [{"role": "user", "content": "Say 'OK'"}],
                max_tokens=5
            )
            return "OK" in test_response and "Error" not in test_response
        except:
            return False

# Initialize the shared client
print("Initializing Foundry Client...")
foundry_client = FoundryClient()

# Health check
if foundry_client.check_health():
    print("✅ Client health check passed!")
else:
    print("❌ Client health check failed. Please ensure Foundry Local is running with a model.")

## Specialized Agent Classes

Each agent is optimized for specific cognitive tasks:

In [None]:
class RetrievalAgent:
    """Agent specialized in retrieving relevant information from knowledge sources."""
    
    SYSTEM = """You are a specialized retrieval agent. Your job is to extract and retrieve 
    the most relevant information from knowledge sources based on a given query. Focus on key facts, 
    data points, and contextual information that would be useful for decision-making."""
    
    def __init__(self, client: FoundryClient):
        self.client = client
    
    def run(self, query: str) -> str:
        """Retrieve relevant information based on the query."""
        messages = [
            {"role": "system", "content": self.SYSTEM},
            {
                "role": "user", 
                "content": f"""Query: {query}

Retrieve the most relevant key facts, data points, and contextual information that would 
help answer this query or support decision-making around it. Provide specific, actionable 
information rather than general statements."""
            }
        ]
        return self.client.chat(messages)


class ReasoningAgent:
    """Agent specialized in step-by-step analysis and reasoning."""
    
    SYSTEM = """You are a specialized reasoning agent. Your job is to analyze inputs 
    step-by-step and produce structured, logical conclusions. Break down complex problems 
    into manageable parts and provide clear reasoning for your conclusions."""
    
    def __init__(self, client: FoundryClient):
        self.client = client
    
    def run(self, context: str, question: str) -> str:
        """Analyze context and question to produce structured conclusions."""
        messages = [
            {"role": "system", "content": self.SYSTEM},
            {
                "role": "user", 
                "content": f"""Context:
{context}

Question: {question}

Analyze this step-by-step and provide a structured, logical conclusion with clear reasoning. 
Break down the problem, consider different angles, and provide a well-reasoned decision or recommendation."""
            }
        ]
        return self.client.chat(messages, max_tokens=400)


class ExecutionAgent:
    """Agent specialized in creating actionable execution plans."""
    
    SYSTEM = """You are a specialized execution agent. Your job is to transform decisions 
    and conclusions into concrete, actionable steps. Always format your response as valid JSON 
    with an array of action items. Each action should be specific, measurable, and achievable."""
    
    def __init__(self, client: FoundryClient):
        self.client = client
    
    def run(self, decision: str) -> str:
        """Transform decision into actionable steps in JSON format."""
        messages = [
            {"role": "system", "content": self.SYSTEM},
            {
                "role": "user", 
                "content": f"""Decision/Conclusion:
{decision}

Create 3-5 specific, actionable steps to implement this decision. Format as JSON with this structure:
{{
  "actions": [
    {{
      "step": 1,
      "description": "Specific action description",
      "priority": "high/medium/low",
      "timeline": "timeframe for completion",
      "resources": ["required resources or people"]
    }}
  ]
}}"""
            }
        ]
        return self.client.chat(messages, max_tokens=400, temperature=0.3)

print("✅ Agent classes defined")

## Multi-Agent Coordinator

The coordinator orchestrates all agents to handle complex tasks:

In [None]:
class Coordinator:
    """Multi-agent coordinator that orchestrates specialist agents to handle complex tasks."""
    
    def __init__(self, client: FoundryClient):
        """Initialize the coordinator with specialist agents."""
        self.client = client
        self.retrieval = RetrievalAgent(client)
        self.reasoning = ReasoningAgent(client)
        self.execution = ExecutionAgent(client)
    
    def handle(self, user_goal: str) -> Dict[str, Any]:
        """
        Orchestrate multiple agents to handle a complex user goal.
        
        Args:
            user_goal: The user's high-level goal or request
            
        Returns:
            Dictionary containing the goal, context, decision, and actions
        """
        print(f"🎯 **Coordinator:** Processing goal: {user_goal}")
        print("=" * 60)
        
        start_time = time.time()
        
        # Step 1: Retrieve relevant context
        print("📚 **Step 1:** Retrieving context...")
        context = self.retrieval.run(user_goal)
        print(f"   ✅ Context retrieved ({len(context)} chars)")
        print(f"   📄 Preview: {context[:150]}...\n")
        
        # Step 2: Analyze and reason about the context
        print("🧠 **Step 2:** Analyzing and reasoning...")
        decision = self.reasoning.run(context, user_goal)
        print(f"   ✅ Analysis completed ({len(decision)} chars)")
        print(f"   💡 Preview: {decision[:150]}...\n")
        
        # Step 3: Create actionable execution plan
        print("⚡ **Step 3:** Creating execution plan...")
        actions = self.execution.run(decision)
        print(f"   ✅ Execution plan created ({len(actions)} chars)")
        
        # Try to parse actions as JSON for preview
        try:
            actions_json = json.loads(actions)
            action_count = len(actions_json.get('actions', []))
            print(f"   📋 Actions planned: {action_count}\n")
        except:
            print(f"   📋 Actions: {actions[:100]}...\n")
        
        end_time = time.time()
        processing_time = end_time - start_time
        
        result = {
            "goal": user_goal,
            "context": context,
            "decision": decision,
            "actions": actions,
            "agent_flow": ["retrieval", "reasoning", "execution"],
            "processing_time": processing_time,
            "timestamp": time.strftime("%Y-%m-%d %H:%M:%S")
        }
        
        print(f"✅ **Coordination Complete** (⏱️ {processing_time:.2f}s)")
        return result
    
    def handle_with_feedback(self, user_goal: str, feedback_rounds: int = 1) -> Dict[str, Any]:
        """
        Handle a goal with multiple feedback rounds for refinement.
        
        Args:
            user_goal: The user's high-level goal or request
            feedback_rounds: Number of feedback rounds to perform
            
        Returns:
            Dictionary containing the refined result
        """
        result = self.handle(user_goal)
        
        for round_num in range(feedback_rounds):
            print(f"\n🔄 **Feedback Round {round_num + 1}:**")
            print("-" * 40)
            
            # Use reasoning agent to refine the execution plan
            refinement_prompt = f"""
            Original Goal: {user_goal}
            Current Decision: {result['decision']}
            Current Actions: {result['actions']}
            
            Review the above and suggest improvements or refinements to make the execution plan more effective.
            Consider potential challenges, resource optimization, and success metrics.
            """
            
            refined_decision = self.reasoning.run(result['context'], refinement_prompt)
            refined_actions = self.execution.run(refined_decision)
            
            result['decision'] = refined_decision
            result['actions'] = refined_actions
            result['refinement_rounds'] = round_num + 1
            
            print(f"   ✅ Round {round_num + 1} refinement completed")
        
        return result

# Initialize coordinator
coordinator = Coordinator(foundry_client)
print("✅ Multi-agent coordinator initialized")

## Example 1: Business Planning

Let's test the coordinator with a business planning goal:

In [None]:
# Business planning example
business_goal = "Create a plan to onboard 5 new customers this month"

print(f"🚀 **Business Planning Example**")
print(f"📋 Goal: {business_goal}")
print("=" * 80)

business_result = coordinator.handle(business_goal)

print("\n📊 **Final Result Summary:**")
print("=" * 50)
print(f"🎯 **Goal:** {business_result['goal']}")
print(f"⏱️ **Processing Time:** {business_result['processing_time']:.2f} seconds")
print(f"🕒 **Timestamp:** {business_result['timestamp']}")

print(f"\n📚 **Context (Retrieval Agent):**")
print(business_result['context'])

print(f"\n🧠 **Decision (Reasoning Agent):**")
print(business_result['decision'])

print(f"\n⚡ **Actions (Execution Agent):**")
print(business_result['actions'])

## Example 2: Strategy Development

Test with a more complex strategy development goal:

In [None]:
# Strategy development example
strategy_goal = "Develop a strategy to improve team productivity by 20% while maintaining work-life balance"

print(f"🎯 **Strategy Development Example**")
print(f"📋 Goal: {strategy_goal}")
print("=" * 80)

strategy_result = coordinator.handle(strategy_goal)

print("\n📊 **Structured Action Plan:**")
print("=" * 40)

# Try to parse and display actions in a structured format
try:
    actions_data = json.loads(strategy_result['actions'])
    if 'actions' in actions_data:
        for i, action in enumerate(actions_data['actions'], 1):
            print(f"\n📌 **Action {i}:**")
            print(f"   📝 Description: {action.get('description', 'N/A')}")
            print(f"   🔥 Priority: {action.get('priority', 'N/A')}")
            print(f"   ⏰ Timeline: {action.get('timeline', 'N/A')}")
            print(f"   🛠️ Resources: {', '.join(action.get('resources', ['N/A']))}")
    else:
        print(strategy_result['actions'])
except json.JSONDecodeError:
    print("Raw actions output:")
    print(strategy_result['actions'])

## Example 3: Feedback Loop Refinement

Demonstrate the feedback mechanism for iterative improvement:

In [None]:
# Feedback loop example
feedback_goal = "Design a customer feedback collection system for a software product"

print(f"🔄 **Feedback Loop Refinement Example**")
print(f"📋 Goal: {feedback_goal}")
print("=" * 80)

# Process with 2 feedback rounds
feedback_result = coordinator.handle_with_feedback(feedback_goal, feedback_rounds=2)

print("\n🏆 **Final Refined Result:**")
print("=" * 50)
print(f"🎯 **Goal:** {feedback_result['goal']}")
print(f"🔄 **Refinement Rounds:** {feedback_result.get('refinement_rounds', 0)}")
print(f"⏱️ **Total Processing Time:** {feedback_result['processing_time']:.2f} seconds")

print(f"\n🧠 **Final Decision:**")
print(feedback_result['decision'])

print(f"\n⚡ **Final Action Plan:**")
print(feedback_result['actions'])

## Interactive Agent Testing

Test individual agents separately to understand their specialized capabilities:

In [None]:
def test_individual_agents(query: str):
    """Test each agent individually with the same query."""
    print(f"🧪 **Individual Agent Testing**")
    print(f"❓ Query: {query}")
    print("=" * 60)
    
    # Test Retrieval Agent
    print("\n🔍 **Retrieval Agent:**")
    retrieval_result = coordinator.retrieval.run(query)
    print(retrieval_result)
    
    # Test Reasoning Agent (using retrieval result as context)
    print("\n🧠 **Reasoning Agent:**")
    reasoning_result = coordinator.reasoning.run(retrieval_result, query)
    print(reasoning_result)
    
    # Test Execution Agent (using reasoning result)
    print("\n⚡ **Execution Agent:**")
    execution_result = coordinator.execution.run(reasoning_result)
    print(execution_result)

# Test with a simple query
test_query = "How can we reduce customer support response time?"
test_individual_agents(test_query)

## Custom Goal Testing

Use this cell to test your own goals:

In [None]:
# Custom goal testing - modify the goal below
custom_goal = "Create a training program for new AI engineers joining our company"

print(f"🎨 **Custom Goal Testing**")
print(f"📋 Your Goal: {custom_goal}")
print("=" * 60)

# Choose processing method
use_feedback = True  # Set to True for feedback rounds, False for basic processing
feedback_rounds = 1  # Number of feedback rounds if use_feedback is True

if use_feedback:
    custom_result = coordinator.handle_with_feedback(custom_goal, feedback_rounds=feedback_rounds)
    print(f"\n✨ **Result with {feedback_rounds} feedback round(s):**")
else:
    custom_result = coordinator.handle(custom_goal)
    print(f"\n✨ **Basic Result:**")

print("=" * 50)
print(f"📚 **Context:** {custom_result['context'][:200]}...")
print(f"\n🧠 **Decision:** {custom_result['decision'][:200]}...")
print(f"\n⚡ **Actions:** {custom_result['actions'][:200]}...")

# Show processing stats
print(f"\n📊 **Statistics:**")
print(f"   ⏱️ Processing Time: {custom_result['processing_time']:.2f}s")
print(f"   🔄 Refinement Rounds: {custom_result.get('refinement_rounds', 0)}")
print(f"   📏 Total Content Length: {len(custom_result['context']) + len(custom_result['decision']) + len(custom_result['actions'])} chars")

## Performance Analysis

Analyze the performance of the multi-agent system:

In [None]:
def performance_benchmark(goals: List[str], iterations: int = 2) -> Dict[str, Any]:
    """Benchmark the coordinator performance with multiple goals."""
    results = []
    
    print(f"📊 **Performance Benchmark**")
    print(f"🎯 Goals: {len(goals)}")
    print(f"🔄 Iterations per goal: {iterations}")
    print("=" * 50)
    
    for i, goal in enumerate(goals, 1):
        print(f"\n🎯 **Goal {i}:** {goal[:50]}...")
        goal_times = []
        
        for j in range(iterations):
            print(f"   🔄 Iteration {j+1}/{iterations}...", end=" ")
            start_time = time.time()
            
            try:
                result = coordinator.handle(goal)
                end_time = time.time()
                processing_time = end_time - start_time
                goal_times.append(processing_time)
                print(f"✅ {processing_time:.2f}s")
            except Exception as e:
                print(f"❌ Error: {e}")
        
        if goal_times:
            avg_time = sum(goal_times) / len(goal_times)
            results.append({
                "goal": goal,
                "avg_time": avg_time,
                "min_time": min(goal_times),
                "max_time": max(goal_times),
                "times": goal_times
            })
    
    return results

# Benchmark with different types of goals
benchmark_goals = [
    "Create a social media marketing strategy",
    "Improve employee onboarding process",
    "Design a mobile app user interface",
    "Plan a product launch campaign"
]

benchmark_results = performance_benchmark(benchmark_goals, iterations=2)

# Display benchmark summary
print("\n🏆 **Benchmark Summary:**")
print("=" * 50)
for result in benchmark_results:
    print(f"📝 {result['goal'][:40]}...")
    print(f"   ⏱️ Average: {result['avg_time']:.2f}s")
    print(f"   ⚡ Fastest: {result['min_time']:.2f}s")
    print(f"   🐌 Slowest: {result['max_time']:.2f}s")
    print()

if benchmark_results:
    overall_avg = sum(r['avg_time'] for r in benchmark_results) / len(benchmark_results)
    print(f"📊 **Overall Average Processing Time:** {overall_avg:.2f}s")

## Production Deployment Helper

Example of how to wrap the coordinator for production use:

In [None]:
class ProductionCoordinator:
    """Production-ready wrapper for the multi-agent coordinator."""
    
    def __init__(self, model_alias: str = "phi-4-mini"):
        self.client = FoundryClient(model_alias)
        self.coordinator = Coordinator(self.client)
        self.request_count = 0
        self.total_processing_time = 0
    
    def process_goal(self, goal: str, include_feedback: bool = False, feedback_rounds: int = 1) -> Dict[str, Any]:
        """Process a goal with production monitoring."""
        self.request_count += 1
        
        try:
            if include_feedback:
                result = self.coordinator.handle_with_feedback(goal, feedback_rounds=feedback_rounds)
            else:
                result = self.coordinator.handle(goal)
            
            self.total_processing_time += result['processing_time']
            
            # Add production metadata
            result['request_id'] = self.request_count
            result['status'] = 'success'
            result['model'] = self.client.model_name
            
            return result
            
        except Exception as e:
            return {
                'request_id': self.request_count,
                'status': 'error',
                'error': str(e),
                'goal': goal,
                'timestamp': time.strftime("%Y-%m-%d %H:%M:%S")
            }
    
    def get_stats(self) -> Dict[str, Any]:
        """Get production statistics."""
        avg_processing_time = self.total_processing_time / max(1, self.request_count)
        
        return {
            'total_requests': self.request_count,
            'total_processing_time': self.total_processing_time,
            'average_processing_time': avg_processing_time,
            'model': self.client.model_name,
            'client_healthy': self.client.check_health()
        }

# Example production usage
prod_coordinator = ProductionCoordinator()

# Process a goal
prod_goal = "Create a quarterly business review presentation"
prod_result = prod_coordinator.process_goal(prod_goal)

print(f"🏭 **Production Processing Result:**")
print(f"📊 Status: {prod_result['status']}")
print(f"🔢 Request ID: {prod_result['request_id']}")
print(f"⏱️ Processing Time: {prod_result.get('processing_time', 'N/A')}s")
print(f"🤖 Model: {prod_result.get('model', 'N/A')}")

# Show production stats
stats = prod_coordinator.get_stats()
print(f"\n📊 **Production Statistics:**")
print(f"   📈 Total Requests: {stats['total_requests']}")
print(f"   ⏱️ Average Processing Time: {stats['average_processing_time']:.2f}s")
print(f"   💚 Client Health: {'✅ Healthy' if stats['client_healthy'] else '❌ Unhealthy'}")

## Summary and Best Practices

This notebook demonstrated a sophisticated multi-agent orchestration system:

### ✅ Key Features Demonstrated

1. **🏗️ Agent Specialization**: Each agent optimized for specific cognitive tasks
2. **🎯 Workflow Orchestration**: Coordinated multi-step processing
3. **📋 Structured Output**: JSON-formatted action plans
4. **🔄 Feedback Loops**: Multi-round refinement capabilities
5. **⚡ Performance Monitoring**: Processing time and health checks
6. **🏭 Production Ready**: Enterprise-grade wrapper with monitoring

### 🧠 Agent Roles Summary

| Agent | Purpose | Input | Output |
|-------|---------|-------|--------|
| **🔍 Retrieval** | Extract relevant information | User query | Contextual facts and data |
| **🧠 Reasoning** | Logical analysis | Context + question | Structured decision |
| **⚡ Execution** | Create action plans | Decision | JSON action steps |
| **🎯 Coordinator** | Orchestrate workflow | User goal | Complete result |

### 🚀 Use Cases

- **Business Planning**: Strategic planning and execution
- **Project Management**: Task decomposition and scheduling  
- **Research**: Information gathering and analysis
- **Decision Support**: Complex decision-making processes
- **Workflow Automation**: Multi-step business processes

### 💡 Best Practices

1. **🎯 Single Responsibility**: Each agent has one clear purpose
2. **🔗 Clear Interfaces**: Standardized input/output formats
3. **🛡️ Error Handling**: Graceful degradation on failures
4. **📊 Monitoring**: Comprehensive logging and performance tracking
5. **🔄 Feedback Loops**: Iterative improvement mechanisms
6. **⚖️ Load Balancing**: Consider parallel processing for independent tasks

### 🔮 Next Steps

- **🔧 Function Calling**: Integrate with external APIs and tools
- **🧠 Memory Systems**: Add persistent memory for agents
- **🎭 Specialized Models**: Use different models for different agents
- **👥 Human-in-the-Loop**: Add human review and approval steps
- **📊 Advanced Analytics**: Comprehensive monitoring and metrics

This multi-agent system demonstrates how to build sophisticated AI workflows that combine the strengths of specialized agents while maintaining the privacy and performance benefits of local inference with Microsoft Foundry Local.