## 🦜🔗 Langgraph Agenten, die vor Toolbenutzung nachfragen und Checkpoints


Manchmal möchte man, dass ein Agent etwas tun darf, aber nur nach Rückfrage. Typisch Anwendungsfälle sind z.B. Email-Versand oder der Zugriff auf das Betriebssystem.

In diesem Notebook wollen wir zwei Möglichkeiten untersuchen, dies zu tun.

- Naiver Ansatz. Wir bauen ein Terminalprompt ein. Und zwar im Codefluss genau vor der Stelle, wo die Anwendung kritische Berechtigungen braucht.
- Lösung mit LangGraph-Checkpoints. Dieser Teil ist z.T. sehr detailreich. Man muss sich wirklich nicht alles davon merken.

### Lösung 1. Mit einem Terminalprompt


In [None]:
from langchain.tools.shell import ShellTool
from helpers import llm

tools = [ShellTool()]

In [None]:
from typing import TypedDict, Annotated, Sequence
from langchain_core.messages import BaseMessage
from langgraph.graph.message import add_messages


class AgentState(TypedDict):
    messages: Annotated[Sequence[BaseMessage], add_messages]

#### Wenn das Tool "terminal" aufgerufen wird, soll es mit einer Nutzerabfrage bestätigt werden.


In [None]:
from langchain_core.messages import ToolMessage
from langchain_core.runnables import chain
from langgraph.graph import END
from typing import Literal


@chain
def tool_executor(tool_call):
    tool = {tool.name: tool for tool in tools}[tool_call["name"]]
    return ToolMessage(
        tool.invoke(tool_call["args"]),
        tool_call_id=tool_call["id"],
        name=tool_call["name"],
    )


def agent(state):
    messages = state["messages"]
    response = llm(model="gpt-4o").bind_tools(tools).invoke(messages)
    return {"messages": [response]}


def call_tools(state):
    tool_calls = state["messages"][-1].tool_calls
    response = []
    for tool_call in tool_calls:
        if tool_call["name"] == "terminal":
            feedback = input(
                prompt=f"[y/n] continue with shell execution: {tool_call['args']['commands']}?"
            )
            if feedback == "y":
                response.append(tool_executor.invoke(tool_call))
            else:
                output = "Your terminal command was not permitted by the user. Try a different terminal command or return unfinished."
                response.append(
                    ToolMessage(
                        output, tool_call_id=tool_call["id"], name=tool_call["name"]
                    )
                )
        else:
            response.append(tool_executor.invoke(tool_call))
    return {"messages": response}


def should_continue(state) -> Literal["call_tools", END]:  # type: ignore
    return "call_tools" if state["messages"][-1].tool_calls else END

In [None]:
from langgraph.graph import StateGraph

graph_builder = StateGraph(AgentState)

graph_builder.add_node("agent", agent)
graph_builder.add_node("call_tools", call_tools)

graph_builder.set_entry_point("agent")

graph_builder.add_conditional_edges("agent", should_continue)

graph_builder.add_edge("call_tools", "agent")

human_feedback_graph = graph_builder.compile()

In [None]:
from IPython.display import Image, display

display(Image(human_feedback_graph.get_graph().draw_mermaid_png()))

In [None]:
from langchain.schema import HumanMessage


inputs = {
    "messages": [
        HumanMessage(
            content="Count the lines of all python notebooks in the current directory. Use simple shell commands."
        )
    ]
}

In [None]:
for event in human_feedback_graph.stream(inputs, stream_mode="values"):
    message: BaseMessage = event["messages"][-1]
    message.pretty_print()

Das hat funktioniert

Allerdings muss nun unsere App den GraphState und alle Objekte so lange im Memory behalten, bis ein Nutzer endlich die Rückfrage beantwortet. Asynchron ist das ganz schön blöd.

## Lösung 2. LangGraph- Checkpoints

Checkpoints sind ein essentieller Baustein von Langgraph. Bis jetzt haben wir noch nichts davon mitbekommen. Was tun Checkpoints und wozu brauchen wir die?

Weil das Konzept für eine reale App mit realen Nutzern sehr schnell relevant wird, erläutern wir es hier grob.

Wenn eine LangGraph-App mit einem Knoten fertig ist und nachschaut, wohin sie jetzt weiterhüpft (zu welchem Knoten), speichert sie erst einmal den State (und noch andere Dinge) in einen Checkpoint. Das passiert alles in einem kleinen Memory-Objekt und muss uns nicht weiter interessieren. Der nächste Knoten liest dann aus dem Checkpoint des letzten Knoten aus und setzt daran an. Dieses Memory-Objekt ist allerdings volatil und wird gelöscht, nachdem der Graph fertig durchgelaufen ist.

Man kann aber auch den State über Graph invokations hinweg persistieren. Z.B. in SQL, Redis, etc...

Was bringt das?

Nun kann ein Nutzer eine App, die ihm zu lange braucht, terminieren. Bisher abgelaufene Zwischenstände gehen nicht verloren. Er kann die App dann entweder neu starten oder auf dem letzten Zwischenstand aufsetzen.
Man kann der App befehlen, vor einem bestimmten Knoten immer zu terminieren. Der Zwischenstand speichert sich automatisch und der Nutzer muss dann die App erneut dort aufrufen (Das bauen wir jetzt).
Man kann verteilte Systeme bauen, in denen Komponenten ihren Arbeitstand untereinander mittels der Datenbank austauschen.
Man kann die Checkpoints auch für die Chathistory verwenden. Damit hat man dann einen Chatbot mit Gedächtnis.


In [None]:
def unsafe_call_tools(state):
    last_message = state["messages"][-1]
    return {"messages": tool_executor.batch(last_message.tool_calls)}


def should_continue(state) -> Literal["action", END]:  # type: ignore
    return "action" if state["messages"][-1].tool_calls else END

### Jetzt kompilieren wir erneut

Diesmal mit

unsafe_execute_tools
Checkpointer
Interrupt vor der "action"-Node


In [None]:
# Wir nehmen hier einfach den In-Memory Checkpointer und keine umständliche Datenbank
from langgraph.checkpoint.memory import MemorySaver

memory = MemorySaver()

graph_builder = StateGraph(AgentState)

graph_builder.add_node("agent", agent)
graph_builder.add_node("action", unsafe_call_tools)
graph_builder.set_entry_point("agent")
graph_builder.add_conditional_edges("agent", should_continue)
graph_builder.add_edge("action", "agent")

checkpoint_agent_executor = graph_builder.compile(
    checkpointer=memory, interrupt_before=["action"]
)
checkpoint_agent_executor.stream_mode = "values"

In [None]:
config = {"configurable": {"thread_id": "3"}}
for event in checkpoint_agent_executor.stream(inputs, config):
    message: BaseMessage = event["messages"][-1]
    message.pretty_print()

#### Oh, jetzt hat er tatsächlich abgebrochen. Mal sehen, was der State ist:


In [None]:
current_state = checkpoint_agent_executor.get_state(config)
for message in current_state.values["messages"]:
    message.pretty_print()

#### Wir können auch sehen, was die nächste Node im Flow wäre.


In [None]:
current_state.next

#### Weiter ausführen geht mit None


In [None]:
for event in checkpoint_agent_executor.stream(None, config):
    message: BaseMessage = event["messages"][-1]
    message.pretty_print()

In [None]:
current_state = checkpoint_agent_executor.get_state(config)
current_state.next

## ✅ Aufgabe

### State modifizieren


In [None]:
config = {"configurable": {"thread_id": "17"}}
for event in checkpoint_agent_executor.stream(inputs, config):
    message: BaseMessage = event["messages"][-1]
    message.pretty_print()

In [None]:
current_state = checkpoint_agent_executor.get_state(config)
current_state.values

### Wir überschreiben jetzt einfach das Terminal Tool Call Argument im State...


In [None]:
new_tool_call = "echo 'YOUR CREATIVE ECHO MESSAGE GOES HERE'"

current_state.values["messages"][-1].tool_calls[0]["args"]["commands"] = [new_tool_call]

checkpoint_agent_executor.update_state(config, current_state.values)

checkpoint_agent_executor.get_state(config).values

In [None]:
for event in checkpoint_agent_executor.stream(None, config):
    message: BaseMessage = event["messages"][-1]
    message.pretty_print()