As an example of a multi-agent workflow, I would like to build an application that can handle questions from various domains. We will have a set of expert agents, each specializing in different types of questions, and a router agent that will find the best-suited expert to address each query. Such an application has numerous potential use cases: from automating customer support to answering questions from colleagues in internal chats.

First, we need to create the agent state — the information that will help agents to solve the question together. I will use the following fields:

- question: the initial customer request.
- question_type: the category that defines which agent will work on the request.
- answer: the proposed answer to the question.
- feedback: a field for future use that will hold feedback on the answer.

We don’t use any reducers, so our state will store only the latest version of each field.

In [1]:
from typing import TypedDict


class MultiAgentState(TypedDict):
    question: str
    question_type: str
    answer: str
    feedback: str
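
For contrast, here is a rough sketch of what a reducer would change. LangGraph lets a state field carry a reducer through `Annotated` (for example `operator.add` on a list), so that updates are combined instead of overwritten. The `apply_update` helper below is a toy written only for this illustration, not LangGraph's implementation, and `StateWithHistory` and its `answers` field are hypothetical names.

```python
import operator
from typing import Annotated, TypedDict, get_type_hints


class StateWithHistory(TypedDict):
    question: str  # no reducer: each update overwrites the value
    answers: Annotated[list[str], operator.add]  # reducer: updates are appended


def apply_update(state: dict, update: dict, schema=StateWithHistory) -> dict:
    """Toy merge that mimics reducer semantics (illustration only)."""
    hints = get_type_hints(schema, include_extras=True)
    merged = dict(state)
    for key, value in update.items():
        # Annotated[...] types expose their extra arguments as __metadata__
        reducers = getattr(hints[key], "__metadata__", ())
        if reducers and key in merged:
            merged[key] = reducers[0](merged[key], value)
        else:
            merged[key] = value
    return merged


state = apply_update(
    {"question": "q1", "answers": ["draft"]},
    {"question": "q2", "answers": ["final"]},
)
print(state)
```

In `MultiAgentState` every field behaves like `question` here: a node's return value simply replaces the previous value.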

In [None]:
import os
import traceback
from pathlib import Path

import psycopg
from dotenv import load_dotenv

env_loaded = load_dotenv()
print(f"Env loaded: {env_loaded}")

In [100]:
# Global connection object (initialized by helper function)
_conn = None


def _get_db_connection():
    """Helper function to get a database connection."""
    global _conn
    db_host = os.environ.get("POSTGRES_HOST")
    db_user = os.environ.get("POSTGRES_USER")
    db_password = os.environ.get("POSTGRES_PASSWORD")
    db_name = os.environ.get("POSTGRES_DB")
    db_port = os.environ.get("POSTGRES_PORT")

    connection_string = (
        f"postgresql://{db_user}:{db_password}@{db_host}:{db_port}/{db_name}"
    )
    # Do not print the connection string: it contains the password.
    if _conn is None or _conn.closed:
        try:
            _conn = psycopg.connect(connection_string)
            _conn.autocommit = False  # Set autocommit to False for more control
        except Exception as error:  # psycopg.Error is a subclass of Exception
            print(traceback.format_exc())
            raise Exception(f"Error connecting to database: {error}") from error

    return _conn
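
One caveat with building the URL by plain string interpolation: a password containing characters such as `@` or `/` produces an ambiguous connection string. A minimal sketch of one way around this, assuming percent-encoding of the credentials (which PostgreSQL connection URIs accept), is shown below. The helper name `build_postgres_url` and its `redact` flag are hypothetical, and the sample credentials are made up.

```python
from urllib.parse import quote


def build_postgres_url(user, password, host, port, dbname, redact=False):
    """Build a PostgreSQL URL with percent-encoded credentials.

    With redact=True the password is replaced by '***', which gives a
    version that is safe to print or log.
    """
    secret = "***" if redact else quote(password, safe="")
    return (
        f"postgresql://{quote(user, safe='')}:{secret}"
        f"@{host}:{port}/{quote(dbname, safe='')}"
    )


url = build_postgres_url("analyst", "p@ss/word", "localhost", 5432, "airflow_test")
log_url = build_postgres_url(
    "analyst", "p@ss/word", "localhost", 5432, "airflow_test", redact=True
)
print(log_url)
```

Only the redacted variant should ever be printed; the full URL goes straight to `psycopg.connect`.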

In [3]:
def get_sql_data(query):
    """
    Executes the SQL query passed as an argument and returns the data from
    Postgresql database in nicely formatted textual format.

    Args:
        query (str): The SQL query to execute.

    Returns:
        str: A nicely formatted string representation of the data returned
             by the query.
             Returns "Error: No results found" if the query returns no data or
             an error message if an exception occurs.
    """
    conn = None
    try:
        conn = _get_db_connection()
        cur = conn.cursor()
        cur.execute(query)
        rows = cur.fetchall()

        if not rows:
            return "Error: No results found"

        # Get column names for headers
        column_names = [desc[0] for desc in cur.description]

        # Format data with headers
        formatted_data = ""

        # Calculate maximum width for each column
        max_widths = [len(str(col)) for col in column_names]
        for row in rows:
            for i, value in enumerate(row):
                max_widths[i] = max(max_widths[i], len(str(value)))

        # Create header line
        header_line = "|"
        for i, col in enumerate(column_names):
            header_line += f" {col.ljust(max_widths[i])} |"
        formatted_data += header_line + "\n"

        # Create separator line
        separator_line = "|"
        for width in max_widths:
            separator_line += f"-{'-'*width}-|"
        formatted_data += separator_line + "\n"

        # Create data lines
        for row in rows:
            row_line = "|"
            for i, value in enumerate(row):
                row_line += f" {str(value).ljust(max_widths[i])} |"
            formatted_data += row_line + "\n"

        cur.close()
        return formatted_data

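    except Exception as error:  # psycopg.Error is a subclass of Exception
        # With autocommit off, a failed statement leaves the transaction in an
        # aborted state; roll back so the shared connection stays usable.
        if conn is not None and not conn.closed:
            conn.rollback()
        return f"Database returned error: {error}"
    # The global connection is intentionally left open for reuse.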
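
To see the text layout that `get_sql_data` hands to the LLM without needing a database, the formatting step can be pulled out into a standalone function. This is a sketch of the same logic; the name `format_table` and the sample rows are made up for the illustration.

```python
def format_table(column_names, rows):
    """Render rows as a pipe-delimited text table with padded columns."""
    # Each column is as wide as its longest cell (header included)
    widths = [len(str(col)) for col in column_names]
    for row in rows:
        for i, value in enumerate(row):
            widths[i] = max(widths[i], len(str(value)))

    def line(values):
        cells = (str(v).ljust(w) for v, w in zip(values, widths))
        return "| " + " | ".join(cells) + " |"

    separator = "|" + "|".join("-" * (w + 2) for w in widths) + "|"
    lines = [line(column_names), separator, *(line(row) for row in rows)]
    return "\n".join(lines) + "\n"


table = format_table(["id", "name"], [(1, "a"), (22, "bb")])
print(table)
```

Keeping the formatter separate also makes it easy to test independently of the connection handling.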

In [4]:
from langchain_core.tools import tool
from pydantic.v1 import BaseModel, Field
from typing import Optional


class SQLQuery(BaseModel):
    query: str = Field(description="SQL query to execute")


@tool(args_schema=SQLQuery)
def execute_sql(query: str) -> str:
    """Returns the result of SQL query execution"""
    return get_sql_data(query)

Then, let’s create a router node. It will be a single LLM call that determines the category of the question (database, LangChain, or general).

In [45]:
from langchain_openai.chat_models import AzureChatOpenAI

model = AzureChatOpenAI(
    azure_endpoint=os.getenv("AZURE_OPENAI_ENDPOINT"),
    api_key=os.getenv("AZURE_OPENAI_API_KEY"),
    azure_deployment=os.getenv("AZURE_OPENAI_DEPLOYMENT"),
    api_version=os.getenv("AZURE_OPENAI_API_VERSION"),
    temperature=0,
    max_tokens=8192,
)

In [7]:
from langchain_core.messages import AnyMessage, SystemMessage, HumanMessage, ToolMessage

question_category_prompt = """You are a senior specialist of analytical support. Your task is to classify the incoming questions.
Depending on your answer, the question will be routed to the right team, so your task is crucial for our team.
There are 3 possible question types:
- DATABASE - questions related to our database (tables or fields)
- LANGCHAIN - questions related to LangGraph or LangChain libraries
- GENERAL - general questions
Return in the output only one word (DATABASE, LANGCHAIN or GENERAL).
"""


def router_node(state: MultiAgentState):
    messages = [
        SystemMessage(content=question_category_prompt),
        HumanMessage(content=state["question"]),
    ]
    response = model.invoke(messages)
    # Strip whitespace so the label matches the routing keys exactly
    return {"question_type": response.content.strip()}

Now that we have our first node — the router — let’s build a simple graph to test the workflow.

In [50]:
from langgraph.checkpoint.memory import MemorySaver

memory = MemorySaver()

In [68]:
from langgraph.graph import StateGraph, END

builder = StateGraph(MultiAgentState)
builder.add_node("router", router_node)

builder.set_entry_point("router")
builder.add_edge("router", END)

graph = builder.compile(checkpointer=memory)

In [None]:
from IPython.display import Image

Image(graph.get_graph().draw_png())

Let’s test our workflow with different types of questions to see how it performs in action. This will help us evaluate whether the router agent correctly assigns questions to the appropriate expert agents.

LangChain-based question

In [None]:
thread = {"configurable": {"thread_id": "1"}}
for s in graph.stream(
    {
        "question": "Does LangChain support Ollama?",
    },
    thread,
):
    print(s)

Database-related question

In [None]:
thread = {"configurable": {"thread_id": "2"}}
for s in graph.stream(
    {
        "question": "What info do we have in airflow_test.dag_runs table?",
    },
    thread,
):
    print(s)

General question

In [None]:
thread = {"configurable": {"thread_id": "3"}}
for s in graph.stream(
    {
        "question": "How are you?",
    },
    thread,
):
    print(s)

It is recommended to build complex graphs incrementally and test each step independently. This way, you can make sure each iteration works as expected, which can save a significant amount of debugging time.
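
One way to test a node in isolation is to replace the real model with a stub, so the check runs without network calls or API keys. The sketch below assumes the node receives its model as an argument; `StubModel` and `make_router_node` are hypothetical helpers rather than part of LangChain or LangGraph, and messages are written as plain `(role, content)` tuples to keep the example free of library imports.

```python
from types import SimpleNamespace


class StubModel:
    """Stand-in for a chat model: returns a canned reply and records calls."""

    def __init__(self, reply):
        self.reply = reply
        self.calls = []

    def invoke(self, messages):
        self.calls.append(messages)
        return SimpleNamespace(content=self.reply)


def make_router_node(model, system_prompt):
    """Hypothetical factory: router logic with the model injected."""

    def router_node(state):
        messages = [("system", system_prompt), ("human", state["question"])]
        response = model.invoke(messages)
        return {"question_type": response.content.strip()}

    return router_node


stub = StubModel("DATABASE\n")
node = make_router_node(stub, "Classify the question.")
update = node({"question": "Which tables do we have?"})
print(update)
```

The same pattern applies to the expert nodes: inject the dependency, feed a minimal state, and assert on the returned update.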

Next, let’s create nodes for our expert agents. We will use the ReAct agent with the SQL tool we previously built as the database agent.

In [56]:
from langgraph.prebuilt import create_react_agent

# database expert
sql_expert_system_prompt = '''
You are an expert in SQL, so you can help the team 
to gather needed data to power their decisions. 
You are very accurate and take into account all the nuances in data. 
You use SQL to get the data before answering the question.
'''

def sql_expert_node(state: MultiAgentState):
    sql_agent = create_react_agent(model, [execute_sql],
        state_modifier = sql_expert_system_prompt)
    messages = [HumanMessage(content=state['question'])]
    result = sql_agent.invoke({"messages": messages})
    return {'answer': result['messages'][-1].content}

In [60]:
from langchain_community.tools import DuckDuckGoSearchResults

search_run_tool = DuckDuckGoSearchResults()

search_expert_system_prompt = """
You are an expert in LangChain and other technologies. 
Your goal is to answer questions based on results provided by search.
You don't add anything yourself and provide only information backed by other sources.
"""


def search_expert_node(state: MultiAgentState):
    # Search-backed agent for LangChain questions
    search_agent = create_react_agent(
        model, [search_run_tool], state_modifier=search_expert_system_prompt
    )
    messages = [HumanMessage(content=state["question"])]
    result = search_agent.invoke({"messages": messages})
    return {"answer": result["messages"][-1].content}

For general questions, we will use a plain LLM call without any tools.

In [27]:
# general model
general_prompt = """You're a friendly assistant and your goal is to answer general questions.
Please don't provide any unchecked information, and just say that you don't know if you don't have enough info.
"""


def general_assistant_node(state: MultiAgentState):
    messages = [
        SystemMessage(content=general_prompt),
        HumanMessage(content=state["question"]),
    ]
    response = model.invoke(messages)
    return {"answer": response.content}

The last missing bit is a conditional function for routing. This will be quite straightforward—we just need to propagate the question type from the state defined by the router node.

In [25]:
def route_question(state: MultiAgentState):
    return state["question_type"]
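
Because the routing function returns whatever the router LLM produced, a reply such as "Database." or "Category: LANGCHAIN" would not match any of the expected labels and the graph would likely fail at that step. A more defensive variant might normalize the label and fall back to a default. This is a sketch only: `route_question_safe`, `VALID_TYPES`, and the choice of `GENERAL` as the fallback are assumptions, not part of the original workflow.

```python
VALID_TYPES = ("DATABASE", "LANGCHAIN", "GENERAL")


def route_question_safe(state, default="GENERAL"):
    """Map the router's raw output to one of the known labels."""
    label = str(state.get("question_type", "")).strip().upper()
    if label in VALID_TYPES:
        return label
    # Tolerate extra words or punctuation around a single known label
    matches = [t for t in VALID_TYPES if t in label]
    return matches[0] if len(matches) == 1 else default


print(route_question_safe({"question_type": " database\n"}))
print(route_question_safe({"question_type": "Category: LANGCHAIN."}))
print(route_question_safe({"question_type": "I am not sure"}))
```

Ambiguous replies that mention several labels also fall back to the default rather than picking one arbitrarily.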

Now, it’s time to create our graph.

In [70]:
builder = StateGraph(MultiAgentState)
builder.add_node("router", router_node)
builder.add_node("database_expert", sql_expert_node)
builder.add_node("langchain_expert", search_expert_node)
builder.add_node("general_assistant", general_assistant_node)
builder.add_conditional_edges(
    "router",
    route_question,
    {
        "DATABASE": "database_expert",
        "LANGCHAIN": "langchain_expert",
        "GENERAL": "general_assistant",
    },
)


builder.set_entry_point("router")
builder.add_edge("database_expert", END)
builder.add_edge("langchain_expert", END)
builder.add_edge("general_assistant", END)
graph = builder.compile(checkpointer=memory)

In [None]:
from IPython.display import Image

Image(graph.get_graph().draw_png())

Now, we can test the setup on a couple of questions to see how well it performs.

In [None]:
thread = {"configurable": {"thread_id": "2"}}
results = []
for s in graph.stream(
    {
        "question": "What info do we have in airflow_test.dag_runs table?",
    },
    thread
):
    print(s)
    results.append(s)
print(results[-1]["database_expert"]["answer"])

Good job! It gives a relevant result for the database-related question. Let’s try asking about LangChain.

In [None]:
thread = {"configurable": {"thread_id": "42"}}
results = []
for s in graph.stream(
    {
        "question": "Can you provide an example for LCEL ?",
    },
    thread,
):
    print(s)
    results.append(s)

print(results[-1]["langchain_expert"]["answer"])

## Adding human-in-the-loop interactions

We’ve done an excellent job creating a tool to answer questions. However, in many cases, it’s beneficial to keep a human in the loop to approve proposed actions or provide additional feedback. Let’s add a step where we can collect feedback from a human before returning the final result to the user.

The simplest approach is to add two additional nodes:

- A human node to gather feedback,
- An editor node to revisit the answer, taking into account the feedback.

Let’s create these nodes:

- Human node: This will be a dummy node, and it won’t perform any actions.
- Editor node: This will be an LLM model that receives all the relevant information (customer question, draft answer and provided feedback) and revises the final answer.

In [72]:
def human_feedback_node(state: MultiAgentState):
    pass

editor_prompt = '''You're an editor and your goal is to provide the final answer to the customer, taking into account the feedback. 
You don't add any information on your own. You use friendly and professional tone.
In the output please provide the final answer to the customer without additional comments.
Here's all the information you need.

Question from customer: 
----
{question}
----
Draft answer:
----
{answer}
----
Feedback: 
----
{feedback}
----
'''

def editor_node(state: MultiAgentState):
    messages = [
        SystemMessage(
            content=editor_prompt.format(
                question=state["question"],
                answer=state["answer"],
                feedback=state["feedback"],
            )
        )
    ]
    response = model.invoke(messages)
    return {"answer": response.content}

Let’s add these nodes to our graph. Additionally, we need to introduce an interruption before the human node to ensure that the process pauses for human feedback.

In [73]:
builder = StateGraph(MultiAgentState)
builder.add_node("router", router_node)
builder.add_node("database_expert", sql_expert_node)
builder.add_node("langchain_expert", search_expert_node)
builder.add_node("general_assistant", general_assistant_node)
builder.add_node("human", human_feedback_node)
builder.add_node("editor", editor_node)

builder.add_conditional_edges(
    "router",
    route_question,
    {
        "DATABASE": "database_expert",
        "LANGCHAIN": "langchain_expert",
        "GENERAL": "general_assistant",
    },
)


builder.set_entry_point("router")

builder.add_edge("database_expert", "human")
builder.add_edge("langchain_expert", "human")
builder.add_edge("general_assistant", "human")
builder.add_edge("human", "editor")
builder.add_edge("editor", END)
graph = builder.compile(checkpointer=memory, interrupt_before=["human"])

In [None]:
from IPython.display import Image

Image(graph.get_graph().draw_png())

Now, when we run the graph, the execution will be stopped before the human node.

In [None]:
thread = {"configurable": {"thread_id": "2"}}

for event in graph.stream(
    {
        "question": "What are the types of fields in airflow_test.dag_runs table?",
    },
    thread,
):
    print(event)

Let’s get the customer input and update the state with the feedback.

In [None]:
user_input = input("Do I need to change anything in the answer?")

# Do I need to change anything in the answer?
# It looks wonderful. Could you only make it a bit friendlier please?

graph.update_state(thread, {"feedback": user_input}, as_node="human")

We can check the state to confirm that the feedback has been populated and that the next node in the sequence is editor.

In [None]:
print(graph.get_state(thread).values['feedback'])

print(graph.get_state(thread).next)

We can just continue the execution. Passing None as input will resume the process from the point where it was paused.

In [None]:
for event in graph.stream(None, thread, stream_mode="values"):
    print(event)

print(event["answer"])

We can implement human-in-the-loop interactions in a more agentic way by equipping our editor with the [Human](https://python.langchain.com/v0.2/docs/integrations/tools/human_tools/) tool.

In [81]:
from langchain_community.tools import HumanInputRun

human_tool = HumanInputRun()

In [82]:
state = {
    "question": "What are the types of fields in ecommerce_db.users table?",
    "answer": "The `ecommerce_db.users` table has the following fields:\n\n1. **user_id**: UInt64\n2. **country**: String\n3. **is_active**: UInt8\n4. **age**: UInt64",
}

In [None]:
editor_agent_prompt = """You're an editor and your goal is to provide the final answer to the customer, taking into account the initial question.
If you need any clarifications or need feedback, please, use human. Always reach out to human to get the feedback before final answer.
You don't add any information on your own. You use friendly and professional tone. 
In the output please provide the final answer to the customer without additional comments.
Here's all the information you need.

Question from customer: 
----
{question}
----
Draft answer:
----
{answer}
----
"""

editor_agent = create_react_agent(model, [human_tool])
messages = [
    SystemMessage(
        content=editor_agent_prompt.format(
            question=state["question"], answer=state["answer"]
        )
    )
]
editor_result = editor_agent.invoke({"messages": messages})

In [None]:
print(editor_result["messages"][-1].content)

In [94]:
def editor_agent_node(state: MultiAgentState):
    editor_agent = create_react_agent(model, [human_tool])
    messages = [
        SystemMessage(
            content=editor_agent_prompt.format(
                question=state["question"], answer=state["answer"]
            )
        )
    ]
    result = editor_agent.invoke({"messages": messages})
    return {"answer": result["messages"][-1].content}

In [None]:
builder = StateGraph(MultiAgentState)
builder.add_node("router", router_node)
builder.add_node("database_expert", sql_expert_node)
builder.add_node("langchain_expert", search_expert_node)
builder.add_node("general_assistant", general_assistant_node)
builder.add_node("editor", editor_agent_node)

builder.add_conditional_edges(
    "router",
    route_question,
    {
        "DATABASE": "database_expert",
        "LANGCHAIN": "langchain_expert",
        "GENERAL": "general_assistant",
    },
)

builder.set_entry_point("router")

builder.add_edge("database_expert", "editor")
builder.add_edge("langchain_expert", "editor")
builder.add_edge("general_assistant", "editor")
builder.add_edge("editor", END)

In [96]:
graph = builder.compile(checkpointer=memory)

In [None]:
from IPython.display import Image

Image(graph.get_graph().draw_png())

In [None]:
thread = {"configurable": {"thread_id": "42"}}
results = []

for event in graph.stream(
    {
        "question": "What are the types of fields in airflow_test.dag_runs table?",
    },
    thread,
):
    print(event)
    results.append(event)