In [None]:
# Install required packages (uncomment if needed)
# !pip install langgraph-checkpoint-sqlite
# !pip install --upgrade langgraph langgraph-checkpoint-sqlite  # If you get AttributeError


# Lesson 4: Persistence and Streaming

- **Persistence**: keeps track of state over time, so you can go back to an earlier state and resume running from it.
- **Streaming**: emits signals that show the agent's inner workings and progress while it runs.

In [1]:
from dotenv import load_dotenv

# Load environment variables from a .env file (presumably the API keys used by the
# model and search tool below — confirm against your .env). The result is bound
# to `_` so the cell does not display a return value.
_ = load_dotenv()

In [2]:
from langgraph.graph import StateGraph, END
from typing import TypedDict, Annotated
import operator
from langchain_core.messages import AnyMessage, SystemMessage, HumanMessage, ToolMessage
from langchain_openai import ChatOpenAI
from langchain_community.tools.tavily_search import TavilySearchResults

In [3]:
# Search tool handed to the agent in a later cell; configured with max_results=2.
tool = TavilySearchResults(max_results=2)

  tool = TavilySearchResults(max_results=2)


In [4]:
class AgentState(TypedDict):
    """State carried through the graph: a single `messages` list."""
    # The Annotated metadata names operator.add as the reducer for this key —
    # presumably LangGraph concatenates each node's returned messages onto the
    # existing list instead of replacing it; confirm against the LangGraph docs.
    messages: Annotated[list[AnyMessage], operator.add]

In [15]:
# Persistence is added by giving the graph a checkpointer.
# In newer releases SqliteSaver.from_conn_string() returns a context manager,
# so the saver is built directly from a sqlite3 connection instead.
# If anything in that path fails, MemorySaver is used as the fallback.

try:
    from langgraph.checkpoint.sqlite import SqliteSaver
    import sqlite3

    # In-memory database; check_same_thread=False lets the connection be used
    # from threads other than the one that created it.
    conn = sqlite3.connect(database=":memory:", check_same_thread=False)
    memory = SqliteSaver(conn)
except Exception as err:
    print(f"Error creating SqliteSaver: {err}")
    print("Falling back to MemorySaver (works but doesn't persist between sessions)...")
    from langgraph.checkpoint.memory import MemorySaver
    memory = MemorySaver()
    print("✓ Using MemorySaver (in-memory only)")
else:
    print("✓ SQLite checkpointer created successfully")

✓ SQLite checkpointer created successfully


In [14]:
class Agent:
    """Tool-calling agent expressed as a two-node graph with checkpointed state.

    The graph enters at the "llm" node. After every model turn `exists_action`
    routes to the "action" node when the reply carries tool calls and to END
    otherwise; "action" always hands control back to "llm".
    """

    def __init__(self, model, tools, checkpointer, system=""):
        """Build and compile the graph.

        Args:
            model: Chat model; must provide `bind_tools` and, once bound, `invoke`.
            tools: Iterable of tools, each exposing `name` and `invoke`.
            checkpointer: Forwarded to `graph.compile(checkpointer=...)`; this is
                the memory that lets a thread be resumed.
            system: Optional system prompt prepended to every model call.
        """
        self.system = system
        graph = StateGraph(AgentState)
        graph.add_node("llm", self.call_openai)
        graph.add_node("action", self.take_action)
        graph.add_conditional_edges("llm", self.exists_action, {True: "action", False: END})
        graph.add_edge("action", "llm")
        graph.set_entry_point("llm")
        self.graph = graph.compile(checkpointer=checkpointer)  # checkpointer is the memory
        self.tools = {t.name: t for t in tools}
        self.model = model.bind_tools(tools)

    def call_openai(self, state: AgentState):
        """Invoke the model on the conversation and return its reply.

        The system prompt, when set, is prepended for this call only; it is not
        part of the returned update.

        Returns:
            dict: `{'messages': [reply]}`.
        """
        messages = state['messages']
        if self.system:
            messages = [SystemMessage(content=self.system)] + messages
        message = self.model.invoke(messages)
        return {'messages': [message]}

    def exists_action(self, state: AgentState):
        """Return True when the most recent message requests at least one tool call."""
        result = state['messages'][-1]
        return len(result.tool_calls) > 0

    def take_action(self, state: AgentState):
        """Run every tool call requested by the most recent message.

        A call that names a tool this agent does not have is answered with a
        retry instruction instead of raising KeyError, so the model can correct
        itself on its next turn and the run is not aborted.

        Returns:
            dict: `{'messages': [ToolMessage, ...]}`, one message per tool call,
            in the order the calls were requested.
        """
        tool_calls = state['messages'][-1].tool_calls
        results = []
        for t in tool_calls:
            print(f"Calling: {t}")
            selected = self.tools.get(t['name'])
            if selected is None:
                # Unknown tool name: report it back to the model rather than crash.
                print("\n ....bad tool name....")
                result = "bad tool name, retry"
            else:
                result = selected.invoke(t['args'])
            results.append(ToolMessage(tool_call_id=t['id'], name=t['name'], content=str(result)))
        print("Back to the model!")
        return {'messages': results}

In [16]:
# System prompt, assembled from adjacent string literals (one sentence per literal).
prompt = (
    "You are a smart research assistant. Use the search engine to look up information. "
    "You are allowed to make multiple calls (either together or in sequence). "
    "Only look up information when you are sure of what you want. "
    "If you need to look up some information before asking a follow up question, you are allowed to do that!\n"
)

model = ChatOpenAI(model="gpt-4o")

# Agent wired to the search tool and to the checkpointer created earlier.
abot = Agent(model, [tool], checkpointer=memory, system=prompt)

In [17]:
# First user turn, kept in a list because the graph input is {"messages": messages}.
messages = [HumanMessage(content="What is the weather in sf?")]

In [18]:
# The thread_id identifies one conversation for the checkpointer; using different
# ids keeps several conversations separate, which is what a production app needs.
thread = {"configurable": {"thread_id": "1"}}

In [19]:
# Streaming: walk the events as the graph produces them and print the
# 'messages' entry of every value in each event.
for step in abot.graph.stream({"messages": messages}, thread):
    for update in step.values():
        print(update['messages'])

[AIMessage(content='', additional_kwargs={'refusal': None}, response_metadata={'token_usage': {'completion_tokens': 22, 'prompt_tokens': 151, 'total_tokens': 173, 'completion_tokens_details': {'accepted_prediction_tokens': 0, 'audio_tokens': 0, 'reasoning_tokens': 0, 'rejected_prediction_tokens': 0}, 'prompt_tokens_details': {'audio_tokens': 0, 'cached_tokens': 0}}, 'model_provider': 'openai', 'model_name': 'gpt-4o-2024-08-06', 'system_fingerprint': 'fp_34b44b8219', 'id': 'chatcmpl-CbBtMhMiSWtxpgTXCZfgWME0gk5gn', 'service_tier': 'default', 'finish_reason': 'tool_calls', 'logprobs': None}, id='lc_run--db9d39ef-2759-4d5d-aace-2fcd6295e551-0', tool_calls=[{'name': 'tavily_search_results_json', 'args': {'query': 'current weather in San Francisco'}, 'id': 'call_ZQdkqLCBFuuuQ78ClSCR6Zmi', 'type': 'tool_call'}], usage_metadata={'input_tokens': 151, 'output_tokens': 22, 'total_tokens': 173, 'input_token_details': {'audio': 0, 'cache_read': 0}, 'output_token_details': {'audio': 0, 'reasoning': 

In [20]:
# Follow-up question on the same thread_id: the conversation so far is in memory,
# so the agent should know we are still talking about the weather.
thread = {"configurable": {"thread_id": "1"}}
messages = [HumanMessage(content="What about in la?")]
for step in abot.graph.stream({"messages": messages}, thread):
    for update in step.values():
        print(update)

{'messages': [AIMessage(content='', additional_kwargs={'refusal': None}, response_metadata={'token_usage': {'completion_tokens': 22, 'prompt_tokens': 1096, 'total_tokens': 1118, 'completion_tokens_details': {'accepted_prediction_tokens': 0, 'audio_tokens': 0, 'reasoning_tokens': 0, 'rejected_prediction_tokens': 0}, 'prompt_tokens_details': {'audio_tokens': 0, 'cached_tokens': 1024}}, 'model_provider': 'openai', 'model_name': 'gpt-4o-2024-08-06', 'system_fingerprint': 'fp_cbf1785567', 'id': 'chatcmpl-CbBtp2hrr09Y20MnzkkpmRC2nhVue', 'service_tier': 'default', 'finish_reason': 'tool_calls', 'logprobs': None}, id='lc_run--0ae3dbfb-97f7-455d-81ec-eea23128e9c9-0', tool_calls=[{'name': 'tavily_search_results_json', 'args': {'query': 'current weather in Los Angeles'}, 'id': 'call_cIzlXG8AQ0onnHbRmE2Ygsl0', 'type': 'tool_call'}], usage_metadata={'input_tokens': 1096, 'output_tokens': 22, 'total_tokens': 1118, 'input_token_details': {'audio': 0, 'cache_read': 1024}, 'output_token_details': {'aud

In [None]:
# Same thread_id again, so this question can draw on the conversation history.
thread = {"configurable": {"thread_id": "1"}}
messages = [HumanMessage(content="Which one is warmer?")]
for step in abot.graph.stream({"messages": messages}, thread):
    for update in step.values():
        print(update)

In [21]:
# A different thread_id: no history is available, so the same question arrives
# without any context.
thread = {"configurable": {"thread_id": "2"}}
messages = [HumanMessage(content="Which one is warmer?")]
for step in abot.graph.stream({"messages": messages}, thread):
    for update in step.values():
        print(update)

{'messages': [AIMessage(content='Could you please provide more information about what you are comparing to determine which is warmer? Are you comparing two locations, two pieces of clothing, or something else?', additional_kwargs={'refusal': None}, response_metadata={'token_usage': {'completion_tokens': 33, 'prompt_tokens': 149, 'total_tokens': 182, 'completion_tokens_details': {'accepted_prediction_tokens': 0, 'audio_tokens': 0, 'reasoning_tokens': 0, 'rejected_prediction_tokens': 0}, 'prompt_tokens_details': {'audio_tokens': 0, 'cached_tokens': 0}}, 'model_provider': 'openai', 'model_name': 'gpt-4o-2024-08-06', 'system_fingerprint': 'fp_cbf1785567', 'id': 'chatcmpl-CbBuuTV3ZvZ63nxhGOL9k0mQpO1XN', 'service_tier': 'default', 'finish_reason': 'stop', 'logprobs': None}, id='lc_run--8d506ec3-559b-45ee-8d53-0a84a50215e5-0', usage_metadata={'input_tokens': 149, 'output_tokens': 33, 'total_tokens': 182, 'input_token_details': {'audio': 0, 'cache_read': 0}, 'output_token_details': {'audio': 0

## Streaming tokens

In [None]:
# Async version for streaming tokens
# FIXED: Same issue - from_conn_string() returns context manager
from langgraph.checkpoint.aiosqlite import AsyncSqliteSaver
import aiosqlite

try:
    # Create async SQLite connection manually
    # Note: For async, we need to use aiosqlite
    # In practice, you'd create the connection in an async context
    # For this example, we'll use a workaround
    import asyncio
    
    async def create_async_memory():
        conn = await aiosqlite.connect(":memory:")
        return AsyncSqliteSaver(conn)
    
    # For notebook use, we'll create it synchronously
    # In production, you'd handle this properly with async context
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    conn = loop.run_until_complete(aiosqlite.connect(":memory:"))
    memory = AsyncSqliteSaver(conn)
    print("✓ Async SQLite checkpointer created")
    
except Exception as e:
    print(f"Error creating AsyncSqliteSaver: {e}")
    print("Using regular MemorySaver instead...")
    from langgraph.checkpoint.memory import MemorySaver
    memory = MemorySaver()

abot = Agent(model, [tool], system=prompt, checkpointer=memory)

In [None]:
messages = [HumanMessage(content="What is the weather in SF?")]
thread = {"configurable": {"thread_id": "4"}}
async for event in abot.graph.astream_events({"messages": messages}, thread, version="v1"):
    kind = event["event"]
    if kind == "on_chat_model_stream":
        content = event["data"]["chunk"].content
        if content:
            # Empty content in the context of OpenAI means
            # that the model is asking for a tool to be invoked.
            # So we only print non-empty content
            print(content, end="|")