# Building a Multi-Agent System with LangGraph

Welcome to the second part of our workshop! Now that we've learned the basics of LangChain and RAG, we'll explore how to build more sophisticated AI applications using **LangGraph**. We'll create a multi-agent system where different AI agents collaborate to provide comprehensive financial analysis.

## Setting Up Our Environment

Just like in our previous session, we need to set up our environment with the necessary imports and configurations. We'll use the same OpenAI models and Bloomberg news database, but we'll add some new components for our multi-agent system.

#### Global configuration

In [None]:
import os

from IPython.display import Markdown

In [None]:
if not os.environ.get("OPENAI_API_KEY"):
    raise ValueError("Please set OPENAI_API_KEY environment variable")

LLM_MODEL = "gpt-4o-mini"
LLM_TEMPERATURE = 0

EMBEDDING_MODEL = "text-embedding-3-small"
RETRIEVAL_K = 3

#### Base model

In [None]:
from langchain_openai import ChatOpenAI

In [None]:
base_model = ChatOpenAI(model=LLM_MODEL, temperature=LLM_TEMPERATURE)

#### Vector Store and Retriever

In [None]:
import pickle

from langchain_chroma import Chroma
from langchain_core.documents import Document
from langchain_openai import OpenAIEmbeddings

In [None]:
def load_documents(pickle_filepath: str) -> list[Document]:
    """Load documents from a pickle file."""
    with open(pickle_filepath, "rb") as file:
        return pickle.load(file)


def initialize_vector_store(document_chunks: list[Document]) -> Chroma:
    """Reset the Chroma collection and initialize a vector store using document chunks."""
    Chroma().reset_collection()
    embedding_model = OpenAIEmbeddings(model=EMBEDDING_MODEL)
    return Chroma.from_documents(documents=document_chunks, embedding=embedding_model)


data_dir = "../data/"
data_file = "bloomberg_financial_news_1k.pkl"

documents = load_documents(os.path.join(data_dir, data_file))

vector_store = initialize_vector_store(documents)
retriever = vector_store.as_retriever(search_kwargs={"k": RETRIEVAL_K})

#### Tools

In [None]:
from langchain_core.tools import tool

In [None]:
@tool
def retrieval(retrieval_query: str) -> list[Document]:
    """Retrieve documents based on a query."""
    return retriever.invoke(retrieval_query)


tools = [retrieval]
tools_by_name = {tool.name: tool for tool in tools}

## Understanding the Multi-Agent Workflow

Our system consists of three specialized agents working together:
1. **Client Interface Agent (CIA)**: Analyzes client requests and plans research tasks
2. **Bloomberg Research Agent (BRA)**: Conducts specific research using the Bloomberg news database
3. **Research Synthesis Agent (RSA)**: Synthesizes research into final recommendations

To build our workflow with **LangGraph**, we need to implement functions that receive the current state and return state updates. These functions are the nodes of the graph, and each node is assigned a name.

We'll use the following names for our nodes:
- Node with **CIA**: `"orchestrator"`
- Node with **BRA**: `"worker"`
- Node with **RSA**: `"synthesizer"`

![Financial Assistant Workflow](../imgs/financial-assistant-workflow.png)

## LangGraph Workflow and State

**LangGraph** manages communication between our agents through a `State` class that carries information from one node to the next during execution.

We keep our state simple by including only two attributes, though more can be added:
- `messages`: The ongoing conversation
- `analyses`: Research findings from our agents

We define the state as a Python dataclass and use `Annotated` to attach a reducer to each attribute. The reducer defines how updates are merged into the state throughout the workflow: `add_messages` appends new messages to the conversation, and `operator.add` concatenates lists.

In [None]:
import operator
from dataclasses import dataclass, field
from typing import Annotated

from langchain_core.messages import BaseMessage
from langgraph.graph.message import add_messages

In [None]:
@dataclass(kw_only=True)
class State:
    """Graph state for the financial analysis workflow."""

    messages: Annotated[list[BaseMessage], add_messages] = field(default_factory=list)
    analyses: Annotated[list[str], operator.add] = field(default_factory=list)
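
To make the role of the reducers concrete, here is a minimal plain-Python sketch of how an update could be merged into a state. It does not use LangGraph and is not its actual implementation: `DemoState`, `apply_update`, and the `status` attribute are hypothetical names introduced only for this illustration.

```python
import operator
from dataclasses import dataclass, field, fields
from typing import Annotated, get_type_hints


@dataclass
class DemoState:
    """Stand-in state: `analyses` uses the same reducer as the workshop's State."""

    analyses: Annotated[list[str], operator.add] = field(default_factory=list)
    status: str = "pending"  # hypothetical attribute without a reducer


def apply_update(state: DemoState, update: dict) -> DemoState:
    """Merge an update into the state, using any reducer found in `Annotated`."""
    hints = get_type_hints(DemoState, include_extras=True)
    values = {f.name: getattr(state, f.name) for f in fields(state)}
    for key, new_value in update.items():
        metadata = getattr(hints[key], "__metadata__", ())
        if metadata:
            # A reducer is attached: combine the old and new values.
            values[key] = metadata[0](values[key], new_value)
        else:
            # No reducer: the new value replaces the old one.
            values[key] = new_value
    return DemoState(**values)


state = DemoState()
state = apply_update(state, {"analyses": ["Report on chip makers"]})
state = apply_update(state, {"analyses": ["Report on cloud providers"], "status": "done"})
print(state.analyses)
print(state.status)
```

Because `analyses` is merged with `operator.add`, each node should return its contribution as a list, even when it holds a single item.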

## The Agents in Detail

### Client Interface Agent (CIA)

The CIA serves as our system's orchestrator. It:
- Evaluates client requests
- Determines if requests are within scope
- Breaks down the request into specific tasks to be conducted by the Bloomberg Research Agents

#### Structured Outputs with Pydantic

With **Pydantic** models, **LangChain** lets us constrain the output of a model using `.with_structured_output()`. Like `.bind_tools()`, this gives the model the information it needs to structure its output in the desired way.

We define two structured outputs:
- `ResearchTask`: Defines specific research objectives
- `OrchestratorDecision`: Helps the CIA answer the client's request and plan the research tasks

This approach helps maintain consistency and reliability during our workflow execution.

In [None]:
from pydantic import BaseModel, Field

In [None]:
class ResearchTask(BaseModel):
    """Task for the financial analysis workflow."""

    topic: str = Field(description="Topic of the research task.")
    description: str = Field(
        description="Brief description of the task and its objectives."
    )


class OrchestratorDecision(BaseModel):
    """List of research tasks for the financial analysis workflow."""

    response: str = Field(description="Rationale for the decision and research tasks.")
    in_scope: bool = Field(
        description="Whether the client request is in scope for the financial analysis."
    )
    research_tasks: list[ResearchTask] | None = Field(
        description="List of research tasks to be completed."
    )

Now that we have defined our structured outputs, we can instantiate our CIA model using `.with_structured_output()`.

In [None]:
CIA_PROMPT = """
You are a Client Interface Agent (CIA) in a financial analysis system. You have multiple Research Agents with access to Bloomberg Financial News under your supervision.

Given a client request, provide a concise, polite and professional response regarding the feasibility of the request and the approach that will be taken to address it.

If the user's request is addressable, create a short list of highly specific research topics that the Research Agents will investigate to fulfill the client's request.
"""

# TODO: Create the CIA model from the base model with structured output
cia_model = ...

#### Defining the Nodes and Flow

Let's now define our first graph node. This section covers several **LangGraph** concepts at once and is worth spending some time on.

1. Defining the node:
    - To define our orchestrator node, we implement a function that receives the graph state and applies operations to it.
    - In this case, the `"orchestrator"` calls the CIA model with structured output on the user request.

2. Defining the flow:
    - In December 2024, **LangGraph** released [`Command`](https://blog.langchain.dev/command-a-new-tool-for-multi-agent-architectures-in-langgraph/), a new way of defining the graph edges directly within the nodes.
    - `Command` can return both state updates (`update`) and the next node (`goto`).
    - In the following example, the `"orchestrator"` node updates the state messages with the CIA's response.
    - If the user request is out of scope, it terminates the workflow by going to the `END` node.
    - Otherwise, it goes to a dynamically generated number of `"worker"` nodes and provides each of them with a `ResearchTask`.

3. Dynamic number of nodes:
    - Instead of returning a single string in `Command`, we return a list.
    - One worker is created for each research task defined by the CIA.
    - Each worker receives a different `ResearchTask` as input using [`Send`](https://langchain-ai.github.io/langgraph/concepts/low_level/#send).

In [None]:
from typing import Literal

from langchain_core.messages import SystemMessage
from langgraph.constants import Send
from langgraph.graph import END
from langgraph.types import Command

In [None]:
# Define the CIA orchestrator with the next node options
def orchestrator_node(state: State) -> Command[Literal["worker", END]]:
    """Orchestrator that generates a plan for the report."""
    display(Markdown(f"**Client request received**: {state.messages[-1].content}"))

    # Message list for the CIA model
    messages = [
        SystemMessage(CIA_PROMPT),
        *state.messages,
    ]

    # Invoke the CIA model
    cia_output = cia_model.invoke(messages)

    display(Markdown(f"**CIA Response:** {cia_output.response}"))

    return Command(
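        # Update the state messages with the CIA response, tagged as an AI message
        # (a bare string would be coerced to a human message by `add_messages`)
        update={"messages": [("ai", cia_output.response)]},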
        # Go to the research nodes if the request is in scope, otherwise end the workflow
        goto=[Send("worker", task) for task in cia_output.research_tasks]
        if cia_output.in_scope
        else END,
    )
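
The fan-out and fan-in pattern used by the orchestrator can be pictured with a plain-Python analogy. This sketch does not use LangGraph: `DemoTask`, `demo_worker`, and `fan_out` are hypothetical stand-ins, and a thread pool plays the role of the parallel `"worker"` nodes.

```python
import operator
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass
from functools import reduce


@dataclass
class DemoTask:
    """Hypothetical stand-in for ResearchTask."""

    topic: str


def demo_worker(task: DemoTask) -> dict:
    """Stand-in worker: returns a state update shaped like a real worker's."""
    return {"analyses": [f"Findings on {task.topic}"]}


def fan_out(tasks: list[DemoTask], in_scope: bool) -> list[str]:
    """Run one worker per task and merge the resulting updates."""
    if not in_scope:
        # Corresponds to going straight to END: no worker is started.
        return []
    with ThreadPoolExecutor() as pool:
        # One worker per task; `map` returns results in task order.
        updates = list(pool.map(demo_worker, tasks))
    # Fan-in: concatenate the lists, as the `operator.add` reducer would.
    return reduce(operator.add, (update["analyses"] for update in updates), [])


tasks = [DemoTask("AI chip demand"), DemoTask("cloud spending")]
analyses = fan_out(tasks, in_scope=True)
print(analyses)
```

In the real graph, the list of `Send` objects plays the role of `pool.map`, and the reducer on `analyses` performs the merge before the `"synthesizer"` node runs.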

### Bloomberg Research Agent (BRA)

The BRA is our research specialist that:
- Receives specific research tasks from the CIA
- Queries the Bloomberg news database
- Synthesizes findings into concise research reports

This node works the same way as the RAG pipeline from the first notebook.

In [None]:
from langchain_core.messages import HumanMessage, ToolMessage

In [None]:
BRA_PROMPT = """
You are a Research Agent in a financial analysis system. You are tasked with writing a concise research report on a specific topic provided by the Client Interface Agent (CIA) based on available documents.

To do so, you have access to a Bloomberg Financial News database that you can query. You should query the vector store for documents relevant to your task and write a concise summary of the information you find.

Your report should be short and informative, conveying only the most important information from the documents, to allow a Synthesis Agent to quickly generate a report for the client based on the findings of all Research Agents.
"""

# TODO: Create the BRA model from the base model with tool binding
bra_model = ...

In [None]:
# TODO: Define the worker node and the next node options
def worker_node(task: ResearchTask) -> ...:
    """Research agent that can query the vector store for relevant documents."""
    display(Markdown(f"**Researching task**: {task.topic}"))

    # TODO: Access the task topic and description from the research task
    topic = ...
    description = ...

    # Create a string with the research task topic and description
    task_str = f"Research Task: {topic}\n\nDescription: {description}"

    # TODO: Message list for the BRA model
    messages = ...

    # TODO: Invoke the BRA model with the messages
    bra_output = ...

    # TODO: If the BRA model made tool calls, invoke the tool
    if bra_output.tool_calls:
        ...
        documents = ...

        display(
            Markdown(
                f"**Retrieved documents**: {[doc.metadata['Headline'] for doc in documents]}"
            )
        )

        # Combine the retrieved documents into a single string
        documents_str = "\n\n".join(
            [f"{doc.metadata['Headline']}\n\n{doc.page_content}\n" for doc in documents]
        )

        # TODO: Message list with the retrieved documents for the base model
        messages = ...

        # TODO: Invoke the base model with the messages
        bra_output = ...

    # TODO: Update the state analyses with the BRA output content and go to the synthesizer node
    # NOTE: To update `analyses` you should return a list
    return ...
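
As a hint for the tool-call step, here is a plain-Python sketch of the dispatch pattern. Each entry in a model response's `tool_calls` is a dictionary with a `name`, `args`, and `id`, and the name is used to look up the tool. The function `demo_retrieval`, the lookup `demo_tools_by_name`, and the sample call are hypothetical stand-ins, not the workshop's retriever.

```python
def demo_retrieval(retrieval_query: str) -> list[str]:
    """Hypothetical stand-in for the `retrieval` tool; returns fake headlines."""
    return [f"Headline about {retrieval_query}"]


# Same idea as `tools_by_name`, but mapping names to plain functions.
demo_tools_by_name = {"retrieval": demo_retrieval}

# Hand-written example of what a model's tool calls can look like.
demo_tool_calls = [
    {
        "name": "retrieval",
        "args": {"retrieval_query": "semiconductor earnings"},
        "id": "call_1",
    }
]

documents = []
for call in demo_tool_calls:
    # Look up the requested tool by name.
    selected_tool = demo_tools_by_name[call["name"]]
    # With a real LangChain tool this would be `selected_tool.invoke(call["args"])`.
    documents.extend(selected_tool(**call["args"]))

print(documents)
```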

### Research Synthesis Agent (RSA)

The RSA is our final processing layer that:
- Collects all research findings
- Cross-references different analyses
- Creates a cohesive final report for the client

It uses neither tools nor structured outputs. It should return an answer to the client based on the analyses in the state.

In [None]:
from langchain_core.messages import AIMessage

In [None]:
RSA_PROMPT = """
You are a Research Synthesis Agent (RSA) in a financial analysis system. You receive the reports from multiple Bloomberg Research Agents, and are tasked with synthesizing the information these reports contain into a final report for the client.

The final report should be based on the information provided by the Research Agents, covering their findings in a clear and concise manner to address the client's request.
"""

In [None]:
# TODO: Define the synthesizer node and the next node options
def synthesizer_node(state: State) -> ...:
    """Synthesize full report from research analyses."""
    display(Markdown(f"**Synthesizing research from {len(state.analyses)} BRAs.**"))

    # TODO: Access the research analyses from the state
    analyses = ...

    # Combine the research analyses into a single string
    complete_analyses = "\n\n---\n\n".join(analyses)

    # TODO: Message list for the RSA model
    messages = ...

    # TODO: Invoke the base model with the messages
    rsa_output = ...

    # TODO: Update the state messages with the RSA output content and end the workflow
    return ...

## Building the Workflow Graph

Now that our nodes and communication flow are defined, we can build the graph!

In [None]:
from IPython.display import Image
from langgraph.graph import StateGraph

In [None]:
# Create a state graph builder
graph_builder = StateGraph(State)

# Define the entry point
graph_builder.set_entry_point("orchestrator")

# TODO: Add the nodes
graph_builder.add_node("orchestrator", orchestrator_node)
...

# The edges are defined by the `Command` objects returned by the nodes!

# Compile the workflow
app = graph_builder.compile()

Let's visualize our Financial Analyst graph. Note that because the number of `"worker"` nodes is generated dynamically, it shows up as a single node in the image.

In [None]:
display(Image(app.get_graph().draw_mermaid_png()))

## Running the Workflow

Now that our workflow is built, let's test it! Once again, we can run it with `.invoke()`.

In [None]:
request = "I want to invest in the technology sector. Can you please define an investment strategy?"

# Invoke the workflow with the client request
final_state = app.invoke({"messages": request})

In [None]:
Markdown(final_state["messages"][-1].content)

## Practical Tips

- Keep agent roles clearly defined and specialized
- Use structured outputs to ensure reliable communication
- Monitor the workflow graph for potential bottlenecks
- Adjust the number of research tasks based on query complexity

## Conclusion

You've now learned how to build a sophisticated multi-agent system using **LangGraph**! This approach allows for:
- More complex and nuanced analysis
- Better division of responsibilities
- Scalable AI applications

Feel free to experiment with different agent configurations and workflow patterns to suit your specific needs.