# 自定义流式生成器函数

您可以在 LCEL 管道中使用生成器函数（即使用 yield 关键字且行为类似于迭代器的函数）。

这些生成器的签名应该是 Iterator[Input] -> Iterator[Output] 。或者对于异步生成器： AsyncIterator[Input] -> AsyncIterator[Output] 。

这些对于： - 实现自定义输出解析器 - 修改上一步的输出，同时保留流功能

让我们为逗号分隔列表实现一个自定义输出解析器。

In [1]:
from langchain_openai import ChatOpenAI
from dotenv import load_dotenv
import os

load_dotenv()

api_key = os.getenv("LOACL_API_KEY")
api_base = os.getenv("LOACL_API_BASE")

model = ChatOpenAI(api_key=api_key, base_url=api_base, temperature=0.3)

In [3]:
from typing import Iterator, List

from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser


prompt = ChatPromptTemplate.from_template(
    "响应以csv的格式返回中文列表，不要返回其他内容。请给出与{topic}类似的交通工具。"
)

str_chain = prompt | model | StrOutputParser()

str_chain.invoke({"topic": "飞机"})

'```csv\n交通工具\n火车\n汽车\n地铁\n公交车\n船\n电动滑板车\n自行车\n直升机\n高铁\n摩托车\n火箭\n热气球\n缆车\n飞机航天器\n滑翔机\n气垫船\n```'

In [4]:
for chunk in str_chain.stream({"topic": "飞机"}):
    print(chunk, end="", flush=True)

```
交通工具
高铁
火车
汽车
地铁
公交车
船
轮船
直升机
飞艇
电动滑板车
摩托车
自行车
滑雪板
滑板
热气球
马车
滑翔伞
火箭
```

In [7]:
# 这是一个自定义解析器，用于拆分llm令牌的迭代器
# 放入以逗号分隔的字符串列表中
def split_into_list(input: Iterator[str]) -> Iterator[List[str]]:
    # 保留部分输入，直到得到逗号
    buffer = ""
    for chunk in input:
        # 将当前块添加到缓冲区
        buffer += chunk
        # 当缓冲区有逗号时
        while "," in buffer:
            # 以逗号分隔缓冲区
            comma_index = buffer.index(",")
            # 产生逗号之间的所有内容
            yield [buffer[:comma_index].strip()]
            # 保存其余部分，以供下一次迭代使用
            buffer = buffer[comma_index + 1:]
    # 产生最后一个块
    yield [buffer.strip()]

In [8]:
list_chain = str_chain | split_into_list

for chunk in list_chain.stream({"topic": "飞机"}):
    print(chunk, end="", flush=True)

['高铁']['汽车']['公交车']['船只']['自行车']

## 异步版本

In [12]:
from typing import AsyncIterator


async def asplit_into_list(input: AsyncIterator[str]) -> AsyncIterator[List[str]]:
    buffer = ""
    async for chunk in input:  # input是一个async对象，所以使用async for
        buffer += chunk 
        while "," in buffer:
            comma_index = buffer.index(",")
            yield [buffer[:comma_index].strip()]
            buffer = buffer[comma_index + 1:]
    yield [buffer.strip()]


alist_chain = str_chain | asplit_into_list

In [15]:
async for chunk in alist_chain.astream({"topic": "飞机"}):
    print(chunk, end="", flush=True)

['```\n交通工具\n直升机\n火车\n船\n汽车\n地铁\n公共汽车\n摩托车\n高铁\n热气球\n滑翔机\n飞艇\n弹跳车\n单轨列车\n电动车\n自行车\n马车\n```']

In [14]:
await alist_chain.ainvoke({"topic": "飞机"})

['```csv\n交通工具\n直升机\n火车\n公交车\n地铁\n轮船\n汽车\n轻轨\n滑翔机\n热气球\n摩托车\n```']