[![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/pinecone-io/examples/blob/master/learn/generation/langchain/handbook/09-langchain-streaming/09-langchain-streaming.ipynb) [![Open nbviewer](https://raw.githubusercontent.com/pinecone-io/examples/master/assets/nbviewer-shield.svg)](https://nbviewer.org/github/pinecone-io/examples/blob/master/learn/generation/langchain/handbook/09-langchain-streaming/09-langchain-streaming.ipynb)

#### [LangChain Handbook](https://pinecone.io/learn/langchain)

# Streaming

For LLMs, streaming has become an increasingly popular feature. The idea is to rapidly return tokens as an LLM is generating them, rather than waiting for a full response to be created before returning anything.
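To make the difference concrete, here is a toy comparison with no LLM involved. `fake_llm` is a hypothetical generator that yields pre-written tokens with an artificial delay. With streaming, the caller can act on the first token after one delay. The blocking version has to wait for all of them.

```python
import time
from typing import Iterator, List

# Hypothetical pre-written "tokens"; a real LLM would produce these one by one.
TOKENS: List[str] = ["Once", " upon", " a", " time", "..."]


def fake_llm(tokens: List[str], delay: float) -> Iterator[str]:
    """Hypothetical stand-in for an LLM: yields one token after each delay."""
    for token in tokens:
        time.sleep(delay)  # simulate the time taken to generate a token
        yield token


def time_to_first_output(stream: bool, delay: float = 0.02) -> float:
    """Seconds until the caller has *something* it can show the user."""
    start = time.perf_counter()
    if stream:
        next(fake_llm(TOKENS, delay))  # streaming: first token is ready after one delay
    else:
        "".join(fake_llm(TOKENS, delay))  # blocking: wait for the full response
    return time.perf_counter() - start


print(f"streaming: {time_to_first_output(True):.2f}s")
print(f"blocking:  {time_to_first_output(False):.2f}s")
```

Total generation time is the same in both cases. Streaming only changes how soon the user sees the first output.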

Streaming is actually very easy to implement for simple use-cases, but it can get complicated when we start including things like Agents, which run their own logic that can block our attempts at streaming. Fortunately, we can make it work; it just requires a little extra effort.

We'll start easy by implementing streaming to the terminal for LLMs, but by the end of the notebook we'll be handling the more complex task of streaming via FastAPI for Agents.

First, let's install all of the libraries we'll be using.

In [1]:
# !pip install -qU \
#     openai==0.28.0 \
#     langchain==0.0.301 \
#     fastapi==0.103.1 \
#     "uvicorn[standard]"==0.23.2

## LLM Streaming to Stdout

The simplest form of streaming is to "print" the tokens as they're generated. To set this up we need to initialize an LLM (one that supports streaming; not all do) with two specific parameters:

* `streaming=True`, to enable streaming
* `callbacks=[SomeCallBackHere()]`, where we pass a list containing one or more LangChain callback handler instances.

The `streaming` parameter is self-explanatory. The `callbacks` parameter and callback classes are less so. Essentially, they are small pieces of code that run each time our LLM generates a new token. As mentioned, the simplest form of streaming is to print the tokens as they're generated, which is exactly what the `StreamingStdOutCallbackHandler` does.
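The callback idea itself is simple. The sketch below uses plain Python, not LangChain. `run_fake_llm`, `PrintTokenHandler`, and `CollectTokenHandler` are hypothetical names. Only the method name `on_llm_new_token` mirrors the hook that LangChain handlers implement. The fake "LLM" loop calls every registered handler once per token.

```python
import sys
from typing import Any, List


class PrintTokenHandler:
    """Hypothetical handler: write each token to stdout as it arrives."""

    def on_llm_new_token(self, token: str, **kwargs: Any) -> None:
        sys.stdout.write(token)
        sys.stdout.flush()


class CollectTokenHandler:
    """Hypothetical handler: keep every token, e.g. for logging."""

    def __init__(self) -> None:
        self.tokens: List[str] = []

    def on_llm_new_token(self, token: str, **kwargs: Any) -> None:
        self.tokens.append(token)


def run_fake_llm(tokens: List[str], callbacks: List[Any]) -> str:
    """Hypothetical LLM loop: notify every callback as each token is 'generated'."""
    for token in tokens:
        for handler in callbacks:
            handler.on_llm_new_token(token)
    return "".join(tokens)  # the full response is still returned at the end


collector = CollectTokenHandler()
reply = run_fake_llm(["Once", " upon", " a", " time"], [PrintTokenHandler(), collector])
print()
```

Passing a list is what lets several handlers (printing, logging, queueing) observe the same token stream.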

In [14]:
import os
from langchain.chat_models import ChatOpenAI
from langchain.callbacks.streaming_stdout import StreamingStdOutCallbackHandler

# read the key from the environment rather than hard-coding it in the notebook
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY") or "YOUR_API_KEY"

llm = ChatOpenAI(
    openai_api_key=OPENAI_API_KEY,
    temperature=0.0,
    model_name="gpt-3.5-turbo",
    streaming=True,  # ! important
    callbacks=[StreamingStdOutCallbackHandler()]  # ! important
)

Now if we run the LLM we'll see the response being _streamed_.

In [3]:
from langchain.schema import HumanMessage

# create messages to be passed to chat LLM
messages = [HumanMessage(content="tell me a long story")]

llm(messages)

Once upon a time, in a small village nestled deep within a lush forest, there lived a young girl named Lily. She was known for her vibrant red hair, sparkling green eyes, and a heart full of curiosity. Lily had always been fascinated by the stories her grandmother would tell her about a magical creature called the Moonlight Unicorn.

Legend had it that the Moonlight Unicorn was a rare and majestic creature that only appeared during the full moon. Its coat shimmered like silver, and its horn glowed with a soft, ethereal light. The unicorn was said to possess incredible powers, capable of granting wishes and bringing good fortune to those who encountered it.

Driven by her desire to see the Moonlight Unicorn, Lily embarked on a journey through the enchanted forest. Armed with her grandmother's stories and a sense of adventure, she ventured deeper into the woods, following the faint whispers of the wind.

As she wandered through the forest, Lily encountered various magical creatures. She 

KeyboardInterrupt: 

That was surprisingly easy, but things begin to get much more complicated as soon as we begin using agents. Let's first initialize an agent.

In [4]:
from langchain.memory import ConversationBufferWindowMemory
from langchain.agents import load_tools, AgentType, initialize_agent

# initialize conversational memory
memory = ConversationBufferWindowMemory(
    memory_key="chat_history",
    k=5,
    return_messages=True,
    output_key="output"
)

# create a single tool to see how it impacts streaming
tools = load_tools(["llm-math"], llm=llm)

# initialize the agent
agent = initialize_agent(
    agent=AgentType.CHAT_CONVERSATIONAL_REACT_DESCRIPTION,
    tools=tools,
    llm=llm,
    memory=memory,
    verbose=True,
    max_iterations=3,
    early_stopping_method="generate",
    return_intermediate_steps=False
)

The agent already uses our `StreamingStdOutCallbackHandler`, because we initialized it with the same `llm` object that we created with that callback. So let's see what we get when running the agent.

In [5]:
prompt = "Hello, how are you?"

agent(prompt)



> Entering new AgentExecutor chain...
{
    "action": "Final Answer",
    "action_input": "I'm an AI language model, so I don't have feelings, but I'm here to help you. How can I assist you today?"
}{
    "action": "Final Answer",
    "action_input": "I'm an AI language model, so I don't have feelings, but I'm here to help you. How can I assist you today?"
}

> Finished chain.


{'input': 'Hello, how are you?',
 'chat_history': [],
 'output': "I'm an AI language model, so I don't have feelings, but I'm here to help you. How can I assist you today?"}

Not bad, but we do now have the issue of streaming the _entire_ output from the LLM. Because we're using an agent, the LLM is instructed to output the JSON format we can see here so that the agent logic can handle tool usage, multiple "thinking" steps, and so on. For example, if we ask a math question we'll see this:
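That JSON is what the agent logic parses to decide whether to call a tool or finish. Below is a simplified sketch of that decision using only the `json` module. `parse_action` and `is_final` are hypothetical helpers, and LangChain's own output parser is more forgiving than this.

```python
import json
from typing import Tuple


def parse_action(text: str) -> Tuple[str, str]:
    """Hypothetical helper: pull the action and its input out of the agent's JSON."""
    data = json.loads(text)
    return data["action"], data["action_input"]


def is_final(text: str) -> bool:
    """True when the agent is answering the user rather than calling a tool."""
    action, _ = parse_action(text)
    return action == "Final Answer"


tool_step = '{\n    "action": "Calculator",\n    "action_input": "sqrt(71)"\n}'
final_step = '{\n    "action": "Final Answer",\n    "action_input": "Hello!"\n}'

print(parse_action(tool_step))
print(is_final(final_step))
```

Only the final step's `action_input` is meant for the user. A handler that streams every token therefore also streams all of the JSON scaffolding around it.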

In [6]:
agent("what is the square root of 71?")



> Entering new AgentExecutor chain...
{
    "action": "Calculator",
    "action_input": "sqrt(71)"
}{
    "action": "Calculator",
    "action_input": "sqrt(71)"
}
```text
sqrt(71)
```
...numexpr.evaluate("sqrt(71)")...

Observation: Answer: 8.426149773176359
Thought:{
    "action": "Final Answer",
    "action_input": "The square root of 71 is approximately 8.426149773176359."
}{
    "action": "Final Answer",
    "action_input": "The square root of 71 is approximately 8.426149773176359."
}

> Finished chain.


{'input': 'what is the square root of 71?',
 'chat_history': [HumanMessage(content='Hello, how are you?', additional_kwargs={}, example=False),
  AIMessage(content="I'm an AI language model, so I don't have feelings, but I'm here to help you. How can I assist you today?", additional_kwargs={}, example=False)],
 'output': 'The square root of 71 is approximately 8.426149773176359.'}

This is interesting to see during development, but in any real use-case we'll want to clean the streaming up a little. There are two approaches: build a custom callback handler, or use a purpose-built callback handler from LangChain (as usual, LangChain has something for everything). Let's first try LangChain's purpose-built `FinalStreamingStdOutCallbackHandler`.

We will overwrite the existing `callbacks` attribute found here:

In [7]:
agent.agent.llm_chain.llm

ChatOpenAI(cache=None, verbose=False, callbacks=[<langchain.callbacks.streaming_stdout.StreamingStdOutCallbackHandler object at 0x0000021F61C7BA90>], callback_manager=None, tags=None, metadata=None, client=<class 'openai.api_resources.chat_completion.ChatCompletion'>, model_name='gpt-3.5-turbo', temperature=0.0, model_kwargs={}, openai_api_key='<redacted>', openai_api_base='', openai_organization='', openai_proxy='', request_timeout=None, max_retries=6, streaming=True, n=1, max_tokens=None, tiktoken_model_name=None)

With the new callback handler:

In [8]:
from langchain.callbacks.streaming_stdout_final_only import (
    FinalStreamingStdOutCallbackHandler,
)

agent.agent.llm_chain.llm.callbacks = [
    FinalStreamingStdOutCallbackHandler(
        answer_prefix_tokens=["Final", "Answer"]
    )
]
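Conceptually, this handler watches a sliding window of the most recent (whitespace-stripped) tokens and starts printing once that window equals `answer_prefix_tokens`. The sketch below is a simplified model of that idea, not LangChain's source. `stream_after_prefix` is a hypothetical name, and the token splits are made up for illustration. It shows two problems. First, the JSON punctuation that follows `Answer` still leaks through. Second, a tokenizer that splits the prefix differently never triggers the handler at all.

```python
from typing import Iterable, List


def stream_after_prefix(tokens: Iterable[str], prefix: List[str]) -> str:
    """Simplified sliding-window prefix matcher; returns what would be printed."""
    window: List[str] = []
    answer_reached = False
    printed: List[str] = []
    for token in tokens:
        if answer_reached:
            printed.append(token)  # everything after the prefix is streamed
            continue
        window.append(token.strip())
        if len(window) > len(prefix):
            window.pop(0)  # keep only the last len(prefix) tokens
        if window == prefix:
            answer_reached = True
    return "".join(printed)


# Hypothetical token split of an agent's final JSON step.
agent_tokens = ['{\n', '    "', 'action', '":', ' "', 'Final', ' Answer', '",\n',
                '    "', 'action', '_input', '":', ' "', 'The', ' answer', ' is',
                ' 42', '."\n', '}']
print(repr(stream_after_prefix(agent_tokens, ["Final", "Answer"])))

# If "Final Answer" arrives as a single token, the two-token prefix never matches.
print(repr(stream_after_prefix(['"', 'Final Answer', '": "', 'hi', '"'], ["Final", "Answer"])))
```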

Let's try it:

In [9]:
agent("what is the square root of 71?")



> Entering new AgentExecutor chain...
{
    "action": "Calculator",
    "action_input": "sqrt(71)"
}
Observation: Answer: 8.426149773176359
Thought:",
    "action_input": "The square root of 71 is approximately 8.426149773176359."
}{
    "action": "Final Answer",
    "action_input": "The square root of 71 is approximately 8.426149773176359."
}

> Finished chain.


{'input': 'what is the square root of 71?',
 'chat_history': [HumanMessage(content='Hello, how are you?', additional_kwargs={}, example=False),
  AIMessage(content="I'm an AI language model, so I don't have feelings, but I'm here to help you. How can I assist you today?", additional_kwargs={}, example=False),
  HumanMessage(content='what is the square root of 71?', additional_kwargs={}, example=False),
  AIMessage(content='The square root of 71 is approximately 8.426149773176359.', additional_kwargs={}, example=False)],
 'output': 'The square root of 71 is approximately 8.426149773176359.'}

Not quite there. We should really clean up the `answer_prefix_tokens` argument, but it is hard to get right. It's generally easier to use a custom callback handler like so:

In [10]:
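import sys
from typing import Any

from langchain.schema import LLMResult


class CallbackHandler(StreamingStdOutCallbackHandler):
    def __init__(self) -> None:
        super().__init__()
        self.content: str = ""
        self.final_answer: bool = False

    def on_llm_new_token(self, token: str, **kwargs: Any) -> None:
        self.content += token
        if "Final Answer" in self.content:
            # now we're in the final answer section, but don't print yet
            self.final_answer = True
            self.content = ""
        if self.final_answer:
            # only print once we've reached the value of "action_input"
            if '"action_input": "' in self.content:
                if token not in ["}"]:
                    sys.stdout.write(token)  # equal to `print(token, end="")`
                    sys.stdout.flush()

    def on_llm_end(self, response: LLMResult, **kwargs: Any) -> None:
        # reset state so the next LLM call (or agent run) starts fresh
        self.content = ""
        self.final_answer = False


agent.agent.llm_chain.llm.callbacks = [CallbackHandler()]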

Let's try again:

In [11]:
agent("what is the square root of 71?")



> Entering new AgentExecutor chain...
{
    "action": "Calculator",
    "action_input": "sqrt(71)"
}
Observation: Answer: 8.426149773176359
Thought: "The square root of 71 is approximately 8.426149773176359."
{
    "action": "Final Answer",
    "action_input": "The square root of 71 is approximately 8.426149773176359."
}

> Finished chain.


{'input': 'what is the square root of 71?',
 'chat_history': [HumanMessage(content='Hello, how are you?', additional_kwargs={}, example=False),
  AIMessage(content="I'm an AI language model, so I don't have feelings, but I'm here to help you. How can I assist you today?", additional_kwargs={}, example=False),
  HumanMessage(content='what is the square root of 71?', additional_kwargs={}, example=False),
  AIMessage(content='The square root of 71 is approximately 8.426149773176359.', additional_kwargs={}, example=False),
  HumanMessage(content='what is the square root of 71?', additional_kwargs={}, example=False),
  AIMessage(content='The square root of 71 is approximately 8.426149773176359.', additional_kwargs={}, example=False)],
 'output': 'The square root of 71 is approximately 8.426149773176359.'}

In [12]:
agent.agent.llm_chain.llm

ChatOpenAI(cache=None, verbose=False, callbacks=[<__main__.CallbackHandler object at 0x0000021F73785990>], callback_manager=None, tags=None, metadata=None, client=<class 'openai.api_resources.chat_completion.ChatCompletion'>, model_name='gpt-3.5-turbo', temperature=0.0, model_kwargs={}, openai_api_key='<redacted>', openai_api_base='', openai_organization='', openai_proxy='', request_timeout=None, max_retries=6, streaming=True, n=1, max_tokens=None, tiktoken_model_name=None)

It isn't perfect, but this is getting better. Now, in most scenarios we're unlikely to simply be printing output to a terminal or notebook. When we want to do something more complex like stream this data through another API, we need to do things differently.
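Why does the streamed answer still start with a stray ` "` and end with a quote? One way to see it is to replay the filtering rule from `CallbackHandler.on_llm_new_token` offline on a made-up token split. `replay_filter` is a hypothetical helper that copies that rule into a plain function and returns what would have been printed instead of writing to stdout.

```python
from typing import Iterable, List


def replay_filter(tokens: Iterable[str]) -> str:
    """Same rule as CallbackHandler.on_llm_new_token, collecting instead of printing."""
    content = ""
    final_answer = False
    printed: List[str] = []
    for token in tokens:
        content += token
        if "Final Answer" in content:
            final_answer = True
            content = ""  # start matching "action_input" from scratch
        if final_answer and '"action_input": "' in content and token not in ["}"]:
            printed.append(token)
    return "".join(printed)


# Hypothetical token split of the agent's final JSON step.
tokens = ['{\n', '    "', 'action', '":', ' "', 'Final', ' Answer', '",\n',
          '    "', 'action', '_input', '":', ' "', 'The', ' answer', ' is',
          ' 42', '."\n', '}']
print(repr(replay_filter(tokens)))
```

The token that completes the `"action_input": "` match (` "`) is itself printed, and the closing quote arrives glued to the final `.`. How clean the output looks therefore depends on how the model happens to tokenize its JSON.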

## Using FastAPI with Agents

In most cases we'll be placing our LLMs, Agents, etc. behind something like an API. Let's add that into the mix and see how we can implement streaming for agents with FastAPI.

First, we'll create a simple `main.py` script to contain our FastAPI logic. You can find it in the same GitHub repo location as this notebook ([here's a link](https://github.com/pinecone-io/examples/blob/langchain-streaming/learn/generation/langchain/handbook/09-langchain-streaming/main.py)).

To run the API, navigate to that directory and run `uvicorn main:app --reload`. Once the server has started, you can confirm it is running by looking for the 🤙 status in the next cell's output:

In [15]:
import requests

res = requests.get("http://localhost:8000/health")
res.json()

ConnectionError: HTTPConnectionPool(host='localhost', port=8000): Max retries exceeded with url: /health (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x0000021F76BE3950>: Failed to establish a new connection: [WinError 10061] No connection could be made because the target machine actively refused it'))

In [15]:
res = requests.get("http://localhost:8000/chat",
    json={"text": "hello there!"}
)
res

<Response [200]>

In [16]:
res.json()

{'input': 'hello there!',
 'chat_history': [],
 'output': 'Hello! How can I assist you today?'}

Unlike with our StdOut streaming, we now need to send our tokens to a generator function that feeds them to FastAPI via a `StreamingResponse` object. To handle this we need async code. Otherwise, our generator won't emit anything until _after_ generation is complete.

The `Queue` is accessed by our callback handler: as each token is generated, the handler puts it into the queue. Our generator function asynchronously checks for new tokens being added to the queue. As soon as it sees one, it gets the token and yields it to our `StreamingResponse`.
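The same producer/consumer pattern can be sketched with the standard library alone. Here `produce_tokens` stands in for the LLM plus its callback handler and puts tokens on an `asyncio.Queue`. `token_stream` is an async generator that yields each token as soon as it lands. These names and the `None` end-of-stream sentinel are assumptions for illustration, not the contents of `main.py`.

```python
import asyncio
from typing import AsyncIterator, List, Optional


async def produce_tokens(queue: "asyncio.Queue[Optional[str]]", tokens: List[str]) -> None:
    """Stand-in for the LLM + callback handler: enqueue each token as it's generated."""
    for token in tokens:
        await asyncio.sleep(0.01)  # simulate generation latency
        queue.put_nowait(token)
    queue.put_nowait(None)  # sentinel: generation finished


async def token_stream(tokens: List[str]) -> AsyncIterator[str]:
    """Async generator: yield tokens while the producer is still running."""
    queue: "asyncio.Queue[Optional[str]]" = asyncio.Queue()
    producer = asyncio.create_task(produce_tokens(queue, tokens))
    while True:
        token = await queue.get()
        if token is None:
            break
        yield token
    await producer  # surface any exception raised by the producer


async def collect(tokens: List[str]) -> List[str]:
    """Consume the stream the way a response writer would, one token at a time."""
    received: List[str] = []
    async for token in token_stream(tokens):
        received.append(token)
    return received


received = asyncio.run(collect(["Hello", "!", " How", " can", " I", " help", "?"]))
print("".join(received))
```

In a FastAPI app, an async generator like `token_stream(...)` is the kind of object you can pass to `StreamingResponse`, which forwards each yielded chunk to the client as it is produced.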

To see it in action, we'll define a streaming request function called `get_stream`:

In [16]:
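import codecs
import requests

def get_stream(query: str):
    # decode incrementally so multi-byte UTF-8 characters split across chunks survive
    decoder = codecs.getincrementaldecoder("utf-8")()
    with requests.Session() as s:
        with s.get(
            "http://localhost:8000/chat",
            stream=True,
            json={"text": query}
        ) as r:
            # chunk_size=None yields data as it arrives rather than one byte at a time
            for chunk in r.iter_content(chunk_size=None):
                print(decoder.decode(chunk), end="", flush=True)
            print(decoder.decode(b"", final=True), end="")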
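Decoding each streamed chunk on its own can fail. With `iter_content()`'s default `chunk_size=1`, a multi-byte UTF-8 character (such as `ï` or an emoji) arrives split across chunks. The self-contained illustration below involves no server. `naive_decode` and `incremental_decode` are hypothetical helpers, and the one-byte chunks simulate that default.

```python
import codecs
from typing import List

text = "naïve café ✓"
data = text.encode("utf-8")
chunks: List[bytes] = [data[i:i + 1] for i in range(len(data))]  # one byte per chunk


def naive_decode(chunks: List[bytes]) -> str:
    """Decode each chunk on its own; breaks on split multi-byte characters."""
    return "".join(chunk.decode("utf-8") for chunk in chunks)


def incremental_decode(chunks: List[bytes]) -> str:
    """Buffer incomplete byte sequences until the rest of the character arrives."""
    decoder = codecs.getincrementaldecoder("utf-8")()
    parts = [decoder.decode(chunk) for chunk in chunks]
    parts.append(decoder.decode(b"", final=True))  # flush anything left over
    return "".join(parts)


naive_failed = False
try:
    naive_decode(chunks)
except UnicodeDecodeError as err:
    naive_failed = True
    print("naive:", err.reason)

print("incremental:", incremental_decode(chunks))
```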

In [17]:
get_stream("hi there!")

ConnectionError: HTTPConnectionPool(host='localhost', port=8000): Max retries exceeded with url: /chat (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x0000021F76C34390>: Failed to establish a new connection: [WinError 10061] No connection could be made because the target machine actively refused it'))