# Multi-Agent System

This notebook demonstrates how to create a multi-agent system where multiple AI agents work together to solve complex problems.
Each agent has specialized roles and capabilities, and they coordinate to achieve common goals.

## Features:
- Multiple specialized agents with different roles
- Agent communication and coordination
- Task delegation and workflow management
- Collaborative problem solving
- Agent supervision and orchestration

## Setup and Installation

In [None]:
# Install required packages
!pip install openai python-dotenv asyncio

## Import Libraries

In [None]:
import openai
import os
import asyncio
import json
from typing import Dict, List, Any, Optional
from dataclasses import dataclass, field
from enum import Enum
from datetime import datetime
from dotenv import load_dotenv

# Load environment variables
load_dotenv()
openai.api_key = os.getenv('OPENAI_API_KEY')

## Agent Framework

In [None]:
class AgentRole(Enum):
    COORDINATOR = "coordinator"
    RESEARCHER = "researcher"
    WRITER = "writer"
    CRITIC = "critic"
    EXECUTOR = "executor"

@dataclass
class Message:
    sender: str
    receiver: str
    content: str
    message_type: str
    timestamp: datetime = field(default_factory=datetime.now)
    metadata: Dict[str, Any] = field(default_factory=dict)

@dataclass
class Task:
    id: str
    description: str
    assigned_to: Optional[str] = None
    status: str = "pending"
    result: Optional[str] = None
    created_at: datetime = field(default_factory=datetime.now)
    completed_at: Optional[datetime] = None

class BaseAgent:
    def __init__(self, name: str, role: AgentRole, system_prompt: str, model: str = "gpt-3.5-turbo"):
        self.name = name
        self.role = role
        self.system_prompt = system_prompt
        self.model = model
        self.message_history: List[Message] = []
        self.tasks: List[Task] = []
        self.is_active = True
    
    def receive_message(self, message: Message):
        """Receive a message from another agent"""
        self.message_history.append(message)
        return self.process_message(message)
    
    def process_message(self, message: Message) -> Optional[str]:
        """Process received message - to be implemented by specific agents"""
        return None
    
    def send_message(self, receiver: str, content: str, message_type: str = "general") -> Message:
        """Send a message to another agent"""
        message = Message(
            sender=self.name,
            receiver=receiver,
            content=content,
            message_type=message_type
        )
        return message
    
    def generate_response(self, prompt: str, max_tokens: int = 200) -> str:
        """Generate AI response using OpenAI API"""
        try:
            response = openai.ChatCompletion.create(
                model=self.model,
                messages=[
                    {"role": "system", "content": self.system_prompt},
                    {"role": "user", "content": prompt}
                ],
                max_tokens=max_tokens,
                temperature=0.7
            )
            return response.choices[0].message.content
        except Exception as e:
            return f"Error generating response: {str(e)}"
    
    def assign_task(self, task: Task):
        """Assign a task to this agent"""
        task.assigned_to = self.name
        self.tasks.append(task)
    
    def complete_task(self, task_id: str, result: str):
        """Mark a task as completed"""
        for task in self.tasks:
            if task.id == task_id:
                task.status = "completed"
                task.result = result
                task.completed_at = datetime.now()
                break

## Specialized Agent Classes

In [None]:
class CoordinatorAgent(BaseAgent):
    """Coordinates tasks and manages other agents"""
    
    def __init__(self, name: str = "Coordinator"):
        system_prompt = """
        You are a project coordinator agent. Your role is to:
        1. Break down complex tasks into smaller subtasks
        2. Assign tasks to appropriate specialist agents
        3. Monitor progress and coordinate between agents
        4. Synthesize results from different agents
        5. Ensure project goals are met efficiently
        
        Be organized, clear in communications, and strategic in task delegation.
        """
        super().__init__(name, AgentRole.COORDINATOR, system_prompt)
    
    def plan_project(self, project_description: str) -> List[Task]:
        """Break down a project into tasks"""
        prompt = f"""
        Project: {project_description}
        
        Break this project down into 3-5 specific tasks that can be assigned to specialist agents.
        For each task, specify:
        1. Task description
        2. Which type of agent should handle it (researcher, writer, critic, executor)
        
        Format as a JSON list with tasks.
        """
        
        response = self.generate_response(prompt, max_tokens=400)
        
        # Simple task creation (in real implementation, parse JSON response)
        tasks = [
            Task(id="task_1", description="Research phase: " + project_description[:50]),
            Task(id="task_2", description="Writing phase: " + project_description[:50]),
            Task(id="task_3", description="Review phase: " + project_description[:50])
        ]
        
        return tasks

class ResearcherAgent(BaseAgent):
    """Specializes in research and information gathering"""
    
    def __init__(self, name: str = "Researcher"):
        system_prompt = """
        You are a research specialist agent. Your role is to:
        1. Gather relevant information on given topics
        2. Analyze and synthesize research findings
        3. Provide fact-based insights and recommendations
        4. Identify key sources and references
        5. Present research in a structured format
        
        Be thorough, accurate, and cite sources when possible.
        """
        super().__init__(name, AgentRole.RESEARCHER, system_prompt)
    
    def conduct_research(self, topic: str) -> str:
        """Conduct research on a given topic"""
        prompt = f"""
        Research topic: {topic}
        
        Provide a comprehensive research summary including:
        1. Key concepts and definitions
        2. Current trends and developments
        3. Important statistics or data points
        4. Potential challenges or considerations
        5. Recommended next steps or areas for further investigation
        """
        
        return self.generate_response(prompt, max_tokens=500)

class WriterAgent(BaseAgent):
    """Specializes in content creation and writing"""
    
    def __init__(self, name: str = "Writer"):
        system_prompt = """
        You are a content writing specialist agent. Your role is to:
        1. Create clear, engaging, and well-structured content
        2. Adapt writing style to different audiences and purposes
        3. Transform research and data into readable narratives
        4. Ensure content is grammatically correct and well-formatted
        5. Create compelling introductions and conclusions
        
        Be creative, clear, and audience-focused in your writing.
        """
        super().__init__(name, AgentRole.WRITER, system_prompt)
    
    def write_content(self, topic: str, research_input: str = "", content_type: str = "article") -> str:
        """Write content based on topic and research"""
        prompt = f"""
        Content type: {content_type}
        Topic: {topic}
        Research input: {research_input}
        
        Write a well-structured {content_type} that:
        1. Has an engaging introduction
        2. Presents information in a logical flow
        3. Uses clear and accessible language
        4. Includes relevant examples or analogies
        5. Ends with a strong conclusion
        """
        
        return self.generate_response(prompt, max_tokens=600)

class CriticAgent(BaseAgent):
    """Specializes in review, critique, and quality assurance"""
    
    def __init__(self, name: str = "Critic"):
        system_prompt = """
        You are a quality assurance and critique specialist agent. Your role is to:
        1. Review content for accuracy, clarity, and completeness
        2. Identify areas for improvement or potential issues
        3. Provide constructive feedback and suggestions
        4. Ensure content meets quality standards
        5. Recommend revisions or alternative approaches
        
        Be thorough, constructive, and objective in your critiques.
        """
        super().__init__(name, AgentRole.CRITIC, system_prompt)
    
    def review_content(self, content: str, criteria: str = "general quality") -> str:
        """Review and critique content"""
        prompt = f"""
        Content to review:
        {content}
        
        Review criteria: {criteria}
        
        Provide a detailed review covering:
        1. Overall quality assessment
        2. Strengths of the content
        3. Areas needing improvement
        4. Specific suggestions for enhancement
        5. Final recommendation (approve, revise, or rewrite)
        """
        
        return self.generate_response(prompt, max_tokens=400)

## Multi-Agent System Orchestrator

In [None]:
class MultiAgentSystem:
    """Orchestrates multiple agents working together"""
    
    def __init__(self):
        self.agents: Dict[str, BaseAgent] = {}
        self.message_queue: List[Message] = []
        self.active_tasks: List[Task] = []
        self.completed_tasks: List[Task] = []
        self.workflow_log: List[str] = []
    
    def add_agent(self, agent: BaseAgent):
        """Add an agent to the system"""
        self.agents[agent.name] = agent
        self.log(f"Added agent: {agent.name} ({agent.role.value})")
    
    def log(self, message: str):
        """Log system events"""
        timestamp = datetime.now().strftime("%H:%M:%S")
        log_entry = f"[{timestamp}] {message}"
        self.workflow_log.append(log_entry)
        print(log_entry)
    
    def send_message(self, sender_name: str, receiver_name: str, content: str, message_type: str = "general") -> bool:
        """Send message between agents"""
        if sender_name not in self.agents or receiver_name not in self.agents:
            self.log(f"Error: Invalid agent names - {sender_name} to {receiver_name}")
            return False
        
        message = Message(
            sender=sender_name,
            receiver=receiver_name,
            content=content,
            message_type=message_type
        )
        
        self.message_queue.append(message)
        response = self.agents[receiver_name].receive_message(message)
        
        self.log(f"Message: {sender_name} → {receiver_name}: {content[:50]}...")
        return True
    
    def execute_workflow(self, project_description: str) -> Dict[str, Any]:
        """Execute a complete workflow using multiple agents"""
        self.log(f"Starting workflow: {project_description}")
        
        # Step 1: Coordinator plans the project
        if "Coordinator" not in self.agents:
            self.log("Error: No coordinator agent available")
            return {"success": False, "error": "No coordinator"}
        
        coordinator = self.agents["Coordinator"]
        tasks = coordinator.plan_project(project_description)
        self.active_tasks.extend(tasks)
        
        results = {}
        
        # Step 2: Execute research phase
        if "Researcher" in self.agents:
            self.log("Starting research phase")
            research_result = self.agents["Researcher"].conduct_research(project_description)
            results["research"] = research_result
            self.send_message("Researcher", "Writer", f"Research completed: {research_result[:100]}...", "research_results")
        
        # Step 3: Execute writing phase
        if "Writer" in self.agents:
            self.log("Starting writing phase")
            research_input = results.get("research", "")
            written_content = self.agents["Writer"].write_content(project_description, research_input)
            results["content"] = written_content
            if "Critic" in self.agents:
                self.send_message("Writer", "Critic", f"Content ready for review: {written_content[:100]}...", "review_request")
        
        # Step 4: Execute review phase
        if "Critic" in self.agents and "content" in results:
            self.log("Starting review phase")
            review_result = self.agents["Critic"].review_content(results["content"])
            results["review"] = review_result
            self.send_message("Critic", "Coordinator", f"Review completed: {review_result[:100]}...", "review_results")
        
        # Step 5: Coordinator synthesizes final result
        final_prompt = f"""
        Project: {project_description}
        
        Results from team:
        - Research: {results.get('research', 'Not available')[:200]}...
        - Content: {results.get('content', 'Not available')[:200]}...
        - Review: {results.get('review', 'Not available')[:200]}...
        
        Provide a final project summary and recommendations.
        """
        
        final_summary = coordinator.generate_response(final_prompt, max_tokens=300)
        results["final_summary"] = final_summary
        
        self.log("Workflow completed successfully")
        
        return {
            "success": True,
            "project": project_description,
            "results": results,
            "workflow_log": self.workflow_log.copy()
        }
    
    def get_system_status(self) -> Dict[str, Any]:
        """Get current system status"""
        return {
            "agents": {name: agent.role.value for name, agent in self.agents.items()},
            "active_tasks": len(self.active_tasks),
            "completed_tasks": len(self.completed_tasks),
            "messages_processed": len(self.message_queue),
            "workflow_steps": len(self.workflow_log)
        }

## Example Usage

In [None]:
# Create the multi-agent system
mas = MultiAgentSystem()

# Add specialized agents
mas.add_agent(CoordinatorAgent("Coordinator"))
mas.add_agent(ResearcherAgent("Researcher"))
mas.add_agent(WriterAgent("Writer"))
mas.add_agent(CriticAgent("Critic"))

print("\n=== Multi-Agent System Initialized ===")
status = mas.get_system_status()
for key, value in status.items():
    print(f"{key}: {value}")

## Example 1: Content Creation Project

In [None]:
print("\n=== Example 1: Content Creation Project ===")

project_description = "Create a comprehensive guide about the benefits and challenges of remote work for businesses"

result = mas.execute_workflow(project_description)

if result["success"]:
    print(f"\nProject: {result['project']}")
    print("\n--- Research Results ---")
    print(result['results'].get('research', 'Not available')[:300] + "...")
    
    print("\n--- Written Content ---")
    print(result['results'].get('content', 'Not available')[:300] + "...")
    
    print("\n--- Review Feedback ---")
    print(result['results'].get('review', 'Not available')[:300] + "...")
    
    print("\n--- Final Summary ---")
    print(result['results'].get('final_summary', 'Not available'))
else:
    print(f"Project failed: {result.get('error')}")

## Example 2: Product Analysis Project

In [None]:
print("\n=== Example 2: Product Analysis Project ===")

# Clear previous workflow log for cleaner output
mas.workflow_log = []

product_analysis = "Analyze the potential market impact of AI-powered personal assistants in the healthcare industry"

result2 = mas.execute_workflow(product_analysis)

if result2["success"]:
    print(f"\nProject: {result2['project']}")
    print("\n--- Key Findings ---")
    for phase, content in result2['results'].items():
        if phase != 'final_summary':
            print(f"\n{phase.title()}: {content[:200]}...")
    
    print("\n--- Executive Summary ---")
    print(result2['results'].get('final_summary', 'Not available'))
else:
    print(f"Analysis failed: {result2.get('error')}")

## Agent Communication Example

In [None]:
print("\n=== Example 3: Direct Agent Communication ===")

# Direct communication between agents
mas.send_message(
    "Coordinator", 
    "Researcher", 
    "Please research the latest trends in artificial intelligence for education",
    "research_request"
)

mas.send_message(
    "Researcher", 
    "Writer", 
    "AI in education shows promising results in personalized learning and automated assessment",
    "research_findings"
)

mas.send_message(
    "Writer", 
    "Critic", 
    "I've drafted an article about AI in education based on the research findings",
    "draft_ready"
)

mas.send_message(
    "Critic", 
    "Coordinator", 
    "The article is well-written but needs more specific examples and data points",
    "review_feedback"
)

print("\nMessage Exchange Completed")
print(f"Total messages processed: {len(mas.message_queue)}")

## System Monitoring and Analytics

In [None]:
def analyze_agent_interactions(mas: MultiAgentSystem):
    """Analyze interactions between agents"""
    print("\n=== Agent Interaction Analysis ===")
    
    # Message flow analysis
    message_flow = {}
    for message in mas.message_queue:
        flow_key = f"{message.sender} → {message.receiver}"
        message_flow[flow_key] = message_flow.get(flow_key, 0) + 1
    
    print("\nMessage Flow:")
    for flow, count in message_flow.items():
        print(f"  {flow}: {count} messages")
    
    # Agent activity
    agent_activity = {}
    for message in mas.message_queue:
        agent_activity[message.sender] = agent_activity.get(message.sender, 0) + 1
    
    print("\nAgent Activity (messages sent):")
    for agent, count in sorted(agent_activity.items(), key=lambda x: x[1], reverse=True):
        print(f"  {agent}: {count} messages")
    
    # Recent workflow steps
    print("\nRecent Workflow Steps:")
    for step in mas.workflow_log[-10:]:
        print(f"  {step}")

# Run analysis
analyze_agent_interactions(mas)

## Custom Agent Example

In [None]:
class DataAnalystAgent(BaseAgent):
    """Custom agent specializing in data analysis"""
    
    def __init__(self, name: str = "DataAnalyst"):
        system_prompt = """
        You are a data analysis specialist agent. Your role is to:
        1. Analyze numerical data and identify patterns
        2. Create data-driven insights and recommendations
        3. Suggest appropriate statistical methods and visualizations
        4. Interpret data in business context
        5. Identify data quality issues and limitations
        
        Be analytical, precise, and data-driven in your responses.
        """
        super().__init__(name, AgentRole.EXECUTOR, system_prompt)
    
    def analyze_data(self, data_description: str) -> str:
        """Analyze data and provide insights"""
        prompt = f"""
        Data to analyze: {data_description}
        
        Provide a comprehensive data analysis including:
        1. Key metrics and statistical measures
        2. Patterns and trends identified
        3. Notable insights or anomalies
        4. Business recommendations based on the data
        5. Suggested next steps for further analysis
        """
        
        return self.generate_response(prompt, max_tokens=400)

# Add custom agent to the system
mas.add_agent(DataAnalystAgent("DataAnalyst"))

# Example usage with custom agent
print("\n=== Custom Agent Example ===")
data_analysis_project = "Analyze customer retention data showing 15% decrease in monthly active users over the past 6 months"

# Direct task assignment to custom agent
if "DataAnalyst" in mas.agents:
    analysis_result = mas.agents["DataAnalyst"].analyze_data(data_analysis_project)
    print(f"Data Analysis Result:\n{analysis_result}")
    
    # Send results to coordinator
    mas.send_message(
        "DataAnalyst", 
        "Coordinator", 
        f"Data analysis completed: {analysis_result[:100]}...",
        "analysis_results"
    )

## Advanced Features and Extensions

This multi-agent system can be extended with:

### 1. Asynchronous Processing
- Parallel task execution
- Non-blocking agent communication
- Task queuing and prioritization

### 2. Learning and Adaptation
- Agent performance tracking
- Dynamic task assignment based on agent strengths
- Feedback loops for continuous improvement

### 3. Fault Tolerance
- Agent failure detection and recovery
- Task reassignment mechanisms
- Redundancy and backup agents

### 4. Advanced Coordination
- Consensus mechanisms
- Voting systems for decision making
- Hierarchical agent organizations

### 5. External Integrations
- Database connections
- API integrations
- File system operations
- Web scraping capabilities

## Summary

This multi-agent system demonstrates:

1. **Agent Specialization**: Different agents with specific roles and capabilities
2. **Coordination**: Central orchestration of complex workflows
3. **Communication**: Message passing between agents
4. **Collaboration**: Agents working together to achieve common goals
5. **Extensibility**: Easy addition of new agent types and capabilities

### Key Benefits:
- **Modularity**: Each agent focuses on specific tasks
- **Scalability**: Can add more agents as needed
- **Flexibility**: Adaptable to different problem domains
- **Robustness**: Distributed processing with fault tolerance
- **Efficiency**: Parallel processing and specialized optimization

### Use Cases:
- Content creation workflows
- Research and analysis projects
- Customer service automation
- Business process automation
- Software development assistance
- Educational tutoring systems