In [None]:
from dotenv import load_dotenv

# Read variables from a local `.env` file into the process environment
# (presumably the OpenAI / Tavily API keys used below — confirm against .env).
# The returned bool is bound to `_` only so Jupyter does not echo it.
_ = load_dotenv() 

In [15]:
from langgraph.graph import StateGraph, END
from typing import TypedDict, Annotated
import operator
from langchain_core.messages import AnyMessage, SystemMessage, HumanMessage, ToolMessage
from langchain_openai import ChatOpenAI
from langchain_community.tools.tavily_search import TavilySearchResults

In [16]:
# Web-search tool handed to the agent; each query returns at most two results.
tool = TavilySearchResults(
    max_results=2,
)

In [17]:
class AgentState(TypedDict):
    """Graph state shared by every node of the agent.

    ``messages`` carries ``operator.add`` as its reducer in the ``Annotated``
    metadata, so the list a node returns is concatenated onto the stored
    history instead of replacing it.
    """
    # Reducer: list returned by a node is appended (operator.add) to the history.
    messages: Annotated[list[AnyMessage], operator.add]

#### Here we use a ``synchronous`` memory saver, which means we won't get any output until each LLM/tool call has finished

In [30]:
# Persistence: a checkpointer snapshots the agent's state after (and between)
# every node, keyed by the thread_id passed in the run config.
#
# SqliteSaver.from_conn_string() hands back a context manager, not a saver, so
# passing it straight to the agent fails. Entering it on an ExitStack gives us
# the saver and keeps the connection open for the rest of the session.
# Fix taken from:
# https://community.deeplearning.ai/t/lesson-4-persistence-and-streaming-attributeerror-generatorcontextmanager-object-has-no-attribute-get-next-version/697391/3
from contextlib import ExitStack

from langgraph.checkpoint.sqlite import SqliteSaver

stack = ExitStack()
memory = stack.enter_context(SqliteSaver.from_conn_string(":memory:"))

# ":memory:" is an in-memory SQLite database: every checkpoint is lost as soon
# as the connection closes or the kernel restarts.


In [31]:
class Agent:
    """Tool-calling agent wired as a two-node LangGraph loop.

    The ``llm`` node calls the tool-bound chat model. When its reply contains
    tool calls the graph moves to the ``action`` node, which runs them and loops
    back to ``llm``; otherwise the graph ends.

    Args:
        model: chat model exposing ``bind_tools`` (e.g. ``ChatOpenAI``).
        tools: tools the model may call; they are looked up by ``name``.
        checkpointer: LangGraph checkpointer passed to ``compile()``; it saves
            the state per ``thread_id`` so conversations can be resumed.
        system: optional system prompt prepended to every model call.
    """

    def __init__(self, model, tools, checkpointer, system=""):
        self.system = system
        graph = StateGraph(AgentState)
        graph.add_node("llm", self.call_openai)
        graph.add_node("action", self.take_action)
        # exists_action returns a bool, mapped to the next node (or END).
        graph.add_conditional_edges("llm", self.exists_action, {True: "action", False: END})
        graph.add_edge("action", "llm")
        graph.set_entry_point("llm")
        # Passing the checkpointer at compile time is what enables persistence.
        self.graph = graph.compile(checkpointer=checkpointer)

        # The nodes read these at call time, so setting them after compile() is fine.
        self.tools = {t.name: t for t in tools}
        self.model = model.bind_tools(tools)

    def call_openai(self, state: AgentState):
        """Run the model on the conversation and return its reply as a state update.

        The system prompt is prepended only for this call; just the model's
        reply is returned, so the system message is never stored in the state.
        """
        messages = state['messages']
        if self.system:
            messages = [SystemMessage(content=self.system)] + messages
        message = self.model.invoke(messages)
        return {'messages': [message]}

    def exists_action(self, state: AgentState):
        """Return True if the latest AI message requested at least one tool call."""
        result = state['messages'][-1]
        return len(result.tool_calls) > 0

    def take_action(self, state: AgentState):
        """Execute every tool call requested by the latest AI message.

        Returns one ToolMessage per call, tagged with that call's id. If the
        model names a tool that was never registered, an error ToolMessage is
        returned instead of raising KeyError. The graph run keeps going, and
        the model sees the error on its next turn and can retry.
        """
        tool_calls = state['messages'][-1].tool_calls
        results = []
        for t in tool_calls:
            print(f"Calling: {t}")
            selected_tool = self.tools.get(t['name'])
            if selected_tool is None:
                # The model can hallucinate tool names; reply rather than crash.
                print(f"Unknown tool {t['name']!r}, asking the model to retry")
                result = "bad tool name, retry"
            else:
                result = selected_tool.invoke(t['args'])
            results.append(ToolMessage(tool_call_id=t['id'], name=t['name'], content=str(result)))
        print("Back to the model!")
        return {'messages': results}

In [32]:
# System prompt for the research agent. Adjacent literals are concatenated into
# one string that ends with a single trailing newline.
prompt = (
    "You are a smart research assistant. Use the search engine to look up information. "
    "You are allowed to make multiple calls (either together or in sequence). "
    "Only look up information when you are sure of what you want. "
    "If you need to look up some information before asking a follow up question, you are allowed to do that!\n"
)
# Chat model used by the agent; Agent.__init__ binds the tools to it.
model = ChatOpenAI(model="gpt-4o")
abot = Agent(
    model=model,
    tools=[tool],
    checkpointer=memory,
    system=prompt,
)

In [33]:
# Streaming can happen at two levels:
#   1. whole messages: the AI message that decides which tool to call, then
#      the observation (the tool's result);
#   2. individual tokens of each LLM output.
# We start with level 1 (messages only).
messages = [
    HumanMessage(content="What is the weather in sf?"),
]

In [34]:
# Run config for the graph. The checkpointer defined earlier saves state
# separately per "thread_id", which lets several independent conversations
# run at once (e.g. one per user in a multi-user app).
thread = dict(configurable=dict(thread_id="1"))

Now, instead of calling `invoke()` to run our model, we stream over the graph using the `thread_id=1` config we defined above.

- This way, **we get all the intermediate steps and messages** produced by the agent and its tools until it delivers the final answer.

In [35]:
# Each streamed event holds the state update(s) returned by the node that just
# ran; print the new messages from every update.
for event in abot.graph.stream({"messages": messages}, thread):
    for update in event.values():
        print(update['messages'])

[AIMessage(content='', additional_kwargs={'tool_calls': [{'id': 'call_2qQv5jXM7klQCJZFEPEIBqDA', 'function': {'arguments': '{"query":"San Francisco weather today"}', 'name': 'tavily_search_results_json'}, 'type': 'function'}], 'refusal': None}, response_metadata={'token_usage': {'completion_tokens': 21, 'prompt_tokens': 151, 'total_tokens': 172, 'completion_tokens_details': {'audio_tokens': None, 'reasoning_tokens': 0}, 'prompt_tokens_details': {'audio_tokens': None, 'cached_tokens': 0}}, 'model_name': 'gpt-4o-2024-08-06', 'system_fingerprint': 'fp_159d8341cc', 'finish_reason': 'tool_calls', 'logprobs': None}, id='run-3ec895b4-03ea-4e25-a226-f6c753ad1314-0', tool_calls=[{'name': 'tavily_search_results_json', 'args': {'query': 'San Francisco weather today'}, 'id': 'call_2qQv5jXM7klQCJZFEPEIBqDA', 'type': 'tool_call'}], usage_metadata={'input_tokens': 151, 'output_tokens': 21, 'total_tokens': 172, 'input_token_details': {'cache_read': 0}, 'output_token_details': {'reasoning': 0}})]
Calli

In [40]:
# Thread "3" has not been used before, so the checkpointer has nothing saved
# for it. The agent gets this follow-up without the earlier SF conversation
# and has to ask what is meant (see the output below).
thread = {"configurable": {"thread_id": "3"}}
messages = [HumanMessage(content="Which was the weather, sorry?")]

for event in abot.graph.stream({"messages": messages}, thread):
    for update in event.values():
        print(update)

{'messages': [AIMessage(content='Could you please clarify your question? Are you asking about the current weather, a specific past date, or a particular location?', additional_kwargs={'refusal': None}, response_metadata={'token_usage': {'completion_tokens': 26, 'prompt_tokens': 151, 'total_tokens': 177, 'completion_tokens_details': {'audio_tokens': None, 'reasoning_tokens': 0}, 'prompt_tokens_details': {'audio_tokens': None, 'cached_tokens': 0}}, 'model_name': 'gpt-4o-2024-08-06', 'system_fingerprint': 'fp_159d8341cc', 'finish_reason': 'stop', 'logprobs': None}, id='run-dd170041-abd8-4c7e-80a5-fed9c1c8cd6a-0', usage_metadata={'input_tokens': 151, 'output_tokens': 26, 'total_tokens': 177, 'input_token_details': {'cache_read': 0}, 'output_token_details': {'reasoning': 0}})]}


Here we follow up in the same conversation (`thread_id=1`), so the agent remembers the earlier context: the agent's memory saver stores that conversation under the same `thread_id=1`.

In [41]:
# Back on thread "1": the checkpointer restores the SF conversation, so the
# agent understands "What about LA?" as a question about LA's weather. Any
# other thread_id would start from an empty history.
thread = {"configurable": {"thread_id": "1"}}
messages = [HumanMessage(content="What about LA?")]

for event in abot.graph.stream({"messages": messages}, thread):
    for update in event.values():
        print(update)

{'messages': [AIMessage(content='', additional_kwargs={'tool_calls': [{'id': 'call_U0aSHjTBBVPNkXOID9YgXYAw', 'function': {'arguments': '{"query":"Los Angeles weather today"}', 'name': 'tavily_search_results_json'}, 'type': 'function'}], 'refusal': None}, response_metadata={'token_usage': {'completion_tokens': 21, 'prompt_tokens': 935, 'total_tokens': 956, 'completion_tokens_details': {'audio_tokens': None, 'reasoning_tokens': 0}, 'prompt_tokens_details': {'audio_tokens': None, 'cached_tokens': 0}}, 'model_name': 'gpt-4o-2024-08-06', 'system_fingerprint': 'fp_159d8341cc', 'finish_reason': 'tool_calls', 'logprobs': None}, id='run-1af97bfb-140f-48cb-8bcc-9a10fbb8b957-0', tool_calls=[{'name': 'tavily_search_results_json', 'args': {'query': 'Los Angeles weather today'}, 'id': 'call_U0aSHjTBBVPNkXOID9YgXYAw', 'type': 'tool_call'}], usage_metadata={'input_tokens': 935, 'output_tokens': 21, 'total_tokens': 956, 'input_token_details': {'cache_read': 0}, 'output_token_details': {'reasoning': 0}

In [43]:
# Still thread "1": the SF result is already in the saved history, so the
# agent only needs to search for Spain. A different thread_id would lack
# that context.
thread = {"configurable": {"thread_id": "1"}}
messages = [HumanMessage(content="Compare this weather in sf with the one in Spain")]

for event in abot.graph.stream({"messages": messages}, thread):
    for update in event.values():
        print(update)

{'messages': [AIMessage(content='', additional_kwargs={'tool_calls': [{'id': 'call_zXFRJ8OhAyqpoONSIChdfvBs', 'function': {'arguments': '{"query":"Spain weather today"}', 'name': 'tavily_search_results_json'}, 'type': 'function'}], 'refusal': None}, response_metadata={'token_usage': {'completion_tokens': 20, 'prompt_tokens': 2204, 'total_tokens': 2224, 'completion_tokens_details': {'audio_tokens': None, 'reasoning_tokens': 0}, 'prompt_tokens_details': {'audio_tokens': None, 'cached_tokens': 2048}}, 'model_name': 'gpt-4o-2024-08-06', 'system_fingerprint': 'fp_159d8341cc', 'finish_reason': 'tool_calls', 'logprobs': None}, id='run-a814d5e2-10f1-4001-a78a-512502660578-0', tool_calls=[{'name': 'tavily_search_results_json', 'args': {'query': 'Spain weather today'}, 'id': 'call_zXFRJ8OhAyqpoONSIChdfvBs', 'type': 'tool_call'}], usage_metadata={'input_tokens': 2204, 'output_tokens': 20, 'total_tokens': 2224, 'input_token_details': {'cache_read': 2048}, 'output_token_details': {'reasoning': 0}})

Since we are still in the same conversation saved under `thread_id=1`, we can ask the agent to compare the earlier weather results with the weather in Brazil.

In [44]:
# Still thread "1": "those weathers" refers to the results already stored in
# this conversation, so only Brazil needs a new search. A different thread_id
# would lack that context.
thread = {"configurable": {"thread_id": "1"}}
messages = [HumanMessage(content="Compare those weathers with the weather in Brazil")]

for event in abot.graph.stream({"messages": messages}, thread):
    for update in event.values():
        print(update)

{'messages': [AIMessage(content='', additional_kwargs={'tool_calls': [{'id': 'call_fmuYhr86x3iCLReoPRJzSmEc', 'function': {'arguments': '{"query":"Brazil weather today"}', 'name': 'tavily_search_results_json'}, 'type': 'function'}], 'refusal': None}, response_metadata={'token_usage': {'completion_tokens': 20, 'prompt_tokens': 2811, 'total_tokens': 2831, 'completion_tokens_details': {'audio_tokens': None, 'reasoning_tokens': 0}, 'prompt_tokens_details': {'audio_tokens': None, 'cached_tokens': 2688}}, 'model_name': 'gpt-4o-2024-08-06', 'system_fingerprint': 'fp_159d8341cc', 'finish_reason': 'tool_calls', 'logprobs': None}, id='run-9ec34feb-4f79-46a9-b7de-612799b252fc-0', tool_calls=[{'name': 'tavily_search_results_json', 'args': {'query': 'Brazil weather today'}, 'id': 'call_fmuYhr86x3iCLReoPRJzSmEc', 'type': 'tool_call'}], usage_metadata={'input_tokens': 2811, 'output_tokens': 20, 'total_tokens': 2831, 'input_token_details': {'cache_read': 2688}, 'output_token_details': {'reasoning': 0}

#### Here we use an ``asynchronous`` memory saver, so we can receive output while each LLM/tool call is still running instead of waiting for it to finish

This way, we can `stream tokens` too.

You'll see each token as the LLM generates it, one at a time.

In [47]:
import asyncio
from contextlib import AsyncExitStack
from langgraph.checkpoint.sqlite.aio import AsyncSqliteSaver

#this will allow us to stream tokens itselves too

#we'll use A-Stream events
#async method)


#solution from error https://community.deeplearning.ai/t/lesson-4-persistence-and-streaming-attributeerror-generatorcontextmanager-object-has-no-attribute-get-next-version/697391/4
stack = AsyncExitStack()
memory = await stack.enter_async_context(AsyncSqliteSaver.from_conn_string(":memory:"))

#----
# from langgraph.checkpoint.aiosqlite import AsyncSqliteSaver

# memory = AsyncSqliteSaver.from_conn_string(":memory:")
# abot = Agent(model, [tool], system=prompt, checkpointer=memory)

In [48]:
# Rebuild the agent so its compiled graph uses the async checkpointer now bound to `memory`.
abot = Agent(
    model=model,
    tools=[tool],
    checkpointer=memory,
    system=prompt,
)

In [None]:
messages = [HumanMessage(content="What is the weather in SF?")]

thread = {"configurable": {"thread_id": "4"}}
async for event in abot.graph.astream_events({"messages": messages}, thread, version="v1"):
    kind = event["event"]
    
    
    #we retrieve the events when the llm is otuputting text
    #and then we start showing token by token async
    
    if kind == "on_chat_model_stream":
        content = event["data"]["chunk"].content
        if content:
            # Empty content in the context of OpenAI means
            # that the model is asking for a tool to be invoked.
            # So we only print non-empty content
            print(content, end="|")

Calling: {'name': 'tavily_search_results_json', 'args': {'query': 'current weather in San Francisco'}, 'id': 'call_r7EBd8PUWuhiWqU5XuaA1plg', 'type': 'tool_call'}
Back to the model!
The| current| weather| in| San| Francisco| is| as| follows|:

|-| Temperature|:| |13|.|4|°C| (|56|.|0|°F|)
|-| Condition|:| Patch|y| rain| nearby|
|-| Wind|:| |9|.|8| mph| (|15|.|8| k|ph|)| from| the| W|NW|
|-| Hum|idity|:| |82|%
|-| Cloud| Cover|:| |52|%
|-| Visibility|:| |10| km| (|6| miles|)
|-| UV| Index|:| |1| (|low|)

|The| feels|-like| temperature| is| approximately| |12|.|1|°C| (|53|.|7|°F|).|

In [51]:
messages = [HumanMessage(content="How much does a tomatoe cost at sf?. Compare it with the cost of it at spain")]

thread = {"configurable": {"thread_id": "4"}}
async for event in abot.graph.astream_events({"messages": messages}, thread, version="v1"):
    kind = event["event"]
    
    
    #we retrieve the events when the llm is otuputting text
    #and then we start showing token by token async
    
    if kind == "on_chat_model_stream":
        content = event["data"]["chunk"].content
        if content:
            # Empty content in the context of OpenAI means
            # that the model is asking for a tool to be invoked.
            # So we only print non-empty content
            print(content, end="|")

Calling: {'name': 'tavily_search_results_json', 'args': {'query': 'current price of tomatoes in Spain'}, 'id': 'call_UMDpECLoUaG6hQuJ8TwOK3M6', 'type': 'tool_call'}
Calling: {'name': 'tavily_search_results_json', 'args': {'query': 'current price of tomatoes in San Francisco'}, 'id': 'call_0I1IzBhpNcdRxP5pPSJ4mqSU', 'type': 'tool_call'}
Back to the model!
In| San| Francisco|,| the| average| price| for| |1| kilogram| (|approximately| |2| pounds|)| of| tomatoes| is| $|7|.

|In| Spain|,| specifically| in| Barcelona|,| the| price| for| |1| kilogram| of| tomatoes| is| approximately| €|2|.|00|.| Con|verting| this| to| US| dollars|,| given| the| current| exchange| rate|,| it| would| be| roughly| $|2|.|20| to| $|2|.|40| depending| on| the| exact| rate|.

|Compar|atively|,| tomatoes| are| significantly| more| expensive| in| San| Francisco| than| in| Spain|.|

In [None]:
# # Exit all contexts
# stack.close()