fix: langchain stream not closing #201
Conversation
🦋 Changeset detected. Latest commit: 0a26a5a. The changes in this PR will be included in the next version bump. This PR includes changesets to release 1 package.
Thank you! Feel free to open an issue here for tracking the langchain discussion
Actually, I just remembered why @jaredpalmer made the change. Here's the initial issue: #63. It seems like we may want to let the consumer choose which callback to end on, as it's model dependent.
Yeah, I figured that was the reasoning, thanks for confirming. This change will probably break that case. It seems like your suggestion might be the best one. Or langchainjs could expose something like that.
Last one here before I create an issue: https://js.langchain.com/docs/production/callbacks/#multiple-handlers

**Before**

```ts
const { stream, handlers } = LangChainStream()

const llm = new ChatOpenAI({
  streaming: true,
  callbackManager: CallbackManager.fromHandlers(handlers)
})

llm
  .call(
    (messages as Message[]).map(m =>
      m.role == 'user'
        ? new HumanChatMessage(m.content)
        : new AIChatMessage(m.content)
    )
  )
  .catch(console.error)

return new StreamingTextResponse(stream)
```

**After**

```ts
// Different handlers for llm, agents, chains
const { stream, llmHandlers, chainHandlers } = LangChainStream()

// Do not pass handlers here
const llm = new ChatOpenAI({
  streaming: true,
})

// Pass handlers here instead
llm
  .call(
    (messages as Message[]).map((m) =>
      m.role == "user"
        ? new HumanChatMessage(m.content)
        : new AIChatMessage(m.content)
    ),
    [llmHandlers]
  )
  .catch(console.error);

return new StreamingTextResponse(stream)
```
Created #205 for further discussion. This PR will probably break the linked use case; it can be closed or left open until we find something.
Big fan of this.
@jaredpalmer that actually won't work, after doing some research in #205.

As an exaggerated example, a chain can spawn a chain that spawns a chain, meaning that on the first chainEnd the stream will close. You need to be able to count how many things are running, otherwise only the first chain to finish will write to the stream, locking out the other chain results.

I'm leaning towards #205 (comment) as the solution now. It should cover all use cases and keep the API the same. (Still need to update the docs to use "Request callbacks" rather than "Constructor callbacks".)
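For reference, here is a rough sketch of that run-counting idea (assumed names, not the exact PR diff; the callback signatures depend on your langchainjs version, so check its docs):

```ts
// Sketch of the run-counting approach: every start callback registers its runId,
// every end callback removes it, and the stream only closes once nothing is running.
const { readable, writable } = new TransformStream()
const writer = writable.getWriter()
const encoder = new TextEncoder()
const runs = new Set<string>()

const handleStart = (runId: string) => {
  // A chain/LLM run began; keep the stream open until it finishes.
  runs.add(runId)
}

const handleEnd = async (runId: string) => {
  runs.delete(runId)
  if (runs.size === 0) {
    // Only close once every nested chain/LLM run has finished.
    await writer.ready
    await writer.close()
  }
}

const handlers = {
  handleLLMNewToken: async (token: string) => {
    await writer.ready
    await writer.write(encoder.encode(token))
  },
  handleLLMStart: async (_llm: unknown, _prompts: string[], runId: string) => handleStart(runId),
  handleLLMEnd: async (_output: unknown, runId: string) => handleEnd(runId),
  handleChainStart: async (_chain: unknown, _inputs: unknown, runId: string) => handleStart(runId),
  handleChainEnd: async (_outputs: unknown, runId: string) => handleEnd(runId),
}
```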
Updated with my latest findings 👀 It may be oversimplified compared to what they have done with tracing, but I think it covers all the use cases.
Take #205 (comment) as an example. Let's say we wanted to stream both responses. In the first chain, handleChainEnd gets called and the stream is closed, so the second chain can't write the second response.

There are probably many ways to fix this, but the approach I suggested in this PR seems to work for most use cases I've tried.
```ts
const handleError = async (e: Error, runId: string) => {
  runs.delete(runId)
  await writer.ready
  await writer.abort(e)
}
```
You might want to be careful with this one: in chains that e.g. run multiple things in parallel, there could be other events arriving after an error event, and it doesn't look like you're handling those.
How should it be handled? If there's an error, can it recover gracefully from it?

Right now this is definitely giving up on the first error, but I'll be honest, I haven't encountered a scenario where the error was recovered and the chain could continue.
So overall it will be cancelled when the first error occurs. The issue is that if it's a chain with multiple LLM calls in parallel, you may still receive new tokens from call #2 after call #1 has failed. In that case, what would currently happen is that it would probably throw an exception in handleLLMNewToken as you try to write to an already aborted writer?
Yeah, I noticed that in #205 (comment):

```
Error in handler Handler, handleChainEnd: TypeError: The stream (in closed state) is not in the writable state and cannot be closed
```

I wonder what the best behavior would be: to abort the call (ignoring the second one) or to allow call 2 to keep writing to the stream.
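One way to get the "abort and ignore call 2" behavior would be a guard like the following (hypothetical, not something this PR implements):

```ts
// Hypothetical guard, not part of this PR: abort on the first error and drop
// anything that arrives afterwards, so a still-running parallel call can't
// throw by writing to an already-aborted writer.
const { writable } = new TransformStream()
const writer = writable.getWriter()
const encoder = new TextEncoder()
const runs = new Set<string>()
let aborted = false

const handleError = async (e: Error, runId: string) => {
  runs.delete(runId)
  if (aborted) return
  aborted = true
  await writer.ready
  await writer.abort(e)
}

const handleLLMNewToken = async (token: string) => {
  if (aborted) return // tokens from the parallel call are ignored after the abort
  await writer.ready
  await writer.write(encoder.encode(token))
}
```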
Note: the exception from writing to a stream in the closed state seems to be handled somewhere, so it doesn't cause catastrophic errors.
Yes, any exceptions inside a callback handler are caught by langchain, to avoid an error in a callback handler interrupting a run.
Ok, I see your point. How does this suggestion sound?

Keep an error state:

```ts
let errors: Error[] = []
```

Whenever we encounter an error in any handle*Error, push it to errors. Then in handleEnd:

```ts
const handleEnd = async (runId: string) => {
  runs.delete(runId)
  if (runs.size === 0) {
    if (errors.length !== 0) {
      // for loop over errors or join errors?
      await writer.ready
      await writer.abort(errors)
      return
    }
    await writer.ready
    await writer.close()
  }
}
```
Or I can just re-throw the error, similar to https://github.com/vercel-labs/ai/blob/main/packages/core/streams/ai-stream.ts#L152-L154

I think an issue I had yesterday working with this is that even though the stream had ended, the background processing had not stopped. This led me to have to restart the dev server to clear out running processes.
I'm a fan of rethrowing the error
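For reference, a minimal sketch of what the rethrow could look like (assumed shape, mirroring the linked ai-stream.ts pattern; as noted above, langchain catches exceptions thrown inside handlers, so this mainly surfaces the failure rather than stopping the run):

```ts
// Sketch of the rethrow option (assumed names, not the exact PR code).
const writer = new TransformStream().writable.getWriter()
const runs = new Set<string>()

const handleError = async (e: Error, runId: string) => {
  runs.delete(runId)
  await writer.ready
  await writer.abort(e)
  // Rethrow so the failure isn't silently swallowed; langchain will catch it,
  // but it at least shows up in the run's error handling and logs.
  throw e
}
```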
Are we good to merge @nfcampos @MaxLeiter?
Looks good to me.
docs/pages/docs/guides/langchain.mdx (Outdated)

```diff
@@ -34,6 +34,7 @@ export async function POST(req: Request) {
         ? new HumanChatMessage(m.content)
         : new AIChatMessage(m.content)
       ),
+      [],
```
@nfcampos I added this `[]` at all call sites in order to satisfy TypeScript. If these aren't necessary, then this may be an upstream issue with LC, likely a missing overload.
I think that should be `undefined`.
Stylistic suggestion: y'all should have only two arguments to `.call`; passing `undefined` is very odd.
Yep, agreed. I've improved this here: langchain-ai/langchainjs#1868. Once that is merged, you'll be able to do `llm.call(messages, { callbacks: handlers })` instead.
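For illustration, a call site could then look roughly like this (a sketch based on the comment above and the "After" example earlier, not the final upstream API; whether the stream helper exposes a single `handlers` object or the split `llmHandlers`/`chainHandlers` from this PR is still open):

```ts
// Sketch of a call site once langchain-ai/langchainjs#1868 lands (not the final API).
const { stream, handlers } = LangChainStream()

const llm = new ChatOpenAI({ streaming: true })

llm
  .call(
    (messages as Message[]).map((m) =>
      m.role === 'user'
        ? new HumanChatMessage(m.content)
        : new AIChatMessage(m.content)
    ),
    { callbacks: handlers }
  )
  .catch(console.error)

return new StreamingTextResponse(stream)
```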
Thank you 🙏
fixes: #97
The problem
Solution