# Agent Streaming and Responses

Master streaming modes and structured outputs for production agents.

In [None]:
import os
from dotenv import load_dotenv

# Load variables from a local .env file into the process environment
# (presumably the Google API key the Gemini model needs -- confirm against your setup).
load_dotenv()

In [None]:
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain.agents import create_agent
from langchain.messages import HumanMessage
from langgraph.checkpoint.sqlite import SqliteSaver
import sqlite3
from scripts import base_tools

In [None]:
# Gemini chat model shared by every agent built in this notebook.
model = ChatGoogleGenerativeAI(model='gemini-2.5-flash')

# sqlite3.connect() creates the database file but NOT its parent directory,
# so make sure "data/" exists first; otherwise a fresh checkout fails with
# "unable to open database file".
db_path = "data/streaming_agent.db"
os.makedirs(os.path.dirname(db_path), exist_ok=True)

# check_same_thread=False allows the connection to be used from threads other
# than the one that created it (the agent runtime may do so -- TODO confirm).
conn = sqlite3.connect(db_path, check_same_thread=False)
checkpointer = SqliteSaver(conn=conn)

# Agent with a single web-search tool; conversation state is checkpointed to
# the SQLite database above, keyed by the thread_id passed in each call's config.
agent = create_agent(
    model=model,
    tools=[base_tools.web_search],
    checkpointer=checkpointer
)

## Standard Invoke

In [None]:
# Blocking call: invoke() returns only after the agent has finished its run.
response = agent.invoke(
    {"messages": [HumanMessage(content="Tell me about Apple")]},
    config={"configurable": {"thread_id": "invoke_session"}},
)

# Text of the last message in the returned state (shown as the cell output).
response["messages"][-1].text

## Stream Mode: Messages

Stream LLM output as it is generated: each chunk is a `(message_chunk, metadata)` tuple.

In [None]:
# Dedicated thread_id so this demo's checkpoints stay separate from the others.
# NOTE(review): the checkpointer is backed by a SQLite file, so re-running this
# cell presumably continues the same stored conversation -- confirm.
config = {"configurable": {"thread_id": "stream_messages"}}

# Print every streamed chunk raw, exactly as the agent yields it.
for chunk in agent.stream(
    {"messages": [HumanMessage(content="Search for tech news")]},
    config=config,
    stream_mode="messages",
):
    print(chunk)

## Stream Mode: Updates

Stream state updates after each step.

In [None]:
# Separate thread_id for the "updates" demo.
config = {"configurable": {"thread_id": "stream_updates"}}

# Print each update chunk as it is yielded by the agent.
for chunk in agent.stream(
    {"messages": [HumanMessage(content="Search for Microsoft news")]},
    config=config,
    stream_mode="updates",
):
    print(chunk)

## Stream Mode: Values

Stream complete state after each step.

In [None]:
# Separate thread_id for the "values" demo.
config = {"configurable": {"thread_id": "stream_values"}}

# Number the streamed chunks from 0 and report how many messages each one
# carries; a chunk without a 'messages' key counts as zero.
for step, chunk in enumerate(
    agent.stream(
        {"messages": [HumanMessage(content="What is AI?")]},
        config=config,
        stream_mode="values",
    )
):
    print(f"Step {step}: {len(chunk.get('messages', []))} messages")

## Structured Output

Return type-safe Pydantic models from agents.

In [None]:
from pydantic import BaseModel, Field
from typing import Optional

# Schema for the agent's structured answer about a company's stock.
# NOTE: documented with comments on purpose -- a class docstring would
# presumably be emitted as the schema's top-level "description" and so
# change the generated JSON schema.
class FinancialAnalysis(BaseModel):
    # Required fields are marked explicitly with `...`.
    company: str = Field(..., description="Company name")
    stock_symbol: str = Field(..., description="Stock ticker")
    # The only optional field: falls back to None when no price is supplied.
    current_price: Optional[str] = Field(default=None, description="Current price")
    analysis: str = Field(..., description="Brief analysis")
    recommendation: str = Field(..., description="Buy/Hold/Sell")


# Display the JSON schema generated from the model (the cell's output).
FinancialAnalysis.model_json_schema()

In [None]:
from langchain.agents.structured_output import ToolStrategy

# Second agent: same model and tool, but its final answer must conform to
# FinancialAnalysis. No checkpointer is passed here, so no thread_id is needed.
structured_agent = create_agent(
    model=model,
    response_format=ToolStrategy(FinancialAnalysis),
    tools=[base_tools.web_search],
)

response = structured_agent.invoke(
    {"messages": [HumanMessage(content="Analyze Tesla stock")]}
)

# The structured result is stored under the 'structured_response' key.
response["structured_response"]

In [None]:
# Read individual fields off the structured response via attribute access.
data = response["structured_response"]

# One print call, one argument per output line; the leading "\n" in the last
# argument produces the blank line before the analysis section.
print(
    f"Company: {data.company}",
    f"Symbol: {data.stock_symbol}",
    f"Recommendation: {data.recommendation}",
    f"\nAnalysis:\n{data.analysis}",
    sep="\n",
)

In [None]:
# Serialize the structured response to a JSON string, using indent=2 for
# readability; as the last expression it is displayed as the cell's output.
data.model_dump_json(indent=2)

## Stream Mode Comparison

| Mode | Use Case | Best For |
|------|----------|----------|
| **messages** | Real-time chat | Displaying tokens as they arrive |
| **updates** | Debugging | Tracking decisions |
| **values** | Progress tracking | State evolution |

## Key Takeaways

- Streaming improves UX with real-time feedback
- Messages mode for chat interfaces
- Updates mode for debugging
- Values mode for state tracking
- Structured output ensures type safety
- ToolStrategy works with any model that supports tool calling

In [None]:
# Exercise: Create a custom Pydantic model and use it as an agent's `response_format`
