# How to stream

:::info Prerequisites

This guide assumes familiarity with the following concepts:

- [Chat models](/docs/concepts/chat_models)
- [LangChain Expression Language](/docs/concepts/lcel)
- [Output parsers](/docs/concepts/output_parsers)

:::

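Streaming is critical to making applications based on large language models (LLMs) feel responsive to end users.

Important LangChain primitives like LLMs, parsers, prompts, retrievers, and agents implement the LangChain Runnable interface.

This interface provides two general approaches to streaming content:

- `.stream()`: the default implementation of streaming, which streams the final output from the chain.
- `.streamEvents()` and `.streamLog()`: these stream both the intermediate steps and the final output from the chain.

Let's take a look at both approaches!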

:::info
For a higher-level overview of streaming techniques in LangChain, see [this section of the conceptual guide](/docs/concepts/streaming).
:::

# Using Stream

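All `Runnable` objects implement a method called `stream`.

This method is designed to stream the final output in chunks, yielding each chunk as soon as it is available.

Streaming is only possible if all steps in the program know how to process an **input stream**; that is, process the input one chunk at a time and yield a corresponding output chunk.

The complexity of this processing can vary, from straightforward tasks like emitting the tokens produced by an LLM, to more challenging ones like streaming parts of a JSON result before the entire JSON is complete.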
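
To make "operating on an input stream" concrete, here is a minimal plain-TypeScript sketch. It is not LangChain's implementation, and `fakeTokenStream`, `upperCaseStream`, `collect` and `demoStreamingStep` are hypothetical names invented for illustration. Each step consumes an async iterable of chunks and yields an output chunk as soon as an input chunk arrives.

```typescript
// Hypothetical stand-in for a model: yields tokens one at a time.
async function* fakeTokenStream(tokens: string[]): AsyncGenerator<string> {
  for (const token of tokens) {
    yield token;
  }
}

// A streaming-friendly step: handles each input chunk as it arrives
// and immediately emits the corresponding output chunk.
async function* upperCaseStream(
  input: AsyncIterable<string>
): AsyncGenerator<string> {
  for await (const chunk of input) {
    yield chunk.toUpperCase();
  }
}

// Drains a stream into an array so the chunks can be inspected.
async function collect<T>(stream: AsyncIterable<T>): Promise<T[]> {
  const out: T[] = [];
  for await (const item of stream) {
    out.push(item);
  }
  return out;
}

async function demoStreamingStep(): Promise<string[]> {
  const source = fakeTokenStream(["Hel", "lo", " wor", "ld"]);
  return collect(upperCaseStream(source));
}

demoStreamingStep().then((chunks) => console.log(chunks));
```

Because `upperCaseStream` never waits for the whole input, the number of output chunks matches the number of input chunks, which is the property a chain needs from every step in order to stream end to end.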

The best place to start exploring streaming is with one of the most important components in LLM apps: the model itself!

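## LLMs and Chat Models

Large language models can take several seconds to generate a complete response to a query. This is far slower than the **~200-300 ms** threshold at which an application feels responsive to an end user.

The key strategy for making the application feel more responsive is to show intermediate progress; for example, by streaming the model's output token by token.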

In [1]:
import "dotenv/config";

```{=mdx}
import ChatModelTabs from "@theme/ChatModelTabs";

<ChatModelTabs />
```

In [None]:
// @lc-docs-hide-cell
import { ChatOpenAI } from "@langchain/openai";

const model = new ChatOpenAI({
  model: "gpt-4o",
  temperature: 0,
});

In [3]:
const stream = await model.stream("Hello! Tell me about yourself.");
const chunks = [];
for await (const chunk of stream) {
  chunks.push(chunk);
  console.log(`${chunk.content}|`)
}

|
Hello|
!|
 I'm|
 a|
 large|
 language|
 model|
 developed|
 by|
 Open|
AI|
 called|
 GPT|
-|
4|
,|
 based|
 on|
 the|
 Gener|
ative|
 Pre|
-trained|
 Transformer|
 architecture|
.|
 I'm|
 designed|
 to|
 understand|
 and|
 generate|
 human|
-like|
 text|
 based|
 on|
 the|
 input|
 I|
 receive|
.|
 My|
 primary|
 function|
 is|
 to|
 assist|
 with|
 answering|
 questions|
,|
 providing|
 information|
,|
 and|
 engaging|
 in|
 various|
 types|
 of|
 conversations|
.|
 While|
 I|
 don't|
 have|
 personal|
 experiences|
 or|
 emotions|
,|
 I'm|
 trained|
 on|
 diverse|
 datasets|
 that|
 enable|
 me|
 to|
 provide|
 useful|
 and|
 relevant|
 information|
 across|
 a|
 wide|
 array|
 of|
 topics|
.|
 How|
 can|
 I|
 assist|
 you|
 today|
?|
|
|


Let's have a look at one of the raw chunks:

In [4]:
chunks[0]

AIMessageChunk {
  lc_serializable: true,
  lc_kwargs: {
    content: '',
    tool_call_chunks: [],
    additional_kwargs: {},
    id: 'chatcmpl-9lO8YUEcX7rqaxxevelHBtl1GaWoo',
    tool_calls: [],
    invalid_tool_calls: [],
    response_metadata: {}
  },
  lc_namespace: [ 'langchain_core', 'messages' ],
  content: '',
  name: undefined,
  additional_kwargs: {},
  response_metadata: { prompt: 0, completion: 0, finish_reason: null },
  id: 'chatcmpl-9lO8YUEcX7rqaxxevelHBtl1GaWoo',
  tool_calls: [],
  invalid_tool_calls: [],
  tool_call_chunks: [],
  usage_metadata: undefined
}


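We got back something called an `AIMessageChunk`. This chunk represents a part of an `AIMessage`.

Message chunks are additive by design: you can simply add them up using the `.concat()` method to get the state of the response so far!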

In [5]:
let finalChunk = chunks[0];

for (const chunk of chunks.slice(1, 5)) {
  finalChunk = finalChunk.concat(chunk);
}

finalChunk

AIMessageChunk {
  lc_serializable: true,
  lc_kwargs: {
    content: "Hello! I'm a",
    additional_kwargs: {},
    response_metadata: { prompt: 0, completion: 0, finish_reason: null },
    tool_call_chunks: [],
    id: 'chatcmpl-9lO8YUEcX7rqaxxevelHBtl1GaWoo',
    tool_calls: [],
    invalid_tool_calls: []
  },
  lc_namespace: [ 'langchain_core', 'messages' ],
  content: "Hello! I'm a",
  name: undefined,
  additional_kwargs: {},
  response_metadata: { prompt: 0, completion: 0, finish_reason: null },
  id: 'chatcmpl-9lO8YUEcX7rqaxxevelHBtl1GaWoo',
  tool_calls: [],
  invalid_tool_calls: [],
  tool_call_chunks: [],
  usage_metadata: undefined
}


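## Chains

Virtually all LLM applications involve more steps than just a call to a language model.

Let's build a simple chain using `LangChain Expression Language` (`LCEL`) that combines a prompt, a model and a parser, and verify that streaming works.

We will use `StringOutputParser` to parse the output from the model. This is a simple parser that extracts the content field from an `AIMessageChunk`, giving us the `token` returned by the model.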

:::{.callout-tip}
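LCEL is a declarative way to specify a "program" by chaining together different LangChain primitives. Chains created using LCEL get an automatic implementation of `stream`, which allows the final output to be streamed. In fact, chains created with LCEL implement the entire standard Runnable interface.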
:::

In [6]:
import { StringOutputParser } from "@langchain/core/output_parsers";
import { ChatPromptTemplate } from "@langchain/core/prompts";

const prompt = ChatPromptTemplate.fromTemplate("Tell me a joke about {topic}");

const parser = new StringOutputParser();

const chain = prompt.pipe(model).pipe(parser);

const stream = await chain.stream({
  topic: "parrot",
});

for await (const chunk of stream) {
  console.log(`${chunk}|`)
}

|
Sure|
,|
 here's|
 a|
 joke|
 for|
 you|
:

|
Why|
 did|
 the|
 par|
rot|
 sit|
 on|
 the|
 stick|
?

|
Because|
 it|
 wanted|
 to|
 be|
 a|
 "|
pol|
ly|
-stick|
-al|
"|
 observer|
!|
|
|


:::{.callout-note}
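You do not have to use the `LangChain Expression Language` to use LangChain. You can instead rely on a standard **imperative** programming approach:
call `invoke`, `batch` or `stream` on each component individually, assign the results to variables, and then use them downstream as you see fit.

If that works for your needs, then that's fine by us 👌!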
:::

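### Working with Input Streams

What if you wanted to stream JSON from the output as it was being generated?

If you were to rely on `JSON.parse` to parse the partial JSON, the parsing would fail, because partial JSON is not valid JSON.

You'd likely be at a complete loss as to what to do, and conclude that it isn't possible to stream JSON.

Well, it turns out there is a way to do it: the parser needs to operate on the **input stream** and attempt to "auto-complete" the partial JSON into a valid state.

Let's see such a parser in action to understand what this means.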

In [7]:
import { JsonOutputParser } from "@langchain/core/output_parsers"

const chain = model.pipe(new JsonOutputParser());
const stream = await chain.stream(
  `Output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key "name" and "population"`
);

for await (const chunk of stream) {
  console.log(chunk);
}

{
  countries: [
    { name: 'France', population: 67390000 },
    { name: 'Spain', population: 47350000 },
    { name: 'Japan', population: 125800000 }
  ]
}
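
To illustrate the "auto-complete" idea, here is a simplified plain-TypeScript sketch. It is not how `JsonOutputParser` is implemented, and the helper names (`completePartialJson`, `parsePartialJson`, `streamSnapshots`) are hypothetical. It closes any string, array, or object that is still open, then attempts `JSON.parse`; prefixes that are still invalid after completion (for example a key with no value yet) are simply skipped until more tokens arrive.

```typescript
// Close any string, array, or object still open in `partial`.
// Illustrative only: literals cut mid-token (e.g. `tru`) are not repaired.
function completePartialJson(partial: string): string {
  const closers: string[] = [];
  let inString = false;
  let escaped = false;
  for (let i = 0; i < partial.length; i++) {
    const ch = partial[i];
    if (inString) {
      if (escaped) escaped = false;
      else if (ch === "\\") escaped = true;
      else if (ch === '"') inString = false;
      continue;
    }
    if (ch === '"') inString = true;
    else if (ch === "{") closers.push("}");
    else if (ch === "[") closers.push("]");
    else if (ch === "}" || ch === "]") closers.pop();
  }
  let completed = partial;
  if (inString) {
    // Drop a dangling backslash so the closing quote is not escaped.
    if (escaped) completed = completed.slice(0, -1);
    completed += '"';
  }
  return completed + closers.reverse().join("");
}

// Returns the parsed value, or undefined if the prefix cannot be completed yet.
function parsePartialJson(partial: string): unknown {
  try {
    return JSON.parse(completePartialJson(partial));
  } catch {
    return undefined;
  }
}

// Feed tokens one by one and keep every snapshot that parses.
function streamSnapshots(tokens: string[]): unknown[] {
  let buffer = "";
  const snapshots: unknown[] = [];
  for (const token of tokens) {
    buffer += token;
    const parsed = parsePartialJson(buffer);
    if (parsed !== undefined) snapshots.push(parsed);
  }
  return snapshots;
}

const demoTokens = [
  '{"countries": [{"na',
  'me": "Fra',
  'nce", "popu',
  'lation": 6739',
  '0000}]}',
];
for (const snapshot of streamSnapshots(demoTokens)) {
  console.log(JSON.stringify(snapshot));
}
```

Note that intermediate snapshots can contain truncated values (a partial name or number), so downstream steps should treat everything except the final snapshot as provisional.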


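Now, let's **break** streaming. We'll take the previous example and append an extraction function at the end that pulls the country names out of the finalized JSON. Since this new last step is just a function call with no defined streaming behavior, the streaming output from the previous steps is aggregated and then passed to the function as a single input.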

:::{.callout-warning}
Any step in the chain that operates on **finalized inputs** rather than on **input streams** can break streaming functionality via `stream`.
:::

:::{.callout-tip}
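Later, we will discuss the `streamEvents` API, which streams results from intermediate steps. This API streams results from intermediate steps even if the chain contains steps that only operate on **finalized inputs**.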
:::

In [9]:
// A function that operates on finalized inputs rather than on an input
// stream. Because it does not operate on input streams, it breaks streaming.
const extractCountryNames = (inputs: Record<string, any>) => {
  if (!Array.isArray(inputs.countries)) {
    return "";
  }
  return JSON.stringify(inputs.countries.map((country) => country.name));
}

const chain = model.pipe(new JsonOutputParser()).pipe(extractCountryNames);

const stream = await chain.stream(
  `output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key "name" and "population"`
);

for await (const chunk of stream) {
  console.log(chunk);
}

["France","Spain","Japan"]
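
One way to avoid this is to write the last step as an async generator that consumes the parser's partial objects as they arrive. The sketch below is standalone TypeScript: `fakeParserStream` is a hypothetical stand-in for the growing partial objects a streaming JSON parser would emit, and `extractCountryNamesStreaming` is an illustrative name. Whether such a generator can be passed directly to `.pipe()` depends on your `@langchain/core` version, so treat that as an assumption to verify.

```typescript
type PartialCountries = {
  countries?: Array<{ name?: string; population?: number }>;
};

// Hypothetical stand-in for the partial objects a streaming JSON parser emits.
async function* fakeParserStream(): AsyncGenerator<PartialCountries> {
  const france = { name: "France", population: 67390000 };
  const spain = { name: "Spain", population: 47350000 };
  yield {};
  yield { countries: [{ name: "Fra" }] };
  yield { countries: [france, { name: "Spa" }] };
  yield { countries: [france, spain, { name: "Japan" }] };
  yield { countries: [france, spain, { name: "Japan", population: 125800000 }] };
}

// Operates on the input stream: emits a country name as soon as it is final.
async function* extractCountryNamesStreaming(
  inputStream: AsyncIterable<PartialCountries>
): AsyncGenerator<string> {
  let emitted = 0;
  let latest: Array<{ name?: string }> = [];
  for await (const input of inputStream) {
    latest = Array.isArray(input.countries) ? input.countries : [];
    // Every entry before the last one can no longer change, so its name is final.
    while (emitted < latest.length - 1) {
      const name = latest[emitted].name;
      if (name !== undefined) yield name;
      emitted += 1;
    }
  }
  // The stream has ended, so the remaining entries are final too.
  while (emitted < latest.length) {
    const name = latest[emitted].name;
    if (name !== undefined) yield name;
    emitted += 1;
  }
}

async function collectCountryNames(): Promise<string[]> {
  const names: string[] = [];
  for await (const name of extractCountryNamesStreaming(fakeParserStream())) {
    names.push(name);
  }
  return names;
}

collectCountryNames().then((names) => console.log(names));
```

Waiting until an entry is no longer the last one avoids emitting a truncated name such as `Fra` while the model is still writing it.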


### Non-streaming components

As in the example above, some built-in components, such as retrievers, do not offer any streaming. What happens if we try to `stream` them?

In [10]:
import { OpenAIEmbeddings } from "@langchain/openai";
import { MemoryVectorStore } from "langchain/vectorstores/memory";
import { ChatPromptTemplate } from "@langchain/core/prompts";

const template = `Answer the question based only on the following context:
{context}

Question: {question}
`;
const prompt = ChatPromptTemplate.fromTemplate(template);

const vectorstore = await MemoryVectorStore.fromTexts(
  ["mitochondria is the powerhouse of the cell", "buildings are made of brick"],
  [{}, {}],
  new OpenAIEmbeddings(),
);

const retriever = vectorstore.asRetriever();

const chunks = [];

for await (const chunk of await retriever.stream("What is the powerhouse of the cell?")) {
  chunks.push(chunk);
}

console.log(chunks);


[
  [
    Document {
      pageContent: 'mitochondria is the powerhouse of the cell',
      metadata: {},
      id: undefined
    },
    Document {
      pageContent: 'buildings are made of brick',
      metadata: {},
      id: undefined
    }
  ]
]


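`stream` just yielded the final result from that component.

This is OK! Not all components have to implement streaming; in some cases streaming is unnecessary, difficult, or just doesn't make sense.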
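
A useful mental model for this behavior, sketched in plain TypeScript, is a stream that wraps a one-shot call and yields its result exactly once. This is a conceptual illustration rather than the library's source; `streamFromInvoke`, `fakeRetrieve` and `demoSingleChunk` are hypothetical names.

```typescript
// Conceptual model only: a component with no streaming support can still
// expose a stream that yields its final result as a single chunk.
async function* streamFromInvoke<I, O>(
  invoke: (input: I) => Promise<O>,
  input: I
): AsyncGenerator<O> {
  yield await invoke(input);
}

// Hypothetical retriever-like function that returns all documents at once.
async function fakeRetrieve(_query: string): Promise<string[]> {
  return [
    "mitochondria is the powerhouse of the cell",
    "buildings are made of brick",
  ];
}

async function demoSingleChunk(): Promise<string[][]> {
  const chunks: string[][] = [];
  const stream = streamFromInvoke(fakeRetrieve, "What is the powerhouse of the cell?");
  for await (const chunk of stream) {
    chunks.push(chunk);
  }
  return chunks;
}

demoSingleChunk().then((chunks) => console.log(chunks.length));
```

The consumer's `for await` loop still works unchanged; it just runs a single iteration that receives the whole result.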

:::{.callout-tip}
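An LCEL chain constructed using some non-streaming components will still be able to stream in many cases, with streaming of partial output starting after the last non-streaming step in the chain.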
:::

Here's an example of this:

In [11]:
import { RunnablePassthrough, RunnableSequence } from "@langchain/core/runnables";
import type { Document } from "@langchain/core/documents";
import { StringOutputParser } from "@langchain/core/output_parsers";

const formatDocs = (docs: Document[]) => {
  return docs.map((doc) => doc.pageContent).join("\n-----\n")
}

const retrievalChain = RunnableSequence.from([
  {
    context: retriever.pipe(formatDocs),
    question: new RunnablePassthrough()
  },
  prompt,
  model,
  new StringOutputParser(),
]);

const stream = await retrievalChain.stream("What is the powerhouse of the cell?");

for await (const chunk of stream) {
  console.log(`${chunk}|`);
}

|
M|
ito|
ch|
ond|
ria|
 is|
 the|
 powerhouse|
 of|
 the|
 cell|
.|
|
|


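Now that we've seen how the `stream` method works, let's venture into the world of streaming events!

## Using Stream Events

Event streaming is a **beta** API. This API may change a bit based on feedback.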

:::{.callout-note}
Introduced in @langchain/core **0.1.27**.
:::

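For the `streamEvents` method to work properly:

* Any custom functions / runnables must propagate callbacks.
* Set the proper parameters on models to force the LLM to stream tokens.
* Let us know if anything doesn't work as expected!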

### Event Reference

Below is a reference table that shows some of the events that the various runnable objects may emit.

:::{.callout-note}
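When streaming is implemented properly, the inputs to a runnable are usually not known until the input stream has been entirely consumed. This means that `inputs` will often be included only in `end` events rather than in `start` events.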
:::

| event                | name             | chunk                                          | input                   | output                                          |
|----------------------|------------------|------------------------------------------------|-------------------------|-------------------------------------------------|
| on_llm_start         | [model name]     |                                                | {'input': 'hello'}      |                                                 |
| on_llm_stream        | [model name]     | 'Hello' `or` AIMessageChunk(content="hello")   |                         |                                                 |
| on_llm_end           | [model name]     |                                                | 'Hello human!'          | {"generations": [...], "llmOutput": None, ...}  |
| on_chain_start       | format_docs      |                                                |                         |                                                 |
| on_chain_stream      | format_docs      | "hello world!, goodbye world!"                 |                         |                                                 |
| on_chain_end         | format_docs      |                                                | [Document(...)]         | "hello world!, goodbye world!"                  |
| on_tool_start        | some_tool        |                                                | {"x": 1, "y": "2"}      |                                                 |
| on_tool_stream       | some_tool        | {"x": 1, "y": "2"}                             |                         |                                                 |
| on_tool_end          | some_tool        |                                                |                         | {"x": 1, "y": "2"}                              |
| on_retriever_start   | [retriever name] |                                                | {"query": "hello"}      |                                                 |
| on_retriever_chunk   | [retriever name] | {documents: [...]}                             |                         |                                                 |
| on_retriever_end     | [retriever name] |                                                | {"query": "hello"}      | {documents: [...]}                              |
| on_prompt_start      | [template name]  |                                                | {"question": "hello"}   |                                                 |
| on_prompt_end        | [template name]  |                                                | {"question": "hello"}   | ChatPromptValue(messages: [SystemMessage, ...]) |

In `v2`, `streamEvents` also emits dispatched custom events. See [this guide](/docs/how_to/callbacks_custom_events/) for more information.

### Chat Model

Let's start off by looking at the events produced by a chat model.

In [12]:
const events = [];

const eventStream = await model.streamEvents("hello", { version: "v2" });

for await (const event of eventStream) {
  events.push(event);
}

console.log(events.length)

25


:::{.callout-note}

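Hey, what's that funny `{ version: "v2" }` parameter in the API?! 😾

This is a **beta API**, and we're almost certainly going to make some changes to it.

The version parameter lets us minimize breaking changes to your code.

In short, we are annoying you now so we don't have to annoy you later.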
:::

Let's take a look at a few of the start events and a few of the end events.

In [13]:
events.slice(0, 3);

[
  {
    event: 'on_chat_model_start',
    data: { input: 'hello' },
    name: 'ChatOpenAI',
    tags: [],
    run_id: 'c983e634-9f1d-4916-97d8-63c3a86102c2',
    metadata: {
      ls_provider: 'openai',
      ls_model_name: 'gpt-4o',
      ls_model_type: 'chat',
      ls_temperature: 1,
      ls_max_tokens: undefined,
      ls_stop: undefined
    }
  },
  {
    event: 'on_chat_model_stream',
    data: { chunk: [AIMessageChunk] },
    run_id: 'c983e634-9f1d-4916-97d8-63c3a86102c2',
    name: 'ChatOpenAI',
    tags: [],
    metadata: {
      ls_provider: 'openai',
      ls_model_name: 'gpt-4o',
      ls_model_type: 'chat',
      ls_temperature: 1,
      ls_max_tokens: undefined,
      ls_stop: undefined
    }
  },
  {
    event: 'on_chat_model_stream',
    run_id: 'c983e634-9f1d-4916-97d8-63c3a86102c2',
    name: 'ChatOpenAI',
    tags: [],
    metadata: {
      ls_provider: 'openai',
      ls_model_name: 'gpt-4o',
      ls_model_type: 'chat',
      ls_temperature: 1,
      ls_max_toke

In [14]:
events.slice(-2);

[
  {
    event: 'on_chat_model_stream',
    run_id: 'c983e634-9f1d-4916-97d8-63c3a86102c2',
    name: 'ChatOpenAI',
    tags: [],
    metadata: {
      ls_provider: 'openai',
      ls_model_name: 'gpt-4o',
      ls_model_type: 'chat',
      ls_temperature: 1,
      ls_max_tokens: undefined,
      ls_stop: undefined
    },
    data: { chunk: [AIMessageChunk] }
  },
  {
    event: 'on_chat_model_end',
    data: { output: [AIMessageChunk] },
    run_id: 'c983e634-9f1d-4916-97d8-63c3a86102c2',
    name: 'ChatOpenAI',
    tags: [],
    metadata: {
      ls_provider: 'openai',
      ls_model_name: 'gpt-4o',
      ls_model_type: 'chat',
      ls_temperature: 1,
      ls_max_tokens: undefined,
      ls_stop: undefined
    }
  }
]
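
When a run produces many events, a quick tally by event type helps to see its overall shape. The sketch below is plain TypeScript over objects that carry only the `event`, `name` and `run_id` fields visible in the output above; `StreamEventLike`, `countEventsByType` and the sample data (including the run ID) are hypothetical and exist only for illustration.

```typescript
// Minimal shape covering the fields used here; real events carry more fields.
interface StreamEventLike {
  event: string;
  name: string;
  run_id: string;
}

// Count how many events of each type were collected.
function countEventsByType(events: StreamEventLike[]): Record<string, number> {
  const counts: Record<string, number> = {};
  for (const e of events) {
    counts[e.event] = (counts[e.event] ?? 0) + 1;
  }
  return counts;
}

// Made-up sample mirroring the start / stream / end pattern shown above.
const sampleEvents: StreamEventLike[] = [
  { event: "on_chat_model_start", name: "ChatOpenAI", run_id: "example-run" },
  { event: "on_chat_model_stream", name: "ChatOpenAI", run_id: "example-run" },
  { event: "on_chat_model_stream", name: "ChatOpenAI", run_id: "example-run" },
  { event: "on_chat_model_stream", name: "ChatOpenAI", run_id: "example-run" },
  { event: "on_chat_model_end", name: "ChatOpenAI", run_id: "example-run" },
];

console.log(countEventsByType(sampleEvents));
```

The same helper can be applied to the `events` array collected from `streamEvents`, since those objects include these three fields.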


### Chain

Let's revisit the example chain that parsed streaming JSON to explore the streaming events API.

In [15]:
const chain = model.pipe(new JsonOutputParser());
const eventStream = await chain.streamEvents(
  `Output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key "name" and "population"`,
  { version: "v2" },
);


const events = [];
for await (const event of eventStream) {
  events.push(event);
}

console.log(events.length)

83


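If you examine the first few events, you'll notice that there are **3** different start events rather than **2**.

The three start events correspond to:

1. The chain (model + parser)
2. The model
3. The parser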

In [16]:
events.slice(0, 3);

[
  {
    event: 'on_chain_start',
    data: {
      input: 'Output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key "name" and "population"'
    },
    name: 'RunnableSequence',
    tags: [],
    run_id: '5dd960b8-4341-4401-8993-7d04d49fcc08',
    metadata: {}
  },
  {
    event: 'on_chat_model_start',
    data: { input: [Object] },
    name: 'ChatOpenAI',
    tags: [ 'seq:step:1' ],
    run_id: '5d2917b1-886a-47a1-807d-8a0ba4cb4f65',
    metadata: {
      ls_provider: 'openai',
      ls_model_name: 'gpt-4o',
      ls_model_type: 'chat',
      ls_temperature: 1,
      ls_max_tokens: undefined,
      ls_stop: undefined
    }
  },
  {
    event: 'on_parser_start',
    data: {},
    name: 'JsonOutputParser',
    tags: [ 'seq:step:2' ],
    run_id: '756c57d6-d455-484f-a556-79a82c4e1d40',
    metadata: {}
  }
]


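What do you think you'd see if you looked at the last 3 events? What about the ones in the middle?

Let's use this API to output the stream events from the model and the parser. We're ignoring start events, end events, and events from the chain. Note that the cell below requests `version: "v1"`, so it matches `on_llm_stream` events and reads the text from `event.data.chunk.message.content`.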

In [17]:
let eventCount = 0;

const eventStream = await chain.streamEvents(
  `Output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key "name" and "population"`,
  { version: "v1" },
);

for await (const event of eventStream) {
  // Truncate the output
  if (eventCount > 30) {
    continue;
  }
  const eventType = event.event;
  if (eventType === "on_llm_stream") {
    console.log(`Chat model chunk: ${event.data.chunk.message.content}`);
  } else if (eventType === "on_parser_stream") {
    console.log(`Parser chunk: ${JSON.stringify(event.data.chunk)}`);
  }
  eventCount += 1;
}

Chat model chunk: 
Chat model chunk: ```
Chat model chunk: json
Chat model chunk: 

Chat model chunk: {

Chat model chunk:    
Chat model chunk:  "
Chat model chunk: countries
Chat model chunk: ":
Chat model chunk:  [

Chat model chunk:        
Chat model chunk:  {

Chat model chunk:            
Chat model chunk:  "
Chat model chunk: name
Chat model chunk: ":
Chat model chunk:  "
Chat model chunk: France
Chat model chunk: ",

Chat model chunk:            
Chat model chunk:  "
Chat model chunk: population
Chat model chunk: ":
Chat model chunk:  
Chat model chunk: 652
Chat model chunk: 735
Chat model chunk: 11
Chat model chunk: 



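Because both the model and the parser support streaming, we see streaming events from both components in real time! Neat! 🦜

### Filtering Events

Because this API produces so many events, it is useful to be able to filter them.

You can filter by a component's `name`, its `tags`, or its `type`.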
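
As a rough mental model of what these filters do, here is a plain-TypeScript predicate applied to already-collected events. It is a sketch, not the library's implementation. It assumes that the include options combine with OR and that a component's type can be read off the event name (`on_chat_model_start` gives `chat_model`). `EventLike`, `IncludeFilter`, `componentType` and `matchesFilter` are hypothetical names.

```typescript
interface EventLike {
  event: string;
  name: string;
  tags: string[];
}

interface IncludeFilter {
  includeNames?: string[];
  includeTypes?: string[];
  includeTags?: string[];
}

// "on_chat_model_stream" -> "chat_model", "on_parser_end" -> "parser".
function componentType(eventName: string): string {
  return eventName.replace(/^on_/, "").replace(/_(start|stream|end)$/, "");
}

// Assumption: with no include filters everything passes; otherwise an event
// passes if it matches at least one of the filters.
function matchesFilter(e: EventLike, filter: IncludeFilter): boolean {
  const names = filter.includeNames ?? [];
  const types = filter.includeTypes ?? [];
  const tags = filter.includeTags ?? [];
  if (names.length + types.length + tags.length === 0) return true;
  const byName = names.indexOf(e.name) !== -1;
  const byType = types.indexOf(componentType(e.event)) !== -1;
  const byTag = e.tags.some((tag) => tags.indexOf(tag) !== -1);
  return byName || byType || byTag;
}

// Made-up events shaped like the ones in this guide.
const filterSample: EventLike[] = [
  { event: "on_chain_start", name: "RunnableSequence", tags: ["my_chain"] },
  { event: "on_chat_model_start", name: "model", tags: ["seq:step:1", "my_chain"] },
  { event: "on_parser_start", name: "my_parser", tags: ["seq:step:2", "my_chain"] },
  { event: "on_chat_model_stream", name: "model", tags: ["seq:step:1", "my_chain"] },
];

const parserOnly = filterSample.filter((e) =>
  matchesFilter(e, { includeNames: ["my_parser"] })
);
console.log(parserOnly.map((e) => e.event));
```

Passing the filter options to `streamEvents` itself, as the following cells do, is preferable in practice because unwanted events are never delivered to your loop.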

#### By Name



In [18]:
const chain = model.withConfig({ runName: "model" })
  .pipe(
    new JsonOutputParser().withConfig({ runName: "my_parser" })
  );


const eventStream = await chain.streamEvents(
  `Output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key "name" and "population"`,
  { version: "v2" },
  { includeNames: ["my_parser"] },
);

let eventCount = 0;

for await (const event of eventStream) {
  // Truncate the output
  if (eventCount > 10) {
    continue;
  }
  console.log(event);
  eventCount += 1;
}

{
  event: 'on_parser_start',
  data: {
    input: 'Output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key "name" and "population"'
  },
  name: 'my_parser',
  tags: [ 'seq:step:2' ],
  run_id: '0a605976-a8f8-4259-8ef6-b3d7e52b3d4e',
  metadata: {}
}
{
  event: 'on_parser_stream',
  run_id: '0a605976-a8f8-4259-8ef6-b3d7e52b3d4e',
  name: 'my_parser',
  tags: [ 'seq:step:2' ],
  metadata: {},
  data: { chunk: { countries: [Array] } }
}
{
  event: 'on_parser_end',
  data: { output: { countries: [Array] } },
  run_id: '0a605976-a8f8-4259-8ef6-b3d7e52b3d4e',
  name: 'my_parser',
  tags: [ 'seq:step:2' ],
  metadata: {}
}


#### By Type

In [19]:
const chain = model.withConfig({ runName: "model" })
  .pipe(
    new JsonOutputParser().withConfig({ runName: "my_parser" })
  );


const eventStream = await chain.streamEvents(
  `Output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key "name" and "population"`,
  { version: "v2" },
  { includeTypes: ["chat_model"] },
);

let eventCount = 0;

for await (const event of eventStream) {
  // Truncate the output
  if (eventCount > 10) {
    continue;
  }
  console.log(event);
  eventCount += 1;
}

{
  event: 'on_chat_model_start',
  data: {
    input: 'Output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key "name" and "population"'
  },
  name: 'model',
  tags: [ 'seq:step:1' ],
  run_id: 'fb6351eb-9537-445d-a1bd-24c2e11efd8e',
  metadata: {
    ls_provider: 'openai',
    ls_model_name: 'gpt-4o',
    ls_model_type: 'chat',
    ls_temperature: 1,
    ls_max_tokens: undefined,
    ls_stop: undefined
  }
}
{
  event: 'on_chat_model_stream',
  data: {
    chunk: AIMessageChunk {
      lc_serializable: true,
      lc_kwargs: [Object],
      lc_namespace: [Array],
      content: '',
      name: undefined,
      additional_kwargs: {},
      response_metadata: [Object],
      id: 'chatcmpl-9lO98p55iuqUNwx4GZ6j2BkDak6Rr',
      tool_calls: [],
      invalid_tool_calls: [],
      tool_call_chunks: [],
      usage_metadata: undefined
    }
  

#### By Tags

:::{.callout-caution}

Tags are inherited by the child components of a given runnable.

If you're using tags to filter, make sure that this is what you want.
:::

In [20]:
const chain = model
  .pipe(new JsonOutputParser().withConfig({ runName: "my_parser" }))
  .withConfig({ tags: ["my_chain"] });


const eventStream = await chain.streamEvents(
  `Output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key "name" and "population"`,
  { version: "v2" },
  { includeTags: ["my_chain"] },
);

let eventCount = 0;

for await (const event of eventStream) {
  // Truncate the output
  if (eventCount > 10) {
    continue;
  }
  console.log(event);
  eventCount += 1;
}

{
  event: 'on_chain_start',
  data: {
    input: 'Output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key "name" and "population"'
  },
  name: 'RunnableSequence',
  tags: [ 'my_chain' ],
  run_id: '1fed60d6-e0b7-4d5e-8ec7-cd7d3ee5c69f',
  metadata: {}
}
{
  event: 'on_chat_model_start',
  data: { input: { messages: [Array] } },
  name: 'ChatOpenAI',
  tags: [ 'seq:step:1', 'my_chain' ],
  run_id: 'ecb99d6e-ce03-445f-aadf-73e6cbbc52fe',
  metadata: {
    ls_provider: 'openai',
    ls_model_name: 'gpt-4o',
    ls_model_type: 'chat',
    ls_temperature: 1,
    ls_max_tokens: undefined,
    ls_stop: undefined
  }
}
{
  event: 'on_parser_start',
  data: {},
  name: 'my_parser',
  tags: [ 'seq:step:2', 'my_chain' ],
  run_id: 'caf24a1e-255c-4937-9f38-6e46275d854a',
  metadata: {}
}
{
  event: 'on_chat_model_stream',
  data: {
    chunk: AIMes

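### Streaming events over HTTP

For convenience, `streamEvents` supports encoding the streamed intermediate events as HTTP [server-sent events](https://developer.mozilla.org/zh-CN/docs/Web/API/Server-sent_events), encoded as bytes. Here's what that looks like (using a [`TextDecoder`](https://developer.mozilla.org/zh-CN/docs/Web/API/TextDecoder) to convert the binary data back into a human-readable string):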

In [21]:
const chain = model
  .pipe(new JsonOutputParser().withConfig({ runName: "my_parser" }))
  .withConfig({ tags: ["my_chain"] });


const eventStream = await chain.streamEvents(
  `Output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key "name" and "population"`,
  {
    version: "v2",
    encoding: "text/event-stream",
  },
);

let eventCount = 0;

const textDecoder = new TextDecoder();

for await (const event of eventStream) {
  // Truncate the output
  if (eventCount > 3) {
    continue;
  }
  console.log(textDecoder.decode(event));
  eventCount += 1;
}

event: data
data: {"event":"on_chain_start","data":{"input":"Output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of \"countries\" which contains a list of countries. Each country should have the key \"name\" and \"population\""},"name":"RunnableSequence","tags":["my_chain"],"run_id":"41cd92f8-9b8c-4365-8aa0-fda3abdae03d","metadata":{}}


event: data
data: {"event":"on_chat_model_start","data":{"input":{"messages":[[{"lc":1,"type":"constructor","id":["langchain_core","messages","HumanMessage"],"kwargs":{"content":"Output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of \"countries\" which contains a list of countries. Each country should have the key \"name\" and \"population\"","additional_kwargs":{},"response_metadata":{}}}]]}},"name":"ChatOpenAI","tags":["seq:step:1","my_chain"],"run_id":"a6c2bc61-c868-4570-a143-164e64529ee0","metadata":{"ls_provi

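A nice feature of this format is that you can pass the resulting stream directly into a native [HTTP response object](https://developer.mozilla.org/zh-CN/docs/Web/API/Response) with the correct headers (commonly used by frameworks like [Hono](https://hono.dev/) and [Next.js](https://nextjs.org/)), then parse that stream on the frontend. Your server-side handler would look something like this: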

In [22]:
const handler = async () => {
  const eventStream = await chain.streamEvents(
    `Output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key "name" and "population"`,
    {
      version: "v2",
      encoding: "text/event-stream",
    },
  );
  return new Response(eventStream, {
    headers: {
      "content-type": "text/event-stream",
    }
  });
};

And your frontend could look like this (using the [`@microsoft/fetch-event-source`](https://www.npmjs.com/package/@microsoft/fetch-event-source) package to fetch and parse the event source):

In [23]:
import { fetchEventSource } from "@microsoft/fetch-event-source";

const makeChainRequest = async () => {
  await fetchEventSource("https://your_url_here", {
    method: "POST",
    body: JSON.stringify({
      foo: 'bar'
    }),
    onmessage: (message) => {
      if (message.event === "data") {
        console.log(message.data);
      }
    },
    onerror: (err) => {
      console.log(err);
    }
  });
};
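
If you want to see what the frontend library is doing for you, frames of the shape shown earlier (`event: data` followed by a `data:` line and a blank line) can be parsed by hand. The sketch below is a simplified plain-TypeScript parser over already-decoded text. `parseSseFrames` and `SseMessage` are hypothetical names, the sample payloads are shortened stand-ins, and a real client would also need to buffer frames that are split across network chunks.

```typescript
interface SseMessage {
  event: string;
  data: string;
}

// Split decoded text into frames (separated by a blank line) and read the
// "event:" and "data:" fields of each frame.
function parseSseFrames(text: string): SseMessage[] {
  const messages: SseMessage[] = [];
  for (const frame of text.split(/\r?\n\r?\n/)) {
    let event = "message"; // default event name when no "event:" field is present
    const dataLines: string[] = [];
    for (const line of frame.split(/\r?\n/)) {
      if (line.indexOf("event:") === 0) {
        event = line.slice(6).trim();
      } else if (line.indexOf("data:") === 0) {
        // Strip the field name and one optional leading space.
        dataLines.push(line.slice(5).replace(/^ /, ""));
      }
    }
    if (dataLines.length > 0) {
      messages.push({ event, data: dataLines.join("\n") });
    }
  }
  return messages;
}

// Shortened, made-up payloads in the same framing as the output above.
const sampleSseText =
  'event: data\ndata: {"event":"on_chain_start","name":"RunnableSequence"}\n\n' +
  'event: data\ndata: {"event":"on_parser_start","name":"my_parser"}\n\n';

for (const message of parseSseFrames(sampleSseText)) {
  if (message.event === "data") {
    console.log(JSON.parse(message.data).event);
  }
}
```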

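### Non-streaming components

Remember how some components don't stream well because they don't operate on **input streams**?

While such components can break streaming of the final output when using `stream`, `streamEvents` will still yield streaming events from the intermediate steps that support streaming!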

In [34]:
import { JsonOutputParser } from "@langchain/core/output_parsers";

// A function that operates on finalized inputs rather than on an input
// stream. Because it does not operate on input streams, it breaks streaming.
const extractCountryNames = (inputs: Record<string, any>) => {
  if (!Array.isArray(inputs.countries)) {
    return "";
  }
  return JSON.stringify(inputs.countries.map((country) => country.name));
}

const chain = model.pipe(new JsonOutputParser()).pipe(extractCountryNames);

const stream = await chain.stream(
  `output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key "name" and "population"`
);

for await (const chunk of stream) {
  console.log(chunk);
}

["France","Spain","Japan"]


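As expected, the `stream` API doesn't work correctly because `extractCountryNames` doesn't operate on streams.

Now, let's confirm that with `streamEvents` we still see streaming output from the model and the parser.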

In [None]:
const eventStream = await chain.streamEvents(
  `output a list of the countries france, spain and japan and their populations in JSON format.
Use a dict with an outer key of "countries" which contains a list of countries.
Each country should have the key "name" and "population"
Your output should ONLY contain valid JSON data. Do not include any other text or content in your output.`,
  { version: "v2" },
);

let eventCount = 0;

for await (const event of eventStream) {
  // Truncate the output
  if (eventCount > 30) {
    continue;
  }
  const eventType = event.event;
  if (eventType === "on_chat_model_stream") {
    // In v2, `event.data.chunk` is the AIMessageChunk itself.
    console.log(`Chat model chunk: ${event.data.chunk.content}`);
  } else if (eventType === "on_parser_stream") {
    console.log(`Parser chunk: ${JSON.stringify(event.data.chunk)}`);
  } else {
    console.log(eventType)
  }
  eventCount += 1;
}

## Related

- [Dispatching custom events](/docs/how_to/callbacks_custom_events)