# Lesson 4: Persistence and Streaming

In [1]:
from dotenv import load_dotenv

# Load settings from a local .env file into the process environment
# (presumably the OpenAI and Tavily API keys — confirm against the .env used).
# The return value is bound to `_` so the notebook does not echo it.
_ = load_dotenv()

In [2]:
from langgraph.graph import StateGraph, END
from typing import TypedDict, Annotated
import operator
from langchain_core.messages import AnyMessage, SystemMessage, HumanMessage, ToolMessage
from langchain_openai import ChatOpenAI
from langchain_tavily import TavilySearch

  class TavilyResearch(BaseTool):  # type: ignore[override, override]
  class TavilyResearch(BaseTool):  # type: ignore[override, override]


In [3]:
# Search tool that is handed to the agent below; `max_results=2` asks Tavily
# for at most two results per query.
tool = TavilySearch(max_results=2)

In [4]:
class AgentState(TypedDict):
    """State carried through the agent graph: the conversation so far."""

    # Message history. The `operator.add` annotation is the reducer LangGraph
    # uses to merge node output, so a list returned by a node is concatenated
    # onto the existing history instead of replacing it.
    messages: Annotated[list[AnyMessage], operator.add]

In [5]:
from langgraph.checkpoint.sqlite import SqliteSaver

# `from_conn_string` is used here as a context manager: it is entered by hand
# (instead of with a `with` block) so the saver stays usable in later cells.
# ":memory:" selects an in-memory SQLite database.
# NOTE(review): nothing visible here ever calls `memory_cm.__exit__`, so the
# underlying connection is only released when the kernel shuts down.
memory_cm = SqliteSaver.from_conn_string(":memory:")
memory = memory_cm.__enter__()

In [6]:
class Agent:
    """Tool-calling agent implemented as a two-node LangGraph loop.

    The graph starts at the ``"llm"`` node, which asks the model for the next
    message. If that message requests tool calls, control moves to the
    ``"action"`` node, which runs them and loops back to ``"llm"``; otherwise
    the run ends. The graph is compiled with the supplied checkpointer and
    exposed as ``self.graph``.
    """

    def __init__(self, model, tools, checkpointer, system=""):
        """Build and compile the agent graph.

        Args:
            model: Chat model; it is bound to ``tools`` via ``bind_tools``.
            tools: Iterable of tool objects exposing ``name`` and ``invoke``.
            checkpointer: Passed straight to ``graph.compile(checkpointer=...)``.
            system: Optional system prompt prepended to every model call.
        """
        self.system = system
        graph = StateGraph(AgentState)
        graph.add_node("llm", self.call_openai)
        graph.add_node("action", self.take_action)
        # Route on whether the model's latest message asked for a tool.
        graph.add_conditional_edges("llm", self.exists_action, {True: "action", False: END})
        graph.add_edge("action", "llm")
        graph.set_entry_point("llm")
        self.graph = graph.compile(checkpointer=checkpointer)
        # Name -> tool lookup used by `take_action` to dispatch tool calls.
        self.tools = {t.name: t for t in tools}
        self.model = model.bind_tools(tools)

    def call_openai(self, state: AgentState):
        """Invoke the model on the current history and return its reply.

        The system prompt, when set, is prepended for this call only; it is
        not written back into the state.
        """
        messages = state['messages']
        if self.system:
            messages = [SystemMessage(content=self.system)] + messages
        message = self.model.invoke(messages)
        return {'messages': [message]}

    def exists_action(self, state: AgentState):
        """Return True when the most recent message contains tool calls."""
        result = state['messages'][-1]
        return len(result.tool_calls) > 0

    def take_action(self, state: AgentState):
        """Run every tool call requested by the latest message.

        Returns:
            A dict whose ``'messages'`` entry holds one ``ToolMessage`` per
            requested call, in request order.
        """
        tool_calls = state['messages'][-1].tool_calls
        results = []
        for t in tool_calls:
            print(f"Calling: {t}")
            selected = self.tools.get(t['name'])
            if selected is None:
                # The model may ask for a tool that was never registered.
                # Report that back as the tool result so it can retry,
                # instead of aborting the whole run with a KeyError.
                print("\n ....bad tool name....")
                result = "bad tool name, retry"
            else:
                result = selected.invoke(t['args'])
            results.append(ToolMessage(tool_call_id=t['id'], name=t['name'], content=str(result)))
        print("Back to the model!")
        return {'messages': results}

In [7]:
# System prompt for the agent. The trailing backslashes join the first three
# lines into one; the text ends with a single newline.
prompt = """You are a smart research assistant. Use the search engine to look up information. \
You are allowed to make multiple calls (either together or in sequence). \
Only look up information when you are sure of what you want. \
If you need to look up some information before asking a follow up question, you are allowed to do that!
"""
# Chat model the agent binds its tools to.
model = ChatOpenAI(model="gpt-5-mini")
# Agent wired to the search tool and the SQLite checkpointer created earlier.
abot = Agent(model, [tool], system=prompt, checkpointer=memory)

In [8]:
# First user turn of the conversation.
messages = [HumanMessage(content="What is the weather in San Francisco, California")]

In [9]:
# Run config passed to the graph; `thread_id` names the conversation that
# the checkpointer stores state under.
thread = {"configurable": {"thread_id": "1"}}

In [10]:
# Stream the run and print the messages each streamed event carries.
# Each event is a dict; only the 'messages' entry of its values is shown.
for event in abot.graph.stream({"messages": messages}, thread):
    for v in event.values():
        print(v['messages'])

[AIMessage(content='', additional_kwargs={'refusal': None}, response_metadata={'token_usage': {'completion_tokens': 44, 'prompt_tokens': 1345, 'total_tokens': 1389, 'completion_tokens_details': {'accepted_prediction_tokens': 0, 'audio_tokens': 0, 'reasoning_tokens': 0, 'rejected_prediction_tokens': 0}, 'prompt_tokens_details': {'audio_tokens': 0, 'cached_tokens': 0}}, 'model_provider': 'openai', 'model_name': 'gpt-5-mini-2025-08-07', 'system_fingerprint': None, 'id': 'chatcmpl-CwuXrT0d0R17pUiN5qfDVIZE3h5ab', 'service_tier': 'default', 'finish_reason': 'tool_calls', 'logprobs': None}, id='lc_run--019bae55-d2b4-7f51-b439-505a3b372636-0', tool_calls=[{'name': 'tavily_search', 'args': {'query': 'San Francisco weather current temperature forecast', 'search_depth': 'fast', 'time_range': 'day', 'topic': 'general'}, 'id': 'call_pAC4tnPq84bGbOqSxMC4iIMS', 'type': 'tool_call'}], usage_metadata={'input_tokens': 1345, 'output_tokens': 44, 'total_tokens': 1389, 'input_token_details': {'audio': 0, '

In [17]:
# Follow-up question sent on the same thread_id ("1") as the first question,
# so the checkpointer is expected to supply the earlier exchange as context.
messages = [HumanMessage(content="What about in la?")]
thread = {"configurable": {"thread_id": "1"}}
# Print the full value dict of every streamed event.
for event in abot.graph.stream({"messages": messages}, thread):
    for v in event.values():
        print(v)

{'messages': [AIMessage(content='', additional_kwargs={'refusal': None}, response_metadata={'token_usage': {'completion_tokens': 112, 'prompt_tokens': 8300, 'total_tokens': 8412, 'completion_tokens_details': {'accepted_prediction_tokens': 0, 'audio_tokens': 0, 'reasoning_tokens': 64, 'rejected_prediction_tokens': 0}, 'prompt_tokens_details': {'audio_tokens': 0, 'cached_tokens': 6784}}, 'model_provider': 'openai', 'model_name': 'gpt-5-mini-2025-08-07', 'system_fingerprint': None, 'id': 'chatcmpl-CwtyKyhsewtY1hkkFNBnc1JzZkl8D', 'service_tier': 'default', 'finish_reason': 'tool_calls', 'logprobs': None}, id='lc_run--019bae34-3506-7203-b723-920fcdca9a49-0', tool_calls=[{'name': 'tavily_search', 'args': {'query': 'KLAX METAR current', 'search_depth': 'fast', 'topic': 'general', 'include_images': False, 'time_range': 'day'}, 'id': 'call_2KlxTecp5JyMEPFG46sJ5mMM', 'type': 'tool_call'}], usage_metadata={'input_tokens': 8300, 'output_tokens': 112, 'total_tokens': 8412, 'input_token_details': {'

In [11]:
# Comparison question, again on thread_id "1"; it only makes sense if the
# earlier turns stored for that thread are available to the model.
messages = [HumanMessage(content="Which one is warmer?.  You final answer should just state the city and state of the warmer of the two locations.")]
thread = {"configurable": {"thread_id": "1"}}
# Print the full value dict of every streamed event.
for event in abot.graph.stream({"messages": messages}, thread):
    for v in event.values():
        print(v)

{'messages': [AIMessage(content='Which two locations should I compare? Also, do you want current temperatures or a forecast?', additional_kwargs={'refusal': None}, response_metadata={'token_usage': {'completion_tokens': 283, 'prompt_tokens': 4540, 'total_tokens': 4823, 'completion_tokens_details': {'accepted_prediction_tokens': 0, 'audio_tokens': 0, 'reasoning_tokens': 256, 'rejected_prediction_tokens': 0}, 'prompt_tokens_details': {'audio_tokens': 0, 'cached_tokens': 0}}, 'model_provider': 'openai', 'model_name': 'gpt-5-mini-2025-08-07', 'system_fingerprint': None, 'id': 'chatcmpl-CwuYTyHFcoJ09NJ09pHDuTShLasVe', 'service_tier': 'default', 'finish_reason': 'stop', 'logprobs': None}, id='lc_run--019bae56-67da-7160-ba4d-0cae6f771d02-0', usage_metadata={'input_tokens': 4540, 'output_tokens': 283, 'total_tokens': 4823, 'input_token_details': {'audio': 0, 'cache_read': 0}, 'output_token_details': {'audio': 0, 'reasoning': 256}})]}


In [12]:
# Same kind of question on a different thread_id ("2"): this thread has no
# earlier turns, and the recorded output shows the model asking what to compare.
messages = [HumanMessage(content="Which one is warmer?")]
thread = {"configurable": {"thread_id": "2"}}
# Print the full value dict of every streamed event.
for event in abot.graph.stream({"messages": messages}, thread):
    for v in event.values():
        print(v)

{'messages': [AIMessage(content='I’m missing the two items you want compared. What are they — clothes (jackets, sweaters), fabrics (wool vs cotton), places (cities, rooms), colors, or something else?\n\nTell me the two specific items (or upload photos) and — if it’s clothing or fabric — any details you have: material, thickness, insulation type/fill power, weave, layers, and intended use. If it’s places, give temperatures or locations. With that I’ll compare which is warmer and why.', additional_kwargs={'refusal': None}, response_metadata={'token_usage': {'completion_tokens': 305, 'prompt_tokens': 1341, 'total_tokens': 1646, 'completion_tokens_details': {'accepted_prediction_tokens': 0, 'audio_tokens': 0, 'reasoning_tokens': 192, 'rejected_prediction_tokens': 0}, 'prompt_tokens_details': {'audio_tokens': 0, 'cached_tokens': 0}}, 'model_provider': 'openai', 'model_name': 'gpt-5-mini-2025-08-07', 'system_fingerprint': None, 'id': 'chatcmpl-CwuYcL4X2hTYidbndqbFW96R0jdjv', 'service_tier': 

## Streaming tokens

In [20]:
from langgraph.checkpoint.sqlite.aio import AsyncSqliteSaver

# Use 'async with' to correctly initialize the checkpointer
async with AsyncSqliteSaver.from_conn_string(":memory:") as memory:
    # Compile the graph using the 'memory' object provided by the context manager
    abot = Agent(model, [tool], system=prompt, checkpointer=memory)
    
    messages = [HumanMessage(content="What is current the weather in San Francisco, California?")]
    thread = {"configurable": {"thread_id": "4"}}
    
    # Run your async loop inside the 'async with' block
    async for event in abot.graph.astream_events({"messages": messages}, thread, version="v1"):
        kind = event["event"]
        if kind == "on_chat_model_stream":
            content = event["data"]["chunk"].content
            if content:
                print(content, end="|")


Calling: {'name': 'tavily_search', 'args': {'query': 'current weather San Francisco CA', 'search_depth': 'fast', 'topic': 'general', 'include_images': False}, 'id': 'call_jUWLxuGNjciidrFaThQTl1hO', 'type': 'tool_call'}
Back to the model!
Calling: {'name': 'tavily_search', 'args': {'query': 'San Francisco CA current weather site:weather.gov', 'search_depth': 'fast', 'topic': 'general'}, 'id': 'call_U3CZgyu6yiEbxhkGM5D0aI9A', 'type': 'tool_call'}
Back to the model!
Calling: {'name': 'tavily_search', 'args': {'query': 'San Francisco CA current temperature now (Downtown San Francisco)', 'search_depth': 'fast', 'topic': 'general'}, 'id': 'call_5Z5VmqJ97zt5wX58wm1aYn6t', 'type': 'tool_call'}
Back to the model!
Calling: {'name': 'tavily_search', 'args': {'query': 'current weather San Francisco CA now', 'search_depth': 'advanced', 'topic': 'general', 'include_images': False, 'time_range': 'day'}, 'id': 'call_EI4MUHHMrT8WJOg5BI2ETNYq', 'type': 'tool_call'}
Back to the model!
Calling: {'name': '