# Human in the loop

如何进入agent的流程，监视agent的执行？

In [2]:
# libraries
from dotenv import load_dotenv
import os

# load environment variables from .env file
_ = load_dotenv()
from langgraph.graph import StateGraph, END
from typing import TypedDict, Annotated
import operator
from langchain_ollama import ChatOllama
from langchain_community.tools.tavily_search import TavilySearchResults
from langgraph.checkpoint.sqlite import SqliteSaver

In [3]:
from uuid import uuid4
from langchain_core.messages import AnyMessage, SystemMessage, HumanMessage, ToolMessage

"""
In previous examples we've annotated the `messages` state key
with the default `operator.add` or `+` reducer, which always
appends new messages to the end of the existing messages array.

Now, to support replacing existing messages, we annotate the
`messages` key with a customer reducer function, which replaces
messages with the same `id`, and appends them otherwise.
"""
def reduce_messages(left: list[AnyMessage], right: list[AnyMessage]) -> list[AnyMessage]:
    # assaign ids to messages that don't have them
    for message in right:
        if not message.id:
            message.id = str(uuid4())
    # merge the new messages with the existing messages
    merged = left.copy()
    for message in right:
        for i, existing in enumerate(merged):
            # replace existing message with the same id
            if existing.id == message.id:
                merged[i] = message
                break
        else:
            # append new messages
            merged.append(message)
    return merged

class AgentState(TypedDict):
    messages: Annotated[list[AnyMessage], reduce_messages]

In [4]:
tool = TavilySearchResults(max_results=2)

In [5]:
class Agent:
    def __init__(self, model, tools, checkpointer, system=""):
        self.system = system
        graph = StateGraph(AgentState)
        graph.add_node("llm", self.call_openollama)
        graph.add_node("action", self.take_action)
        graph.add_conditional_edges(
            "llm",
            self.exists_action,
            {True: "action", False: END}
        )
        graph.add_edge("action", "llm")
        graph.set_entry_point("llm")
        self.graph = graph.compile( 
            checkpointer=checkpointer,  # 这里添加checkpoint
            interrupt_before=["action"]   # 将action参数传递给interrupt_before，这将在我们调用action之前添加中断。这么做的目的是让我们能在运行每个工具前添加一个手动同意许可，当你希望确保工具被正确执行时，这是非常有用的。
            )    
        self.tools = {t.name: t for t in tools}
        self.model = model.bind_tools(tools)

    def call_openollama(self, state: AgentState):
        messages = state['messages']
        if self.system:
            messages = [SystemMessage(content=self.system)] + messages
        message = self.model.invoke(messages)
        return {'messages': [message]}
    
    def exists_action(self, state: AgentState):
        result = state['messages'][-1]
        return len(result.tool_calls) > 0
    
    def take_action(self, state: AgentState):
        import threading
        print(f"take_action called in thread: {threading.current_thread().name}")
        tool_calls = state['messages'][-1].tool_calls
        results = []
        print(f"take_action called with tool_calls: {tool_calls}")
        for t in tool_calls:
            print(f"Calling: {t}") 
            if not t['name'] in self.tools:      # check for bad tool name from LLM
                print("\n ....bad tool name....")
                result = "bad tool name, retry"  # instruct LLM to retry if bad
            else:
                result = self.tools[t['name']].invoke(t['args'])
                print(f"action {t['name']}, result: {result}")
            results.append(ToolMessage(tool_call_id=t['id'], name=t['name'], content=str(result)))
        print("Back to the model!")
        return {'messages': results}

In [6]:
prompt = """You are a smart research assistant. Use the search engine to look up information.  \
You are allowed to make multiple calls (either together or in sequence). \
Only look up information when you are sure of what you want. \
If you need to look up some information before asking a follow up question, you are allowed to do that!
"""

model = ChatOllama(
    model="qwen2.5:14b",
    temperature=0
)

query = "What is the weather in New York?"
messages = [HumanMessage(content=query)]
thread = {"configurable": {"thread_id": "1"}}  

with SqliteSaver.from_conn_string(":memory:") as checkpointer:
    abot = Agent(model, [tool], system=prompt, checkpointer=checkpointer)
    for event in abot.graph.stream({"messages": messages}, thread):
        for v in event.values():
            print(v)
    
    state_info = abot.graph.get_state(thread)
    print("Current state:", state_info)

    next_info = abot.graph.get_state(thread).next
    print(next_info)
    


{'messages': [AIMessage(content='', additional_kwargs={}, response_metadata={'model': 'qwen2.5:14b', 'created_at': '2025-03-14T02:43:47.847898Z', 'done': True, 'done_reason': 'stop', 'total_duration': 12494459458, 'load_duration': 580753667, 'prompt_eval_count': 245, 'prompt_eval_duration': 7057000000, 'eval_count': 62, 'eval_duration': 4638000000, 'message': Message(role='assistant', content='', images=None, tool_calls=None)}, id='run-ddd54716-bc5a-4ad6-bcd0-77536f37ad88-0', tool_calls=[{'name': 'tavily_search_results_json', 'args': {'query': 'current weather in New York'}, 'id': '2bb3ad9f-bf82-48db-a25a-306e772e6044', 'type': 'tool_call'}], usage_metadata={'input_tokens': 245, 'output_tokens': 62, 'total_tokens': 307})]}
()
Current state: StateSnapshot(values={'messages': [HumanMessage(content='What is the weather in New York?', additional_kwargs={}, response_metadata={}, id='30d02d4b-e5e6-4c1b-881a-af2314eca39b'), AIMessage(content='', additional_kwargs={}, response_metadata={'model

In [7]:
model = ChatOllama(
    model="llama3.2",
    temperature=0
)

query = "What is the weather in New York?"
messages = [HumanMessage(content=query)]
thread = {"configurable": {"thread_id": "2"}}  

with SqliteSaver.from_conn_string(":memory:") as checkpointer:
    abot = Agent(model, [tool], system=prompt, checkpointer=checkpointer)
    for event in abot.graph.stream({"messages": messages}, thread):
        for v in event.values():
            print(v)
    
    while abot.graph.get_state(thread).next:
        print(abot.graph.get_state(thread))
        _input = input("proceed? (y/n): ")
        if _input != "y":
            print("aborting")
            break
        for event in abot.graph.stream(None, thread):
            for v in event.values():
                print(v)

{'messages': [AIMessage(content='', additional_kwargs={}, response_metadata={'model': 'llama3.2', 'created_at': '2025-03-14T02:43:51.483235Z', 'done': True, 'done_reason': 'stop', 'total_duration': 3457266625, 'load_duration': 1191380041, 'prompt_eval_count': 260, 'prompt_eval_duration': 1443000000, 'eval_count': 23, 'eval_duration': 447000000, 'message': Message(role='assistant', content='', images=None, tool_calls=None)}, id='run-33c392d4-b44e-49de-9ace-1e868ebe0d86-0', tool_calls=[{'name': 'tavily_search_results_json', 'args': {'query': 'New York weather'}, 'id': '2b335d94-7efa-48b6-8e90-5b8067ed87f9', 'type': 'tool_call'}], usage_metadata={'input_tokens': 260, 'output_tokens': 23, 'total_tokens': 283})]}
()
StateSnapshot(values={'messages': [HumanMessage(content='What is the weather in New York?', additional_kwargs={}, response_metadata={}, id='32191b6c-1284-4991-b6f0-ac46b7232718'), AIMessage(content='', additional_kwargs={}, response_metadata={'model': 'llama3.2', 'created_at': '

以下是一个修改state后的state状态的示例。很重要的一点是，实际上我们在保留所有这些状态的运行列表，因此当我们修改状态时，我们实际上创建了一个新状态，然后它就变成了新的状态。每次我们使用节点的结果来更新它时，实际上是一个接一个地创建了一个新的状态。这是因为实际上，它会允许我们回到以前的状态，我们将其称之为时空穿梭。

In [33]:
query = "What is the weather in shanghai?"
messages = [HumanMessage(content=query)]
thread = {"configurable": {"thread_id": "3"}}  
model = ChatOllama(
    model="qwen2.5:7b",
    temperature=0
)

with SqliteSaver.from_conn_string(":memory:") as checkpointer:
    abot = Agent(model, [tool], system=prompt, checkpointer=checkpointer)
    for event in abot.graph.stream({"messages": messages}, thread):
        for v in event.values():
            print(v)

    print("\n### EXAMINING CURRENT STATE AND TOOL CALLS ###\n")
    
    current_values = abot.graph.get_state(thread)
    print(current_values.values['messages'][-1])
    print(current_values.values['messages'][-1].tool_calls)

    print("\n### MODIFYING STATE BY CHANGING TOOL CALL QUERY ###\n")

    _id = current_values.values['messages'][-1].tool_calls[0]['id']
    current_values.values['messages'][-1].tool_calls = [
        {'name': 'tavily_search_results_json',
         'args': {'query': 'current weather in Louisiana'},
         'id': _id}
    ]
    abot.graph.update_state(thread, current_values.values)

    current_values = abot.graph.get_state(thread)
    print(current_values.values['messages'][-1])
    print(current_values.values['messages'][-1].tool_calls)

    print("\n### RE-EXECUTING GRAPH WITH MODIFIED STATE ###\n")

    for event in abot.graph.stream(None, thread):
        for v in event.values():
            print(v)

    print("\n### EXAMINING STATE HISTORY AND REPLAYING FROM PREVIOUS STATE ###\n")

    states = []
    for state in abot.graph.get_state_history(thread):
        print(state)
        print('--')
        states.append(state)
    
    # Choose a state that has AI response with tool calls
    # Usually this will be states[1] or states[0] (not states[-1])
    # Let's find a state with tool calls
    replay_state = None
    for state in states:
        for msg in state.values.get('messages', []):
            if hasattr(msg, 'tool_calls') and msg.tool_calls:
                replay_state = state
                break
        if replay_state:
            break
    
    if not replay_state:
        print("No state with tool calls found")
    else:
        print("\n### FOUND STATE WITH TOOL CALLS TO REPLAY FROM ###\n")
        print(replay_state)

        for event in abot.graph.stream(None, replay_state.config):
            for k, v in event.items():
                print(v)
        
        print("\n### MODIFYING STATE FROM PAST BY CHANGING TOOL CALL QUERY ###\n")

        _id = replay_state.values['messages'][-1].tool_calls[0]['id']
        replay_state.values['messages'][-1].tool_calls = [
            {'name': 'tavily_search_results_json',
             'args': {'query': 'current weather in New York, accuweather'},
             'id': _id}
        ]
        branch_state = abot.graph.update_state(replay_state.config, replay_state.values)
        print(branch_state)

        for event in abot.graph.stream(None, branch_state):
            for k, v in event.items():
                if k != "__end__":
                    print(v)
    
    # 我们可以在任何时间点向状态添加消息
    # 现在假设我们不是实际调用Tavily，而是想要模拟一个响应，我们可以通过将新消息附加到状态中来实现这点
    print("\n### MODIFYING STATE BY ADDING A NEW MESSAGE ###\n")
    _id = replay_state.values['messages'][-1].tool_calls[0]['id']   # 获取我们应该进行的工具调用的ID
    state_update = {"messages": [ToolMessage(
        tool_call_id=_id,
        name='tavily_search_results_json',
        content="54 degree celcius",
    )]}
    branch_and_add = abot.graph.update_state(
        replay_state.config,
        state_update,
        as_node="action"    # 这在说明，我们做的状态更新不仅仅是一个修改，而是一个新的action。这个原因是相关的，因为在我们添加这条消息之前，图的当前状态是即将进入action。但当我们添加这条消息后，我们不希望它再进入action。
    )
    for event in abot.graph.stream(None, branch_and_add):
        for k, v in event.items():
            print(v)


{'messages': [AIMessage(content='', additional_kwargs={}, response_metadata={'model': 'qwen2.5:7b', 'created_at': '2025-03-14T07:44:55.743812Z', 'done': True, 'done_reason': 'stop', 'total_duration': 1344814791, 'load_duration': 31538375, 'prompt_eval_count': 245, 'prompt_eval_duration': 303000000, 'eval_count': 26, 'eval_duration': 1007000000, 'message': Message(role='assistant', content='', images=None, tool_calls=None)}, id='run-80c635ba-9701-48a4-b606-0952d94f551f-0', tool_calls=[{'name': 'tavily_search_results_json', 'args': {'query': 'weather in Shanghai'}, 'id': 'b97a86cc-9588-411f-9e88-9a3ad7c8ed5b', 'type': 'tool_call'}], usage_metadata={'input_tokens': 245, 'output_tokens': 26, 'total_tokens': 271})]}
()

### EXAMINING CURRENT STATE AND TOOL CALLS ###

content='' additional_kwargs={} response_metadata={'model': 'qwen2.5:7b', 'created_at': '2025-03-14T07:44:55.743812Z', 'done': True, 'done_reason': 'stop', 'total_duration': 1344814791, 'load_duration': 31538375, 'prompt_eval_c

总结；你学习了如何在节点执行之前添加一个断点，这允许人类允许或拒绝特定的动作。你了解了如何回到过去，以及如何修改状态，无论是当前状态还是过去状态。此外，还展示了如何手动更新状态，这使得你可以手动向代理提供调用工具的结果，而不是实际调用工具。