#### LangChain Essentials Course

# Streaming With Langchain

LangChain is one of the most popular open source libraries for AI Engineers. It's goal is to abstract away the complexity in building AI software, provide easy-to-use building blocks, and make it easier when switching between AI service providers.

In this example, we will introduce LangChain's async streaming, allowing us to receive and view the tokens as they are generated by OpenAI's LLM. The use of streaming is typical in conversational interfaces and can provide a more natural experience for users.

---

> ⚠️ We will be using OpenAI for this example allowing us to run everything via API. If you would like to use Ollama instead, please see the [Ollama version](https://github.com/aurelio-labs/langchain-course/blob/main/notebooks/ollama/06-lcel-ollama.ipynb) of this example.

---

---

> ⚠️ If using LangSmith, add your API key below:

In [None]:
import os
from getpass import getpass

os.environ["LANGCHAIN_API_KEY"] = os.getenv("LANGCHAIN_API_KEY") or \
    getpass("Enter LangSmith API Key: ")

os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_ENDPOINT"] = "https://api.smith.langchain.com"
os.environ["LANGCHAIN_PROJECT"] = "aurelioai-langchain-course-agent-executor-openai"

---

For the LLM, we'll start by initializing our connection to the OpenAI API. We do need an OpenAI API key, which you can get from the [OpenAI platform](https://platform.openai.com/api-keys).

We will use the `gpt-4o-mini` model with a `temperature` of `0.0`:

In [2]:
import os
from getpass import getpass
from langchain_openai import ChatOpenAI

os.environ["OPENAI_API_KEY"] = os.getenv("OPENAI_API_KEY") \
    or getpass("Enter your OpenAI API key: ")

llm = ChatOpenAI(
    model_name="gpt-4o-mini",
    temperature=0.0,
    streaming=True
)

In [3]:
llm_out = llm.invoke("Hello there")
llm_out

AIMessage(content='Hello! How can I assist you today?', additional_kwargs={}, response_metadata={'finish_reason': 'stop', 'model_name': 'gpt-4o-mini-2024-07-18', 'system_fingerprint': 'fp_72ed7ab54c'}, id='run-ac9dac42-47b4-4aff-ae98-c8e9c27bd145-0')

## Streaming with `astream`

We will start by creating a aysnc stream from our LLM. We do this within an `async for` loop, allowing us to iterate through the chunks of data and use them as soon as the async `astream` method returns the tokens to us. By adding a pipe character `|` we can see the individual tokens that are generated. We set `flush` equal to `True` as this forces immediate output to the console, resulting in smoother streaming.

In [7]:
tokens = []
async for token in llm.astream("What is NLP?"):
    tokens.append(token)
    print(token.content, end="|", flush=True)

|N|LP| stands| for| Natural| Language| Processing|,| which| is| a| sub|field| of| artificial| intelligence| (|AI|)| and| computational| lingu|istics|.| It| focuses| on| the| interaction| between| computers| and| humans| through| natural| language|.| The| goal| of| NLP| is| to| enable| machines| to| understand|,| interpret|,| generate|,| and| respond| to| human| language| in| a| way| that| is| both| meaningful| and| useful|.

|Key| tasks| and| applications| of| NLP| include|:

|1|.| **|Text| Analysis|**|:| Understanding| and| extracting| information| from| text|,| such| as| sentiment| analysis|,| topic| modeling|,| and| summar|ization|.

|2|.| **|Machine| Translation|**|:| Automatically| translating| text| from| one| language| to| another|,| as| seen| in| services| like| Google| Translate|.

|3|.| **|Speech| Recognition|**|:| Con|verting| spoken| language| into| text|,| which| is| used| in| virtual| assistants| like| Siri| and| Alexa|.

|4|.| **|Chat|bots| and| Convers|ational| Agents|*

Since we appended each token to the `tokens` list, we can also see what is inside each and every token. 

In [8]:
tokens[0]

AIMessageChunk(content='', additional_kwargs={}, response_metadata={}, id='run-1a46d2ab-ed14-4cd5-9358-60b3532b799c')

In [9]:
tokens[1]

AIMessageChunk(content='N', additional_kwargs={}, response_metadata={}, id='run-1a46d2ab-ed14-4cd5-9358-60b3532b799c')

We can also merge multiple `AIMessageChunk` objects together with the `+` operator, creating a larger set of tokens / chunk:

In [10]:
tokens[0] + tokens[1] + tokens[2] + tokens[3] + tokens[4]

AIMessageChunk(content='NLP stands for', additional_kwargs={}, response_metadata={}, id='run-1a46d2ab-ed14-4cd5-9358-60b3532b799c')

A word of caution, there is nothing preventing you from merging tokens in the incorrect order, so be cautious to not output any token omelettes:

In [11]:
tokens[4] + tokens[3] + tokens[2] + tokens[1] + tokens[0]

AIMessageChunk(content=' for standsLPN', additional_kwargs={}, response_metadata={}, id='run-1a46d2ab-ed14-4cd5-9358-60b3532b799c')

## Streaming with Tool-use

Now we will define a few tools to be used by an async agent executor. When using the default streaming functionality of LangChain, we will see:

* The tool-use steps are streamed in one big chunk, ie we do not return the tool use information token-by-token but instead it streams message-by-message.

* The final LLM output _will_ be streamed token-by-token as we saw above.

In [20]:
from langchain_core.tools import tool

@tool
def add(x: float, y: float) -> float:
    """Add 'x' and 'y'."""
    return x + y

@tool
def multiply(x: float, y: float) -> float:
    """Multiply 'x' and 'y'."""
    return x * y

@tool
def exponentiate(x: float, y: float) -> float:
    """Raise 'x' to the power of 'y'."""
    return x ** y

@tool
def subtract(x: float, y: float) -> float:
    """Subtract 'x' from 'y'."""
    return y - x

We will create our `ChatPromptTemplate`:

In [21]:
from langchain_core.prompts import ChatPromptTemplate

prompt = ChatPromptTemplate.from_messages([
    ("system", "you're a helpful assistant"),
    ("human", "{input}"),
    ("placeholder", "{agent_scratchpad}"),
])

Now we will generating outputs for the tools to the agent, this is also where we connect the llm and the prompt.

In [22]:
from langchain.agents import create_tool_calling_agent

tools = [add, multiply, exponentiate, subtract]

agent = create_tool_calling_agent(
    llm=llm, tools=tools, prompt=prompt
)

We then create the agent executor, providing it with our `agent` definition and the `tools` functions that the executor will use.

In [23]:
from langchain.agents import AgentExecutor

agent_executor = AgentExecutor(
    agent=agent,
    tools=tools,
)

Now we can use the astream function as opposed to the invoke function, this allows the output of the function to be shown to us as chunks

In [24]:
import pprint

chunks = []

async for chunk in agent_executor.astream(
    {"input": "in the following order, what is 5+5, * 10, to the power of 3, minus 99?"}
):
    chunks.append(chunk)
    print("-"*80)
    pprint.pprint(chunk, depth=2)

--------------------------------------------------------------------------------
{'actions': [ToolAgentAction(tool='add', tool_input={'x': 5, 'y': 5}, log="\nInvoking: `add` with `{'x': 5, 'y': 5}`\n\n\n", message_log=[AIMessageChunk(content='', additional_kwargs={'tool_calls': [{'index': 0, 'id': 'call_0c29cn9yIupogjwpKSUUQZcA', 'function': {'arguments': '{"x": 5, "y": 5}', 'name': 'add'}, 'type': 'function'}, {'index': 1, 'id': 'call_qQySkwhyHhMmD0ydWrNVPtNS', 'function': {'arguments': '{"x": 10, "y": 10}', 'name': 'multiply'}, 'type': 'function'}, {'index': 2, 'id': 'call_LwhclCbEwMiN02Oc0AxVj9gl', 'function': {'arguments': '{"x": 10, "y": 3}', 'name': 'exponentiate'}, 'type': 'function'}, {'index': 3, 'id': 'call_1Qcmlca5mzcWKmrmnmCJO7AO', 'function': {'arguments': '{"x": 99, "y": 99}', 'name': 'subtract'}, 'type': 'function'}]}, response_metadata={'finish_reason': 'tool_calls', 'model_name': 'gpt-4o-mini-2024-07-18', 'system_fingerprint': 'fp_72ed7ab54c'}, id='run-6c5d077f-5789-44

Inside of each chunk we can see different attributes credited to each chunk.

In [66]:
chunks[0]["actions"]

[ToolAgentAction(tool='add', tool_input={'x': 5, 'y': 5}, log="\nInvoking: `add` with `{'x': 5, 'y': 5}`\n\n\n", message_log=[AIMessageChunk(content='', additional_kwargs={'tool_calls': [{'index': 0, 'id': 'call_5LY91QynPiCypmkWL6t95m8p', 'function': {'arguments': '{"x": 5, "y": 5}', 'name': 'add'}, 'type': 'function'}, {'index': 1, 'id': 'call_e7O8LAZf32zNU5ThM6uUMwx3', 'function': {'arguments': '{"x": 10, "y": 10}', 'name': 'multiply'}, 'type': 'function'}, {'index': 2, 'id': 'call_ZVGzvK1x2egCZRAA08ewHPK7', 'function': {'arguments': '{"x": 10, "y": 3}', 'name': 'exponentiate'}, 'type': 'function'}, {'index': 3, 'id': 'call_xO1zz7Jsl6PATzJW5ttMLh0O', 'function': {'arguments': '{"x": 99, "y": 99}', 'name': 'subtract'}, 'type': 'function'}]}, response_metadata={'finish_reason': 'tool_calls', 'model_name': 'gpt-4o-mini-2024-07-18', 'system_fingerprint': 'fp_bd83329f63'}, id='run-0a931944-a303-48a2-9cbe-dfe323593d0c', tool_calls=[{'name': 'add', 'args': {'x': 5, 'y': 5}, 'id': 'call_5LY9

Within the 'messages' attribute we can see the content of the output.

In [67]:
for chunk in chunks:
    print(chunk["messages"])

[AIMessageChunk(content='', additional_kwargs={'tool_calls': [{'index': 0, 'id': 'call_5LY91QynPiCypmkWL6t95m8p', 'function': {'arguments': '{"x": 5, "y": 5}', 'name': 'add'}, 'type': 'function'}, {'index': 1, 'id': 'call_e7O8LAZf32zNU5ThM6uUMwx3', 'function': {'arguments': '{"x": 10, "y": 10}', 'name': 'multiply'}, 'type': 'function'}, {'index': 2, 'id': 'call_ZVGzvK1x2egCZRAA08ewHPK7', 'function': {'arguments': '{"x": 10, "y": 3}', 'name': 'exponentiate'}, 'type': 'function'}, {'index': 3, 'id': 'call_xO1zz7Jsl6PATzJW5ttMLh0O', 'function': {'arguments': '{"x": 99, "y": 99}', 'name': 'subtract'}, 'type': 'function'}]}, response_metadata={'finish_reason': 'tool_calls', 'model_name': 'gpt-4o-mini-2024-07-18', 'system_fingerprint': 'fp_bd83329f63'}, id='run-0a931944-a303-48a2-9cbe-dfe323593d0c', tool_calls=[{'name': 'add', 'args': {'x': 5, 'y': 5}, 'id': 'call_5LY91QynPiCypmkWL6t95m8p', 'type': 'tool_call'}, {'name': 'multiply', 'args': {'x': 10, 'y': 10}, 'id': 'call_e7O8LAZf32zNU5ThM6u

Here we can also see the astream events in action, this shows the chunks within each function, when it starts, ends, produces an output etc...

In [68]:
import asyncio
from typing import Any, Dict, List, Optional

from langchain.callbacks.base import AsyncCallbackHandler, BaseCallbackHandler
from langchain_core.messages import HumanMessage, BaseMessage
from langchain_core.outputs import LLMResult
from langchain_openai import ChatOpenAI

import uuid

class MyCustomSyncHandler(BaseCallbackHandler):
    def on_llm_new_token(self, token: str, **kwargs) -> None:
        print(f"Sync handler being called in a `thread_pool_executor`: token: {token}")


class MyCustomAsyncHandler(AsyncCallbackHandler):
    """Async callback handler that can be used to handle callbacks from langchain."""

    async def on_llm_start(
        self, serialized: Dict[str, Any], prompts: List[str], **kwargs: Any
    ) -> None:
        """Run when chain starts running."""
        await asyncio.sleep(0.3)
        class_name = serialized["name"]
        yield "Hi! I just woke up. Your llm is starting"

    async def on_llm_end(self, response: LLMResult, **kwargs: Any) -> None:
        """Run when chain ends running."""
        await asyncio.sleep(0.3)
        yield "Hi! I just woke up. Your llm is ending"


# To enable streaming, we pass in `streaming=True` to the ChatModel constructor
# Additionally, we pass in a list with our custom handler
chat = ChatOpenAI(
    max_tokens=25,
    streaming=True,
    callbacks=[MyCustomAsyncHandler()],
)

response = chat.astream([HumanMessage(content="Tell me a joke")])

async for token in response:
    print(token)


content='' additional_kwargs={} response_metadata={} id='run-53e12577-999c-491e-937f-26a9444a3f94'
content='Why' additional_kwargs={} response_metadata={} id='run-53e12577-999c-491e-937f-26a9444a3f94'
content=' don' additional_kwargs={} response_metadata={} id='run-53e12577-999c-491e-937f-26a9444a3f94'
content="'t" additional_kwargs={} response_metadata={} id='run-53e12577-999c-491e-937f-26a9444a3f94'
content=' scientists' additional_kwargs={} response_metadata={} id='run-53e12577-999c-491e-937f-26a9444a3f94'
content=' trust' additional_kwargs={} response_metadata={} id='run-53e12577-999c-491e-937f-26a9444a3f94'
content=' atoms' additional_kwargs={} response_metadata={} id='run-53e12577-999c-491e-937f-26a9444a3f94'
content='?\n\n' additional_kwargs={} response_metadata={} id='run-53e12577-999c-491e-937f-26a9444a3f94'
content='Because' additional_kwargs={} response_metadata={} id='run-53e12577-999c-491e-937f-26a9444a3f94'
content=' they' additional_kwargs={} response_metadata={} id='run