# Human in the loop

---

To enable any human-in-the-loop capabilities, the **first step** is to  
**attach a checkpointer** to the graph.

---

With checkpointing in place, the application can pause, resume, inspect,  
or fork based on human decisions, enabling interactive and controlled execution of LLM workflows.

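```python
import ast
import operator
from typing import Annotated, TypedDict

from langchain_community.tools import DuckDuckGoSearchRun
from langchain_core.documents import Document
from langchain_core.messages import HumanMessage
from langchain_core.tools import tool
from langchain_core.vectorstores.in_memory import InMemoryVectorStore
from langchain_openai import AzureChatOpenAI
from langchain_ollama import OllamaEmbeddings

from langgraph.graph import START, StateGraph
from langgraph.graph.message import add_messages
from langgraph.prebuilt import ToolNode, tools_condition
from langgraph.checkpoint.memory import MemorySaver

# Arithmetic operators the calculator accepts; anything else is rejected
_OPERATORS = {
    ast.Add: operator.add,
    ast.Sub: operator.sub,
    ast.Mult: operator.mul,
    ast.Div: operator.truediv,
    ast.Pow: operator.pow,
    ast.USub: operator.neg,
}


def _evaluate(node: ast.AST) -> float:
    # Walk the parsed expression, allowing only numbers and the operators above
    if isinstance(node, ast.Constant) and isinstance(node.value, (int, float)):
        return node.value
    if isinstance(node, ast.BinOp) and type(node.op) in _OPERATORS:
        return _OPERATORS[type(node.op)](_evaluate(node.left), _evaluate(node.right))
    if isinstance(node, ast.UnaryOp) and type(node.op) in _OPERATORS:
        return _OPERATORS[type(node.op)](_evaluate(node.operand))
    raise ValueError(f"Unsupported expression: {ast.dump(node)}")


@tool
def calculator(query: str) -> str:
    """A simple calculator tool. Input should be a mathematical expression."""
    # ast.literal_eval rejects expressions such as "1933 - 1872",
    # so parse the arithmetic and evaluate it explicitly instead
    return str(_evaluate(ast.parse(query, mode="eval").body))

search = DuckDuckGoSearchRun()
tools = [search, calculator]

embeddings = OllamaEmbeddings(model="mxbai-embed-large")
model = AzureChatOpenAI(model="gpt-4o", temperature=0.1, azure_deployment="gpt-4o", api_version="2024-10-21")

# Index each tool's description so the most relevant tools can be retrieved per query
tools_retriever = InMemoryVectorStore.from_documents(
    [Document(tool.description, metadata={"name": tool.name}) for tool in tools],
    embeddings,
).as_retriever()

class State(TypedDict):
    messages: Annotated[list, add_messages]
    selected_tools: list[str]

def model_node(state: State) -> State:
    selected_tools = [
        tool for tool in tools if tool.name in state["selected_tools"]
    ]
    res = model.bind_tools(selected_tools).invoke(state["messages"])
    return {"messages": res}

def select_tools(state: State) -> State:
    query = state["messages"][-1].content
    tool_docs = tools_retriever.invoke(query)
    return {"selected_tools": [doc.metadata["name"] for doc in tool_docs]}

builder = StateGraph(State)
builder.add_node("select_tools", select_tools)
builder.add_node("model", model_node)
builder.add_node("tools", ToolNode(tools))
builder.add_edge(START, "select_tools")
builder.add_edge("select_tools", "model")
builder.add_conditional_edges("model", tools_condition)
builder.add_edge("tools", "model")

graph = builder.compile(checkpointer=MemorySaver())  # Add checkpointer
```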

When the graph is compiled with a checkpointer, the resulting graph:

- **Saves the state** at the end of each step
- Ensures that **subsequent invocations** on the same thread do not start from scratch

Each time the graph is called:

1. It first uses the **checkpointer** to load the most recent saved state (if available).
2. It then **merges the new input** with this saved state.
3. Only after that does it begin executing the first nodes.

This mechanism is essential for enabling **human-in-the-loop (HITL)** functionality,  
as it allows the system to "remember" and build upon prior steps.

---

**Interrupt Mode**

The simplest form of human control is the **interrupt** mechanism.  
In this mode, the user:

- Watches the **streaming output** in real time
- **Manually interrupts** execution when needed (see Figure 8-3)

When interrupted:

- The graph’s **state is saved** as of the last **fully completed step**
- The user is then presented with options:


1. **Resume**  
   Continue execution from the point of interruption.  
   The graph proceeds as if it had never been paused.

2. **Restart**  
   Provide **new input** (e.g., a new message in a chatbot).  
   This **cancels any pending steps** and initiates a new computation from the updated input.

3. **Do Nothing**  
   Simply leave the execution as-is.  
   No further steps will be executed unless triggered again.

---

This interrupt-resume-restart flow provides developers and users  
with fine-grained control over complex LLM workflows,  
helping to improve reliability, responsiveness, and user trust.

Let’s see how to do this in LangGraph:

```python
import asyncio
from contextlib import aclosing  # calls the stream's aclose() when the block exits

event = asyncio.Event()

input = {
    "messages": [
        HumanMessage("""How old was the 30th president of the United States 
            when he died?""")
    ]
}

config = {"configurable": {"thread_id": "1"}}

# Top-level `async with` works in a notebook; in a script, put this inside an async function
async with aclosing(graph.astream(input, config)) as stream:
    async for chunk in stream:
        if event.is_set():
            break
        else:
            ...  # do something with the output

# Somewhere else in your application
event.set()
```

This relies on an **event or signal** that another part of your application can set,  
for example a separate task or the handler behind a "stop" button.  
Execution can therefore be halted from outside the streaming loop itself.

---

### Language-Specific Considerations

**Python**

- Notice the use of `aclosing` in the Python code example.
- This ensures that the **async stream is properly closed** when interrupted.
- Failing to close the stream correctly can result in **resource leaks** or incomplete state persistence.


### Thread Identification with the Checkpointer

When using a **checkpointer**, it is important to provide an **identifier for the current thread**.  
This identifier is used to distinguish one interaction (or session) with the graph from all others.

- This is critical when you are supporting **multiple concurrent users** or workflows.
- It ensures each graph run accesses and resumes the **correct saved state**.

---

Together, these techniques support robust, externally interruptible LLM workflows  
with proper session handling and graceful stream termination.


**Authorize**

A second human-in-the-loop control mode is **authorize**.

In this mode, the user specifies **ahead of time** that they want the application to **pause and request approval** whenever a specific node is about to run.


This is typically used for **tool confirmation**, allowing users to:

- Review tool calls before they are made
- Prevent undesired or high-impact actions
- Insert new guidance when needed

**User Options on Pause**

When the application pauses before executing the node, the user can:

1. **Resume**  
   Approve the tool call and continue computation as planned.

2. **Redirect**  
   Provide **new input or instructions** to steer the conversation in a different direction.  
   The tool will **not** be called.

3. **Do Nothing**  
   The application remains paused until the user acts.

This mode enhances **trust and control** in higher-agency applications,  
giving users the final say before potentially irreversible or expensive actions.

Here’s the code:


```python
input = {
    "messages": [
        HumanMessage("""How old was the 30th president of the United States when he died?""")
    ]
}

config = {"configurable": {"thread_id": "1"}}

output = graph.astream(input, config, interrupt_before=["tools"])

async for c in output:
    ...  # do something with the output
```

When using the **authorize** control mode, you can configure the graph to:

- **Pause execution** right before entering a specific node (e.g., `tools`)
- Allow for **manual inspection and decision-making** based on the current state

This gives the user a moment to review what’s about to happen,  
and either approve or redirect the flow.

---

### `interrupt_before`

- This is a **list of node names** before which execution should pause.
- The **order of the list does not matter**: the graph pauses before whichever listed node it is about to run.
- Example: `interrupt_before=["tools"]`  
  This causes the graph to pause **just before** entering the `tools` node.

This setup is useful for use cases such as tool call approval, safety validation, or manual intervention checkpoints.

---

### Resume

To **resume** from an interrupted state, whether the pause came from the interrupt mode or the authorize mode,  
you simply **re-invoke the graph** on the same thread with the following as input:

- `null` (JavaScript)
- `None` (Python)

This tells the graph not to merge any new input and to continue from the **last saved checkpoint**,  
rather than starting a new interaction.

No new user input is required: the graph picks up exactly where it left off,  
based on the **last saved state** in the checkpointer.


```python
config = {"configurable": {"thread_id": "1"}}

output = graph.astream(None, config, interrupt_before=["tools"])

async for c in output:
    ...  # do something with the output
```

**Restart**

If, instead of resuming, you want the **interrupted graph to start over**  
from the beginning—using new input—you can do so easily.

To **restart** execution:

- Simply **invoke the graph with new input**.

```python
input = {
    "messages": [
        HumanMessage("""How old was the 30th president of the United States 
            when he died?""")
    ]
}

config = {"configurable": {"thread_id": "1"}}

output = graph.astream(input, config)

async for c in output:
    ...  # do something with the output
```

When restarting a graph with new input:

- The system will **retain the current state**
- It will **merge that state with the new input**
- Execution will then **restart from the first node**

---

If your goal is to **discard the current state entirely** and start from scratch:

- Simply **change the `thread_id`**
- This triggers a **new interaction**, initialized from a **blank slate**

> Any string value is a valid `thread_id`  
> For best practice, use **UUIDs** or other unique identifiers  
> to ensure thread sessions are clearly separated

This approach gives you control over session scoping:  
you can restart cleanly when needed, while still leveraging LangGraph’s state persistence when desired.
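One way to follow the UUID advice is a small helper that creates a fresh config per session. `new_thread_config` is a hypothetical name. The dict it returns has the same `{"configurable": {"thread_id": ...}}` shape used throughout this section.

```python
import uuid


def new_thread_config() -> dict:
    # A fresh random UUID gives each session its own checkpoint history
    return {"configurable": {"thread_id": str(uuid.uuid4())}}


session_a = new_thread_config()
session_b = new_thread_config()
```

Storing the returned config alongside the user's session lets later calls resume the same thread, while calling the helper again starts a blank one.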


**Edit State**

There may be situations where you want to **manually update the graph’s state**  
before resuming execution, and this is fully supported.

LangGraph provides two key methods for this:

- `get_state`: Allows you to **inspect the current state** of the graph.
- `update_state`: Lets you **modify the state** before continuing.

This is useful for:

- Correcting values
- Injecting new context
- Overriding intermediate outputs

---

Here’s what the typical workflow looks like:

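1. Call `get_state(config)` to **fetch the current saved state** (a snapshot whose `values` attribute holds the state dictionary)
2. Decide which keys you want to change
3. Call `update_state(config, values)` to **persist your changes**; the values are applied through each key's reducer, as if a node had returned them
4. Resume or restart the graph as needed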

Let’s see what this looks like in code:


```python
config = {"configurable": {"thread_id": "1"}}

state = graph.get_state(config)

# something you want to add or replace
update = {}

graph.update_state(config, update)
```

The call returns the config of the newly created checkpoint:

```
{'configurable': {'thread_id': '1',
  'checkpoint_ns': '',
  'checkpoint_id': '1f03c0c3-5b5d-626c-800b-632be1932e69'}}
```

Once you call `update_state`, a **new checkpoint** is created that includes your modifications.  
From this point forward, you can **resume the graph** using this updated state.

> See the “Resume” section for details on how to continue execution after editing state.

---

**Fork**

Another powerful capability is the ability to **browse the history of all past states**  
that the graph has passed through.

From this history, you can:

- **Select any prior state**
- **Resume execution from that point**
- Explore **alternate outputs** or responses

This is especially useful in **creative applications**, where:

- Each run through the graph may result in different behavior or outputs
- You want to compare multiple potential outcomes from the same starting input

This feature allows for experimentation, decision branching, and creative exploration.

Let’s take a look at how to implement this in practice:

```python
config = {"configurable": {"thread_id": "1"}}

# Collect the lazily produced snapshots (most recent first)
history = list(graph.get_state_history(config))

# Replay from a past checkpoint (the third most recent one)
graph.invoke(None, history[2].config)
```


Notice how we **collect the state history** into a `list` (Python) or `array` (JavaScript).  
The `get_state_history` method returns an **iterator** of states to allow for **lazy consumption**.

- The returned states are **sorted** from **most recent to oldest**
- This makes it easy to access the **latest checkpoints first**, which is often the most relevant for resuming or forking

---

The **real strength** of LangGraph's human-in-the-loop capabilities  
comes from your ability to **combine control modes** in a way that best suits your application.

For example, you can:

- Interrupt execution at key nodes
- Require authorization before tool use
- Edit or fork state between steps
- Stream intermediate output for live inspection

This **flexible, composable design** allows developers to build highly responsive,  
interactive, and user-centric LLM applications.

---

**Multitasking LLMs**

This section covers how to handle **concurrent input** in LLM applications.  
This is an increasingly relevant issue due to:

- The relatively **high latency** of LLMs, especially for long responses or chained steps.
- The natural evolution of applications toward **more complex use cases** as LLM speed improves.

Even fast models will run into concurrency challenges as demand scales,  
just as humans need to prioritize competing tasks.

Let’s walk through the available strategies.

---

**1. Refuse Concurrent Inputs**

- Any input received while another is being processed is **rejected**.
- This is the **simplest approach**, but not ideal for most applications.
- It **offloads concurrency handling** to the caller (user or external system).
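A minimal sketch of the refuse strategy, using only `asyncio`. The `RefusingRunner` class and its `handle` method are hypothetical names, and a short sleep stands in for a graph run. A flag marks the runner as busy. Any input that arrives in the meantime is rejected immediately, leaving the caller to retry.

```python
import asyncio


class RefusingRunner:
    """Hypothetical wrapper that processes one input at a time and rejects the rest."""

    def __init__(self) -> None:
        self._busy = False

    async def handle(self, text: str) -> str:
        if self._busy:
            # Concurrency handling is pushed back to the caller
            return f"rejected: {text}"
        self._busy = True
        try:
            await asyncio.sleep(0.05)  # stand-in for a slow graph run
            return f"processed: {text}"
        finally:
            self._busy = False


async def main() -> list[str]:
    runner = RefusingRunner()
    first = asyncio.create_task(runner.handle("first"))
    await asyncio.sleep(0)  # let the first input start running
    second = await runner.handle("second")  # arrives while the runner is busy
    return [await first, second]


results = asyncio.run(main())
```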

---

**2. Handle Independently**

- Treat each new input as an **independent invocation**.
- Creates a new thread (state container) and processes it in parallel.
- Pros:
  - **Scales well** to many users or contexts.
- Cons:
  - May appear as **disconnected interactions** to the user.
  - Not ideal when continuity or context sharing is required.

Use case: Running multiple chat sessions with different users concurrently.
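A sketch of handling inputs independently with `asyncio`. Each input gets its own thread id and its own entry in a hypothetical `states` dictionary; in LangGraph, the checkpointer keyed by `thread_id` plays this role. Both runs proceed concurrently without seeing each other's state.

```python
import asyncio
import uuid

# Hypothetical per-thread state store (a checkpointer plays this role in LangGraph)
states: dict[str, list[str]] = {}


async def run_independently(text: str) -> str:
    thread_id = str(uuid.uuid4())  # a new thread for every input
    states[thread_id] = [text]
    await asyncio.sleep(0.01)  # stand-in for a graph run
    states[thread_id].append(f"reply to {text}")
    return thread_id


async def main() -> list[str]:
    # Both inputs run in parallel and never share state
    return await asyncio.gather(run_independently("hi"), run_independently("weather?"))


thread_ids = asyncio.run(main())
```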

---

**3. Queue Concurrent Inputs**

- New inputs are **queued** and handled in order, one after another.
- Pros:
  - Supports an **unlimited number** of concurrent requests.
  - Simple to implement and reason about.
- Cons:
  - Input may become **stale** by the time it’s processed.
  - The **queue can grow indefinitely** if inputs arrive faster than they are processed.
  - Not suitable when new input depends on the previous output.
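A queue-based sketch using `asyncio.Queue`. A single hypothetical `worker` coroutine takes inputs in arrival order and processes them one at a time, with a short sleep standing in for a graph run.

```python
import asyncio
import contextlib


async def worker(queue: asyncio.Queue, processed: list[str]) -> None:
    # Handles queued inputs strictly in arrival order, one at a time
    while True:
        text = await queue.get()
        try:
            await asyncio.sleep(0.01)  # stand-in for a graph run
            processed.append(text)
        finally:
            queue.task_done()


async def main() -> list[str]:
    queue: asyncio.Queue[str] = asyncio.Queue()  # unbounded: can grow without limit
    processed: list[str] = []
    task = asyncio.create_task(worker(queue, processed))
    for text in ["first", "second", "third"]:
        queue.put_nowait(text)  # inputs arrive faster than they are processed
    await queue.join()  # wait until every queued input has been handled
    task.cancel()
    with contextlib.suppress(asyncio.CancelledError):
        await task
    return processed


processed = asyncio.run(main())
```

Passing `maxsize` to `asyncio.Queue` bounds the queue. When it is full, `put` waits and `put_nowait` raises `asyncio.QueueFull`, so the queue cannot grow indefinitely.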

---

**4. Interrupt**

- On receiving new input, **abort current execution** and begin processing the new input.
- Variants:
  - **Keep nothing**: Completely discard the previous input and its progress.
  - **Keep the last completed step**: Retain state up to the last successful node.
  - **Keep the last completed and current step**: Try to preserve any partial updates in progress.
  - **Wait for the current node to finish**: Let the active node complete and save its output, then stop before starting the next node.

- Pros:
  - **Faster response** to new input.
  - Reduces risk of stale outputs.
- Cons:
  - Only one input is processed at a time.
  - Can leave the graph in an **inconsistent state** if not properly handled.
  - Output can be **brittle or unpredictable**, depending on when the interruption occurs.

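Example: Suppose execution is interrupted after the model has emitted a tool call but before the tool result is saved. The message history is then in a state that some chat APIs, such as OpenAI's, reject, because every tool call must be followed by a matching tool result message.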
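A sketch of the *keep the last completed step* variant, using `asyncio` task cancellation. The `InterruptingRunner` class is hypothetical, and its step names only mirror the graph above. Each step records its result after it finishes. When new input cancels the in-flight run, finished steps survive and the partially run step leaves no trace.

```python
import asyncio


class InterruptingRunner:
    """Hypothetical runner: new input cancels the in-flight run but keeps finished steps."""

    def __init__(self) -> None:
        self.completed: list[str] = []  # saved after each step finishes
        self._task: asyncio.Task | None = None

    async def _run(self, text: str) -> None:
        for step in ("select_tools", "model", "tools"):
            await asyncio.sleep(0.02)  # stand-in for running one node
            # Only reached if the step was not cancelled mid-way
            self.completed.append(f"{step}({text})")

    async def submit(self, text: str) -> None:
        if self._task is not None and not self._task.done():
            self._task.cancel()  # abort the current run
            try:
                await self._task
            except asyncio.CancelledError:
                pass  # cancellation is expected here
        self._task = asyncio.create_task(self._run(text))

    async def wait(self) -> None:
        if self._task is not None:
            await self._task


async def main() -> list[str]:
    runner = InterruptingRunner()
    await runner.submit("old question")
    await asyncio.sleep(0.03)  # new input arrives while the first run is mid-way
    await runner.submit("new question")
    await runner.wait()
    return runner.completed


completed = asyncio.run(main())
```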

---

**5. Fork and Merge**

- On new input, **fork the current state** and process the new input in parallel.
- Merge final states after each branch completes.

- Requirements:
  - State must be **mergeable**, either:
    - Automatically (e.g., via CRDTs)
    - Or with **manual conflict resolution** by the user.

- Pros:
  - Handles **new input promptly**.
  - Output is **independent of timing**.
  - Supports **arbitrary concurrency**.
- Cons:
  - Requires **designing the state system carefully** to handle merges safely.

This is the **most flexible and powerful strategy** if your app supports conflict resolution.
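As a small illustration of automatic merging, the sketch below treats each branch's message list as a grow-only set keyed by message id, one of the simplest CRDT-style designs. The `Message` type and `merge_messages` function are hypothetical. The sketch assumes a message is never edited once created, so two copies with the same id are always identical. Sorting by `(timestamp, id)` makes the result independent of which branch finished first.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Message:
    id: str
    timestamp: float
    text: str


def merge_messages(a: list[Message], b: list[Message]) -> list[Message]:
    # Grow-only set keyed by id: union both branches' messages.
    # Assumes messages are immutable, so equal ids mean equal content.
    by_id = {m.id: m for m in a}
    for m in b:
        by_id.setdefault(m.id, m)
    # Deterministic order, independent of which branch finished first
    return sorted(by_id.values(), key=lambda m: (m.timestamp, m.id))


base = [Message("m1", 1.0, "user: hi")]
branch_a = base + [Message("m2", 2.0, "assistant: hello!")]
branch_b = base + [Message("m3", 2.5, "user: also, what's the weather?")]

merged = merge_messages(branch_a, branch_b)
```

Fields that can genuinely conflict need a different merge rule or a human to resolve the conflict. An example would be a single hypothetical `current_plan` string edited in both branches.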

---

Each of these strategies represents a **design choice** about how to balance:

- Responsiveness
- Consistency
- Complexity
- User experience

Choose the one that best fits your application’s architecture and user expectations.

Let's go back to the main [file](../README.md#what-is-langchain).