# Lesson 4: Persistence and Streaming

In [1]:
from dotenv import load_dotenv

# Load API credentials from a local .env file into the process environment
# (presumably the OpenAI and Tavily keys used below — confirm key names).
# Bind the result to a real name instead of `_`: assigning `_` in the user
# namespace makes IPython stop updating its `_`/`__`/`___` output cache.
dotenv_loaded = load_dotenv()

In [2]:
from langgraph.graph import StateGraph, END
from typing import TypedDict, Annotated
import operator
from langchain_core.messages import AnyMessage, SystemMessage, HumanMessage, ToolMessage
from langchain_openai import ChatOpenAI
from langchain_community.tools.tavily_search import TavilySearchResults

In [3]:
# Web-search tool offered to the model; cap how many results each search returns.
MAX_SEARCH_RESULTS = 2
tool = TavilySearchResults(max_results=MAX_SEARCH_RESULTS)

In [4]:
class AgentState(TypedDict):
    """State shared between the graph's nodes.

    ``messages`` is the running conversation. Its ``operator.add`` reducer
    concatenates each node's returned message list onto the existing one,
    so new messages are appended instead of replacing the history.
    """
    # Annotated reducer: updates to this key are combined with operator.add (list concatenation).
    messages: Annotated[list[AnyMessage], operator.add]

In [5]:
import sqlite3

from langgraph.checkpoint.sqlite import SqliteSaver

# Checkpointer that saves the graph state at each step, in an in-memory SQLite DB.
# Build it from an explicit connection: in newer langgraph releases
# `SqliteSaver.from_conn_string` is a context manager rather than a saver, so
# passing its result straight to `compile(checkpointer=...)` fails.
# `check_same_thread=False` lets graph nodes running on worker threads share
# the connection. NOTE(review): confirm against the installed langgraph version.
checkpoint_conn = sqlite3.connect(":memory:", check_same_thread=False)
memory = SqliteSaver(conn=checkpoint_conn)

#### Persistence
It lets you keep the state of the agent as of a particular point in time.
You can go back to that state later and continue from there.

#### Streaming
We can emit signals showing what is happening at each point while the agent runs.



In [6]:
class Agent:
    """Tool-calling agent implemented as a LangGraph state graph.

    The graph starts at the ``"llm"`` node (calls the chat model). If the reply
    contains tool calls, control goes to the ``"action"`` node (runs the
    tools) and back to ``"llm"``. Otherwise the graph ends.
    """

    def __init__(self, model, tools, checkpointer, system=""):
        """Build and compile the agent graph.

        Args:
            model: chat model exposing ``bind_tools`` (e.g. ``ChatOpenAI``).
            tools: tools the model may call; looked up by their ``name``.
            checkpointer: saver passed to ``compile`` so graph state is
                persisted between steps (per ``thread_id`` in the run config).
            system: optional system prompt prepended to every model call.
        """
        self.system = system
        graph = StateGraph(AgentState)
        graph.add_node("llm", self.call_openai)
        graph.add_node("action", self.take_action)
        graph.add_conditional_edges("llm", self.exists_action, {True: "action", False: END})
        graph.add_edge("action", "llm")
        graph.set_entry_point("llm")

        # adding checkpoint to save the state at a particular time.
        self.graph = graph.compile(checkpointer=checkpointer)
        self.tools = {t.name: t for t in tools}
        self.model = model.bind_tools(tools)

    def call_openai(self, state: AgentState):
        """Invoke the model on the conversation so far, with the system prompt if set."""
        messages = state['messages']
        if self.system:
            messages = [SystemMessage(content=self.system)] + messages
        message = self.model.invoke(messages)
        return {'messages': [message]}

    def exists_action(self, state: AgentState):
        """Return True if the latest model message requested at least one tool call."""
        result = state['messages'][-1]
        return len(result.tool_calls) > 0

    def take_action(self, state: AgentState):
        """Execute every tool call in the latest message; return one ToolMessage per call.

        If the model asks for a tool that was not registered (e.g. a
        hallucinated name), the lookup would raise KeyError and abort the run.
        Instead, the problem is reported back as that call's tool result so
        the model can retry. Every tool_call_id still gets a ToolMessage.
        """
        tool_calls = state['messages'][-1].tool_calls
        results = []
        for t in tool_calls:
            print(f"Calling: {t}")
            if t['name'] not in self.tools:
                print("\n ....bad tool name....")
                result = "bad tool name, retry"
            else:
                result = self.tools[t['name']].invoke(t['args'])
            results.append(ToolMessage(tool_call_id=t['id'], name=t['name'], content=str(result)))
        print("Back to the model!")
        return {'messages': results}

In [7]:
# System prompt for the research assistant (adjacent literals are joined into one string).
prompt = (
    "You are a smart research assistant. Use the search engine to look up information. "
    "You are allowed to make multiple calls (either together or in sequence). "
    "Only look up information when you are sure of what you want. "
    "If you need to look up some information before asking a follow up question, you are allowed to do that!\n"
)
model = ChatOpenAI(model="gpt-4o")
# Passing the checkpointer is what lets the agent remember each thread's conversation.
abot = Agent(model=model, tools=[tool], checkpointer=memory, system=prompt)

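Streaming:<br>
We can emit signals showing what is happening at each point while the agent runs. Things we may want to stream:

<ul>
    <li>Individual messages: e.g. the AI message that decides which action to take</li>
    <li>Observation messages: represent the results of taking that action</li>
    <li>Tokens: stream the model's output token by token as it is generated</li>
</ul>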

In [8]:
# Opening user turn of the conversation that is streamed through the agent below.
first_question = HumanMessage(content="What is the weather in sf?")
messages = [first_question]

In [9]:
# A thread config identifies one conversation: the checkpointer stores state
# per thread_id, so several conversations can be tracked side by side.
thread = dict(configurable=dict(thread_id="1"))

In [10]:
# Each streamed event maps a node name to the state update that node returned;
# show only the new messages carried by each update.
for event in abot.graph.stream({"messages": messages}, thread):
    for _node_name, update in event.items():
        print(update['messages'])

[AIMessage(content='', additional_kwargs={'tool_calls': [{'id': 'call_ltQ25WcvK8dQQ482FPX8X18r', 'function': {'arguments': '{"query":"current weather in San Francisco"}', 'name': 'tavily_search_results_json'}, 'type': 'function'}]}, response_metadata={'token_usage': {'completion_tokens': 22, 'prompt_tokens': 151, 'total_tokens': 173}, 'model_name': 'gpt-4o', 'system_fingerprint': 'fp_4008e3b719', 'finish_reason': 'tool_calls', 'logprobs': None}, id='run-32da9ee4-d3de-4d7e-b7d5-5dd3754ab04a-0', tool_calls=[{'name': 'tavily_search_results_json', 'args': {'query': 'current weather in San Francisco'}, 'id': 'call_ltQ25WcvK8dQQ482FPX8X18r'}])]
Calling: {'name': 'tavily_search_results_json', 'args': {'query': 'current weather in San Francisco'}, 'id': 'call_ltQ25WcvK8dQQ482FPX8X18r'}
Back to the model!
[ToolMessage(content='[{\'url\': \'https://www.weatherapi.com/\', \'content\': "{\'location\': {\'name\': \'San Francisco\', \'region\': \'California\', \'country\': \'United States of America

In [11]:
# Follow-up on the same thread ("1"): the checkpointer restores the earlier
# exchange, so the model can resolve "What about in la?" from context.
messages = [HumanMessage(content="What about in la?")]
thread = {"configurable": {"thread_id": "1"}}
inputs = {"messages": messages}
for event in abot.graph.stream(inputs, thread):
    for update in event.values():
        print(update)

{'messages': [AIMessage(content='', additional_kwargs={'tool_calls': [{'id': 'call_QOKkQk3GmmrxUBjDNeX02Xba', 'function': {'arguments': '{"query":"current weather in Los Angeles"}', 'name': 'tavily_search_results_json'}, 'type': 'function'}]}, response_metadata={'token_usage': {'completion_tokens': 22, 'prompt_tokens': 869, 'total_tokens': 891}, 'model_name': 'gpt-4o', 'system_fingerprint': 'fp_d576307f90', 'finish_reason': 'tool_calls', 'logprobs': None}, id='run-ae5b80cc-2a41-4344-8650-f16fd7278fba-0', tool_calls=[{'name': 'tavily_search_results_json', 'args': {'query': 'current weather in Los Angeles'}, 'id': 'call_QOKkQk3GmmrxUBjDNeX02Xba'}])]}
Calling: {'name': 'tavily_search_results_json', 'args': {'query': 'current weather in Los Angeles'}, 'id': 'call_QOKkQk3GmmrxUBjDNeX02Xba'}
Back to the model!
{'messages': [ToolMessage(content="[{'url': 'https://www.wunderground.com/hourly/us/ca/los-angeles/90021/date/2024-07-03', 'content': 'Weather Underground provides local & long-range w

In [12]:
# Still thread "1": "Which one" refers to the two cities discussed earlier.
thread = dict(configurable=dict(thread_id="1"))
messages = [HumanMessage(content="Which one is warmer?")]
event_stream = abot.graph.stream({"messages": messages}, thread)
for event in event_stream:
    for update in event.values():
        print(update)

{'messages': [AIMessage(content='', additional_kwargs={'tool_calls': [{'id': 'call_Mddb1F0FwcU90CHWAVIuxqjq', 'function': {'arguments': '{"query": "current temperature in San Francisco"}', 'name': 'tavily_search_results_json'}, 'type': 'function'}, {'id': 'call_6MJdDXwr0jtAfEJvB9eBdhdq', 'function': {'arguments': '{"query": "current temperature in Los Angeles"}', 'name': 'tavily_search_results_json'}, 'type': 'function'}]}, response_metadata={'token_usage': {'completion_tokens': 60, 'prompt_tokens': 1163, 'total_tokens': 1223}, 'model_name': 'gpt-4o', 'system_fingerprint': 'fp_d576307f90', 'finish_reason': 'tool_calls', 'logprobs': None}, id='run-7194bacb-e4c2-40a7-b806-b01cbd034564-0', tool_calls=[{'name': 'tavily_search_results_json', 'args': {'query': 'current temperature in San Francisco'}, 'id': 'call_Mddb1F0FwcU90CHWAVIuxqjq'}, {'name': 'tavily_search_results_json', 'args': {'query': 'current temperature in Los Angeles'}, 'id': 'call_6MJdDXwr0jtAfEJvB9eBdhdq'}])]}
Calling: {'name

In [13]:
# Same question on a fresh thread ("2"): no saved history exists for it,
# so the model has no context for "Which one".
thread = dict(configurable=dict(thread_id="2"))
messages = [HumanMessage(content="Which one is warmer?")]
event_stream = abot.graph.stream({"messages": messages}, thread)
for event in event_stream:
    for update in event.values():
        print(update)

{'messages': [AIMessage(content='I need more context to answer your question. Are you asking about the temperature comparison between two specific locations, items, or something else? Please provide more details so I can assist you better.', response_metadata={'token_usage': {'completion_tokens': 39, 'prompt_tokens': 149, 'total_tokens': 188}, 'model_name': 'gpt-4o', 'system_fingerprint': 'fp_d576307f90', 'finish_reason': 'stop', 'logprobs': None}, id='run-87df8971-4f32-4a08-8a75-98defd1f5fe9-0')]}


## Streaming tokens

In [14]:
from langgraph.checkpoint.aiosqlite import AsyncSqliteSaver

# Async checkpointer: `astream_events` below runs the graph asynchronously, so
# it needs a saver with async methods.
# NOTE(review): newer langgraph releases move this class to
# `langgraph.checkpoint.sqlite.aio`, and `from_conn_string` becomes an async
# context manager there — confirm against the installed version.
memory = AsyncSqliteSaver.from_conn_string(":memory:")
# Rebuild the agent around the new checkpointer; threads saved by the previous
# checkpointer are not visible to this one.
abot = Agent(model=model, tools=[tool], checkpointer=memory, system=prompt)

In [15]:
messages = [HumanMessage(content="What is the weather in SF?")]
thread = {"configurable": {"thread_id": "4"}}
async for event in abot.graph.astream_events({"messages": messages}, thread, version="v1"):
    kind = event["event"]
    if kind == "on_chat_model_stream":
        content = event["data"]["chunk"].content
        if content:
            # Empty content in the context of OpenAI means
            # that the model is asking for a tool to be invoked.
            # So we only print non-empty content
            print(content, end="|")

  warn_beta(


Calling: {'name': 'tavily_search_results_json', 'args': {'query': 'current weather in San Francisco'}, 'id': 'call_ZOZAMXBB7lkLKIVtViY2RCR0'}
Back to the model!
The| current| weather| in| San| Francisco| is| partly| cloudy| with| a| temperature| of| |20|.|6|°C| (|69|.|1|°F|).| The| wind| is| blowing| from| the| north| at| |2|.|2| mph| (|3|.|6| k|ph|)| with| gust|s| up| to| |8|.|9| mph| (|14|.|4| k|ph|).| The| humidity| is| at| |65|%,| and| the| visibility| is| |16| kilometers| (|9| miles|).|