# 06. Advanced Applications and Multi-Agent Systems

# 0. 安装依赖

In [None]:
%uv pip install langchain~=0.3 langchain-core~=0.3 langchain-community~=0.3 langchain-openai~=0.3 langgraph~=0.6

In [None]:
%uv pip install python-dotenv~=1.1

In [None]:
%uv pip install arxiv~=2.2 ddgs~=9.6 wikipedia~=1.4

In [None]:
import os

import dotenv
from langchain_openai import ChatOpenAI


class Config:
    """Connection settings for an OpenAI-compatible endpoint, read from ``.env``.

    After construction the instance exposes ``api_key``, ``base_url`` and
    ``model``, taken from the ``OPENAI_API_KEY``, ``OPENAI_API_BASE_URL`` and
    ``OPENAI_MODEL`` environment variables.

    Raises:
        ValueError: If no ``.env`` file is found, or if any of the three
            variables is unset or empty.
    """

    def __init__(self):
        # load_dotenv does not override variables that already exist in the
        # environment; find_dotenv is asked to start from the working directory.
        env_file = dotenv.find_dotenv(usecwd=True)
        if not env_file:
            raise ValueError("No .env file found")
        dotenv.load_dotenv(dotenv_path=env_file)

        key = os.getenv("OPENAI_API_KEY")
        endpoint = os.getenv("OPENAI_API_BASE_URL")
        model_name = os.getenv("OPENAI_MODEL")

        # Checked in a fixed order so the first missing variable is the one reported.
        if not key:
            raise ValueError("OPENAI_API_KEY is not set")
        if not endpoint:
            raise ValueError("OPENAI_API_BASE_URL is not set")
        if not model_name:
            raise ValueError("OPENAI_MODEL is not set")

        self.api_key, self.base_url, self.model = key, endpoint, model_name

    def new_openai_like(self, **kwargs) -> ChatOpenAI:
        """Create a ``ChatOpenAI`` client from the loaded settings.

        Any extra keyword arguments are forwarded unchanged to ``ChatOpenAI``.
        """
        # Reference: https://bailian.console.aliyun.com/?tab=api#/api/?type=model&url=2587654
        # Reference: https://help.aliyun.com/zh/model-studio/models
        # ChatOpenAI documentation: https://python.langchain.com/api_reference/openai/chat_models/langchain_openai.chat_models.base.ChatOpenAI.html#langchain_openai.chat_models.base.ChatOpenAI
        return ChatOpenAI(
            api_key=self.api_key,
            base_url=self.base_url,
            model=self.model,
            **kwargs,
        )

## Agentic architectures
### Agentic RAG

## Multi-agent architectures

### Agent roles and specialization
### Consensus mechanism

### Communication protocols
#### Semantic router
#### Organizing interactions

In [None]:
%uv pip install datasets~=4.2

In [None]:
from datasets import load_dataset


# Load the "high_school_geography" configuration of the "cais/mmlu" dataset.
ds = load_dataset("cais/mmlu", "high_school_geography")

# Keep the first 100 rows of the test split as a dict indexed by column name.
ds_dict = ds["test"].take(100).to_dict()

# Preview the first question.
ds_dict["question"][0]

In [None]:
ds_dict["choices"][0]

In [None]:
from langchain.agents import load_tools
from langgraph.prebuilt import create_react_agent


# Chat model built from the .env-backed configuration.
llm = Config().new_openai_like()

# Tools loaded by name: "ddg-search", "arxiv" and "wikipedia".
research_tools = load_tools(tool_names=["ddg-search", "arxiv", "wikipedia"], llm=llm)

# System prompt for the "student" research agent. The fragments are joined by
# implicit string concatenation, so each sentence must carry its own trailing
# space; otherwise two sentences run together.
system_prompt = (
    "You're a hard-working, curious and creative student. "
    "You're working on exam question. Think step by step. "
    "Always provide an argumentation for your answer. "
    "Do not assume anything, use available tools to search "
    "for evidence and supporting statements."
)

In [None]:
from langchain_core.prompts import ChatPromptTemplate
from langgraph.prebuilt.chat_agent_executor import AgentState


# User turn: the question and its answer options are interpolated by name.
raw_prompt_template = (
    "Answer the following multiple-choice question. "
    "\nQUESTION:\n{question}\n\nANSWER OPTIONS:\n{options}\n"
)
# Message order: system prompt, the formatted question, then the "messages" placeholder.
prompt = ChatPromptTemplate.from_messages(
    [
        ("system", system_prompt),
        ("user", raw_prompt_template),
        ("placeholder", "{messages}"),
    ]
)


class ResearchState(AgentState):
    """Agent state extended with the question and its answer options."""

    question: str
    options: str


# ReAct agent over the research tools, using the prompt and state schema above.
research_agent = create_react_agent(
    model=llm, tools=research_tools, state_schema=ResearchState, prompt=prompt
)

In [None]:
from langchain_core.prompts import PromptTemplate
from pydantic import BaseModel, Field


# For why the explicit 'Output in JSON format' instruction is required, see:
# - Qwen: https://help.aliyun.com/zh/model-studio/json-mode?spm=0.0.0.i2#6f7bb9cd64o7o
# - https://platform.openai.com/docs/guides/structured-outputs/function-calling-vs-response-format#supported-schemas
#
# Reviewer ("professor") prompt with the placeholders {question}, {options}
# and {answer}. The QUESTION header starts on its own line ("\n", not a stray "n").
reflection_prompt = (
    "You are a university professor and you're supervising a student who is "
    "working on multiple-choice exam question. "
    "\nQUESTION: {question}.\nANSWER OPTIONS:\n{options}\n."
    "STUDENT'S ANSWER:\n{answer}\n"
    "Reflect on the answer and provide a feedback whether the answer "
    "is right or wrong. If you think the final answer is correct, reply with "
    "the final answer. Only provide critique if you think the answer might "
    "be incorrect or there are reasoning flaws. Do not assume anything, "
    "evaluate only the reasoning the student provided and whether there is "
    "enough evidence for their answer. "
    "Output in JSON format, where the correct answer is put in the 'answer' field, "
    "and critique is put in the 'critique' field."
)


class Response(BaseModel):
    """A final response to the user."""

    # Both fields default to None. Per the descriptions below, `answer` is
    # left empty whenever a critique is given.
    answer: str | None = Field(
        default=None,
        description="The final answer. It should be empty if critique has been provided.",
    )
    critique: str | None = Field(
        default=None,
        description="A critique of the initial answer. If you think it might be incorrect, provide an actionable feedback",
    )


# Render the reflection prompt, then have the model return a structured `Response`.
reflection_chain = PromptTemplate.from_template(
    reflection_prompt
) | llm.with_structured_output(Response)

In [None]:
# User turn for a retry: question and options plus the previous answer and the
# reviewer's feedback.
raw_prompt_template_with_critique = (
    "You tried to answer the exam question and you get feedback from your "
    "professor. Work on improving your answer and incorporating the feedback. "
    "\nQUESTION:\n{question}\n\nANSWER OPTIONS:\n{options}\n\n"
    "INITIAL ANSWER:\n{answer}\n\nFEEDBACK:\n{feedback}"
)
# NOTE: this rebinds the module-level name `prompt` used by the earlier agent cell.
prompt = ChatPromptTemplate.from_messages(
    [
        ("system", system_prompt),
        ("user", raw_prompt_template_with_critique),
        ("placeholder", "{messages}"),
    ]
)


class ReflectionState(ResearchState):
    """Research state extended with the previous answer and its feedback."""

    answer: str
    feedback: str


# ReAct agent that revises an answer using the critique-aware prompt.
research_agent_with_critique = create_react_agent(
    model=llm, tools=research_tools, state_schema=ReflectionState, prompt=prompt
)

In [None]:
from typing import Annotated, Literal, TypedDict
from langchain_core.runnables.config import RunnableConfig
from operator import add
from langgraph.graph import StateGraph, START, END


class ReflectionAgentState(TypedDict):
    """State shared by the nodes of the research/reflection graph."""

    question: str
    options: str
    # Latest answer text written by a research node.
    answer: str
    # Annotated with operator.add; the reflection node returns 1 per pass.
    steps: Annotated[int, add]
    # Latest structured output of the reflection chain.
    response: Response


def _should_end(
    state: ReflectionAgentState, config: RunnableConfig
) -> Literal["research", END]:
    """Decide where to go after a reflection pass.

    Returns END when the reflection response carries an answer, or when the
    step counter exceeds ``max_reasoning_steps`` (read from
    ``config["configurable"]``, default 10). Otherwise returns "research".
    """
    step_limit = config["configurable"].get("max_reasoning_steps", 10)
    review = state.get("response")
    if (review and review.answer) or state.get("steps", 1) > step_limit:
        return END
    return "research"


# Reflection prompt piped into the model with `Response` as the structured output.
# NOTE(review): identical to the `reflection_chain` built in an earlier cell.
reflection_chain = PromptTemplate.from_template(
    reflection_prompt
) | llm.with_structured_output(Response)


def _reflection_step(state):
    """Run the reflection chain on the state and record one reasoning step."""
    review = reflection_chain.invoke(state)
    update = {"response": review, "steps": 1}
    return update


def _research_start(state):
    """Produce the first answer by invoking the research agent on the state."""
    agent_output = research_agent.invoke(state)
    final_message = agent_output["messages"][-1]
    return {"answer": final_message.content}


def _research(state):
    """Revise the answer using the previous answer and the reviewer's critique."""
    # Copy the fields the critique-aware agent needs, then add the critique
    # from the latest reflection response as "feedback".
    agent_input = {name: state[name] for name in ("answer", "question", "options")}
    agent_input["feedback"] = state["response"].critique
    agent_output = research_agent_with_critique.invoke(agent_input)
    final_message = agent_output["messages"][-1]
    return {"answer": final_message.content}

In [None]:
# Reflection loop: START -> research_start -> reflect. After "reflect",
# _should_end returns either "research" (which leads back to "reflect") or END.
builder = StateGraph(ReflectionAgentState)
builder.add_node("research_start", _research_start)
builder.add_node("research", _research)
builder.add_node("reflect", _reflection_step)

builder.add_edge(START, "research_start")
builder.add_edge("research_start", "reflect")
builder.add_edge("research", "reflect")
builder.add_conditional_edges("reflect", _should_end)
graph = builder.compile()

In [None]:
from IPython.display import Image, display


# Render the compiled graph as a Mermaid PNG and show it inline.
display(Image(graph.get_graph().draw_mermaid_png()))

In [None]:
# Pick the example by index and format its choices as "<index>. <choice>",
# one per line, numbered from 0.
i = 0
question = ds_dict["question"][i]
options = "\n".join(
    f"{position}. {choice}" for position, choice in enumerate(ds_dict["choices"][i])
)

In [None]:
# TODO: fix the issue that this run does not terminate
# Stream the graph in "updates" mode and print every streamed event.
async for _, event in graph.astream(
    {"question": question, "options": options}, stream_mode=["updates"]
):
    print(event)

### LangGraph streaming

In [None]:
# stream_mode=["values"]: print the length of the "messages" list in each streamed event.
async for _, event in research_agent.astream(
    {"question": question, "options": options}, stream_mode=["values"]
):
    print(len(event["messages"]))

In [None]:
# stream_mode=["updates"]: take the first key of each event and print it with
# the number of messages found under that key.
async for _, event in research_agent.astream(
    {"question": question, "options": options}, stream_mode=["updates"]
):
    node = list(event.keys())[0]
    print(node, len(event[node].get("messages", [])))

In [None]:
seen_events = set([])
async for event in research_agent.astream_events(
    {"question": question, "options": options}, version="v1"
):
    if event["event"] not in seen_events:
        seen_events.add(event["event"])

print(seen_events)

### Handoffs
#### Communication via a shared messages list

In [None]:
from langchain.agents import load_tools
from langgraph.prebuilt import create_react_agent


# Tools loaded by name: "ddg-search", "arxiv" and "wikipedia".
research_tools = load_tools(tool_names=["ddg-search", "arxiv", "wikipedia"], llm=llm)

# System prompt for the "student" research agent. The fragments are joined by
# implicit string concatenation, so each sentence must carry its own trailing
# space; otherwise two sentences run together.
system_prompt = (
    "You're a hard-working, curious and creative student. "
    "You're working on exam question. Think step by step. "
    "Always provide an argumentation for your answer. "
    "Do not assume anything, use available tools to search "
    "for evidence and supporting statements."
)

# ReAct agent with a plain system prompt and no custom state schema.
# NOTE: this rebinds `research_agent` from the earlier cell.
research_agent = create_react_agent(
    model=llm, tools=research_tools, prompt=system_prompt
)

In [None]:
# Reviewer prompt for the shared-messages variant. It has no placeholders and
# is appended to the dialogue as a human message; an empty reply from the
# model is treated by the caller as approval.
reflection_prompt = (
    "You are a university professor and you're supervising a student who is "
    "working on multiple-choice exam question. "
    "Given the dialogue above, reflect on the answer provided and give a feedback "
    " if needed. If you think the final answer is correct, reply with "
    "an empty message. Only provide critique if you think the last answer might "
    "be incorrect or there are reasoning flaws. Do not assume anything, "
    "evaluate only the reasoning the student provided and whether there is "
    "enough evidence for their answer."
)

In [None]:
from langchain_core.prompts import PromptTemplate
from langgraph.types import Command
from langchain_core.runnables import RunnableConfig


# Text template that formats the question and its answer options.
question_template = PromptTemplate.from_template(
    "QUESTION:\n{question}\n\nANSWER OPTIONS:\n{options}\n\n"
)


def _ask_question(state):
    """Add the formatted question to the dialogue as a human message."""
    rendered = question_template.invoke(state)
    return {"messages": [("human", rendered.text)]}


def _give_feedback(state, config: RunnableConfig):
    """Review the dialogue so far and route back to research or finish.

    Args:
        state: Graph state; its "messages" list holds the dialogue.
        config: Runnable config; ``config["configurable"]["max_messages"]``
            (default 20) caps the dialogue length.

    Returns:
        A ``Command`` that goes to "research" with the critique and a
        follow-up request appended, or to END when the dialogue is too long
        or the model returns an empty (approving) reply.
    """
    # Read the dialogue from this node's own state. The previous code used the
    # notebook-level variable `event` left over from an unrelated streaming loop.
    messages = state["messages"] + [("human", reflection_prompt)]
    max_messages = config["configurable"].get("max_messages", 20)

    if len(messages) > max_messages:
        return Command(update={}, goto=END)

    result = llm.invoke(messages)

    # An empty reply means the reviewer accepts the answer.
    if result.content:
        return Command(
            update={
                "messages": [
                    ("assistant", result.content),
                    ("human", "Please, address the feedback above and give an answer."),
                ]
            },
            goto="research",
        )
    return Command(update={}, goto=END)

In [None]:
from langgraph.graph import StateGraph, START, MessagesState


class ReflectionAgentState(MessagesState):
    """Message-based state extended with the question and its answer options."""

    question: str
    options: str


# Only the static edges are declared here. The "reflect" node returns Command
# objects whose `goto` picks the next node ("research" or END).
builder = StateGraph(ReflectionAgentState)
builder.add_node("ask_question", _ask_question)
builder.add_node("research", research_agent)
builder.add_node("reflect", _give_feedback)

builder.add_edge(START, "ask_question")
builder.add_edge("ask_question", "research")
builder.add_edge("research", "reflect")
graph = builder.compile()

In [None]:
from IPython.display import Image, display


# Render the compiled graph as a Mermaid PNG and show it inline.
display(Image(graph.get_graph().draw_mermaid_png()))

In [None]:
# Stream the graph in "values" mode and print the message count of each event.
async for _, event in graph.astream(
    {"question": question, "options": options}, stream_mode=["values"]
):
    print(len(event["messages"]))

## LangGraph platform
## Building adaptive systems
### Dynamic behavior adjustment
### Human-in-the-loop

In [None]:
from langgraph.types import interrupt
from langgraph.checkpoint.memory import MemorySaver


class State(MessagesState):
    """Message-based state extended with the address supplied by the user."""

    home_address: str | None


def _human_input(state: State):
    """Call ``interrupt`` with the address question and store what it returns."""
    return {"home_address": interrupt("What is your address?")}


# Single-node graph: START -> human_input.
builder = StateGraph(State)
builder.add_node("human_input", _human_input)
builder.add_edge(START, "human_input")

# In-memory checkpointer passed to compile().
checkpointer = MemorySaver()

graph = builder.compile(checkpointer=checkpointer)
# The thread_id identifies this run; the same config is reused when resuming.
config = {"configurable": {"thread_id": "1"}}
for chunk in graph.stream({"messages": [("human", "What is weather today?")]}, config):
    print(chunk)

In [None]:
from langgraph.types import Command


# Resume the same thread, supplying "Munich" as the resume value.
for chunk in graph.stream(Command(resume="Munich"), config):
    print(chunk)

## Exploring reasoning paths
### Tree of Thoughts

In [None]:
from pydantic import BaseModel, Field
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import PydanticOutputParser


# Structured-output schema for the planner: an ordered list of step strings.
class Plan(BaseModel):
    """Plan to follow in future"""

    steps: list[str] = Field(
        description="different steps to follow, should be in sorted order"
    )


# Used only to generate JSON format instructions for the Plan schema.
parser = PydanticOutputParser(pydantic_object=Plan)

system_prompt_template = (
    "For the given task, come up with a step by step plan.\n"
    "This plan should involve individual tasks, that if executed correctly will "
    "yield the correct answer. Do not add any superfluous steps.\n"
    "The result of the final step should be the final answer. Make sure that each "
    "step has all the information needed - do not skip steps.\n"
    "Output in JSON format described as follows.\n"
)
# Note: the `formatting_instructions` placeholder in the system message must
# exactly match the keyword argument passed to partial().
planner_prompt = ChatPromptTemplate.from_messages(
    [
        ("system", system_prompt_template + "{formatting_instructions}"),
        ("user", "Prepare a plan how to solve the following task:\n{task}\n"),
    ]
).partial(formatting_instructions=parser.get_format_instructions())


llm = Config().new_openai_like()

# Planner chain: prompt -> model with `Plan` as the structured output.
planner = planner_prompt | llm.with_structured_output(Plan)

In [None]:
from langchain.agents import load_tools
from langgraph.prebuilt import create_react_agent

llm = Config().new_openai_like()

# Tools loaded by name: "ddg-search", "arxiv" and "wikipedia".
tools = load_tools(tool_names=["ddg-search", "arxiv", "wikipedia"], llm=llm)

# System prompt for the agent that executes one plan step at a time.
# NOTE: this rebinds `system_prompt` from the earlier cells.
system_prompt = (
    "You're a smart assistant that carefully helps to solve complex tasks.\n"
    " Given a general plan to solve a task and a specific step, work on this step. "
    " Don't assume anything, keep in minds things might change and always try to "
    "use tools to double-check yourself.\nUse Search to gather "
    "information about common facts, fresh events and news, use Arxiv to get "
    "ideas on recent research and use Wikipedia for common knowledge."
)

# User turn: the task, the plan so far, and the step to execute.
step_template = (
    "Given the task and the plan, try to execute on a specific step of the plan.\n"
    "TASK:\n{task}\n\nPLAN:\n{previous_steps}\n\nSTEP TO EXECUTE:\n{step}\n"
)

prompt_template = ChatPromptTemplate.from_messages(
    [
        ("system", system_prompt),
        ("user", step_template),
    ]
)

# The formatted prompt is piped into a ReAct agent over the tools.
execution_agent = prompt_template | create_react_agent(model=llm, tools=tools)

In [None]:
from langchain_core.prompts import ChatPromptTemplate


# Structured-output schema for the replanner: alternative options for the next step.
class ReplanStep(BaseModel):
    """Replanned next step in the plan."""

    steps: list[str] = Field(description="different options of the proposed next step")


# Model wrapper that returns a structured `ReplanStep`.
llm_replanner = llm.with_structured_output(ReplanStep)

# The format instructions must describe the schema the replanner actually
# returns (ReplanStep), not the initial planner's Plan schema.
parser = PydanticOutputParser(pydantic_object=ReplanStep)

# User turn: the task and the steps executed so far with their outputs.
replanner_prompt_template = (
    "Suggest next action in the plan. Do not add any superfluous steps.\n"
    "If you think no actions are needed, just return an empty list of steps. "
    "TASK: {task}\n PREVIOUS STEPS WITH OUTPUTS: {current_plan}\n"
    "Output in JSON format described as follows.\n"
)
# Reference: https://zhuanlan.zhihu.com/p/1901678624639255242
# The `formatting_instructions` placeholder must match the partial() keyword.
# The newline before it keeps the instructions from running into the
# preceding sentence.
replanner_prompt = ChatPromptTemplate.from_messages(
    [
        (
            "system",
            "You're a helpful assistant. Your goal is to help with planning actions to solve the task. Do not solve the task itself.\n"
            "{formatting_instructions}",
        ),
        ("user", replanner_prompt_template),
    ]
).partial(formatting_instructions=parser.get_format_instructions())

# Replanner chain: prompt -> model with `ReplanStep` as the structured output.
replanner = replanner_prompt | llm_replanner

In [None]:
from typing import Optional


class TreeNode:

    def __init__(
        self,
        node_id: int,
        step: str,
        step_output: str | None = None,
        parent: Optional["TreeNode"] = None,
    ):
        self.node_id = node_id
        self.step = step
        self.step_output = step_output
        self.parent = parent
        self.children = []
        self.final_response = None

    def __repr__(self):
        parent_id = self.parent.node_id if self.parent else "None"
        return f"Node_id: {self.node_id}, parent: {parent_id}, {len(self.children)} children."

    def get_full_plan(self) -> str:
        """Returns formatted plan with step numbers and past results."""
        steps = []
        node = self
        while node.parent:
            steps.append((node.step, node.step_output))
            node = node.parent

        full_plan = []
        for i, (step, result) in enumerate(steps[::-1]):
            if result:
                full_plan.append(f"# {i+1}. Planned step: {step}\nResult: {result}\n")
        return "\n".join(full_plan)

In [None]:
import operator
from collections import deque
from typing import Annotated, TypedDict


class PlanState(TypedDict):
    """State shared by the nodes of the Tree-of-Thoughts planning graph."""

    task: str
    # First node of the initial plan.
    root: TreeNode
    # Nodes waiting to be executed.
    queue: deque[TreeNode]
    # Node executed most recently.
    current_node: TreeNode
    # Node to execute next; when None the next node is taken from the queue.
    next_node: TreeNode
    is_current_node_final: bool
    # Annotated with operator.add; the response node returns 1 per finished path.
    paths_explored: Annotated[int, operator.add]
    visited_ids: set[int]
    # Highest node id assigned so far.
    max_id: int
    # Annotated with operator.add; the response node returns one-element lists.
    candidates: Annotated[list[str], operator.add]
    best_candidate: str

In [None]:
from langchain_core.runnables import RunnableConfig
from langchain_core.output_parsers import StrOutputParser
from langgraph.types import Command

# Prompt that turns an executed plan into the final response. The first
# fragment ends with a space so the two sentences do not run together.
final_prompt = PromptTemplate.from_template(
    "You're a helpful assistant that has executed on a plan. "
    "Given the results of the execution, prepare the final response.\n"
    "Don't assume anything\nTASK:\n{task}\n\nPLAN WITH RESULTS:\n{plan}\n"
    "FINAL RESPONSE:\n"
)

# Chain: prompt -> model -> plain string.
responder = final_prompt | llm | StrOutputParser()


async def _run_node(state: PlanState, config: RunnableConfig):
    """Execute one plan step with the execution agent.

    Uses ``state["next_node"]`` when it is set. Otherwise pops nodes from the
    queue until one with an unvisited id is found. If none is found, returns
    a ``Command`` that goes to "vote" with an empty update.

    Returns:
        A state update with the executed node as ``current_node``, the
        (mutated) queue, the updated ``visited_ids`` and ``next_node`` reset
        to None.
    """
    queue = state["queue"]
    seen_ids = state.get("visited_ids", set())
    current = state.get("next_node")

    if current is None:
        # Discard already-visited entries until a fresh node turns up.
        while queue:
            candidate = queue.popleft()
            if candidate.node_id not in seen_ids:
                current = candidate
                break
        if current is None:
            return Command(goto="vote", update={})

    agent_result = await execution_agent.ainvoke(
        {
            "previous_steps": current.get_full_plan(),
            "step": current.step,
            "task": state["task"],
        }
    )
    # The content of the agent's last message is stored as the step result.
    current.step_output = agent_result["messages"][-1].content
    seen_ids.add(current.node_id)
    return {
        "current_node": current,
        "queue": queue,
        "visited_ids": seen_ids,
        "next_node": None,
    }


async def _plan_next(state: PlanState, config: RunnableConfig) -> PlanState:
    """Ask the replanner for follow-up steps and attach them as child nodes.

    At most ``max_candidates`` proposals (from ``config["configurable"]``,
    default 1) are turned into children of the current node and appended to
    the queue. The last child created becomes ``next_node``.

    Returns:
        ``{"is_current_node_final": True}`` when no step is proposed or
        selected; otherwise an update with ``is_current_node_final`` False,
        the ``next_node`` and the new ``max_id``.
    """
    max_candidates = config["configurable"].get("max_candidates", 1)
    node = state["current_node"]
    next_step = await replanner.ainvoke(
        {"task": state["task"], "current_plan": node.get_full_plan()}
    )
    # Slice first: an empty selection (no proposals, or max_candidates == 0)
    # must end the path here instead of reaching the return below with
    # `child` unbound.
    selected_steps = next_step.steps[:max_candidates]
    if not selected_steps:
        return {"is_current_node_final": True}

    max_id = state["max_id"]
    child = None
    for step in selected_steps:
        max_id += 1
        child = TreeNode(node_id=max_id, step=step, parent=node)
        node.children.append(child)
        state["queue"].append(child)
    return {"is_current_node_final": False, "next_node": child, "max_id": max_id}

In [None]:
from langchain_core.prompts import PromptTemplate


# Prompt asking the model for the 1-based index of the best candidate solution.
prompt_voting = PromptTemplate.from_template(
    "Pick the best solution for a given task. "
    "\nTASK:{task}\n\nSOLUTIONS:\n{candidates}\n"
    "Output 1-based index of the best solution.\n"
)


def _vote_for_the_best_option(state):
    """Ask the model to pick the best candidate response.

    Args:
        state: Mapping with "task" and, optionally, "candidates".

    Returns:
        ``{"best_candidate": <chosen text>}``, or ``{"best_candidate": None}``
        when there are no candidates.

    Raises:
        ValueError: If the model reply contains no integer.
        IndexError: If the integer is outside ``1..len(candidates)``.
    """
    import re  # local import so the function does not depend on cell-level imports

    candidates = state.get("candidates", [])
    if not candidates:
        # Use the key declared in PlanState; the previous "best_response" key
        # is not part of the state schema.
        return {"best_candidate": None}

    all_candidates = [
        f"OPTION {number}: {candidate}"
        for number, candidate in enumerate(candidates, start=1)
    ]

    llm_enum = Config().new_openai_like()

    result = (prompt_voting | llm_enum | StrOutputParser()).invoke(
        {"candidates": "\n".join(all_candidates), "task": state["task"]}
    )

    # Accept replies such as "2", "2." or "OPTION 2" by taking the first integer.
    match = re.search(r"\d+", result)
    if match is None:
        raise ValueError(f"No option index found in model reply: {result!r}")
    choice = int(match.group())
    # Reject 0 explicitly: `candidates[0 - 1]` would silently pick the last item.
    if not 1 <= choice <= len(candidates):
        raise IndexError(
            f"Option index {choice} is outside 1..{len(candidates)}"
        )
    return {"best_candidate": candidates[choice - 1]}

In [None]:
# Manual check: vote among three candidate strings for a simple arithmetic task.
_vote_for_the_best_option({"candidates": ["1", "5", "4"], "task": "How much is 2+2?"})

In [None]:
from langchain_core.output_parsers import StrOutputParser

# Prompt that turns an executed plan into the final response. The first
# fragment ends with a space so the two sentences do not run together.
final_prompt = PromptTemplate.from_template(
    "You're a helpful assistant that has executed on a plan. "
    "Given the results of the execution, prepare the final response.\n"
    "Don't assume anything\nTASK:\n{task}\n\nPLAN WITH RESULTS:\n{plan}\n"
    "FINAL RESPONSE:\n"
)

# Chain: prompt -> model -> plain string.
responder = final_prompt | llm | StrOutputParser()


async def _build_initial_plan(state: PlanState) -> PlanState:
    """Create the initial plan and lay it out as a chain of tree nodes.

    The first step becomes the root (id 1). Every later step becomes the
    single child of the step before it, with ids increasing by one. All
    nodes are queued in plan order.

    Returns:
        A state update with ``root``, ``queue`` and ``max_id`` (the id of the
        last node created).

    Raises:
        ValueError: If the planner returns no steps.
    """
    plan = await planner.ainvoke(state["task"])
    if not plan.steps:
        raise ValueError("Planner returned an empty plan")

    root = TreeNode(step=plan.steps[0], node_id=1)
    queue = deque([root])
    tail = root
    for node_id, step in enumerate(plan.steps[1:], start=2):
        child = TreeNode(node_id=node_id, step=step, parent=tail)
        tail.children.append(child)
        queue.append(child)
        tail = child
    # Take the id from the last node rather than from the loop variable,
    # which is never bound when the plan has a single step.
    return {"root": root, "queue": queue, "max_id": tail.node_id}


async def _get_final_response(state: PlanState) -> PlanState:
    """Generate the final response for the current path and record it.

    The response is stored on the current node and returned as a one-element
    ``candidates`` list, together with a ``paths_explored`` increment of 1.
    """
    leaf = state["current_node"]
    leaf.final_response = await responder.ainvoke(
        {"task": state["task"], "plan": leaf.get_full_plan()}
    )
    return {"paths_explored": 1, "candidates": [leaf.final_response]}


def _should_create_final_response(
    state: PlanState,
) -> Literal["run", "generate_response"]:
    """Route to "generate_response" when the current node is final, else to "run"."""
    if state["is_current_node_final"]:
        return "generate_response"
    return "run"


def _should_continue(
    state: PlanState, config: RunnableConfig
) -> Literal["run", "vote"]:
    """Keep exploring while the path budget allows and work remains.

    Returns "run" when fewer than ``max_paths`` paths (from
    ``config["configurable"]``, default 30) have been explored and either the
    queue is non-empty or ``next_node`` is set. Otherwise returns "vote".
    """
    path_budget = config["configurable"].get("max_paths", 30)
    within_budget = state.get("paths_explored", 1) < path_budget
    # Short-circuit keeps the queue untouched once the budget is exhausted.
    if within_budget and (state["queue"] or state.get("next_node")):
        return "run"
    return "vote"

In [None]:
# Planning graph: initial_plan -> run -> plan_next. After plan_next,
# _should_create_final_response returns "generate_response" or "run"; after
# generate_response, _should_continue returns "run" or "vote"; vote -> END.
builder = StateGraph(PlanState)
builder.add_node("initial_plan", _build_initial_plan)
builder.add_node("run", _run_node)
builder.add_node("plan_next", _plan_next)
builder.add_node("generate_response", _get_final_response)
builder.add_node("vote", _vote_for_the_best_option)

builder.add_edge(START, "initial_plan")
builder.add_edge("initial_plan", "run")
builder.add_edge("run", "plan_next")
builder.add_conditional_edges("plan_next", _should_create_final_response)
builder.add_conditional_edges("generate_response", _should_continue)
builder.add_edge("vote", END)

graph = builder.compile()

In [None]:
from IPython.display import Image, display


# Render the compiled graph as a Mermaid PNG and show it inline.
display(Image(graph.get_graph().draw_mermaid_png()))

In [None]:
task = "Write a strategic one-pager of building an AI startup"

# TODO: fix the excessively long execution time
# Run the whole graph with a raised recursion limit and max_paths set to 10.
result = await graph.ainvoke(
    {"task": task}, config={"recursion_limit": 10000, "configurable": {"max_paths": 10}}
)

In [None]:
print(len(result["candidates"]))

In [None]:
print(result["best_candidate"])

In [None]:
# Script for monitoring the execution progress: print every streamed event.
async for e in graph.astream(
    {"task": task}, config={"recursion_limit": 10000, "configurable": {"max_paths": 10}}
):
    print(e)

### Trimming ToT with MCTS

## Agent memory
### Cache


In [None]:
from langchain_core.caches import InMemoryCache
from langchain_core.globals import set_llm_cache


# Register an in-memory cache as the global LLM cache.
cache = InMemoryCache()
set_llm_cache(cache)

llm = Config().new_openai_like()

# First call after the cache has been registered.
llm.invoke("What is the capital of UK?")

In [None]:
import langchain


# Inspect the private `_cache` attribute of the object exposed as langchain.llm_cache.
print(langchain.llm_cache._cache)

In [None]:
llm._get_llm_string()

### Store

In [None]:
from langgraph.store.memory import InMemoryStore

in_memory_store = InMemoryStore()

# Store one value under the namespace ("users", "user1") ...
in_memory_store.put(
    namespace=("users", "user1"), key="fact1", value={"message1": "My name is John."}
)
# ... and another under the longer namespace ("users", "user1", "conv1").
in_memory_store.put(
    namespace=("users", "user1", "conv1"),
    key="address",
    value={"message": "I live in Berlin."},
)

In [None]:
in_memory_store.get(namespace=("users", "user1", "conv1"), key="address")

In [None]:
in_memory_store.get(namespace=("users", "user1"), key="conv1")

In [None]:
in_memory_store.search(("users", "user1", "conv1"), query="name")

In [None]:
in_memory_store.search(("users", "user1"), query="name")