# 中间件：人类参与
<img src="./assets/LC_HITL.png" width="300">



## 设置

In [2]:
from dotenv import load_dotenv
from env_utils import doublecheck_env

# 从 .env 加载环境变量
load_dotenv()

# 检查并打印结果
doublecheck_env(".env")


DASHSCOPE_API_KEY=****0fbe
DASHSCOPE_BASE_URL=****e/v1
LANGSMITH_API_KEY=****5ced
LANGSMITH_TRACING=true
LANGSMITH_PROJECT=****ials


In [3]:
from langchain_community.utilities import SQLDatabase

db = SQLDatabase.from_uri("sqlite:///Chinook.db")

In [4]:
from dataclasses import dataclass

@dataclass
class RuntimeContext:
    db: SQLDatabase

In [5]:
from langchain_core.tools import tool
from langgraph.runtime import get_runtime


@tool
def execute_sql(query: str) -> str:
    """Execute a SQLite command and return results."""
    runtime = get_runtime(RuntimeContext)
    db = runtime.context.db
    
    try:
        return db.run(query)
    except Exception as e:
        return f"Error: {e}"

In [6]:
SYSTEM_PROMPT = """You are a careful SQLite analyst.

Rules:
- Think step-by-step.
- When you need data, call the tool `execute_sql` with ONE SELECT query.
- Read-only only; no INSERT/UPDATE/DELETE/ALTER/DROP/CREATE/REPLACE/TRUNCATE.
- Limit to 5 rows unless the user explicitly asks otherwise.
- If the tool returns 'Error:', revise the SQL and try again.
- Prefer explicit column lists; avoid SELECT *.
- If the database is offline, ask user to try again later without further comment.
"""

In [7]:
from langchain_qwq import ChatQwen
import os
llm=ChatQwen(
    model="qwen3-max",
    base_url=os.getenv("DASHSCOPE_BASE_URL"),
    api_key=os.getenv("DASHSCOPE_API_KEY")
)

## 中间件

In [8]:
from langchain.agents import create_agent
from langchain.agents.middleware import HumanInTheLoopMiddleware
from langgraph.checkpoint.memory import InMemorySaver

agent = create_agent(
    model=llm,
    tools=[execute_sql],
    system_prompt=SYSTEM_PROMPT,
    checkpointer=InMemorySaver(),
    context_schema=RuntimeContext,
    middleware=[
        HumanInTheLoopMiddleware(
        interrupt_on={
        "execute_sql": {#工具的名称。当agent尝试调用这个工具时，中间件会拦截
            "allowed_decisions": ["approve", "reject"]#这个工具允许的人工决定类型
        }
    }
),
    ],
)

In [15]:
from langgraph.types import Command

question = "What are the names of all the employees?"
#所有员工的姓名是什么？
config = {"configurable": {"thread_id": "2"}}

result = agent.invoke(
    {"messages": [{"role": "user", "content": question}]},
    config=config,
    context=RuntimeContext(db=db)
)

if "__interrupt__" in result:# 检查执行结果中是否存在中断
    # 获取最后一个中断对象的值
    # result['__interrupt__'] 是一个列表，包含所有中断
     # ['action_requests'] 获取需要审核的操作列表
    description = result['__interrupt__'][-1].value['action_requests'][-1]['description']
    print(f"\033[1;3;31m{80 * '-'}\033[0m")
    print(
        f"\033[1;3;31m Interrupt:{description}\033[0m"
    )
    print(result['__interrupt__'][-1])
    # 调用agent继续执行，传入人工的审核决定
    result = agent.invoke(
        Command(# 使用Command对象恢复暂停的对话
            resume={# resume字段包含人工的决定列表
                "decisions": [
                    {
                        # 决定类型：拒绝该操作
                        "type": "reject",
                        # 拒绝的原因（会作为反馈发给agent）
                        "message": "the database is offline."#数据库已离线。
                    }
                ]
            }
        ),
        config=config,  # 使用相同的线程 ID 以恢复暂停的对话
        context=RuntimeContext(db=db),
    )
    print(f"\033[1;3;31m{80 * '-'}\033[0m")

print(result["messages"][-1].content)

[1;3;31m--------------------------------------------------------------------------------[0m
[1;3;31m Interrupt:Tool execution requires approval

Tool: execute_sql
Args: {'query': 'SELECT name FROM employees;'}[0m
Interrupt(value={'action_requests': [{'name': 'execute_sql', 'args': {'query': 'SELECT name FROM employees;'}, 'description': "Tool execution requires approval\n\nTool: execute_sql\nArgs: {'query': 'SELECT name FROM employees;'}"}], 'review_configs': [{'action_name': 'execute_sql', 'allowed_decisions': ['approve', 'reject']}]}, id='10ad54401ce241f0868c534d450ef78f')
[1;3;31m--------------------------------------------------------------------------------[0m
The database is currently offline. Please try again later.


KeyError: '__interrupt__'

In [16]:
config = {"configurable": {"thread_id": "3"}}

result = agent.invoke(
    {"messages": [{"role": "user", "content": question}]},
    config=config,
    context=RuntimeContext(db=db)
)

while "__interrupt__" in result:
    description = result['__interrupt__'][-1].value['action_requests'][-1]['description']
    print(f"\033[1;3;31m{80 * '-'}\033[0m")
    print(
        f"\033[1;3;31m Interrupt:{description}\033[0m"
    )
    
    result = agent.invoke(
        Command(
            resume={"decisions": [{"type": "approve"}]}
        ),
        config=config,  # 使用相同的线程 ID 以恢复暂停的对话
        context=RuntimeContext(db=db),
    )

for msg in result["messages"]:
    msg.pretty_print()

[1;3;31m--------------------------------------------------------------------------------[0m
[1;3;31m Interrupt:Tool execution requires approval

Tool: execute_sql
Args: {'query': 'SELECT name FROM employees;'}[0m
[1;3;31m--------------------------------------------------------------------------------[0m
[1;3;31m Interrupt:Tool execution requires approval

Tool: execute_sql
Args: {'query': "SELECT name FROM sqlite_master WHERE type='table';"}[0m
[1;3;31m--------------------------------------------------------------------------------[0m
[1;3;31m Interrupt:Tool execution requires approval

Tool: execute_sql
Args: {'query': 'SELECT FirstName, LastName FROM Employee;'}[0m

What are the names of all the employees?
Tool Calls:
  execute_sql (call_a561d0d7678843ab8dc44a9d)
 Call ID: call_a561d0d7678843ab8dc44a9d
  Args:
    query: SELECT name FROM employees;
Name: execute_sql

Error: (sqlite3.OperationalError) no such table: employees
[SQL: SELECT name FROM employees;]
(Background 