## Overview

The previous [notebook](https://github.com/GoogleCloudPlatform/generative-ai/blob/main/gemini/agent-engine/intro_agent_engine.ipynb) covered defining tools, defining a router, building a LangGraph application, local testing, deploying to Vertex AI, remote testing, and cleaning up resources.

This notebook builds on it and covers the following human-in-the-loop topics:

- **Reviewing Tool Calls:** Implement human oversight after tool use, allowing for verification and correction of actions before proceeding.
- **Fetching State History:** Retrieve the complete execution history of the LangGraph application for auditing, analysis, and potential state reversion.
- **Time Travel:** Examine the state of the agent at a specific point in time to understand past decisions.
- **Replay:** Restart execution from a specific checkpoint without modifications to ensure consistent results.
- **Branching:** Create alternative execution paths based on a past state, enabling the agent to explore different possibilities or correct previous errors.

#### Copyright 2025 Google LLC

In [1]:
%pip install --upgrade --quiet "google-cloud-aiplatform[agent_engines,langchain]" requests --force-reinstall

Note: you may need to restart the kernel to use updated packages.

In [2]:
import IPython

# Restart the kernel so the newly installed packages are picked up.
app = IPython.Application.instance()
app.kernel.do_shutdown(True)

{'status': 'ok', 'restart': True}

In [13]:
# Run the imports after the restart; a restarted kernel loses earlier imports.
import requests
import vertexai
from langchain.load import load as langchain_load
from vertexai import agent_engines
from vertexai.preview.reasoning_engines import LanggraphAgent

In [2]:
PROJECT_ID = "llm-studies"
LOCATION = "us-central1"
STAGING_BUCKET = "gs://human-in-the-loop-lq"
vertexai.init(project=PROJECT_ID, location=LOCATION, staging_bucket=STAGING_BUCKET)

In [20]:
def get_exchange_rate(
    currency_from: str = "USD",
    currency_to: str = "EUR",
    currency_date: str = "latest",
):
    """Retrieves the exchange rate between two currencies on a specified date.

    Uses the Frankfurter API (https://api.frankfurter.app/) to obtain
    exchange rate data.

    Args:
        currency_from: The base currency (3-letter currency code).
            Defaults to "USD" (US Dollar).
        currency_to: The target currency (3-letter currency code).
            Defaults to "EUR" (Euro).
        currency_date: The date for which to retrieve the exchange rate.
            Defaults to "latest" for the most recent exchange rate data.
            Can be specified in YYYY-MM-DD format for historical rates.

    Returns:
        dict: A dictionary containing the exchange rate information.
            Example: {"amount": 1.0, "base": "USD", "date": "2023-11-24",
                "rates": {"EUR": 0.95534}}
    """

    response = requests.get(
        f"https://api.frankfurter.app/{currency_date}",
        params={"from": currency_from, "to": currency_to},
    )
    return response.json()
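
To see which request the tool sends without touching the network, the sketch below rebuilds the same URL with the standard library. `build_frankfurter_url` is a hypothetical helper introduced only for illustration; it mirrors how `requests.get` combines the path and `params` in the function above.

```python
from urllib.parse import urlencode


def build_frankfurter_url(
    currency_from: str = "USD",
    currency_to: str = "EUR",
    currency_date: str = "latest",
) -> str:
    """Return the URL that get_exchange_rate would request (hypothetical helper)."""
    base = f"https://api.frankfurter.app/{currency_date}"
    query = urlencode({"from": currency_from, "to": currency_to})
    return f"{base}?{query}"


# Latest rate and a historical rate for the same currency pair.
print(build_frankfurter_url("USD", "SEK"))
print(build_frankfurter_url("USD", "SEK", "2024-09-01"))
```

Printing the URL this way can help when reviewing a tool call by hand: the three arguments the model chooses map directly onto the path and the query string.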

In [21]:
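checkpointer_kwargs = None  # No extra arguments for the builder.


def checkpointer_builder(**kwargs):
    # Imported lazily inside the builder.
    from langgraph.checkpoint.memory import MemorySaver

    # MemorySaver keeps checkpoints in process memory only.
    return MemorySaver()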

In [22]:
agent = LanggraphAgent(
    model="gemini-2.0-flash",
    tools=[get_exchange_rate],
    model_kwargs={"temperature": 0, "max_retries": 6},
    checkpointer_kwargs=checkpointer_kwargs,
    checkpointer_builder=checkpointer_builder,
)

In [23]:
agent.set_up()

In [24]:
inputs = {
    "messages": [
        ("user", "What is the exchange rate from US dollars to Swedish currency?")
    ]
}

In [25]:
response = agent.query(
    input=inputs,
    config={"configurable": {"thread_id": "synchronous-thread-id"}},
)

response["messages"][-1]["kwargs"]["content"]

'Could you please specify the symbols or the names of the currencies you are interested in? Also, do you have a specific date for the exchange rate you want to know? If not, I will use the latest available data.'

In [26]:
for state_values in agent.stream_query(
    input=inputs,
    stream_mode="values",
    config={"configurable": {"thread_id": "streaming-thread-values"}},
):
    print(state_values)

{'messages': [{'lc': 1, 'type': 'constructor', 'id': ['langchain', 'schema', 'messages', 'HumanMessage'], 'kwargs': {'content': 'What is the exchange rate from US dollars to Swedish currency?', 'type': 'human', 'id': 'ed68c05b-67a7-48a3-b4f3-9ebf3059685d'}}]}
{'messages': [{'lc': 1, 'type': 'constructor', 'id': ['langchain', 'schema', 'messages', 'HumanMessage'], 'kwargs': {'content': 'What is the exchange rate from US dollars to Swedish currency?', 'type': 'human', 'id': 'ed68c05b-67a7-48a3-b4f3-9ebf3059685d'}}, {'lc': 1, 'type': 'constructor', 'id': ['langchain', 'schema', 'messages', 'AIMessage'], 'kwargs': {'content': 'Could you please specify the symbols or the full name of the Swedish currency you are interested in?\n', 'response_metadata': {'is_blocked': False, 'safety_ratings': [], 'usage_metadata': {'prompt_token_count': 134, 'candidates_token_count': 20, 'total_token_count': 154, 'prompt_tokens_details': [{'modality': 1, 'token_count': 134}], 'candidates_tokens_details': [{'m

In [27]:
for state_updates in agent.stream_query(
    input=inputs,
    stream_mode="updates",
    config={"configurable": {"thread_id": "streaming-thread-updates"}},
):
    print(state_updates)

{'agent': {'messages': [{'lc': 1, 'type': 'constructor', 'id': ['langchain', 'schema', 'messages', 'AIMessage'], 'kwargs': {'content': 'Could you please specify the symbols or the full name of the Swedish currency you are interested in?\n', 'response_metadata': {'is_blocked': False, 'safety_ratings': [], 'usage_metadata': {'prompt_token_count': 134, 'candidates_token_count': 20, 'total_token_count': 154, 'prompt_tokens_details': [{'modality': 1, 'token_count': 134}], 'candidates_tokens_details': [{'modality': 1, 'token_count': 20}], 'cached_content_token_count': 0, 'cache_tokens_details': []}, 'finish_reason': 'STOP', 'avg_logprobs': -0.0729248046875, 'model_name': 'gemini-2.0-flash'}, 'type': 'ai', 'id': 'run-d8a5cd7f-94dc-433b-91bb-ce7ff96a1e6a-0', 'usage_metadata': {'input_tokens': 134, 'output_tokens': 20, 'total_tokens': 154}, 'tool_calls': [], 'invalid_tool_calls': []}}]}}
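
The two stream modes differ in what each chunk contains: `values` yields the full state after every step (starting with the input, as in the first output line of the earlier cell), while `updates` yields only what the node that just ran added, keyed by node name (the `'agent'` key above). The toy sketch below imitates that difference with plain dictionaries. `stream_values` and `stream_updates` are hypothetical helpers, not LangGraph or Vertex AI APIs, and the message strings are invented.

```python
from typing import Dict, Iterator, List, Tuple

# Each step is (node_name, messages_added_by_that_node).
Step = Tuple[str, List[str]]


def stream_values(initial: List[str], steps: List[Step]) -> Iterator[Dict]:
    """Yield the whole accumulated state: the input first, then after each step."""
    messages = list(initial)
    yield {"messages": list(messages)}
    for _node, added in steps:
        messages.extend(added)
        yield {"messages": list(messages)}


def stream_updates(steps: List[Step]) -> Iterator[Dict]:
    """Yield only the delta produced by each node, keyed by node name."""
    for node, added in steps:
        yield {node: {"messages": list(added)}}


steps = [
    ("agent", ["ai: call get_exchange_rate"]),
    ("tools", ["tool: rate payload"]),
    ("agent", ["ai: final answer"]),
]

for chunk in stream_values(["human: question"], steps):
    print(len(chunk["messages"]), "messages in state")

for chunk in stream_updates(steps):
    print("update from node:", next(iter(chunk)))
```

In practice, `values` is convenient when you always want the latest full conversation, and `updates` when you want to react to individual nodes.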


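### Reviewing Tool Calls

Pass `interrupt_before` and `interrupt_after` to pause execution before the tool is invoked and again after the tool message is returned.

In [56]: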
response = agent.query(
    input=inputs,
    interrupt_before=["tools"],  # Before invoking the tool.
    interrupt_after=["tools"],  # After getting a tool message.
    config={"configurable": {"thread_id": "human-in-the-loop-deepdive"}},
)
langchain_load(response["messages"][-1]).pretty_print()

The process is interrupted *before invoking the tool*.

For this walkthrough, we assume the LLM-generated tool call (the `AI Message`) is correct after review and resume execution.
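
Part of this review can be automated. As a minimal sketch, assuming the serialized message layout shown in the streaming output above (tool calls under `kwargs["tool_calls"]`), the hypothetical helper below approves a pending message only if every tool call targets an allowed tool and carries plausible currency codes. The function name, the allowlist, and the validation rules are illustrative assumptions, not part of the Agent Engine API.

```python
from typing import Any, Dict, List

ALLOWED_TOOLS = {"get_exchange_rate"}


def review_tool_calls(message: Dict[str, Any]) -> List[str]:
    """Return the problems found in a serialized AI message; an empty list means approve."""
    problems = []
    tool_calls = message.get("kwargs", {}).get("tool_calls", [])
    if not tool_calls:
        problems.append("message contains no tool calls")
    for call in tool_calls:
        name = call.get("name")
        if name not in ALLOWED_TOOLS:
            problems.append(f"unexpected tool: {name}")
            continue
        args = call.get("args", {})
        for key in ("currency_from", "currency_to"):
            code = args.get(key)
            # Arguments with defaults may be omitted, so only check those present.
            if code is not None and not (
                isinstance(code, str) and len(code) == 3 and code.isalpha()
            ):
                problems.append(f"{key} is not a 3-letter code: {code!r}")
    return problems


# Hand-written example message (not real model output).
pending = {
    "kwargs": {
        "content": "",
        "tool_calls": [
            {
                "name": "get_exchange_rate",
                "args": {"currency_from": "USD", "currency_to": "SEK"},
                "id": "call-1",
            }
        ],
    }
}
print(review_tool_calls(pending))
```

If the returned list is empty, resume with `input=None`; otherwise, correct the tool call before continuing.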

In [57]:
response = agent.query(
    input=None,  # Resume (continue with the tool call AI Message).
    interrupt_before=["tools"],
    interrupt_after=["tools"],
    config={"configurable": {"thread_id": "human-in-the-loop-deepdive"}},
)
langchain_load(response["messages"][-1]).pretty_print()

The process is interrupted again *after receiving the tool message*.

Upon review, if the `Tool Message` returned by the tool appears correct, we can resume execution.

In [15]:
response = agent.query(
    input=None,  # Resume (continue with the Tool Message).
    interrupt_before=["tools"],
    interrupt_after=["tools"],
    config={"configurable": {"thread_id": "human-in-the-loop-deepdive"}},
)
langchain_load(response["messages"][-1]).pretty_print()

### Fetching State History

You can fetch the state history by calling `.get_state_history`.

In [16]:
for state_snapshot in agent.get_state_history(
    config={"configurable": {"thread_id": "human-in-the-loop-deepdive"}},
):
    if state_snapshot["metadata"]["step"] >= 0:
        print(f'step {state_snapshot["metadata"]["step"]}: {state_snapshot["config"]}')
        state_snapshot["values"]["messages"][-1].pretty_print()
        print("\n")
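
For auditing, a compact one-line-per-checkpoint view can be easier to scan than full message dumps. The sketch below is a hypothetical helper that works on any iterable of snapshot dictionaries with the `metadata` and `config` keys used above. The sample snapshots and checkpoint IDs are invented for illustration.

```python
from typing import Any, Dict, Iterable, List


def summarize_history(snapshots: Iterable[Dict[str, Any]]) -> List[str]:
    """Return 'step N -> checkpoint ID' lines, oldest first, skipping negative steps."""
    rows = []
    for snap in snapshots:
        step = snap["metadata"]["step"]
        if step < 0:
            continue
        checkpoint_id = snap["config"]["configurable"].get("checkpoint_id", "?")
        rows.append((step, f"step {step} -> checkpoint {checkpoint_id}"))
    # Sort by step number so the result is chronological whatever the input order.
    return [line for _step, line in sorted(rows)]


# Invented snapshots, listed newest first.
sample_history = [
    {"metadata": {"step": 1}, "config": {"configurable": {"thread_id": "t", "checkpoint_id": "ckpt-b"}}},
    {"metadata": {"step": 0}, "config": {"configurable": {"thread_id": "t", "checkpoint_id": "ckpt-a"}}},
    {"metadata": {"step": -1}, "config": {"configurable": {"thread_id": "t", "checkpoint_id": "ckpt-0"}}},
]

for line in summarize_history(sample_history):
    print(line)
```

With the local agent, the result of `agent.get_state_history(config=...)` could be passed to the helper directly, since each snapshot is indexed with the same keys in the cell above.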

### Time Travel

LangGraph's [Time Travel](https://langchain-ai.github.io/langgraph/how-tos/human_in_the_loop/time-travel/) feature relies on the agent's persisted checkpoints to let a human correct past actions: you rewind the conversation to a previous state, fix the mistake, and let the agent continue from the corrected point.

You can "time travel" by calling `.get_state`. By default, it returns the latest state.

In [17]:
state = agent.get_state(
    config={
        "configurable": {
            "thread_id": "human-in-the-loop-deepdive",
        }
    }
)

print(f'step {state["metadata"]["step"]}: {state["config"]}')
state["values"]["messages"][-1].pretty_print()

To retrieve an earlier state, you need to specify the `checkpoint_id` (and `checkpoint_ns`).

In [18]:
snapshot_config = {}
for state_snapshot in agent.get_state_history(
    config={"configurable": {"thread_id": "human-in-the-loop-deepdive"}},
):
    if state_snapshot["metadata"]["step"] == 1:
        snapshot_config = state_snapshot["config"]
        break

snapshot_config
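
The lookup loop above leaves `snapshot_config` as an empty dictionary when no snapshot matches the requested step. As a small variation, the hypothetical helper below returns the matching config or raises, so a missing step is reported immediately. It assumes snapshots shaped like the ones yielded by `.get_state_history`; the sample data and checkpoint IDs are invented.

```python
from typing import Any, Dict, Iterable


def config_for_step(snapshots: Iterable[Dict[str, Any]], step: int) -> Dict[str, Any]:
    """Return the config of the first snapshot whose metadata step equals `step`."""
    for snap in snapshots:
        if snap["metadata"]["step"] == step:
            return snap["config"]
    raise LookupError(f"no checkpoint found for step {step}")


# Invented snapshots for illustration.
sample = [
    {"metadata": {"step": 2}, "config": {"configurable": {"thread_id": "t", "checkpoint_id": "ckpt-c"}}},
    {"metadata": {"step": 1}, "config": {"configurable": {"thread_id": "t", "checkpoint_id": "ckpt-b"}}},
]

print(config_for_step(sample, 1))
```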

In [19]:
state = agent.get_state(config=snapshot_config)
print(f'step {state["metadata"]["step"]}: {state["config"]}')
state["values"]["messages"][-1].pretty_print()

In [20]:
state

### Replay

LangGraph's [Replay](https://langchain-ai.github.io/langgraph/how-tos/human_in_the_loop/time-travel/#replay-a-state) feature allows you to resume or replay a conversation from any specific point in its history.

You can initiate a replay by passing `state["config"]` back to the agent. Execution resumes exactly where it left off, which here means executing the tool call.

In [21]:
state["config"]

In [22]:
for state_values in agent.stream_query(
    input=None,  # resume
    stream_mode="values",
    config=state["config"],
):
    langchain_load(state_values["messages"][-1]).pretty_print()

### Branching

LangGraph's [Branching](https://langchain-ai.github.io/langgraph/how-tos/human_in_the_loop/time-travel/#branch-off-a-past-state) feature allows you to modify and re-run a LangGraph conversation from a specific point in its history (rather than just from the latest state).  This enables the agent to explore alternate trajectories or allows a user to "version control" changes in a workflow.

In this example, you will:
* Update the tool calls from a previous step.
* Call `.update_state` to save the updated message as a new checkpoint, then resume execution from it.
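
To make the difference between replaying and branching concrete, here is a toy in-memory model of checkpoints. It is only an illustration under simplifying assumptions: `ToyCheckpoints` and its methods are hypothetical, and a real LangGraph checkpointer stores far more than a list of message strings.

```python
import copy
from typing import Dict, List, Optional


class ToyCheckpoints:
    """Stores snapshots of a message list; each snapshot remembers its parent."""

    def __init__(self) -> None:
        self._states: Dict[int, List[str]] = {}
        self._parents: Dict[int, Optional[int]] = {}
        self._next_id = 0

    def save(self, messages: List[str], parent: Optional[int] = None) -> int:
        """Store a copy of the messages and return the new checkpoint ID."""
        checkpoint_id = self._next_id
        self._next_id += 1
        self._states[checkpoint_id] = copy.deepcopy(messages)
        self._parents[checkpoint_id] = parent
        return checkpoint_id

    def load(self, checkpoint_id: int) -> List[str]:
        """Replay: read back a past state unchanged."""
        return copy.deepcopy(self._states[checkpoint_id])

    def branch(self, checkpoint_id: int, new_last_message: str) -> int:
        """Branch: fork a past state with its last message replaced."""
        messages = self.load(checkpoint_id)
        messages[-1] = new_last_message
        return self.save(messages, parent=checkpoint_id)

    def parent(self, checkpoint_id: int) -> Optional[int]:
        return self._parents[checkpoint_id]


store = ToyCheckpoints()
c0 = store.save(["user: USD to SEK?"])
c1 = store.save(["user: USD to SEK?", "ai: call get_exchange_rate(date=latest)"], parent=c0)
c2 = store.branch(c1, "ai: call get_exchange_rate(date=2024-09-01)")

print(store.load(c1)[-1])  # the original checkpoint is untouched
print(store.load(c2)[-1])  # the branch carries the edited tool call
```

The point of the sketch is that branching adds a new checkpoint next to the old one rather than overwriting it, so the original trajectory stays available for comparison.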

In [23]:
last_message = state["values"]["messages"][-1]
print(last_message)
print(last_message.tool_calls)

Update the tool calls from the previous step.

In [24]:
last_message.tool_calls[0]["args"]["currency_date"] = "2024-09-01"
last_message.tool_calls
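
Editing `last_message.tool_calls` in place also changes the message object held in `state`. If you would rather keep the original for comparison, one option is to build the modified tool calls from a deep copy. The helper below is a hypothetical sketch that works on plain tool-call dictionaries with `name` and `args` keys; it is not part of LangChain or Agent Engine.

```python
import copy
from typing import Any, Dict, List


def with_tool_args(
    tool_calls: List[Dict[str, Any]], tool_name: str, **overrides: Any
) -> List[Dict[str, Any]]:
    """Return a deep copy of tool_calls with overrides applied to calls named tool_name."""
    updated = copy.deepcopy(tool_calls)
    for call in updated:
        if call.get("name") == tool_name:
            call.setdefault("args", {}).update(overrides)
    return updated


# Hand-written tool call for illustration.
original = [
    {
        "name": "get_exchange_rate",
        "args": {"currency_from": "USD", "currency_to": "SEK"},
        "id": "call-1",
    }
]
edited = with_tool_args(original, "get_exchange_rate", currency_date="2024-09-01")

print(original[0]["args"])
print(edited[0]["args"])
```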

Call `.update_state` to save the updated message. It returns the config of the new checkpoint, which is then used to resume execution.

In [25]:
branch_config = agent.update_state(
    config=state["config"],
    values={"messages": [last_message]},  # the update we want to make
)
branch_config

In [26]:
for state_values in agent.stream_query(
    input=None,  # resume
    stream_mode="values",
    config=branch_config,
):
    langchain_load(state_values["messages"][-1]).pretty_print()

## Deploying the Agent

In [27]:
remote_agent = agent_engines.create(
    LanggraphAgent(
        model="gemini-2.0-flash",
        tools=[get_exchange_rate],
        model_kwargs={"temperature": 0, "max_retries": 6},
        checkpointer_kwargs=checkpointer_kwargs,
        checkpointer_builder=checkpointer_builder,
    ),
    requirements=[
        "google-cloud-aiplatform[agent_engines,langchain]",
        "requests",
    ],
)

remote_agent

## Querying the Remote Agent

### Remote testing

In [28]:
for state_updates in remote_agent.stream_query(
    input=inputs,
    stream_mode="updates",
    config={"configurable": {"thread_id": "remote-streaming-thread-updates"}},
):
    print(state_updates)

In [29]:
for state_values in remote_agent.stream_query(
    input=inputs,
    stream_mode="values",
    config={"configurable": {"thread_id": "remote-human-in-the-loop-overall"}},
):
    print(state_values)

### Reviewing Tool Calls

In [33]:
response = remote_agent.query(
    input=inputs,
    interrupt_before=["tools"],  # Before invoking the tool.
    interrupt_after=["tools"],  # After getting a tool message.
    config={"configurable": {"thread_id": "human-in-the-loop-deepdive"}},
)
langchain_load(response["messages"][-1]).pretty_print()

In [34]:
response = remote_agent.query(
    input=None,  # Resume (continue with the tool call AI Message).
    interrupt_before=["tools"],
    interrupt_after=["tools"],
    config={"configurable": {"thread_id": "human-in-the-loop-deepdive"}},
)
langchain_load(response["messages"][-1]).pretty_print()

In [35]:
response = remote_agent.query(
    input=None,  # Resume (continue with the Tool Message).
    interrupt_before=["tools"],
    interrupt_after=["tools"],
    config={"configurable": {"thread_id": "human-in-the-loop-deepdive"}},
)
langchain_load(response["messages"][-1]).pretty_print()

## Cleaning up

After you've finished experimenting, it's a good practice to clean up your cloud resources. You can delete the deployed Agent Engine instance to avoid any unexpected charges on your Google Cloud account.

In [54]:
remote_agent.delete()