
Using an Agent and wanted to stream just the final response #2483

Closed
zhuxiaobin opened this issue Apr 6, 2023 · 27 comments · Fixed by #4630

Comments

@zhuxiaobin

I am using an Agent and want to stream just the final response. Do you know if that is supported already, and how to do it?

@Wzy232303

Me, too.

@WarlaxZ

WarlaxZ commented Apr 23, 2023

Ticked the top reply but commenting to bump the issue

@Prayforhanluo

Me, too

@pelyhe

pelyhe commented May 9, 2023

Do you have any solution for this?

@UmerHA
Contributor

UmerHA commented May 9, 2023

Do you want to just get the final response, without intermediate output? Doesn't verbose=False do the trick?

[screenshot]
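For reference, a minimal example of the suggestion above (the tool choice and question are illustrative, not from the screenshot):

from langchain.agents import AgentType, initialize_agent, load_tools
from langchain.llms import OpenAI

llm = OpenAI(temperature=0)
tools = load_tools(["llm-math"], llm=llm)

# verbose=False suppresses the intermediate agent/chain logging,
# so run() only returns the final answer
agent = initialize_agent(tools, llm, agent=AgentType.ZERO_SHOT_REACT_DESCRIPTION, verbose=False)
print(agent.run("What is 2 to the power of 10?"))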

If you're looking for something else, feel free to describe it. I'd be happy to implement it :)

@zhuxiaobin @Wzy232303

@pelyhe

pelyhe commented May 9, 2023

I think the main question here is not about getting only the final answer, but about the streaming part: the user still has to wait 20-30 seconds for the full answer, but it arrives word by word, so the response feels faster.

@hgoona

hgoona commented May 13, 2023

Is there a solution to this?

@pelyhe

pelyhe commented May 13, 2023

As far as I know, no.

@UmerHA
Contributor

UmerHA commented May 13, 2023

Done ✅ - see #4630

Recording.mp4

@Prayforhanluo

Can we have agent.run (or any other method) return a generator, so that we can do some custom processing on the outputs? @UmerHA Thanks

@UmerHA
Contributor

UmerHA commented May 14, 2023

You can subclass FinalStreamingStdOutCallbackHandler and override its on_llm_new_token to get custom processing.
What's the specific use case you have in mind? @Prayforhanluo
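For illustration, a minimal sketch of that subclassing pattern (the token collection is just an example of custom processing; it assumes the handler from #4630 is available):

from typing import Any, List
from langchain.callbacks.streaming_stdout_final_only import FinalStreamingStdOutCallbackHandler

class CollectingFinalOutputHandler(FinalStreamingStdOutCallbackHandler):
    """Collects the tokens of the final answer instead of only printing them."""

    def __init__(self, **kwargs: Any) -> None:
        super().__init__(**kwargs)
        self.final_tokens: List[str] = []

    def on_llm_new_token(self, token: str, **kwargs: Any) -> None:
        # The parent class tracks the answer prefix, sets self.answer_reached,
        # and writes final-answer tokens to stdout; here we also collect them.
        super().on_llm_new_token(token, **kwargs)
        if self.answer_reached:
            self.final_tokens.append(token)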

@Prayforhanluo

In fact, I need to return each character just like the OpenAI stream: I need to wrap each character in JSON with some additional information and pass it to the downstream service. @UmerHA

@UmerHA
Contributor

UmerHA commented May 15, 2023

LLMs return output at the token level, not the character level. Making the characters appear one after another is a UI trick.

Still, you can get your desired behavior by subclassing FinalStreamingStdOutCallbackHandler, adding an internal buffer and turning it into a generator. Here's the code:

import time
from datetime import datetime
from typing import Dict, List, Any, Tuple, Optional
from langchain.callbacks.streaming_stdout_final_only import FinalStreamingStdOutCallbackHandler
from langchain.schema import LLMResult
from uuid import UUID

class CustomStreamingStdOutCallbackHandler(FinalStreamingStdOutCallbackHandler):

	buffer: List[Tuple[str, float]] = []
	stop_token = "#!stop!#"
		
	def on_llm_start(
		self, serialized: Dict[str, Any], prompts: List[str], **kwargs: Any
	) -> None:
		super().on_llm_start(serialized, prompts, **kwargs)
		self.buffer = []
		
	def on_llm_end(
		self,
		response: LLMResult,
		*,
		run_id: UUID,
		parent_run_id: Optional[UUID] = None,
		**kwargs: Any,
	) -> Any:
		self.add_to_buffer(self.stop_token)
	
	def on_llm_new_token(self, token: str, **kwargs: Any) -> None:
		# Remember the last n tokens, where n = len(answer_prefix_tokens)
		self.last_tokens.append(token)
		if len(self.last_tokens) > len(self.answer_prefix_tokens):
			self.last_tokens.pop(0)

		# Check if the last n tokens match the answer_prefix_tokens list ...
		if self.last_tokens == self.answer_prefix_tokens:
			self.answer_reached = True
			# Do not print the last token in answer_prefix_tokens,
			# as it's not part of the answer yet
			return

		# ... if yes, then append tokens to buffer
		if self.answer_reached:
			self.add_to_buffer(token)
			
	def add_to_buffer(self, token:str) -> None:
		now = datetime.now()
		self.buffer.append((token, now))

	def stream_chars(self):
		while True:
			# when we didn't receive any token yet, just continue
			if len(self.buffer) == 0:
				continue
			
			token, timestamp = self.buffer.pop(0)
			
			if token != self.stop_token:
				for character in token:
					yield (character, timestamp)
					time.sleep(0.2)  # Remove this line. It's just for illustration purposes
			else:
				break

You can use it like this:

import os
from langchain.agents import load_tools
from langchain.agents import initialize_agent
from langchain.agents import AgentType
from langchain.llms import OpenAI
from langchain.utilities import WikipediaAPIWrapper

stream_handler = CustomStreamingStdOutCallbackHandler()

llm = OpenAI(streaming=True, callbacks=[stream_handler], temperature=0)
tools = load_tools(["wikipedia", "llm-math"], llm=llm)
agent = initialize_agent(tools, llm, agent=AgentType.ZERO_SHOT_REACT_DESCRIPTION, verbose=False)
agent.run("It's 2023 now. How many years ago did Konrad Adenauer become Chancellor of Germany.")

for character, timestamp in stream_handler.stream_chars():
	print(f"Received: |{character}| at {timestamp}")

@Prayforhanluo

Edit: formatting

@stoneHah

FinalStreamingStdOutCallbackHandler

I updated langchain to the latest version (v0.0.169), but I couldn't find FinalStreamingStdOutCallbackHandler.

@UmerHA
Contributor

UmerHA commented May 15, 2023

FinalStreamingStdOutCallbackHandler

I updated langchain to the latest version (v0.0.169), but I couldn't find FinalStreamingStdOutCallbackHandler.

The PR (#4630) hasn't been merged yet, so it isn't part of the codebase. Waiting for the LangChain team to merge it.

@My3VM

My3VM commented May 16, 2023

I have been facing the same challenge! Eagerly waiting to benefit from this PR. Hoping they'll merge it into master soon.

dev2049 added a commit that referenced this issue May 20, 2023
# Streaming only final output of agent (#2483)
As requested in issue #2483, this callback allows streaming only the final output of an agent (i.e., not the intermediate steps).

Fixes #2483

Co-authored-by: Dev 2049 <dev.dev2049@gmail.com>
@thaiminhpv
Contributor

Can we have agent.run (or any other method) return a generator, so that we can do some custom processing on the outputs?

I have created PR #5937 to support returning a generator using simple syntax:

Basic setup

from langchain.agents import load_tools, initialize_agent, AgentType
from langchain.llms import OpenAI
from langchain.callbacks import StreamingLastResponseCallbackHandler

llm = OpenAI(temperature=0, streaming=True)
tools = load_tools(["serpapi", "llm-math"], llm=llm)
agent = initialize_agent(tools, llm, agent=AgentType.ZERO_SHOT_REACT_DESCRIPTION, verbose=False)

Then this is all you need to do:

import threading

stream = StreamingLastResponseCallbackHandler.from_agent_type(agent=AgentType.ZERO_SHOT_REACT_DESCRIPTION)

def _run():
    agent.run("Who is Leo DiCaprio's girlfriend? What is her current age raised to the 0.43 power?", callbacks=[stream])
threading.Thread(target=_run).start()

for token in stream:  # <-- generator here
    print(token, end="", flush=True)

If you want to do it the callbacks way, that is also supported:

@stream.on_last_response_new_token()
def on_new_token(token: str):
    print(f"Next token: '{token}'")

@arysttoto

Done ✅ - see #4630

Recording.mp4

Have you noticed that not all types of agents can stream the response? Or is it just me?


@linpan
Contributor

linpan commented Sep 22, 2023

Me, too.

@jsham042

For what it's worth, if you're using AgentType.CONVERSATIONAL_REACT_DESCRIPTION, this is the callback setup to use: callbacks=[FinalStreamingStdOutCallbackHandler(answer_prefix_tokens=['AI', ':'])]
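For context, a rough sketch of how that handler might be wired into a conversational agent (not from this thread; the tool choice and memory setup are illustrative):

from langchain.agents import AgentType, initialize_agent, load_tools
from langchain.callbacks.streaming_stdout_final_only import FinalStreamingStdOutCallbackHandler
from langchain.llms import OpenAI
from langchain.memory import ConversationBufferMemory

# The conversational agent prefixes its final answer with "AI:" rather than
# "Final Answer:", hence the custom answer_prefix_tokens.
handler = FinalStreamingStdOutCallbackHandler(answer_prefix_tokens=["AI", ":"])
llm = OpenAI(streaming=True, callbacks=[handler], temperature=0)
tools = load_tools(["llm-math"], llm=llm)
agent = initialize_agent(
    tools,
    llm,
    agent=AgentType.CONVERSATIONAL_REACT_DESCRIPTION,
    memory=ConversationBufferMemory(memory_key="chat_history"),
    verbose=False,
)
agent.run("What is 7 times 13?")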

@usersina

usersina commented Nov 26, 2023

I am not a fan of using stdout when it's not necessary, so I created my own CustomAsyncCallbackHandler based on AsyncIteratorCallbackHandler for now, until PR #5937 is merged:

import asyncio
from typing import Any, AsyncIterator, Dict, List, Literal, Union, cast

from langchain.callbacks.base import AsyncCallbackHandler
from langchain.schema.agent import AgentAction, AgentFinish
from langchain.schema.output import LLMResult


class CustomAsyncCallbackHandler(AsyncCallbackHandler):
    """
    Streaming callback handler that returns an async iterator. This supports
    both streaming llm and agent executors.

    :param is_agent: Whether this is an agent executor.
    """

    queue: asyncio.Queue[str]
    done: asyncio.Event

    @property
    def always_verbose(self) -> bool:
        return True

    def __init__(self, is_agent: bool = False) -> None:
        self.queue = asyncio.Queue()
        self.done = asyncio.Event()
        self.is_agent = is_agent

    async def on_llm_start(
        self, serialized: Dict[str, Any], prompts: List[str], **kwargs: Any
    ) -> None:
        print("================== LLM Start! ==========================")
        self.done.clear()

    async def on_llm_new_token(self, token: str, **kwargs: Any) -> None:
        if token is not None and token != "":
            self.queue.put_nowait(token)

    async def on_llm_end(self, response: LLMResult, **kwargs: Any) -> None:
        """
        Do not close the queue here, as it may be used by the agent.
        """
        if not self.is_agent:
            self.done.set()
        else:
            print("================== LLM finished! ==========================")
            print(response)
            generation_info = response.generations[-1][-1].generation_info
            if generation_info is not None:
                print(
                    "================== LLM finish reason! =========================="
                )
                # Figured out through trial and error
                if generation_info.get("finish_reason") == "stop":
                    self.done.set()

    async def on_llm_error(self, error: BaseException, **kwargs: Any) -> None:
        if not self.is_agent:
            self.done.set()

    # async def on_agent_action(self, action: AgentAction, **kwargs: Any) -> None:
    #     """Run on agent action."""

    # async def on_agent_finish(self, finish: AgentFinish, **kwargs: Any) -> None:
    #    """Run on agent end."""
    #    self.done.set()

    # async def on_chain_start(
    #    self, serialized: Dict[str, Any], inputs: Dict[str, Any], **kwargs: Any
    # ) -> None:
    #    self.done.clear()

    # async def on_chain_end(self, outputs: Dict[str, Any], **kwargs: Any) -> None:
    #    """Run when chain ends running."""
    #   self.done.set()

    async def aiter(self) -> AsyncIterator[str]:
        """
        Returns an async iterator that yields tokens from the LLM.
        """
        while not self.queue.empty() or not self.done.is_set():
            # Wait for the next token in the queue or for the done event to be set
            done, other = await asyncio.wait(
                [
                    asyncio.ensure_future(self.queue.get()),
                    asyncio.ensure_future(self.done.wait()),
                ],
                return_when=asyncio.FIRST_COMPLETED,
            )

            # Cancel the other task
            if other:
                other.pop().cancel()

            # Extract the value of the first completed task
            token_or_done = cast(Union[str, Literal[True]], done.pop().result())

            # If the extracted value is the boolean True, the done event was set
            if token_or_done is True:
                break

            # Otherwise, the extracted value is a token, which we yield
            yield token_or_done

Edit: After some more detailed testing, it looks like on_agent_action, on_agent_finish and on_chain_end never get called for some reason. I opted to check the last generation_info's finish_reason instead.

For completeness, here's a FastAPI route:

import asyncio
import json

from fastapi.responses import StreamingResponse
from langchain.chains import ConversationChain
from langchain.llms import OpenAI
from langchain.memory import ConversationSummaryBufferMemory

# `router`, the `Request` body model and `get_chat_memory` are assumed to be
# defined elsewhere in the application
@router.post("/")
async def general(input: Request):
    # Create the memory out of the provided messages list
    prompt = input.prompt
    memory = ConversationSummaryBufferMemory(
        llm=OpenAI(model="gpt-3.5-turbo-instruct"),
        chat_memory=get_chat_memory(input.messages or []),
    )

    # Create the conversation and the async callback handler
    handler = CustomAsyncCallbackHandler()
    conversation = ConversationChain(
        llm=OpenAI(model="gpt-3.5-turbo-instruct", streaming=True, callbacks=[handler]),
        memory=memory,
    )

    async def ask_question_async():
        asyncio.create_task(conversation.apredict(input=prompt))
        async for chunk in handler.aiter():
            yield f"data: {json.dumps({'content': chunk, 'tokens': 0})}\n\n"

    return StreamingResponse(ask_question_async(), media_type="text/plain")

@mbrialesam

Trying the above, but I want to include the intermediate steps in terms of tool usage. It's really difficult, as I don't have a way to distinguish between the final output and intermediate logs / thoughts...

@usersina

Trying the above, but I want to include the intermediate steps in terms of tool usage. It's really difficult, as I don't have a way to distinguish between the final output and intermediate logs / thoughts...

You might consider using the new LangChain Expression Language. I also recommend the new take on agents from LangChain 0.1.0 onwards. You can do so much more now, from streaming the output out of the box to even streaming the intermediate steps themselves.

I might follow up with a project on GitHub (though not in the near future).

@alexgg278

@usersina can you point to the documentation explaining that you can directly stream the final output? I have only found the same stream method that streams intermediate steps and actions.

@usersina

usersina commented Mar 9, 2024

@usersina can you point to the documentation explaining that you can directly stream the final output? I have only found the same stream method that streams intermediate steps and actions.

This page has it all - https://python.langchain.com/docs/expression_language/streaming#using-stream-events
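For a concrete starting point, a rough sketch of the astream_events approach described on that page (it assumes LangChain 0.1+ with the langchain-openai and langchainhub packages; the hub prompt and tool choice are illustrative):

import asyncio

from langchain import hub
from langchain.agents import AgentExecutor, create_openai_tools_agent, load_tools
from langchain_openai import ChatOpenAI

llm = ChatOpenAI(model="gpt-3.5-turbo", streaming=True)
tools = load_tools(["llm-math"], llm=llm)
prompt = hub.pull("hwchase17/openai-tools-agent")
executor = AgentExecutor(agent=create_openai_tools_agent(llm, tools, prompt), tools=tools)

async def stream_answer(question: str) -> None:
    async for event in executor.astream_events({"input": question}, version="v1"):
        # Tokens generated by the chat model arrive as "on_chat_model_stream"
        # events; tool calls and intermediate steps have other event types,
        # so printing only these chunks effectively streams the answer text.
        if event["event"] == "on_chat_model_stream":
            print(event["data"]["chunk"].content, end="", flush=True)

asyncio.run(stream_answer("What is 2 to the power of 10?"))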

@samuelint

I successfully resolved this issue using the langchain-openai-api-bridge library. You can find the library here: https://github.com/samuelint/langchain-openai-api-bridge

Key Benefits:

  • Streaming Support
  • Exposes Agent Final Message in an OpenAI-compatible API format.

Example Usage:
You can then consume the agent response with a regular OpenAI client library (Python, JavaScript, or any other):

# Client
openai_client = OpenAI(
    base_url="http://my-langchain-server/my-custom-path/openai/v1",
)

chat_completion = openai_client.chat.completions.create(
    model="gpt-3.5-turbo",
    messages=[
        {
            "role": "user",
            "content": 'Say "This is a test"',
        }
    ],
)
print(chat_completion.choices[0].message.content)
#> "This is a test"

Hope this helps :)
