[![Open in Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/langchain-ai/langchain-academy/blob/main/module-3/breakpoints.ipynb) [![Open in LangChain Academy](https://cdn.prod.website-files.com/65b8cd72835ceeacd4449a53/66e9eba12c7b7688aa3dbb5e_LCA-badge-green.svg)](https://academy.langchain.com/courses/take/intro-to-langgraph/lessons/58239469-lesson-2-breakpoints)

# Breakpoints

## Review

For `human-in-the-loop`, we often want to see our graph outputs as it's running.

We laid the foundations for this with streaming. 

## Goals

Now, let's talk about the motivations for `human-in-the-loop`:

(1) `Approval` - We can interrupt our agent, surface state to a user, and allow the user to accept an action

(2) `Debugging` - We can rewind the graph to reproduce or avoid issues

(3) `Editing` - We can modify the state

LangGraph offers several ways to get or update agent state to support various `human-in-the-loop` workflows.

First, we'll introduce [breakpoints](https://langchain-ai.github.io/langgraph/how-tos/human_in_the_loop/breakpoints/#simple-usage), which provide a simple way to stop the graph at specific steps. 

We'll show how this enables user `approval`.

In [None]:
%%capture --no-stderr
%pip install --quiet -U langgraph langchain_openai langgraph_sdk langgraph-prebuilt

In [None]:
import os, getpass

def _set_env(var: str):
    if not os.environ.get(var):
        os.environ[var] = getpass.getpass(f"{var}: ")

_set_env("OPENAI_API_KEY")

## Breakpoints for human approval

Let's re-consider the simple agent that we worked with in Module 1. 

Let's assume that we are concerned about tool use: we want to approve the agent's use of any of its tools.

All we need to do is compile the graph with `interrupt_before=["tools"]`, where `tools` is our tools node.

This means that the execution will be interrupted before the node `tools`, which executes the tool call.

In [None]:
from langchain_openai import ChatOpenAI

def multiply(a: int, b: int) -> int:
    """Multiply a and b.

    Args:
        a: first int
        b: second int
    """
    return a * b

# This will be a tool
def add(a: int, b: int) -> int:
    """Adds a and b.

    Args:
        a: first int
        b: second int
    """
    return a + b

def divide(a: int, b: int) -> float:
    """Divide a by b.

    Args:
        a: first int
        b: second int
    """
    return a / b

tools = [add, multiply, divide]
llm = ChatOpenAI(model="gpt-4o")
llm_with_tools = llm.bind_tools(tools)

In [None]:
from IPython.display import Image, display

from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import MessagesState
from langgraph.graph import START, StateGraph
from langgraph.prebuilt import tools_condition, ToolNode

from langchain_core.messages import AIMessage, HumanMessage, SystemMessage

# System message
sys_msg = SystemMessage(content="You are a helpful assistant tasked with performing arithmetic on a set of inputs.")

# Node
def assistant(state: MessagesState):
   return {"messages": [llm_with_tools.invoke([sys_msg] + state["messages"])]}

# Graph
builder = StateGraph(MessagesState)

# Define nodes: these do the work
builder.add_node("assistant", assistant)
builder.add_node("tools", ToolNode(tools))

# Define edges: these determine the control flow
builder.add_edge(START, "assistant")
builder.add_conditional_edges(
    "assistant",
    # If the latest message (result) from assistant is a tool call -> tools_condition routes to tools
    # If the latest message (result) from assistant is not a tool call -> tools_condition routes to END
    tools_condition,
)
builder.add_edge("tools", "assistant")

memory = MemorySaver()
graph = builder.compile(interrupt_before=["tools"], checkpointer=memory)

# Show
display(Image(graph.get_graph(xray=True).draw_mermaid_png()))

In [None]:
# Input
initial_input = {"messages": HumanMessage(content="Multiply 2 and 3")}

# Thread
thread = {"configurable": {"thread_id": "1"}}

# Run the graph until the first interruption
for event in graph.stream(initial_input, thread, stream_mode="values"):
    event['messages'][-1].pretty_print()

We can get the state and look at the next node to call.

This is a nice way to see that the graph has been interrupted.

In [None]:
state = graph.get_state(thread)
state
# state.next

Now, we'll introduce a nice trick.

When we invoke the graph with `None`, it will just continue from the last state checkpoint!

![breakpoints.jpg](https://cdn.prod.website-files.com/65b8cd72835ceeacd4449a53/66dbae7985b747dfed67775d_breakpoints1.png)

For clarity, LangGraph will re-emit the current state, which contains the `AIMessage` with tool call.

And then it will proceed to execute the following steps in the graph, which start with the tool node.

We see that the tool node is run with this tool call, and it's passed back to the chat model for our final answer.

In [None]:
for event in graph.stream(None, thread, stream_mode="values"):
    event['messages'][-1].pretty_print()

Now, let's bring these together with a specific user approval step that accepts user input.

In [None]:
# Input
initial_input = {"messages": HumanMessage(content="Multiply 2 and 3")}

# Thread
thread = {"configurable": {"thread_id": "2"}}

# Run the graph until the first interruption
for event in graph.stream(initial_input, thread, stream_mode="values"):
    event['messages'][-1].pretty_print()

# Get user feedback
user_approval = input("Do you want to call the tool? (yes/no): ")

# Check approval
if user_approval.lower() == "yes":
    
    # If approved, continue the graph execution
    for event in graph.stream(None, thread, stream_mode="values"):
        event['messages'][-1].pretty_print()
        
else:
    print("Operation cancelled by user.")
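
The check above only accepts the exact answer `yes`. As a small sketch (the helper name `is_approved` and the set of accepted answers are assumptions for illustration, not part of LangGraph), we could normalize the input before comparing:

```python
def is_approved(answer: str) -> bool:
    """Return True when the user's answer reads as an approval."""
    # Strip surrounding whitespace and ignore case before comparing
    return answer.strip().lower() in {"yes", "y"}

print(is_approved(" Yes "))
print(is_approved("no"))
```

With this helper, the condition in the cell above could become `if is_approved(user_approval):`, so that answers such as `Y` or ` yes ` are not silently treated as a rejection.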

### Breakpoints with LangGraph API

**⚠️ DISCLAIMER**

Since the filming of these videos, we've updated Studio so that it can be run locally and opened in your browser. This is now the preferred way to run Studio (rather than using the Desktop App as shown in the video). See documentation [here](https://langchain-ai.github.io/langgraph/concepts/langgraph_studio/#local-development-server) on the local development server and [here](https://langchain-ai.github.io/langgraph/how-tos/local-studio/#run-the-development-server). To start the local development server, run the following command in your terminal in the `/studio` directory in this module:

```
langgraph dev
```

You should see the following output:
```
- 🚀 API: http://127.0.0.1:2024
- 🎨 Studio UI: https://smith.langchain.com/studio/?baseUrl=http://127.0.0.1:2024
- 📚 API Docs: http://127.0.0.1:2024/docs
```

Open your browser and navigate to the Studio UI: `https://smith.langchain.com/studio/?baseUrl=http://127.0.0.1:2024`.

The LangGraph API [supports breakpoints](https://langchain-ai.github.io/langgraph/cloud/how-tos/human_in_the_loop_breakpoint/#sdk-initialization). 

In [None]:
if 'google.colab' in str(get_ipython()):
    raise Exception("Unfortunately LangGraph Studio is currently not supported on Google Colab")

In [None]:
# This is the URL of the local development server
from langgraph_sdk import get_client
client = get_client(url="http://127.0.0.1:2024")

As shown above, we can add `interrupt_before=["node"]` when compiling the graph that is running in Studio.

However, with the API, you can also pass `interrupt_before` to the stream method directly. 

In [None]:
initial_input = {"messages": HumanMessage(content="Multiply 2 and 3")}
thread = await client.threads.create()
async for chunk in client.runs.stream(
    thread["thread_id"],
    assistant_id="agent",
    input=initial_input,
    stream_mode="values",
    interrupt_before=["tools"],
):
    print(f"Receiving new event of type: {chunk.event}...")
    messages = chunk.data.get('messages', [])
    if messages:
        print(messages[-1])
    print("-" * 50)

Now, we can proceed from the breakpoint just like we did before by passing the `thread_id` and `None` as the input!

In [None]:
async for chunk in client.runs.stream(
    thread["thread_id"],
    "agent",
    input=None,
    stream_mode="values",
    interrupt_before=["tools"],
):
    print(f"Receiving new event of type: {chunk.event}...")
    messages = chunk.data.get('messages', [])
    if messages:
        print(messages[-1])
    print("-" * 50)

# Notes


## What LangGraph is (in 60 seconds)
* Graphs, not chains. You model your app as a stateful directed graph: nodes are functions (or subgraphs), edges decide what runs next.
* Single shared state. Nodes read from and return partial updates to a shared state object (usually a TypedDict or Pydantic model). LangGraph merges those partial updates for you.
* Deterministic control flow. You wire START → nodes → END. You can add conditional edges for branching and subgraphs for composition.
* Persistence + threads. With a checkpointer configured, every step is checkpointed; runs are grouped into threads so you can pause/resume later and recover state safely. This is the foundation for human-in-the-loop.
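
To make the "partial updates" idea concrete, here is a toy model in plain Python. It is only a sketch and not LangGraph's implementation: `apply_update` and the `REDUCERS` table are hypothetical names. The one thing borrowed from LangGraph is the idea that a state key can declare a reducer (for example `Annotated[list, operator.add]`) that combines the old and new values, while keys without a reducer are overwritten.

```python
from operator import add
from typing import Any, Callable

# Hypothetical reducer table: keys listed here combine old and new values,
# every other key is simply overwritten by the update.
REDUCERS: dict[str, Callable[[Any, Any], Any]] = {"messages": add}

def apply_update(state: dict, update: dict) -> dict:
    """Return a new state with a node's partial update merged in."""
    merged = dict(state)
    for key, value in update.items():
        reducer = REDUCERS.get(key)
        if reducer is not None and key in state:
            merged[key] = reducer(state[key], value)  # e.g. list concatenation
        else:
            merged[key] = value                       # plain overwrite
    return merged

state = {"messages": ["hi"], "status": "new"}
state = apply_update(state, {"messages": ["plan ready"]})  # only "messages" changes
state = apply_update(state, {"status": "planned"})         # only "status" changes
print(state)
```

Each update touches only the keys it names, which is why small nodes that "own" a few keys keep merges predictable.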

## Core primitives (the mental model)

1. State

```python
from typing import TypedDict

class State(TypedDict):
    messages: list   # or any keys you need
```
Nodes return partial updates. If a node returns `{"messages": [...]}`, only `messages` changes.

2. Nodes

```python
def plan(state: State) -> dict:
    # read state, compute, return an update
    return {"messages": state["messages"] + ["plan ready"]}
```

3. Edges & compilation

```python
from langgraph.graph import StateGraph, START, END

builder = StateGraph(State)
builder.add_node("plan", plan)
builder.add_edge(START, "plan")
builder.add_edge("plan", END)
graph = builder.compile()
```

4. Running

* **Synchronous**: `graph.invoke(inputs)`
* **Streaming**: `for chunk in graph.stream(inputs, stream_mode=...): ...`

## Streaming 101: stream_mode="values" vs "updates"

LangGraph can stream intermediate progress:
* **"values"** → after each step, you get the entire current state (full snapshot).
* **"updates"** → you get only the delta for that step (incremental updates). If a step runs multiple nodes, you can see multiple small updates.

Typical usage you’ll see in examples:

```python
for event in graph.stream(inputs, thread, stream_mode="values"):
    # event is the full state snapshot after each step
    print(event)

for event in graph.stream(inputs, thread, stream_mode="updates"):
    # event is just the change(s) made in that step
    print(event)
```

### Rule of thumb:
* Use updates for tight progress UIs (less data, easier to diff).
* Use values for debugging when you want to inspect the whole state each step.
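
The difference in event shape can be sketched with a toy generator in plain Python. This is not LangGraph code: `toy_stream`, `plan`, and `act` are hypothetical names, and the merge is a simple overwrite. It only mirrors the idea that "values" yields full snapshots while "updates" yields per-node deltas keyed by node name.

```python
from typing import Callable, Iterator

Node = Callable[[dict], dict]

def toy_stream(state: dict, nodes: list[tuple[str, Node]], stream_mode: str) -> Iterator[dict]:
    """Run nodes in order and yield either full snapshots or per-node deltas."""
    for name, node in nodes:
        update = node(state)
        state = {**state, **update}   # overwrite-style merge, for simplicity
        if stream_mode == "values":
            yield dict(state)         # full snapshot after the step
        else:
            yield {name: update}      # only what this node changed

def plan(state: dict) -> dict:
    return {"todo": "look up docs"}

def act(state: dict) -> dict:
    return {"result": f"did: {state['todo']}"}

nodes = [("plan", plan), ("act", act)]
values_events = list(toy_stream({"question": "q"}, nodes, "values"))
updates_events = list(toy_stream({"question": "q"}, nodes, "updates"))
print(values_events)
print(updates_events)
```

In the "values" list every event repeats the whole state, so events grow as the state grows; in the "updates" list each event carries only one node's contribution.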

## Human-in-the-Loop (HITL): the key idea

LangGraph lets you pause the graph at defined points, gather human input/approval, then resume exactly where you left off because state is checkpointed per thread. There are two main patterns:
* **Declarative “breakpoints” around nodes**
  Add `interrupt_before=["tools"]` (or specific node names) so the graph automatically pauses before those nodes run.
* **Programmatic interrupts inside a node**
  Call `interrupt(...)` in node code to pause with your own custom payload and wait for a resume.

This is especially useful to approve tool calls (e.g., “Send email?”, “Buy flight?”) or to ask the user a clarifying question before continuing.


### Mapping to the notebook (breakpoints.ipynb)

The notebook demonstrates two complementary flows:

### Local graph streaming + manual approval

The notebook builds an agent-like graph (LLM + tools), then runs:
```python
for event in graph.stream(initial_input, thread, stream_mode="values"):
    ...

user_approval = input("Do you want to call the tool? (yes/no): ")
if user_approval.lower() == "yes":
    # resume the graph from the pause point
    for event in graph.stream(None, thread, stream_mode="values"):
        ...
else:
    print("Operation cancelled by user.")
```

What’s happening:
* The first `.stream(...)` call progresses until it hits the interrupt (configured to pause before running tools).
* You ask the human for approval.
* If approved, call `.stream(None, thread, ...)` again to resume. Because LangGraph persists checkpoints per thread, it continues from exactly where it paused.

### Platform/SDK streaming with server-side runs

The notebook also shows using the LangGraph SDK:
```python
async for chunk in client.runs.stream(
    thread["thread_id"],
    "agent",
    input=None,
    stream_mode="values",
    interrupt_before=["tools"],
):
    ...
```

* Same concept, but now the graph runs on a server: you stream server events until the run pauses at the breakpoint, collect input/approval in your app’s UI, then resume the run.

These match the official “Add human intervention” and “Human-in-the-loop” docs: pause with interrupts, resume later using the thread’s saved state.

### What it’s teaching you

The “Add human in the loop → Simple usage” guide shows:
* Use interrupts to pause right before tools (or any node) so a human can review/edit.
* The persistence layer (threads/checkpoints) ensures the agent can wait indefinitely and then continue from the exact step later.

Related, more detailed guides show:
* Breakpoints: explicit pausing points and how they’re built on top of checkpoints/threads.
* Programmatic interrupts: call `interrupt()` from inside a node to request user input and resume.
* Conceptual overview + blog: why HITL exists and how interrupt evolved.


### Minimal end-to-end example (glued together)

```python
from langchain_core.messages import HumanMessage
from langchain_openai import ChatOpenAI
from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import MessagesState, StateGraph, START, END
from langgraph.prebuilt import ToolNode, tools_condition

# 1) Define a simple tool (a plain function with type hints and a docstring)
def search(query: str) -> str:
    """Placeholder web search that echoes the query."""
    return f"RESULTS({query})"

tools = [search]
tool_node = ToolNode(tools)

# 2) LLM node that may call the tool
llm_with_tools = ChatOpenAI(model="gpt-4o-mini").bind_tools(tools)  # example model

def agent(state: MessagesState):
    # MessagesState appends the returned message to the existing list
    return {"messages": [llm_with_tools.invoke(state["messages"])]}

# 3) Wire graph
builder = StateGraph(MessagesState)
builder.add_node("agent", agent)
builder.add_node("tools", tool_node)
builder.add_edge(START, "agent")
builder.add_conditional_edges("agent", tools_condition)  # tool call → "tools", otherwise END
builder.add_edge("tools", END)
# A checkpointer is required so the paused run can be resumed later
graph = builder.compile(checkpointer=MemorySaver())

# 4) Run with a breakpoint BEFORE tools
thread = {"configurable": {"thread_id": "t1"}}
for ev in graph.stream(
    {"messages": [HumanMessage("search latest LangGraph docs")]},
    thread,
    stream_mode="updates",       # or "values"
    interrupt_before=["tools"],  # ← pause here
):
    print(ev)

# ...show proposed tool call to a human...

approved = True  # ← collect from UI
if approved:
    # resume from checkpoint; pass None input to continue
    for ev in graph.stream(None, thread, stream_mode="updates"):
        print(ev)
```
* The first `stream(...)` pauses before tools. You review/edit the tool call.
* The second `stream(None, thread, ...)` resumes and runs the tool if approved.

(Identical pattern works with the LangGraph SDK on a deployed graph.)

### Practical tips & gotchas
* Choose a clear state model. Start with a small TypedDict and let nodes update only what they own. This keeps merges predictable.
* Prefer updates for UI progress and logs; switch to values when debugging to see everything.
* Name your nodes meaningfully and interrupt selectively (e.g., `interrupt_before=["tools"]` or a particular sensitive node).
* Threads are your friend. Always run with a `thread_id` so you can pause/resume later without losing context.
* Programmatic `interrupt()` is perfect when you need to pause inside logic (e.g., “I need a date range from the user”).
* Server vs local. Locally, call `graph.stream(...)`. When deployed, use `client.runs.stream(...)` with the same `stream_mode` and `interrupt_before` flags.


### What you should take away
1. Model your app as nodes that return partial state updates; wire them with edges; compile.
2. Use `stream_mode="updates"` for progress UIs; `"values"` for full snapshots.
3. Enable HITL with interrupts (before specific nodes or from inside nodes) and resume using the same thread. This is reliable because LangGraph saves checkpoints every step.

## Using breakpoints with Invoke

```python

from typing import TypedDict

from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import StateGraph, START, END
from langgraph.types import Command, interrupt

# 1) Define your shared state
class State(TypedDict):
    question: str
    date_range: dict | None
    status: str

# 2) Node that pauses to ask a human for info
def ask_for_dates(state: State) -> dict:
    # Halt here and ask the operator for a date range.
    # The value passed as Command(resume=...) to the next
    # graph.invoke(..., config=same_thread) becomes the return value of `interrupt(...)`.
    human_value = interrupt({
        "type": "request",
        "message": "Please provide a date range like {'start':'YYYY-MM-DD','end':'YYYY-MM-DD'}."
    })
    # On resume the node runs again from the top and `human_value` is the resume value.
    return {"date_range": human_value, "status": "got_dates"}

# 3) A follow-up node that uses the human-provided data
def run_report(state: State) -> dict:
    start = state["date_range"]["start"]
    end = state["date_range"]["end"]
    # ... do work (query, tool call, etc.) ...
    return {"status": f"report_done_for_{start}_to_{end}"}

# 4) Build & compile the graph
builder = StateGraph(State)
builder.add_node("ask_for_dates", ask_for_dates)
builder.add_node("run_report", run_report)
builder.add_edge(START, "ask_for_dates")
builder.add_edge("ask_for_dates", "run_report")
builder.add_edge("run_report", END)
# interrupt() needs a checkpointer to save the paused state
graph = builder.compile(checkpointer=MemorySaver())

# 5) Run until the interrupt, then resume on the same thread (example values)
config = {"configurable": {"thread_id": "report-1"}}
graph.invoke({"question": "Monthly sales?", "date_range": None, "status": "new"}, config)
result = graph.invoke(Command(resume={"start": "2024-01-01", "end": "2024-01-31"}), config)
print(result["status"])
```

How it works (mental model)
* `interrupt(payload)` saves a checkpoint and yields control back to the caller, so the graph must be compiled with a checkpointer.
* The first `.invoke(initial_input, config)` runs until it hits that interrupt and returns early with the partial state.
* The next call, `.invoke(Command(resume=human_input), same_config)`, resumes the graph. The interrupted node runs again from its first line, and this time `interrupt(...)` returns the `human_input` you passed, so your node can keep going.

If a node has multiple interruptions in a row, each resume call satisfies the next pending `interrupt(...)`.


### Validate the human input (Pydantic)

```python
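from langgraph.types import interrupt
from pydantic import BaseModel, field_validator

class DateRange(BaseModel):
    start: str
    end: str

    @field_validator("start", "end")
    @classmethod
    def must_be_yyyy_mm_dd(cls, v: str) -> str:
        # Raise ValueError instead of using assert, so the check also runs under `python -O`
        if not (len(v) == 10 and v[4] == "-" and v[7] == "-"):
            raise ValueError("expected a date in YYYY-MM-DD format")
        return v

# `State` is the TypedDict defined in the previous example
def ask_for_dates(state: State) -> dict:
    value = interrupt({"type":"request","message":"Provide {'start','end'} in YYYY-MM-DD"})
    dr = DateRange(**value)  # raises if invalid
    return {"date_range": dr.model_dump(), "status": "got_dates"}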
```

### Multiple programmatic interrupts in one node

```python
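def two_step_approval(state: State) -> dict:
    # Resume values are assumed to be dicts such as {"approved": True}.
    # Each resume re-runs this node from the top; interrupt() calls that were
    # already answered return their earlier values, in order.
    first = interrupt({"type":"approve", "message":"Approve running the query?"})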
    if not first.get("approved"):
        return {"status": "cancelled"}

    second = interrupt({"type":"confirm", "message":"Are you sure? This may incur cost."})
    if not second.get("approved"):
        return {"status": "cancelled"}

    return {"status": "approved"}
```
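
As we understand LangGraph's documented behavior, a resumed node is re-executed from its first line, and each `interrupt(...)` call is answered, in order, from the resume values supplied so far. The toy model below imitates that replay in plain Python. It is a sketch only: `Pause`, `run_node`, and `two_step` are hypothetical names, and nothing here calls LangGraph.

```python
class Pause(Exception):
    """Raised by the toy interrupt when no resume value is available yet."""

def run_node(node, resume_values):
    """Run `node` from the top, answering interrupts from `resume_values` in order."""
    calls = 0

    def interrupt(payload):
        nonlocal calls
        index = calls
        calls += 1
        if index < len(resume_values):
            return resume_values[index]  # replay an answer given earlier
        raise Pause(payload)             # nothing stored yet: pause here

    try:
        return node(interrupt)
    except Pause as pause:
        return {"paused_on": pause.args[0]}

def two_step(interrupt):
    first = interrupt("Approve running the query?")
    if not first["approved"]:
        return {"status": "cancelled"}
    second = interrupt("Are you sure?")
    return {"status": "approved" if second["approved"] else "cancelled"}

print(run_node(two_step, []))
print(run_node(two_step, [{"approved": True}]))
print(run_node(two_step, [{"approved": True}, {"approved": True}]))
```

One consequence of this replay model: code placed before an `interrupt(...)` call runs again on every resume, so any side effects there should be safe to repeat.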

