# Lesson 4: Persistence and Streaming

In [2]:
# !pip install langgraph langchain_core langchain_openai langchain_community
# !pip install aiosqlite
# !pip install langgraph.checkpoint.sqlite
# !pip install langgraph.checkpoint.aiosqlite

In [3]:
import os

In [4]:
from dotenv import load_dotenv

# Presumably loads variables from a local .env file into the process
# environment -- confirm against the python-dotenv documentation.
# Assigning to `_` keeps the notebook from echoing the call's return value.
_ = load_dotenv()

In [6]:
# Read both API keys from the process environment; each is None when unset.
openai_api_key = os.environ.get("OPENAI_API_KEY")
tavily_api_key = os.environ.get("TAVILY_API_KEY")
# Alternative (disabled): hand the Tavily key to the tool explicitly.
# tool = TavilySearchResults(tavily_api_key=tavily_api_key)

In [7]:
from langgraph.graph import StateGraph, END
from typing import TypedDict, Annotated
import operator
from langchain_core.messages import AnyMessage, SystemMessage, HumanMessage, ToolMessage
from langchain_openai import ChatOpenAI
from langchain_community.tools.tavily_search import TavilySearchResults

In [8]:
# Search tool handed to the agent below. `max_results=2` presumably caps the
# number of hits returned per query -- confirm against the Tavily tool docs.
tool = TavilySearchResults(max_results=2)

In [9]:
class AgentState(TypedDict):
    """State dictionary passed between the nodes of the agent graph."""

    # Conversation history. The `operator.add` metadata is presumably used by
    # LangGraph as the reducer, so lists returned by nodes are concatenated
    # onto the existing history instead of replacing it -- confirm.
    messages: Annotated[list[AnyMessage], operator.add]

In [10]:
import sqlite3

from langgraph.checkpoint.sqlite import SqliteSaver

# Build the checkpointer from an explicit connection. In current releases
# `SqliteSaver.from_conn_string(...)` returns a context manager rather than a
# saver, so using its return value directly as a checkpointer fails with
# "'_GeneratorContextManager' object has no attribute 'get_next_version'".
# `check_same_thread=False` allows the connection to be used from a thread
# other than the one that created it.
memory = SqliteSaver(sqlite3.connect(":memory:", check_same_thread=False))

In [11]:
class Agent:
    """Tool-calling agent implemented as a two-node state graph.

    The "llm" node asks the model for the next message. If that message
    requests tool calls, the "action" node executes them and control returns
    to "llm"; otherwise the run ends.
    """

    def __init__(self, model, tools, checkpointer, system=""):
        """Build and compile the agent graph.

        Args:
            model: Chat model exposing ``bind_tools``; the bound model is
                invoked with the message list on every "llm" step.
            tools: Iterable of tool objects exposing ``name`` and ``invoke``.
            checkpointer: Passed straight through to ``graph.compile``.
            system: Optional system prompt; when non-empty it is prepended to
                the messages on every model call.
        """
        self.system = system
        # Populate tools/model before compiling so the node callbacks never
        # refer to attributes that do not exist yet.
        self.tools = {t.name: t for t in tools}
        self.model = model.bind_tools(tools)

        graph = StateGraph(AgentState)
        graph.add_node("llm", self.call_openai)
        graph.add_node("action", self.take_action)
        # Route to "action" when the model asked for tools, else finish.
        graph.add_conditional_edges("llm", self.exists_action, {True: "action", False: END})
        graph.add_edge("action", "llm")
        graph.set_entry_point("llm")
        self.graph = graph.compile(checkpointer=checkpointer)

    def call_openai(self, state: AgentState):
        """Invoke the bound model on the history and return its reply.

        The system prompt is prepended for the call only; it is not part of
        the returned state update.
        """
        messages = state['messages']
        if self.system:
            messages = [SystemMessage(content=self.system)] + messages
        message = self.model.invoke(messages)
        return {'messages': [message]}

    def exists_action(self, state: AgentState):
        """Return True when the latest message requests at least one tool call.

        A message without a ``tool_calls`` attribute (or with an empty one)
        yields False instead of raising.
        """
        last_message = state['messages'][-1]
        return bool(getattr(last_message, "tool_calls", None))

    def take_action(self, state: AgentState):
        """Run every tool call requested by the latest message.

        Returns a state update holding one ``ToolMessage`` per tool call, in
        request order. A call naming a tool that was not registered does not
        abort the run: its ToolMessage carries a retry hint so the model can
        correct itself on the next "llm" step.
        """
        tool_calls = state['messages'][-1].tool_calls
        results = []
        for t in tool_calls:
            print(f"Calling: {t}")
            if t['name'] not in self.tools:
                # The model can name a tool that was never registered; answer
                # with a hint rather than failing with KeyError.
                print("\n ....bad tool name....")
                result = "bad tool name, retry"
            else:
                result = self.tools[t['name']].invoke(t['args'])
            results.append(ToolMessage(tool_call_id=t['id'], name=t['name'], content=str(result)))
        print("Back to the model!")
        return {'messages': results}

In [12]:
# System prompt for the agent. The trailing backslashes inside the literal
# join the first three source lines, so the runtime string has a single line
# break before the closing quotes' own newline.
prompt = """You are a smart research assistant. Use the search engine to look up information. \
You are allowed to make multiple calls (either together or in sequence). \
Only look up information when you are sure of what you want. \
If you need to look up some information before asking a follow up question, you are allowed to do that!
"""
# Chat model that the Agent binds the tools to.
model = ChatOpenAI(model="gpt-4o")
# Agent wired with the search tool, the system prompt and the checkpointer.
abot = Agent(model, [tool], system=prompt, checkpointer=memory)

In [13]:
# Single user turn used as the input to the agent graph.
messages = [HumanMessage(content="What is the weather in sf?")]

In [14]:
# Run configuration passed to `graph.stream`; the checkpointer presumably
# keys saved state by `thread_id` -- confirm against the LangGraph docs.
thread = {"configurable": {"thread_id": "1"}}

In [15]:
# `stream(...)` hands back a generator (see the printed type below), so no
# graph step runs here; events are only produced once it is iterated.
stream = abot.graph.stream({"messages": messages}, thread)
print(type(stream))  # To inspect what type of object stream is

<class 'generator'>


## Streaming tokens

In [22]:
from langgraph.graph import StateGraph, END
from typing import TypedDict, Annotated
import operator
from langchain_core.messages import AnyMessage, SystemMessage, HumanMessage, ToolMessage
from langchain_openai import ChatOpenAI

In [30]:
import sqlite3

from langgraph.checkpoint.sqlite import SqliteSaver

# Build the checkpointer from an explicit connection. In current releases
# `SqliteSaver.from_conn_string(...)` returns a context manager rather than a
# saver, so using its return value directly as a checkpointer fails with
# "'_GeneratorContextManager' object has no attribute 'get_next_version'".
# `check_same_thread=False` allows the connection to be used from a thread
# other than the one that created it.
memory = SqliteSaver(sqlite3.connect(":memory:", check_same_thread=False))

In [32]:
# Rebuild the agent so it uses the checkpointer currently bound to `memory`.
abot = Agent(model, [tool], system=prompt, checkpointer=memory)

In [38]:
# `SqliteSaver.from_conn_string` is used as a context manager here, so the
# agent is built and streamed inside the `with` block. Background:
# https://community.deeplearning.ai/t/lesson-4-persistence-and-streaming-attributeerror-generatorcontextmanager-object-has-no-attribute-get-next-version/697391/2
with SqliteSaver.from_conn_string(":memory:") as memory:
    abot = Agent(model, [tool], system=prompt, checkpointer=memory)

    messages = [HumanMessage(content="What is the weather in sf?")]
    thread = {"configurable": {"thread_id": "1"}}

    # Print the messages carried by every value of each streamed event.
    for event in abot.graph.stream({"messages": messages}, thread):
        for v in event.values():
            print(v['messages'])

[AIMessage(content='', additional_kwargs={'tool_calls': [{'id': 'call_mGrZ1EtJ02FN4y86aumPjWHi', 'function': {'arguments': '{"query":"San Francisco weather today"}', 'name': 'tavily_search_results_json'}, 'type': 'function'}], 'refusal': None}, response_metadata={'token_usage': {'completion_tokens': 21, 'prompt_tokens': 151, 'total_tokens': 172, 'completion_tokens_details': {'audio_tokens': None, 'reasoning_tokens': 0}, 'prompt_tokens_details': {'audio_tokens': None, 'cached_tokens': 0}}, 'model_name': 'gpt-4o-2024-08-06', 'system_fingerprint': 'fp_6b68a8204b', 'finish_reason': 'tool_calls', 'logprobs': None}, id='run-20b34f65-80cb-41af-8e6b-8e8e9e45d4bf-0', tool_calls=[{'name': 'tavily_search_results_json', 'args': {'query': 'San Francisco weather today'}, 'id': 'call_mGrZ1EtJ02FN4y86aumPjWHi', 'type': 'tool_call'}], usage_metadata={'input_tokens': 151, 'output_tokens': 21, 'total_tokens': 172, 'input_token_details': {'cache_read': 0}, 'output_token_details': {'reasoning': 0}})]
Calli

In [41]:
# Same question as before, run under thread_id "2" with a brand-new
# in-memory checkpointer opened for the duration of the `with` block.
with SqliteSaver.from_conn_string(":memory:") as memory:
    abot = Agent(model, [tool], system=prompt, checkpointer=memory)
    messages = [HumanMessage(content="What is the weather in sf?")]
    thread = {"configurable": {"thread_id": "2"}}

    # Print the messages carried by every value of each streamed event.
    for event in abot.graph.stream({"messages": messages}, thread):
        for v in event.values():
            print(v['messages'])

[AIMessage(content='', additional_kwargs={'tool_calls': [{'id': 'call_LWMgmOT5poPaMjPWGXiez61D', 'function': {'arguments': '{"query":"current weather in San Francisco"}', 'name': 'tavily_search_results_json'}, 'type': 'function'}], 'refusal': None}, response_metadata={'token_usage': {'completion_tokens': 22, 'prompt_tokens': 151, 'total_tokens': 173, 'completion_tokens_details': {'audio_tokens': None, 'reasoning_tokens': 0}, 'prompt_tokens_details': {'audio_tokens': None, 'cached_tokens': 0}}, 'model_name': 'gpt-4o-2024-08-06', 'system_fingerprint': 'fp_6b68a8204b', 'finish_reason': 'tool_calls', 'logprobs': None}, id='run-456a9fcb-f7d3-4a8d-91b4-0deead94bd31-0', tool_calls=[{'name': 'tavily_search_results_json', 'args': {'query': 'current weather in San Francisco'}, 'id': 'call_LWMgmOT5poPaMjPWGXiez61D', 'type': 'tool_call'}], usage_metadata={'input_tokens': 151, 'output_tokens': 22, 'total_tokens': 173, 'input_token_details': {'cache_read': 0}, 'output_token_details': {'reasoning': 0

So there is not much difference between the two threads. The thread_id is not intended to change the type or content of the result; it is there to keep conversations isolated from one another. Because each cell above also creates a fresh in-memory checkpointer, neither thread has any earlier history to draw on, so similar input queries produce similar results regardless of the thread_id.

In summary, the cell above initializes an AI agent with a SQLite-backed checkpointer for memory.
The agent receives a user message asking for the weather in San Francisco.
It processes the request within a thread and streams back events containing the responses, which are then printed.