# Lesson 4: Persistence and Streaming

In [16]:
from dotenv import load_dotenv

_ = load_dotenv()

In [17]:
from langgraph.graph import StateGraph, END
from typing import TypedDict, Annotated
import operator
from langchain_core.messages import AnyMessage, SystemMessage, HumanMessage, ToolMessage
from langchain_openai import ChatOpenAI
from langchain_community.tools.tavily_search import TavilySearchResults

In [18]:
tool = TavilySearchResults(max_results=2)

In [19]:
class AgentState(TypedDict):
    messages: Annotated[list[AnyMessage], operator.add]

In [21]:
from langgraph.checkpoint.sqlite import SqliteSaver

memory = SqliteSaver.from_conn_string(":memory:")

In [22]:
class Agent:
    def __init__(self, model, tools, checkpointer, system=""):
        self.system = system
        graph = StateGraph(AgentState)
        graph.add_node("llm", self.call_openai)
        graph.add_node("action", self.take_action)
        graph.add_conditional_edges("llm", self.exists_action, {True: "action", False: END})
        graph.add_edge("action", "llm")
        graph.set_entry_point("llm")
        self.graph = graph.compile(checkpointer=checkpointer)
        self.tools = {t.name: t for t in tools}
        self.model = model.bind_tools(tools)

    def call_openai(self, state: AgentState):
        messages = state['messages']
        if self.system:
            messages = [SystemMessage(content=self.system)] + messages
        message = self.model.invoke(messages)
        return {'messages': [message]}

    def exists_action(self, state: AgentState):
        result = state['messages'][-1]
        return len(result.tool_calls) > 0

    def take_action(self, state: AgentState):
        tool_calls = state['messages'][-1].tool_calls
        results = []
        for t in tool_calls:
            print(f"Calling: {t}")
            result = self.tools[t['name']].invoke(t['args'])
            results.append(ToolMessage(tool_call_id=t['id'], name=t['name'], content=str(result)))
        print("Back to the model!")
        return {'messages': results}

In [23]:
prompt = """You are a smart research assistant. Use the search engine to look up information. \
You are allowed to make multiple calls (either together or in sequence). \
Only look up information when you are sure of what you want. \
If you need to look up some information before asking a follow up question, you are allowed to do that!
"""
model = ChatOpenAI(model="gpt-4o")
abot = Agent(model, [tool], system=prompt, checkpointer=memory)

In [24]:
messages = [HumanMessage(content="What is the weather in Noakhali?")]

In [25]:
thread = {"configurable": {"thread_id": "1"}}

In [26]:
for event in abot.graph.stream({"messages": messages}, thread):
    for v in event.values():
        print(v['messages'])

[AIMessage(content='', additional_kwargs={'tool_calls': [{'id': 'call_i961gN1KuorLNkXfFhkSjfuy', 'function': {'arguments': '{"query":"current weather in Noakhali"}', 'name': 'tavily_search_results_json'}, 'type': 'function'}]}, response_metadata={'token_usage': {'completion_tokens': 23, 'prompt_tokens': 153, 'total_tokens': 176}, 'model_name': 'gpt-4o', 'system_fingerprint': 'fp_f4e629d0a5', 'finish_reason': 'tool_calls', 'logprobs': None}, id='run-f593235d-cd08-4b46-ab6d-c33a8dad00b7-0', tool_calls=[{'name': 'tavily_search_results_json', 'args': {'query': 'current weather in Noakhali'}, 'id': 'call_i961gN1KuorLNkXfFhkSjfuy'}])]
Calling: {'name': 'tavily_search_results_json', 'args': {'query': 'current weather in Noakhali'}, 'id': 'call_i961gN1KuorLNkXfFhkSjfuy'}
Back to the model!
[AIMessage(content='The current weather in Noakhali, Bangladesh is as follows:\n\n- **Temperature**: 30.6°C (87.1°F)\n- **Condition**: Patchy rain nearby\n- **Feels Like**: 38.6°C (101.6°F)\n- **Wind**: From

In [27]:
messages = [HumanMessage(content="What about in Barisal?")]
thread = {"configurable": {"thread_id": "1"}}
for event in abot.graph.stream({"messages": messages}, thread):
    for v in event.values():
        print(v)

{'messages': [AIMessage(content='', additional_kwargs={'tool_calls': [{'id': 'call_jxSb8QCpHBBaBF7b8YDGCksP', 'function': {'arguments': '{"query":"current weather in Barisal"}', 'name': 'tavily_search_results_json'}, 'type': 'function'}]}, response_metadata={'token_usage': {'completion_tokens': 22, 'prompt_tokens': 773, 'total_tokens': 795}, 'model_name': 'gpt-4o', 'system_fingerprint': 'fp_9cb5d38cf7', 'finish_reason': 'tool_calls', 'logprobs': None}, id='run-03c1ac81-84a0-4230-bcc9-2a4ee92434b8-0', tool_calls=[{'name': 'tavily_search_results_json', 'args': {'query': 'current weather in Barisal'}, 'id': 'call_jxSb8QCpHBBaBF7b8YDGCksP'}])]}
Calling: {'name': 'tavily_search_results_json', 'args': {'query': 'current weather in Barisal'}, 'id': 'call_jxSb8QCpHBBaBF7b8YDGCksP'}
Back to the model!
{'messages': [ToolMessage(content='[{\'url\': \'https://www.weatherapi.com/\', \'content\': "{\'location\': {\'name\': \'Barisal\', \'region\': \'\', \'country\': \'Bangladesh\', \'lat\': 22.7, \'

In [28]:
messages = [HumanMessage(content="Which one is warmer?")]
thread = {"configurable": {"thread_id": "1"}}
for event in abot.graph.stream({"messages": messages}, thread):
    for v in event.values():
        print(v)

{'messages': [AIMessage(content='Based on the current temperatures:\n\n- **Noakhali**: 30.6°C (87.1°F)\n- **Barisal**: 31.4°C (88.5°F)\n\nBarisal is slightly warmer than Noakhali.', response_metadata={'token_usage': {'completion_tokens': 52, 'prompt_tokens': 1405, 'total_tokens': 1457}, 'model_name': 'gpt-4o', 'system_fingerprint': 'fp_f4e629d0a5', 'finish_reason': 'stop', 'logprobs': None}, id='run-faa8bce9-9319-4022-b597-c548b9b6b46c-0')]}


In [29]:
messages = [HumanMessage(content="Which one is warmer?")]
thread = {"configurable": {"thread_id": "2"}}
for event in abot.graph.stream({"messages": messages}, thread):
    for v in event.values():
        print(v)

{'messages': [AIMessage(content='Could you please specify what you are asking about? Are you referring to comparing the temperatures of two specific places, materials, times, or something else? Please provide more details so I can assist you accurately.', response_metadata={'token_usage': {'completion_tokens': 42, 'prompt_tokens': 149, 'total_tokens': 191}, 'model_name': 'gpt-4o', 'system_fingerprint': 'fp_9cb5d38cf7', 'finish_reason': 'stop', 'logprobs': None}, id='run-8b21c223-c7ec-4503-b65d-ab9c6cd0b05c-0')]}


## Streaming tokens

In [30]:
from langgraph.checkpoint.aiosqlite import AsyncSqliteSaver

memory = AsyncSqliteSaver.from_conn_string(":memory:")
abot = Agent(model, [tool], system=prompt, checkpointer=memory)

In [31]:
messages = [HumanMessage(content="What is the weather in Dhaka")]
thread = {"configurable": {"thread_id": "4"}}
async for event in abot.graph.astream_events({"messages": messages}, thread, version="v1"):
    kind = event["event"]
    if kind == "on_chat_model_stream":
        content = event["data"]["chunk"].content
        if content:
            # Empty content in the context of OpenAI means
            # that the model is asking for a tool to be invoked.
            # So we only print non-empty content
            print(content, end="|")

Calling: {'name': 'tavily_search_results_json', 'args': {'query': 'current weather in Dhaka'}, 'id': 'call_ThueI0U9XFfd28Y1e2ad21Lq'}
Back to the model!
The| current| weather| in| Dh|aka| is| as| follows|:

|-| **|Temperature|:**| |32|.|4|°C| (|90|.|3|°F|)
|-| **|Condition|:**| Patch|y| light| rain|
|-| **|Feels| Like|:**| |40|.|7|°C| (|105|.|2|°F|)
|-| **|Humidity|:**| |69|%
|-| **|Wind|:**| |23|.|4| k|ph| (|14|.|5| mph|)| from| the| South|
|-| **|Pressure|:**| |996|.|0| mb|
|-| **|Visibility|:**| |9|.|0| km|
|-| **|UV| Index|:**| |7|

|![|Weather| Icon|](|https|://|cdn|.weather|api|.com|/weather|/|64|x|64|/day|/|293|.png|)|