# Lesson 4: Persistence and Streaming

In [1]:
from dotenv import load_dotenv

# Load variables from a local .env file into the process environment
# (presumably the OpenAI and Tavily API keys used by the cells below — confirm
# against your .env). Binding the returned bool to `_` keeps Jupyter from
# echoing it as the cell's output.
_ = load_dotenv()

In [2]:
from langgraph.graph import StateGraph, END
from typing import TypedDict, Annotated
import operator
from langchain_core.messages import AnyMessage, SystemMessage, HumanMessage, ToolMessage
from langchain_openai import ChatOpenAI
from langchain_community.tools.tavily_search import TavilySearchResults

In [3]:
# Tavily web-search tool that the agent can call; max_results=2 presumably caps
# each query at two search hits (see TavilySearchResults docs to confirm).
tool = TavilySearchResults(max_results=2)

![breakdown](images/graphs.png)

In [4]:
class AgentState(TypedDict):
    """
    Graph state shared by every node of the agent.

    Attributes:
        messages: Conversation history as a list of AnyMessage objects. The
                  operator.add in the Annotated metadata acts as the reducer
                  for node updates: when a node returns {"messages": [...]},
                  that list is concatenated onto the existing history rather
                  than replacing it.
    """
    # NOTE: keep this annotation exactly as written — the reducer is taken from
    # the Annotated metadata, so changing its form can change how state merges.
    messages: Annotated[list[AnyMessage], operator.add]

In [5]:
from langgraph.checkpoint.sqlite import SqliteSaver

# Checkpointer backed by an in-memory SQLite database: conversation state is
# saved per thread_id for the lifetime of this kernel and lost when it restarts.
# NOTE(review): in newer langgraph releases from_conn_string is a context
# manager rather than returning a saver directly — confirm against the
# installed version before upgrading.
memory = SqliteSaver.from_conn_string(":memory:")

In [6]:
class Agent:
    """
    Tool-calling research agent built as a LangGraph state machine.

    The compiled graph has two nodes:

    * ``"llm"`` (``call_openai``) sends the conversation to the tool-bound model.
    * ``"action"`` (``take_action``) runs every tool call the model requested and
      returns the results as ``ToolMessage`` objects.

    Execution starts at ``"llm"``. After ``"llm"`` the graph goes to ``"action"``
    when the latest message contains tool calls and ends otherwise; ``"action"``
    always routes back to ``"llm"`` so the model can use the tool output.

    Args:
        model: Chat model exposing ``bind_tools`` (e.g. ``ChatOpenAI``).
        tools: A list of tools, each exposing ``name`` and ``invoke``. It is
            iterated twice (name lookup table and ``bind_tools``), so pass a
            list rather than a one-shot iterator.
        checkpointer: Checkpoint saver passed to ``StateGraph.compile``; it
            persists graph state per ``thread_id`` given in the run config.
        system: Optional system prompt. When non-empty it is prepended to the
            messages on every model call; it is not stored in the state.

    Attributes:
        system (str): The system prompt ("" means none).
        graph: The *compiled* graph returned by ``StateGraph.compile``; run it
            with ``stream`` / ``astream_events`` and a
            ``{"configurable": {"thread_id": ...}}`` config.
        tools (dict): Tools keyed by their ``name``.
        model: ``model.bind_tools(tools)`` — the model with tool schemas bound.
    """

    def __init__(self, model, tools, checkpointer, system=""):
        self.system = system
        graph = StateGraph(AgentState)
        graph.add_node("llm", self.call_openai)
        graph.add_node("action", self.take_action)
        # Route on whether the model asked for tools: yes -> run them, no -> stop.
        graph.add_conditional_edges("llm", self.exists_action, {True: "action", False: END})
        # Tool results always go back to the model so it can incorporate them.
        graph.add_edge("action", "llm")
        graph.set_entry_point("llm")
        self.graph = graph.compile(checkpointer=checkpointer)
        self.tools = {t.name: t for t in tools}
        self.model = model.bind_tools(tools)

    def call_openai(self, state: AgentState) -> dict:
        """Invoke the tool-bound model on the conversation in *state*.

        The system prompt (if any) is prepended only for this call; just the
        model's reply is returned, and the reducer appends it to the history.
        """
        messages = state['messages']
        if self.system:
            messages = [SystemMessage(content=self.system)] + messages
        message = self.model.invoke(messages)
        return {'messages': [message]}

    def exists_action(self, state: AgentState) -> bool:
        """Return True if the latest message in *state* requested any tool calls."""
        result = state['messages'][-1]
        return len(result.tool_calls) > 0

    def take_action(self, state: AgentState) -> dict:
        """Execute the tool calls on the latest message and return their results.

        Each call yields one ``ToolMessage`` carrying the originating call id and
        the stringified result. An unknown tool name is not raised; the model
        instead receives "bad tool name, retry" so it can correct itself.
        Exceptions raised by a tool itself are not caught here.
        """
        tool_calls = state['messages'][-1].tool_calls
        results = []
        print(f"number of tool calls: {len(tool_calls)} \n")

        print(f"tool calls: {tool_calls} \n")
        for t in tool_calls:
            print(f"Calling: {t} \n")
            if t['name'] not in self.tools:      # check for bad tool name from LLM
                print("\n ....bad tool name....")
                result = "bad tool name, retry"  # instruct LLM to retry if bad
            else:
                result = self.tools[t['name']].invoke(t['args'])
                print(f"result: {result} \n")
            results.append(ToolMessage(tool_call_id=t['id'], name=t['name'], content=str(result)))

        print(f"all results: {results} \n")
        print("Back to the model! \n")
        return {'messages': results}

In [7]:
# System prompt for the research agent, assembled from one sentence per line.
# Adjacent string literals are joined at compile time, producing exactly the
# same single-line prompt (ending in a newline) as a continued triple-quoted string.
prompt = (
    "You are a smart research assistant. Use the search engine to look up information. "
    "You are allowed to make multiple calls (either together or in sequence). "
    "Only look up information when you are sure of what you want. "
    "If you need to look up some information before asking a follow up question, you are allowed to do that!\n"
)

# Chat model and the agent wired to the Tavily tool and the SQLite checkpointer.
model = ChatOpenAI(model="gpt-4o")
abot = Agent(model, [tool], system=prompt, checkpointer=memory)

In [8]:
# First user turn of the conversation, used as the input of the next run.
messages = [HumanMessage(content="What is the weather in sf?")]

In [9]:
# Run config: the thread_id selects which saved conversation the checkpointer
# loads and updates.
thread = {"configurable": {"thread_id": "1"}}

In [10]:
# Run the graph on thread "1" and print each update as it is produced.
# Each `event` maps the name of the node that just ran ("llm" or "action") to
# the state update it returned, i.e. {"messages": [...]}.
for event in abot.graph.stream({"messages": messages}, thread):
    print(f"event: {event} \n")
    for v in event.values():
        print(f"event value: {v} \n")
        print(f"event value messages: {v['messages']} \n")

event: {'llm': {'messages': [AIMessage(content='', additional_kwargs={'tool_calls': [{'id': 'call_WwXejdcRhhG11vqH0Jp67r9i', 'function': {'arguments': '{"query":"current weather in San Francisco"}', 'name': 'tavily_search_results_json'}, 'type': 'function'}]}, response_metadata={'token_usage': {'completion_tokens': 22, 'prompt_tokens': 151, 'total_tokens': 173}, 'model_name': 'gpt-4o', 'system_fingerprint': 'fp_3aa7262c27', 'finish_reason': 'tool_calls', 'logprobs': None}, id='run-37395d61-3018-496d-ba85-ee65b88f555b-0', tool_calls=[{'name': 'tavily_search_results_json', 'args': {'query': 'current weather in San Francisco'}, 'id': 'call_WwXejdcRhhG11vqH0Jp67r9i'}])]}} 

event value: {'messages': [AIMessage(content='', additional_kwargs={'tool_calls': [{'id': 'call_WwXejdcRhhG11vqH0Jp67r9i', 'function': {'arguments': '{"query":"current weather in San Francisco"}', 'name': 'tavily_search_results_json'}, 'type': 'function'}]}, response_metadata={'token_usage': {'completion_tokens': 22, 'p

event: {'llm': {'messages': [AIMessage(content='The current weather in San Francisco is partly cloudy with a temperature of 12.6°C (54.8°F). The wind is coming from the southwest at 5.1 mph (8.3 kph), and the humidity level is at 91%. Visibility is 10 km (6 miles), and the UV index is 1.', response_metadata={'token_usage': {'completion_tokens': 71, 'prompt_tokens': 680, 'total_tokens': 751}, 'model_name': 'gpt-4o', 'system_fingerprint': 'fp_3aa7262c27', 'finish_reason': 'stop', 'logprobs': None}, id='run-1fed1979-f53a-4c5c-9a4e-7da7979aee8d-0')]}} 

event value: {'messages': [AIMessage(content='The current weather in San Francisco is partly cloudy with a temperature of 12.6°C (54.8°F). The wind is coming from the southwest at 5.1 mph (8.3 kph), and the humidity level is at 91%. Visibility is 10 km (6 miles), and the UV index is 1.', response_metadata={'token_usage': {'completion_tokens': 71, 'prompt_tokens': 680, 'total_tokens': 751}, 'model_name': 'gpt-4o', 'system_fingerprint': 'fp_3

In [11]:
# Follow-up on the same thread "1": the checkpointer restores the earlier
# San Francisco exchange, so the elliptical "What about in la?" is understood
# as a weather question.
messages = [HumanMessage(content="What about in la?")]
thread = {"configurable": {"thread_id": "1"}}
for event in abot.graph.stream({"messages": messages}, thread):
    print(f"event: {event} \n")
    for v in event.values():
        print(f"event value: {v} \n")

event: {'llm': {'messages': [AIMessage(content='', additional_kwargs={'tool_calls': [{'id': 'call_XCiKxguJowjgLTa4fMQz2Wow', 'function': {'arguments': '{"query":"current weather in Los Angeles"}', 'name': 'tavily_search_results_json'}, 'type': 'function'}]}, response_metadata={'token_usage': {'completion_tokens': 22, 'prompt_tokens': 763, 'total_tokens': 785}, 'model_name': 'gpt-4o', 'system_fingerprint': 'fp_3aa7262c27', 'finish_reason': 'tool_calls', 'logprobs': None}, id='run-fe4f8e8c-5fbc-4c72-9a4b-d46b16e12ad1-0', tool_calls=[{'name': 'tavily_search_results_json', 'args': {'query': 'current weather in Los Angeles'}, 'id': 'call_XCiKxguJowjgLTa4fMQz2Wow'}])]}} 

event value: {'messages': [AIMessage(content='', additional_kwargs={'tool_calls': [{'id': 'call_XCiKxguJowjgLTa4fMQz2Wow', 'function': {'arguments': '{"query":"current weather in Los Angeles"}', 'name': 'tavily_search_results_json'}, 'type': 'function'}]}, response_metadata={'token_usage': {'completion_tokens': 22, 'prompt_

In [12]:
# Still thread "1": both cities' results are already in the saved history, so
# the model can answer the comparison without another search.
messages = [HumanMessage(content="Which one is warmer?")]
thread = {"configurable": {"thread_id": "1"}}
for event in abot.graph.stream({"messages": messages}, thread):
    print(f"event: {event} \n")
    for v in event.values():
        print(f"event value: {v} \n")

event: {'llm': {'messages': [AIMessage(content='Los Angeles is warmer than San Francisco. The current temperature in Los Angeles is 24.4°C (75.9°F), while in San Francisco it is 12.6°C (54.8°F).', response_metadata={'token_usage': {'completion_tokens': 44, 'prompt_tokens': 1410, 'total_tokens': 1454}, 'model_name': 'gpt-4o', 'system_fingerprint': 'fp_3aa7262c27', 'finish_reason': 'stop', 'logprobs': None}, id='run-34569696-c618-4634-95f6-76e8b34262b7-0')]}} 

event value: {'messages': [AIMessage(content='Los Angeles is warmer than San Francisco. The current temperature in Los Angeles is 24.4°C (75.9°F), while in San Francisco it is 12.6°C (54.8°F).', response_metadata={'token_usage': {'completion_tokens': 44, 'prompt_tokens': 1410, 'total_tokens': 1454}, 'model_name': 'gpt-4o', 'system_fingerprint': 'fp_3aa7262c27', 'finish_reason': 'stop', 'logprobs': None}, id='run-34569696-c618-4634-95f6-76e8b34262b7-0')]} 



Note: If you change the `thread_id`, the agent has no context from conversations held on other threads.  
Threads let you run multiple independent conversations with the same agent.

In [13]:
# Same question on a new thread "2": nothing saved under thread "1" is loaded,
# so the model has no idea which two things are being compared.
messages = [HumanMessage(content="Which one is warmer?")]
thread = {"configurable": {"thread_id": "2"}}
for event in abot.graph.stream({"messages": messages}, thread):
    for v in event.values():
        print(v)

{'messages': [AIMessage(content='Could you please specify the two locations or items you are comparing in terms of warmth?', response_metadata={'token_usage': {'completion_tokens': 18, 'prompt_tokens': 149, 'total_tokens': 167}, 'model_name': 'gpt-4o', 'system_fingerprint': 'fp_3aa7262c27', 'finish_reason': 'stop', 'logprobs': None}, id='run-bd36ff65-8738-4e83-ac42-21c22676a79e-0')]}


## Streaming tokens

In [14]:
from langgraph.checkpoint.aiosqlite import AsyncSqliteSaver

# Rebuild the agent with the async SQLite checkpointer for use with the async
# astream_events call that follows. This is a fresh in-memory database, so
# the threads saved earlier are not present in it.
# NOTE(review): as with SqliteSaver, newer langgraph versions expose
# from_conn_string as an async context manager — confirm for the installed version.
memory = AsyncSqliteSaver.from_conn_string(":memory:")
abot = Agent(model, [tool], system=prompt, checkpointer=memory)

In [15]:
# Stream individual model tokens (not just whole node updates) on a new thread "4".
messages = [HumanMessage(content="What is the weather in SF?")]
thread = {"configurable": {"thread_id": "4"}}
# Top-level `async for` relies on the notebook's running event loop; in a plain
# .py script this loop would have to live inside an async function.
# NOTE(review): the captured output shows a beta warning for this API;
# check the installed langchain-core docs before changing version="v1".
async for event in abot.graph.astream_events({"messages": messages}, thread, version="v1"):
    kind = event["event"]
    # Only streamed chat-model chunks carry token text; other event kinds are skipped.
    if kind == "on_chat_model_stream":
        content = event["data"]["chunk"].content
        if content:
            # Many chunks have empty content, e.g. while the model is emitting a
            # tool call (OpenAI streams the call, not message text), so we only
            # print non-empty content. Tokens are separated by "|" so individual
            # chunk boundaries are visible.
            print(content, end="|")

  warn_beta(


number of tool calls: 1 

tool calls: [{'name': 'tavily_search_results_json', 'args': {'query': 'current weather in San Francisco'}, 'id': 'call_E2sI0Nw5b8ZfTOIp0QVkXHgS'}] 

Calling: {'name': 'tavily_search_results_json', 'args': {'query': 'current weather in San Francisco'}, 'id': 'call_E2sI0Nw5b8ZfTOIp0QVkXHgS'} 

result: [{'url': 'https://www.weatherapi.com/', 'content': "{'location': {'name': 'San Francisco', 'region': 'California', 'country': 'United States of America', 'lat': 37.78, 'lon': -122.42, 'tz_id': 'America/Los_Angeles', 'localtime_epoch': 1722943049, 'localtime': '2024-08-06 04:17'}, 'current': {'last_updated_epoch': 1722942900, 'last_updated': '2024-08-06 04:15', 'temp_c': 12.6, 'temp_f': 54.8, 'is_day': 0, 'condition': {'text': 'Partly Cloudy', 'icon': '//cdn.weatherapi.com/weather/64x64/night/116.png', 'code': 1003}, 'wind_mph': 5.1, 'wind_kph': 8.3, 'wind_degree': 222, 'wind_dir': 'SW', 'pressure_mb': 1015.0, 'pressure_in': 29.96, 'precip_mm': 0.0, 'precip_in': 0.0