
Streaming support with ConversationChain #11945

Open
4 of 14 tasks
marcocello opened this issue Oct 17, 2023 · 12 comments
Labels
🤖:bug Related to a bug, vulnerability, unexpected error with an existing feature Ɑ: models Related to LLMs or chat model modules

Comments

@marcocello

System Info

langchain 0.0.316
langserve 0.0.10
python 3.11.4 on darwin

Who can help?

@agola11 @hwchase17

Information

  • The official example notebooks/scripts
  • My own modified scripts

Related Components

  • LLMs/Chat Models
  • Embedding Models
  • Prompts / Prompt Templates / Prompt Selectors
  • Output Parsers
  • Document Loaders
  • Vector Stores / Retrievers
  • Memory
  • Agents / Agent Executors
  • Tools / Toolkits
  • Chains
  • Callbacks/Tracing
  • Async

Reproduction

I am following the LangServe example here (https://github.com/langchain-ai/langserve-launch-example/blob/main/langserve_launch_example/chain.py), but I want to use ConversationChain instead of ChatOpenAI.

server.py

#!/usr/bin/env python
"""A server for the chain above."""

from fastapi import FastAPI
from langserve import add_routes

from chain import conversation_chain

app = FastAPI(title="My App")

add_routes(app, conversation_chain)

if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="localhost", port=8000)

chain.py

from langchain.llms import OpenAI
from langchain.chains import ConversationChain
from langchain.prompts import PromptTemplate
from langchain.memory import ConversationBufferMemory

template = """Assistant is a large language model trained by OpenAI.

Assistant is designed to be able to assist with a wide range of tasks, from answering simple questions to providing in-depth explanations and discussions on a wide range of topics. As a language model, Assistant is able to generate human-like text based on the input it receives, allowing it to engage in natural-sounding conversations and provide responses that are coherent and relevant to the topic at hand.

{history}
Human: {input}
Assistant:"""

prompt = PromptTemplate(input_variables=["history", "input"], template=template)

conversation_chain = ConversationChain(
    llm=OpenAI(temperature=0),
    prompt=prompt,
    verbose=True,
    memory=ConversationBufferMemory(),
)

Expected behavior

I was expecting a streaming response, as with ChatOpenAI on its own. It seems to me that ConversationChain doesn't support streaming responses.

@dosubot dosubot bot added Ɑ: models Related to LLMs or chat model modules 🤖:bug Related to a bug, vulnerability, unexpected error with an existing feature labels Oct 17, 2023
@dosubot

dosubot bot commented Oct 17, 2023

🤖

Hello,

Based on the information you've provided and the context from the LangChain repository, it appears that the ConversationChain class does support streaming responses. This is evident from the _generate method in the ConversationChain class. The method checks if it should stream responses based on the stream parameter or the self.streaming attribute of the class. If streaming is required, it calls the _stream method and uses the _generate_from_stream function to generate responses.

Here is the relevant code:

def _generate(
    self,
    messages: List[BaseMessage],
    stop: Optional[List[str]] = None,
    run_manager: Optional[CallbackManagerForLLMRun] = None,
    stream: Optional[bool] = None,
    **kwargs: Any,
) -> ChatResult:
    should_stream = stream if stream is not None else self.streaming
    if should_stream:
        stream_iter = self._stream(
            messages, stop=stop, run_manager=run_manager, **kwargs
        )
        return _generate_from_stream(stream_iter)
    message_dicts, params = self._create_message_dicts(messages, stop)
    params = {**params, **kwargs}
    response = self.completion_with_retry(
        messages=message_dicts, run_manager=run_manager, **params
    )
    return self._create_chat_result(response)

In this code, should_stream is determined by the stream parameter if it is not None, otherwise it falls back to the self.streaming attribute of the class. If should_stream is True, it calls the _stream method and then uses the _generate_from_stream function to generate responses.

However, it seems that there might be some confusion about how to enable streaming responses in the ConversationChain class. Based on similar issues in the LangChain repository, you might need to set verbose=False when you instantiate your ConversationChain. Here is an example:

conversation_chain = ConversationChain(
    llm=OpenAI(temperature=0),
    prompt=prompt,
    verbose=False,
    memory=ConversationBufferMemory(),
)

Please try this and let me know if it works for you. If you still encounter issues, could you please provide more details about the problem? Specifically, it would be helpful to know more about the behavior you're observing and any error messages you're seeing.

Best,
Dosu


@marcocello
Author

Hello, I tried your suggestion but only got the final answer, not streaming.

@dmahlow

dmahlow commented Nov 10, 2023

Are there any updates on this? Is ConversationChain's inability to stream a bug or expected behavior? If the former, what's a good way of fixing it?

@deuscapturus

deuscapturus commented Nov 15, 2023

None of the chains have implemented a stream method. I'm investigating to see why this is so.

UPDATE: While investigating I came to the opinion that LangChain is an overly complicated mess. I've migrated my application to llama-index, which is much simpler.

@seanmavley
Contributor

seanmavley commented Dec 19, 2023

@deuscapturus I am seriously also considering switching. I see potential with LangChain and LangServe; however, both are way too early to be used for anything serious.

Hopefully they will be stable in a year or two.

I've looked into Haystack, and now LlamaIndex, which you mentioned. There's also Flowise.

I've sunk too much time into LangChain, so I want to be careful about which step I take next.

Edit:
Gotta say the learning curve hasn't been the easiest for me; however, I'm gradually getting the hang of things. I also needed to remind myself that LangChain is literally Python, which means I'm not bound to "think" LangChain all the time; I can just drop down to Python the way I'm familiar with and continue.

@ttessarolo

if you use "streamLog" instead, and you parse the response it works...

@cfa532

cfa532 commented Feb 13, 2024

Here is my piece of code that actually makes ConversationChain stream. @deuscapturus is right that LangChain is a mess, even though it's much less messy than a year ago.

import sys
from typing import Any

from langchain.callbacks import StreamingStdOutCallbackHandler
from langchain.chains import ConversationChain
from langchain.chat_models import ChatOpenAI
from langchain.memory import ConversationBufferWindowMemory
from langchain_core.output_parsers import StrOutputParser


class MyStreamingHandler(StreamingStdOutCallbackHandler):
    def __init__(self) -> None:
        super().__init__()

    def on_llm_new_token(self, token: str, **kwargs: Any) -> None:
        sys.stdout.write(token)
        sys.stdout.flush()
        # time.sleep(1)


CHAT_LLM = ChatOpenAI(temperature=0, model="gpt-4", max_tokens=2048, streaming=True,
                      callbacks=[MyStreamingHandler()])  # ChatOpenAI cannot have max_tokens=-1
chain = ConversationChain(llm=CHAT_LLM, memory=ConversationBufferWindowMemory(k=6), verbose=False)
chain.output_parser = StrOutputParser()

@cfa532

cfa532 commented Feb 13, 2024


However, I have been pulling my hair out for a day trying to get the following code to work. It does not print anything. According to some comments, this is supposed to be the right way of streaming, instead of overriding on_llm_new_token().

for chunk in chain.stream(query):
    print(chunk, end="", flush=True)
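
One workaround is to bridge the LLM callbacks into an async iterator you consume while the chain runs in the background. A minimal sketch, assuming the langchain 0.1.x-era AsyncIteratorCallbackHandler API:

import asyncio

from langchain.callbacks import AsyncIteratorCallbackHandler
from langchain.chains import ConversationChain
from langchain.chat_models import ChatOpenAI
from langchain.memory import ConversationBufferWindowMemory


async def stream_answer(question: str) -> None:
    handler = AsyncIteratorCallbackHandler()
    llm = ChatOpenAI(temperature=0, streaming=True, callbacks=[handler])
    chain = ConversationChain(llm=llm, memory=ConversationBufferWindowMemory(k=6))

    # Run the chain in the background; tokens arrive through the callback handler.
    task = asyncio.create_task(chain.ainvoke({"input": question}))
    async for token in handler.aiter():
        print(token, end="", flush=True)
    await task  # surface any exception raised by the chain


asyncio.run(stream_answer("hello, tell me something about yourself"))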

@ttessarolo

ttessarolo commented Feb 14, 2024

Here is my current solution for streaming results to the client using SSE (in Node.js).
First, a generator function:

import EventEmitter, { on } from "node:events";

async function* chat(input, sessionId) {
  let id = 0;
  const { chain } = getConversationalChain(sessionId);  // get from cache
  const emitter = new EventEmitter();
  chain.call({
    input,
    callbacks: [
      {
        handleLLMNewToken(data) {
          emitter.emit("data", { data, id: id++, event: "text" });
        },
        handleLLMEnd(data) {
          emitter.emit("data", { event: "end" });
        },
      },
    ],
  });

  for await (const [data] of on(emitter, "data")) {
    if (data.event === "end") return;
    yield data;
  }
}

and then I use that function to send SSE to the client:

import Fastify from "fastify";
import { FastifySSEPlugin } from "fastify-sse-v2";

const fastify = Fastify();
fastify.register(FastifySSEPlugin);

fastify.get("/chat", async function (req, res) {
  const prompt = req.query.prompt;
  const sessionId = req.query.sessionId;
  res.sse(chat(prompt, sessionId));
});
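
For comparison, a rough FastAPI equivalent of that endpoint (a sketch, assuming the langchain 0.1.x-era AsyncIteratorCallbackHandler; the /chat route and the per-request chain are illustrative, not taken from the original code):

import asyncio

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from langchain.callbacks import AsyncIteratorCallbackHandler
from langchain.chains import ConversationChain
from langchain.chat_models import ChatOpenAI
from langchain.memory import ConversationBufferWindowMemory

app = FastAPI()


@app.get("/chat")
async def chat(prompt: str):
    # A fresh handler/chain per request; a real app would cache memory per session,
    # as the Node example above does with sessionId.
    handler = AsyncIteratorCallbackHandler()
    llm = ChatOpenAI(temperature=0, streaming=True, callbacks=[handler])
    chain = ConversationChain(llm=llm, memory=ConversationBufferWindowMemory(k=6))

    async def event_stream():
        task = asyncio.create_task(chain.ainvoke({"input": prompt}))
        async for token in handler.aiter():
            yield f"data: {token}\n\n"  # minimal SSE framing
        await task
        yield "event: end\ndata: \n\n"

    return StreamingResponse(event_stream(), media_type="text/event-stream")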

@kevin12314


I used @cfa532's code above and made some adjustments, finally getting the ConversationChain to stream. Here is my code.

import asyncio
import sys
from typing import Any

from langchain.callbacks import StreamingStdOutCallbackHandler
from langchain.chains import ConversationChain
from langchain.chat_models import ChatOpenAI
from langchain.memory import ConversationBufferWindowMemory


class MyStreamingHandler(StreamingStdOutCallbackHandler):
    def __init__(self) -> None:
        super().__init__()

    def on_llm_new_token(self, token: str, **kwargs: Any) -> None:
        sys.stdout.write(token)
        sys.stdout.flush()
        # time.sleep(1)


async def process_chunks(question: str):
    final_result = None
    async for chunk in chain.astream(question):
        final_result = chunk
        print('')
        print(final_result)


CHAT_LLM = ChatOpenAI(openai_api_key="", temperature=0, model="gpt-4", max_tokens=2048, streaming=True,
                      callbacks=[MyStreamingHandler()])  # ChatOpenAI cannot have max_tokens=-1

chain = ConversationChain(llm=CHAT_LLM, memory=ConversationBufferWindowMemory(k=6), verbose=False)
asyncio.run(process_chunks("hello, tell me something about yourself"))

@rhighs

rhighs commented May 6, 2024

I suspect the issue lies in how a transformer is used to buffer the generated output. I don't really use a ConversationChain; rather, I have implemented a simple chain that adds a RunnableLambda at the end to save things to memory. I didn't check how ConversationChain handles memory state under the hood, but I wouldn't be surprised if it used a RunnableLambda at the chain end.

from langchain.memory import ConversationBufferWindowMemory
from langchain_core.messages import AIMessage, AIMessageChunk
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.runnables import (
    RunnableLambda,
    RunnablePassthrough,
    RunnableSerializable,
)
from langchain_openai import ChatOpenAI


def setup_chain(
    model="gpt-3.5-turbo",
) -> RunnableSerializable:
    memory = ConversationBufferWindowMemory(return_messages=True, k=5)

    def save_into_mem(input_output):
        input = input_output
        message = input_output.pop("output")
        if type(message) is AIMessageChunk or type(message) is AIMessage:
            memory.save_context(input, {"output": message.content})
        elif type(message) is str:
            memory.save_context(input, {"output": message})
        return message

    chain = (
        RunnablePassthrough.assign(
            history=RunnableLambda(
                lambda x: memory.load_memory_variables(x).get("history", [])
            ),
        )
        | ChatPromptTemplate.from_messages(
            [
                ("system", SYSTEM_MESSAGE),
                MessagesPlaceholder(variable_name="history"),
                ("user", "{input}"),
            ]
        )
        | ChatOpenAI(model=model, temperature=0, streaming=True)
    )
    chain = RunnablePassthrough.assign(output=chain) | RunnableLambda(save_into_mem)
    return chain

I had a look at how _transform is implemented for RunnableLambda, and interestingly I found that all incoming input to a RunnableLambda seems to be buffered into a single chunk before being processed:

# By definitions, RunnableLambdas consume all input before emitting output.

Since _transform pretty much forms the basis of how chunks are created for a stream, this could be the reason this doesn't work as one would expect. Also, the documentation doesn't really say anything about "RunnableLambdas consume all input before emitting output", which is rather confusing IMHO. I can see the reasoning behind it, but there should be a way to opt out of this behavior if one needs streamed output at the chain end. I'm not too experienced with LangChain, so I have no idea how to work around this without using LLM callbacks + an asyncio.Queue or some sort of producer-consumer setup.

NOTE: Removing RunnableLambda(save_into_mem) will produce the expected streaming behavior
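
To make that buffering concrete, here is a tiny self-contained illustration (assuming a recent langchain_core; RunnableGenerator stands in for a streaming LLM). The source alone streams three chunks, but piping it into even an identity RunnableLambda collapses them into one:

from typing import Iterator

from langchain_core.runnables import RunnableGenerator, RunnableLambda


def fake_llm(inputs: Iterator[str]) -> Iterator[str]:
    # Stand-in for a streaming LLM: consume the input, then emit three chunks.
    for _ in inputs:
        pass
    yield from ["Hel", "lo", "!"]


source = RunnableGenerator(fake_llm)

print(list(source.stream("hi")))                                  # ['Hel', 'lo', '!']
print(list((source | RunnableLambda(lambda x: x)).stream("hi")))  # ['Hello!'] -- one buffered chunk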

@rhighs

rhighs commented May 7, 2024

I hate doing this, but sigh, here is a workaround that enables streaming an LLM response while saving it into a memory buffer. I came up with it after looking at how RunnableLambda is implemented: I basically override its _transform method to let input chunks pass through the stream and then call the callback with the collected input, which is where you should update memory state.

This sucks, but it does the job; until someone has enough patience to work on this and comes up with a better solution in LangChain itself, you can use this:

from operator import itemgetter
from typing import Any, Callable, Dict, Iterator, cast

from langchain.memory import ConversationBufferMemory
from langchain_core.callbacks import CallbackManagerForChainRun
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.runnables import (
    RunnableConfig,
    RunnableLambda,
    RunnablePassthrough,
    RunnableSerializable,
)
from langchain_core.runnables.config import call_func_with_variable_args
from langchain_core.runnables.utils import AddableDict, Input, Output
from langchain_openai import ChatOpenAI


class RunnableCollector(RunnableLambda):
    def _transform(
        self,
        input: Iterator[Input],
        run_manager: CallbackManagerForChainRun,
        config: RunnableConfig,
        **kwargs: Any,
    ) -> Iterator[Output]:
        final: Input
        got_first_val = False
        for ichunk in input:
            yield cast(Output, ichunk)

            if not got_first_val:
                final = ichunk
                got_first_val = True
            else:
                try:
                    final = final + ichunk  # type: ignore[operator]
                except TypeError:
                    final = ichunk

        call_func_with_variable_args(
            self.func, cast(Input, final), config, run_manager, **kwargs
        )


class RunnableFilter(RunnableLambda):
    def __init__(self, filter: Callable[[Input], bool], **kwargs) -> None:
        super().__init__(func=lambda _: None, **kwargs)
        self.filter = filter

    def _transform(
        self,
        input: Iterator[Input],
        run_manager: CallbackManagerForChainRun,
        config: RunnableConfig,
        **kwargs: Any,
    ) -> Iterator[Output]:
        for ichunk in input:
            if self.filter(ichunk):
                yield ichunk


class RunnableMap(RunnableLambda):
    def __init__(self, mapping: Callable[[Input], Output], **kwargs) -> None:
        super().__init__(func=lambda _: None, **kwargs)
        self.mapping = mapping

    def _transform(
        self,
        input: Iterator[Input],
        run_manager: CallbackManagerForChainRun,
        config: RunnableConfig,
        **kwargs: Any,
    ) -> Iterator[Output]:
        for ichunk in input:
            yield self.mapping(ichunk)


def setup_chain(
    model="gpt-3.5-turbo",
) -> RunnableSerializable:
    memory = ConversationBufferMemory(memory_key="history", return_messages=True)

    def save_into_mem(input_output: Dict[str, Any]):
        message = input_output.pop("output")
        memory.save_context(input_output, {"output": message.content})

    chain = (
        RunnablePassthrough.assign(
            history=RunnableLambda(memory.load_memory_variables)
            | itemgetter("history")
        )
        | ChatPromptTemplate.from_messages(
            [
                ("system", SYSTEM_MESSAGE),
                MessagesPlaceholder(variable_name="history"),
                ("user", "{input}"),
            ]
        )
        | ChatOpenAI(
            model=model,
            temperature=0,
            streaming=True,
        )
    )

    chain = (
        RunnablePassthrough.assign(
            output=chain,
        )
        | RunnableFilter(
            filter=lambda chunk: type(chunk) is AddableDict
            and ("output" in chunk or "input" in chunk)
        )
        | RunnableCollector(save_into_mem)
        | RunnableFilter(
            filter=lambda chunk: type(chunk) is AddableDict
            and "output" in chunk
            and "input" not in chunk
        )
        | RunnableMap(mapping=lambda chunk: cast(Output, chunk["output"]))
    )

    return chain

NOTE: this is really focused only on streaming output consumption. I haven't tested anything else, nor the async transforms, which shouldn't differ too much in behavior...
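
For completeness, a hypothetical usage sketch of the workaround above (it assumes SYSTEM_MESSAGE and an OpenAI API key are configured elsewhere). The mapped chunks are AIMessageChunk objects, so their .content is printed as it arrives, and memory is saved once the stream has been fully consumed:

chain = setup_chain()

for chunk in chain.stream({"input": "hello, who are you?"}):
    # Each chunk is an AIMessageChunk from the streaming ChatOpenAI call.
    print(chunk.content, end="", flush=True)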
