> https://docs.langchain.com/oss/python/langgraph/streaming

# Basic usage example

In [8]:
from typing import TypedDict
from langgraph.graph import StateGraph, START, END

class State(TypedDict):
    """State schema shared by the nodes of the example graph."""

    # Subject of the joke; rewritten by `refine_topic`.
    topic: str
    # Joke text; written by `generate_joke`.
    joke: str

def refine_topic(state: State):
    """Return a state update that appends " and cats" to the current topic."""
    refined = state["topic"] + " and cats"
    return {"topic": refined}

def generate_joke(state: State):
    """Return a state update holding a joke about the current topic."""
    topic = state["topic"]
    return {"joke": f"This is a joke about {topic}"}

# Assemble the two-step pipeline START -> refine_topic -> generate_joke -> END.
# Each node is registered under an explicit name equal to its function name.
graph = (
    StateGraph(State)
    .add_node("refine_topic", refine_topic)
    .add_node("generate_joke", generate_joke)
    .add_edge(START, "refine_topic")
    .add_edge("refine_topic", "generate_joke")
    .add_edge("generate_joke", END)
    .compile()
)

# stream() returns an iterator that yields streamed outputs.
# With stream_mode="updates" only the updates to the graph state after each
# node are streamed. Other stream modes are also available; see the supported
# stream modes for details.
for chunk in graph.stream({"topic": "ice cream"}, stream_mode="updates"):
    print(chunk)

{'refine_topic': {'topic': 'ice cream and cats'}}
{'generate_joke': {'joke': 'This is a joke about ice cream and cats'}}


# Stream multiple modes

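You can pass a list as the `stream_mode` parameter to stream multiple modes at once. Each streamed item is then a `(mode, chunk)` tuple, where `mode` is the name of the stream mode and `chunk` is the data streamed by that mode.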

In [14]:
# Passing a list of modes makes every streamed item a (mode, chunk) pair.
for mode, chunk in graph.stream(
    {"topic": "ice cream"},
    stream_mode=["updates", "values"],
):
    print(mode, chunk)

values {'topic': 'ice cream'}
updates {'refine_topic': {'topic': 'ice cream and cats'}}
values {'topic': 'ice cream and cats'}
updates {'generate_joke': {'joke': 'This is a joke about ice cream and cats'}}
values {'topic': 'ice cream and cats', 'joke': 'This is a joke about ice cream and cats'}


# Stream subgraph outputs

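To include outputs from subgraphs in the streamed outputs, you can set `subgraphs=True` in the `.stream()` method of the parent graph. This will stream outputs from both the parent graph and any subgraphs. Each streamed item is then a `(namespace, data)` tuple, where `namespace` is a tuple giving the path to the node in which the subgraph was invoked: it is empty (`()`) for the parent graph and looks like `('node_2:<task_id>',)` for a subgraph.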

In [17]:
from langgraph.graph import START, StateGraph
from typing import TypedDict

# Define subgraph
class SubgraphState(TypedDict):
    """State schema used inside the subgraph."""

    foo: str  # note that this key is shared with the parent graph state
    bar: str  # written by `subgraph_node_1`, read by `subgraph_node_2`

def subgraph_node_1(state: SubgraphState):
    """Return a state update that sets ``bar`` to the constant "bar"."""
    return dict(bar="bar")

def subgraph_node_2(state: SubgraphState):
    """Return a state update that sets ``foo`` to ``foo`` followed by ``bar``."""
    combined = state["foo"] + state["bar"]
    return {"foo": combined}

# Wire the subgraph START -> subgraph_node_1 -> subgraph_node_2, then compile it.
# The builder methods return the builder itself, so the calls can be chained.
subgraph_builder = (
    StateGraph(SubgraphState)
    .add_node("subgraph_node_1", subgraph_node_1)
    .add_node("subgraph_node_2", subgraph_node_2)
    .add_edge(START, "subgraph_node_1")
    .add_edge("subgraph_node_1", "subgraph_node_2")
)
subgraph = subgraph_builder.compile()

# Define parent graph
class ParentState(TypedDict):
    """State schema of the parent graph."""

    foo: str

def node_1(state: ParentState):
    """Return a state update that prefixes ``foo`` with "hi! "."""
    greeting = "hi! " + state["foo"]
    return {"foo": greeting}

# Parent graph START -> node_1 -> node_2, where node_2 is the compiled subgraph
# added directly as a node.
builder = (
    StateGraph(ParentState)
    .add_node("node_1", node_1)
    .add_node("node_2", subgraph)
    .add_edge(START, "node_1")
    .add_edge("node_1", "node_2")
)
graph = builder.compile()

# subgraphs=True streams outputs from subgraphs in addition to the parent graph.
for chunk in graph.stream({"foo": "foo"}, stream_mode="updates", subgraphs=True):
    print(chunk)

((), {'node_1': {'foo': 'hi! foo'}})
(('node_2:509ef785-d223-e58f-b95a-fba73454082e',), {'subgraph_node_1': {'bar': 'bar'}})
(('node_2:509ef785-d223-e58f-b95a-fba73454082e',), {'subgraph_node_2': {'foo': 'hi! foobar'}})
((), {'node_2': {'foo': 'hi! foobar'}})


## Debugging

Use the `debug` streaming mode to stream as much information as possible throughout the execution of the graph.

In [19]:
# stream_mode="debug" streams as much information as possible during execution;
# subgraphs=True streams outputs from subgraphs as well.
for chunk in graph.stream({"foo": "foo"}, stream_mode="debug", subgraphs=True):
    print(chunk)

((), {'step': 1, 'timestamp': '2025-11-04T09:47:57.661934+00:00', 'type': 'task', 'payload': {'id': 'bf0f9ffc-f2ad-6b85-55fd-f7bcf6490c4e', 'name': 'node_1', 'input': {'foo': 'foo'}, 'triggers': ('branch:to:node_1',)}})
((), {'step': 1, 'timestamp': '2025-11-04T09:47:57.663390+00:00', 'type': 'task_result', 'payload': {'id': 'bf0f9ffc-f2ad-6b85-55fd-f7bcf6490c4e', 'name': 'node_1', 'error': None, 'result': {'foo': 'hi! foo'}, 'interrupts': []}})
((), {'step': 2, 'timestamp': '2025-11-04T09:47:57.663629+00:00', 'type': 'task', 'payload': {'id': 'e0780287-fa09-2fe2-0ad3-efeb6f1a80e6', 'name': 'node_2', 'input': {'foo': 'hi! foo'}, 'triggers': ('branch:to:node_2',)}})
(('node_2:e0780287-fa09-2fe2-0ad3-efeb6f1a80e6',), {'step': 1, 'timestamp': '2025-11-04T09:47:57.665100+00:00', 'type': 'task', 'payload': {'id': '668f58ba-9245-2c0c-cd13-48b9a0ee89dc', 'name': 'subgraph_node_1', 'input': {'foo': 'hi! foo'}, 'triggers': ('branch:to:subgraph_node_1',)}})
(('node_2:e0780287-fa09-2fe2-0ad3-efeb … [output truncated]

# LLM Tokens

Use the `messages` streaming mode to stream Large Language Model (LLM) outputs token by token.

In [35]:
from dataclasses import dataclass
import os
from dotenv import load_dotenv

from langchain_deepseek import ChatDeepSeek
from langgraph.graph import StateGraph, START


@dataclass
class MyState:
    """Graph state for the LLM token streaming example."""

    # Subject the joke should be about; read by `call_model`.
    topic: str
    # Generated joke; defaults to an empty string until `call_model` sets it.
    joke: str = ""


# Load variables from a local .env file (if one exists) into the process
# environment before the client is built, so that credentials kept there are
# visible to the model client. `load_dotenv` was imported above but never
# called; by default it does not override variables that are already set.
load_dotenv()
model = ChatDeepSeek(model="deepseek-chat")

def call_model(state: MyState):
    """Ask the chat model for a joke about ``state.topic``.

    Returns a state update whose ``joke`` key holds the content of the
    model's response.
    """
    prompt = [{"role": "user", "content": f"Generate a joke about {state.topic}"}]
    # Note that message events are emitted even when the LLM is run using
    # .invoke rather than .stream.
    reply = model.invoke(prompt)
    return {"joke": reply.content}

# Single-node graph: START -> call_model, with the node registered under an
# explicit name equal to the function name.
graph = (
    StateGraph(MyState)
    .add_node("call_model", call_model)
    .add_edge(START, "call_model")
    .compile()
)

# In "messages" stream mode the iterator yields (message_chunk, metadata)
# tuples: message_chunk is the token streamed by the LLM, and metadata is a
# dictionary with information about the graph node where the LLM was called
# and other information.
for message_chunk, metadata in graph.stream(
    {"topic": "ice cream"}, stream_mode="messages"
):
    if not message_chunk.content:
        # Nothing to print for chunks without content.
        continue
    print(message_chunk.content, end="|", flush=True)

Why| did| the| ice| cream| truck| break| down|?

|Because| it| had| a| rocky| road|!|