<b>Multiple Agents - Distributed Agentic AI</b>

In [5]:
from IPython.display import Image

In [2]:
from pydantic import BaseModel, Field, ValidationError
import json

So far, you have been building a single, highly optimized monolith: the context management, the self-healing heartbeat loop, and the vector storage. In traditional software engineering terms, you have successfully containerized a single, robust microservice. <br>
Now we need to build the orchestration layer: the Kubernetes or API Gateway of your AI system. <br>
When you transition to a hierarchical multi-agent framework (like Microsoft AutoGen or Letta), you stop asking the LLM to solve the problem directly. Instead, you introduce a Manager Agent whose sole purpose is routing, delegation, and state evaluation. <br>
Here is the architectural blueprint for wrapping your robust worker nodes into a multi-agent hierarchy.<br>

<b>The Mathematical Model: The Routing Policy </b> <br> 
In a single-agent system, the action space is the set of available Python tools. In a hierarchical multi-agent system, the action space is the set of other agents. <br> 
Let $\mathcal{W} = \{w_1, w_2, \dots, w_k\}$ represent your pool of specialized worker agents (e.g., $w_1$ is a DB Query Agent, $w_2$ is a Web Search Agent). <br> 
The Manager maintains a Global State $S_{global}$ (the overarching task). Its job is to execute a routing policy $\pi(a | S_{global})$ where the action $a \in \mathcal{W} \cup \{\text{Final Output}\}$. <br> 
The Manager evaluates the sub-task, selects $w_i$, waits for $w_i$'s internal heartbeat loop to reach a terminal state, and then absorbs $w_i$'s final output back into $S_{global}$.
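
To make the action space concrete, here is a minimal sketch of the routing policy as a plain function. The names `WORKERS`, `FINAL_OUTPUT`, and `route` are hypothetical, and a keyword rule stands in for the LLM call that would normally choose the action. It only illustrates that the policy's output is always a member of $\mathcal{W} \cup \{\text{Final Output}\}$.

```python
from typing import List

# Hypothetical worker pool W; the names are illustrative only.
WORKERS: List[str] = ["db_query_agent", "web_search_agent"]
FINAL_OUTPUT = "final_output"


def route(pending_subtasks: List[str]) -> str:
    """Toy stand-in for pi(a | S_global): returns a worker name or FINAL_OUTPUT."""
    if not pending_subtasks:
        return FINAL_OUTPUT
    next_task = pending_subtasks[0].lower()
    # A keyword rule replaces the LLM's judgement in this sketch.
    if "sql" in next_task or "table" in next_task:
        return "db_query_agent"
    return "web_search_agent"


# The pending list plays the role of the part of S_global the policy reads.
pending = ["Run SQL for Q3 revenue", "Find analyst forecasts"]
first_action = route(pending)
pending.pop(0)   # the worker finished; its sub-task leaves the queue
second_action = route(pending)
pending.pop(0)
third_action = route(pending)
```

In the real Manager, the decision would come from the LLM's tool call rather than a keyword match, but the contract is the same: one action per step, drawn from a closed set.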

<b> Step 1: Wrap Your Workers as Tools </b> <br>
To make this work in pure Python (which roughly mirrors what AutoGen's GroupChatManager does under the hood), we encapsulate the run_os_agent loop we built earlier into callable functions.

In [1]:
from pydantic import BaseModel, Field

# 1. We instantiate two isolated instances of our previous architecture
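def run_db_worker(sub_task: str) -> str:
    """A worker node with access to SQL tools and schema FAISS memory."""
    # ... executes the heartbeat loop from our previous steps ...
    # Placeholder so the cell runs; replace with the loop's terminal output.
    final_worker_string = f"[db_worker placeholder] {sub_task}"
    return final_worker_string

def run_research_worker(sub_task: str) -> str:
    """A worker node with access to Web Search and document FAISS memory."""
    # ... executes the heartbeat loop from our previous steps ...
    # Placeholder so the cell runs; replace with the loop's terminal output.
    final_worker_string = f"[research_worker placeholder] {sub_task}"
    return final_worker_string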

# 2. We define strict Pydantic schemas for delegation
class DelegateToDBAgent(BaseModel):
    sub_task: str = Field(description="The specific SQL/Data task the worker must solve.")
    context: str = Field(description="Any prior knowledge the worker needs to know.")

class DelegateToResearchAgent(BaseModel):
    sub_task: str = Field(description="The specific research question to answer.")
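
The schemas earn their keep when the Manager's tool arguments arrive as raw JSON from the LLM. The sketch below assumes the arguments come in as a JSON string and uses a hypothetical helper, `parse_delegation`, to turn them into a validated model or an error message that can be fed back to the Manager. It repeats the `DelegateToDBAgent` schema so the block runs on its own.

```python
import json
from pydantic import BaseModel, Field, ValidationError


class DelegateToDBAgent(BaseModel):
    sub_task: str = Field(description="The specific SQL/Data task the worker must solve.")
    context: str = Field(description="Any prior knowledge the worker needs to know.")


def parse_delegation(raw_args: str):
    """Hypothetical helper: return (model, None) on success or (None, error_text) on failure."""
    try:
        return DelegateToDBAgent(**json.loads(raw_args)), None
    except (json.JSONDecodeError, ValidationError, TypeError) as exc:
        # TypeError covers valid JSON that is not an object (e.g. a list).
        return None, str(exc)


ok, ok_err = parse_delegation('{"sub_task": "Count orders in 2023", "context": "Table: orders"}')
bad, bad_err = parse_delegation('{"context": "Table: orders"}')  # sub_task is missing
```

Returning the error text instead of raising lets the Manager append it to its memory and retry the delegation with corrected arguments.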

<b> Step 2: The Manager's Orchestration Loop </b> <br>
The Manager has its own context window and its own loop, but it does not have access to the underlying tools (like FAISS or database connections). It only has access to the Delegation tools.

In [2]:
def run_manager_agent(complex_user_request: str):
    # 1. The Global State
    global_memory = [
        {"role": "system", "content": "You are the Orchestrator. Break down the user's request and delegate sub-tasks to your workers. Do not answer the question yourself."}
    ]
    global_memory.append({"role": "user", "content": complex_user_request})
    
    manager_steps = 0
    
    while manager_steps < 5:
        manager_steps += 1
        print(f"\n[MANAGER] Evaluating Global State (Step {manager_steps})...")
        
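        # 2. The Routing Decision
        # call_llm is assumed to be the LLM wrapper from the previous steps. It must return a dict
        # with "status" ("complete" or "tool_call") plus "content" or "tool_name"/"tool_args".
        llm_response = call_llm(global_memory)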
        
        if llm_response["status"] == "complete":
            return llm_response["content"] # The Manager compiles the final answer
            
        elif llm_response["status"] == "tool_call":
            tool_name = llm_response["tool_name"]
            args = llm_response["tool_args"]
            
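            # 3. Thread Handoff (The Manager blocks while the Worker loops)
            print(f"[MANAGER] Delegating to {tool_name} with task: {args['sub_task']}")
            
            # Fold the optional context into the task so the worker actually sees it
            task = args["sub_task"]
            if args.get("context"):
                task = f"{task}\n\nContext: {args['context']}"
            
            if tool_name == "DelegateToDBAgent":
                worker_result = run_db_worker(task)
            elif tool_name == "DelegateToResearchAgent":
                worker_result = run_research_worker(task)
            else:
                # Unknown tool: report it back instead of hitting an unbound worker_result
                worker_result = f"Error: no worker is registered for tool '{tool_name}'."
                
            # 4. Global State Update
            print("[MANAGER] Worker returned. Updating global state.")
            global_memory.append({
                "role": "system", 
                "content": f"Worker {tool_name} completed task. Result: {worker_result}"
            })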
            
            # The Manager loops, reads the worker's result, and decides if it needs 
            # to delegate to the next worker or formulate the final answer to the user.
            
    return "Error: Orchestrator failed to coordinate workers in time."
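
The loop above depends on a `call_llm` function that this notebook does not define. To exercise the orchestration logic offline, one option is a scripted stand-in that replays canned responses. The factory name `make_scripted_llm` and the response contents below are hypothetical. The dict shape (`status`, `content`, `tool_name`, `tool_args`) is taken from how `run_manager_agent` reads the response.

```python
from typing import Callable, Dict, List

Message = Dict[str, str]


def make_scripted_llm(script: List[dict]) -> Callable[[List[Message]], dict]:
    """Build a fake call_llm that replays canned responses in order."""
    remaining = list(script)

    def call_llm(messages: List[Message]) -> dict:
        if not remaining:
            # Script exhausted: finish so the caller cannot spin forever.
            return {"status": "complete", "content": "No scripted responses left."}
        return remaining.pop(0)

    return call_llm


call_llm = make_scripted_llm([
    {"status": "tool_call", "tool_name": "DelegateToDBAgent",
     "tool_args": {"sub_task": "Fetch Q3 revenue", "context": "Table: sales"}},
    {"status": "tool_call", "tool_name": "DelegateToResearchAgent",
     "tool_args": {"sub_task": "Find analyst forecasts for Q3"}},
    {"status": "complete", "content": "Revenue beat the forecast."},
])

first = call_llm([])
second = call_llm([])
third = call_llm([])
```

With a `call_llm` like this defined in the same namespace as `run_manager_agent`, and worker functions that return strings, the Manager should delegate twice and then return the final scripted content, which makes the routing logic testable without any API calls.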

<i>How Frameworks Abstract This </i> <br>
If you look at the code above, you can see exactly why frameworks like AutoGen and CrewAI exist. Writing these nested while loops manually becomes tedious when you have 10 agents. <br>

In AutoGen (AG2): Instead of writing the DelegateTo... classes, you instantiate ConversableAgent objects. The GroupChatManager acts as the while loop, dynamically reading the chat history and calling the .generate_reply() method of the next worker agent. <br>

In CrewAI: You define Task objects and assign an Agent to them. CrewAI's Process.hierarchical setting spins up an LLM Manager that plays much the same role as our run_manager_agent, for example passing the output of the DB worker as the input string to the Research worker. <br>
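
Short of adopting a framework, the if/elif dispatch in the Manager can be collapsed into a lookup table, which is roughly the bookkeeping these frameworks take off your hands. This is a sketch only: `WORKER_REGISTRY` and `dispatch` are hypothetical names, and the worker bodies are placeholders for the real heartbeat loops.

```python
from typing import Callable, Dict


def run_db_worker(sub_task: str) -> str:
    # Placeholder body; the real worker would run its heartbeat loop.
    return f"db result for: {sub_task}"


def run_research_worker(sub_task: str) -> str:
    # Placeholder body; the real worker would run its heartbeat loop.
    return f"research result for: {sub_task}"


# Hypothetical registry: tool name -> worker entry point.
WORKER_REGISTRY: Dict[str, Callable[[str], str]] = {
    "DelegateToDBAgent": run_db_worker,
    "DelegateToResearchAgent": run_research_worker,
}


def dispatch(tool_name: str, sub_task: str) -> str:
    """Look up the worker for a tool call and run it, or report an unknown tool."""
    worker = WORKER_REGISTRY.get(tool_name)
    if worker is None:
        return f"Error: no worker registered for tool '{tool_name}'."
    return worker(sub_task)


known = dispatch("DelegateToDBAgent", "Count rows in orders")
unknown = dispatch("DelegateToCoderAgent", "Write a script")
```

Adding a tenth agent then means adding one registry entry rather than another branch in the loop.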

By wrapping your robust worker nodes in this Manager loop, you now have a highly scalable architecture. The Manager handles the high-level cognitive planning $\Psi$, while the Workers execute the granular action policies $\pi$.

However, introducing multiple autonomous agents introduces a severe complication: Inter-Agent Conflict and Infinite Arguments. What happens when your Coder Agent writes a script, your Tester Agent runs it and finds a bug, but the Coder Agent stubbornly refuses to change the logic, resulting in the two agents burning through 50 API calls arguing with each other?

Handling this calls for architectural patterns for Consensus Mechanisms and Deadlock Resolution, so you can forcefully break infinite loops between argumentative agents.
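
One simple pattern of this kind is a circuit breaker around the exchange: cap the number of rounds and stop early when an agent repeats a message it has already sent. The sketch below is an illustration under stated assumptions, not a full consensus protocol. `debate`, the `"APPROVED"` sentinel, and the toy agents are all hypothetical, and exact string matching is a crude stand-in for detecting that two LLM agents are going in circles.

```python
from typing import Callable, List, Tuple

Agent = Callable[[str], str]


def debate(coder: Agent, tester: Agent, opening: str,
           max_rounds: int = 4) -> Tuple[str, List[str]]:
    """Alternate two agents until approval, a repeated message, or the round budget."""
    transcript: List[str] = []
    seen = set()
    message = opening
    for _ in range(max_rounds):
        for agent in (coder, tester):
            message = agent(message)
            transcript.append(message)
            if message == "APPROVED":
                return "consensus", transcript
            if message in seen:
                # Same message twice: the agents are cycling, so stop paying for it.
                return "deadlock", transcript
            seen.add(message)
    return "budget_exhausted", transcript


# Toy agents that never change their position.
def stubborn_coder(msg: str) -> str:
    return "The logic is correct."


def strict_tester(msg: str) -> str:
    return "Test 3 fails."


outcome, transcript = debate(stubborn_coder, strict_tester, "Please review the script.")
```

When the outcome is `"deadlock"` or `"budget_exhausted"`, the Manager can escalate instead of looping: hand the transcript to a third arbiter agent, pick a side by policy, or return the dispute to the user.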