# Lesson 4: Persistence and Streaming

In [1]:
from dotenv import load_dotenv

# Load variables from a local .env file into the process environment. Presumably this is
# where the OpenAI and Tavily credentials come from — TODO confirm. The return value is
# bound to `_` so the cell produces no output.
_ = load_dotenv()

In [2]:
from langgraph.graph import StateGraph, END
from typing import TypedDict, Annotated
import operator
from langchain_core.messages import AnyMessage, SystemMessage, HumanMessage, ToolMessage
from langchain_openai import ChatOpenAI
from langchain_community.tools.tavily_search import TavilySearchResults

In [3]:
# Search tool that is handed to the agent further down. max_results=2 presumably caps
# each search at two hits, keeping the tool output (and the prompt it feeds) small.
tool = TavilySearchResults(max_results=2)

In [4]:
class AgentState(TypedDict):
    """State carried through the agent graph: the running list of chat messages."""

    # operator.add is attached as Annotated metadata; LangGraph uses it as the reducer
    # for this key, so the list a node returns is concatenated onto the existing history
    # rather than replacing it.
    messages: Annotated[list[AnyMessage], operator.add]

In [5]:
from langgraph.checkpoint.sqlite import SqliteSaver

# Checkpointer passed to graph.compile() when the agent is built below. ":memory:" is
# SQLite's in-memory database, so saved threads presumably vanish when the kernel restarts.
# NOTE(review): newer langgraph releases appear to return a context manager from
# from_conn_string — confirm against the pinned version before upgrading.
memory = SqliteSaver.from_conn_string(":memory:")

In [6]:
class Agent:
    """Tool-calling agent implemented as a two-node LangGraph loop.

    The "llm" node asks the model for the next message. If that message carries
    tool calls, the "action" node executes them and control returns to "llm";
    otherwise the graph ends.
    """

    def __init__(self, model, tools, checkpointer, system=""):
        """Build and compile the graph.

        Args:
            model: Chat model exposing ``bind_tools`` and ``invoke``.
            tools: Iterable of tools, each exposing ``name`` and ``invoke``.
            checkpointer: Forwarded to ``graph.compile``.
            system: Optional system prompt prepended on every model call.
        """
        self.system = system
        # Materialize once: `tools` is consumed twice below (lookup table and
        # bind_tools), which would silently drop tools for a one-shot iterable.
        tools = list(tools)
        graph = StateGraph(AgentState)
        graph.add_node("llm", self.call_openai)
        graph.add_node("action", self.take_action)
        graph.add_conditional_edges("llm", self.exists_action, {True: "action", False: END})
        graph.add_edge("action", "llm")
        graph.set_entry_point("llm")
        self.graph = graph.compile(checkpointer=checkpointer)
        self.tools = {t.name: t for t in tools}
        self.model = model.bind_tools(tools)

    def call_openai(self, state: AgentState):
        """Invoke the model on the message history and return its reply as a state update."""
        messages = state['messages']
        if self.system:
            # The system prompt is prepended per call and is not part of the returned
            # update, so it is not written into the graph state by this node.
            messages = [SystemMessage(content=self.system)] + messages
        message = self.model.invoke(messages)
        return {'messages': [message]}

    def exists_action(self, state: AgentState):
        """Return True when the most recent message requests at least one tool call."""
        result = state['messages'][-1]
        # getattr: tolerate a last message that has no `tool_calls` attribute.
        # bool(): the conditional edge is keyed on exactly True / False.
        return bool(getattr(result, 'tool_calls', None))

    def take_action(self, state: AgentState):
        """Run every tool call on the last message and return the results as ToolMessages.

        An unknown tool name does not raise; the model receives a retry hint as the
        tool result so it can correct itself on the next "llm" step.
        """
        tool_calls = state['messages'][-1].tool_calls
        results = []
        for t in tool_calls:
            print(f"Calling: {t}")
            selected = self.tools.get(t['name'])
            if selected is None:
                # The model may hallucinate a tool name; indexing self.tools directly
                # would raise KeyError and abort the whole run.
                print("\n ....bad tool name....")
                result = "bad tool name, retry"
            else:
                result = selected.invoke(t['args'])
            results.append(ToolMessage(tool_call_id=t['id'], name=t['name'], content=str(result)))
        print("Back to the model!")
        return {'messages': results}

In [7]:
# System prompt; Agent.call_openai prepends it to the message list on every model call.
prompt = """You are a smart research assistant. Use the search engine to look up information. \
You are allowed to make multiple calls (either together or in sequence). \
Only look up information when you are sure of what you want. \
If you need to look up some information before asking a follow up question, you are allowed to do that!
"""
# Chat model the agent binds its tools to.
model = ChatOpenAI(model="gpt-4o")
# Agent wired to the single search tool and the SQLite checkpointer created above.
abot = Agent(model, [tool], system=prompt, checkpointer=memory)

In [8]:
# First user turn of the conversation that is streamed below.
messages = [HumanMessage(content="What is the weather in sf?")]

In [9]:
# Run config: thread_id identifies the conversation in the checkpointer. Later cells reuse
# "1" to continue this history and switch to "2" to start from an empty one.
thread = {"configurable": {"thread_id": "1"}}

In [10]:
# Stream the run step by step: each event is a dict whose values are the state updates
# returned by a node, so this prints the new messages as each node finishes.
for event in abot.graph.stream({"messages": messages}, thread):
    for v in event.values():
        print(v['messages'])

[AIMessage(content='', additional_kwargs={'tool_calls': [{'id': 'call_bmfLa92f6oAIKN9KvXtqbKDz', 'function': {'arguments': '{"query":"current weather in San Francisco"}', 'name': 'tavily_search_results_json'}, 'type': 'function'}]}, response_metadata={'token_usage': {'completion_tokens': 22, 'prompt_tokens': 151, 'total_tokens': 173, 'prompt_tokens_details': {'cached_tokens': 0, 'audio_tokens': 0}, 'completion_tokens_details': {'reasoning_tokens': 0, 'audio_tokens': 0, 'accepted_prediction_tokens': 0, 'rejected_prediction_tokens': 0}}, 'model_name': 'gpt-4o', 'system_fingerprint': 'fp_831e067d82', 'finish_reason': 'tool_calls', 'logprobs': None}, id='run-3dbb256a-185c-46e0-bf8e-4ffc6a9776bb-0', tool_calls=[{'name': 'tavily_search_results_json', 'args': {'query': 'current weather in San Francisco'}, 'id': 'call_bmfLa92f6oAIKN9KvXtqbKDz'}])]
Calling: {'name': 'tavily_search_results_json', 'args': {'query': 'current weather in San Francisco'}, 'id': 'call_bmfLa92f6oAIKN9KvXtqbKDz'}
Back t

In [11]:
# Follow-up on the same thread_id ("1"): only the new question is passed in, and the
# question is interpretable only because the earlier turn is restored from the checkpointer.
messages = [HumanMessage(content="What about in la?")]
thread = {"configurable": {"thread_id": "1"}}
for event in abot.graph.stream({"messages": messages}, thread):
    for v in event.values():
        # Prints the whole node update dict (not just its 'messages' list).
        print(v)

{'messages': [AIMessage(content='', additional_kwargs={'tool_calls': [{'id': 'call_lHKBApysD6DmKfU6fHh9VHJ8', 'function': {'arguments': '{"query":"current weather in Los Angeles"}', 'name': 'tavily_search_results_json'}, 'type': 'function'}]}, response_metadata={'token_usage': {'completion_tokens': 22, 'prompt_tokens': 1045, 'total_tokens': 1067, 'prompt_tokens_details': {'cached_tokens': 0, 'audio_tokens': 0}, 'completion_tokens_details': {'reasoning_tokens': 0, 'audio_tokens': 0, 'accepted_prediction_tokens': 0, 'rejected_prediction_tokens': 0}}, 'model_name': 'gpt-4o', 'system_fingerprint': 'fp_65564d8ba5', 'finish_reason': 'tool_calls', 'logprobs': None}, id='run-acb1e406-1970-48ab-9f01-f4a84f5ad338-0', tool_calls=[{'name': 'tavily_search_results_json', 'args': {'query': 'current weather in Los Angeles'}, 'id': 'call_lHKBApysD6DmKfU6fHh9VHJ8'}])]}
Calling: {'name': 'tavily_search_results_json', 'args': {'query': 'current weather in Los Angeles'}, 'id': 'call_lHKBApysD6DmKfU6fHh9VHJ

In [12]:
# Third turn on thread "1": "which one" refers to the two cities from the earlier turns,
# which the agent can resolve because that history is persisted under this thread_id.
messages = [HumanMessage(content="Which one is warmer?")]
thread = {"configurable": {"thread_id": "1"}}
for event in abot.graph.stream({"messages": messages}, thread):
    for v in event.values():
        print(v)

{'messages': [AIMessage(content='Based on the general climate data for October, Los Angeles tends to be warmer than San Francisco. Los Angeles typically has temperatures with lows of around 18°C (64°F) and highs up to 27°C (81°F). In contrast, San Francisco generally experiences lows of around 13°C (55°F) and highs up to 23°C (73°F). Therefore, Los Angeles is usually warmer than San Francisco.', response_metadata={'token_usage': {'completion_tokens': 86, 'prompt_tokens': 1865, 'total_tokens': 1951, 'prompt_tokens_details': {'cached_tokens': 1792, 'audio_tokens': 0}, 'completion_tokens_details': {'reasoning_tokens': 0, 'audio_tokens': 0, 'accepted_prediction_tokens': 0, 'rejected_prediction_tokens': 0}}, 'model_name': 'gpt-4o', 'system_fingerprint': 'fp_65564d8ba5', 'finish_reason': 'stop', 'logprobs': None}, id='run-4a7a9836-1a61-4ef0-8e9d-2c980fd130a7-0')]}


In [13]:
# Same question on a fresh thread_id ("2"): no prior messages are stored for it, so the
# model has nothing to compare and asks for clarification (see the output below).
messages = [HumanMessage(content="Which one is warmer?")]
thread = {"configurable": {"thread_id": "2"}}
for event in abot.graph.stream({"messages": messages}, thread):
    for v in event.values():
        print(v)

{'messages': [AIMessage(content="Could you please clarify what you're comparing to determine which is warmer? Are you comparing two specific locations, types of clothing, materials, or something else? Let me know so I can provide the appropriate information.", response_metadata={'token_usage': {'completion_tokens': 43, 'prompt_tokens': 149, 'total_tokens': 192, 'prompt_tokens_details': {'cached_tokens': 0, 'audio_tokens': 0}, 'completion_tokens_details': {'reasoning_tokens': 0, 'audio_tokens': 0, 'accepted_prediction_tokens': 0, 'rejected_prediction_tokens': 0}}, 'model_name': 'gpt-4o', 'system_fingerprint': 'fp_f9f4fb6dbf', 'finish_reason': 'stop', 'logprobs': None}, id='run-c6b10b47-3f2c-4c9c-9f6a-f82751a739a4-0')]}


## Streaming tokens
⚡ Why token streaming is often preferred
1. Faster perceived latency

Token streaming reduces perceived latency: the total generation time is unchanged, but output starts appearing sooner.
The first token might appear after about 200 ms, instead of the user waiting 5 seconds for the full message.

Users can start reading or reacting immediately.

2. Better interactivity / responsiveness

With token streaming, your app can:

Interrupt or stop generation mid-way (like “Stop generating” in ChatGPT)

Display live UI updates (progress indicators, typing effect)

Dynamically adjust tone, prompt, or tool usage as tokens arrive

This is essential for:

Chatbots

Voice assistants

Realtime coding assistants

Live reasoning/chain-of-thought visualizations


3. Granular control and analytics

You can:

Capture per-token timing for latency profiling

Analyze or censor tokens before displaying them

Log token-by-token deltas for replay, debugging, or incremental saving (using async memory savers like AsyncSqliteSaver)

4. Scalable architecture for long outputs

Token streaming allows you to process and transmit large outputs incrementally, without storing the entire message in memory at once.
That means:

Lower memory footprint

More robust for long documents or model responses

💬 Why you might still choose message streaming

Message-level streaming — emitting each message only once it is complete, as the `graph.stream` loops above do — is simpler when:

You don’t need real-time updates.

You’re dealing with short or predictable outputs.

You prefer atomic transactions (e.g., you only save once the message is complete).

The client (UI) or backend doesn’t support streaming sockets or async pipelines.

💡 When you’d prefer message streaming
1. Backend pipelines / APIs

If you’re building an API endpoint that:

Takes a request,

Generates a model response,

Returns it as a JSON or text payload,

then message streaming simplifies everything.

Example:
A backend that powers a chatbot UI whose client doesn’t support WebSockets or streamed responses: you simply send the full message once it’s done.

Why:

Easier to manage errors (no partial responses)

Works well with HTTP/REST (single response)

Easier logging and retry logic

✅ Example use case:

An internal ML microservice that classifies or summarizes input text and returns the full output in one call.

2. Batch or offline processing

When you’re generating text for many samples at once — e.g.:

Summarizing thousands of documents,

Labeling data,

Generating synthetic datasets,

streaming token-by-token doesn’t help — you just want the final outputs.

✅ Example use case:

Generate product descriptions for a catalog of 10,000 items overnight.

3. Evaluation and scoring tasks

If your system compares whole responses (e.g., BLEU, ROUGE, or factuality evaluation), it needs complete outputs.

Why: These metrics are defined over complete responses, so a partial output cannot be scored meaningfully.

✅ Example use case:

Comparing full model completions in a research experiment or RLHF dataset.

4. Deterministic transaction workflows

If your model’s response triggers a single atomic action — e.g.:

Execute a SQL query

Make an API call

Send an email

you must ensure you have the entire, valid output before acting.
Acting on partially streamed tokens could execute an incomplete or malformed command.

✅ Example use case:

An AI agent that outputs structured JSON for automation (you can’t parse half a JSON).

5. Simpler architecture / less async complexity

Token streaming requires async event loops, WebSockets, or streaming callbacks.
If your infrastructure doesn’t support that (e.g., synchronous web servers, CLI tools, or certain cloud environments), message streaming is more stable.

✅ Example use case:

A simple Flask API or batch script running model completions in sequence.

In [None]:
from langgraph.checkpoint.aiosqlite import AsyncSqliteSaver

# Token streaming below goes through the async API (astream_events), so the checkpointer
# is swapped for the async SQLite saver; the intent is that saving checkpoints does not
# block text generation.
# Note: this rebinds `memory` and `abot`, replacing the synchronous agent built earlier.
memory = AsyncSqliteSaver.from_conn_string(":memory:")
abot = Agent(model, [tool], system=prompt, checkpointer=memory)

In [None]:
messages = [HumanMessage(content="What is the weather in SF?")]
thread = {"configurable": {"thread_id": "4"}}
async for event in abot.graph.astream_events({"messages": messages}, thread, version="v1"):
    kind = event["event"]
    if kind == "on_chat_model_stream":
        content = event["data"]["chunk"].content
        if content:
            # Empty content in the context of OpenAI means
            # that the model is asking for a tool to be invoked.
            # So we only print non-empty content
            print(content, end="|")

  warn_beta(


Calling: {'name': 'tavily_search_results_json', 'args': {'query': 'current weather in San Francisco'}, 'id': 'call_GiHnpbt7P6kuX4g2n6imqDXI'}
