In [1]:
from dotenv import load_dotenv,find_dotenv
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain_openai import ChatOpenAI

from langchain import hub
from langchain.agents import AgentExecutor, create_openai_tools_agent,create_tool_calling_agent
from langchain.tools import tool
from langchain_core.callbacks import Callbacks
from langchain.prompts import ChatPromptTemplate
import random
import pprint

In [2]:
load_dotenv(find_dotenv("../.env"))

True

In [21]:
llmGemini=ChatGoogleGenerativeAI(model="gemini-2.0-flash-001")
llmOpenAI=ChatOpenAI(model="gpt-4o-mini")

<p>Streaming with Agents are made more complicated by the fact that it's not just tokens of the final answer that you will want to stream, but you may also want to stream back the intermediate steps an agent takes</p>

In [14]:
@tool
async def whereCatisHiding() -> str:
    """
        Where the cat is hiding right now?
    """
    return random.choice(seq=["On the Bed","On the Shelf"])

In [15]:
@tool
async def getItems(place:str) -> str:
    """
        Use this tool to look up which items are in the given place
    """
    if "bed" in place:  # For under the bed
        return "socks, shoes and dust bunnies"
    elif "shelf" in place:  # For shelf
        return "books, pencils and pictures"
    else:  # if the agent decides to ask about a different place
        return "cat snacks"

In [16]:
await whereCatisHiding.ainvoke(input={})

'On the Shelf'

In [17]:
await getItems.ainvoke(input={"place":"almirah"})

'cat snacks'

<h3>Initialize the Agent</h3>

In [19]:

prompt=hub.pull(owner_repo_commit="hwchase17/openai-tools-agent")

In [9]:
tools=[getItems,whereCatisHiding]

In [22]:
agent=create_openai_tools_agent(
    llm=llmOpenAI.with_config({"tags":["agent_llm"]}),
    tools=tools,
    prompt=prompt
)

In [23]:
agentExecutor=AgentExecutor(agent=agent,tools=tools).\
    with_config({"run_name":"Agent"})  # this config will be used later with astream_events API

<h3> Stream Intermediate Steps</h3>
<h4>Actions, Observations and Final Answer</h4>

In [24]:
chunks=[]
async for chunk in agentExecutor.astream(input={"input":"What items are located where the cat is hiding?"}):
    chunks.append(chunk)
    print("---------")
    pprint.pprint(object=chunk,depth=1)

---------
{'actions': [...], 'messages': [...]}
---------
{'messages': [...], 'steps': [...]}
---------
{'actions': [...], 'messages': [...]}
---------
{'messages': [...], 'steps': [...]}
---------
{'messages': [...],
 'output': 'The items located where the cat is hiding (on the shelf) are cat '
           'snacks.'}


In [25]:
chunks

[{'actions': [ToolAgentAction(tool='whereCatisHiding', tool_input={}, log='\nInvoking: `whereCatisHiding` with `{}`\n\n\n', message_log=[AIMessageChunk(content='', additional_kwargs={'tool_calls': [{'index': 0, 'id': 'call_YJt2KiIs7aNuIa24JJZbJ6L1', 'function': {'arguments': '{}', 'name': 'whereCatisHiding'}, 'type': 'function'}]}, response_metadata={'finish_reason': 'tool_calls', 'model_name': 'gpt-4o-mini-2024-07-18', 'system_fingerprint': 'fp_34a54ae93c'}, id='run--870d49ea-ec8d-47b3-867e-86ab5526fe1c', tool_calls=[{'name': 'whereCatisHiding', 'args': {}, 'id': 'call_YJt2KiIs7aNuIa24JJZbJ6L1', 'type': 'tool_call'}], tool_call_chunks=[{'name': 'whereCatisHiding', 'args': '{}', 'id': 'call_YJt2KiIs7aNuIa24JJZbJ6L1', 'index': 0, 'type': 'tool_call_chunk'}])], tool_call_id='call_YJt2KiIs7aNuIa24JJZbJ6L1')],
  'messages': [AIMessageChunk(content='', additional_kwargs={'tool_calls': [{'index': 0, 'id': 'call_YJt2KiIs7aNuIa24JJZbJ6L1', 'function': {'arguments': '{}', 'name': 'whereCatisHid

In [26]:
for chunk in chunks:
    print(chunk['messages'])

[AIMessageChunk(content='', additional_kwargs={'tool_calls': [{'index': 0, 'id': 'call_YJt2KiIs7aNuIa24JJZbJ6L1', 'function': {'arguments': '{}', 'name': 'whereCatisHiding'}, 'type': 'function'}]}, response_metadata={'finish_reason': 'tool_calls', 'model_name': 'gpt-4o-mini-2024-07-18', 'system_fingerprint': 'fp_34a54ae93c'}, id='run--870d49ea-ec8d-47b3-867e-86ab5526fe1c', tool_calls=[{'name': 'whereCatisHiding', 'args': {}, 'id': 'call_YJt2KiIs7aNuIa24JJZbJ6L1', 'type': 'tool_call'}], tool_call_chunks=[{'name': 'whereCatisHiding', 'args': '{}', 'id': 'call_YJt2KiIs7aNuIa24JJZbJ6L1', 'index': 0, 'type': 'tool_call_chunk'}])]
[FunctionMessage(content='On the Shelf', additional_kwargs={}, response_metadata={}, name='whereCatisHiding')]
[AIMessageChunk(content='', additional_kwargs={'tool_calls': [{'index': 0, 'id': 'call_z67jVNy43X1e89EgJWKhj6cA', 'function': {'arguments': '{"place":"On the Shelf"}', 'name': 'getItems'}, 'type': 'function'}]}, response_metadata={'finish_reason': 'tool_ca

<h3>Using AgentAction/Observation</h3>

In [27]:
async for chunk in agentExecutor.astream(input={"input":"What items are located where the cat is hiding?"}):
    # Agent Action
    if "actions" in chunk:
        for action in chunk['actions']:
            print(f"Calling Tool: {action.tool} with Input: {action.tool_input}")
    # Observation
    elif "steps" in chunk:
        for step in chunk["steps"]:
            print(f"Tool Result: {step.observation}")
    # Final Result
    elif "output" in chunk:
        print(f"Final Output: {chunk['output']}")
    else:
        raise ValueError
    print("----")
    
            
        

Calling Tool: whereCatisHiding with Input: {}
----
Tool Result: On the Shelf
----
Calling Tool: getItems with Input: {'place': 'On the Shelf'}
----
Tool Result: cat snacks
----
Final Output: The items located where the cat is hiding (on the shelf) are cat snacks.
----


<h3> Custom Streaming with Events</h3>

In [28]:
async for chunk in agentExecutor.astream_events(
    input={"input":"What items are located where the cat is hiding?"},
    version="v2"
    ):
    print(chunk)

{'event': 'on_chain_start', 'data': {'input': {'input': 'What items are located where the cat is hiding?'}}, 'name': 'Agent', 'tags': [], 'run_id': 'cc40f459-4c60-4ae4-a001-c05ec6b1ff43', 'metadata': {}, 'parent_ids': []}
{'event': 'on_chain_start', 'data': {}, 'name': 'RunnableSequence', 'tags': [], 'run_id': '64e048f8-7955-4a41-bec5-83466cff9370', 'metadata': {}, 'parent_ids': ['cc40f459-4c60-4ae4-a001-c05ec6b1ff43']}
{'event': 'on_chain_start', 'data': {}, 'name': 'RunnableAssign<agent_scratchpad>', 'tags': ['seq:step:1'], 'run_id': 'd0b9d1a9-82b4-454e-8947-4cdae2d421dc', 'metadata': {}, 'parent_ids': ['cc40f459-4c60-4ae4-a001-c05ec6b1ff43', '64e048f8-7955-4a41-bec5-83466cff9370']}
{'event': 'on_chain_stream', 'run_id': 'd0b9d1a9-82b4-454e-8947-4cdae2d421dc', 'name': 'RunnableAssign<agent_scratchpad>', 'tags': ['seq:step:1'], 'metadata': {}, 'data': {'chunk': {'input': 'What items are located where the cat is hiding?', 'intermediate_steps': []}}, 'parent_ids': ['cc40f459-4c60-4ae4-a

In [29]:
async for event in agentExecutor.astream_events(
    {"input": "where is the cat hiding? what items are in that location?"},
    version="v2",
):
    kind=event["event"]
    if kind=="on_chain_start":
        if (
            event["name"]=="Agent"
        ):  # Was assigned when creating the agent with `.with_config({"run_name": "Agent"})`
            print(
                f"Starting agent: {event['name']} with input: {event['data'].get('input')}"
            )
    elif kind=="on_chain_end":
        if (
            event["name"]=="Agent"
        ):  # Was assigned when creating the agent with `.with_config({"run_name": "Agent"})`
            print()
            print("--")
            print(
                f"Done agent: {event['name']} with output: {event['data'].get('output')['output']}"
            )
    if kind=="on_chat_model_stream":
        content = event["data"]["chunk"].content
        if content:
            # Empty content in the context of OpenAI means
            # that the model is asking for a tool to be invoked.
            # So we only print non-empty content
            print(content, end="|")
    elif kind=="on_tool_start":
        print("--")
        print(
            f"Starting tool: {event['name']} with inputs: {event['data'].get('input')}"
        )
    elif kind=="on_tool_end":
        print(f"Done tool: {event['name']}")
        print(f"Tool output was: {event['data'].get('output')}")
        print("--")

Starting agent: Agent with input: {'input': 'where is the cat hiding? what items are in that location?'}
--
Starting tool: whereCatisHiding with inputs: {}
Done tool: whereCatisHiding
Tool output was: On the Bed
--
--
Starting tool: getItems with inputs: {'place': 'On the Bed'}
Done tool: getItems
Tool output was: cat snacks
--
The| cat| is| hiding| on| the| bed|,| and| there| are| cat| snacks| in| that| location|.|
--
Done agent: Agent with output: The cat is hiding on the bed, and there are cat snacks in that location.


<p> If your tool leverages Langchain runnable objects, (eg. LCEL Chains, LLMs, retrievers, etc) and you want to stream events from those objects as well, you need to make sure that callbacks are propagated correctly</p> 

<h3>Stream Events from within Tools</h3>

In [30]:
@tool
async def getItems(place: str, callbacks:Callbacks) -> str:
    """
        Use this tool to look up which items are in the given place
    """
    template=ChatPromptTemplate.from_messages(
        messages=[
            (
                "human",
                "Can you tell me what kind of items I might find in the following place: {place}"
                "List at least 3 such items separating them by coma. And include a brief description of each item.."
            )
        ]
    )
    
    chain=template|llmOpenAI.with_config(config={
        "run_name":"Get Items LLM",
        "tags":["tool_llm"],
        "callbacks":callbacks
    })

    chunks=[chunk async for chunk in chain.astream(input={"place":place})]
    return "".join(chunk.content for chunk in chunks)

In [31]:
prompt=hub.pull(owner_repo_commit="hwchase17/openai-tools-agent")

tools=[getItems,whereCatisHiding]

agent=create_openai_tools_agent(
    llm=llmOpenAI.with_config({"tags":["agent_llm"]}),
    tools=tools,
    prompt=prompt
)

agentExecutor=AgentExecutor(agent=agent,tools=tools).\
    with_config({"run_name":"Agent"})  # this config will be used later with astream_events API


In [32]:
async for event in agentExecutor.astream_events(
    {"input": "where is the cat hiding? what items are in that location?"},
    version="v2",
):
    kind=event["event"]
    if kind=="on_chain_start":
        if (
            event["name"]=="Agent"
        ):  # Was assigned when creating the agent with `.with_config({"run_name": "Agent"})`
            print(
                f"Starting agent: {event['name']} with input: {event['data'].get('input')}"
            )
    elif kind=="on_chain_end":
        if (
            event["name"]=="Agent"
        ):  # Was assigned when creating the agent with `.with_config({"run_name": "Agent"})`
            print()
            print("--")
            print(
                f"Done agent: {event['name']} with output: {event['data'].get('output')['output']}"
            )
    if kind=="on_chat_model_stream":
        content = event["data"]["chunk"].content
        if content:
            # Empty content in the context of OpenAI means
            # that the model is asking for a tool to be invoked.
            # So we only print non-empty content
            print(content, end="|")
    elif kind=="on_tool_start":
        print("--")
        print(
            f"Starting tool: {event['name']} with inputs: {event['data'].get('input')}"
        )
    elif kind=="on_tool_end":
        print(f"Done tool: {event['name']}")
        print(f"Tool output was: {event['data'].get('output')}")
        print("--")

Starting agent: Agent with input: {'input': 'where is the cat hiding? what items are in that location?'}
--
Starting tool: whereCatisHiding with inputs: {}
--
Starting tool: getItems with inputs: {'place': 'unknown'}
Done tool: whereCatisHiding
Tool output was: On the Shelf
--
Sure|!| However|,| I| need| you| to| specify| what| "|unknown|List|"| refers| to| in| order| to| provide| relevant| items|.| Could| you| please| clarify| what| type| of| place| or| category| you're| thinking| about|?|Done tool: getItems
Tool output was: Sure! However, I need you to specify what "unknownList" refers to in order to provide relevant items. Could you please clarify what type of place or category you're thinking about?
--
--
Starting tool: getItems with inputs: {'place': 'On the Shelf'}
Certainly|!| Here| are| three| items| you| might| find| on| a| shelf|:

|1|.| **|Book|**|:| A| hardcover| novel| with| a| captivating| storyline|,| perfect| for| curling| up| and| getting| lost| in| another| world|.| I

<h3> Using AStream Log </h3> <h4>(Recommended to use Astream Log instead of this for Parsing problem)</h4>

In [33]:
i=0
async for chunk in agentExecutor.astream_log(
    input={"input":"Where is the cat hiding? What items are in that location?"}
    ):
    print(chunk)
    i+=1
    if i>20: break

RunLogPatch({'op': 'replace',
  'path': '',
  'value': {'final_output': None,
            'id': '22b7ede4-1ac9-4cc9-9fd2-48d43be67aea',
            'logs': {},
            'name': 'Agent',
            'streamed_output': [],
            'type': 'chain'}})
RunLogPatch({'op': 'add',
  'path': '/logs/RunnableSequence',
  'value': {'end_time': None,
            'final_output': None,
            'id': 'e7a1f47f-cff6-4c34-80c7-56bca7027f89',
            'metadata': {},
            'name': 'RunnableSequence',
            'start_time': '2025-06-30T05:21:17.664+00:00',
            'streamed_output': [],
            'streamed_output_str': [],
            'tags': [],
            'type': 'chain'}})
RunLogPatch({'op': 'add',
  'path': '/logs/RunnableAssign<agent_scratchpad>',
  'value': {'end_time': None,
            'final_output': None,
            'id': '78753f3b-0db3-4fe4-8131-fa062b368b5b',
            'metadata': {},
            'name': 'RunnableAssign<agent_scratchpad>',
            'start_ti