# Lesson 4: Persistence and Streaming
- Persistence lets you keep the state of an agent at a particular point in time. This lets us go back to that state later and resume from it in future interactions.
- Streaming lets us emit a sequence of signals about what's happening at each moment, so that for long-running applications we know exactly what the agent is doing.

In [1]:
# Read environment variables from a local .env file (presumably the API keys
# that the OpenAI and Tavily clients created below rely on).
from dotenv import load_dotenv

_ = load_dotenv()  # bind the returned bool to `_` so the notebook doesn't echo it

In [2]:
# import necessary libraries
from langgraph.graph import StateGraph, END
from typing import TypedDict, Annotated
import operator
from langchain_core.messages import AnyMessage, SystemMessage, HumanMessage, ToolMessage
from langchain_openai import ChatOpenAI
from langchain_community.tools.tavily_search import TavilySearchResults

In [3]:
# Web-search tool the agent can call; each search returns at most 2 results.
tool = TavilySearchResults(max_results=2)

In [4]:
# Graph state shared by every node of the agent.
class AgentState(TypedDict):
    """State passed between the agent's graph nodes.

    ``messages`` carries ``operator.add`` as its reducer, so the list a node
    returns is concatenated onto the existing messages rather than
    replacing them.
    """

    messages: Annotated[list[AnyMessage], operator.add]

- To use persistence, LangGraph adds checkpoints. A checkpointer saves the state after and between every node.
- To add persistence we use `SqliteSaver`. This is a simple checkpointer that uses SQLite, a built-in database, under the hood; here we use an in-memory database.
- To use this checkpointer, we pass it to `graph.compile()`.

In [5]:
import sqlite3

from langgraph.checkpoint.sqlite import SqliteSaver

# Build the checkpointer from an explicit in-memory SQLite connection.
# Newer langgraph releases turned `SqliteSaver.from_conn_string` into a
# context manager, so its return value can no longer be passed straight to
# `graph.compile()`. Constructing the saver from a connection works on both
# older and newer releases.
# check_same_thread=False lets the connection be used from a thread other
# than the one that opened it (graph nodes are not guaranteed to run on the
# notebook's main thread).
memory = SqliteSaver(sqlite3.connect(":memory:", check_same_thread=False))

In [7]:
# Create an agent 

class Agent:
    """A tool-calling agent built as a two-node LangGraph loop.

    The "llm" node calls the model. If the model's reply contains tool
    calls, the "action" node executes them and routes the results back to
    "llm". When a reply has no tool calls, the graph ends.

    Args:
        model: chat model supporting ``bind_tools`` (e.g. ``ChatOpenAI``).
        tools: tools the model may call; they are looked up by ``name``.
        checkpointer: checkpointer passed to ``graph.compile()`` so state is
            persisted per ``thread_id``; ``None`` compiles without persistence.
        system: optional system prompt prepended to every model call. It is
            not stored in the graph state.
    """

    def __init__(self, model, tools, checkpointer=None, system=""):
        self.system = system
        graph = StateGraph(AgentState)
        graph.add_node("llm", self.call_openai)
        graph.add_node("action", self.take_action)
        # Loop into "action" while the model keeps requesting tools; otherwise stop.
        graph.add_conditional_edges("llm", self.exists_action, {True: "action", False: END})
        graph.add_edge("action", "llm")
        graph.set_entry_point("llm")
        # The checkpointer records the state as the graph moves between nodes,
        # so later calls with the same thread_id continue the same conversation.
        self.graph = graph.compile(checkpointer=checkpointer)
        self.tools = {t.name: t for t in tools}
        self.model = model.bind_tools(tools)

    def call_openai(self, state: AgentState):
        """Invoke the model on the conversation so far; return its reply as a state update."""
        messages = state['messages']
        if self.system:
            # Prepended per call only; the system prompt never enters the saved state.
            messages = [SystemMessage(content=self.system)] + messages
        message = self.model.invoke(messages)
        return {'messages': [message]}

    def exists_action(self, state: AgentState):
        """Return True when the latest message requests at least one tool call."""
        result = state['messages'][-1]
        return len(result.tool_calls) > 0

    def take_action(self, state: AgentState):
        """Execute each tool call from the latest message.

        Returns one ``ToolMessage`` per call, tagged with the call's id.
        A tool name the model invented (not among the tools this agent was
        built with) gets an error reply instead of raising ``KeyError``. This
        way every call is still answered and the model can retry.
        """
        tool_calls = state['messages'][-1].tool_calls
        results = []
        for t in tool_calls:
            print(f"Calling: {t}")
            selected = self.tools.get(t['name'])
            if selected is None:
                # Unknown tool requested by the LLM: tell it to retry rather than crash.
                print("\n ....bad tool name....")
                result = "bad tool name, retry"
            else:
                result = selected.invoke(t['args'])
            results.append(ToolMessage(tool_call_id=t['id'], name=t['name'], content=str(result)))
        print("Back to the model!")
        return {'messages': results}

## Streaming messages

In [9]:
# System prompt: encourage deliberate, possibly repeated, search-tool use.
prompt = (
    "You are a smart research assistant. Use the search engine to look up information. "
    "You are allowed to make multiple calls (either together or in sequence). "
    "Only look up information when you are sure of what you want. "
    "If you need to look up some information before asking a follow up question, you are allowed to do that!\n"
)
model = ChatOpenAI(model="gpt-4o")
# Passing the `memory` checkpointer makes this agent persist state per thread_id.
abot = Agent(model=model, tools=[tool], checkpointer=memory, system=prompt)

In [10]:
messages = [HumanMessage("What is the weather in sf?")]  # first user turn

In [11]:
thread = dict(configurable=dict(thread_id="1"))  # conversation id for the checkpointer

In [12]:
# Stream the run: each event maps a node name to the state update it produced;
# print only that update's new messages.
for event in abot.graph.stream({"messages": messages}, config=thread):
    for node_output in event.values():
        print(node_output['messages'])

[AIMessage(content='', additional_kwargs={'tool_calls': [{'id': 'call_bmfLa92f6oAIKN9KvXtqbKDz', 'function': {'arguments': '{"query":"current weather in San Francisco"}', 'name': 'tavily_search_results_json'}, 'type': 'function'}]}, response_metadata={'token_usage': {'completion_tokens': 22, 'prompt_tokens': 151, 'total_tokens': 173, 'prompt_tokens_details': {'cached_tokens': 0, 'audio_tokens': 0}, 'completion_tokens_details': {'reasoning_tokens': 0, 'audio_tokens': 0, 'accepted_prediction_tokens': 0, 'rejected_prediction_tokens': 0}}, 'model_name': 'gpt-4o', 'system_fingerprint': 'fp_831e067d82', 'finish_reason': 'tool_calls', 'logprobs': None}, id='run-ee83603e-d43c-49a0-9e5b-933325106839-0', tool_calls=[{'name': 'tavily_search_results_json', 'args': {'query': 'current weather in San Francisco'}, 'id': 'call_bmfLa92f6oAIKN9KvXtqbKDz'}])]
Calling: {'name': 'tavily_search_results_json', 'args': {'query': 'current weather in San Francisco'}, 'id': 'call_bmfLa92f6oAIKN9KvXtqbKDz'}
Back t

In [13]:
# Continue the conversation with a follow-up question.
messages = [HumanMessage("What about in la?")]
# Reusing the same thread_id keeps us in the same conversation.
thread = dict(configurable=dict(thread_id="1"))
for event in abot.graph.stream({"messages": messages}, config=thread):
    for node_output in event.values():
        print(node_output)

{'messages': [AIMessage(content='', additional_kwargs={'tool_calls': [{'id': 'call_kKKQnrNNnECKp5JBGtkwYqex', 'function': {'arguments': '{"query":"current weather in Los Angeles"}', 'name': 'tavily_search_results_json'}, 'type': 'function'}]}, response_metadata={'token_usage': {'completion_tokens': 23, 'prompt_tokens': 567, 'total_tokens': 590, 'prompt_tokens_details': {'cached_tokens': 0, 'audio_tokens': 0}, 'completion_tokens_details': {'reasoning_tokens': 0, 'audio_tokens': 0, 'accepted_prediction_tokens': 0, 'rejected_prediction_tokens': 0}}, 'model_name': 'gpt-4o', 'system_fingerprint': 'fp_de57b65c90', 'finish_reason': 'tool_calls', 'logprobs': None}, id='run-f89ddd9e-ac62-438d-9abe-18dc3bfb2c06-0', tool_calls=[{'name': 'tavily_search_results_json', 'args': {'query': 'current weather in Los Angeles'}, 'id': 'call_kKKQnrNNnECKp5JBGtkwYqex'}])]}
Calling: {'name': 'tavily_search_results_json', 'args': {'query': 'current weather in Los Angeles'}, 'id': 'call_kKKQnrNNnECKp5JBGtkwYqex'

In [14]:
# Same thread as before, so the model can resolve "which one" from earlier turns.
messages = [HumanMessage("Which one is warmer?")]
thread = dict(configurable=dict(thread_id="1"))
for event in abot.graph.stream({"messages": messages}, config=thread):
    for node_output in event.values():
        print(node_output)

{'messages': [AIMessage(content='Los Angeles is currently warmer than San Francisco. Los Angeles has a temperature of 55°F, while San Francisco has a temperature of 57°F. However, the "feels like" temperature in Los Angeles is 54°F, which is a bit lower.', response_metadata={'token_usage': {'completion_tokens': 55, 'prompt_tokens': 1171, 'total_tokens': 1226, 'prompt_tokens_details': {'cached_tokens': 1152, 'audio_tokens': 0}, 'completion_tokens_details': {'reasoning_tokens': 0, 'audio_tokens': 0, 'accepted_prediction_tokens': 0, 'rejected_prediction_tokens': 0}}, 'model_name': 'gpt-4o', 'system_fingerprint': 'fp_de57b65c90', 'finish_reason': 'stop', 'logprobs': None}, id='run-bb6dbf4f-9616-42e5-b433-29d010642b9d-0')]}


In [16]:
messages = [HumanMessage("Which one is warmer?")]
# If we change the thread ID, the language model loses the earlier context and gets confused.
thread = dict(configurable=dict(thread_id="2"))
for event in abot.graph.stream({"messages": messages}, config=thread):
    for node_output in event.values():
        print(node_output)

{'messages': [AIMessage(content="It seems that your question is missing some context. Could you please provide more details about what you're comparing? For example, are you asking about the weather in two different locations, the warmth of different types of clothing, or something else? Let me know so I can assist you better.", response_metadata={'token_usage': {'completion_tokens': 59, 'prompt_tokens': 203, 'total_tokens': 262, 'prompt_tokens_details': {'cached_tokens': 0, 'audio_tokens': 0}, 'completion_tokens_details': {'reasoning_tokens': 0, 'audio_tokens': 0, 'accepted_prediction_tokens': 0, 'rejected_prediction_tokens': 0}}, 'model_name': 'gpt-4o', 'system_fingerprint': 'fp_f9f4fb6dbf', 'finish_reason': 'stop', 'logprobs': None}, id='run-3f0bc8d6-620f-4893-966d-c0851a021603-0')]}


## Streaming tokens

In [17]:
# Token streaming below uses the async API (astream_events), so rebuild the
# agent on an async checkpointer (presumably the sync SqliteSaver does not
# support the async methods in this langgraph version).
from langgraph.checkpoint.aiosqlite import AsyncSqliteSaver

memory = AsyncSqliteSaver.from_conn_string(":memory:")
# NOTE(review): in newer langgraph releases this class lives in
# `langgraph.checkpoint.sqlite.aio` and `from_conn_string` is an async
# context manager rather than a plain constructor; confirm against the
# installed version before upgrading.
abot = Agent(model=model, tools=[tool], checkpointer=memory, system=prompt)

In [18]:
messages = [HumanMessage(content="What is the weather in SF?")]
thread = {"configurable": {"thread_id": "4"}}
async for event in abot.graph.astream_events({"messages": messages}, thread, version="v1"):
    kind = event["event"]
    if kind == "on_chat_model_stream":
        content = event["data"]["chunk"].content
        if content:
            # Empty content in the context of OpenAI means
            # that the model is asking for a tool to be invoked.
            # So we only print non-empty content
            print(content, end="|")

  warn_beta(


Calling: {'name': 'tavily_search_results_json', 'args': {'query': 'current weather in San Francisco'}, 'id': 'call_Q7Ja7dXreNf611ip7uPszCbI'}
Back to the model!
The| current| weather| in| San| Francisco| is| mostly| cloudy| with| a| temperature| of| |57|.|0|°F| and| wind| speed| of| |9|.|2| mph|.|