# Create streaming service

In [47]:
from dotenv import load_dotenv 
load_dotenv(override=True)

True

In [48]:
import os
os.environ['GOOGLE_API_KEY'] = os.getenv('GOOGLE_API_KEY')
os.environ['OPENAI_API_KEY'] = os.getenv('OPENAI_API_KEY')

In [49]:
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain_openai import ChatOpenAI
from langchain_ollama import ChatOllama

# create llms 
google_llm = ChatGoogleGenerativeAI(temperature=0, model="gemini-3-flash-preview")
openai_llm = ChatOpenAI(temperature=0, model="gpt-4o-mini")
ollama_llm = ChatOllama(temperature=0, model="ministral-3:14b")

In [None]:
# google_llm.invoke("Hello")

AIMessage(content=[{'type': 'text', 'text': 'Hello! How can I help you today?', 'extras': {'signature': 'Eq4CCqsCAXLI2nwC3+r4ReVX+3ZWHzppIWXNqhQURWknHwMt+KD9ffybj/8ac0/ykuxPeFPC1kdetT4zwl0LDMOtC9GPxEE5RrQHPyS87V36N61qZGxgDsAbfiHu2WvLBNT4S+Cm+M6C6rin8z6IUrDZey0duyubEh+RlaaZfg5smTU6VQMN3lO8TXvvHb6CDIIZx/6+hLuMpgT/7p4Z8YpF3Zim6v1WMFYsgbGqNmnCLnMDGVL0Zw+tMMCFqbbNr//MmwGIQWMAqmFYj9yqk02ThFL+ItEBJhlssDwos6zT7CeepbxKRxYjIzpt/YExl4gbC3u3plwtZSGjw7Trt+cmotPD1Ngfn9Au9+InThLLsQG8psBIfZDgsK8AOCclOZSNfzToIBvAPP33anY4EUE='}}], additional_kwargs={}, response_metadata={'finish_reason': 'STOP', 'model_name': 'gemini-3-flash-preview', 'safety_ratings': [], 'model_provider': 'google_genai'}, id='lc_run--019b8916-63da-7673-b649-3cd4de645d9c-0', usage_metadata={'input_tokens': 2, 'output_tokens': 77, 'total_tokens': 79, 'input_token_details': {'cache_read': 0}, 'output_token_details': {'reasoning': 68}})

In [50]:
ollama_llm.invoke("Hello")

AIMessage(content="Hello! ðŸ˜Š How can I assist you today? Whether you have questions, need help with something, or just want to chat, I'm here for you!", additional_kwargs={}, response_metadata={'model': 'ministral-3:14b', 'created_at': '2026-01-04T13:11:42.495037Z', 'done': True, 'done_reason': 'stop', 'total_duration': 36931369041, 'load_duration': 5605820041, 'prompt_eval_count': 555, 'prompt_eval_duration': 28563513875, 'eval_count': 35, 'eval_duration': 2706343670, 'logprobs': None, 'model_name': 'ministral-3:14b', 'model_provider': 'ollama'}, id='lc_run--019b8922-0055-7363-861c-caebc8a0e5bb-0', usage_metadata={'input_tokens': 555, 'output_tokens': 35, 'total_tokens': 590})

### Streaming with astream

In [None]:
tokens = [] 
async for token in ollama_llm.astream("What are autoencoders?"):
    tokens.append(token)
    print (token.content, end='|', flush=True)

### Implementing streaming feature for agent. 

As we know to build an agent executor we need - 
- Tools 
- ChatPromptTemplate
- LLM 
- Agent 
- Agent Executor

In [7]:
from langchain.tools import tool 

In [8]:
@tool
def add(x:float, y:float) -> float: 
    "Add 'x' and 'y' "
    return x + y 

@tool
def substract(x:float, y:float) -> float: 
    "Substract 'y' from 'x' "
    return x - y 

@tool
def multiply(x:float, y:float) -> float: 
    "Multiply 'x' and 'y' "
    return x * y 

@tool
def divide(x:float, y:float) -> float: 
    "Divide 'x' by 'y' "
    return x / y 

@tool
def exponential(x:float, y:float) -> float: 
    "Raise 'x' to the power of 'y' "
    return x ** y

@tool 
def final_answer(answer:str, tools_used:list[str]) -> str:
    """ Use this tool to provide the final answer to the user. 
    The answer should be in natural language as this will be provided to user directly.
    tools_used should contain list of tools used within the `scratchpad`
    """

    return {'answer': answer, "tools_used": tools_used}

In [9]:
# create list of tools 
tools = [add, substract, multiply, divide, exponential, final_answer]

### Chat Prompt template

In [10]:
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder

prompt = ChatPromptTemplate.from_messages([
    ("system", (
        "You are a helpful assistant. When answering user's query you should first use one of the tool provided. "
        "After using the tool, the tool output will be provided in scratchpad below."
        "You MUST then use final_answer tool to provide an answer to the user. DO NOT use the same tool more than once."
    )),
    MessagesPlaceholder(variable_name="chat_history"), 
    ("human", "{input}"),
    MessagesPlaceholder(variable_name="agent_scratchpad")
])

### LCEL Declaration for agent 

In [11]:
from langchain_core.runnables.base import RunnableSerializable

agent : RunnableSerializable = (
    {
        "input" : lambda x : x["input"], 
        "chat_history" : lambda x : x["chat_history"], 
        "agent_scratchpad" : lambda x: x.get("agent_scratchpad", [])
    }
    | prompt 
    | ollama_llm.bind_tools(tools, tool_choice="any")
)

### Agent Executor

In [12]:
import json 
from langchain_core.messages import BaseMessage, HumanMessage, AIMessage

# create name2tool lookup 
name2tool = {tool.name : tool.func for tool in tools}


In [13]:
name2tool

{'add': <function __main__.add(x: float, y: float) -> float>,
 'substract': <function __main__.substract(x: float, y: float) -> float>,
 'multiply': <function __main__.multiply(x: float, y: float) -> float>,
 'divide': <function __main__.divide(x: float, y: float) -> float>,
 'exponential': <function __main__.exponential(x: float, y: float) -> float>,
 'final_answer': <function __main__.final_answer(answer: str, tools_used: list[str]) -> str>}

In [None]:
# ollama_openai_llm = ChatOpenAI(temperature=0, model="ministral-3:14b", api_key="ollama_key", base_url=os.getenv('OLLAMA_BASE_URL'))

In [None]:
class CustomAgentExecutor: 
    chat_history: list[BaseMessage]

    def __init__(self, max_iteration:int=3):
        self.chat_history = [] 
        self.max_iterations = max_iteration
        self.agent: RunnableSerializable = (
            {
                "input" : lambda x : x["input"], 
                "chat_history" : lambda x : x["chat_history"], 
                "agent_scratchpad" : lambda x: x.get("agent_scratchpad", [])
            }
            | prompt 
            | openai_llm.bind_tools(tools, tool_choice="any")
        )

    def invoke(self, input:str) -> dict:
        count = 0
        agent_scratchpad = [] 

        while count < self.max_iterations:
            out = self.agent.invoke({
                "input": input, 
                "chat_history": self.chat_history, 
                "agent_scratchpad": agent_scratchpad
            })

            # if final_answer tool called -> come out of the loop 
            print (out)
            print()
            print(out.tool_calls)

            # if out.tool_calls:

            if out.tool_calls[0]["name"] == "final_answer":
                break 
            
            # add the output to agent scratchpad
            agent_scratchpad.append(out)
            tool_out = name2tool[out.tool_calls[0]["name"]](**out.tool_calls[0]["args"])
            action_str = f"The {out.tool_calls[0]['name']} tool returned {tool_out}"
            agent_scratchpad.append({
                "role": "tool", 
                "content": action_str, 
                "tool_call_id": out.tool_calls[0]["id"]
            })

            print (f"{count}: {action_str}")
            count += 1 

        final_answer = out.tool_calls[0]["args"]
        final_answer_str = json.dumps(final_answer)
        self.chat_history.append({
            "input": input, 
            "output": final_answer_str
        })
        self.chat_history.extend([
            HumanMessage(content=input), 
            AIMessage(content=final_answer_str)
        ])

        return final_answer


In [58]:
custom_agent = CustomAgentExecutor()

In [59]:
custom_agent.invoke(input="What is 2+8")

content='' additional_kwargs={'refusal': None} response_metadata={'token_usage': {'completion_tokens': 17, 'prompt_tokens': 281, 'total_tokens': 298, 'completion_tokens_details': {'accepted_prediction_tokens': 0, 'audio_tokens': 0, 'reasoning_tokens': 0, 'rejected_prediction_tokens': 0}, 'prompt_tokens_details': {'audio_tokens': 0, 'cached_tokens': 0}}, 'model_provider': 'openai', 'model_name': 'gpt-4o-mini-2024-07-18', 'system_fingerprint': 'fp_29330a9688', 'id': 'chatcmpl-CuIFJ0QeVsDxmBpPiWMyZQ77vflJ8', 'service_tier': 'default', 'finish_reason': 'tool_calls', 'logprobs': None} id='lc_run--019b8926-3171-77f0-92be-3a038947ed44-0' tool_calls=[{'name': 'add', 'args': {'x': 2, 'y': 8}, 'id': 'call_GeIWGWQ4K8kWG6vazazWz7Td', 'type': 'tool_call'}] usage_metadata={'input_tokens': 281, 'output_tokens': 17, 'total_tokens': 298, 'input_token_details': {'audio': 0, 'cache_read': 0}, 'output_token_details': {'audio': 0, 'reasoning': 0}}

[{'name': 'add', 'args': {'x': 2, 'y': 8}, 'id': 'call_GeI

{'answer': 'The result of 2 + 8 is 10.', 'tools_used': ['functions.add']}

```
Works well for single operation. Fails with key error for role and all for multiple operations

In [61]:
custom_agent.chat_history

[{'input': 'What is 2+8',
  'output': '{"answer": "The result of 2 + 8 is 10.", "tools_used": ["functions.add"]}'},
 HumanMessage(content='What is 2+8', additional_kwargs={}, response_metadata={}),
 AIMessage(content='{"answer": "The result of 2 + 8 is 10.", "tools_used": ["functions.add"]}', additional_kwargs={}, response_metadata={})]

``` 
Notice ollama is putting everying in content instead of tool call :) 
Need to fix this

In [62]:
custom_agent_ministral = CustomAgentExecutor()
custom_agent_ministral.invoke(input="What is 2+8")

content='add[ARGS]{"x": 2, "y": 8}' additional_kwargs={} response_metadata={'model': 'ministral-3:14b', 'created_at': '2026-01-04T13:17:01.772463Z', 'done': True, 'done_reason': 'stop', 'total_duration': 11086397542, 'load_duration': 126107708, 'prompt_eval_count': 455, 'prompt_eval_duration': 9906908583, 'eval_count': 15, 'eval_duration': 1005357207, 'logprobs': None, 'model_name': 'ministral-3:14b', 'model_provider': 'ollama'} id='lc_run--019b8927-447a-7e81-8834-ab04a199f5d9-0' usage_metadata={'input_tokens': 455, 'output_tokens': 15, 'total_tokens': 470}

[]


IndexError: list index out of range

### Implementing Async call to Agent Executor 

In [91]:
# define aysncio llm for streaming the data 
from langchain_core.runnables import ConfigurableField

stream_llm = ChatOpenAI(temperature=0, model="gpt-4o-mini", streaming=True).configurable_fields(callbacks=ConfigurableField(id="callbacks", name="callbacks", description="List of callbacks to use for streaming"))

In [92]:
agent : RunnableSerializable = (
    {
        "input" : lambda x : x["input"], 
        "chat_history" : lambda x : x["chat_history"], 
        "agent_scratchpad" : lambda x: x.get("agent_scratchpad", [])
    }
    | prompt 
    | stream_llm.bind_tools(tools, tool_choice="any")
)

In [93]:
import asyncio 
from langchain_core.callbacks.base import AsyncCallbackHandler

class QueueCallbackHandler(AsyncCallbackHandler):
    """ Call back handler to put tokens generated to queue """

    def __init__(self, queue:asyncio.Queue):
        self.queue = queue 
        self.final_answer_seen = False 

    async def __aiter__(self):
        while True: 
            if self.queue.empty():
                await asyncio.sleep(0.1)
                continue 

            token_or_done = await self.queue.get()

            if token_or_done == "<<DONE>>":
                return 
            
            if token_or_done:
                yield token_or_done

    async def on_llm_new_token(self, *args, **kwargs) -> None:
        """ Put token on the queue """

        chunk = kwargs.get("chunk")
        if chunk: 

            # := operator will assign the value in the right side to left side; it is like checking if left has something or not and equal to right side
            if tool_calls := chunk.message.additional_kwargs.get("tool_calls"):
                if tool_calls[0]["function"]["name"] == "final_answer":
                    self.final_answer_seen = True 

        self.queue.put_nowait(kwargs.get("chunk"))
        return 
    

    async def on_llm_end(self, *args, **kwargs) -> None:
        """ Put message to queue to signal completion"""

        if self.final_answer_seen:
            self.queue.put_nowait("<<DONE>>")
        else: 
            self.queue.put_nowait("<<STEP_END>>")
        return 
            

In [94]:
queue = asyncio.Queue()
streamer = QueueCallbackHandler(queue)

In [95]:
tokens = [] 
async def stream(query:str):
    response = agent.with_config(callbacks=[streamer])

    async for token in response.astream({
        "input" : query, 
        "chat_history": [], 
        "agent_scratchpad": []
    }):
        tokens.append(token)
        print(token, flush=True)

In [96]:
await stream("What is 788+25?")

content='' additional_kwargs={} response_metadata={'model_provider': 'openai'} id='lc_run--019b897a-c319-7352-a4f5-a771a5836f18' tool_calls=[{'name': 'add', 'args': {}, 'id': 'call_0xxQ3mrbi0s6qtyeUwmY0G3N', 'type': 'tool_call'}] tool_call_chunks=[{'name': 'add', 'args': '', 'id': 'call_0xxQ3mrbi0s6qtyeUwmY0G3N', 'index': 0, 'type': 'tool_call_chunk'}]
content='' additional_kwargs={} response_metadata={'model_provider': 'openai'} id='lc_run--019b897a-c319-7352-a4f5-a771a5836f18' tool_calls=[{'name': '', 'args': {}, 'id': None, 'type': 'tool_call'}] tool_call_chunks=[{'name': None, 'args': '{"', 'id': None, 'index': 0, 'type': 'tool_call_chunk'}]
content='' additional_kwargs={} response_metadata={'model_provider': 'openai'} id='lc_run--019b897a-c319-7352-a4f5-a771a5836f18' invalid_tool_calls=[{'name': None, 'args': 'x', 'id': None, 'error': None, 'type': 'invalid_tool_call'}] tool_call_chunks=[{'name': None, 'args': 'x', 'id': None, 'index': 0, 'type': 'tool_call_chunk'}]
content='' add

In [97]:
# lets see one single message from all these streams values 
tk = tokens[0]
for token in tokens[1:]: 
    tk += token
tk

AIMessageChunk(content='', additional_kwargs={}, response_metadata={'model_provider': 'openai', 'finish_reason': 'tool_calls', 'model_name': 'gpt-4o-mini-2024-07-18', 'system_fingerprint': 'fp_29330a9688', 'service_tier': 'default'}, id='lc_run--019b897a-c319-7352-a4f5-a771a5836f18', tool_calls=[{'name': 'add', 'args': {'x': 788, 'y': 25}, 'id': 'call_0xxQ3mrbi0s6qtyeUwmY0G3N', 'type': 'tool_call'}], usage_metadata={'input_tokens': 282, 'output_tokens': 17, 'total_tokens': 299, 'input_token_details': {'audio': 0, 'cache_read': 0}, 'output_token_details': {'audio': 0, 'reasoning': 0}}, tool_call_chunks=[{'name': 'add', 'args': '{"x":788,"y":25}', 'id': 'call_0xxQ3mrbi0s6qtyeUwmY0G3N', 'index': 0, 'type': 'tool_call_chunk'}], chunk_position='last')

### Modify our CustomAgentExecutor

In [None]:
from langchain_core.messages import ToolMessage

class CustomStreamAgentExecutor:
    chat_history = list[BaseMessage]

    def __init__(self, max_iterations:int=3):
        self.chat_history = [] 
        self.max_iterations = max_iterations
        self.agent: RunnableSerializable = (
            {
                "input": lambda x: x["input"], 
                "chat_history": lambda x: x["chat_history"], 
                "agent_scratchpad": lambda x: x.get("agent_scratchpad", [])
            }
            | prompt 
            | stream_llm.bind_tools(tools, tool_choice="any")
        )

    async def invoke(self, input:str, streamer:QueueCallbackHandler, verbose:bool=False):
        count = 0 
        agent_scratchpad = [] 

        while count < self.max_iterations:
            async def stream(query:str):
                response = self.agent.with_config(callbacks=[streamer])
                output = None 
                async for token in response.astream({
                    "input": query, 
                    "chat_history": self.chat_history, 
                    "agent_scratchpad": agent_scratchpad
                }):
                    if output is None: 
                        output = token 
                    else:
                        output += token 
                    
                    if token.content != "": 
                        if verbose: print (f"content: {token.content}", flush=True)


                    tool_calls = token.additional_kwargs.get("tool_calls")
                    if tool_calls:
                        if verbose: print(f"tools call: {tool_calls}", flush=True)
                        tool_name = tool_calls[0]["function"]["name"]

                        if tool_name:
                            if verbose: print (f"tool name: {tool_name}", flush=True)
                        
                            arg = tool_calls[0]["function"]["arguments"]
                            if arg != "": 
                                if verbose: print (f"Arguments: {arg}", flush=True)

                return AIMessage(
                    content=output.content, 
                    tool_calls=output.tool_calls, 
                    tool_call_id=output.tool_calls[0]["id"]
                )
            
            tool_call = await stream(query=input)
            agent_scratchpad.append(tool_call)
            tool_name = tool_call.tool_calls[0]["name"]
            tool_args = tool_call.tool_calls[0]["args"]
            tool_call_id = tool_call.tool_call_id

            tool_out = name2tool[tool_name](**tool_args)
            tool_exec = ToolMessage(
                content=f"{tool_out}", 
                tool_call_id=tool_call_id
            )

            agent_scratchpad.append(tool_exec)
            count += 1

            if tool_name == "final_answer":
                break 
        
        final_answer = tool_out["answer"]
        self.chat_history.extend([
            HumanMessage(content=input), 
            AIMessage(content=final_answer)
        ])

        return tool_args
    

custom_stream_agent = CustomStreamAgentExecutor()


In [99]:
queue=asyncio.Queue()
streamer = QueueCallbackHandler(queue)
out = await custom_stream_agent.invoke("What is 788+25?", streamer=streamer, verbose=True)

In [100]:
out

{'answer': 'The sum of 788 and 25 is 813.', 'tools_used': ['functions.add']}

### Right way to print 

In [101]:
queue=asyncio.Queue()
streamer = QueueCallbackHandler(queue)
task = asyncio.create_task(custom_stream_agent.invoke("What is 788+25?", streamer))

async for token in streamer:
    print (token)
    if token == "<<STEP_END>>":
        print ("\n", flush=True)
    
    elif tool_calls := token.message.additional_kwargs.get("tool_calls"):
        if tool_name := tool_calls[0]["function"]["name"]:
            print (f"Calling {tool_name}", flush=True)
        if tool_args := tool_calls[0]["function"]["arguments"]:
            print (f"({tool_args})", end="", flush=True)

_ = await task

message=AIMessageChunk(content='', additional_kwargs={}, response_metadata={'model_provider': 'openai'}, id='lc_run--019b897b-23b9-7243-9326-0d8fcaa3d4e5', tool_calls=[{'name': 'add', 'args': {}, 'id': 'call_nvPpk83yfdoDjgu6id2db4z5', 'type': 'tool_call'}], tool_call_chunks=[{'name': 'add', 'args': '', 'id': 'call_nvPpk83yfdoDjgu6id2db4z5', 'index': 0, 'type': 'tool_call_chunk'}])
message=AIMessageChunk(content='', additional_kwargs={}, response_metadata={'model_provider': 'openai'}, id='lc_run--019b897b-23b9-7243-9326-0d8fcaa3d4e5', tool_calls=[{'name': '', 'args': {}, 'id': None, 'type': 'tool_call'}], tool_call_chunks=[{'name': None, 'args': '{"', 'id': None, 'index': 0, 'type': 'tool_call_chunk'}])
message=AIMessageChunk(content='', additional_kwargs={}, response_metadata={'model_provider': 'openai'}, id='lc_run--019b897b-23b9-7243-9326-0d8fcaa3d4e5', invalid_tool_calls=[{'name': None, 'args': 'x', 'id': None, 'error': None, 'type': 'invalid_tool_call'}], tool_call_chunks=[{'name':

CancelledError: 