# Advanced Agents with DSPy

This notebook demonstrates how to build sophisticated AI agents using DSPy. We'll cover:

1. Agent architecture and design patterns
2. Tool integration and function calling
3. Memory and state management
4. Multi-agent coordination
5. Planning and execution strategies
6. Error handling and recovery

Agents are autonomous systems that can perceive their environment, make decisions, and take actions to achieve goals.

In [None]:
import dspy
import json
import time
from typing import List, Dict, Any, Optional, Callable, Union
from dataclasses import dataclass, field
from enum import Enum
from abc import ABC, abstractmethod

# Configure DSPy
lm = dspy.OpenAI(model="gpt-4o")
dspy.settings.configure(lm=lm)

## Core Agent Components

In [None]:
class AgentState(Enum):
    """Possible states for an agent."""
    IDLE = "idle"
    PLANNING = "planning"
    EXECUTING = "executing"
    WAITING = "waiting"
    ERROR = "error"
    COMPLETED = "completed"

@dataclass
class AgentMemory:
    """Memory system for agents."""
    short_term: List[Dict[str, Any]] = field(default_factory=list)
    long_term: Dict[str, Any] = field(default_factory=dict)
    working_memory: Dict[str, Any] = field(default_factory=dict)
    max_short_term_size: int = 10
    
    def add_to_short_term(self, memory: Dict[str, Any]):
        """Add memory to short-term storage."""
        self.short_term.append(memory)
        if len(self.short_term) > self.max_short_term_size:
            # Move oldest to long-term if important
            old_memory = self.short_term.pop(0)
            if old_memory.get('important', False):
                key = f"memory_{len(self.long_term)}"
                self.long_term[key] = old_memory
    
    def update_working_memory(self, key: str, value: Any):
        """Update working memory."""
        self.working_memory[key] = value
    
    def clear_working_memory(self):
        """Clear working memory."""
        self.working_memory.clear()

@dataclass
class Tool:
    """Represents a tool that an agent can use."""
    name: str
    description: str
    function: Callable
    parameters: Dict[str, str] = field(default_factory=dict)
    required_permissions: List[str] = field(default_factory=list)

@dataclass
class Action:
    """Represents an action an agent can take."""
    type: str
    tool_name: Optional[str] = None
    parameters: Dict[str, Any] = field(default_factory=dict)
    priority: int = 1
    timestamp: float = field(default_factory=time.time)

@dataclass
class Plan:
    """Represents a plan with multiple actions."""
    goal: str
    actions: List[Action] = field(default_factory=list)
    estimated_time: float = 0.0
    success_criteria: List[str] = field(default_factory=list)
    created_at: float = field(default_factory=time.time)

## Tool System

In [None]:
class ToolRegistry:
    """Registry for managing agent tools."""
    
    def __init__(self):
        self.tools: Dict[str, Tool] = {}
    
    def register_tool(self, tool: Tool):
        """Register a new tool."""
        self.tools[tool.name] = tool
    
    def get_tool(self, name: str) -> Optional[Tool]:
        """Get a tool by name."""
        return self.tools.get(name)
    
    def list_tools(self) -> List[str]:
        """List all available tools."""
        return list(self.tools.keys())
    
    def get_tools_description(self) -> str:
        """Get formatted description of all tools."""
        descriptions = []
        for tool in self.tools.values():
            param_str = ", ".join([f"{k}: {v}" for k, v in tool.parameters.items()])
            descriptions.append(f"{tool.name}({param_str}): {tool.description}")
        return "\n".join(descriptions)

# Sample tools
def calculator_tool(expression: str) -> float:
    """Safely evaluate mathematical expressions."""
    try:
        # Simple evaluation (in practice, use a safe evaluator)
        allowed_chars = set('0123456789+-*/.() ')
        if all(c in allowed_chars for c in expression):
            return eval(expression)
        else:
            raise ValueError("Invalid characters in expression")
    except Exception as e:
        return f"Error: {str(e)}"

def web_search_tool(query: str) -> str:
    """Simulate web search (in practice, integrate with real search API)."""
    # Simulate search results
    mock_results = {
        "weather": "Current weather is sunny, 72°F",
        "news": "Latest news: Technology stocks are up 2%",
        "python": "Python is a high-level programming language",
        "AI": "Artificial Intelligence is transforming industries"
    }
    
    for key in mock_results:
        if key.lower() in query.lower():
            return mock_results[key]
    
    return f"Search results for '{query}': No specific results found"

def file_manager_tool(action: str, filename: str = "", content: str = "") -> str:
    """Simulate file operations."""
    if action == "read":
        return f"Reading file: {filename} (simulated content)"
    elif action == "write":
        return f"Writing to file: {filename} with content length: {len(content)}"
    elif action == "list":
        return "Files: document.txt, data.csv, script.py"
    else:
        return f"Unknown file action: {action}"

# Create tool registry and register tools
tool_registry = ToolRegistry()

tool_registry.register_tool(Tool(
    name="calculator",
    description="Evaluate mathematical expressions",
    function=calculator_tool,
    parameters={"expression": "string"}
))

tool_registry.register_tool(Tool(
    name="web_search",
    description="Search the web for information",
    function=web_search_tool,
    parameters={"query": "string"}
))

tool_registry.register_tool(Tool(
    name="file_manager",
    description="Manage files (read, write, list)",
    function=file_manager_tool,
    parameters={"action": "string", "filename": "string", "content": "string"}
))

print("Tool registry created with tools:")
print(tool_registry.get_tools_description())

## Planning Module

In [None]:
class PlanningSignature(dspy.Signature):
    """Generate a step-by-step plan to achieve a goal."""
    
    goal = dspy.InputField(desc="The goal to achieve")
    available_tools = dspy.InputField(desc="List of available tools and their descriptions")
    context = dspy.InputField(desc="Current context and constraints")
    
    plan_steps = dspy.OutputField(desc="Numbered list of steps to achieve the goal")
    tool_usage = dspy.OutputField(desc="Which tools to use for each step")
    success_criteria = dspy.OutputField(desc="How to determine if the goal is achieved")

class ActionSelection(dspy.Signature):
    """Select the next action to take based on the current plan and state."""
    
    current_plan = dspy.InputField(desc="The current plan being executed")
    completed_actions = dspy.InputField(desc="Actions already completed")
    current_state = dspy.InputField(desc="Current agent state and context")
    available_tools = dspy.InputField(desc="Available tools")
    
    next_action = dspy.OutputField(desc="The next action to take")
    tool_to_use = dspy.OutputField(desc="Tool to use for this action, or 'none'")
    parameters = dspy.OutputField(desc="Parameters for the tool as JSON")
    reasoning = dspy.OutputField(desc="Reasoning for this action choice")

class AgentPlanner(dspy.Module):
    """Planning module for agents."""
    
    def __init__(self, tool_registry: ToolRegistry):
        super().__init__()
        self.tool_registry = tool_registry
        self.planner = dspy.ChainOfThought(PlanningSignature)
        self.action_selector = dspy.ChainOfThought(ActionSelection)
    
    def create_plan(self, goal: str, context: str = "") -> Plan:
        """Create a plan to achieve the given goal."""
        available_tools = self.tool_registry.get_tools_description()
        
        result = self.planner(
            goal=goal,
            available_tools=available_tools,
            context=context or "No specific context provided"
        )
        
        # Parse the plan steps
        steps = [step.strip() for step in result.plan_steps.split('\n') if step.strip()]
        actions = []
        
        for i, step in enumerate(steps):
            # Extract action type from step
            action_type = "task"
            if "calculate" in step.lower():
                action_type = "calculation"
            elif "search" in step.lower():
                action_type = "search"
            elif "file" in step.lower():
                action_type = "file_operation"
            
            actions.append(Action(
                type=action_type,
                parameters={"description": step},
                priority=i + 1
            ))
        
        # Parse success criteria
        criteria = [c.strip() for c in result.success_criteria.split('\n') if c.strip()]
        
        return Plan(
            goal=goal,
            actions=actions,
            success_criteria=criteria
        )
    
    def select_next_action(self, plan: Plan, completed_actions: List[Action], 
                          current_state: str) -> Optional[Dict[str, Any]]:
        """Select the next action to execute."""
        if not plan.actions or len(completed_actions) >= len(plan.actions):
            return None
        
        # Format current plan
        plan_text = f"Goal: {plan.goal}\nSteps:\n"
        for i, action in enumerate(plan.actions, 1):
            status = "✓ DONE" if i <= len(completed_actions) else "○ PENDING"
            plan_text += f"{i}. {action.parameters.get('description', action.type)} {status}\n"
        
        # Format completed actions
        completed_text = "\n".join([
            f"- {action.parameters.get('description', action.type)}"
            for action in completed_actions
        ]) or "None"
        
        available_tools = self.tool_registry.get_tools_description()
        
        result = self.action_selector(
            current_plan=plan_text,
            completed_actions=completed_text,
            current_state=current_state,
            available_tools=available_tools
        )
        
        # Parse tool parameters
        try:
            parameters = json.loads(result.parameters) if result.parameters.strip() else {}
        except json.JSONDecodeError:
            parameters = {"raw_params": result.parameters}
        
        return {
            "action": result.next_action,
            "tool": result.tool_to_use if result.tool_to_use.lower() != "none" else None,
            "parameters": parameters,
            "reasoning": result.reasoning
        }

## Core Agent Class

In [None]:
class BaseAgent:
    """Base agent class with core functionality."""
    
    def __init__(self, name: str, tool_registry: ToolRegistry):
        self.name = name
        self.tool_registry = tool_registry
        self.state = AgentState.IDLE
        self.memory = AgentMemory()
        self.planner = AgentPlanner(tool_registry)
        self.current_plan: Optional[Plan] = None
        self.completed_actions: List[Action] = []
        self.execution_log: List[Dict[str, Any]] = []
    
    def set_goal(self, goal: str, context: str = ""):
        """Set a new goal for the agent."""
        self.state = AgentState.PLANNING
        self.current_plan = self.planner.create_plan(goal, context)
        self.completed_actions = []
        
        self.memory.add_to_short_term({
            "type": "goal_set",
            "goal": goal,
            "context": context,
            "timestamp": time.time(),
            "important": True
        })
        
        self.log(f"Goal set: {goal}")
        self.log(f"Plan created with {len(self.current_plan.actions)} actions")
    
    def execute_next_action(self) -> bool:
        """Execute the next action in the plan. Returns True if action was executed."""
        if not self.current_plan:
            self.log("No current plan to execute")
            return False
        
        self.state = AgentState.EXECUTING
        
        # Get current state description
        current_state = f"Agent {self.name} executing plan for: {self.current_plan.goal}"
        if self.memory.working_memory:
            current_state += f"\nWorking memory: {self.memory.working_memory}"
        
        # Select next action
        action_info = self.planner.select_next_action(
            self.current_plan, self.completed_actions, current_state
        )
        
        if not action_info:
            self.state = AgentState.COMPLETED
            self.log("All actions completed")
            return False
        
        # Execute the action
        result = self._execute_action(action_info)
        
        # Record the action
        action = Action(
            type=action_info["action"],
            tool_name=action_info.get("tool"),
            parameters=action_info["parameters"]
        )
        self.completed_actions.append(action)
        
        # Update memory
        self.memory.add_to_short_term({
            "type": "action_executed",
            "action": action_info["action"],
            "tool": action_info.get("tool"),
            "result": result,
            "timestamp": time.time()
        })
        
        self.memory.update_working_memory("last_action_result", result)
        
        self.log(f"Executed: {action_info['action']} -> {result}")
        return True
    
    def _execute_action(self, action_info: Dict[str, Any]) -> str:
        """Execute a specific action."""
        tool_name = action_info.get("tool")
        
        if tool_name:
            tool = self.tool_registry.get_tool(tool_name)
            if tool:
                try:
                    # Execute tool with parameters
                    params = action_info["parameters"]
                    result = tool.function(**params)
                    return str(result)
                except Exception as e:
                    self.state = AgentState.ERROR
                    return f"Error executing tool {tool_name}: {str(e)}"
            else:
                return f"Tool {tool_name} not found"
        else:
            # Execute as general action
            return f"Executed action: {action_info['action']}"
    
    def run_until_completion(self, max_actions: int = 20) -> bool:
        """Run the agent until the goal is completed or max actions reached."""
        action_count = 0
        
        while (self.state not in [AgentState.COMPLETED, AgentState.ERROR] and 
               action_count < max_actions):
            
            if not self.execute_next_action():
                break
            
            action_count += 1
            time.sleep(0.1)  # Brief pause between actions
        
        success = self.state == AgentState.COMPLETED
        self.log(f"Execution {'completed' if success else 'stopped'} after {action_count} actions")
        return success
    
    def log(self, message: str):
        """Log a message with timestamp."""
        log_entry = {
            "timestamp": time.time(),
            "agent": self.name,
            "message": message
        }
        self.execution_log.append(log_entry)
        print(f"[{self.name}] {message}")
    
    def get_status(self) -> Dict[str, Any]:
        """Get current agent status."""
        return {
            "name": self.name,
            "state": self.state.value,
            "current_goal": self.current_plan.goal if self.current_plan else None,
            "actions_completed": len(self.completed_actions),
            "total_actions": len(self.current_plan.actions) if self.current_plan else 0,
            "working_memory": self.memory.working_memory,
            "recent_memories": self.memory.short_term[-3:] if self.memory.short_term else []
        }

## Specialized Agent Types

In [None]:
class ResearchAgent(BaseAgent):
    """Specialized agent for research tasks."""
    
    def __init__(self, name: str, tool_registry: ToolRegistry):
        super().__init__(name, tool_registry)
        self.research_findings: List[Dict[str, Any]] = []
    
    def _execute_action(self, action_info: Dict[str, Any]) -> str:
        """Enhanced action execution with research-specific handling."""
        result = super()._execute_action(action_info)
        
        # If this was a search action, store findings
        if action_info.get("tool") == "web_search":
            self.research_findings.append({
                "query": action_info["parameters"].get("query", ""),
                "result": result,
                "timestamp": time.time()
            })
        
        return result
    
    def synthesize_research(self) -> str:
        """Synthesize research findings into a summary."""
        if not self.research_findings:
            return "No research findings to synthesize"
        
        summary = "Research Summary:\n"
        for i, finding in enumerate(self.research_findings, 1):
            summary += f"{i}. Query: {finding['query']}\n"
            summary += f"   Finding: {finding['result']}\n\n"
        
        return summary

class DataAnalysisAgent(BaseAgent):
    """Specialized agent for data analysis tasks."""
    
    def __init__(self, name: str, tool_registry: ToolRegistry):
        super().__init__(name, tool_registry)
        self.calculations: List[Dict[str, Any]] = []
    
    def _execute_action(self, action_info: Dict[str, Any]) -> str:
        """Enhanced action execution with calculation tracking."""
        result = super()._execute_action(action_info)
        
        # If this was a calculation, store it
        if action_info.get("tool") == "calculator":
            self.calculations.append({
                "expression": action_info["parameters"].get("expression", ""),
                "result": result,
                "timestamp": time.time()
            })
        
        return result
    
    def get_calculation_summary(self) -> str:
        """Get summary of all calculations performed."""
        if not self.calculations:
            return "No calculations performed"
        
        summary = "Calculation Summary:\n"
        for calc in self.calculations:
            summary += f"  {calc['expression']} = {calc['result']}\n"
        
        return summary

## Multi-Agent Coordination

In [None]:
class AgentCoordinator:
    """Coordinates multiple agents working together."""
    
    def __init__(self):
        self.agents: Dict[str, BaseAgent] = {}
        self.message_queue: List[Dict[str, Any]] = []
        self.coordination_log: List[Dict[str, Any]] = []
    
    def add_agent(self, agent: BaseAgent):
        """Add an agent to the coordination system."""
        self.agents[agent.name] = agent
        self.log(f"Agent {agent.name} added to coordination system")
    
    def send_message(self, from_agent: str, to_agent: str, message: str, message_type: str = "info"):
        """Send a message between agents."""
        self.message_queue.append({
            "from": from_agent,
            "to": to_agent,
            "message": message,
            "type": message_type,
            "timestamp": time.time()
        })
    
    def coordinate_task(self, task_description: str, agent_assignments: Dict[str, str]) -> Dict[str, Any]:
        """Coordinate a task across multiple agents."""
        self.log(f"Starting coordinated task: {task_description}")
        
        results = {}
        
        # Assign tasks to agents
        for agent_name, subtask in agent_assignments.items():
            if agent_name in self.agents:
                agent = self.agents[agent_name]
                agent.set_goal(subtask)
                self.log(f"Assigned to {agent_name}: {subtask}")
        
        # Execute tasks in parallel (simplified sequential execution)
        for agent_name in agent_assignments.keys():
            if agent_name in self.agents:
                agent = self.agents[agent_name]
                success = agent.run_until_completion()
                results[agent_name] = {
                    "success": success,
                    "state": agent.state.value,
                    "actions_completed": len(agent.completed_actions)
                }
        
        self.log(f"Coordinated task completed. Results: {results}")
        return results
    
    def get_agent_statuses(self) -> Dict[str, Dict[str, Any]]:
        """Get status of all agents."""
        return {name: agent.get_status() for name, agent in self.agents.items()}
    
    def log(self, message: str):
        """Log coordination events."""
        log_entry = {
            "timestamp": time.time(),
            "message": message
        }
        self.coordination_log.append(log_entry)
        print(f"[COORDINATOR] {message}")

## Example Usage: Single Agent

In [None]:
# Create a research agent
research_agent = ResearchAgent("ResearchBot", tool_registry)

# Set a research goal
research_goal = "Find information about artificial intelligence and calculate the market size"
research_agent.set_goal(research_goal)

print("\n" + "="*60)
print("SINGLE AGENT EXECUTION EXAMPLE")
print("="*60)

# Execute the plan
success = research_agent.run_until_completion()

# Display results
print("\n" + "-"*40)
print("EXECUTION RESULTS")
print("-"*40)
print(f"Execution successful: {success}")
print(f"Final state: {research_agent.state.value}")
print(f"Actions completed: {len(research_agent.completed_actions)}")

# Show research findings
if research_agent.research_findings:
    print("\n" + "-"*40)
    print("RESEARCH FINDINGS")
    print("-"*40)
    print(research_agent.synthesize_research())

# Show agent status
print("\n" + "-"*40)
print("AGENT STATUS")
print("-"*40)
status = research_agent.get_status()
for key, value in status.items():
    print(f"{key}: {value}")

## Example Usage: Multi-Agent Coordination

In [None]:
# Create multiple specialized agents
research_agent2 = ResearchAgent("DataResearcher", tool_registry)
analysis_agent = DataAnalysisAgent("DataAnalyst", tool_registry)
general_agent = BaseAgent("FileManager", tool_registry)

# Create coordinator
coordinator = AgentCoordinator()
coordinator.add_agent(research_agent2)
coordinator.add_agent(analysis_agent)
coordinator.add_agent(general_agent)

print("\n" + "="*60)
print("MULTI-AGENT COORDINATION EXAMPLE")
print("="*60)

# Define a complex task requiring multiple agents
task_description = "Research Python programming, calculate learning time estimates, and save results"

agent_assignments = {
    "DataResearcher": "Search for information about Python programming and its applications",
    "DataAnalyst": "Calculate estimated learning time for Python: beginner (40 hours) + intermediate (80 hours)", 
    "FileManager": "Prepare to save the research and analysis results to a file"
}

# Execute coordinated task
coordination_results = coordinator.coordinate_task(task_description, agent_assignments)

print("\n" + "-"*50)
print("COORDINATION RESULTS")
print("-"*50)
for agent_name, result in coordination_results.items():
    print(f"{agent_name}: Success={result['success']}, State={result['state']}, Actions={result['actions_completed']}")

# Show individual agent results
print("\n" + "-"*50)
print("INDIVIDUAL AGENT RESULTS")
print("-"*50)

# Research findings
if research_agent2.research_findings:
    print("Research Findings:")
    print(research_agent2.synthesize_research())

# Analysis results
if analysis_agent.calculations:
    print("Analysis Results:")
    print(analysis_agent.get_calculation_summary())

# Overall status
print("\n" + "-"*50)
print("ALL AGENT STATUSES")
print("-"*50)
all_statuses = coordinator.get_agent_statuses()
for agent_name, status in all_statuses.items():
    print(f"\n{agent_name}:")
    print(f"  State: {status['state']}")
    print(f"  Goal: {status['current_goal']}")
    print(f"  Progress: {status['actions_completed']}/{status['total_actions']}")

## Advanced Error Handling and Recovery

In [None]:
class RobustAgent(BaseAgent):
    """Agent with advanced error handling and recovery mechanisms."""
    
    def __init__(self, name: str, tool_registry: ToolRegistry, max_retries: int = 3):
        super().__init__(name, tool_registry)
        self.max_retries = max_retries
        self.error_count = 0
        self.recovery_strategies: List[str] = []
    
    def _execute_action(self, action_info: Dict[str, Any]) -> str:
        """Execute action with retry mechanism."""
        retry_count = 0
        
        while retry_count < self.max_retries:
            try:
                result = super()._execute_action(action_info)
                
                # Check if result indicates an error
                if "Error" in result or "error" in result.lower():
                    raise Exception(result)
                
                # Reset error count on success
                self.error_count = 0
                return result
                
            except Exception as e:
                retry_count += 1
                self.error_count += 1
                self.log(f"Action failed (attempt {retry_count}): {str(e)}")
                
                if retry_count < self.max_retries:
                    # Apply recovery strategy
                    recovery_strategy = self._get_recovery_strategy(action_info, str(e))
                    self.log(f"Applying recovery strategy: {recovery_strategy}")
                    
                    # Modify action based on recovery strategy
                    action_info = self._apply_recovery_strategy(action_info, recovery_strategy)
                    
                    time.sleep(0.5)  # Brief pause before retry
                else:
                    self.state = AgentState.ERROR
                    return f"Action failed after {self.max_retries} attempts: {str(e)}"
        
        return "Action execution failed"
    
    def _get_recovery_strategy(self, action_info: Dict[str, Any], error_msg: str) -> str:
        """Determine recovery strategy based on error type."""
        if "not found" in error_msg.lower():
            return "simplify_parameters"
        elif "invalid" in error_msg.lower():
            return "use_default_parameters"
        elif "timeout" in error_msg.lower():
            return "retry_with_delay"
        else:
            return "alternative_approach"
    
    def _apply_recovery_strategy(self, action_info: Dict[str, Any], strategy: str) -> Dict[str, Any]:
        """Apply recovery strategy to modify action."""
        modified_action = action_info.copy()
        
        if strategy == "simplify_parameters":
            # Simplify parameters
            if "parameters" in modified_action:
                params = modified_action["parameters"]
                if "expression" in params:
                    # Simplify mathematical expression
                    params["expression"] = "2 + 2"
                elif "query" in params:
                    # Simplify search query
                    params["query"] = "information"
        
        elif strategy == "use_default_parameters":
            # Use safe default parameters
            tool_name = modified_action.get("tool")
            if tool_name == "calculator":
                modified_action["parameters"] = {"expression": "1 + 1"}
            elif tool_name == "web_search":
                modified_action["parameters"] = {"query": "general information"}
            elif tool_name == "file_manager":
                modified_action["parameters"] = {"action": "list"}
        
        elif strategy == "alternative_approach":
            # Try alternative tool or approach
            if modified_action.get("tool") == "web_search":
                modified_action["tool"] = None
                modified_action["action"] = "general information gathering"
        
        self.recovery_strategies.append(strategy)
        return modified_action
    
    def get_error_report(self) -> Dict[str, Any]:
        """Get detailed error report."""
        return {
            "total_errors": self.error_count,
            "recovery_strategies_used": self.recovery_strategies,
            "current_state": self.state.value,
            "max_retries": self.max_retries
        }

# Test robust agent with error scenarios
robust_agent = RobustAgent("RobustBot", tool_registry, max_retries=2)

print("\n" + "="*60)
print("ROBUST AGENT WITH ERROR HANDLING")
print("="*60)

# Test with a goal that might cause errors
robust_goal = "Calculate complex mathematical expressions and search for advanced topics"
robust_agent.set_goal(robust_goal)

# Run with potential errors
success = robust_agent.run_until_completion(max_actions=10)

print("\n" + "-"*40)
print("ROBUST EXECUTION RESULTS")
print("-"*40)
print(f"Execution successful: {success}")
print(f"Final state: {robust_agent.state.value}")

# Show error report
error_report = robust_agent.get_error_report()
print("\nError Report:")
for key, value in error_report.items():
    print(f"  {key}: {value}")

## Performance Monitoring and Analytics

In [None]:
class AgentAnalytics:
    """Analytics system for monitoring agent performance."""
    
    def __init__(self):
        self.execution_metrics: List[Dict[str, Any]] = []
        self.agent_performances: Dict[str, List[Dict[str, Any]]] = {}
    
    def record_execution(self, agent: BaseAgent, execution_time: float, success: bool):
        """Record execution metrics for an agent."""
        metrics = {
            "agent_name": agent.name,
            "execution_time": execution_time,
            "success": success,
            "actions_completed": len(agent.completed_actions),
            "goal": agent.current_plan.goal if agent.current_plan else None,
            "final_state": agent.state.value,
            "memory_usage": len(agent.memory.short_term) + len(agent.memory.long_term),
            "timestamp": time.time()
        }
        
        self.execution_metrics.append(metrics)
        
        if agent.name not in self.agent_performances:
            self.agent_performances[agent.name] = []
        self.agent_performances[agent.name].append(metrics)
    
    def get_performance_summary(self) -> Dict[str, Any]:
        """Get overall performance summary."""
        if not self.execution_metrics:
            return {"message": "No execution data available"}
        
        total_executions = len(self.execution_metrics)
        successful_executions = sum(1 for m in self.execution_metrics if m["success"])
        avg_execution_time = sum(m["execution_time"] for m in self.execution_metrics) / total_executions
        avg_actions = sum(m["actions_completed"] for m in self.execution_metrics) / total_executions
        
        return {
            "total_executions": total_executions,
            "success_rate": successful_executions / total_executions,
            "average_execution_time": avg_execution_time,
            "average_actions_per_execution": avg_actions,
            "unique_agents": len(self.agent_performances)
        }
    
    def get_agent_performance(self, agent_name: str) -> Dict[str, Any]:
        """Get performance metrics for a specific agent."""
        if agent_name not in self.agent_performances:
            return {"message": f"No data for agent {agent_name}"}
        
        performances = self.agent_performances[agent_name]
        
        return {
            "agent_name": agent_name,
            "total_executions": len(performances),
            "success_rate": sum(1 for p in performances if p["success"]) / len(performances),
            "average_execution_time": sum(p["execution_time"] for p in performances) / len(performances),
            "average_actions": sum(p["actions_completed"] for p in performances) / len(performances),
            "latest_execution": performances[-1] if performances else None
        }

# Test analytics system
analytics = AgentAnalytics()

# Simulate multiple agent executions for analytics
test_agents = [
    ResearchAgent("Researcher1", tool_registry),
    DataAnalysisAgent("Analyst1", tool_registry),
    RobustAgent("Robot1", tool_registry)
]

test_goals = [
    "Search for machine learning information",
    "Calculate basic arithmetic: 10 + 15 * 2", 
    "Perform a simple task with error handling"
]

print("\n" + "="*60)
print("AGENT PERFORMANCE ANALYTICS")
print("="*60)

# Run test executions
for i, (agent, goal) in enumerate(zip(test_agents, test_goals)):
    print(f"\nRunning test {i+1}: {agent.name} - {goal}")
    
    start_time = time.time()
    agent.set_goal(goal)
    success = agent.run_until_completion(max_actions=5)
    execution_time = time.time() - start_time
    
    analytics.record_execution(agent, execution_time, success)
    print(f"  Completed in {execution_time:.2f}s, Success: {success}")

# Display analytics
print("\n" + "-"*50)
print("PERFORMANCE SUMMARY")
print("-"*50)
summary = analytics.get_performance_summary()
for key, value in summary.items():
    if isinstance(value, float):
        print(f"{key}: {value:.3f}")
    else:
        print(f"{key}: {value}")

# Display individual agent performance
print("\n" + "-"*50)
print("INDIVIDUAL AGENT PERFORMANCE")
print("-"*50)
for agent in test_agents:
    performance = analytics.get_agent_performance(agent.name)
    print(f"\n{agent.name}:")
    for key, value in performance.items():
        if key != "latest_execution" and isinstance(value, float):
            print(f"  {key}: {value:.3f}")
        elif key != "latest_execution":
            print(f"  {key}: {value}")

## Summary and Best Practices

This notebook demonstrated advanced agent development with DSPy:

### Key Components:
1. **Agent Architecture**: Modular design with state management, memory, and planning
2. **Tool Integration**: Flexible tool registry system for agent capabilities
3. **Planning System**: Intelligent planning and action selection using DSPy
4. **Memory Management**: Short-term, long-term, and working memory systems
5. **Multi-Agent Coordination**: Coordination system for complex tasks
6. **Error Handling**: Robust error recovery and retry mechanisms
7. **Performance Analytics**: Monitoring and optimization capabilities

### Agent Types Demonstrated:
- **BaseAgent**: Core agent functionality
- **ResearchAgent**: Specialized for research and information gathering
- **DataAnalysisAgent**: Focused on calculations and data analysis
- **RobustAgent**: Enhanced with error handling and recovery

### Best Practices:
1. **Modular Design**: Separate concerns (planning, execution, memory, tools)
2. **State Management**: Clear state transitions and status tracking
3. **Error Resilience**: Implement retry mechanisms and recovery strategies
4. **Memory Efficiency**: Manage memory size and relevance
5. **Tool Abstraction**: Use consistent tool interfaces
6. **Performance Monitoring**: Track metrics and optimize based on data
7. **Coordination Protocols**: Design clear communication patterns for multi-agent systems

### Advanced Patterns:
- **Dynamic Planning**: Adapt plans based on execution results
- **Memory Hierarchies**: Different types of memory for different use cases
- **Tool Composition**: Combine tools for complex operations
- **Agent Specialization**: Create specialized agents for specific domains
- **Fault Tolerance**: Graceful degradation and recovery

### Next Steps:
- Implement async execution for better performance
- Add more sophisticated planning algorithms
- Integrate with external APIs and services
- Implement learning and adaptation mechanisms
- Add security and permission systems