# Hierarchical Agent Teams

## Introduction

As tasks grow in complexity or scale, managing them with a single supervisor node may become inefficient. A single supervisor can route tasks between workers, but what happens when individual tasks require intricate subtasks or when the number of workers becomes overwhelming?

In such scenarios, a hierarchical structure becomes a powerful solution. By breaking down tasks into sub-tasks and organizing workers into teams with their own supervisors, we can create a robust and scalable system. Each team handles a specific domain of tasks, and mid-level supervisors coordinate efforts before reporting to a top-level supervisor.

This approach enables efficient task distribution, better resource management, and improved scalability for complex workflows.
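
As a rough illustration of the idea, the hierarchy can be pictured as a nested mapping from a top-level supervisor to teams to workers. The sketch below is plain Python rather than LangGraph code; the `HIERARCHY` layout and the `find_route` helper are hypothetical and only reuse the team and worker names that appear later in this notebook.

```python
from typing import Optional

# Hypothetical layout: top-level supervisor -> team -> workers.
# Team and worker names mirror the ones defined later in this notebook.
HIERARCHY: dict[str, list[str]] = {
    "research_team": ["search", "web_scraper"],
    "writing_team": ["doc_writer", "note_taker", "chart_generator"],
}


def find_route(worker: str) -> Optional[list[str]]:
    """Return the chain of nodes a task passes through to reach `worker`."""
    for team, workers in HIERARCHY.items():
        if worker in workers:
            # Top-level supervisor -> team (with its own supervisor) -> worker
            return ["supervisor", team, worker]
    return None  # Unknown worker: no team owns it


print(find_route("web_scraper"))  # ['supervisor', 'research_team', 'web_scraper']
```

Each supervisor only has to choose among its direct children, which is what keeps every routing decision small as more workers are added.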

**In this notebook, we will**:

- Define tools for agents to access web data and manage files
- Implement utilities to streamline the creation of task workflows
- Develop teams specialized in web research and document writing
- Compose these components into a hierarchical system of supervisors and workers

## Installation of Required Packages

First, install the packages used by the hierarchical agent system: the LangChain integrations, LangGraph, ChromaDB, DuckDuckGo search, and the Wikipedia client.

In [None]:
!pip install -qU langchain-openai
!pip install -qU langchain-anthropic
!pip install -qU langchain_community
!pip install -qU langchain_experimental
!pip install -qU langgraph
!pip install -qU chromadb
!pip install -qU duckduckgo_search
!pip install -qU wikipedia

## Importing Modules and Setting Up the Environment

In this section, we import the required modules and set the `USER_AGENT` environment variable used for HTTP requests. The tools themselves (web scraping, document writing, and Python REPL execution) are defined in the sections that follow.

In [None]:
import os

# Set a custom user agent for HTTP requests
os.environ["USER_AGENT"] = "MyApp/1.0 (https://myapp.com; contact@myapp.com)"

from typing import Annotated, List, Dict, Optional, Literal
from pathlib import Path
from tempfile import TemporaryDirectory
from langchain_community.document_loaders import WebBaseLoader
from langchain_community.tools import DuckDuckGoSearchRun
from langchain_core.tools import tool
from langchain_experimental.utilities import PythonREPL
from langchain_core.language_models.chat_models import BaseChatModel
from langgraph.graph import StateGraph, MessagesState, START, END
from langgraph.types import Command
from langchain_core.messages import BaseMessage, HumanMessage, trim_messages
from langchain_openai import ChatOpenAI
from langgraph.prebuilt import create_react_agent
from typing_extensions import TypedDict
from IPython.display import Image, display

## Creating Tools for the Research Team

Here, we define the tools that the Research Team will use. This includes a search tool using DuckDuckGo and a web scraping tool to extract information from provided URLs.

In [None]:
# --------------------------------------------------------------------------------------------------------
# Create Tools
# --------------------------------------------------------------------------------------------------------

# Research Team Tools
# Search tool using DuckDuckGo
search_tool = DuckDuckGoSearchRun()

@tool
def scrape_webpages(urls: List[str]) -> str:
    """
    Scrapes the provided web pages for detailed information using WebBaseLoader.
    Args:
        urls (List[str]): A list of URLs to scrape.
    Returns:
        str: A string containing the scraped content in a structured format.
    """
    loader = WebBaseLoader(urls)
    docs = loader.load()
    return "\n\n".join(
        [
            f'<Document name="{doc.metadata.get("title", "")}">\n{doc.page_content}\n</Document>'
            for doc in docs
        ]
    )
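
To make the output format of `scrape_webpages` concrete, here is a minimal sketch of the same joining logic applied to in-memory objects, with no network access. `FakeDoc` and `format_docs` are hypothetical stand-ins introduced only for this illustration; `FakeDoc` is not the LangChain document class.

```python
from dataclasses import dataclass, field


@dataclass
class FakeDoc:
    """Stand-in for a loaded document; not the LangChain Document class."""
    page_content: str
    metadata: dict = field(default_factory=dict)


def format_docs(docs: list[FakeDoc]) -> str:
    # Same joining logic as scrape_webpages: one <Document> block per page,
    # separated by a blank line; a missing title becomes an empty name.
    return "\n\n".join(
        f'<Document name="{doc.metadata.get("title", "")}">\n{doc.page_content}\n</Document>'
        for doc in docs
    )


docs = [
    FakeDoc("First page text.", {"title": "Page One"}),
    FakeDoc("Second page text."),  # no title in metadata
]
print(format_docs(docs))
```

Wrapping each page in a named `<Document>` block gives the agent a clear boundary between sources when several URLs are scraped in one call.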

## Creating Tools for the Document Writing Team

The Document Writing Team requires tools to create outlines and to read, write, and edit documents. A temporary directory is created for file operations, and the following tools are implemented. Because each function is wrapped with `@tool`, the examples call it through `.invoke` with a dictionary of arguments:

1. **`create_outline`**:  
   Creates an outline from a list of points and saves it to a specified file.  
   Example: `create_outline.invoke({"points": ["Introduction", "Methodology"], "file_name": "outline.txt"})`.

2. **`read_document`**:  
   Reads content from a file, optionally specifying a start and end line for partial reading.  
   Example: `read_document.invoke({"file_name": "outline.txt", "start": 0, "end": 2})`.

3. **`write_document`**:  
   Writes provided content to a specified file.  
   Example: `write_document.invoke({"content": "This is the content.", "file_name": "document.txt"})`.

4. **`edit_document`**:  
   Edits a document by inserting text at specific line numbers (1-indexed).  
   Example: `edit_document.invoke({"file_name": "document.txt", "inserts": {2: "This is an inserted line."}})`.

Each tool operates within the temporary directory and returns a confirmation message with the file path or the content read. These tools enable the Document Writing Team to perform file-based operations efficiently.

In [None]:
# Document Writing Team Tools
# Create a temporary directory for file operations
_TEMP_DIRECTORY = TemporaryDirectory()
WORKING_DIRECTORY = Path(_TEMP_DIRECTORY.name)

@tool
def create_outline(
    points: Annotated[List[str], "List of main points or sections."],
    file_name: Annotated[str, "File path to save the outline."],
) -> Annotated[str, "Path of the saved outline file."]:
    """
    Creates and saves an outline to a file.

    result = create_outline(["Introduction", "Methodology", "Results", "Conclusion"], "outline.txt")

    Args:
        points (List[str]): A list of main points or sections.
        file_name (str): The file path to save the outline.
    Returns:
        str: A confirmation message with the path of the saved outline file.
    """
    with (WORKING_DIRECTORY / file_name).open("w") as file:
        for i, point in enumerate(points):
            file.write(f"{i + 1}. {point}\n")
    return f"Outline saved to {file_name}"

@tool
def read_document(
    file_name: Annotated[str, "File path to read the document from."],
    start: Annotated[Optional[int], "The start line. Default is 0"] = None,
    end: Annotated[Optional[int], "The end line. Default is None"] = None,
) -> str:
    """
    Reads the specified document from a file.

    content = read_document("outline.txt", start=0, end=2)

    Args:
        file_name (str): The file path to read the document from.
        start (Optional[int]): The start line. Default is 0.
        end (Optional[int]): The end line. Default is None.
    Returns:
        str: A string containing the specified lines from the document.
    """
    with (WORKING_DIRECTORY / file_name).open("r") as file:
        lines = file.readlines()
    # Default to the first line only when no start line was given
    if start is None:
        start = 0
    return "\n".join(lines[start:end])

@tool
def write_document(
    content: Annotated[str, "Text content to be written into the document."],
    file_name: Annotated[str, "File path to save the document."],
) -> Annotated[str, "Path of the saved document file."]:
    """
    Writes content to a file.
    Args:
        content (str): The text content to write.
        file_name (str): The file path to save the document.
    Returns:
        str: A confirmation message with the path of the saved document file.
    """
    with (WORKING_DIRECTORY / file_name).open("w") as file:
        file.write(content)
    return f"Document saved to {file_name}"

@tool
def edit_document(
    file_name: Annotated[str, "Path of the document to be edited."],
    inserts: Annotated[
        Dict[int, str],
        "Dictionary where key is the line number (1-indexed) and value is the text to be inserted at that line.",
    ],
) -> Annotated[str, "Path of the edited document file."]:
    """
    Edits a document by inserting text at specific line numbers.

    result = edit_document("document.txt", {2: "This is an inserted line."})

    Args:
        file_name (str): The file path of the document to edit.
        inserts (Dict[int, str]): A dictionary where keys are line numbers (1-indexed) and values are text to insert.
    Returns:
        str: A confirmation message with the path of the edited document file.
    """
    with (WORKING_DIRECTORY / file_name).open("r") as file:
        lines = file.readlines()
    sorted_inserts = sorted(inserts.items())
    for line_number, text in sorted_inserts:
        if 1 <= line_number <= len(lines) + 1:
            lines.insert(line_number - 1, text + "\n")
        else:
            return f"Error: Line number {line_number} is out of range."
    with (WORKING_DIRECTORY / file_name).open("w") as file:
        file.writelines(lines)
    return f"Document edited and saved to {file_name}"
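
The insertion semantics of `edit_document` are easy to misread, so here is a small sketch that mirrors its loop on an in-memory list instead of a file. `apply_inserts` is a hypothetical helper written for this illustration; it raises an exception where the tool returns an error string.

```python
def apply_inserts(lines: list[str], inserts: dict[int, str]) -> list[str]:
    """Mirror the insertion loop of edit_document on an in-memory list."""
    result = list(lines)  # copy so the caller's list is untouched
    for line_number, text in sorted(inserts.items()):
        if not 1 <= line_number <= len(result) + 1:
            raise ValueError(f"Line number {line_number} is out of range.")
        result.insert(line_number - 1, text + "\n")
    return result


original = ["a\n", "b\n", "c\n"]
edited = apply_inserts(original, {2: "X", 4: "Y"})
print(edited)  # ['a\n', 'X\n', 'b\n', 'Y\n', 'c\n']
```

Inserts are applied in ascending order, so each line number refers to the document as it stands after the earlier inserts: `"Y"` lands on line 4 of the already edited text, ahead of the original `"c"`. Note also that in `edit_document` an out-of-range line number returns the error before the file is rewritten, so none of the inserts from that call are saved.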

## Setting Up the Python REPL Tool

The Python REPL tool allows for the execution of Python code within the agents. This can be useful for generating charts or performing computations based on the scraped data.

In [None]:
# Warning: This executes code locally, which can be unsafe when not sandboxed
repl = PythonREPL()

@tool
def python_repl_tool(
    code: Annotated[str, "The python code to execute to generate your chart."],
):
    """
    Executes Python code using a REPL (Read-Eval-Print Loop).
    Args:
        code (str): The Python code to execute.
    Returns:
        str: A string containing the execution result or an error message.
    """
    try:
        result = repl.run(code)
    except BaseException as e:
        return f"Failed to execute. Error: {repr(e)}"
    return f"Successfully executed:\n```python\n{code}\n```\nStdout: {result}"
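
For readers who want to see what "run code and hand back its output" involves, the following is a minimal standard-library sketch. It is an illustration only and is not how `PythonREPL` is implemented; `run_python` is a hypothetical name, and like the real tool it offers no sandboxing.

```python
import contextlib
import io


def run_python(code: str) -> str:
    """Execute `code` and return whatever it printed, or an error string."""
    buffer = io.StringIO()
    try:
        # Redirect print() output into the buffer while the code runs.
        with contextlib.redirect_stdout(buffer):
            exec(code, {})  # fresh globals; NOT a sandbox
    except Exception as e:
        return f"Failed to execute. Error: {e!r}"
    return buffer.getvalue()


print(run_python("print(2 + 3)"))  # prints 5
print(run_python("1 / 0"))         # prints the ZeroDivisionError message
```

Returning the error as a string, as `python_repl_tool` also does, lets the agent read the failure and try again instead of crashing the graph.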

## Defining Helper Utilities

We define helper functions and utilities that assist in creating supervisor nodes and setting up the language model (LLM) for our agents.

In [None]:
# --------------------------------------------------------------------------------------------------------
# Helper Utilities
# --------------------------------------------------------------------------------------------------------

from typing import Callable  # used for the return annotation below


def make_supervisor_node(
    llm: BaseChatModel, members: list[str]
) -> Callable[[MessagesState], Command]:
    """
    Creates a supervisor node that routes tasks to workers.

    supervisor_node = make_supervisor_node(llm, ["search", "web_scraper"])

    Args:
        llm (BaseChatModel): A language model (e.g., ChatOpenAI).
        members (list[str]): A list of worker names.
    Returns:
        Callable: A function (supervisor_node) that routes tasks to the appropriate worker.
    """
    options = ["FINISH"] + members
    system_prompt = (
        "You are a supervisor tasked with managing a conversation between the"
        f" following workers: {members}. Given the following user request,"
        " respond with the worker to act next. Each worker will perform a"
        " task and respond with their results and status. When finished,"
        " respond with FINISH."
    )

    class Router(TypedDict):
        """Worker to route to next. If no workers needed, route to FINISH."""
        next: str  # Expected to be one of `options` ("FINISH" or a worker name)

    def supervisor_node(state: MessagesState) -> Command[str]:
        """
        An LLM-based router.
        Args:
            state (MessagesState): The current state of the messages.
        Returns:
            Command[str]: A command indicating the next worker to route to.
        """
        messages = [{"role": "system", "content": system_prompt}] + state["messages"]
        response = llm.with_structured_output(Router).invoke(messages)
        goto = response["next"]
        if goto == "FINISH":
            goto = END
        return Command(goto=goto)

    return supervisor_node
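
Since `Router.next` is typed as a plain `str`, nothing in the schema stops the model from returning a name that is not a valid route. One possible safeguard, sketched here under that assumption, is to validate the value before routing. `resolve_next` is a hypothetical helper, and the `END` constant below is a stand-in sentinel for this sketch; the notebook itself uses the `END` imported from `langgraph.graph`.

```python
END = "__end__"  # stand-in sentinel for this sketch; the notebook uses langgraph's END


def resolve_next(choice: str, members: list[str]) -> str:
    """Map a router's raw `next` value to a node name or the END sentinel."""
    options = ["FINISH"] + members
    if choice not in options:
        # `next` is typed as plain str, so nothing stops an unexpected value.
        raise ValueError(f"Unknown route {choice!r}; expected one of {options}")
    return END if choice == "FINISH" else choice


members = ["search", "web_scraper"]
print(resolve_next("search", members))  # search
print(resolve_next("FINISH", members))  # __end__
```

A check like this could sit between `response["next"]` and the `Command(goto=...)` call so that a misspelled worker name fails with a clear message.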

## Fetching API Key

This cell retrieves the OpenAI API key stored in Kaggle Secrets and initializes the `ChatOpenAI` model with the fetched API key. The `UserSecretsClient` is used to securely fetch the key, ensuring the API key remains private.

In [None]:
from kaggle_secrets import UserSecretsClient

# Fetch the OpenAI API key from Kaggle secrets
my_api_key = UserSecretsClient().get_secret("my-openai-api-key")

# Initialize the ChatOpenAI model with the fetched API key
llm = ChatOpenAI(model="gpt-4o-mini", api_key=my_api_key)
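
Outside Kaggle, `kaggle_secrets` is not available. A minimal sketch of one way to make the cell portable is to check the `OPENAI_API_KEY` environment variable first and fall back to Kaggle Secrets only when it is unset. `load_openai_api_key` is a hypothetical helper; the secret name is the one used in the cell above.

```python
import os


def load_openai_api_key(secret_name: str = "my-openai-api-key") -> str:
    """Return the key from the environment, falling back to Kaggle Secrets."""
    key = os.environ.get("OPENAI_API_KEY")
    if key:
        return key
    try:
        # Only importable inside a Kaggle notebook session.
        from kaggle_secrets import UserSecretsClient
    except ImportError as exc:
        raise RuntimeError(
            "Set OPENAI_API_KEY or run inside Kaggle with the secret configured."
        ) from exc
    return UserSecretsClient().get_secret(secret_name)
```

The result could then be passed along as `ChatOpenAI(model="gpt-4o-mini", api_key=load_openai_api_key())`.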

## Setting Up the Research Team

The Research Team is responsible for searching the web and scraping relevant information. We create agents for searching and web scraping, define their respective nodes, and compile them into a state graph.

In [None]:
# --------------------------------------------------------------------------------------------------------
# Research Team
# --------------------------------------------------------------------------------------------------------

# Create a search agent using the LLM and the search tool
search_agent = create_react_agent(llm, tools=[search_tool])

def search_node(state: MessagesState) -> Command[Literal["supervisor"]]:
    """
    Executes the search agent and returns the result to the supervisor.
    Args:
        state (MessagesState): The current state of the messages.
    Returns:
        Command[Literal["supervisor"]]: A command to update the state and route to the supervisor.
    """
    result = search_agent.invoke(state)
    print(">>> search_node >>>")
    print(result["messages"][-1].content)  # Print the latest message content for debugging
    print("<<< search_node <<<")
    return Command(
        update={
            "messages": [
                HumanMessage(content=result["messages"][-1].content, name="search")
            ]
        },
        # Always route back to the supervisor after completing the task
        goto="supervisor",
    )

# Create a web scraper agent using the LLM and the scrape_webpages tool
web_scraper_agent = create_react_agent(llm, tools=[scrape_webpages])

def web_scraper_node(state: MessagesState) -> Command[Literal["supervisor"]]:
    """
    Executes the web scraper agent and returns the result to the supervisor.
    Args:
        state (MessagesState): The current state of the messages.
    Returns:
        Command[Literal["supervisor"]]: A command to update the state and route to the supervisor.
    """
    result = web_scraper_agent.invoke(state)
    print(">>> web_scraper_node >>>")
    print(result["messages"][-1].content)  # Print the latest message content for debugging
    print("<<< web_scraper_node <<<")
    return Command(
        update={
            "messages": [
                HumanMessage(content=result["messages"][-1].content, name="web_scraper")
            ]
        },
        # Always route back to the supervisor after completing the task
        goto="supervisor",
    )

# Create a supervisor node for the research team
research_supervisor_node = make_supervisor_node(llm, ["search", "web_scraper"])

# Build the research team state graph
research_builder = StateGraph(MessagesState)
research_builder.add_node("supervisor", research_supervisor_node)  # Add supervisor node
research_builder.add_node("search", search_node)  # Add search node
research_builder.add_node("web_scraper", web_scraper_node)  # Add web scraper node

research_builder.add_edge(START, "supervisor")  # Start with the supervisor
research_graph = research_builder.compile()  # Compile the graph

# Display the research team workflow as a Mermaid diagram
display(Image(research_graph.get_graph().draw_mermaid_png()))

# Stream the research team workflow with a user query
for s in research_graph.stream(
    {"messages": [("user", "when is Taylor Swift's next tour?")]},
    {"recursion_limit": 100},  # Limit recursion to prevent infinite loops
):
    print(s)  # Print the state at each step
    print("-" * 60)  # Separator for readability
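
The research graph is a loop: the supervisor picks a worker, the worker reports back, and the supervisor decides again until it answers FINISH. The sketch below imitates that loop in plain Python to show why a step limit matters. It is a simplified model that counts supervisor turns, and LangGraph's own accounting for `recursion_limit` may differ; `run_team`, `scripted_supervisor`, and the canned worker replies are all hypothetical.

```python
from typing import Callable

END = "__end__"  # stand-in sentinel for this sketch


def run_team(
    supervisor: Callable[[list[str]], str],
    workers: dict[str, Callable[[list[str]], str]],
    messages: list[str],
    step_limit: int,
) -> list[str]:
    """Alternate supervisor and worker turns until END or the step limit."""
    steps = 0
    while True:
        steps += 1
        if steps > step_limit:
            raise RuntimeError(f"Step limit of {step_limit} reached without END")
        choice = supervisor(messages)
        if choice == END:
            return messages
        # The chosen worker appends its result, then control returns to the supervisor.
        messages = messages + [workers[choice](messages)]


def scripted_supervisor(messages: list[str]) -> str:
    # Hypothetical rule: search once, scrape once, then finish.
    order = ["search", "web_scraper"]
    done = len(messages) - 1  # messages produced by workers so far
    return order[done] if done < len(order) else END


workers = {
    "search": lambda msgs: "search: found 2 links",
    "web_scraper": lambda msgs: "web_scraper: scraped 2 pages",
}
print(run_team(scripted_supervisor, workers, ["user question"], step_limit=10))
```

With an LLM in place of `scripted_supervisor` there is no guarantee that FINISH is ever chosen, which is why the stream call above passes a limit.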

## Setting Up the Document Writing Team

The Document Writing Team handles creating, reading, editing, and writing documents based on outlines. Additionally, it can generate charts by executing Python code. We create agents for document writing, note-taking, and chart generation, define their nodes, and compile them into a state graph.

### **Document Writing Team Setup**
This block defines the **document writer agent**, which is responsible for reading, writing, and editing documents based on outlines provided by the note-taker. It uses an LLM (Large Language Model) and tools like `write_document`, `edit_document`, and `read_document`. The `doc_writing_node` function executes the agent, prints the result for debugging, and routes the output back to the supervisor.

In [None]:
# --------------------------------------------------------------------------------------------------------
# Document Writing Team
# --------------------------------------------------------------------------------------------------------

# Create a document writer agent using the LLM and document-related tools
doc_writer_agent = create_react_agent(
    llm,
    tools=[write_document, edit_document, read_document],
    state_modifier=(
        "You can read, write and edit documents based on note-taker's outlines. "
        "Don't ask follow-up questions."
    ),
)

def doc_writing_node(state: MessagesState) -> Command[Literal["supervisor"]]:
    """
    Executes the document writer agent and returns the result to the supervisor.
    Args:
        state (MessagesState): The current state of the messages.
    Returns:
        Command[Literal["supervisor"]]: A command to update the state and route to the supervisor.
    """
    result = doc_writer_agent.invoke(state)
    print(">>> doc_writing_node >>>")
    print(result["messages"][-1].content)  # Print the latest message content for debugging
    print("<<< doc_writing_node <<<")
    return Command(
        update={
            "messages": [
                HumanMessage(content=result["messages"][-1].content, name="doc_writer")
            ]
        },
        # Always route back to the supervisor after completing the task
        goto="supervisor",
    )

### **Note-Taking Agent Setup**
This block defines the **note-taking agent**, which is responsible for reading documents and creating outlines for the document writer. It uses tools like `create_outline` and `read_document`. The `note_taking_node` function executes the agent, prints the result for debugging, and routes the output back to the supervisor.

In [None]:
# Create a note-taking agent using the LLM and outline-related tools
note_taking_agent = create_react_agent(
    llm,
    tools=[create_outline, read_document],
    state_modifier=(
        "You can read documents and create outlines for the document writer. "
        "Don't ask follow-up questions."
    ),
)

def note_taking_node(state: MessagesState) -> Command[Literal["supervisor"]]:
    """
    Executes the note-taking agent and returns the result to the supervisor.
    Args:
        state (MessagesState): The current state of the messages.
    Returns:
        Command[Literal["supervisor"]]: A command to update the state and route to the supervisor.
    """
    result = note_taking_agent.invoke(state)
    print(">>> note_taking_node >>>")
    print(result["messages"][-1].content)  # Print the latest message content for debugging
    print("<<< note_taking_node <<<")
    return Command(
        update={
            "messages": [
                HumanMessage(content=result["messages"][-1].content, name="note_taker")
            ]
        },
        # Always route back to the supervisor after completing the task
        goto="supervisor",
    )

### **Chart-Generating Agent Setup**
This block defines the **chart-generating agent**, which is responsible for generating charts using the LLM and a Python REPL tool. It can also read documents. The `chart_generating_node` function executes the agent, prints the result for debugging, and routes the output back to the supervisor. Additionally, a **supervisor node** is created to manage the interactions between the document writer, note-taker, and chart generator.

In [None]:
# Create a chart-generating agent using the LLM and Python REPL tool
chart_generating_agent = create_react_agent(
    llm, tools=[read_document, python_repl_tool]
)

def chart_generating_node(state: MessagesState) -> Command[Literal["supervisor"]]:
    """
    Executes the chart-generating agent and returns the result to the supervisor.
    Args:
        state (MessagesState): The current state of the messages.
    Returns:
        Command[Literal["supervisor"]]: A command to update the state and route to the supervisor.
    """
    result = chart_generating_agent.invoke(state)
    print(">>> chart_generating_node >>>")
    print(result["messages"][-1].content)  # Print the latest message content for debugging
    print("<<< chart_generating_node <<<")
    return Command(
        update={
            "messages": [
                HumanMessage(
                    content=result["messages"][-1].content, name="chart_generator"
                )
            ]
        },
        # Always route back to the supervisor after completing the task
        goto="supervisor",
    )

# Create a supervisor node for the document writing team
doc_writing_supervisor_node = make_supervisor_node(
    llm, ["doc_writer", "note_taker", "chart_generator"]
)
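
The worker node functions in this notebook differ only in the agent they call and the name they report under, so they could be generated by a small factory. The sketch below shows the shape of such a factory using plain dictionaries in place of `Command` and `HumanMessage`, so that it runs without LangGraph. `make_worker_node` and `EchoAgent` are hypothetical names introduced for this illustration.

```python
from typing import Any, Callable


def make_worker_node(agent: Any, name: str) -> Callable[[dict], dict]:
    """Build a node function that runs `agent` and reports back under `name`."""

    def node(state: dict) -> dict:
        result = agent.invoke(state)
        last = result["messages"][-1]
        # In the notebook this would be wrapped as
        # Command(update={"messages": [HumanMessage(content=..., name=name)]}, goto="supervisor")
        return {
            "update": {"messages": [{"content": last["content"], "name": name}]},
            "goto": "supervisor",
        }

    return node


class EchoAgent:
    """Hypothetical stand-in for an agent; it only needs an `invoke` method."""

    def invoke(self, state: dict) -> dict:
        text = state["messages"][-1]["content"]
        return {"messages": state["messages"] + [{"content": f"done: {text}"}]}


note_taker = make_worker_node(EchoAgent(), "note_taker")
print(note_taker({"messages": [{"content": "outline a poem"}]}))
```

The explicit per-agent functions used in this notebook are easier to read and debug one at a time; a factory becomes more attractive as the number of workers grows.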

### **State Graph Construction and Execution**
This block builds the **state graph** for the document writing team workflow. It adds nodes for the supervisor, document writer, note-taker, and chart generator. The workflow starts with the supervisor and routes tasks to the appropriate agents. The graph is compiled, and a Mermaid diagram is displayed to visualize the workflow. Finally, the workflow is executed with a user query (e.g., "Write an outline for a poem about cats and then write the poem to disk"), and the state is printed at each step for debugging.

In [None]:
# Build the document writing team state graph
paper_writing_builder = StateGraph(MessagesState)
paper_writing_builder.add_node("supervisor", doc_writing_supervisor_node)  # Add supervisor node
paper_writing_builder.add_node("doc_writer", doc_writing_node)  # Add document writer node
paper_writing_builder.add_node("note_taker", note_taking_node)  # Add note-taker node
paper_writing_builder.add_node("chart_generator", chart_generating_node)  # Add chart generator node

paper_writing_builder.add_edge(START, "supervisor")  # Start with the supervisor
paper_writing_graph = paper_writing_builder.compile()  # Compile the graph

# Display the document writing team workflow as a Mermaid diagram
display(Image(paper_writing_graph.get_graph().draw_mermaid_png()))

In [None]:
# Stream the document writing team workflow with a user query
for s in paper_writing_graph.stream(
    {
        "messages": [
            (
                "user",
                "Write an outline for a poem about cats and then write the poem to disk.",
            )
        ]
    },
    {"recursion_limit": 100},  # Limit recursion to prevent infinite loops
):
    print(s)  # Print the state at each step
    print("-" * 60)  # Separator for readability

## Composing Teams into a Hierarchical Structure

To manage multiple teams effectively, we create a top-level supervisor that oversees both the Research Team and the Document Writing Team. This hierarchical structure allows for better task distribution and management.

In [None]:
# --------------------------------------------------------------------------------------------------------
# Add Layers
# --------------------------------------------------------------------------------------------------------

# Create a supervisor node for the combined teams
teams_supervisor_node = make_supervisor_node(llm, ["research_team", "writing_team"])

def call_research_team(state: MessagesState) -> Command[Literal["supervisor"]]:
    """
    Calls the research team and returns the result to the supervisor.
    Args:
        state (MessagesState): The current state of the messages.
    Returns:
        Command[Literal["supervisor"]]: A command to update the state and route to the supervisor.
    """
    response = research_graph.invoke({"messages": state["messages"][-1]})
    print(">>> call_research_team >>>")
    print(response["messages"][-1].content)  # Print the latest message content for debugging
    print("<<< call_research_team <<<")
    return Command(
        update={
            "messages": [
                HumanMessage(
                    content=response["messages"][-1].content, name="research_team"
                )
            ]
        },
        goto="supervisor",
    )

def call_paper_writing_team(state: MessagesState) -> Command[Literal["supervisor"]]:
    """
    Calls the document writing team and returns the result to the supervisor.
    Args:
        state (MessagesState): The current state of the messages.
    Returns:
        Command[Literal["supervisor"]]: A command to update the state and route to the supervisor.
    """
    response = paper_writing_graph.invoke({"messages": state["messages"][-1]})
    print(">>> call_paper_writing_team >>>")
    print(response["messages"][-1].content)  # Print the latest message content for debugging
    print("<<< call_paper_writing_team <<<")
    return Command(
        update={
            "messages": [
                HumanMessage(
                    content=response["messages"][-1].content, name="writing_team"
                )
            ]
        },
        goto="supervisor",
    )

# Build the combined teams state graph
super_builder = StateGraph(MessagesState)
super_builder.add_node("supervisor", teams_supervisor_node)  # Add supervisor node
super_builder.add_node("research_team", call_research_team)  # Add research team node
super_builder.add_node("writing_team", call_paper_writing_team)  # Add writing team node

super_builder.add_edge(START, "supervisor")  # Start with the supervisor
super_graph = super_builder.compile()  # Compile the graph

# Display the combined teams workflow as a Mermaid diagram
display(Image(super_graph.get_graph().draw_mermaid_png()))

# Stream the combined teams workflow with a user query
for s in super_graph.stream(
    {
        "messages": [
            ("user", "Research AI agents and write a brief report about them.")
        ],
    },
    {"recursion_limit": 150},  # Limit recursion to prevent infinite loops
):
    print(s)  # Print the state at each step
    print("-" * 60)  # Separator for readability
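
Both `call_research_team` and `call_paper_writing_team` pass only the latest message down to the team and copy only the team's final message back up. The following plain-Python sketch models that hand-off with lists of strings; `call_team` and `fake_research_team` are hypothetical and stand in for the compiled graphs.

```python
def call_team(team_run, parent_messages: list[str], team_name: str) -> list[str]:
    """Hand only the latest parent message to a team and keep only its final answer."""
    # Downward: the team sees just the most recent message, not the full history.
    team_messages = team_run([parent_messages[-1]])
    # Upward: intermediate team messages are dropped; only the last one is reported.
    return parent_messages + [f"{team_name}: {team_messages[-1]}"]


def fake_research_team(messages: list[str]) -> list[str]:
    # Hypothetical team run that produces two intermediate steps and a final answer.
    return messages + ["searching...", "scraping...", "summary of findings"]


history = ["Research AI agents and write a brief report about them."]
history = call_team(fake_research_team, history, "research_team")
print(history)
```

This keeps the top-level state short, but it also means that when the writing team runs after the research team, it receives the research summary as its only input and does not see the original user request.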

## Conclusion

By organizing agents into hierarchical teams with supervisors at different levels, we can efficiently manage complex tasks and a large number of workers. This structure enhances scalability and maintainability, allowing for more sophisticated and organized workflows in AI-driven applications.