# 使用 Microsoft Agent Framework 实现人工干预工作流

## 🎯 学习目标

在本笔记中，您将学习如何使用 Microsoft Agent Framework 的 `RequestInfoExecutor` 实现**人工干预**工作流。这种强大的模式允许您在 AI 工作流中暂停以获取人工输入，使代理更加互动，并让人类掌控关键决策。

## 🔄 什么是人工干预？

**人工干预 (HITL)** 是一种设计模式，AI 代理在继续执行之前会暂停以请求人工输入。这种模式对于以下场景至关重要：

- ✅ **关键决策** - 在采取重要行动之前获得人工批准
- ✅ **模糊情况** - 当 AI 不确定时让人类进行澄清
- ✅ **用户偏好** - 让用户在多个选项中进行选择
- ✅ **合规与安全** - 确保受监管操作有人工监督
- ✅ **互动体验** - 构建能够响应用户输入的对话式代理

## 🏗️ 在 Microsoft Agent Framework 中如何实现

框架提供了三个关键组件来支持 HITL：

1. **`RequestInfoExecutor`** - 一种特殊的执行器，可暂停工作流并发出 `RequestInfoEvent`
2. **`RequestInfoMessage`** - 用于发送给人类的类型化请求负载的基类
3. **`RequestResponse`** - 使用 `request_id` 将人工响应与原始请求关联起来

**工作流模式：**
```
Agent detects need for input
    ↓
Sends message to RequestInfoExecutor
    ↓
Workflow pauses & emits RequestInfoEvent
    ↓
Application collects human input (console, UI, etc.)
    ↓
Application sends RequestResponse via send_responses_streaming()
    ↓
Workflow resumes with human input
```

## 🏨 示例：用户确认的酒店预订

我们将在条件工作流的基础上添加人工确认功能，**在**建议替代目的地之前进行确认：

1. 用户请求一个目的地（例如，“巴黎”）
2. `availability_agent` 检查是否有房间可用
3. **如果没有房间** → `confirmation_agent` 询问“您想查看替代选项吗？”
4. 使用 `RequestInfoExecutor` 暂停工作流
5. **人工通过控制台输入**“是”或“否”进行响应
6. `decision_manager` 根据响应进行路由：
   - **是** → 显示替代目的地
   - **否** → 取消预订请求
7. 显示最终结果

这展示了如何让用户掌控代理的建议！

---

让我们开始吧！ 🚀


## 第一步：导入所需的库

我们导入标准的代理框架组件以及**人机交互特定的类**：
- `RequestInfoExecutor` - 用于暂停工作流以等待人工输入的执行器
- `RequestInfoEvent` - 在请求人工输入时触发的事件
- `RequestInfoMessage` - 用于类型化请求负载的基类
- `RequestResponse` - 将人工响应与请求相关联
- `WorkflowOutputEvent` - 用于检测工作流输出的事件


In [21]:
import asyncio
import json
import os
from dataclasses import dataclass
from typing import Annotated, Any, Never

from agent_framework import (
    AgentExecutor,
    AgentExecutorRequest,
    AgentExecutorResponse,
    ChatMessage,
    Executor,
    RequestInfoEvent,          # NEW: Event when human input is requested
    RequestInfoExecutor,       # NEW: Executor that gathers human input
    RequestInfoMessage,        # NEW: Base class for request payloads
    RequestResponse,           # NEW: Correlates response with request
    Role,
    WorkflowBuilder,
    WorkflowContext,
    WorkflowOutputEvent,       # NEW: Event for workflow outputs
    WorkflowRunState,          # NEW: Enum of workflow run states
    WorkflowStatusEvent,       # NEW: Event for run state changes
    ai_function,
    executor,
    handler,                   # NEW: Decorator for executor methods
)

# 🤖 GitHub Models or OpenAI client integration
from agent_framework.openai import OpenAIChatClient
from dotenv import load_dotenv
from IPython.display import HTML, display
from pydantic import BaseModel

print("✅ All imports successful!")
print("🔄 Human-in-the-loop components loaded: RequestInfoExecutor, RequestInfoEvent, RequestResponse")

✅ All imports successful!
🔄 Human-in-the-loop components loaded: RequestInfoExecutor, RequestInfoEvent, RequestResponse


## 第2步：定义 Pydantic 模型以生成结构化输出

这些模型定义了代理将返回的**模式**。我们保留条件工作流中的所有模型，并新增以下内容：

**新增内容：用于人工参与：**
- `HumanFeedbackRequest` - `RequestInfoMessage` 的子类，用于定义发送给人工的请求负载
  - 包含 `prompt`（要询问的问题）和 `destination`（关于不可用城市的上下文）


In [22]:
# Existing models from conditional workflow
class BookingCheckResult(BaseModel):
    """Result from checking hotel availability at a destination."""
    destination: str
    has_availability: bool
    message: str


class AlternativeResult(BaseModel):
    """Suggested alternative destination when no rooms available."""
    alternative_destination: str
    reason: str


class BookingConfirmation(BaseModel):
    """Booking suggestion when rooms are available."""
    destination: str
    action: str
    message: str


# NEW: Pydantic model for agent's response format
class ConfirmationQuestion(BaseModel):
    """
    Pydantic model used by confirmation_agent's response_format.
    This is what the agent will output as JSON.
    """
    question: str  # The question to ask the user
    destination: str  # The unavailable destination for context


# NEW: Dataclass for RequestInfoExecutor
@dataclass
class HumanFeedbackRequest(RequestInfoMessage):
    """
    Request sent to RequestInfoExecutor asking if user wants alternatives.
    
    MUST be a dataclass subclassing RequestInfoMessage for type compatibility.
    This is what gets sent to the RequestInfoExecutor.
    """
    prompt: str = ""  # The question to ask the user
    destination: str = ""  # The unavailable destination for context


print("✅ Pydantic models defined:")
print("   - BookingCheckResult (availability check)")
print("   - AlternativeResult (alternative suggestion)")
print("   - BookingConfirmation (booking confirmation)")
print("   - ConfirmationQuestion (agent response format) 🆕")
print("   - HumanFeedbackRequest (RequestInfoMessage for HITL) 🆕")

✅ Pydantic models defined:
   - BookingCheckResult (availability check)
   - AlternativeResult (alternative suggestion)
   - BookingConfirmation (booking confirmation)
   - ConfirmationQuestion (agent response format) 🆕
   - HumanFeedbackRequest (RequestInfoMessage for HITL) 🆕


## 第三步：创建酒店预订工具

与条件工作流中的工具相同——检查目的地是否有空房。


In [23]:
@ai_function(description="Check hotel room availability for a destination city")
def hotel_booking(destination: Annotated[str, "The destination city to check for hotel rooms"]) -> str:
    """
    Simulates checking hotel room availability.
    
    Returns JSON string with availability status.
    """
    display(
        HTML(f"""
        <div style='padding: 15px; background: #e3f2fd; border-left: 4px solid #2196f3; border-radius: 4px; margin: 10px 0;'>
            <strong>🔍 Tool Invoked:</strong> hotel_booking("{destination}")
        </div>
    """)
    )

    # Simulate availability check
    cities_with_rooms = ["stockholm", "seattle", "tokyo", "london", "amsterdam"]
    has_rooms = destination.lower() in cities_with_rooms

    result = {"has_availability": has_rooms, "destination": destination}

    return json.dumps(result)


print("✅ hotel_booking tool created with @ai_function decorator")

✅ hotel_booking tool created with @ai_function decorator


## 第四步：定义路由的条件函数

我们需要为人工干预工作流定义**四个条件函数**：

**来自条件工作流：**
1. `has_availability_condition` - 当酒店有空房时进行路由
2. `no_availability_condition` - 当酒店没有空房时进行路由

**新增的人工干预条件：**
3. `user_wants_alternatives_condition` - 当用户选择“是”以接受替代方案时进行路由
4. `user_declines_alternatives_condition` - 当用户选择“否”以拒绝替代方案时进行路由


In [24]:
# Existing condition functions from conditional workflow
def has_availability_condition(message: Any) -> bool:
    """Condition for routing when hotels ARE available."""
    if not isinstance(message, AgentExecutorResponse):
        return True

    try:
        result = BookingCheckResult.model_validate_json(message.agent_run_response.text)
        display(
            HTML(f"""
            <div style='padding: 12px; background: #c8e6c9; border-left: 4px solid #4caf50; border-radius: 4px; margin: 10px 0;'>
                <strong>✅ Condition Check:</strong> has_availability = <strong>{result.has_availability}</strong> for {result.destination}
            </div>
        """)
        )
        return result.has_availability
    except Exception as e:
        display(HTML(f"""<div style='padding: 12px; background: #ffcdd2; border-left: 4px solid #f44336; border-radius: 4px; margin: 10px 0;'><strong>⚠️  Error:</strong> {str(e)}</div>"""))
        return False


def no_availability_condition(message: Any) -> bool:
    """Condition for routing when hotels are NOT available."""
    if not isinstance(message, AgentExecutorResponse):
        return False

    try:
        result = BookingCheckResult.model_validate_json(message.agent_run_response.text)
        display(
            HTML(f"""
            <div style='padding: 12px; background: #ffecb3; border-left: 4px solid #ff9800; border-radius: 4px; margin: 10px 0;'>
                <strong>❌ Condition Check:</strong> no_availability for {result.destination}
            </div>
        """)
        )
        return not result.has_availability
    except Exception as e:
        return False


# NEW: Condition functions for human-in-the-loop routing
def user_wants_alternatives_condition(message: Any) -> bool:
    """
    Condition for routing when user WANTS to see alternatives.
    
    Checks the AgentExecutorResponse from decision_manager to see if user said 'yes'.
    """
    if not isinstance(message, AgentExecutorResponse):
        return False

    try:
        # The decision_manager yields a simple text response
        response_text = message.agent_run_response.text.lower().strip()
        
        display(
            HTML(f"""
            <div style='padding: 12px; background: #e1f5fe; border-left: 4px solid #0288d1; border-radius: 4px; margin: 10px 0;'>
                <strong>🔍 User Decision:</strong> User wants alternatives = <strong>{response_text == 'yes'}</strong>
            </div>
        """)
        )
        
        return response_text == "yes"
    except Exception as e:
        return False


def user_declines_alternatives_condition(message: Any) -> bool:
    """
    Condition for routing when user DECLINES alternatives.
    
    Checks the AgentExecutorResponse from decision_manager to see if user said 'no'.
    """
    if not isinstance(message, AgentExecutorResponse):
        return False

    try:
        response_text = message.agent_run_response.text.lower().strip()
        
        display(
            HTML(f"""
            <div style='padding: 12px; background: #fce4ec; border-left: 4px solid #c2185b; border-radius: 4px; margin: 10px 0;'>
                <strong>🚫 User Decision:</strong> User declined alternatives = <strong>{response_text == 'no'}</strong>
            </div>
        """)
        )
        
        return response_text == "no"
    except Exception as e:
        return False


print("✅ Condition functions defined:")
print("   - has_availability_condition (routes when rooms exist)")
print("   - no_availability_condition (routes when no rooms)")
print("   - user_wants_alternatives_condition (routes when user says yes) 🆕")
print("   - user_declines_alternatives_condition (routes when user says no) 🆕")

✅ Condition functions defined:
   - has_availability_condition (routes when rooms exist)
   - no_availability_condition (routes when no rooms)
   - user_wants_alternatives_condition (routes when user says yes) 🆕
   - user_declines_alternatives_condition (routes when user says no) 🆕


## 第五步：创建决策管理器执行器

这是**人机协作模式的核心**！`DecisionManager` 是一个自定义的 `Executor`，它能够：

1. **接收人工反馈**，通过 `RequestResponse` 对象实现
2. **处理用户的决策**（是/否）
3. **引导工作流**，通过发送消息给适当的代理完成

主要特点：
- 使用 `@handler` 装饰器将方法暴露为工作流步骤
- 接收 `RequestResponse[HumanFeedbackRequest, str]`，其中包含原始请求和用户的回答
- 生成简单的“是”或“否”消息，用于触发我们的条件函数


In [25]:
class DecisionManager(Executor):
    """
    Coordinates workflow routing based on human feedback.
    
    This executor receives RequestResponse objects from the RequestInfoExecutor
    and makes routing decisions by sending simple messages that trigger
    condition functions.
    """

    def __init__(self, id: str | None = None):
        super().__init__(id=id or "decision_manager")

    @handler
    async def on_human_feedback(
        self,
        feedback: RequestResponse[HumanFeedbackRequest, str],
        ctx: WorkflowContext[AgentExecutorRequest],
    ) -> None:
        """
        Process human feedback and route workflow accordingly.
        
        The RequestResponse contains:
        - feedback.data: The user's string reply (e.g., "yes" or "no")
        - feedback.original_request: The HumanFeedbackRequest with context
        
        We send a simple message that will be caught by our condition functions.
        """
        user_reply = (feedback.data or "").strip().lower()
        destination = getattr(feedback.original_request, "destination", "unknown")

        display(
            HTML(f"""
            <div style='padding: 15px; background: #f3e5f5; border-left: 4px solid #9c27b0; border-radius: 4px; margin: 10px 0;'>
                <strong>🎯 Decision Manager:</strong> Processing user reply: <strong>"{user_reply}"</strong> for {destination}
            </div>
        """)
        )

        if user_reply == "yes":
            # User wants alternatives - send message to trigger alternative_agent
            display(
                HTML("""
                <div style='padding: 12px; background: #c8e6c9; border-left: 4px solid #4caf50; border-radius: 4px; margin: 10px 0;'>
                    <strong>➡️  Routing:</strong> User wants alternatives → Sending to alternative_agent
                </div>
            """)
            )
            # Send a message requesting alternatives for the destination
            user_msg = ChatMessage(
                Role.USER,
                text=f"The user wants to see alternative destinations near {destination}. Please suggest one.",
            )
            await ctx.send_message(AgentExecutorRequest(messages=[user_msg], should_respond=True))
        
        elif user_reply == "no":
            # User declined - send message to trigger cancellation
            display(
                HTML("""
                <div style='padding: 12px; background: #ffcdd2; border-left: 4px solid #f44336; border-radius: 4px; margin: 10px 0;'>
                    <strong>🚫 Routing:</strong> User declined alternatives → Sending cancellation message
                </div>
            """)
            )
            # Send a simple "no" message that will be caught by the condition
            user_msg = ChatMessage(
                Role.USER,
                text="no",
            )
            await ctx.send_message(AgentExecutorRequest(messages=[user_msg], should_respond=True))
        
        else:
            # Handle unexpected input
            display(
                HTML(f"""
                <div style='padding: 12px; background: #fff3e0; border-left: 4px solid #ff9800; border-radius: 4px; margin: 10px 0;'>
                    <strong>⚠️  Warning:</strong> Unexpected input "{user_reply}" - treating as decline
                </div>
            """)
            )
            user_msg = ChatMessage(Role.USER, text="no")
            await ctx.send_message(AgentExecutorRequest(messages=[user_msg], should_respond=True))


print("✅ DecisionManager executor created with @handler method for human feedback")

✅ DecisionManager executor created with @handler method for human feedback


## 第六步：创建自定义显示执行器

与条件工作流中的显示执行器相同——将最终结果作为工作流输出生成。


In [26]:
@executor(id="prepare_human_request")
async def prepare_human_request(
    response: AgentExecutorResponse, 
    ctx: WorkflowContext[HumanFeedbackRequest]
) -> None:
    """
    Transform agent response into HumanFeedbackRequest for RequestInfoExecutor.
    
    This executor bridges the type gap between:
    - confirmation_agent outputs AgentExecutorResponse with ConfirmationQuestion JSON
    - request_info_executor expects HumanFeedbackRequest (RequestInfoMessage dataclass)
    """
    display(
        HTML("""
        <div style='padding: 12px; background: #e1f5fe; border-left: 4px solid #0288d1; border-radius: 4px; margin: 10px 0;'>
            <strong>🔄 Transform:</strong> Converting ConfirmationQuestion to HumanFeedbackRequest
        </div>
    """)
    )
    
    # Parse the agent's Pydantic output (ConfirmationQuestion)
    confirmation = ConfirmationQuestion.model_validate_json(response.agent_run_response.text)
    
    # Convert to HumanFeedbackRequest dataclass for RequestInfoExecutor
    feedback_request = HumanFeedbackRequest(
        prompt=confirmation.question,
        destination=confirmation.destination
    )
    
    # Send the properly typed RequestInfoMessage to the RequestInfoExecutor
    await ctx.send_message(feedback_request)


@executor(id="display_result")
async def display_result(response: AgentExecutorResponse, ctx: WorkflowContext[Never, str]) -> None:
    """
    Display the final result as workflow output.
    
    This executor receives the final agent response and yields it as the workflow output.
    """
    display(
        HTML("""
        <div style='padding: 15px; background: #f3e5f5; border-left: 4px solid #9c27b0; border-radius: 4px; margin: 10px 0;'>
            <strong>📤 Display Executor:</strong> Yielding workflow output
        </div>
    """)
    )

    await ctx.yield_output(response.agent_run_response.text)


print("✅ prepare_human_request executor created with @executor decorator")
print("✅ display_result executor created with @executor decorator")

✅ prepare_human_request executor created with @executor decorator
✅ display_result executor created with @executor decorator


## 第七步：加载环境变量

配置 LLM 客户端（GitHub Models、Azure OpenAI 或 OpenAI）。


In [27]:
# Load environment variables
load_dotenv()

# Check for GitHub Models or OpenAI
chat_client = OpenAIChatClient(
    base_url=os.environ.get("GITHUB_ENDPOINT"), 
    api_key=os.environ.get("GITHUB_TOKEN"), 
    model_id="gpt-4o"
)

print("✅ Chat client configured with GitHub Models")

✅ Chat client configured with GitHub Models


## 第八步：创建 AI 代理和执行器

我们创建了**六个工作流组件**：

**代理（封装在 AgentExecutor 中）：**
1. **availability_agent** - 使用工具检查酒店可用性
2. **confirmation_agent** - 🆕 准备发送给用户的确认请求
3. **alternative_agent** - 建议替代城市（当用户回答“是”时）
4. **booking_agent** - 鼓励预订（当有房间可用时）
5. **cancellation_agent** - 🆕 处理取消消息（当用户回答“否”时）

**特殊执行器：**
6. **request_info_executor** - 🆕 `RequestInfoExecutor`，暂停工作流以等待用户输入
7. **decision_manager** - 🆕 自定义执行器，根据用户响应进行路由（已在上文定义）


In [28]:
# Agent 1: Check availability with tool (same as conditional workflow)
availability_agent = AgentExecutor(
    chat_client.create_agent(
        instructions=(
            "You are a hotel booking assistant that checks room availability. "
            "Use the hotel_booking tool to check if rooms are available at the destination. "
            "Return JSON with fields: destination (string), has_availability (bool), and message (string). "
            "The message should summarize the availability status."
        ),
        tools=[hotel_booking],
        response_format=BookingCheckResult,
    ),
    id="availability_agent",
)

# Agent 2: NEW - Prepare human confirmation request
confirmation_agent = AgentExecutor(
    chat_client.create_agent(
        instructions=(
            "You are a helpful assistant. The user's requested destination has no available hotel rooms. "
            "Create a polite message asking if they would like to see alternative destinations nearby. "
            "Return a JSON with: destination (the unavailable city), and question (a friendly yes/no question). "
            "Keep the question concise and friendly."
        ),
        response_format=ConfirmationQuestion,  # Use Pydantic model for agent output
    ),
    id="confirmation_agent",
)

# Agent 3: Suggest alternative (when user says yes)
alternative_agent = AgentExecutor(
    chat_client.create_agent(
        instructions=(
            "You are a helpful travel assistant. When a user cannot find hotels in their requested city, "
            "suggest an alternative nearby city that has availability. "
            "Return JSON with fields: alternative_destination (string) and reason (string). "
            "Make your suggestion sound appealing and helpful."
        ),
        response_format=AlternativeResult,
    ),
    id="alternative_agent",
)

# Agent 4: Suggest booking (when rooms available)
booking_agent = AgentExecutor(
    chat_client.create_agent(
        instructions=(
            "You are a booking assistant. The user has found available hotel rooms. "
            "Encourage them to book by highlighting the destination's appeal. "
            "Return JSON with fields: destination (string), action (string), and message (string). "
            "The action should be 'book_now' and message should be encouraging."
        ),
        response_format=BookingConfirmation,
    ),
    id="booking_agent",
)

# Agent 5: NEW - Handle cancellation when user declines alternatives
class CancellationMessage(BaseModel):
    """Message when user declines alternatives."""
    status: str
    message: str

cancellation_agent = AgentExecutor(
    chat_client.create_agent(
        instructions=(
            "You are a helpful assistant. The user has declined to see alternative hotel destinations. "
            "Create a polite cancellation message. "
            "Return JSON with: status (should be 'cancelled'), and message (a friendly acknowledgment). "
            "Keep the message brief and understanding."
        ),
        response_format=CancellationMessage,
    ),
    id="cancellation_agent",
)

# NEW: RequestInfoExecutor - pauses workflow to gather human input
request_info_executor = RequestInfoExecutor(id="request_info")

# NEW: DecisionManager instance - routes based on human feedback
decision_manager = DecisionManager(id="decision_manager")

display(
    HTML("""
    <div style='padding: 15px; background: #e3f2fd; border-left: 4px solid #2196f3; border-radius: 4px; margin: 10px 0;'>
        <strong>✅ Created Workflow Components:</strong>
        <ul style='margin: 10px 0 0 0;'>
            <li><strong>availability_agent</strong> - Checks availability with hotel_booking tool</li>
            <li><strong>confirmation_agent</strong> 🆕 - Prepares human confirmation request</li>
            <li><strong>alternative_agent</strong> - Suggests alternative cities</li>
            <li><strong>booking_agent</strong> - Encourages booking</li>
            <li><strong>cancellation_agent</strong> 🆕 - Handles user declining alternatives</li>
            <li><strong>request_info_executor</strong> 🆕 - Pauses workflow for human input</li>
            <li><strong>decision_manager</strong> 🆕 - Routes based on human response</li>
        </ul>
    </div>
""")
)

## 第九步：构建包含人工参与的工作流

现在我们构建包含**条件路由**的工作流图，其中包括人工参与路径：

**工作流结构：**
```
availability_agent (START)
        ↓
   Evaluate conditions
        ↙                    ↘
[no_availability]        [has_availability]
        ↓                        ↓
confirmation_agent          booking_agent
        ↓                        ↓
prepare_human_request      display_result
        ↓
request_info_executor (PAUSE)
        ↓
decision_manager
   ↙         ↘
[yes]        [no]
   ↓           ↓
alternative  cancellation
   ↓           ↓
display_result
```

**关键路径：**
- `availability_agent → confirmation_agent`（当没有房间时）
- `confirmation_agent → prepare_human_request`（转换类型）
- `prepare_human_request → request_info_executor`（暂停以等待人工处理）
- `request_info_executor → decision_manager`（始终 - 提供 RequestResponse）
- `decision_manager → alternative_agent`（当用户选择“是”时）
- `decision_manager → cancellation_agent`（当用户选择“否”时）
- `availability_agent → booking_agent`（当有房间时）
- 所有路径最终到达 `display_result`


In [29]:
# Build the workflow with human-in-the-loop routing
workflow = (
    WorkflowBuilder()
    .set_start_executor(availability_agent)
    
    # NO AVAILABILITY PATH (with human-in-the-loop)
    .add_edge(availability_agent, confirmation_agent, condition=no_availability_condition)
    .add_edge(confirmation_agent, prepare_human_request)  # Transform to HumanFeedbackRequest
    .add_edge(prepare_human_request, request_info_executor)  # Send to RequestInfoExecutor
    .add_edge(request_info_executor, decision_manager)    # Always goes to decision manager
    
    # Decision manager routes based on user response
    .add_edge(decision_manager, alternative_agent, condition=user_wants_alternatives_condition)
    .add_edge(decision_manager, cancellation_agent, condition=user_declines_alternatives_condition)
    .add_edge(alternative_agent, display_result)
    .add_edge(cancellation_agent, display_result)
    
    # HAS AVAILABILITY PATH (no human input needed)
    .add_edge(availability_agent, booking_agent, condition=has_availability_condition)
    .add_edge(booking_agent, display_result)
    
    .build()
)

display(
    HTML("""
    <div style='padding: 20px; background: linear-gradient(135deg, #667eea 0%, #764ba2 100%); color: white; border-radius: 8px; margin: 10px 0;'>
        <h3 style='margin: 0 0 15px 0;'>✅ Workflow Built Successfully!</h3>
        <p style='margin: 0; line-height: 1.6;'>
            <strong>Human-in-the-Loop Routing:</strong><br>
            • If <strong>NO availability</strong> → confirmation_agent → prepare_human_request → request_info_executor → <strong>PAUSE FOR HUMAN</strong> → decision_manager<br>
            &nbsp;&nbsp;• If user says <strong>YES</strong> → alternative_agent → display_result<br>
            &nbsp;&nbsp;• If user says <strong>NO</strong> → cancellation_agent → display_result<br>
            • If <strong>availability</strong> → booking_agent → display_result (no human input needed)
        </p>
    </div>
""")
)

## 第10步：运行测试用例1 - 无可用房间的城市（巴黎，需要人工确认）

此测试展示了**完整的人工参与流程**：

1. 请求巴黎的酒店
2. availability_agent 检查 → 无房间
3. confirmation_agent 创建面向用户的问题
4. request_info_executor **暂停工作流**并触发 `RequestInfoEvent`
5. **应用程序检测到事件并在控制台提示用户**
6. 用户输入“yes”或“no”
7. 应用程序通过 `send_responses_streaming()` 发送响应
8. decision_manager 根据响应进行路由
9. 显示最终结果

**关键模式：**
- 第一次迭代使用 `workflow.run_stream()`
- 后续迭代使用 `workflow.send_responses_streaming(pending_responses)`
- 监听 `RequestInfoEvent` 以检测何时需要人工输入
- 监听 `WorkflowOutputEvent` 以捕获最终结果


In [None]:
display(
    HTML("""
    <div style='padding: 20px; background: #fff3e0; border-left: 4px solid #ff9800; border-radius: 8px; margin: 20px 0;'>
        <h3 style='margin: 0 0 10px 0; color: #e65100;'>🧪 TEST CASE 1: Paris (No Availability - Human-in-the-Loop)</h3>
        <p style='margin: 0;'>Expected workflow path: availability_agent → confirmation_agent → request_info_executor → <strong>PAUSE</strong> → decision_manager → (depends on user input)</p>
    </div>
""")
)

# Create request for Paris
request_paris = AgentExecutorRequest(
    messages=[ChatMessage(Role.USER, text="I want to book a hotel in Paris")], 
    should_respond=True
)

# Human-in-the-loop execution pattern
pending_responses: dict[str, str] | None = None
completed = False
workflow_output: str | None = None

print("\n🔄 Starting human-in-the-loop workflow...")
print("=" * 60)

while not completed:
    # First iteration uses run_stream with the request
    # Subsequent iterations use send_responses_streaming with collected human responses
    if pending_responses:
        print(f"\n📤 Sending human responses: {pending_responses}")
        stream = workflow.send_responses_streaming(pending_responses)
        pending_responses = None  # Clear immediately after sending
    else:
        print(f"\n🚀 Starting workflow with request: 'I want to book a hotel in Paris'")
        stream = workflow.run_stream(request_paris)
    
    # Collect all events from this iteration
    events = [event async for event in stream]
    
    # Process events
    requests: list[tuple[str, str]] = []  # (request_id, prompt)
    
    for event in events:
        # Check for human input requests
        if isinstance(event, RequestInfoEvent) and isinstance(event.data, HumanFeedbackRequest):
            print(f"\n⏸️  WORKFLOW PAUSED - Human input requested!")
            print(f"   Request ID: {event.request_id}")
            print(f"   Destination: {event.data.destination}")
            requests.append((event.request_id, event.data.prompt))
        
        # Check for workflow outputs
        elif isinstance(event, WorkflowOutputEvent):
            workflow_output = str(event.data)
            completed = True
            print(f"\n✅ Workflow completed with output!")
    
    # If we have human requests, prompt the user
    if requests and not completed:
        responses: dict[str, str] = {}
        for req_id, prompt in requests:
            print(f"\n{'='*60}")
            print(f"💬 QUESTION FOR YOU:")
            print(f"   {prompt}")
            print(f"{'='*60}")
            
            # Get user input (in notebook, this will pause execution)
            answer = input("👉 Enter 'yes' or 'no': ").strip().lower()
            
            print(f"\n📝 You answered: {answer}")
            responses[req_id] = answer
        
        pending_responses = responses

print(f"\n{'='*60}")
print(f"🏆 FINAL WORKFLOW OUTPUT:")
print(f"{'='*60}")

# Display final result
if workflow_output:
    # Try to parse as JSON for pretty display
    try:
        result_data = json.loads(workflow_output)
        if "alternative_destination" in result_data:
            result_obj = AlternativeResult.model_validate_json(workflow_output)
            display(
                HTML(f"""
                <div style='padding: 25px; background: linear-gradient(135deg, #FFD700 0%, #FFA500 100%); border-radius: 12px; box-shadow: 0 4px 12px rgba(255,165,0,0.3); margin: 20px 0;'>
                    <h3 style='margin: 0 0 15px 0; color: #333;'>🏆 WORKFLOW RESULT</h3>
                    <div style='background: white; padding: 20px; border-radius: 8px;'>
                        <p style='margin: 0 0 10px 0; font-size: 16px;'><strong>Status:</strong> ❌ No rooms in Paris</p>
                        <p style='margin: 0 0 10px 0; font-size: 16px;'><strong>User Decision:</strong> ✅ Accepted alternatives</p>
                        <p style='margin: 0 0 10px 0; font-size: 16px;'><strong>Alternative Suggestion:</strong> 🏨 {result_obj.alternative_destination}</p>
                        <p style='margin: 0; font-size: 14px; color: #666;'><strong>Reason:</strong> {result_obj.reason}</p>
                    </div>
                </div>
            """)
            )
        else:
            # User declined
            display(
                HTML(f"""
                <div style='padding: 25px; background: linear-gradient(135deg, #f44336 0%, #e91e63 100%); color: white; border-radius: 12px; box-shadow: 0 4px 12px rgba(244,67,54,0.3); margin: 20px 0;'>
                    <h3 style='margin: 0 0 15px 0;'>🏆 WORKFLOW RESULT</h3>
                    <div style='background: white; color: #333; padding: 20px; border-radius: 8px;'>
                        <p style='margin: 0 0 10px 0; font-size: 16px;'><strong>Status:</strong> ❌ No rooms in Paris</p>
                        <p style='margin: 0 0 10px 0; font-size: 16px;'><strong>User Decision:</strong> 🚫 Declined alternatives</p>
                        <p style='margin: 0; font-size: 14px; color: #666;'><strong>Result:</strong> Booking request cancelled</p>
                    </div>
                </div>
            """)
            )
    except:
        print(workflow_output)


🔄 Starting human-in-the-loop workflow...

🚀 Starting workflow with request: 'I want to book a hotel in Paris'



⏸️  WORKFLOW PAUSED - Human input requested!
   Request ID: 032c8fce-b9d1-400e-ba8d-afd2248e2926
   Destination: Paris

💬 QUESTION FOR YOU:
   Unfortunately, there are no rooms available in Paris. Would you like to explore nearby alternative destinations?

📝 You answered: yes

📤 Sending human responses: {'032c8fce-b9d1-400e-ba8d-afd2248e2926': 'yes'}

🚀 Starting workflow with request: 'I want to book a hotel in Paris'

📝 You answered: yes

📤 Sending human responses: {'032c8fce-b9d1-400e-ba8d-afd2248e2926': 'yes'}

🚀 Starting workflow with request: 'I want to book a hotel in Paris'



⏸️  WORKFLOW PAUSED - Human input requested!
   Request ID: cf48dad0-ee5e-4f60-8806-341a7a292bd4
   Destination: Paris

💬 QUESTION FOR YOU:
   I'm sorry to inform you that there are no available hotel rooms in Paris. Would you like me to suggest nearby alternative destinations?

📝 You answered: 

📤 Sending human responses: {'cf48dad0-ee5e-4f60-8806-341a7a292bd4': ''}

🚀 Starting workflow with request: 'I want to book a hotel in Paris'

📝 You answered: 

📤 Sending human responses: {'cf48dad0-ee5e-4f60-8806-341a7a292bd4': ''}

🚀 Starting workflow with request: 'I want to book a hotel in Paris'


## 第11步：运行测试用例2 - 有房间可用的城市（斯德哥尔摩 - 无需人工输入）

此测试展示了房间可用时的**直接路径**：

1. 请求斯德哥尔摩的酒店
2. availability_agent 检查 → 房间可用 ✅
3. booking_agent 建议预订
4. display_result 显示确认信息
5. **无需人工输入！**

当房间可用时，工作流程完全绕过人工参与路径。


In [None]:
display(
    HTML("""
    <div style='padding: 20px; background: #e8f5e9; border-left: 4px solid #4caf50; border-radius: 8px; margin: 20px 0;'>
        <h3 style='margin: 0 0 10px 0; color: #1b5e20;'>🧪 TEST CASE 2: Stockholm (Has Availability - No Human Input)</h3>
        <p style='margin: 0;'>Expected workflow path: availability_agent → booking_agent → display_result (direct, no pause)</p>
    </div>
""")
)

# Create request for Stockholm
request_stockholm = AgentExecutorRequest(
    messages=[ChatMessage(Role.USER, text="I want to book a hotel in Stockholm")], 
    should_respond=True
)

# Run the workflow (should complete without human input)
events_stockholm = await workflow.run(request_stockholm)
outputs_stockholm = events_stockholm.get_outputs()

# Display results
if outputs_stockholm:
    result_stockholm = BookingConfirmation.model_validate_json(outputs_stockholm[0])

    display(
        HTML(f"""
        <div style='padding: 25px; background: linear-gradient(135deg, #4caf50 0%, #8bc34a 100%); color: white; border-radius: 12px; box-shadow: 0 4px 12px rgba(76,175,80,0.3); margin: 20px 0;'>
            <h3 style='margin: 0 0 15px 0;'>🏆 WORKFLOW RESULT (Stockholm - No Human Input)</h3>
            <div style='background: white; color: #333; padding: 20px; border-radius: 8px;'>
                <p style='margin: 0 0 10px 0; font-size: 16px;'><strong>Status:</strong> ✅ Rooms Available!</p>
                <p style='margin: 0 0 10px 0; font-size: 16px;'><strong>Destination:</strong> 🏨 {result_stockholm.destination}</p>
                <p style='margin: 0 0 10px 0; font-size: 16px;'><strong>Action:</strong> {result_stockholm.action}</p>
                <p style='margin: 0 0 10px 0; font-size: 14px; color: #666;'><strong>Message:</strong> {result_stockholm.message}</p>
                <p style='margin: 10px 0 0 0; font-size: 12px; color: #999; font-style: italic;'>Note: No human input was requested because rooms were available!</p>
            </div>
        </div>
    """)
    )

## 关键要点与人机协作最佳实践

### ✅ 你学到了什么：

#### 1. **RequestInfoExecutor 模式**
Microsoft Agent Framework 中的人机协作模式使用了三个关键组件：
- `RequestInfoExecutor` - 暂停工作流并发出事件
- `RequestInfoMessage` - 类型化请求负载的基类（需要继承此类！）
- `RequestResponse` - 将人工响应与原始请求相关联

**关键理解：**
- `RequestInfoExecutor` 本身并不收集输入 - 它仅仅是暂停工作流
- 你的应用程序代码必须监听 `RequestInfoEvent` 并收集输入
- 你必须使用一个将 `request_id` 映射到用户答案的字典调用 `send_responses_streaming()`

#### 2. **流式执行模式**
```python
# First iteration
stream = workflow.run_stream(initial_request)

# Subsequent iterations (after human input)
stream = workflow.send_responses_streaming(pending_responses)

# Always process events
events = [event async for event in stream]
```

#### 3. **事件驱动架构**
通过监听特定事件来控制工作流：
- `RequestInfoEvent` - 需要人工输入（工作流暂停）
- `WorkflowOutputEvent` - 最终结果可用（工作流完成）
- `WorkflowStatusEvent` - 状态变化（IN_PROGRESS、IDLE_WITH_PENDING_REQUESTS 等）

#### 4. **使用 @handler 的自定义执行器**
`DecisionManager` 展示了如何创建执行器：
- 使用 `@handler` 装饰器将方法暴露为工作流步骤
- 接收类型化消息（例如，`RequestResponse[HumanFeedbackRequest, str]`）
- 通过向其他执行器发送消息来路由工作流
- 通过 `WorkflowContext` 访问上下文

#### 5. **基于人工决策的条件路由**
你可以创建评估人工响应的条件函数：
```python
def user_wants_alternatives_condition(message: Any) -> bool:
    response_text = message.agent_run_response.text.lower()
    return response_text == "yes"
```

### 🎯 实际应用场景：

1. **审批工作流**
   - 在处理报销单之前获取经理批准
   - 在发送自动化邮件之前需要人工审核
   - 在执行高价值交易之前进行确认

2. **内容审核**
   - 标记可疑内容供人工审核
   - 让审核员对边界案例做出最终决定
   - 当 AI 信心不足时升级到人工处理

3. **客户服务**
   - 让 AI 自动处理常规问题
   - 将复杂问题升级给人工客服
   - 询问客户是否希望与人工客服沟通

4. **数据处理**
   - 让人工解决模糊的数据条目
   - 确认 AI 对不清晰文档的解释
   - 让用户在多个有效解释中进行选择

5. **安全关键系统**
   - 在执行不可逆操作之前需要人工确认
   - 在访问敏感数据之前获取批准
   - 在受监管行业（如医疗、金融）中确认决策

6. **交互式代理**
   - 构建能够提出后续问题的对话机器人
   - 创建引导用户完成复杂流程的向导
   - 设计与人类逐步协作的智能代理

### 🔄 对比：有与没有人机协作的工作流

| 功能 | 条件工作流 | 人机协作工作流 |
|------|------------|----------------|
| **执行** | 单次 `workflow.run()` | 使用 `run_stream()` + `send_responses_streaming()` 的循环 |
| **用户输入** | 无（完全自动化） | 通过 `input()` 或 UI 的交互式提示 |
| **组件** | Agents + Executors | + RequestInfoExecutor + DecisionManager |
| **事件** | 仅 AgentExecutorResponse | RequestInfoEvent, WorkflowOutputEvent 等 |
| **暂停** | 无暂停 | 工作流在 RequestInfoExecutor 处暂停 |
| **人工控制** | 无人工控制 | 人工进行关键决策 |
| **使用场景** | 自动化 | 协作与监督 |

### 🚀 高级模式：

#### 多个人工决策点
在同一工作流中可以有多个 `RequestInfoExecutor` 节点：
```python
.add_edge(agent1, request_info_1)  # First human decision
.add_edge(decision_manager_1, agent2)
.add_edge(agent2, request_info_2)  # Second human decision
.add_edge(decision_manager_2, final_agent)
```

#### 超时处理
为人工响应实现超时机制：
```python
import asyncio

try:
    answer = await asyncio.wait_for(
        asyncio.to_thread(input, "Enter yes/no: "),
        timeout=60.0
    )
except asyncio.TimeoutError:
    answer = "no"  # Default to safe option
```

#### 丰富的 UI 集成
替代 `input()`，与 Web UI、Slack、Teams 等集成：
```python
if isinstance(event, RequestInfoEvent):
    # Send notification to user's preferred channel
    await slack_client.send_message(
        user_id=current_user,
        text=event.data.prompt,
        request_id=event.request_id
    )
```

#### 条件人机协作
仅在特定情况下请求人工输入：
```python
def needs_human_approval_condition(message: Any) -> bool:
    # Only route to human if confidence is low or value is high
    if result.confidence < 0.7 or result.value > 10000:
        return True
    return False
```

### ⚠️ 最佳实践：

1. **始终继承 RequestInfoMessage**
   - 提供类型安全性和验证
   - 为 UI 渲染提供丰富的上下文
   - 明确每种请求类型的意图

2. **使用描述性提示**
   - 包含关于你所询问内容的上下文
   - 解释每种选择的后果
   - 确保问题简单明了

3. **处理意外输入**
   - 验证用户响应
   - 为无效输入提供默认值
   - 提供清晰的错误信息

4. **跟踪 Request ID**
   - 使用 request_id 和响应之间的关联
   - 不要尝试手动管理状态

5. **设计为非阻塞**
   - 不要阻塞线程等待输入
   - 全程使用异步模式
   - 支持并发工作流实例

### 📚 相关概念：

- **Agent Middleware** - 拦截代理调用（参见前一篇笔记）
- **工作流状态管理** - 在运行之间持久化工作流状态
- **多代理协作** - 将人机协作与代理团队结合
- **事件驱动架构** - 使用事件构建响应式系统

---

### 🎓 恭喜！

你已经掌握了 Microsoft Agent Framework 的人机协作工作流！你现在知道如何：
- ✅ 暂停工作流以收集人工输入
- ✅ 使用 RequestInfoExecutor 和 RequestInfoMessage
- ✅ 使用事件处理流式执行
- ✅ 使用 @handler 创建自定义执行器
- ✅ 基于人工决策路由工作流
- ✅ 构建与人类协作的交互式 AI 代理

**这是构建可信赖、可控 AI 系统的关键模式！** 🚀



---

**免责声明**：  
本文档使用AI翻译服务 [Co-op Translator](https://github.com/Azure/co-op-translator) 进行翻译。尽管我们努力确保翻译的准确性，但请注意，自动翻译可能包含错误或不准确之处。原始语言的文档应被视为权威来源。对于关键信息，建议使用专业人工翻译。我们不对因使用此翻译而产生的任何误解或误读承担责任。
