# Agents with Human-in-the-Loop

前面的章节中，我们首先使用 langgraph 创建了一个邮件助手（email assistant），该助手有一个分类路由模块（triage router）和一个邮件回复模块（response agent）。紧接着，我们通过精心设计的测试用例对每个单独模块进行了单元测试，也对整个邮件助手进行了端到端测试。

也许测试完成后，email assistant 的效果很好，但是面对邮件处理这样一项如此重要的任务，我们真的敢完全交给 AI 来自动处理吗？这个答案应该是否定的，人应该参与到这个事情的处理进程中来。

下图是整个 Agent 的流程图，接下来我们要实现的功能就是图中高亮的部分 **人类反馈（Human Feedback）** 模块。
![HITL overview ](img/overview_hitl.png)

## Human in the Loop
Human-in-the-loop (HITL）是早些年提出的一个机器学习领域用于人机协同的概念，Google Cloud 提供了关于 HITL 的解释：
> 人机协同 (HITL) 机器学习是一种协作方法，它将人类输入和专业知识融入到机器学习 (ML) 和人工智能系统的生命周期中。人类参与机器学习模型的训练、评估或运行，提供有价值的指导、反馈和注释。通过这种协作，HITL 旨在结合人类和机器的独特优势，提高机器学习系统的准确性、可靠性和适应性。

LangGraph 框架提供了对 Human-in-the-loop 理念的支持，通过中断（Interrupt）和持久化（Persistence）功能，可以让我们在任务执行过程中，适时的中断任务，等待人类的反馈，然后根据人类的反馈来决定下一步的任务执行。


## Adding HITL to out email assistant

接下来，我们将会把 HITL 的能力加入到 agent 中。下图具体展示了需要用户提供反馈的节点：
1. 对于通知类的邮件，在结束任务之前，需要确保用户已经收到通知；
2. 对于需要回复的邮件，需要在发送邮件之前，让用户确认邮件内容。

![HITL 示意图](img/hitl_schematic.png)

In [2]:
from dotenv import load_dotenv
load_dotenv("../.env")
import os
import sys
sys.path.append("..")

### Tools
像之前的所有章节一样，我们需要先定义出所有能用的工具，因为这限定了 agent 的能力范围。

不同的是，这里会比之前的邮件助手多定义一个工具，这个工具可以让 agent 向用户提问。

In [3]:
from typing import Literal
from datetime import datetime
from pydantic import BaseModel

from langchain.chat_models import init_chat_model
from langchain.tools import tool

from langgraph.graph import StateGraph, START, END
from langgraph.types import Command, interrupt

from src.prompts import (triage_system_prompt, triage_user_prompt,
                         default_triage_instructions,
                         agent_system_prompt_hitl, default_background,
                         default_response_preferences, default_cal_preferences)
from src.tools.default.prompt_templates import HITL_TOOLS_PROMPT
from src.schemas import State, RouterSchema, StateInput
from src.utils import parse_email, format_for_display, format_email_markdown

In [4]:
from rich.markdown import Markdown
Markdown(HITL_TOOLS_PROMPT)

`HITL_TOOLS_PROMPT` 中说明了 agent 需要的5个工具，下面的代码将具体定义他们。

In [4]:
@tool
def write_email(to: str, subject: str, content: str) -> str:
    """Write and send an email."""
    # Placeholder response - in real app would send email
    return f"Email sent to {to} with subject '{subject}' and content: {content}"

@tool
def schedule_meeting(
    attendees: list[str], subject: str, duration_minutes: int, preferred_day: datetime, start_time: int
) -> str:
    """Schedule a calendar meeting."""
    # Placeholder response - in real app would check calendar and schedule
    date_str = preferred_day.strftime("%A, %B %d, %Y")
    return f"Meeting '{subject}' scheduled on {date_str} at {start_time} for {duration_minutes} minutes with {len(attendees)} attendees"

@tool
def check_calendar_availability(day: str) -> str:
    """Check calendar availability for a given day."""
    # Placeholder response - in real app would check actual calendar
    return f"Available times on {day}: 9:00 AM, 2:00 PM, 4:00 PM"

@tool
# This is new! 
class Question(BaseModel):
      """Question to ask user."""
      content: str
    
@tool
class Done(BaseModel):
      """E-mail has been sent."""
      done: bool

tools = [
    write_email,
    schedule_meeting,
    check_calendar_availability,
    Question,
    Done,
]

tools_by_name = {tool.name: tool for tool in tools}

### LLMs
初始化两个 LLMs，一个用于 triage router（通过 `RouterSchema` 格式化了输出），一个用于 response agent（添加了工具调用）。

我们为两个 llm 选择了相同的模型，实际的项目中，可以根据效果、速度、成本等几个因素，选择不同的模型。

In [5]:
model_name = os.getenv("OPENAI_MODEL")
model_provider = os.getenv("MODEL_PROVIDER")

llm = init_chat_model(model_name, model_provider=model_provider, temperature=0.0)
llm_router = llm.with_structured_output(RouterSchema)

llm = init_chat_model(model_name, model_provider=model_provider, temperature=0.0)
llm_with_tools = llm.bind_tools(tools, tool_choice="required", parallel_tool_calls=False)

### Triage Node
邮件分类节点（Triage node）就像我们之前实现的那样，负责分析邮件内容，决定邮件应该如何处理。与之前不同的是，对于分类为 `notify` 的邮件，我们需要中断 graph 的执行，允许用户来查看邮件内容。

所以，在 triage node 之后，会加入一个新的节点 `triage_interrupt_node` 。

In [6]:
def triage_router(state: State) -> Command[Literal["triage_interrupt_handler", "response_agent", "__end__"]]:
    """Analyze email content to decide if we should respond, notify, or ignore."""

    # Parse the email input
    author, to, subject, email_thread = parse_email(state["email_input"])
    user_prompt = triage_user_prompt.format(
        author=author, to=to, subject=subject, email_thread=email_thread
    )

    # Create email markdown for Agent Inbox in case of notification
    email_markdown = format_email_markdown(subject, author, to, email_thread)

    # Format system prompt with backgroud and triage instructions
    system_prompt = triage_system_prompt.format(
    
        background=default_background,
        triage_instructions=default_triage_instructions,
    )

    # Run the router LLM
    result = llm_router.invoke({
        [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_prompt},
        ]
    })

    # Decision
    classification = result.classification

    # Process the classifiction decision
    if classification == "respond":
        print("📧 Classification: RESPOND - This email requires a response")
        # Next node
        goto = "response_agent"
        # Update the state
        update = {
            "classification_decision": result.classification,
            "messages": [
                {
                    "role": "user",
                    "content": f"Respond to the email: \n\n{email_markdown}",
                }
            ],
        }
    elif classification == "ignore":
        print("📧 Classification: IGNORE - This email does not require a response")
        # Next node
        goto = "__end__"
        # Update the state
        update = {
            "classification_decision": result.classification,
            "messages": [
                {
                    "role": "user",
                    "content": f"Ignore the email: \n\n{email_markdown}",
                }
            ],
        }
    elif classification == "notify":
        print("🔔 Classification: NOTIFY - This email contains important information")
        # This is NEW!
        goto = "triage_interrupt_handler"
        # Update the state
        update = {
            "classification_decision": result.classification,
        }
    else:
        raise ValueError(f"Invalid classification: {classification}")
    return Command(goto=goto, update=update)

### Triage Interrupt Handler

如果邮件的分类是 `notify`, 我们就中断 graph ，如下图左边绿色部分所示，中断之后，用户可以根据实际情况选择 `ignore` 或者 `respond`。
![overview-img](img/HITL_flow_triage.png)

为此，graph 中会加入一个新的 node，`triage_interrupt_handler`，它将会：
1. 如果分类的结果是 `notify`, 将会中断，并将一个包含分类结果的 `dict` 展示给用户；
2. 用户可以根据实际的邮件内容做出回应。

为了使用好这些功能，LangChain 团队开发了 Agent Inbox，直译过来是智能体收件箱，用户所有需要与 agent 进行沟通的事情都会在这个收件箱中，方便用户统一追踪任何未完成的操作。

_关于 Agent Inbox 的使用，随着内容的深入，慢慢就会学会。_

In [7]:
def triage_interrupt_handler(state: State) -> Command[Literal["response_agent", "__end__"]]:
    """Handles interrupts from the triage step."""

    # Parse the email input
    author, to, subject, email_thread = parse_email(state["email_input"])

    # Create email markdown for Agent Inbox in case of notification
    email_markdown = format_email_markdown(subject, author, to, email_thread)

    # Create messages
    messages = [{
        "role": "user",
        "content": f"Email to notify user about: {email_markdown}"
    }]

    # Create interrupt that is shown to the user
    request = {
        "action_request": {
            "action": f"Email Assistant: {state['classification_decision']}",
            "args": {}
        },
        "config": {
            "allow_ignore": True,
            "allow_respond": True,
            "allow_edit": False,
            "allow_accept": False,
        },
        # Email to show in the agent inbox
        "description": email_markdown
    }

    responses = interrupt([request])
    
    # Agent Inbox responds with a list of dicts with a single key `type`,
    # which can be `accept`, `edit`, `ignore`, or `response`.
    response = responses[0]

    # If you provide feedback, go to reponse agent and use feedbock to respond to email
    if response["type"] == "reponse":
        # Add feedback to the message
        user_input = response["args"]
        # Used by the response agent
        messages.append({
            "role": "user",
            "content": f"User wants to reply to the email. Use this feedback to respond: {user_input}"
        })
        # Go to response agent
        goto = "response_agent"
    # If you ignore, go to end
    elif response["type"] == "ignore":
        goto = END
    # Catch all other responses
    else:
        raise ValueError(f"Invalid response: {response}")

    # Update the state
    update = {
        "messages": messages,
    }
    return Command(goto=goto, update=update)


上面便是这个新增加的 node 的代码，需要重点解释一下：
- `interrupt()` 函数，通过暂停（pause） graph 的执行，来实现 HITL 的功能；
- 暂停之后，用户需要在客户端执行相应的操作，才能继续 graph 的执行，这里的客户端可以是 Agent Inbox，也可以使用 `Command(resume=xxx)` 模拟的客户端；
- 要使用 `interrupt()` 函数，需要在 graph 的编译时，指定 `checkpointer`，这样才能保存当前的状态，实现暂停和恢复的功能。


### Response Agent with Interrupts

Response agent 也需要加入中断功能，以允许用户在邮件回复生成后提供反馈。

#### LLM call

Response agent 中的 llm call 和第三节中设计的一样。

In [8]:
def llm_call(state: State):
    """LLM decides whether call a tool or not."""

    input_messages = [{
        "role": "system",
        "content": agent_system_prompt_hitl.format(
            tools_prompt=HITL_TOOLS_PROMPT,
            background=default_background,
            response_preferences=default_response_preferences, 
            cal_preferences=default_cal_preferences)
    }] + state["messages"]

    return {"messages": [llm_with_tools.invoke(input_messages)]}

#### Interrupt Handler

`interrupt_handler` 是 response agent 实现 HITL 功能的核心组件。

它的作用是检查 LLM 想要执行的工具调用，以及确定哪些工具执行前需要用户确认。具体工作内容如下：
1. Tool Selection: `interrupt_handler` 维护了一份需要用户批准才能执行的 "HITL Tools"：
    - `write_email`: 发送邮件是一件很重要的事
    - `schedule_meeting`: 安排会议会影响用户日程
    - `Question`: 向用户提问需要与用户直接交互（direct interaction）
2. Direct Execution: 不在 HITL 列表里的工具（比如 `check_calendar_availability`）可以直接执行，不需要用户确认；
3. Context Preparation: 对于那些需要用户确认的工具，`interrupt_handler` 会：
    - 将原始的邮件内容作为上下文
    - 格式化好工具调用的内容方便用户查阅
    - 针对每种工具类型，配置好哪些交互方式是允许的
4. Interrupt Creation: 根据上述信息，创建中断请求：
    - 请求操作的名称和参数
    - 配置好的允许的交互类型
    - 一份包含原始邮件和建议工具调用内容的详细描述
5. Response Processing: 中断结束之后，`iterrupt_handler` 处理用户的回应：
    - Accept: 用原始的参数执行工具调用；
    - Edit: 用修改后的参数执行工具调用；
    - Ignore: 取消工具调用；
    - Response: 记录下当前行为，并且不进行工具调用。

`interrupt_handler` 保证用户对所用重要操作都能参与其中，同时，对于一些不重要的操作，也能自动执行。

为了能精确控制 agent 的行为，用户也可以修改 HITL 工具的参数，比如邮件的内容。

整个流程如下图所示：

![邮件助手 agent 流程图](img/HITL_flow.png)

In [None]:
def interrupt_handler(state: State) -> Command[Literal["llm_call", "__end__"]]:
    """Creates an interrupt fro human review of tool calls"""

    # Store messages
    result = []

    # Go to the LLM call node next
    goto = "llm_call"

    # Allowed tools for HITL
    hitl_tools = ["write_email", "schedule_meeting", "Question"]

    # Iterate over the tool calls in the last message
    for tool_call in state["messages"][-1].tool_calls:

        # If tool is not in our HITL list, execute it directly without interruption
        if tool_call["name"] not in hitl_tools:

            # Execute tool without iterruption
            tool = tools_by_name[tool_call["name"]]
            observation = tool.invoke(tool_call["args"])
