Streaming in LangChain v1.x 
------------------------------------------------------
This notebook walks through the main streaming patterns in LangChain v1.x:
- Token-level streaming from LLMs
- Agent step streaming with LangGraph
- Custom streaming for tool execution
- Multi-mode streaming for complex UIs


### Token Streaming Fundamentals

When you call an LLM, it generates text one token at a time. Streaming exposes
this incremental generation to your application in real time.

Key concept: Each "chunk" contains a partial response. You accumulate
these chunks to build the complete output.
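A minimal sketch of the accumulation idea, using only the standard library. `Chunk` and `fake_stream` are hypothetical stand-ins for the AIMessageChunk objects that `llm.stream()` yields. LangChain's real AIMessageChunk also supports `+` to merge chunks; this class only imitates the text-and-metadata part of that behavior.

```python
from dataclasses import dataclass, field


@dataclass
class Chunk:
    """Hypothetical stand-in for a streamed message chunk (not LangChain's class)."""
    content: str
    metadata: dict = field(default_factory=dict)

    def __add__(self, other: "Chunk") -> "Chunk":
        # Concatenate the text and merge metadata (later chunks win on key conflicts)
        return Chunk(self.content + other.content, {**self.metadata, **other.metadata})


def fake_stream(text: str):
    """Hypothetical generator that yields a response word by word, like llm.stream()."""
    words = text.split(" ")
    for i, word in enumerate(words):
        piece = word if i == 0 else " " + word  # re-attach the separating space
        # Only the last chunk carries metadata, as streaming APIs often do
        meta = {"finish_reason": "stop"} if i == len(words) - 1 else {}
        yield Chunk(piece, meta)


full = None
for chunk in fake_stream("Qubits can be in superposition"):
    print(chunk.content, end="", flush=True)        # render each partial piece immediately
    full = chunk if full is None else full + chunk  # accumulate the complete output
print()

print(full.content)   # Qubits can be in superposition
print(full.metadata)  # {'finish_reason': 'stop'}
```

Accumulating with `+` rather than concatenating strings keeps metadata from the final chunk, which a plain `str` accumulator would drop.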

In [5]:
from langchain_groq import ChatGroq

# Initialize the chat model. .stream() yields chunks whether or not
# streaming=True is set; the flag makes .invoke() use the streaming API internally.
llm = ChatGroq(
    model="meta-llama/llama-4-maverick-17b-128e-instruct",
    temperature=0,
    streaming=True,  # Optional for .stream(); affects .invoke()
)

# Simple streaming example
print("Streaming tokens from LLM:\n")
for chunk in llm.stream("Explain quantum computing"):
    # Each chunk contains a partial response
    print(chunk.content, end="", flush=True)

print("\n\nDone!")

Streaming tokens from LLM:

Quantum computing! A fascinating and complex topic that has been gaining significant attention in recent years. I'll try to break it down in a way that's easy to understand.

**What is Quantum Computing?**

Quantum computing is a new paradigm for computing that uses the principles of quantum mechanics to perform calculations and operations on data. It's a fundamentally different approach from classical computing, which uses bits to represent information as 0s and 1s.

**Classical Computing vs. Quantum Computing**

Classical computers use "bits" to process information, which can only be in one of two states: 0 or 1. This limits their ability to process complex problems, as they can only perform one calculation at a time.

Quantum computers, on the other hand, use "qubits" (quantum bits), which can exist in multiple states simultaneously, represented by a combination of 0 and 1. This property, known as superposition, allows qubits to process multiple possibili

### Understanding Message Flow

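LangChain uses typed messages (AIMessage, HumanMessage, SystemMessage).
When streaming, you receive AIMessageChunk objects, which carry:
- content: the partial text (or a list of content blocks, depending on the provider)
- tool_call_chunks: partial tool calls, when the model is calling a tool
- response_metadata: provider details such as model name and finish reason
- usage_metadata: input/output token counts (typically attached to a single chunk, often the last)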
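The sketch below shows how those fields are typically consumed together. It runs on simulated chunks built with `SimpleNamespace` that only reuse the attribute names listed above; the values (including the model name `demo-model`) are made up, and real providers may populate the fields differently.

```python
from types import SimpleNamespace


def summarize_stream(chunks):
    """Split a stream of chunk-like objects into text, tool-call fragments,
    token usage, and model name.

    Assumes each chunk has the attributes content, tool_call_chunks,
    response_metadata, and usage_metadata, with content being a string.
    """
    text_parts, tool_fragments = [], []
    usage, model_name = None, None
    for chunk in chunks:
        if chunk.content:
            text_parts.append(chunk.content)
        tool_fragments.extend(chunk.tool_call_chunks)
        # Usage and model info usually appear on only one chunk, so keep the latest seen
        if chunk.usage_metadata:
            usage = chunk.usage_metadata
        model_name = chunk.response_metadata.get("model_name", model_name)
    return "".join(text_parts), tool_fragments, usage, model_name


chunks = [
    SimpleNamespace(content="Hello", tool_call_chunks=[],
                    response_metadata={}, usage_metadata=None),
    SimpleNamespace(content=" world", tool_call_chunks=[],
                    response_metadata={"model_name": "demo-model", "finish_reason": "stop"},
                    usage_metadata={"input_tokens": 12, "output_tokens": 2, "total_tokens": 14}),
]

text, tool_fragments, usage, model_name = summarize_stream(chunks)
print(text)        # Hello world
print(model_name)  # demo-model
print(usage)       # {'input_tokens': 12, 'output_tokens': 2, 'total_tokens': 14}
```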

In [3]:
from langchain.messages import HumanMessage, SystemMessage

messages = [
    SystemMessage(content="You are a helpful AI assistant specialized in Python."),
    HumanMessage(content="Write a decorator for timing function execution")
]

print("Streaming with structured messages:\n")
full_response = ""

for chunk in llm.stream(messages):
    # Accumulate the full response
    full_response += chunk.content
    print(chunk.content, end="", flush=True)

print(f"\n\nComplete response length: {len(full_response)} characters")
# The last chunk typically carries the response metadata
print(f"Model name: {chunk.response_metadata.get('model_name', 'N/A')}")

Streaming with structured messages:

Here is an example of a decorator in Python that can be used to time the execution of a function:

```python
import time
from functools import wraps

def timer_decorator(func):
    """
    A decorator to time the execution of a function.
    
    Args:
        func (function): The function to be timed.
    
    Returns:
        function: The decorated function.
    """
    @wraps(func)
    def wrapper_timer(*args, **kwargs):
        """
        The wrapper function that records the start and end time of the function execution.
        
        Args:
            *args: The positional arguments to the function.
            **kwargs: The keyword arguments to the function.
        """
        start_time = time.perf_counter()  # Record the start time
        value = func(*args, **kwargs)  # Execute the function
        end_time = time.perf_counter()  # Record the end time
        run_time = end_time - start_time  # Calculate the execution time
        pr

### Streaming Agent Steps

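stream_mode="updates" emits the state update returned by each node of the
agent graph after it runs, keyed by node name. Following these updates shows:
- When the model decides to call a tool
- When tools return results
- When the model generates the final response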
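The three kinds of update listed above can be told apart with a small pure function. This is a sketch that assumes the chunk shape printed by the streaming cell in this section (`{node_name: {"messages": [...]}}`) and the node names `model` and `tools` seen in its output; it is exercised here on simulated `SimpleNamespace` messages rather than real LangChain objects.

```python
from types import SimpleNamespace


def classify_update(update):
    """Label one stream_mode="updates" chunk as tool calls, a tool result, or a final answer.

    Assumes the shape {node_name: {"messages": [...]}} and the node names
    "model" and "tools" seen in this notebook's output.
    """
    events = []
    for node_name, node_data in update.items():
        messages = (node_data or {}).get("messages") or []
        if not messages:
            continue  # some nodes may return no messages
        last = messages[-1]
        if getattr(last, "tool_calls", None):
            # The model asked for one or more tools
            events.extend(("tool_call", call["name"]) for call in last.tool_calls)
        elif node_name == "tools":
            events.append(("tool_result", last.content))
        else:
            # In this simple agent, a model message without tool calls ends the loop
            events.append(("final_answer", last.content))
    return events


# Simulated updates mirroring the Tokyo run in this section
updates = [
    {"model": {"messages": [SimpleNamespace(
        content="", tool_calls=[{"name": "get_weather", "args": {"city": "Tokyo"}}])]}},
    {"tools": {"messages": [SimpleNamespace(content="Weather in Tokyo: sunny, 24°C")]}},
    {"model": {"messages": [SimpleNamespace(
        content="The weather in Tokyo is sunny, 24°C.", tool_calls=[])]}},
]

for update in updates:
    for kind, detail in classify_update(update):
        print(f"{kind}: {detail}")
```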

In [6]:
import random

from langchain.agents import create_agent
from langchain.tools import tool

@tool
def get_weather(city: str) -> str:
    """Get current weather for a city. Use this when users ask about weather."""
    # Simulate an API call with random conditions
    conditions = ["sunny", "cloudy", "rainy", "windy"]
    temp = random.randint(15, 30)
    return f"Weather in {city}: {random.choice(conditions)}, {temp}°C"

@tool
def get_forecast(city: str, days: int = 3) -> str:
    """Get weather forecast for a city. Use for multi-day predictions."""
    return f"{days}-day forecast for {city}: Mostly sunny with occasional clouds"


# Create agent with our tools
agent = create_agent(
    model=llm,
    tools=[get_weather, get_forecast],
)

# Confirm the agent was built and list its tools
print("Agent created successfully")
print(f"Available tools: {[t.name for t in [get_weather, get_forecast]]}")


Agent created successfully
Available tools: ['get_weather', 'get_forecast']


In [8]:

query = {"messages": [{"role": "user", "content": "What's the weather in Tokyo?"}]}

print("=== Streaming Agent Progress ===\n")

for chunk in agent.stream(query, stream_mode="updates"):
    for node_name, node_data in chunk.items():
        print(f"\n--- Node: {node_name} ---")
        
        # Extract and display the latest message
        if "messages" in node_data and node_data["messages"]:
            latest_msg = node_data["messages"][-1]
            
            # Model message that requests one or more tool calls
            if hasattr(latest_msg, "tool_calls") and latest_msg.tool_calls:
                for tool_call in latest_msg.tool_calls:
                    print(f"Tool: {tool_call['name']}")
                    print(f"Args: {tool_call['args']}")
            
            # Otherwise print the text: a tool result (tools node) or the final answer (model node)
            elif hasattr(latest_msg, "content"):
                print(f"Content: {latest_msg.content}")

print("\n=== Agent execution complete ===")

=== Streaming Agent Progress ===


--- Node: model ---
Tool: get_weather
Args: {'city': 'Tokyo'}

--- Node: tools ---
Content: Weather in Tokyo: sunny, 24°C

--- Node: model ---
Content: The weather in Tokyo is sunny, 24°C.

=== Agent execution complete ===


### Streaming LLM Tokens from Agents

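stream_mode="messages" gives you real-time token generation inside an agent.
Each item is a (message_chunk, metadata) tuple, and metadata["langgraph_node"]
names the node that produced it. This shows:
- Tool calls as the model emits them (providers that stream tool arguments
  send them in pieces; in the output below the whole call arrives in one chunk)
- Final response tokens as they're generated

Use this for ChatGPT-like UIs where users see text appear in real time.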
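When only the visible answer matters, the (token, metadata) pairs can be filtered by node and block type. A minimal sketch, assuming each token exposes `content_blocks` as a list of dicts with a `type` key and that the model node is named `model` (both as printed in the output below); the stream here is simulated with `SimpleNamespace` instead of real message chunks, and `tok` is a hypothetical helper.

```python
from types import SimpleNamespace


def final_answer_text(stream):
    """Join the text blocks produced by the model node.

    Tool-call blocks and everything from the tools node are skipped. Any text
    the model emits before a tool call would also be included.
    """
    parts = []
    for token, metadata in stream:
        if metadata.get("langgraph_node") != "model":
            continue  # tool results come from the "tools" node
        for block in token.content_blocks:
            if block.get("type") == "text":
                parts.append(block["text"])
    return "".join(parts)


def tok(node, *blocks):
    """Build one simulated (token, metadata) pair."""
    return SimpleNamespace(content_blocks=list(blocks)), {"langgraph_node": node}


simulated = [
    tok("model"),
    tok("model", {"type": "tool_call", "name": "get_weather", "args": {"city": "San Francisco"}}),
    tok("tools", {"type": "text", "text": "Weather in San Francisco: sunny, 28°C"}),
    tok("model", {"type": "text", "text": "It is"}),
    tok("model", {"type": "text", "text": " sunny"}),
]
print(final_answer_text(simulated))  # It is sunny
```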

In [10]:
print("=== Streaming Tokens from Agent ===\n")
for token, metadata in agent.stream(  
    {"messages": [{"role": "user", "content": "What is the weather in SF?"}]},
    stream_mode="messages",
):
    print(f"node: {metadata['langgraph_node']}")
    print(f"content: {token.content_blocks}")
    print("\n")

print("\n\n=== Streaming complete ===")


=== Streaming Tokens from Agent ===

node: model
content: []


node: model
content: [{'type': 'tool_call', 'name': 'get_weather', 'args': {'city': 'San Francisco'}, 'id': 'cj02wd90n'}]


node: model
content: []


node: model
content: []


node: tools
content: [{'type': 'text', 'text': 'Weather in San Francisco: sunny, 28°C'}]


node: model
content: []


node: model
content: [{'type': 'text', 'text': 'The'}]


node: model
content: [{'type': 'text', 'text': ' weather'}]


node: model
content: [{'type': 'text', 'text': ' in'}]


node: model
content: [{'type': 'text', 'text': ' San'}]


node: model
content: [{'type': 'text', 'text': ' Francisco'}]


node: model
content: [{'type': 'text', 'text': ' is'}]


node: model
content: [{'type': 'text', 'text': ' sunny'}]


node: model
content: [{'type': 'text', 'text': ','}]


node: model
content: [{'type': 'text', 'text': ' '}]


node: model
content: [{'type': 'text', 'text': '28'}]


node: model
content: [{'type': 'text', 'text': '°C'}]


node: m

### Custom Streaming - Application-Specific Signals

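Use get_stream_writer() inside tools to emit custom progress updates.
This is useful for long-running operations such as:
- Database queries
- File processing
- External API calls

Critical: get_stream_writer() relies on the LangGraph runtime context, so it only
works while the tool is running inside a graph (such as an agent built with
create_agent). Calling the tool directly, outside a graph run, is not supported.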
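Because the writer is just a callable, the progress logic can be written against any callable and tested without a graph. In this sketch, `analyze_with_progress` and its dict payload are hypothetical; inside a tool you would pass it `get_stream_writer()`. The tool in the next cell sends strings, and as far as we know the writer also accepts other values such as dicts, which `stream_mode="custom"` then yields unchanged.

```python
from typing import Any, Callable


def analyze_with_progress(dataset: str, writer: Callable[[Any], None]) -> str:
    """Hypothetical multi-step job that reports structured progress via `writer`."""
    steps = ["load", "preprocess", "analyze"]
    for i, step in enumerate(steps, start=1):
        # A dict payload lets a UI drive a progress bar without parsing text
        writer({"dataset": dataset, "step": step, "progress": i / len(steps)})
    return f"Dataset {dataset}: analysis finished"


# Outside a graph, a plain list's append method stands in for the stream writer
events = []
result = analyze_with_progress("sales_2024", events.append)
for event in events:
    print(event)
print(result)  # Dataset sales_2024: analysis finished
```

Inside a tool, the call would become `analyze_with_progress(dataset, get_stream_writer())`, keeping the graph-specific part to a single line.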

In [11]:
import time

from langgraph.config import get_stream_writer

@tool
def analyze_data(dataset: str) -> str:
    """Analyze a dataset and return summary statistics."""
    writer = get_stream_writer()
    
    # Simulate a multi-step analysis, emitting a progress update before each step
    writer(f"Loading dataset: {dataset}")
    time.sleep(0.5)
    
    writer(f"Preprocessing {dataset}...")
    time.sleep(0.5)
    
    writer(f"Running statistical analysis on {dataset}...")
    time.sleep(0.5)
    
    writer(f"Analysis complete for {dataset}")
    
    return f"Dataset {dataset}: 1000 rows, 15 columns, no missing values"

# Create new agent with analysis tool
analysis_agent = create_agent(
    model=llm,
    tools=[analyze_data]
)

# Stream custom updates
query = {
    "messages": [{"role": "user", "content": "Analyze the sales_2024 dataset"}]
}

print("=== Custom Streaming Example ===\n")

for chunk in analysis_agent.stream(query, stream_mode="custom"):
    print(f"Progress: {chunk}")

=== Custom Streaming Example ===

Progress: Loading dataset: sales_2024
Progress: Preprocessing sales_2024...
Progress: Running statistical analysis on sales_2024...
Progress: Analysis complete for sales_2024


### Combining Multiple Stream Modes

Real applications often need multiple types of feedback:
- Show tokens to users (messages)
- Log agent steps for debugging (updates)
- Display progress bars (custom)

You can stream multiple modes simultaneously by passing a list; each streamed
item is then a (mode, chunk) tuple.
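With several modes active, a small dispatcher keeps each consumer separate. A sketch on a simulated stream; `dispatch` and the handlers are hypothetical, and only the `(mode, chunk)` tuple shape is taken from the cell below.

```python
def dispatch(stream, handlers):
    """Route (mode, chunk) pairs from a multi-mode stream to per-mode handlers.

    Modes without a registered handler are ignored.
    """
    for mode, chunk in stream:
        handler = handlers.get(mode)
        if handler is not None:
            handler(chunk)


progress_log, node_log = [], []
handlers = {
    "custom": progress_log.append,                    # progress messages for a UI
    "updates": lambda chunk: node_log.extend(chunk),  # node names (dict keys) for debugging
}

simulated = [
    ("updates", {"model": {"messages": []}}),
    ("custom", "Connecting to /api/users..."),
    ("updates", {"tools": {"messages": []}}),
    ("messages", ("token", {"langgraph_node": "model"})),  # no handler: ignored
]
dispatch(simulated, handlers)
print(progress_log)  # ['Connecting to /api/users...']
print(node_log)      # ['model', 'tools']
```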

In [12]:
import time

@tool
def fetch_api_data(endpoint: str) -> str:
    """Fetch data from an API endpoint."""
    writer = get_stream_writer()
    
    writer(f"Connecting to {endpoint}...")
    time.sleep(0.3)
    
    writer(f"Fetching data from {endpoint}...")
    time.sleep(0.3)
    
    writer("Processing response...")
    time.sleep(0.3)
    
    return f"Successfully retrieved data from {endpoint}"

# Create agent with enhanced tool
multi_agent = create_agent(
    model=llm,
    tools=[fetch_api_data]
)

query = {
    "messages": [{"role": "user", "content": "Fetch data from /api/users"}]
}

print("=== Multi-Mode Streaming ===\n")

for stream_mode, chunk in multi_agent.stream(
    query,
    stream_mode=["updates", "custom"]  # Multiple modes
):
    print(f"[{stream_mode.upper()}]", end=" ")
    
    if stream_mode == "custom":
        print(chunk)
    elif stream_mode == "updates":
        for node_name in chunk.keys():
            print(f"Node '{node_name}' executed")

print("\n=== Multi-mode streaming complete ===")

=== Multi-Mode Streaming ===

[UPDATES] Node 'model' executed
[CUSTOM] Connecting to /api/users...
[CUSTOM] Fetching data from /api/users...
[CUSTOM] Processing response...
[UPDATES] Node 'tools' executed
[UPDATES] Node 'model' executed

=== Multi-mode streaming complete ===


### Production Streaming Guidelines


DO:
- Use async streaming (astream) in async web servers
- Implement timeout handling for streaming calls
- Buffer tokens before rendering (e.g., flush whole words instead of single characters)
- Add custom progress updates for operations that take longer than about a second
- Use "updates" mode for debugging, "messages" for UIs
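Word-level buffering can be done with a small generator placed between the model stream and the UI. A sketch on plain strings; `buffer_words` is a hypothetical helper, and with LangChain you would feed it the string `chunk.content` values from `llm.stream()`.

```python
def buffer_words(tokens):
    """Re-chunk a token stream so each emitted piece ends on a word boundary.

    Text after the last whitespace is held back until more tokens arrive,
    and whatever remains is flushed when the stream ends.
    """
    pending = ""
    for token in tokens:
        pending += token
        # Everything up to and including the last space/newline is complete words
        cut = max(pending.rfind(" "), pending.rfind("\n"))
        if cut >= 0:
            yield pending[: cut + 1]
            pending = pending[cut + 1 :]
    if pending:
        yield pending  # flush the final partial word


tokens = ["Hel", "lo wor", "ld, ", "how", " are", " you"]
pieces = list(buffer_words(tokens))
print(pieces)  # ['Hello ', 'world, ', 'how ', 'are ', 'you']
```

Because nothing is dropped, joining the buffered pieces always reproduces the original text; only the rendering granularity changes.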

DON'T:
- Call synchronous .stream() from async code (it blocks the event loop)
- Stream sensitive data without proper filtering
- Forget to handle stream interruptions/cancellations
- Use custom streaming for fast operations (<100ms)
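Timeouts and cancellation fit naturally around `astream()`. This sketch uses `asyncio.wait_for` on a simulated async token source; `fake_astream` is a hypothetical stand-in for `llm.astream()` or `agent.astream()`, and keeping partial output on timeout is one design choice among several.

```python
import asyncio


async def fake_astream(tokens, delay):
    """Hypothetical async token source standing in for llm.astream()."""
    for token in tokens:
        await asyncio.sleep(delay)  # simulate per-token network latency
        yield token


async def stream_with_timeout(source, timeout_s):
    """Consume an async stream, giving up after timeout_s seconds in total.

    Returns (tokens_received, timed_out). Tokens received before the
    timeout are kept so the UI can still show a partial answer.
    """
    received = []

    async def drain():
        async for token in source:
            received.append(token)

    try:
        # On timeout, wait_for cancels drain(), which interrupts the source generator
        await asyncio.wait_for(drain(), timeout=timeout_s)
        return received, False
    except asyncio.TimeoutError:
        return received, True


async def main():
    fast = await stream_with_timeout(fake_astream(["a", "b", "c"], 0.01), timeout_s=1.0)
    slow = await stream_with_timeout(fake_astream(["a", "b", "c", "d"], 0.3), timeout_s=0.45)
    return fast, slow


fast, slow = asyncio.run(main())
print(fast)  # (['a', 'b', 'c'], False)
print(slow)  # timed out, keeping only the tokens that arrived in time
```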

Rule of thumb:
- stream_mode="messages" is your UI mode
- stream_mode="updates" is your debugging mode
- stream_mode="custom" is your progress-bar mode