先看一个 chain 的 stream 示例：

In [None]:
from langchain_openai import ChatOpenAI
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate

llm = ChatOpenAI(model="gpt-4o-mini")

prompt = ChatPromptTemplate.from_template("tell me a joke about {topic}")
parser = StrOutputParser()

chain = prompt | llm | parser

async for chunk in chain.astream({"topic": "parrot"}):
    print(chunk, end="|", flush=True)


content='' additional_kwargs={} response_metadata={} id='run-df95c383-24bf-4900-b54a-d8eb2eb90372'|content='Why' additional_kwargs={} response_metadata={} id='run-df95c383-24bf-4900-b54a-d8eb2eb90372'|content=' did' additional_kwargs={} response_metadata={} id='run-df95c383-24bf-4900-b54a-d8eb2eb90372'|content=' the' additional_kwargs={} response_metadata={} id='run-df95c383-24bf-4900-b54a-d8eb2eb90372'|content=' par' additional_kwargs={} response_metadata={} id='run-df95c383-24bf-4900-b54a-d8eb2eb90372'|content='rot' additional_kwargs={} response_metadata={} id='run-df95c383-24bf-4900-b54a-d8eb2eb90372'|content=' wear' additional_kwargs={} response_metadata={} id='run-df95c383-24bf-4900-b54a-d8eb2eb90372'|content=' a' additional_kwargs={} response_metadata={} id='run-df95c383-24bf-4900-b54a-d8eb2eb90372'|content=' rain' additional_kwargs={} response_metadata={} id='run-df95c383-24bf-4900-b54a-d8eb2eb90372'|content='coat' additional_kwargs={} response_metadata={} id='run-df95c383-24bf-

再来看一个流式输出 JSON 的例子。

LangChain 文档里提到：the parser needs to operate on the input stream, and attempt to "auto-complete" the partial JSON into a valid state.

In [3]:
from langchain_openai import ChatOpenAI
from langchain_core.output_parsers import JsonOutputParser

llm = ChatOpenAI(model="gpt-4o-mini")
parser = JsonOutputParser()

chain = llm | parser

async for text in chain.astream(
    "output a list of the countries france, spain and japan and their populations in JSON format. "
    'Use a dict with an outer key of "countries" which contains a list of countries. '
    "Each country should have the key `name` and `population`"
):
    print(text, flush=True)

{}
{'countries': []}
{'countries': [{}]}
{'countries': [{'name': ''}]}
{'countries': [{'name': 'France'}]}
{'countries': [{'name': 'France', 'population': 652}]}
{'countries': [{'name': 'France', 'population': 652735}]}
{'countries': [{'name': 'France', 'population': 65273511}]}
{'countries': [{'name': 'France', 'population': 65273511}, {}]}
{'countries': [{'name': 'France', 'population': 65273511}, {'name': ''}]}
{'countries': [{'name': 'France', 'population': 65273511}, {'name': 'Spain'}]}
{'countries': [{'name': 'France', 'population': 65273511}, {'name': 'Spain', 'population': 467}]}
{'countries': [{'name': 'France', 'population': 65273511}, {'name': 'Spain', 'population': 467547}]}
{'countries': [{'name': 'France', 'population': 65273511}, {'name': 'Spain', 'population': 46754778}]}
{'countries': [{'name': 'France', 'population': 65273511}, {'name': 'Spain', 'population': 46754778}, {}]}
{'countries': [{'name': 'France', 'population': 65273511}, {'name': 'Spain', 'population': 467

Any steps in the chain that operate on finalized inputs rather than on input streams can break streaming functionality via stream or astream.

也就是说中间的组件必须具备处理 input stream 的能力，否则就会打断 stream。

In [5]:
from langchain_openai import ChatOpenAI
from langchain_core.output_parsers import JsonOutputParser

# Chat model and JSON parser; both are piped into the chain built at the
# bottom of this cell.
llm = ChatOpenAI(model="gpt-4o-mini")
parser = JsonOutputParser()


# A function that operates on finalized inputs
# rather than on an input_stream
def _extract_country_names(inputs):
    """Pull the country names out of one fully parsed response.

    This function works on a single finalized value rather than on an input
    stream, which is why it breaks streaming when used as a chain step.

    Returns:
        A list holding the ``"name"`` value of every dict entry found under
        ``inputs["countries"]`` (``None`` for a dict entry without that key;
        non-dict entries are skipped). Returns ``""`` when ``inputs`` is not
        a dict, has no ``"countries"`` key, or that value is not a list.
    """
    has_countries = isinstance(inputs, dict) and "countries" in inputs
    entries = inputs["countries"] if has_countries else None

    # Covers both "key missing / wrong input type" and "value is not a list".
    if not isinstance(entries, list):
        return ""

    names = []
    for entry in entries:
        if isinstance(entry, dict):
            names.append(entry.get("name"))
    return names


chain = llm | parser | _extract_country_names

async for text in chain.astream(
    "output a list of the countries france, spain and japan and their populations in JSON format. "
    'Use a dict with an outer key of "countries" which contains a list of countries. '
    "Each country should have the key `name` and `population`"
):
    print(text, end="|", flush=True)

['France', 'Spain', 'Japan']|

所以文档随后给出了一个修正后的例子：

In [6]:
from langchain_openai import ChatOpenAI
from langchain_core.output_parsers import JsonOutputParser

# Chat model and JSON parser; both are piped into the chain built at the
# bottom of this cell.
llm = ChatOpenAI(model="gpt-4o-mini")
parser = JsonOutputParser()

async def _extract_country_names_streaming(input_stream):
    """Yield each new country name as soon as it appears in the stream.

    Unlike a function that waits for one finalized value, this is an async
    generator: it consumes ``input_stream`` item by item and emits results
    while the upstream steps are still producing output.

    Args:
        input_stream: An async iterator of parsed values. Items that are not
            dicts, lack a ``"countries"`` key, or whose ``"countries"`` value
            is not a list are ignored.

    Yields:
        Every distinct non-empty ``"name"`` value found in the dict entries of
        ``"countries"``, once each, in order of first appearance.
    """
    seen_names = set()

    async for partial in input_stream:
        if not isinstance(partial, dict):
            continue

        countries = partial.get("countries")
        if not isinstance(countries, list):
            continue

        for country in countries:
            # Skip entries that are not dicts instead of failing on `.get`;
            # this matches the guard in `_extract_country_names`.
            if not isinstance(country, dict):
                continue

            name = country.get("name")
            # Any non-empty value is emitted the first time it is seen.
            # NOTE(review): if a name ever arrives split across chunks, its
            # prefix would be emitted too -- confirm this is acceptable.
            if not name or name in seen_names:
                continue

            # `yield` is what makes this an async generator, so names flow
            # downstream one at a time.
            yield name
            seen_names.add(name)


chain = llm | parser | _extract_country_names_streaming

async for text in chain.astream(
    "output a list of the countries france, spain and japan and their populations in JSON format. "
    'Use a dict with an outer key of "countries" which contains a list of countries. '
    "Each country should have the key `name` and `population`",
):
    print(text, end="|", flush=True)

France|Spain|Japan|