Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

experimental_StreamData dosen't work with LangChain Agents #483

Closed
matannahmani opened this issue Aug 19, 2023 · 1 comment
Closed

experimental_StreamData dosen't work with LangChain Agents #483

matannahmani opened this issue Aug 19, 2023 · 1 comment
Labels
help wanted Extra attention is needed

Comments

@matannahmani
Copy link

@MaxLeiter
I've added the experimental_StreamData to my langchain agent but after the chain finishes it infinitely throws errors in handlers, even thou the stream has completed successfully.
here is a video show casing the problem:
https://www.awesomescreenshot.com/video/20061948?key=608a4c716adf74479a38be80d37982b9

Current error that occurs after streamData.close():

Error: aborted
    at connResetException (node:internal/errors:711:14)
    at abortIncoming (node:_http_server:747:17)
    at socketOnClose (node:_http_server:741:3)
    at Socket.emit (node:events:525:35)
    at TCP.<anonymous> (node:net:313:12)
    at TCP.callbackTrampoline (node:internal/async_hooks:130:17) {
  code: 'ECONNRESET'
}
- error uncaughtException: Error: aborted
    at connResetException (node:internal/errors:711:14)
    at abortIncoming (node:_http_server:747:17)
    at socketOnClose (node:_http_server:741:3)
    at Socket.emit (node:events:525:35)
    at TCP.<anonymous> (node:net:313:12)
    at TCP.callbackTrampoline (node:internal/async_hooks:130:17) {
  digest: undefined
}
Error in handler Handler, handleLLMNewToken: undefined
Error in handler Handler, handleLLMNewToken: undefined
Error in handler Handler, handleLLMNewToken: undefined
... keeps spamming this message

Current AI SDK LangChainStream implementation:

  const streamData = new experimental_StreamData()
  const { stream, handlers } = LangChainStream({
    async onFinal(completion) {
      const title = json.messages[0].content.substring(0, 100)
      const id = json.id ?? nanoid()
      const createdAt = Date.now()
      const path = `/chat/${id}`
      const payload: Omit<Chat, 'createdAt'> & {
        createdAt: number
      } = {
        id,
        title,
        createdAt,
        path,
        messages: [
          ...messages,
          {
            content: completion,
            role: 'assistant'
          }
        ]
      }
      if (userId) payload.userId = userId
      else payload.uuid = uuid
      await kv.hmset(`chat:${id}`, payload)
      // if user is logged in add chat to their list of chats
      await kv.zadd(`${userPrefix}:chat:${userPrefixId}`, {
        score: createdAt,
        member: `chat:${id}`
      })
      streamData.close()
    },

    experimental_streamData: true
  })

Current Agent Implementation:

  const executor = await AgentCall(history, systemMessage)
  executor
    .call(
      {
        input: currentMessage?.content ?? ''
      },
      {
        callbacks: [
          handlers,
          {
            async handleToolStart(
              tool,
              input,
              runId,
              parentRunId,
              tags,
              metadata
            ) {
              // sending a message to the client that the tool has started
              const name =
                typeof metadata?.name === 'string' ? metadata.name : ''
                const messageAction: MessageAction = {
                  name,
                  id: parentRunId ?? runId,
                  loading: 'true'
                }
                streamData.append(messageAction)
              await handlers.handleLLMNewToken(`:action{#${messageAction.id}}`)
              handlers.handleToolStart(tool, input, runId)
            }
          },
          {
            async handleToolEnd(output, runId, parentRunId, tags) {
              // sending a message to the client that the tool has ended
              const messageAction: MessageAction = {
                id: parentRunId ?? runId,
                loading: 'false'
              }
              streamData.append(messageAction)
              handlers.handleToolEnd(output, runId)
            }
          }
        ]
      }
    )
    .catch(console.error).finally(() => {
      // console.log("Closing stream")
      // streamData.close()
    })

  return new StreamingTextResponse(stream, {}, streamData)
@matannahmani
Copy link
Author

@MaxLeiter
I found a temporary workaround for now.

first return response should be changed to:

  return new StreamingTextResponse(stream, {
    headers: {
      [COMPLEX_HEADER]: 'true'
    }
  })

second should create a wrapper for the LangChainStream :

function createStream(sourceStream: ReadableStream) {
  let controller: ReadableStreamDefaultController
  const stream = new ReadableStream({
    async start(ctrl) {
      const reader = sourceStream.getReader()
      controller = ctrl
      while (true) {
        const { done, value } = await reader.read()
        if (done) {
          // can push something at the end of the stream
          // controller.enqueue(new TextEncoder().encode('Stream Ended!'))
          controller.close()
          break
        }
        controller.enqueue(value)
      }
    }
  })

  return {
    stream,
    pushText(text: string) {
      controller?.enqueue(
        new TextEncoder().encode(getStreamString('text', text))
      )
    },
    pushData(data: Object) {
      controller?.enqueue(
        new TextEncoder().encode(
          getStreamString('data', JSON.stringify([data]))
        )
      )
    },
    close() {
      controller?.close()
    },
    error(e: string) {
      controller?.error(e)
    }
  }
}

then lastly you can use it as follows:

  const { stream: langChainStream, handlers } = LangChainStream({
    experimental_streamData: true // not sure if this is needed
  })
  const { stream, pushData, pushText } = createStream(langChainStream)
  pushText('') // to push text to the client without adding to the completion
  pushData({
    type: 'chat',
  }) // to push data to the client

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
help wanted Extra attention is needed
Projects
None yet
Development

No branches or pull requests

2 participants