# Lesson 4: Persistence and Streaming

In [1]:
from dotenv import load_dotenv

# Populate os.environ from a local .env file (presumably holding the API key
# the Tavily search tool needs). The returned bool is bound to `_` only so the
# notebook does not echo it as cell output.
_ = load_dotenv(override=False)

In [4]:
from langgraph.graph import StateGraph, END
from typing import TypedDict, Annotated
import operator
from langchain_core.messages import AnyMessage, SystemMessage, HumanMessage, ToolMessage
from langchain_openai import ChatOpenAI
from langchain_ollama import ChatOllama
from langchain_community.tools.tavily_search import TavilySearchResults

In [5]:
# Web-search tool exposed to the agent; each search returns at most two results.
tool = TavilySearchResults(
    max_results=2,
)

In [6]:
class AgentState(TypedDict):
    """State carried through the agent graph.

    Nodes return partial updates. Because ``messages`` is annotated with the
    ``operator.add`` reducer, each list a node returns is concatenated onto the
    existing history instead of overwriting it.
    """

    messages: Annotated[list[AnyMessage], operator.add]

In [7]:
# Checkpointer that saves graph state per `thread_id`, held in process memory.
#
# The original lesson used SqliteSaver. In recent langgraph-checkpoint-sqlite
# releases, however, `SqliteSaver.from_conn_string(...)` returns a context
# manager rather than a saver, so assigning it directly does not give a usable
# checkpointer:
#   from langgraph.checkpoint.sqlite import SqliteSaver
#   memory = SqliteSaver.from_conn_string(":memory:")
# A working SQLite alternative is to construct the saver from a connection:
#   import sqlite3
#   memory = SqliteSaver(sqlite3.connect(":memory:", check_same_thread=False))
# MemorySaver needs neither and is sufficient for this in-memory demo.
from langgraph.checkpoint.memory import MemorySaver
memory = MemorySaver()

In [8]:
class Agent:
    """LangGraph tool-calling agent with pluggable persistence.

    The compiled graph starts at ``llm``. Whenever the model's latest reply
    requests tool calls, it routes to ``action``, which runs them and loops
    back to ``llm``. A reply without tool calls ends the run.

    Args:
        model: chat model providing ``bind_tools``; ``tools`` are bound to it.
        tools: tools the model may call, indexed here by their ``name``.
        checkpointer: passed to ``graph.compile`` so graph state is saved and
            restored per ``thread_id`` across ``stream``/``invoke`` calls.
        system: optional system prompt prepended to every model call. It is not
            written into the graph state, so it is never checkpointed.
    """

    def __init__(self, model, tools, checkpointer, system=""):
        self.system = system
        # Build the name -> tool lookup and the tool-bound model before wiring
        # the graph, so the object is fully initialised once `self.graph` exists.
        self.tools = {t.name: t for t in tools}
        self.model = model.bind_tools(tools)

        graph = StateGraph(AgentState)
        graph.add_node("llm", self.call_model)
        graph.add_node("action", self.take_action)
        # exists_action returns a bool, which selects the next node here.
        graph.add_conditional_edges("llm", self.exists_action, {True: "action", False: END})
        graph.add_edge("action", "llm")
        graph.set_entry_point("llm")
        self.graph = graph.compile(checkpointer=checkpointer)

    def call_model(self, state: AgentState):
        """Invoke the tool-bound model on the message history.

        Returns a one-message update. The ``operator.add`` reducer on
        ``AgentState.messages`` appends it to the history.
        """
        messages = state['messages']
        if self.system:
            messages = [SystemMessage(content=self.system)] + messages
        message = self.model.invoke(messages)
        return {'messages': [message]}

    def exists_action(self, state: AgentState):
        """Return True if the latest message requests at least one tool call."""
        result = state['messages'][-1]
        return len(result.tool_calls) > 0

    def take_action(self, state: AgentState):
        """Execute every tool call requested by the latest message.

        Returns one ``ToolMessage`` per call, each tagged with the call's
        ``id``. A call naming a tool that was not registered (models,
        especially small local ones, can hallucinate names) is answered with
        an error message. This replaces raising ``KeyError``, which would abort
        the whole run, and lets the model retry with a valid name.
        """
        tool_calls = state['messages'][-1].tool_calls
        results = []
        for t in tool_calls:
            print(f"Calling: {t}")
            selected = self.tools.get(t['name'])
            if selected is None:
                print(f"\n....bad tool name: {t['name']}....")
                result = (
                    f"Unknown tool {t['name']!r}; available tools: "
                    f"{sorted(self.tools)}. Retry with one of these."
                )
            else:
                result = selected.invoke(t['args'])
            results.append(ToolMessage(tool_call_id=t['id'], name=t['name'], content=str(result)))
        print("Back to the model!")
        return {'messages': results}

In [9]:
# System prompt for the research agent. Adjacent string literals are joined
# into one paragraph that ends with a single trailing newline.
prompt = (
    "You are a smart research assistant. Use the search engine to look up information. "
    "You are allowed to make multiple calls (either together or in sequence). "
    "Only look up information when you are sure of what you want. "
    "If you need to look up some information before asking a follow up question, you are allowed to do that!\n"
)
# Chat model driving the agent. To use OpenAI instead of the local Ollama model:
#   model = ChatOpenAI(model="gpt-4o")
model = ChatOllama(model="qwen2.5:7b")
abot = Agent(model, [tool], checkpointer=memory, system=prompt)

In [61]:
# First user turn of conversation thread "1".
messages = [HumanMessage("What is the weather in Penang?")]

In [62]:
# Run config: the checkpointer keys saved state by this thread_id.
thread = dict(configurable=dict(thread_id="1"))

In [63]:
# Stream the run. Each event maps the node that just finished to the state
# update it returned, so only the newly added messages are printed.
for event in abot.graph.stream({"messages": messages}, thread):
    for v in event.values():
        print(v['messages'], end="\n\n")

[AIMessage(content='', additional_kwargs={}, response_metadata={'model': 'qwen2.5:7b', 'created_at': '2025-01-26T04:32:01.6283097Z', 'message': {'role': 'assistant', 'content': '', 'tool_calls': [{'function': {'name': 'tavily_search_results_json', 'arguments': {'query': 'weather in Penang'}}}]}, 'done_reason': 'stop', 'done': True, 'total_duration': 1349569300, 'load_duration': 36663200, 'prompt_eval_count': 244, 'prompt_eval_duration': 156000000, 'eval_count': 27, 'eval_duration': 1145000000}, id='run-7d050fb5-2572-481c-b9b9-ae14c3b0f238-0', tool_calls=[{'name': 'tavily_search_results_json', 'args': {'query': 'weather in Penang'}, 'id': '75781c9d-bd02-405a-b51b-f150705190c0', 'type': 'tool_call'}], usage_metadata={'input_tokens': 244, 'output_tokens': 27, 'total_tokens': 271})]

Calling: {'name': 'tavily_search_results_json', 'args': {'query': 'weather in Penang'}, 'id': '75781c9d-bd02-405a-b51b-f150705190c0', 'type': 'tool_call'}
Back to the model!
[ToolMessage(content='[{\'url\': \'

In [64]:
# Follow-up on the same thread_id "1". The checkpointer restores the earlier
# Penang exchange, so "What about..." can be resolved from the history.
thread = {"configurable": {"thread_id": "1"}}
messages = [HumanMessage(content="What about in Kuala Lumpur?")]
for event in abot.graph.stream({"messages": messages}, thread):
    for v in event.values():
        print(v['messages'][0], end="\n\n")

content='' additional_kwargs={} response_metadata={'model': 'qwen2.5:7b', 'created_at': '2025-01-26T04:32:23.6735795Z', 'message': {'role': 'assistant', 'content': '', 'tool_calls': [{'function': {'name': 'tavily_search_results_json', 'arguments': {'query': 'weather in Kuala Lumpur'}}}]}, 'done_reason': 'stop', 'done': True, 'total_duration': 1163833300, 'load_duration': 48326300, 'prompt_eval_count': 1085, 'prompt_eval_duration': 58000000, 'eval_count': 26, 'eval_duration': 1004000000} id='run-17d5e08c-223e-4f7f-b437-056e9e5933c1-0' tool_calls=[{'name': 'tavily_search_results_json', 'args': {'query': 'weather in Kuala Lumpur'}, 'id': 'e6ed0493-0af2-4122-954b-92bb15dd2ee9', 'type': 'tool_call'}] usage_metadata={'input_tokens': 1085, 'output_tokens': 26, 'total_tokens': 1111}

Calling: {'name': 'tavily_search_results_json', 'args': {'query': 'weather in Kuala Lumpur'}, 'id': 'e6ed0493-0af2-4122-954b-92bb15dd2ee9', 'type': 'tool_call'}
Back to the model!
content='[{\'url\': \'https://www

In [65]:
# Still thread "1": answering needs both earlier lookups from the saved history.
# NOTE: the recorded answer below contradicts itself (it first names Kuala
# Lumpur, then Penang, as warmer), so treat small-local-model comparisons with care.
messages = [HumanMessage(content="Which one is warmer?")]
thread = {"configurable": {"thread_id": "1"}}
for event in abot.graph.stream({"messages": messages}, thread):
    for v in event.values():
        print(v["messages"][0].content)

Based on the current weather information, Kuala Lumpur is warmer with a temperature of 27.4°C (81.3°F) compared to Penang, which had a maximum temperature forecasted for Sunday, January 26th, 2025, at 30°C (86°F).

Therefore, Penang is expected to be slightly warmer than Kuala Lumpur on that specific day.


In [66]:
# A new thread_id "2" starts with no saved history, so the same question
# arrives without the Penang / Kuala Lumpur context.
messages = [HumanMessage(content="Which one is warmer?")]
thread = {"configurable": {"thread_id": "2"}}
for event in abot.graph.stream(
    {"messages": messages},
    thread,
):
    for v in event.values():
        print(v)

{'messages': [AIMessage(content="I'm sorry, I need more information to determine which one you're referring to. Could you please specify the two objects or phenomena you would like to compare in terms of temperature?", additional_kwargs={}, response_metadata={'model': 'qwen2.5:7b', 'created_at': '2025-01-26T04:35:16.560995Z', 'message': {'role': 'assistant', 'content': "I'm sorry, I need more information to determine which one you're referring to. Could you please specify the two objects or phenomena you would like to compare in terms of temperature?"}, 'done_reason': 'stop', 'done': True, 'total_duration': 1491044000, 'load_duration': 35242500, 'prompt_eval_count': 241, 'prompt_eval_duration': 126000000, 'eval_count': 37, 'eval_duration': 1324000000}, id='run-5bda94ee-4802-44f7-82f0-358a7d3f740f-0', usage_metadata={'input_tokens': 241, 'output_tokens': 37, 'total_tokens': 278})]}


## Streaming tokens

In [10]:
# Disabled: superseded by the next cell.
# In the installed langgraph-checkpoint-sqlite, `AsyncSqliteSaver.from_conn_string(...)`
# returns an async context manager (the next cell has to `enter_async_context` it),
# not a saver. Binding it to `memory` and compiling the agent with it therefore
# leaves `abot` without a working checkpointer. The original setup is kept for reference:
# from langgraph.checkpoint.sqlite.aio import AsyncSqliteSaver
#
# memory = AsyncSqliteSaver.from_conn_string(":memory:")
# abot = Agent(model, [tool], system=prompt, checkpointer=memory)

In [11]:
import asyncio
from contextlib import AsyncExitStack
from langgraph.checkpoint.sqlite.aio import AsyncSqliteSaver

stack = AsyncExitStack()
memory = await stack.enter_async_context(AsyncSqliteSaver.from_conn_string(":memory:"))

abot = Agent(model, [tool], system=prompt, checkpointer=memory)

messages = [HumanMessage(content="What is the weather in SF?")]
thread = {"configurable": {"thread_id": "4"}}
async for event in abot.graph.astream_events({"messages": messages}, thread, version="v1"):
        kind = event["event"]
        if kind == "on_chat_model_stream":
            content = event["data"]["chunk"].content
            if content:
                # Empty content in the context of OpenAI means
                # that the model is asking for a tool to be invoked.
                # So we only print non-empty content
                print(content, end="|")

await stack.aclose()

Calling: {'name': 'tavily_search_results_json', 'args': {'query': 'weather in San Francisco'}, 'id': '106d70bd-06ee-4839-a9eb-8b9511a68fad', 'type': 'tool_call'}
Back to the model!
The current weather in San Francisco is as follows:
- Temperature: 9.4°C (48.9°F)
- Condition: Light rain
- Wind speed: 9.6 mph
- Humidity: 71%

It was last updated at 21:00 UTC on the same day. The visibility is 16 km, and there's no precipitation currently recorded.

For further detailed information or hourly and daily forecasts, you can visit [WeatherAPI](https://www.weatherapi.com/).|