# Lesson 4: Persistence and Streaming for long-running tasks

### Persistence in LangChain:
- **Definition**: Persistence allows an agent to save its state at a particular point in time. This stored state can then be reloaded and used in future interactions, enabling the agent to resume from where it left off. 
- **Use Case**: Persistence is crucial for long-running applications where an agent may need to maintain continuity over extended periods. For example, an agent helping with a multi-step task, such as project planning or writing, can store progress and resume when needed without losing context.
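Here is a minimal sketch of the persistence idea in plain Python, without LangGraph. The state of a multi-step task is serialized at the end of one session and reloaded later, so the task resumes instead of starting over. The helpers `save_state`, `load_state`, and `run_steps` and the sample plan are hypothetical and exist only for illustration.

```python
import json


def save_state(state: dict) -> str:
    """Serialize the agent's state so it can be stored anywhere (file, DB, ...)."""
    return json.dumps(state)


def load_state(blob: str) -> dict:
    """Rebuild the state from its serialized form."""
    return json.loads(blob)


def run_steps(state: dict, plan: list[str], max_steps: int) -> dict:
    """Complete up to max_steps of the remaining plan items, recording progress."""
    done = list(state.get("done", []))
    for item in plan[len(done):][:max_steps]:
        done.append(item)
    return {"done": done}


plan = ["outline", "draft", "edit", "publish"]

# First session: do two steps, then stop and persist the state.
checkpoint = save_state(run_steps({}, plan, max_steps=2))

# Later session: reload the checkpoint and pick up where we left off.
resumed = run_steps(load_state(checkpoint), plan, max_steps=10)
print(resumed["done"])  # ['outline', 'draft', 'edit', 'publish']
```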

### Streaming in LangChain:
- **Definition**: Streaming allows an agent to emit signals about its current activities or progress as they happen. This gives insight into what the agent is doing at each step, so users or systems can monitor its operations in real time.
- **Use Case**: In long-running applications, streaming enables better interactivity by providing users with immediate feedback on the agent's status. For instance, in a live conversation or long-running process, streaming lets the user see updates as they happen, rather than waiting for the entire process to complete before receiving feedback.
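Streaming can be sketched with a Python generator. The caller receives each status update as soon as a step finishes, instead of getting one result at the end. `long_task` is a hypothetical stand-in for an agent run, and the `time.sleep` call only simulates work.

```python
import time
from typing import Iterator


def long_task(steps: list[str]) -> Iterator[str]:
    """Yield a status update as each step finishes instead of returning only at the end."""
    for i, step in enumerate(steps, start=1):
        time.sleep(0.01)  # stand-in for real work (an LLM call, a tool call, ...)
        yield f"[{i}/{len(steps)}] finished: {step}"


# The caller sees each update as soon as it is produced.
updates = []
for update in long_task(["search", "summarize", "answer"]):
    print(update)
    updates.append(update)
```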

### Summary:
- **Persistence**: Saves the agent's state to allow continuity in long-term applications.
- **Streaming**: Provides real-time updates about the agent's operations for better tracking and interactivity.

```python
# Load API keys (e.g. OPENAI_API_KEY, TAVILY_API_KEY) from a .env file
from dotenv import load_dotenv

_ = load_dotenv()
```

```python
from langgraph.graph import StateGraph, END
from typing import TypedDict, Annotated
import operator
from langchain_core.messages import AnyMessage, SystemMessage, HumanMessage, ToolMessage
from langchain_openai import ChatOpenAI
from langchain_community.tools.tavily_search import TavilySearchResults
```

```python
tool = TavilySearchResults(max_results=2)
```

```python
class AgentState(TypedDict):
    # operator.add tells LangGraph to append new messages instead of overwriting the list
    messages: Annotated[list[AnyMessage], operator.add]
```
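The `Annotated[..., operator.add]` annotation tells LangGraph how to merge a node's return value into the existing state: new messages are appended rather than replacing the old list. The sketch below imitates that behaviour with the standard library only. `ToyState` and `apply_update` are hypothetical helpers and are not LangGraph's internal code.

```python
import operator
from typing import Annotated, TypedDict, get_type_hints


class ToyState(TypedDict):
    messages: Annotated[list[str], operator.add]


def apply_update(state: dict, update: dict) -> dict:
    """Merge a node's partial update into the state using each key's reducer."""
    hints = get_type_hints(ToyState, include_extras=True)
    merged = dict(state)
    for key, value in update.items():
        reducer = hints[key].__metadata__[0]  # operator.add for "messages"
        merged[key] = reducer(state.get(key, []), value)
    return merged


state = {"messages": ["human: what is the weather in sf?"]}
state = apply_update(state, {"messages": ["ai: <tool call>"]})
state = apply_update(state, {"messages": ["tool: <search results>"]})
print(state["messages"])
```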

### Checkpointer for persistence

```python
# A checkpointer saves a snapshot of the graph state after every node runs.
# To add persistence to this agent, we use a SqliteSaver backed by an in-memory
# SQLite database (":memory:"). You can just as easily connect it to an external
# database, and there are also checkpointers for Redis, Postgres, and other more
# persistent stores.
import sqlite3

from langgraph.checkpoint.sqlite import SqliteSaver

# Passing a connection directly avoids a version difference: in newer releases,
# SqliteSaver.from_conn_string(...) is a context manager rather than a saver.
memory = SqliteSaver(sqlite3.connect(":memory:", check_same_thread=False))
```
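Conceptually, a checkpointer writes a snapshot of the state, keyed by thread, after each step, and reads back the latest snapshot when that thread resumes. The toy below shows the idea with Python's built-in `sqlite3` and an in-memory database. The table layout and the `put`/`get_latest` helpers are hypothetical; they are not `SqliteSaver`'s actual schema or methods.

```python
import json
import sqlite3
from typing import Optional

conn = sqlite3.connect(":memory:")  # data lives only as long as this connection
conn.execute(
    "CREATE TABLE checkpoints (thread_id TEXT, step INTEGER, state TEXT, "
    "PRIMARY KEY (thread_id, step))"
)


def put(thread_id: str, step: int, state: dict) -> None:
    """Store a snapshot of the state after a given step."""
    conn.execute(
        "INSERT INTO checkpoints VALUES (?, ?, ?)",
        (thread_id, step, json.dumps(state)),
    )


def get_latest(thread_id: str) -> Optional[dict]:
    """Return the most recent snapshot for a thread, or None if it has none."""
    row = conn.execute(
        "SELECT state FROM checkpoints WHERE thread_id = ? ORDER BY step DESC LIMIT 1",
        (thread_id,),
    ).fetchone()
    return json.loads(row[0]) if row else None


put("1", 0, {"messages": ["human: hi"]})
put("1", 1, {"messages": ["human: hi", "ai: hello!"]})
print(get_latest("1"))  # {'messages': ['human: hi', 'ai: hello!']}
print(get_latest("2"))  # None
```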

```python
class Agent:
    def __init__(self, model, tools, checkpointer, system=""):
        self.system = system
        graph = StateGraph(AgentState)
        graph.add_node("llm", self.call_openai)
        graph.add_node("action", self.take_action)
        # Go to "action" if the last message has tool calls, otherwise stop
        graph.add_conditional_edges("llm", self.exists_action, {True: "action", False: END})
        graph.add_edge("action", "llm")
        graph.set_entry_point("llm")

        # Compile with the checkpointer so the state is saved after every step
        self.graph = graph.compile(checkpointer=checkpointer)
        self.tools = {t.name: t for t in tools}
        self.model = model.bind_tools(tools)

    def call_openai(self, state: AgentState):
        messages = state['messages']
        if self.system:
            messages = [SystemMessage(content=self.system)] + messages
        message = self.model.invoke(messages)
        return {'messages': [message]}

    def exists_action(self, state: AgentState):
        result = state['messages'][-1]
        return len(result.tool_calls) > 0

    def take_action(self, state: AgentState):
        tool_calls = state['messages'][-1].tool_calls
        results = []
        for t in tool_calls:
            print(f"Calling: {t}")
            result = self.tools[t['name']].invoke(t['args'])
            results.append(ToolMessage(tool_call_id=t['id'], name=t['name'], content=str(result)))
        print("Back to the model!")
        return {'messages': results}
```
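Once compiled, the graph runs `llm`, checks `exists_action`, and either goes to `action` and back to `llm`, or stops. The plain-Python sketch below mirrors that control flow with a hypothetical stub model (`fake_model`) and stub tool (`fake_search`), so it runs without API keys. It illustrates the loop only, not how LangGraph executes graphs internally.

```python
def fake_search(query: str) -> str:
    """Stand-in for the Tavily search tool."""
    return f"results for {query!r}"


def fake_model(messages: list[dict]) -> dict:
    """Stand-in for the LLM: request a tool call first, then answer."""
    if not any(m["role"] == "tool" for m in messages):
        call = {"name": "search", "args": {"query": "weather in sf"}}
        return {"role": "ai", "content": "", "tool_calls": [call]}
    return {"role": "ai", "content": "Here is the weather.", "tool_calls": []}


tools = {"search": fake_search}


def run_agent(messages: list[dict]) -> list[dict]:
    while True:
        ai = fake_model(messages)  # the "llm" node
        messages = messages + [ai]
        if not ai["tool_calls"]:  # exists_action is False -> END
            return messages
        for call in ai["tool_calls"]:  # the "action" node
            result = tools[call["name"]](**call["args"])
            messages = messages + [{"role": "tool", "content": result}]


history = run_agent([{"role": "human", "content": "What is the weather in sf?"}])
print([m["role"] for m in history])  # ['human', 'ai', 'tool', 'ai']
```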

```python
prompt = """You are a smart research assistant. Use the search engine to look up information. \
You are allowed to make multiple calls (either together or in sequence). \
Only look up information when you are sure of what you want. \
If you need to look up some information before asking a follow up question, you are allowed to do that!
"""
model = ChatOpenAI(model="gpt-4o")
abot = Agent(model, [tool], system=prompt, checkpointer=memory)
```

```python
messages = [HumanMessage(content="What is the weather in sf?")]
```

### Streaming

- First, we might care about streaming the individual messages: the AI message that determines which action to take, and then the observation message that holds the result of taking that action.
- Second, we might care about streaming tokens: for each LLM call, we might want to stream the output token by token.


We're now going to add this concept of a thread config. So this will be used to keep track of different threads inside the persistent checkpointer. This will allow us to have multiple conversations going on at the same time. This is really needed for production applications where you generally have many users.
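The thread config is just a nested dict, and the checkpointer reads `config["configurable"]["thread_id"]` to decide which saved conversation to load. The stdlib-only sketch below shows that lookup, using a hypothetical in-memory `store` and `chat` function in place of the real checkpointer and model.

```python
from collections import defaultdict

store: dict[str, list[str]] = defaultdict(list)  # thread_id -> message history


def chat(new_message: str, config: dict) -> list[str]:
    """Append a message to the thread named in config and return what the model would see."""
    thread_id = config["configurable"]["thread_id"]
    store[thread_id].append(new_message)
    return list(store[thread_id])


thread_1 = {"configurable": {"thread_id": "1"}}
thread_2 = {"configurable": {"thread_id": "2"}}

chat("What is the weather in sf?", thread_1)
seen_1 = chat("What about in la?", thread_1)  # same thread: earlier turn is visible
seen_2 = chat("Which one is warmer?", thread_2)  # new thread: no prior context
print(seen_1)
print(seen_2)
```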

```python
thread = {"configurable": {"thread_id": "1"}}
```

```python
# We won't use invoke; this time we use stream
for event in abot.graph.stream({"messages": messages}, thread):
    for v in event.values():
        print(v['messages'])
```

```text
[AIMessage(content='', additional_kwargs={'tool_calls': [{'id': 'call_dYeseNNLTZ8gx8HhLOfMxMWb', 'function': {'arguments': '{"query":"current weather in San Francisco"}', 'name': 'tavily_search_results_json'}, 'type': 'function'}]}, response_metadata={'token_usage': {'completion_tokens': 22, 'prompt_tokens': 151, 'total_tokens': 173}, 'model_name': 'gpt-4o', 'system_fingerprint': 'fp_157b3831f5', 'finish_reason': 'tool_calls', 'logprobs': None}, id='run-4f4b1228-3c31-414b-83ab-f58249a4af53-0', tool_calls=[{'name': 'tavily_search_results_json', 'args': {'query': 'current weather in San Francisco'}, 'id': 'call_dYeseNNLTZ8gx8HhLOfMxMWb'}])]
Calling: {'name': 'tavily_search_results_json', 'args': {'query': 'current weather in San Francisco'}, 'id': 'call_dYeseNNLTZ8gx8HhLOfMxMWb'}
Back to the model!
[AIMessage(content='The current weather in San Francisco is partly cloudy with a temperature of approximately 15.3°C (59.5°F). The wind is blowing from the north-northeast at about 3.8 mph (6.
```
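As the output above suggests, each item yielded by `graph.stream(...)` in this setup is a dict keyed by the node that just ran, and its value is that node's state update. That is why the loop iterates over `event.values()`. The toy generator below reproduces that shape. `toy_stream` is hypothetical and is not LangGraph code.

```python
from typing import Iterator


def toy_stream() -> Iterator[dict]:
    """Yield {node_name: state_update} after each node finishes."""
    yield {"llm": {"messages": ["ai: <tool call: current weather in San Francisco>"]}}
    yield {"action": {"messages": ["tool: <search results>"]}}
    yield {"llm": {"messages": ["ai: <final answer>"]}}


printed = []
for event in toy_stream():
    for node, update in event.items():
        print(node, update["messages"])
        printed.append((node, update["messages"][0]))
```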

### Various examples

This time we're going to ask "What about in LA?", which continues the same conversation we had before.
It's a follow-up question. We don't mention the weather explicitly, but because it's part of a conversation, we expect the model to realize that we're still asking about the weather.

This works because we pass the same `thread_id`, so the checkpointer restores the earlier messages.

```python
messages = [HumanMessage(content="What about in la?")]
thread = {"configurable": {"thread_id": "1"}}
for event in abot.graph.stream({"messages": messages}, thread):
    for v in event.values():
        print(v)
```

```text
{'messages': [AIMessage(content='', additional_kwargs={'tool_calls': [{'id': 'call_oBN465Sil6jTfnMOsDTUxwQu', 'function': {'arguments': '{"query":"current weather in Los Angeles"}', 'name': 'tavily_search_results_json'}, 'type': 'function'}]}, response_metadata={'token_usage': {'completion_tokens': 22, 'prompt_tokens': 774, 'total_tokens': 796}, 'model_name': 'gpt-4o', 'system_fingerprint': 'fp_157b3831f5', 'finish_reason': 'tool_calls', 'logprobs': None}, id='run-a05a8a9f-2d8b-47e5-9230-153aba2d42b9-0', tool_calls=[{'name': 'tavily_search_results_json', 'args': {'query': 'current weather in Los Angeles'}, 'id': 'call_oBN465Sil6jTfnMOsDTUxwQu'}])]}
Calling: {'name': 'tavily_search_results_json', 'args': {'query': 'current weather in Los Angeles'}, 'id': 'call_oBN465Sil6jTfnMOsDTUxwQu'}
Back to the model!
{'messages': [ToolMessage(content='[{\'url\': \'https://www.weatherapi.com/\', \'content\': "{\'location\': {\'name\': \'Los Angeles\', \'region\': \'California\', \'country\': \'Unite
```

```python
messages = [HumanMessage(content="Which one is warmer?")]
thread = {"configurable": {"thread_id": "1"}}
for event in abot.graph.stream({"messages": messages}, thread):
    for v in event.values():
        print(v)
```

```text
{'messages': [AIMessage(content='Los Angeles is warmer than San Francisco. The current temperature in Los Angeles is approximately 24.9°C (76.9°F), while the temperature in San Francisco is around 15.3°C (59.5°F).', response_metadata={'token_usage': {'completion_tokens': 47, 'prompt_tokens': 1405, 'total_tokens': 1452}, 'model_name': 'gpt-4o', 'system_fingerprint': 'fp_157b3831f5', 'finish_reason': 'stop', 'logprobs': None}, id='run-163483ab-78e9-4327-b036-4b6a4612cbfa-0')]}
```


```python
messages = [HumanMessage(content="Which one is warmer?")]
# A new thread_id starts a fresh conversation, so the model has none of the context saved under thread_id "1"
thread = {"configurable": {"thread_id": "2"}}
for event in abot.graph.stream({"messages": messages}, thread):
    for v in event.values():
        print(v)
```

```text
{'messages': [AIMessage(content='I need more context to provide an accurate answer. Are you comparing two specific locations, times, objects, or something else? Could you please provide more details?', response_metadata={'token_usage': {'completion_tokens': 33, 'prompt_tokens': 149, 'total_tokens': 182}, 'model_name': 'gpt-4o', 'system_fingerprint': 'fp_157b3831f5', 'finish_reason': 'stop', 'logprobs': None}, id='run-47b926af-dffd-4335-8a07-2468f68f2330-0')]}
```


## Streaming tokens

`astream_events` is an asynchronous method, which means that we need to use an async checkpointer.
To do this, we import `AsyncSqliteSaver` and pass it to the agent.

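```python
# Import path for the LangGraph version used in this lesson; newer releases
# expose it as langgraph.checkpoint.sqlite.aio.AsyncSqliteSaver instead.
from langgraph.checkpoint.aiosqlite import AsyncSqliteSaver

memory = AsyncSqliteSaver.from_conn_string(":memory:")
abot = Agent(model, [tool], system=prompt, checkpointer=memory)
```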

We want to look for events that correspond to new tokens. These events are called `on_chat_model_stream`. When we see one, we get its content and print it, followed by a pipe (`|`) delimiter. When we run this, we should see the answer stream onto the screen in real time.
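The filtering loop can be tried without an API key using a toy async generator. `toy_events` is hypothetical: it only mimics the `event` and `data` keys that the lesson's loop reads, and each chunk here is a plain string. With the real `astream_events`, the chunk is a message chunk whose text is in `.content`.

```python
import asyncio
from typing import AsyncIterator


async def toy_events() -> AsyncIterator[dict]:
    """Emit events shaped loosely like astream_events output (hypothetical)."""
    yield {"event": "on_chat_model_start", "data": {}}
    for token in ["The", " weather", " is", " mild", "."]:
        await asyncio.sleep(0)  # yield control, as a real network stream would
        yield {"event": "on_chat_model_stream", "data": {"chunk": token}}
    yield {"event": "on_chat_model_end", "data": {}}


async def main() -> str:
    tokens = []
    async for event in toy_events():
        if event["event"] == "on_chat_model_stream":
            content = event["data"]["chunk"]
            if content:  # skip empty chunks
                print(content, end="|")
                tokens.append(content)
    print()
    return "".join(tokens)  # the full answer, without the pipe delimiter


answer = asyncio.run(main())
```

Joining the collected tokens is one way to drop the pipe delimiter in a real application. In a Jupyter notebook, where an event loop is already running, use `await main()` instead of `asyncio.run(main())`.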

```python
messages = [HumanMessage(content="What is the weather in SF?")]
thread = {"configurable": {"thread_id": "4"}}
async for event in abot.graph.astream_events({"messages": messages}, thread, version="v1"):
    kind = event["event"]
    if kind == "on_chat_model_stream":
        content = event["data"]["chunk"].content
        if content:
            # Empty content in the context of OpenAI means
            # that the model is asking for a tool to be invoked.
            # So we only print non-empty content
            print(content, end="|")
```

```text
Calling: {'name': 'tavily_search_results_json', 'args': {'query': 'current weather in San Francisco'}, 'id': 'call_OASyT15D0jV3Y7h6Z6T0h8pn'}
Back to the model!
The| current| weather| in| San| Francisco| is| partly| cloudy| with| a| temperature| of| |15|.|2|°C| (|59|.|4|°F|).| The| wind| is| blowing| from| the| north|-n|ort|heast| at| |6|.|1| k|ph| (|3|.|8| mph|).| Hum|idity| is| at| |93|%,| and| visibility| is| |16| km| (|9| miles|).| The| UV| index| is| |5|.|
```

The agent first calls the search tool, then returns a final answer and streams those tokens out one at a time.

We can see that we've got this little funny pipe delimiter, but we could easily remove that in our production application if we wanted to. So that's it for persistence and streaming. Pretty simple to get started with, but really powerful for building production applications.

You're going to want your agents to be able to have multiple conversations at the same time, and to have a concept of memory so they can resume those conversations. You're also going to want them to stream not only the final tokens but also all of the messages that came before.

Persistence is also really important for enabling human in the loop type interactions, and that's exactly what we're going to cover in the next lesson.