# AI Agents in LangGraph – Lesson 4

**Objective**: Explore the concepts of persistence and streaming in LangGraph to enable long-running agents with persistent memory and real-time streaming output.

LangGraph supports:
- ✅ **Persistence**: Save and resume conversation state using a checkpointer.
- ✅ **Streaming**: View intermediate reasoning steps and token-level output in real-time.


## 1. Setup: Imports, API Keys, and Tools

In [1]:
# !pip install -q langchain langgraph tavily-python aiosqlite

from dotenv import load_dotenv
import operator
from typing import TypedDict, Annotated

from langgraph.graph import StateGraph, END
from langchain_core.messages import AnyMessage, SystemMessage, HumanMessage, ToolMessage
from langchain_openai import ChatOpenAI
from langchain_community.tools.tavily_search import TavilySearchResults

# Load API keys (OpenAI and Tavily) from a .env file into the environment
load_dotenv()
tool = TavilySearchResults(max_results=2)

## 2. Defining Persistent Agent State

In [2]:
# Define the state of the agent. The operator.add annotation makes each node's
# update append to the messages list instead of overwriting it.
class AgentState(TypedDict):
    messages: Annotated[list[AnyMessage], operator.add]
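
To make the role of the `operator.add` annotation concrete, here is a minimal pure-Python sketch of a reducer-style merge. It is not LangGraph's implementation: `DemoState` and `apply_update` are hypothetical names introduced only for illustration, and plain strings stand in for message objects.

```python
import operator
from typing import Annotated, TypedDict, get_type_hints


class DemoState(TypedDict):
    # In this sketch, the extra Annotated argument is treated as the reducer.
    messages: Annotated[list[str], operator.add]


def apply_update(state: dict, update: dict) -> dict:
    """Merge a partial update into the state (hypothetical helper)."""
    hints = get_type_hints(DemoState, include_extras=True)
    merged = dict(state)
    for key, value in update.items():
        metadata = getattr(hints[key], "__metadata__", ())
        if metadata and key in merged:
            # A reducer is declared: combine old and new values (old + new).
            merged[key] = metadata[0](merged[key], value)
        else:
            # No reducer, or no previous value: just set the key.
            merged[key] = value
    return merged


state = {"messages": ["user: What is the weather in SF?"]}
state = apply_update(state, {"messages": ["ai: calling the search tool"]})
print(state["messages"])
```

With `operator.add` on two lists, the update is concatenated onto the existing history, which is why each node in the agent can return only its new messages.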

## 3. Defining the Agent with Persistence Support

To add persistence to the agent, LangGraph uses a checkpointer. A checkpointer is an object that saves a snapshot of the graph state after each step and stores it under a thread ID, so that the state can be restored later, for example to continue a conversation.
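
The idea can be sketched with a toy, dictionary-backed stand-in. This is an illustration under simplifying assumptions, not how `SqliteSaver` works internally: `ToyCheckpointer`, `put`, `get_latest`, and `run_turn` are hypothetical names, and plain strings stand in for messages.

```python
import copy
from collections import defaultdict


class ToyCheckpointer:
    """Illustrative stand-in for a checkpointer (not LangGraph's SqliteSaver)."""

    def __init__(self):
        # thread_id -> list of state snapshots, oldest first
        self._history = defaultdict(list)

    def put(self, thread_id: str, state: dict) -> None:
        # Store a copy so later mutations do not alter the saved snapshot.
        self._history[thread_id].append(copy.deepcopy(state))

    def get_latest(self, thread_id: str) -> dict:
        snapshots = self._history.get(thread_id)
        return copy.deepcopy(snapshots[-1]) if snapshots else {"messages": []}


def run_turn(saver: ToyCheckpointer, thread_id: str, user_message: str) -> dict:
    """Restore the thread's state, add one message, and checkpoint again."""
    state = saver.get_latest(thread_id)
    state["messages"].append(user_message)
    saver.put(thread_id, state)
    return state


saver = ToyCheckpointer()
run_turn(saver, "1", "What is the weather in SF?")
same_thread = run_turn(saver, "1", "What about in LA?")
new_thread = run_turn(saver, "2", "Which one is warmer?")
print(len(same_thread["messages"]), len(new_thread["messages"]))  # 2 1
```

The second call on thread `"1"` sees the earlier message, while thread `"2"` starts from an empty history.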

In [3]:
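# Define memory as a checkpointer to persist the state of the agent.
# Here: a SQLite checkpointer backed by an in-memory database, so the
# saved state lasts only as long as this Python process.
from langgraph.checkpoint.sqlite import SqliteSaver
# Note: in some newer langgraph releases from_conn_string returns a context
# manager rather than the saver itself, so this line may need adapting.
memory = SqliteSaver.from_conn_string(":memory:")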

In [4]:
from langgraph.checkpoint.sqlite import SqliteSaver

# Define the agent class with additional checkpointing support
class Agent:
    def __init__(self, model, tools, checkpointer, system=""):  # added checkpointer as input
        self.system = system
        graph = StateGraph(AgentState)
        graph.add_node("llm", self.call_openai)
        graph.add_node("action", self.take_action)
        graph.add_conditional_edges("llm", self.exists_action, {True: "action", False: END})
        graph.add_edge("action", "llm")
        graph.set_entry_point("llm")
        # only modification is to pass the checkpointer to the graph
        self.graph = graph.compile(checkpointer=checkpointer)
        self.tools = {t.name: t for t in tools}
        self.model = model.bind_tools(tools)

    def call_openai(self, state: AgentState):
        messages = state['messages']
        if self.system:
            messages = [SystemMessage(content=self.system)] + messages
        return {"messages": [self.model.invoke(messages)]}

    def exists_action(self, state: AgentState):
        result = state['messages'][-1]
        return len(result.tool_calls) > 0

    def take_action(self, state: AgentState):
        tool_calls = state['messages'][-1].tool_calls
        results = []
        for t in tool_calls:
            result = self.tools[t['name']].invoke(t['args'])
            results.append(ToolMessage(tool_call_id=t['id'], name=t['name'], content=str(result)))
        return {'messages': results}

## 4. Using the Agent with Persistent Memory

In [5]:
# Define the agent with prompt, model and checkpointer as the memory defined above
prompt = """You are a smart research assistant. Use the search engine to look up information. \
You are allowed to make multiple calls (either together or in sequence). \
Only look up information when you are sure of what you want. \
If you need to look up some information before asking a follow up question, you are allowed to do that!"""

model = ChatOpenAI(model="gpt-4o")
# only modification is to add checkpointer to the agent
abot = Agent(model, [tool], system=prompt, checkpointer=memory)

## 5. Streaming Messages with Persistent Context

Streaming is a key concept: LangGraph can stream either whole messages or individual output tokens.

In [6]:
# Example of streaming individual messages
messages = [HumanMessage(content="What is the weather in SF?")]
# Thread configuration: the thread_id identifies one conversation,
# so the checkpointer can keep several conversations separate
thread = {"configurable": {"thread_id": "1"}}

# Call the graph with .stream instead of .invoke, passing the messages and the thread config,
# then loop over the streamed events and print the messages from each node
for event in abot.graph.stream({"messages": messages}, thread):
    for v in event.values():
        print(v['messages'])

[AIMessage(content='', additional_kwargs={'tool_calls': [{'id': 'call_cCD4LJhJTCk8IAnBuFJTVQnH', 'function': {'arguments': '{"query":"current weather in San Francisco"}', 'name': 'tavily_search_results_json'}, 'type': 'function'}]}, response_metadata={'token_usage': {'completion_tokens': 23, 'prompt_tokens': 151, 'total_tokens': 174, 'prompt_tokens_details': {'cached_tokens': 0, 'audio_tokens': 0}, 'completion_tokens_details': {'reasoning_tokens': 0, 'audio_tokens': 0, 'accepted_prediction_tokens': 0, 'rejected_prediction_tokens': 0}}, 'model_name': 'gpt-4o', 'system_fingerprint': 'fp_90122d973c', 'finish_reason': 'tool_calls', 'logprobs': None}, id='run-5c370c9b-2a23-4d83-8c55-a16d19815fbc-0', tool_calls=[{'name': 'tavily_search_results_json', 'args': {'query': 'current weather in San Francisco'}, 'id': 'call_cCD4LJhJTCk8IAnBuFJTVQnH'}])]
[ToolMessage(content='[{\'url\': \'https://weathershogun.com/weather/usa/ca/san-francisco/480/may/2025-05-09\', \'content\': \'San Francisco, Califo

The output includes an AIMessage from the LLM containing the tool call, a ToolMessage with the output of the search tool, and finally an AIMessage with the LLM's final answer.
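
The loop above treats each streamed event as a dictionary whose values carry a `messages` list. Assuming the keys are the node names defined in the agent (`llm` and `action`), the following sketch labels each message by kind. It uses plain dictionaries as stand-ins for the real message objects; `fake_events` and `describe` are hypothetical names.

```python
# Hand-written stand-ins for three streamed events (not real LangGraph output).
fake_events = [
    {"llm": {"messages": [{"type": "ai", "content": "", "tool_calls": ["search"]}]}},
    {"action": {"messages": [{"type": "tool", "content": "<search results>"}]}},
    {"llm": {"messages": [{"type": "ai", "content": "<final answer>", "tool_calls": []}]}},
]


def describe(event: dict) -> list[str]:
    """Return one 'node: message kind' label per message in a stream event."""
    labels = []
    for node_name, update in event.items():
        for message in update["messages"]:
            if message["type"] == "ai" and message.get("tool_calls"):
                kind = "AI message requesting a tool call"
            elif message["type"] == "tool":
                kind = "tool result"
            else:
                kind = "final AI answer"
            labels.append(f"{node_name}: {kind}")
    return labels


labels = [label for event in fake_events for label in describe(event)]
for label in labels:
    print(label)
```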

### Follow-up: Asking a question in the same thread

In [7]:
# Follow-up question without specifying the weather explicitly: the agent remembers the previous conversation
messages = [HumanMessage(content="What about in LA?")]
thread = {"configurable": {"thread_id": "1"}}
for event in abot.graph.stream({"messages": messages}, thread):
    for v in event.values():
        print(v)

{'messages': [AIMessage(content='', additional_kwargs={'tool_calls': [{'id': 'call_mXS0smAXFFHNAeiMLmZ4CA8p', 'function': {'arguments': '{"query":"current weather in Los Angeles"}', 'name': 'tavily_search_results_json'}, 'type': 'function'}]}, response_metadata={'token_usage': {'completion_tokens': 23, 'prompt_tokens': 768, 'total_tokens': 791, 'prompt_tokens_details': {'cached_tokens': 0, 'audio_tokens': 0}, 'completion_tokens_details': {'reasoning_tokens': 0, 'audio_tokens': 0, 'accepted_prediction_tokens': 0, 'rejected_prediction_tokens': 0}}, 'model_name': 'gpt-4o', 'system_fingerprint': 'fp_90122d973c', 'finish_reason': 'tool_calls', 'logprobs': None}, id='run-736bfccf-e035-4c8f-963c-2b05a2d05416-0', tool_calls=[{'name': 'tavily_search_results_json', 'args': {'query': 'current weather in Los Angeles'}, 'id': 'call_mXS0smAXFFHNAeiMLmZ4CA8p'}])]}
{'messages': [ToolMessage(content='[{\'url\': \'https://weathershogun.com/weather/usa/ca/los-angeles/451/may/2025-05-09\', \'content\': \'

In [8]:
# Follow-up question on the same thread: the agent has access to all the previous messages
messages = [HumanMessage(content="Which one is warmer?")]
thread = {"configurable": {"thread_id": "1"}}
for event in abot.graph.stream({"messages": messages}, thread):
    for v in event.values():
        print(v)

{'messages': [AIMessage(content='Currently, Los Angeles is warmer than San Francisco. The temperature in Los Angeles is 57.9°F (14.4°C), whereas in San Francisco, it is 52°F (11.1°C).', response_metadata={'token_usage': {'completion_tokens': 45, 'prompt_tokens': 1393, 'total_tokens': 1438, 'prompt_tokens_details': {'cached_tokens': 1280, 'audio_tokens': 0}, 'completion_tokens_details': {'reasoning_tokens': 0, 'audio_tokens': 0, 'accepted_prediction_tokens': 0, 'rejected_prediction_tokens': 0}}, 'model_name': 'gpt-4o', 'system_fingerprint': 'fp_90122d973c', 'finish_reason': 'stop', 'logprobs': None}, id='run-29cd55fd-5159-454c-a728-0713a985f3b0-0')]}


### Changing Thread ID: No Memory Available

In [9]:
# Change the thread ID to a new value: the agent has no memory of the previous conversation
messages = [HumanMessage(content="Which one is warmer?")]
thread = {"configurable": {"thread_id": "2"}}
for event in abot.graph.stream({"messages": messages}, thread):
    for v in event.values():
        print(v)

{'messages': [AIMessage(content='Could you please provide more context or specify the locations, regions, or objects you are comparing in terms of warmth?', response_metadata={'token_usage': {'completion_tokens': 25, 'prompt_tokens': 149, 'total_tokens': 174, 'prompt_tokens_details': {'cached_tokens': 0, 'audio_tokens': 0}, 'completion_tokens_details': {'reasoning_tokens': 0, 'audio_tokens': 0, 'accepted_prediction_tokens': 0, 'rejected_prediction_tokens': 0}}, 'model_name': 'gpt-4o', 'system_fingerprint': 'fp_90122d973c', 'finish_reason': 'stop', 'logprobs': None}, id='run-e0aa1f16-5777-406d-9c2e-38fa9ddd144e-0')]}


## 6. Streaming Tokens

In [10]:
import warnings
from langchain_core._api.beta_decorator import LangChainBetaWarning

# Suppress only the LangChain beta warning (event streaming is a beta feature)
warnings.filterwarnings("ignore", category=LangChainBetaWarning)

In [11]:
from langgraph.checkpoint.aiosqlite import AsyncSqliteSaver

# Create a memory for the agent with AsyncSqliteSaver
memory = AsyncSqliteSaver.from_conn_string(":memory:")
# Create the agent with checkpointer
abot = Agent(model, [tool], system=prompt, checkpointer=memory)

In [12]:
# Define human message
messages = [HumanMessage(content="What is the weather in SF?")]
# Use a new thread ID ("4") to start the conversation from scratch
thread = {"configurable": {"thread_id": "4"}}

# Stream events
async for event in abot.graph.astream_events({"messages": messages}, thread, version="v1"):
    kind = event["event"]
    if kind == "on_chat_model_stream":
        content = event["data"]["chunk"].content
        if content:
            # Empty content in the context of OpenAI means
            # that the model is asking for a tool to be invoked.
            # So we only print non-empty content
            print(content, end="|")

The| current| weather| in| San| Francisco| is| fog|gy| with| a| temperature| of| |11|.|1|°C| (|52|°F|).| The| wind| is| coming| from| the| southwest| at| |2|.|2| mph| (|3|.|6| k|ph|),| and| the| humidity| is| |93|%.| The| visibility| is| |16| km| (|9| miles|).|
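
The filtering pattern used above can be tried without an API key. The sketch below replaces the graph with a hypothetical async generator, `fake_event_stream`, that yields hand-written event dictionaries. As a simplifying assumption, each `chunk` is a plain string here, whereas the real chunks expose the text through `.content`.

```python
import asyncio


async def fake_event_stream():
    """Hypothetical stand-in for astream_events: yields event dictionaries."""
    events = [
        {"event": "on_chain_start", "data": {}},
        {"event": "on_chat_model_stream", "data": {"chunk": ""}},  # empty: tool call
        {"event": "on_chat_model_stream", "data": {"chunk": "Fog"}},
        {"event": "on_chat_model_stream", "data": {"chunk": "gy"}},
        {"event": "on_chain_end", "data": {}},
    ]
    for event in events:
        await asyncio.sleep(0)  # yield control, as a real stream would
        yield event


async def collect_tokens() -> list[str]:
    """Keep only non-empty chat-model chunks, in arrival order."""
    tokens = []
    async for event in fake_event_stream():
        if event["event"] != "on_chat_model_stream":
            continue
        content = event["data"]["chunk"]
        if content:
            tokens.append(content)
    return tokens


# In a notebook, use `tokens = await collect_tokens()` instead of asyncio.run.
tokens = asyncio.run(collect_tokens())
print("|".join(tokens))
```

Events of other kinds and empty chunks are skipped, so only the visible answer text reaches the output.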

In [13]:

import asyncio

# Define a function to stream tokens
async def stream_tokens():
    messages = [HumanMessage(content="What is the weather in SF?")]
    # Thread ID "4" was already used in the previous cell, so this continues
    # that conversation; pick an unused ID to start from scratch
    thread = {"configurable": {"thread_id": "4"}}

    # Stream events
    async for event in abot.graph.astream_events({"messages": messages}, thread, version="v1"):
        if event["event"] == "on_chat_model_stream":
            content = event["data"]["chunk"].content
            if content:
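                print(content, end="|")  # the "|" only marks token boundaries; drop it in production

# asyncio.run() needs an environment with no event loop already running (e.g. a plain script).
# In a notebook, where a loop is already running, call `await stream_tokens()` instead.
# asyncio.run(stream_tokens())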

## ✅ Summary
- **Checkpointer** lets your agent pause/resume conversations.
- **Thread ID** scopes the conversation history.
- **Streaming** shows live updates—either full messages or token-by-token.

Persistence is also very important for human-in-the-loop interactions, which are the subject of the next lesson.