# 01 - Multi Agent Orchestration


## Requirements
These are the required _Libraries_ and _Environment Variables_ for this notebook


### Libraries Required


In [None]:
# for setting up Jupyter widgets and notebook features
%conda install conda-forge::ipywidgets --update-deps --force-reinstall
%conda install conda-forge::ipykernel --update-deps --force-reinstall


- [Anthropic](https://docs.claude.com/en/docs/get-started#python)

In [None]:
%conda install conda-forge::anthropic


### Variables Required

| Token Name        | `.env` Name         | Where to Get / Setting Value                                     |                                                     Reference |
| :---------------- | :------------------ | :--------------------------------------------------------------- | ------------------------------------------------------------: |
| Anthropic API Key | `ANTHROPIC_API_KEY` | [Anthropic Console](https://console.anthropic.com/settings/keys) | [Anthropic API Docs](https://docs.claude.com/en/api/overview) |


In [None]:
from pathlib import Path
import sys

# Project root, taken to be two directory levels above the current working
# directory (the directory the notebook is run from).
ROOT = Path().resolve().parent.parent

# Make modules under ROOT importable. The membership check keeps sys.path free
# of duplicate entries when this cell is re-run.
if str(ROOT) not in sys.path:
    sys.path.append(str(ROOT))


In [None]:
from utils import (
    set_env_variables,
)

# Path of the `.env` file, located directly under ROOT.
ENV_FILE = ROOT / ".env"

# NOTE(review): presumably loads the variables listed under "Variables Required"
# (e.g. ANTHROPIC_API_KEY) from ENV_FILE into the process environment —
# confirm against utils.set_env_variables.
set_env_variables(env_file=ENV_FILE)


## Actual Shenanigans


In [None]:
import json
from dataclasses import dataclass, field
from typing import List, Dict
from enum import Enum
import time

from anthropic import Anthropic


### Defining Base Classes


In [None]:
class Severity(Enum):
    """Severity level of a reported issue; a larger value is more severe."""

    CRITICAL = 4
    HIGH = 3
    MEDIUM = 2
    LOW = 1


@dataclass
class Issue:
    """A single finding reported by one review agent."""

    # Category label set by the reporting agent ("security", "bug",
    # "performance" or "style" in this notebook).
    issue_type: str
    # How serious the finding is; used to rank and group findings.
    severity: Severity
    # Line of the reviewed code that the finding refers to.
    line_number: int
    # Human-readable explanation of the problem.
    description: str
    # Recommended remediation.
    suggested_fix: str
    # Name of the agent that produced the finding.
    agent_name: str


@dataclass
class CodeReviewResult:
    """Aggregated outcome of one review run across all agents."""

    # Findings collected from the agents.
    issues: List[Issue] = field(default_factory=list)
    # Generated test source, one string per test function.
    test_cases: List[str] = field(default_factory=list)
    # One-line outcome text shown at the top of the report.
    summary: str = ""
    # Number of lines in the code that was reviewed.
    total_lines_reviewed: int = 0


class Agent:
    """Common base for every review agent.

    Keeps the agent's identity together with two running tallies (tasks
    completed and issues found) that ``get_stats`` reports.
    """

    def __init__(self, name: str, role: str):
        """Store the display name and role and start both tallies empty."""
        self.name, self.role = name, role
        self.issues_found = []
        self.tasks_completed = 0

    def analyze(self, code: str) -> List[Issue]:
        """Review ``code`` and return the issues found; subclasses override this."""
        raise NotImplementedError

    def log_activity(self, message: str):
        """Print ``message`` prefixed with the agent's name in square brackets."""
        print("[{}] {}".format(self.name, message))

    def get_stats(self) -> Dict:
        """Return name, role, completed-task count and number of issues found."""
        stats = {"name": self.name, "role": self.role}
        stats["tasks_completed"] = self.tasks_completed
        stats["issues_found"] = len(self.issues_found)
        return stats


---

### Initializing Anthropic Client

In [None]:
# No API key is passed explicitly: the Anthropic SDK falls back to the
# ANTHROPIC_API_KEY environment variable (see "Variables Required").
client = Anthropic()


---

### LLM Helper Function

In [None]:
def query_llm(
    prompt: str,
    model: str = "claude-sonnet-4-5-20250929",
    max_tokens: int = 1500,
) -> str:
    """Send ``prompt`` to the Anthropic Messages API and return the reply text.

    Args:
        prompt: User message to send.
        model: Model identifier to query.
        max_tokens: Upper bound on generated tokens. Raise it when replies
            (for example long JSON issue lists) come back truncated.

    Returns:
        The stripped text of the reply. If the request fails, or the reply
        holds no text, a JSON string of the form
        ``{"issues": [], "error": "<message>"}`` is returned instead so
        callers that parse JSON degrade to "no issues" rather than crash.
    """
    try:
        message = client.messages.create(
            model=model,
            max_tokens=max_tokens,
            messages=[{"role": "user", "content": prompt}],
            temperature=0.1,
            system="You are an expert code reviewer. Provide concise, structured responses in JSON format where possible, focusing on code quality, security, and best practices.",
        )
        # A reply is a list of content blocks; only "text" blocks carry text.
        # Collect all of them instead of assuming the first block is text.
        text_parts = [
            block.text
            for block in message.content
            if getattr(block, "type", None) == "text"
        ]
        if not text_parts:
            raise ValueError("response contained no text content")
        return "".join(text_parts).strip()
    except Exception as e:
        # Deliberate catch-all at the API boundary: any failure is reported to
        # the caller through the JSON "error" field instead of propagating.
        print(f"LLM Query Error: {e}")
        return json.dumps({"issues": [], "error": str(e)})


---

### Agent Definitions

**Agents**:

- Security Reviewer
- Bug Detector
- Performance Analyzer
- Style Checker
- Test Generator

**Delegation Strategy**:
- **Task-based**: Each agent has a specialized responsibility (Security → Bug → Performance → Style)
- **Priority-based**: Critical security issues halt workflow
- **Failure handling**: Graceful degradation on LLM errors
- **Aggregation**: Orchestrator collects, prioritizes, and generates tests

<div class="alert alert-info">

ℹ️ **Note**

You can find the agent architecture in the separate [readme file](./readme.md)

</div>


#### Security Reviewer Agent

In [None]:
class SecurityReviewerAgent(Agent):
    """Detects security vulnerabilities using Claude"""

    def __init__(self):
        super().__init__("SecurityReviewer", "Security Analysis")

    def analyze(self, code: str) -> List[Issue]:
        """Ask Claude for security findings in ``code`` and return them as issues.

        Also bumps ``tasks_completed`` and appends the findings to
        ``issues_found``. An unusable reply yields an empty list.
        """
        self.log_activity("Querying Claude for security analysis...")

        prompt = f"""
Analyze this Python code for security vulnerabilities (SQL injection, XSS, eval/exec usage, unsafe deserialization, hard-coded credentials, authentication/authorization flaws, etc.):

```python
{code}
```

Respond ONLY in valid JSON format (no markdown, no extra text):
{{"issues": [{{"line": 1, "description": "example", "severity": "CRITICAL", "fix": "example fix"}}, ...]}}

If no issues found, return {{"issues": []}}.
Focus on real security risks in a 500-2000 line codebase.
Prioritize CRITICAL and HIGH severity vulnerabilities.
"""

        issues = self._parse_issues(query_llm(prompt))

        self.tasks_completed += 1
        self.issues_found.extend(issues)
        self.log_activity(f"Claude found {len(issues)} security issues")
        return issues

    def _parse_issues(self, response: str) -> List[Issue]:
        """Convert the model's JSON reply into ``Issue`` objects.

        Tolerates a reply wrapped in a markdown code fence, a payload that is
        not a JSON object, entries that are not objects, severities in any
        letter case and non-integer line numbers. Unusable input is logged
        and skipped instead of raising.
        """
        text = response.strip()
        if text.startswith("```"):
            # Drop the opening fence line (e.g. ```json) and the closing fence.
            text = text.partition("\n")[2].rsplit("```", 1)[0].strip()

        try:
            data = json.loads(text)
        except json.JSONDecodeError as e:
            self.log_activity(f"Failed to parse response: {e}")
            return []

        if not isinstance(data, dict):
            self.log_activity("Failed to parse response: expected a JSON object")
            return []
        if data.get("error"):
            # query_llm reports request failures through this key; surface it
            # so "0 issues" is not mistaken for a clean review.
            self.log_activity(f"LLM reported an error: {data['error']}")

        raw_issues = data.get("issues") or []
        if not isinstance(raw_issues, list):
            self.log_activity("Failed to parse response: 'issues' is not a list")
            return []

        issues = []
        for issue_data in raw_issues:
            if not isinstance(issue_data, dict):
                self.log_activity(f"Skipping malformed issue entry: {issue_data!r}")
                continue

            # Normalise case so e.g. "critical" is not downgraded to MEDIUM.
            severity_name = str(issue_data.get("severity", "MEDIUM")).strip().upper()
            try:
                line_number = int(issue_data.get("line", 1))
            except (TypeError, ValueError):
                line_number = 1

            issues.append(
                Issue(
                    issue_type="security",
                    severity=Severity.__members__.get(severity_name, Severity.MEDIUM),
                    line_number=line_number,
                    description=issue_data.get(
                        "description", "Security issue detected"
                    ),
                    suggested_fix=issue_data.get(
                        "fix", "Review and apply security best practices"
                    ),
                    agent_name=self.name,
                )
            )
        return issues


#### Bug Detector Agent

In [None]:
class BugDetectorAgent(Agent):
    """Identifies bugs using Claude"""

    def __init__(self):
        super().__init__("BugDetector", "Bug Detection")

    def analyze(self, code: str) -> List[Issue]:
        """Ask Claude for bugs in ``code`` and return them as issues.

        Also bumps ``tasks_completed`` and appends the findings to
        ``issues_found``. An unusable reply yields an empty list.
        """
        self.log_activity("Querying Claude for bug detection...")

        prompt = f"""
Identify logical, runtime, and potential bugs in this Python code (None/null checks, exception handling, type mismatches, off-by-one errors, uninitialized variables, infinite loops, etc.):

```python
{code}
```

Respond ONLY in valid JSON format (no markdown, no extra text):
{{"issues": [{{"line": 1, "description": "example", "severity": "HIGH", "fix": "example fix"}}, ...]}}

If no bugs found, return {{"issues": []}}.
Consider edge cases and common Python pitfalls in a 500-2000 line codebase.
"""

        issues = self._parse_issues(query_llm(prompt))

        self.tasks_completed += 1
        self.issues_found.extend(issues)
        self.log_activity(f"Claude found {len(issues)} bugs")
        return issues

    def _parse_issues(self, response: str) -> List[Issue]:
        """Convert the model's JSON reply into ``Issue`` objects.

        Tolerates a reply wrapped in a markdown code fence, a payload that is
        not a JSON object, entries that are not objects, severities in any
        letter case and non-integer line numbers. Unusable input is logged
        and skipped instead of raising.
        """
        text = response.strip()
        if text.startswith("```"):
            # Drop the opening fence line (e.g. ```json) and the closing fence.
            text = text.partition("\n")[2].rsplit("```", 1)[0].strip()

        try:
            data = json.loads(text)
        except json.JSONDecodeError as e:
            self.log_activity(f"Failed to parse response: {e}")
            return []

        if not isinstance(data, dict):
            self.log_activity("Failed to parse response: expected a JSON object")
            return []
        if data.get("error"):
            # query_llm reports request failures through this key; surface it
            # so "0 bugs" is not mistaken for a clean review.
            self.log_activity(f"LLM reported an error: {data['error']}")

        raw_issues = data.get("issues") or []
        if not isinstance(raw_issues, list):
            self.log_activity("Failed to parse response: 'issues' is not a list")
            return []

        issues = []
        for issue_data in raw_issues:
            if not isinstance(issue_data, dict):
                self.log_activity(f"Skipping malformed issue entry: {issue_data!r}")
                continue

            # Normalise case so e.g. "high" is not downgraded to MEDIUM.
            severity_name = str(issue_data.get("severity", "MEDIUM")).strip().upper()
            try:
                line_number = int(issue_data.get("line", 1))
            except (TypeError, ValueError):
                line_number = 1

            issues.append(
                Issue(
                    issue_type="bug",
                    severity=Severity.__members__.get(severity_name, Severity.MEDIUM),
                    line_number=line_number,
                    description=issue_data.get("description", "Bug detected"),
                    suggested_fix=issue_data.get("fix", "Review and fix the bug"),
                    agent_name=self.name,
                )
            )
        return issues


#### Performance Analyzer Agent

In [None]:
class PerformanceAnalyzerAgent(Agent):
    """Identifies performance issues using Claude"""

    def __init__(self):
        super().__init__("PerformanceAnalyzer", "Performance Analysis")

    def analyze(self, code: str) -> List[Issue]:
        """Ask Claude for performance problems in ``code`` and return them as issues.

        Also bumps ``tasks_completed`` and appends the findings to
        ``issues_found``. An unusable reply yields an empty list.
        """
        self.log_activity("Querying Claude for performance analysis...")

        prompt = f"""
Detect performance bottlenecks in this Python code (O(n²) loops, inefficient data structures, redundant computations, memory leaks, N+1 queries, unnecessary copies, etc.):

```python
{code}
```

Respond ONLY in valid JSON format (no markdown, no extra text):
{{"issues": [{{"line": 1, "description": "example", "severity": "MEDIUM", "fix": "example fix"}}, ...]}}

If no performance issues found, return {{"issues": []}}.
Suggest optimizations with time/space complexity improvements for a 500-2000 line codebase.
"""

        issues = self._parse_issues(query_llm(prompt))

        self.tasks_completed += 1
        self.issues_found.extend(issues)
        self.log_activity(f"Claude found {len(issues)} performance issues")
        return issues

    def _parse_issues(self, response: str) -> List[Issue]:
        """Convert the model's JSON reply into ``Issue`` objects.

        Tolerates a reply wrapped in a markdown code fence, a payload that is
        not a JSON object, entries that are not objects, severities in any
        letter case and non-integer line numbers. Unusable input is logged
        and skipped instead of raising.
        """
        text = response.strip()
        if text.startswith("```"):
            # Drop the opening fence line (e.g. ```json) and the closing fence.
            text = text.partition("\n")[2].rsplit("```", 1)[0].strip()

        try:
            data = json.loads(text)
        except json.JSONDecodeError as e:
            self.log_activity(f"Failed to parse response: {e}")
            return []

        if not isinstance(data, dict):
            self.log_activity("Failed to parse response: expected a JSON object")
            return []
        if data.get("error"):
            # query_llm reports request failures through this key; surface it
            # so "0 issues" is not mistaken for a clean review.
            self.log_activity(f"LLM reported an error: {data['error']}")

        raw_issues = data.get("issues") or []
        if not isinstance(raw_issues, list):
            self.log_activity("Failed to parse response: 'issues' is not a list")
            return []

        issues = []
        for issue_data in raw_issues:
            if not isinstance(issue_data, dict):
                self.log_activity(f"Skipping malformed issue entry: {issue_data!r}")
                continue

            # Normalise case so e.g. "high" is not downgraded to MEDIUM.
            severity_name = str(issue_data.get("severity", "MEDIUM")).strip().upper()
            try:
                line_number = int(issue_data.get("line", 1))
            except (TypeError, ValueError):
                line_number = 1

            issues.append(
                Issue(
                    issue_type="performance",
                    severity=Severity.__members__.get(severity_name, Severity.MEDIUM),
                    line_number=line_number,
                    description=issue_data.get(
                        "description", "Performance issue detected"
                    ),
                    suggested_fix=issue_data.get("fix", "Implement optimization"),
                    agent_name=self.name,
                )
            )
        return issues


#### Style Checker Agent

In [None]:
class StyleCheckerAgent(Agent):
    """Validates code style using Claude"""

    def __init__(self):
        super().__init__("StyleChecker", "Code Style Validation")

    def analyze(self, code: str) -> List[Issue]:
        """Ask Claude for style problems in ``code`` and return them as issues.

        Also bumps ``tasks_completed`` and appends the findings to
        ``issues_found``. An unusable reply yields an empty list.
        """
        self.log_activity("Querying Claude for style check...")

        prompt = f"""
Validate code style against PEP 8, naming conventions, docstrings, line length, and professional standards in this Python code:

```python
{code}
```

Respond ONLY in valid JSON format (no markdown, no extra text):
{{"issues": [{{"line": 1, "description": "example", "severity": "LOW", "fix": "example fix"}}, ...]}}

If no style issues found, return {{"issues": []}}.
Suggest improvements for maintainability and readability in a 500-2000 line codebase.
"""

        issues = self._parse_issues(query_llm(prompt))

        self.tasks_completed += 1
        self.issues_found.extend(issues)
        self.log_activity(f"Claude found {len(issues)} style issues")
        return issues

    def _parse_issues(self, response: str) -> List[Issue]:
        """Convert the model's JSON reply into ``Issue`` objects.

        Tolerates a reply wrapped in a markdown code fence, a payload that is
        not a JSON object, entries that are not objects, severities in any
        letter case and non-integer line numbers. Unusable input is logged
        and skipped instead of raising. Missing or unknown severities default
        to LOW for style findings.
        """
        text = response.strip()
        if text.startswith("```"):
            # Drop the opening fence line (e.g. ```json) and the closing fence.
            text = text.partition("\n")[2].rsplit("```", 1)[0].strip()

        try:
            data = json.loads(text)
        except json.JSONDecodeError as e:
            self.log_activity(f"Failed to parse response: {e}")
            return []

        if not isinstance(data, dict):
            self.log_activity("Failed to parse response: expected a JSON object")
            return []
        if data.get("error"):
            # query_llm reports request failures through this key; surface it
            # so "0 issues" is not mistaken for a clean review.
            self.log_activity(f"LLM reported an error: {data['error']}")

        raw_issues = data.get("issues") or []
        if not isinstance(raw_issues, list):
            self.log_activity("Failed to parse response: 'issues' is not a list")
            return []

        issues = []
        for issue_data in raw_issues:
            if not isinstance(issue_data, dict):
                self.log_activity(f"Skipping malformed issue entry: {issue_data!r}")
                continue

            # Normalise case so e.g. "medium" is not downgraded to LOW.
            severity_name = str(issue_data.get("severity", "LOW")).strip().upper()
            try:
                line_number = int(issue_data.get("line", 1))
            except (TypeError, ValueError):
                line_number = 1

            issues.append(
                Issue(
                    issue_type="style",
                    severity=Severity.__members__.get(severity_name, Severity.LOW),
                    line_number=line_number,
                    description=issue_data.get("description", "Style issue detected"),
                    suggested_fix=issue_data.get("fix", "Apply style fix"),
                    agent_name=self.name,
                )
            )
        return issues


#### Test Generator Agent

In [None]:
class TestGeneratorAgent(Agent):
    """Generates test cases using Claude"""

    def __init__(self):
        super().__init__("TestGenerator", "Test Case Generation")

    def generate_tests(self, code: str, issues: List[Issue]) -> List[str]:
        """Ask Claude for pytest-style tests covering ``code`` and ``issues``.

        Only the first ten issues are quoted in the prompt. The reply is cut
        into one string per ``def test_...`` function; text before the first
        test function (such as imports) is not part of any returned entry.

        Returns:
            A list of test function sources, each starting with ``def test_``.
        """
        self.log_activity("Querying Claude for test generation...")

        issues_str = "\n".join(
            f"Line {i.line_number}: {i.description} ({i.severity.name})"
            for i in issues[:10]
        )

        prompt = f"""
Generate comprehensive test cases (unit, integration, edge cases, error handling) for this Python code.

Focus on covering these issues:
{issues_str}

Code to test:
```python
{code}
```

Provide complete, runnable Python test functions using pytest style. Each test should have descriptive names and cover normal cases, edge cases, and error scenarios.
Include necessary imports (pytest, unittest, mocking, etc.).
"""

        response = query_llm(prompt)

        test_cases = []
        for chunk in response.split("def test_")[1:]:
            # A chunk runs up to the next test function, so the last test of a
            # fenced code block would otherwise carry the closing ``` and any
            # prose after it. Cut at the first fence.
            body = chunk.split("```", 1)[0].strip()
            if body:
                test_cases.append(f"def test_{body}")

        self.tasks_completed += 1
        self.log_activity(f"✅ Claude generated {len(test_cases)} test cases")
        return test_cases


#### Orchestration Agent

In [None]:
class OrchestratorAgent(Agent):
    """
    Coordinates the code review workflow by delegating to specialized agents.

    Delegation Strategy:
    1. Task-based delegation: Route to specialized agents based on task type
    2. Priority-based execution: Security → Bugs → Performance → Style → Tests
    3. Sequential phases: each phase starts after the previous one returns, and
       a CRITICAL security finding stops the workflow before the later phases
    4. Aggregation: Collect and prioritize all findings
    """

    def __init__(self):
        """Create the five specialized agents and an empty delegation log."""
        super().__init__("Orchestrator", "Workflow Coordination")

        # Initialize specialized agents
        self.security_agent = SecurityReviewerAgent()
        self.bug_agent = BugDetectorAgent()
        self.performance_agent = PerformanceAnalyzerAgent()
        self.style_agent = StyleCheckerAgent()
        self.test_agent = TestGeneratorAgent()

        self.agents = [
            self.security_agent,
            self.bug_agent,
            self.performance_agent,
            self.style_agent,
            self.test_agent,
        ]

        self.delegation_log = []

    def delegate_task(self, agent: Agent, task_type: str, code: str) -> List[Issue]:
        """Record the hand-off in ``delegation_log`` and run ``agent.analyze(code)``."""
        self.log_activity(f"📤 Delegating '{task_type}' to {agent.name}")
        self.delegation_log.append(
            {
                "from": self.name,
                "to": agent.name,
                "task": task_type,
                "timestamp": time.time(),
            }
        )

        return agent.analyze(code)

    def orchestrate_review(self, code: str) -> CodeReviewResult:
        """
        Run the review phases in priority order and aggregate the findings.

        Returns a ``CodeReviewResult``. If the security phase reports any
        CRITICAL issue, the result holds only the security findings and a
        "BLOCKED" summary; otherwise it holds all findings sorted from most to
        least severe, the generated tests and a count summary.
        """
        line_count = len(code.splitlines())
        self.log_activity("Starting code review workflow...")
        self.log_activity(f"Code size: {line_count} lines")

        result = CodeReviewResult(total_lines_reviewed=line_count)

        # Phase 1: Security (highest priority)
        self.log_activity("\n=== PHASE 1: Security Review ===")
        security_issues = self.delegate_task(
            self.security_agent, "Security Analysis", code
        )
        result.issues.extend(security_issues)

        # Failure Mode 1: Critical security issues halt further review
        critical_security = [
            i for i in security_issues if i.severity == Severity.CRITICAL
        ]
        if critical_security:
            self.log_activity(
                f"FAILURE MODE TRIGGERED: {len(critical_security)} critical security issues found!"
            )
            self.log_activity("Halting review - security issues must be fixed first")
            result.summary = f"BLOCKED: {len(critical_security)} critical security vulnerabilities must be addressed before proceeding."
            return result

        # Phase 2: Bug Detection
        self.log_activity("\n=== PHASE 2: Bug Detection ===")
        bug_issues = self.delegate_task(self.bug_agent, "Bug Detection", code)
        result.issues.extend(bug_issues)

        # Phase 3: Performance Analysis
        self.log_activity("\n=== PHASE 3: Performance Analysis ===")
        perf_issues = self.delegate_task(
            self.performance_agent, "Performance Analysis", code
        )
        result.issues.extend(perf_issues)

        # Phase 4: Style Check
        self.log_activity("\n=== PHASE 4: Style Validation ===")
        style_issues = self.delegate_task(self.style_agent, "Style Check", code)
        result.issues.extend(style_issues)

        # Phase 5: Test Generation (called directly; the test agent exposes
        # generate_tests rather than analyze, so it bypasses delegate_task)
        self.log_activity("\n=== PHASE 5: Test Case Generation ===")
        result.test_cases = self.test_agent.generate_tests(code, result.issues)

        # Failure Mode 2: No issues found (possible false negative)
        if not result.issues:
            self.log_activity("FAILURE MODE TRIGGERED: No issues detected!")
            self.log_activity(
                "Recommendation: Run with stricter rule set or manual review"
            )
            result.summary = "No automated issues found. Consider manual peer review for complex logic."
        else:
            # Prioritize issues by severity (stable sort keeps agent order
            # within one severity level)
            result.issues.sort(key=lambda x: x.severity.value, reverse=True)
            result.summary = self.generate_summary(result.issues)

        self.log_activity("\nCode review workflow completed!")
        return result

    def generate_summary(self, issues: List[Issue]) -> str:
        """Return a one-line count of ``issues`` per severity level."""
        # Single pass over the issues instead of one scan per severity.
        counts = {severity: 0 for severity in Severity}
        for issue in issues:
            if issue.severity in counts:
                counts[issue.severity] += 1

        return (
            f"Found {len(issues)} total issues: "
            f"{counts[Severity.CRITICAL]} critical, "
            f"{counts[Severity.HIGH]} high, "
            f"{counts[Severity.MEDIUM]} medium, "
            f"{counts[Severity.LOW]} low severity."
        )

    def get_delegation_stats(self) -> Dict:
        """Return delegation statistics.

        ``total_delegations`` counts the hand-offs made through
        ``delegate_task``. ``agents_used`` counts the agents that have
        completed at least one task, so a review halted after the security
        phase no longer reports every agent as used.
        """
        return {
            "total_delegations": len(self.delegation_log),
            "agents_used": sum(
                1 for agent in self.agents if agent.tasks_completed > 0
            ),
            "delegation_log": self.delegation_log,
        }


---

### Display Helpers

In [None]:
def display_results(result: CodeReviewResult):
    """Print the review report: summary, issues grouped by severity, test preview."""
    heavy_rule = "=" * 70
    light_rule = "-" * 70

    print("\n" + heavy_rule)
    print("Code Review Results")
    print(heavy_rule)

    print(f"\nSummary: {result.summary}")
    print(f"  Lines Reviewed: {result.total_lines_reviewed}")
    print(f"  Total Issues: {len(result.issues)}")
    print(f"  Test Cases Generated: {len(result.test_cases)}")

    if result.issues:
        print("\n" + light_rule)
        print("Issues by Severity")
        print(light_rule)

        # Most severe group first; groups without findings are left out.
        for level in (Severity.CRITICAL, Severity.HIGH, Severity.MEDIUM, Severity.LOW):
            matching = [entry for entry in result.issues if entry.severity == level]
            if not matching:
                continue
            print(f"\n{level.name} ({len(matching)} issues):")
            for entry in matching:
                print(f"\n  Line {entry.line_number} | {entry.issue_type.upper()}")
                print(f"  Agent: {entry.agent_name}")
                print(f"  Issue: {entry.description}")
                print(f"  Fix: {entry.suggested_fix}")

    if result.test_cases:
        print("\n" + light_rule)
        print("Generated Test Cases")
        print(light_rule)

        # Preview the first three tests, each cut to 200 characters.
        for number, source in enumerate(result.test_cases[:3], 1):
            print(f"\nTest {number}:")
            ellipsis_suffix = "..." if len(source) > 200 else ""
            print(source[:200] + ellipsis_suffix)

        hidden = len(result.test_cases) - 3
        if hidden > 0:
            print(f"\n... and {hidden} more tests")


def display_agent_stats(orchestrator: OrchestratorAgent):
    """Print name, role, completed tasks and issue count for each agent of ``orchestrator``."""
    rule = "=" * 70
    print("\n" + rule)
    print("Agent Performance Statistics")
    print(rule)

    for member in orchestrator.agents:
        info = member.get_stats()
        print(f"\n{info['name']}:")
        print(f"  Role: {info['role']}")
        print(f"  Tasks Completed: {info['tasks_completed']}")
        print(f"  Issues Found: {info['issues_found']}")


---

### Running the Agent

#### Demo the agent with example queries

In [None]:
# Deliberately flawed sample used as review input: the inline comments inside
# the string mark the planted security, bug and performance problems.
# It is review input (text handed to the agents), not code meant to be run.
SAMPLE_CODE = '''
def process_user_data(user_input, db_connection):
    """Process user data and store in database."""
    query = "SELECT * FROM users WHERE id = " + user_input
    result = db_connection.execute(query)

    data = ""
    for item in result:
        data += str(item)  # Performance issue

    return data

def calculate_value(x, y):
    """Calculate value from inputs."""
    if x == None:  # Bug: should use 'is None'
        return 0
    try:
        value = eval("x + y")  # Security: dangerous eval
    except:  # Bug: bare except
        value = 0
    return value

def validate_and_process(data):
    """Validate and process data."""
    if data == True:  # Bug: redundant comparison
        return process_user_data(data, None)
    return None

def fetch_user_info(user_id):
    """Fetch user information."""
    api_key = "sk-1234567890abcdef"  # Security: hard-coded credentials
    headers = {"Authorization": f"Bearer {api_key}"}
    import subprocess
    result = subprocess.call("curl https://api.example.com/user/" + user_id)  # Security: command injection
    return result
'''


In [None]:
# Banner, then one full review of the sample code followed by both reports.
print("\n" + "=" * 70, "Demo", "=" * 70, sep="\n")

orchestrator = OrchestratorAgent()
result = orchestrator.orchestrate_review(SAMPLE_CODE)

display_results(result)
display_agent_stats(orchestrator)


In [None]:
# Banner, then the orchestrator's delegation counters from the run above.
print("\n" + "=" * 70, "Delegation Statistics", "=" * 70, sep="\n")

delegation_stats = orchestrator.get_delegation_stats()
print(
    f"\nTotal Delegations: {delegation_stats['total_delegations']}",
    f"Agents Utilized: {delegation_stats['agents_used']}",
    sep="\n",
)


---