In [1]:
from langchain import hub
from langchain.agents import AgentExecutor, create_openai_tools_agent
from langchain.prompts import ChatPromptTemplate
from langchain.tools import tool
from langchain_core.callbacks import Callbacks
from langchain_openai import ChatOpenAI


In [2]:
# Enable LangSmith tracking
import os
from dotenv import load_dotenv, find_dotenv
_ = load_dotenv(find_dotenv()) # read local .env file

os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_ENDPOINT"] = "https://api.langchain.plus"

## Create the model 

In [3]:
model  = ChatOpenAI(temperature=0, streaming=True)

## Tools

In [4]:
import random

@tool
async def where_cat_is_hiding() ->  str:
    """Where is the cat hiding right now?"""
    return random.choice(["under the bed", "on the shelf"])

@tool
async def get_items(place: str) -> str:
    """Use this tool to lookup which items are in the given  place. """
    if "bed" in place:  # For under the bed
        return "socks, shoes and dust bunnies"
    if "shelf" in place:  # For 'shelf'
        return "books, penciles and pictures"
    else:  # if the agent decides to ask about a different place
        return "cat snacks"

## Initialize the agent

**ATTENTION** Please note that we associated the name Agent with our agent using "run_name"="Agent". We'll use that fact later on with the astream_events API.

In [6]:
# Get the prompt to use  -  you  can modify  this
prompt = hub.pull("hwchase17/openai-tools-agent")
# print(prompt.messages)

tools = [get_items, where_cat_is_hiding]

agent  = create_openai_tools_agent(model.with_config({"tags": ["agent_llm"]}), tools, prompt)

agent_executor = AgentExecutor(agent=agent, tools=tools).with_config(
    {"run_name": "Agent"}
)

## Stream Intermediate Steps

We’ll use .stream method of the AgentExecutor to stream the agent’s intermediate steps.

The output from .stream alternates between (action, observation) pairs, finally concluding with the answer if the agent achieved its objective.

It’ll look like this:

- actions output
- observations output
- actions output
- observations output

… (continue until goal is reached) …

Then, if the final goal is reached, the agent will output the final answer.

The contents of these outputs are summarized here:

| Output       |      Contents      |
|----------    |:-------------:|
| Actions      |  `actions` `AgentAction` or a subclass, `messages` chat messages corresponding to action invocation |
| Observations |  `steps` History of what the agent did so far, including the current action and its observation, `messages` chat message with function invocation results (aka observations)   |
| Final answer | `output` `AgentFinish`, `messages` chat messages with the final output |

In [13]:
# Note: We use `pprint` to print only to depth 1, it makes it easier to see the output from a high level, before digging in.

import pprint

chunks = []

async for chunk in agent_executor.astream({
    "input": "What is the items are located  where the cat is hiding?"
}):
    chunks.append(chunk)
    print("---------")
    pprint.pprint(chunk, depth=1)

---------
{'actions': [...], 'messages': [...]}
---------
{'messages': [...], 'steps': [...]}
---------
{'actions': [...], 'messages': [...]}
---------
{'messages': [...], 'steps': [...]}
---------
{'messages': [...],
 'output': 'The items located where the cat is hiding (on the shelf) are '
           'books, pencils, and pictures.'}


## Using  Messages

You can access the underlying `messages`` from the outputs. Using messages can be nice when working with chat applications - because everything is a message!

In [19]:
chunks[0]["actions"]

[OpenAIToolAgentAction(tool='where_cat_is_hiding', tool_input={}, log='\nInvoking: `where_cat_is_hiding` with `{}`\n\n\n', message_log=[AIMessageChunk(content='', additional_kwargs={'tool_calls': [{'index': 0, 'id': 'call_iMU3W45ykVrU6sxBB9aVMzSM', 'function': {'arguments': '{}', 'name': 'where_cat_is_hiding'}, 'type': 'function'}]})], tool_call_id='call_iMU3W45ykVrU6sxBB9aVMzSM')]

In [20]:
for chunk in chunks:
    print(chunk["messages"])

[AIMessageChunk(content='', additional_kwargs={'tool_calls': [{'index': 0, 'id': 'call_iMU3W45ykVrU6sxBB9aVMzSM', 'function': {'arguments': '{}', 'name': 'where_cat_is_hiding'}, 'type': 'function'}]})]
[FunctionMessage(content='under the bed', name='where_cat_is_hiding')]
[AIMessageChunk(content='', additional_kwargs={'tool_calls': [{'index': 0, 'id': 'call_4SGUd9YSFmPZmn86ePu53lcw', 'function': {'arguments': '{"place":"under the bed"}', 'name': 'get_items'}, 'type': 'function'}]})]
[FunctionMessage(content='socks, shoes and dust bunnies', name='get_items')]
[AIMessage(content='The items located where the cat is hiding (under the bed) are socks, shoes, and dust bunnies.')]


## Using AgentAction/Observation

The outputs also contain richer structured information inside of actions and steps, which could be useful in some situations, but can also be harder to parse.

`Attention` AgentFinish is not available as part of the `streaming` method. If this is something you’d like to be added, please start a discussion on github and explain why its needed.

In [14]:
async for chunk  in agent_executor.astream({
    "input": "What is the items are located where the cat is hiding"
}):
    # Agent Action
    if "actions" in chunk:
        for action in chunk["actions"]:
            print(f"Calling  Tool:  `{action.tool}` with input `{action.tool_input}`")
    
    # Observation
    if "steps" in chunk:
        for step in chunk["steps"]:
            print(f"Tool Result: `{step.observation}`")
    
    # Final result
    if "output" in chunk:
        # print(f'Final Output: {chunk["output"]}')
        print(f'Final Output: {chunk["output"]}')
    # else:
    #     raise ValueError()
    
    print("-----")

Calling  Tool:  `where_cat_is_hiding` with input `{}`
-----
Tool Result: `under the bed`
-----
Calling  Tool:  `get_items` with input `{'place': 'under the bed'}`
-----
Tool Result: `socks, shoes and dust bunnies`
-----
Final Output: The items located where the cat is hiding (under the bed) are socks, shoes, and dust bunnies.
-----


## Custom Streaming With Events

Use the `astream_events`` API in case the default behavior of stream does not work for your application (e.g., if you need to stream individual tokens from the agent or surface steps occuring **within** tools).

⚠️ This is a **beta** API, meaning that some details might change slightly in the future based on usage. ⚠️ To make sure all callbacks work properly, use `async` code throughout. Try avoiding mixing in sync versions of code (e.g., sync versions of tools).

Let’s use this API to stream the following events:

In [15]:
async for event in agent_executor.astream_events(
    {"input": "where is the cat hiding? what items are in that location?"},
    version="v1",
):
    kind = event["event"]
    if kind == "on_chain_start":
        if (
            event["name"] == "Agent"
        ):  # Was assigned when creating the agent with `.with_config({"run_name": "Agent"})`
            print(
                f"Starting agent: {event['name']} with input: {event['data'].get('input')}"
            )
    elif kind == "on_chain_end":
        if (
            event["name"] == "Agent"
        ):  # Was assigned when creating the agent with `.with_config({"run_name": "Agent"})`
            print()
            print("--")
            print(
                f"Done agent: {event['name']} with output: {event['data'].get('output')['output']}"
            )
    if kind == "on_chat_model_stream":
        content = event["data"]["chunk"].content
        if content:
            # Empty content in the context of OpenAI means
            # that the model is asking for a tool to be invoked.
            # So we only print non-empty content
            print(content, end="|")
    elif kind == "on_tool_start":
        print("--")
        print(
            f"Starting tool: {event['name']} with inputs: {event['data'].get('input')}"
        )
    elif kind == "on_tool_end":
        print(f"Done tool: {event['name']}")
        print(f"Tool output was: {event['data'].get('output')}")
        print("--")

Starting agent: Agent with input: {'input': 'where is the cat hiding? what items are in that location?'}
--
Starting tool: where_cat_is_hiding with inputs: {}
Done tool: where_cat_is_hiding
Tool output was: under the bed
--
--
Starting tool: get_items with inputs: {'place': 'under the bed'}
Done tool: get_items
Tool output was: socks, shoes and dust bunnies
--
The| cat| is| hiding| under| the| bed|.| In| that| location|,| you| can| find| socks|,| shoes|,| and| dust| b|unn|ies|.|
--
Done agent: Agent with output: The cat is hiding under the bed. In that location, you can find socks, shoes, and dust bunnies.


## Stream Events from within Tools

If your tool leverages LangChain runnable objects (e.g., LCEL chains, LLMs, retrievers etc.) and you want to stream events from those objects as well, you’ll need to make sure that callbacks are propagated correctly.

To see how to pass callbacks, let’s re-implement the get_items tool to make it use an LLM and pass callbacks to that LLM. Feel free to adapt this to your use case.

In [18]:
@tool
async def get_items_with_callbacks(place: str, callbacks: Callbacks) -> str:  # <--- Accept callbacks
    """Use this tool to look up which items are in the given place."""
    print("DEBUG")
    print(callbacks)
    template = ChatPromptTemplate.from_messages(
        [
            (
                "human",
                "Can you tell me what kind of items i might find in the following place: '{place}'. "
                "List at least 3 such items separating them by a comma. And include a brief description of each item..",
            )
        ]
    )
    chain = template | model.with_config(
        {
            "run_name": "Get Items LLM",
            "tags": ["tool_llm"],
            "callbacks": callbacks,  # <-- Propagate callbacks
        }
    )
    chunks = [chunk async for chunk in chain.astream({"place": place})]
    return "".join(chunk.content for chunk in chunks)

In [19]:
# Get the prompt to use - you can modify this
prompt = hub.pull("hwchase17/openai-tools-agent")

tools = [get_items_with_callbacks, where_cat_is_hiding]

agent = create_openai_tools_agent(
    model.with_config({"tags": ["agent_llm"]}), tools, prompt
)
agent_executor = AgentExecutor(agent=agent, tools=tools).with_config(
    {"run_name": "Agent"}
)

async for event in agent_executor.astream_events(
    {"input": "where is the cat hiding? what items are in that location?"},
    version="v1",
):
    kind = event["event"]
    if kind == "on_chain_start":
        if (
            event["name"] == "Agent"
        ):  # Was assigned when creating the agent with `.with_config({"run_name": "Agent"})`
            print(
                f"Starting agent: {event['name']} with input: {event['data'].get('input')}"
            )
    elif kind == "on_chain_end":
        if (
            event["name"] == "Agent"
        ):  # Was assigned when creating the agent with `.with_config({"run_name": "Agent"})`
            print()
            print("--")
            print(
                f"Done agent: {event['name']} with output: {event['data'].get('output')['output']}"
            )
    if kind == "on_chat_model_stream":
        content = event["data"]["chunk"].content
        if content:
            # Empty content in the context of OpenAI means
            # that the model is asking for a tool to be invoked.
            # So we only print non-empty content
            print(content, end="|")
    elif kind == "on_tool_start":
        print("--")
        print(
            f"Starting tool: {event['name']} with inputs: {event['data'].get('input')}"
        )
    elif kind == "on_tool_end":
        print(f"Done tool: {event['name']}")
        print(f"Tool output was: {event['data'].get('output')}")
        print("--")

Starting agent: Agent with input: {'input': 'where is the cat hiding? what items are in that location?'}
--
Starting tool: where_cat_is_hiding with inputs: {}
Done tool: where_cat_is_hiding
Tool output was: on the shelf
--
--
Starting tool: get_items_with_callbacks with inputs: {'place': 'on the shelf'}
DEBUG
<langchain_core.callbacks.manager.AsyncCallbackManager object at 0x7f75e35ed890>
1|.| Books| -| On| the| shelf|,| you| may| find| a| variety| of| books| ranging| from| fiction| to| non|-fiction|,| novels| to| reference| books|.| Books| are| typically| organized| by| genre| or| author| for| easy| browsing|.

|2|.| Photo| frames| -| Photo| frames| are| commonly| found| on| shelves| to| display| cherished| memories| and| moments| captured| in| photographs|.| They| come| in| various| sizes|,| shapes|,| and| materials| to| suit| different| decor| styles|.

|3|.| Decor|ative| figur|ines| -| Decor|ative| figur|ines| such| as| sculptures|,| statues|,| or| small| ornaments| are| often| pla

## Other approaches

**Using astream_log**

**Note** You can also use the `astream_log` API. This API produces a granular log of all events that occur during execution. The log format is based on the `JSONPatch` standard. It’s granular, but requires effort to parse. For this reason, we created the astream_events API instead