# Lesson 4: Persistence and Streaming

In [1]:
from dotenv import load_dotenv

_ = load_dotenv()

In [2]:
from langgraph.graph import StateGraph, END
from typing import TypedDict, Annotated
import operator
from langchain_core.messages import AnyMessage, SystemMessage, HumanMessage, ToolMessage
from langchain_openai import ChatOpenAI
from langchain_community.tools.tavily_search import TavilySearchResults

In [3]:
tool = TavilySearchResults(max_results=2)

In [4]:
class AgentState(TypedDict):
    messages: Annotated[list[AnyMessage], operator.add]

创建checkpointer，或者称之为memory，一个意思。

In [5]:
from langgraph.checkpoint.sqlite import SqliteSaver

memory = SqliteSaver.from_conn_string(":memory:")

在Agent类的__init__方法中，添加一个参数checkpointer，在graph.compile中使用这个参数，这样，graph就具有了memory。

In [6]:
class Agent:
    def __init__(self, model, tools, checkpointer, system=""):
        self.system = system
        graph = StateGraph(AgentState)
        graph.add_node("llm", self.call_openai)
        graph.add_node("action", self.take_action)
        graph.add_conditional_edges("llm", self.exists_action, {True: "action", False: END})
        graph.add_edge("action", "llm")
        graph.set_entry_point("llm")
        self.graph = graph.compile(checkpointer=checkpointer)
        self.tools = {t.name: t for t in tools}
        self.model = model.bind_tools(tools)

    def call_openai(self, state: AgentState):
        messages = state['messages']
        if self.system:
            messages = [SystemMessage(content=self.system)] + messages
        message = self.model.invoke(messages)
        return {'messages': [message]}

    def exists_action(self, state: AgentState):
        result = state['messages'][-1]
        return len(result.tool_calls) > 0

    def take_action(self, state: AgentState):
        tool_calls = state['messages'][-1].tool_calls
        results = []
        for t in tool_calls:
            print(f"Calling: {t}")
            result = self.tools[t['name']].invoke(t['args'])
            results.append(ToolMessage(tool_call_id=t['id'], name=t['name'], content=str(result)))
        print("Back to the model!")
        return {'messages': results}

In [7]:
prompt = """You are a smart research assistant. Use the search engine to look up information. \
You are allowed to make multiple calls (either together or in sequence). \
Only look up information when you are sure of what you want. \
If you need to look up some information before asking a follow up question, you are allowed to do that!
"""
model = ChatOpenAI(model="gpt-4o")
abot = Agent(model, [tool], system=prompt, checkpointer=memory)

In [8]:
messages = [HumanMessage(content="What is the weather in sf?")]

In [9]:
thread = {"configurable": {"thread_id": "1"}}

stream传输分为2种，按message的流式传输，按token的流式传输。  

我们先来看按message的流式传输。  

In [None]:
# 这段代码首先打印出AIMessage，其内容指出要调用Tavily工具。
# 然后打印出ToolMessage，其内容是Tavily的搜索结果。
# 然后又打印AIMessage，其内容是LLM的最终回答。
for event in abot.graph.stream({"messages": messages}, thread):
    for v in event.values():
        print(v['messages'])

[AIMessage(content='', additional_kwargs={'tool_calls': [{'id': 'call_bmfLa92f6oAIKN9KvXtqbKDz', 'function': {'arguments': '{"query":"current weather in San Francisco"}', 'name': 'tavily_search_results_json'}, 'type': 'function'}]}, response_metadata={'token_usage': {'completion_tokens': 22, 'prompt_tokens': 151, 'total_tokens': 173, 'prompt_tokens_details': {'cached_tokens': 0, 'audio_tokens': 0}, 'completion_tokens_details': {'reasoning_tokens': 0, 'audio_tokens': 0, 'accepted_prediction_tokens': 0, 'rejected_prediction_tokens': 0}}, 'model_name': 'gpt-4o', 'system_fingerprint': 'fp_831e067d82', 'finish_reason': 'tool_calls', 'logprobs': None}, id='run-6d5b0fd7-e3e9-4c0a-9fb8-d5b1873434ba-0', tool_calls=[{'name': 'tavily_search_results_json', 'args': {'query': 'current weather in San Francisco'}, 'id': 'call_bmfLa92f6oAIKN9KvXtqbKDz'}])]
Calling: {'name': 'tavily_search_results_json', 'args': {'query': 'current weather in San Francisco'}, 'id': 'call_bmfLa92f6oAIKN9KvXtqbKDz'}
Back t

In [None]:
# 做一次追问，这次我们没有提及天气，因为Agent具有memory功能，所以它能分析出我们在问LA的天气如何。
# 首先打印一个AIMessage，其内容是要求调用Tavily工具查询LA的天气。
# 然后打印一个ToolMessage，其内容是Tavily工具的查询结果。
# 最后打印一个AIMessage，其内容是LLM的最终回答。
messages = [HumanMessage(content="What about in la?")]
thread = {"configurable": {"thread_id": "1"}}
for event in abot.graph.stream({"messages": messages}, thread):
    for v in event.values():
        print(v)

{'messages': [AIMessage(content='', additional_kwargs={'tool_calls': [{'id': 'call_x2vtEruqqwmU7siTj25VXaEq', 'function': {'arguments': '{"query":"current weather in Los Angeles"}', 'name': 'tavily_search_results_json'}, 'type': 'function'}]}, response_metadata={'token_usage': {'completion_tokens': 23, 'prompt_tokens': 889, 'total_tokens': 912, 'prompt_tokens_details': {'cached_tokens': 0, 'audio_tokens': 0}, 'completion_tokens_details': {'reasoning_tokens': 0, 'audio_tokens': 0, 'accepted_prediction_tokens': 0, 'rejected_prediction_tokens': 0}}, 'model_name': 'gpt-4o', 'system_fingerprint': 'fp_0d4eb8a50b', 'finish_reason': 'tool_calls', 'logprobs': None}, id='run-f8f2f660-9bd1-400f-b115-2be13be7ef77-0', tool_calls=[{'name': 'tavily_search_results_json', 'args': {'query': 'current weather in Los Angeles'}, 'id': 'call_x2vtEruqqwmU7siTj25VXaEq'}])]}
Calling: {'name': 'tavily_search_results_json', 'args': {'query': 'current weather in Los Angeles'}, 'id': 'call_x2vtEruqqwmU7siTj25VXaEq'

In [None]:
# 再做一次追问，哪一个更暖和？
# LLM将通过memory的历史聊天记录，回答出LA比SF更暖和。
messages = [HumanMessage(content="Which one is warmer?")]
thread = {"configurable": {"thread_id": "1"}}
for event in abot.graph.stream({"messages": messages}, thread):
    for v in event.values():
        print(v)

{'messages': [AIMessage(content="Currently, Los Angeles is warmer than San Francisco. Los Angeles has a temperature of 51.1°F (10.6°C), while San Francisco's temperature is 48°F (8.9°C).", response_metadata={'token_usage': {'completion_tokens': 44, 'prompt_tokens': 1609, 'total_tokens': 1653, 'prompt_tokens_details': {'cached_tokens': 1536, 'audio_tokens': 0}, 'completion_tokens_details': {'reasoning_tokens': 0, 'audio_tokens': 0, 'accepted_prediction_tokens': 0, 'rejected_prediction_tokens': 0}}, 'model_name': 'gpt-4o', 'system_fingerprint': 'fp_0d4eb8a50b', 'finish_reason': 'stop', 'logprobs': None}, id='run-c82bd2fc-1844-4d57-8cef-cc4c39a85149-0')]}


In [None]:
# 如果我们改变线程号，Agent将不知道比较什么更暖和。
messages = [HumanMessage(content="Which one is warmer?")]
thread = {"configurable": {"thread_id": "2"}}
for event in abot.graph.stream({"messages": messages}, thread):
    for v in event.values():
        print(v)

{'messages': [AIMessage(content="Could you please clarify what you're comparing to determine which is warmer? Are you comparing two specific locations, types of clothing, materials, or something else? Let me know so I can provide the appropriate information.", response_metadata={'token_usage': {'completion_tokens': 43, 'prompt_tokens': 149, 'total_tokens': 192, 'prompt_tokens_details': {'cached_tokens': 0, 'audio_tokens': 0}, 'completion_tokens_details': {'reasoning_tokens': 0, 'audio_tokens': 0, 'accepted_prediction_tokens': 0, 'rejected_prediction_tokens': 0}}, 'model_name': 'gpt-4o', 'system_fingerprint': 'fp_f9f4fb6dbf', 'finish_reason': 'stop', 'logprobs': None}, id='run-7bb8b4d1-8c20-4ae5-b041-d1cde97c8e68-0')]}


## Streaming tokens  

为了使用streaming token，我们需要使用graph.astream_events，同时需要使用一个异步检查点AsyncSqliteSaver。

In [14]:
from langgraph.checkpoint.aiosqlite import AsyncSqliteSaver

memory = AsyncSqliteSaver.from_conn_string(":memory:")
abot = Agent(model, [tool], system=prompt, checkpointer=memory)

In [None]:
messages = [HumanMessage(content="What is the weather in SF?")]
thread = {"configurable": {"thread_id": "4"}}
async for event in abot.graph.astream_events({"messages": messages}, thread, version="v1"):
    kind = event["event"]
    if kind == "on_chat_model_stream":   # kind为on_chat_model_stream代表收到了新的token。
        content = event["data"]["chunk"].content
        if content:
            # Empty content in the context of OpenAI means
            # that the model is asking for a tool to be invoked.
            # So we only print non-empty content
            print(content, end="|")

  warn_beta(


Calling: {'name': 'tavily_search_results_json', 'args': {'query': 'current weather in San Francisco'}, 'id': 'call_ELSmZ2F8EZGoY7UfTT9TT3cV'}
Back to the model!
The| current| weather| in| San| Francisco| is| partly| cloudy| with| a| temperature| of| |48|°F| (|8|.|9|°C|).| The| wind| is| blowing| from| the| southwest| at| |12|.|5| mph| (|20|.|2| k|ph|),| and| the| humidity| level| is| around| |80|%.| The| weather| feels| like| |42|.|7|°F| (|6|°C|)| due| to| wind| chill|.| There| is| no| precipitation| currently|,| and| the| visibility| is| approximately| |16| km| (|9| miles|).|