# 如何运行自定义函数

:::info 先决条件

本指南假定您熟悉以下概念：
- [LangChain Expression Language (LCEL)](/docs/concepts/lcel)
- [链接 runnables](/docs/how_to/sequence/)

:::

您可以将任意函数用作 [Runnables](https://python.langchain.com/api_reference/core/runnables/langchain_core.runnables.base.Runnable.html#langchain_core.runnables.base.Runnable)。这对于格式化或当您需要 LangChain 组件未提供的功能时非常有用，而用作 Runnables 的自定义函数被称为 [`RunnableLambdas`](https://python.langchain.com/api_reference/core/runnables/langchain_core.runnables.base.RunnableLambda.html)。

请注意，传递给这些函数的所有输入都必须是单个参数。如果您有一个接受多个参数的函数，则应编写一个包装器，该包装器接受单个字典输入并将其解包为多个参数。

本指南将涵盖：

- 如何使用 `RunnableLambda` 构造函数和便捷的 `@chain` 装饰器显式地从自定义函数创建 runnable
- 在链中使用时将自定义函数强制转换为 runnables
- 如何在自定义函数中接受和使用运行元数据
- 如何通过让自定义函数返回生成器来实现流式处理

## 使用构造函数

下面，我们使用 `RunnableLambda` 构造函数显式地包装我们的自定义逻辑：

In [None]:
%pip install -qU langchain langchain_openai

import os
from getpass import getpass

if "OPENAI_API_KEY" not in os.environ:
    os.environ["OPENAI_API_KEY"] = getpass()

In [2]:
from operator import itemgetter

from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnableLambda
from langchain_openai import ChatOpenAI


def length_function(text):
    return len(text)


def _multiple_length_function(text1, text2):
    return len(text1) * len(text2)


def multiple_length_function(_dict):
    return _multiple_length_function(_dict["text1"], _dict["text2"])


model = ChatOpenAI()

prompt = ChatPromptTemplate.from_template("what is {a} + {b}")

chain = (
    {
        "a": itemgetter("foo") | RunnableLambda(length_function),
        "b": {"text1": itemgetter("foo"), "text2": itemgetter("bar")}
        | RunnableLambda(multiple_length_function),
    }
    | prompt
    | model
)

chain.invoke({"foo": "bar", "bar": "gah"})

AIMessage(content='3 + 9 equals 12.', response_metadata={'token_usage': {'completion_tokens': 8, 'prompt_tokens': 14, 'total_tokens': 22}, 'model_name': 'gpt-3.5-turbo', 'system_fingerprint': 'fp_c2295e73ad', 'finish_reason': 'stop', 'logprobs': None}, id='run-73728de3-e483-49e3-ad54-51bd9570e71a-0')

## `@chain` 装饰器的便捷用法

你也可以通过添加 `@chain` 装饰器将任意函数转换为可链式调用。这在功能上等同于以上所示的用 `RunnableLambda` 构造函数包装函数。下面是一个例子：

In [3]:
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import chain

prompt1 = ChatPromptTemplate.from_template("Tell me a joke about {topic}")
prompt2 = ChatPromptTemplate.from_template("What is the subject of this joke: {joke}")


@chain
def custom_chain(text):
    prompt_val1 = prompt1.invoke({"topic": text})
    output1 = ChatOpenAI().invoke(prompt_val1)
    parsed_output1 = StrOutputParser().invoke(output1)
    chain2 = prompt2 | ChatOpenAI() | StrOutputParser()
    return chain2.invoke({"joke": parsed_output1})


custom_chain.invoke("bears")

'The subject of the joke is the bear and his girlfriend.'

上面，我们使用 `@chain` 装饰器将 `custom_chain` 转换为一个可运行对象，并通过 `.invoke()` 方法调用它。

如果你正在使用 [LangSmith](https://docs.smith.langchain.com/) 进行追踪，你应该能在其中看到一个 `custom_chain` 追踪记录，其中嵌套着对 OpenAI 的调用。

## 链中的自动类型强制转换

在链中使用自定义函数并结合管道操作符 (`|`) 时，你可以省略 `RunnableLambda` 或 `@chain` 构造函数，并依赖类型强制转换。下面是一个简单示例，展示了一个函数如何接收模型的输出并返回其前五个字母：

In [4]:
prompt = ChatPromptTemplate.from_template("tell me a story about {topic}")

model = ChatOpenAI()

chain_with_coerced_function = prompt | model | (lambda x: x.content[:5])

chain_with_coerced_function.invoke({"topic": "bears"})

'Once '

请注意，我们无需将自定义函数 `(lambda x: x.content[:5])` 包装在 `RunnableLambda` 构造函数中，因为管道运算符左侧的 `model` 本身就是 `Runnable`。自定义函数会被**强制转换为**一个 runnable。有关更多信息，请参阅 [此部分](/docs/how_to/sequence/#coercion)。

## 传递运行元数据

Runnable lambdas 可以选择接受一个 [RunnableConfig](https://python.langchain.com/api_reference/core/runnables/langchain_core.runnables.config.RunnableConfig.html#langchain_core.runnables.config.RunnableConfig) 参数，它们可以使用该参数将回调、标签和其他配置信息传递给嵌套的运行。

In [5]:
import json

from langchain_core.runnables import RunnableConfig


def parse_or_fix(text: str, config: RunnableConfig):
    fixing_chain = (
        ChatPromptTemplate.from_template(
            "Fix the following text:\n\n```text\n{input}\n```\nError: {error}"
            " Don't narrate, just respond with the fixed data."
        )
        | model
        | StrOutputParser()
    )
    for _ in range(3):
        try:
            return json.loads(text)
        except Exception as e:
            text = fixing_chain.invoke({"input": text, "error": e}, config)
    return "Failed to parse"


from langchain_community.callbacks import get_openai_callback

with get_openai_callback() as cb:
    output = RunnableLambda(parse_or_fix).invoke(
        "{foo: bar}", {"tags": ["my-tag"], "callbacks": [cb]}
    )
    print(output)
    print(cb)

{'foo': 'bar'}
Tokens Used: 62
	Prompt Tokens: 56
	Completion Tokens: 6
Successful Requests: 1
Total Cost (USD): $9.6e-05


In [6]:
from langchain_community.callbacks import get_openai_callback

with get_openai_callback() as cb:
    output = RunnableLambda(parse_or_fix).invoke(
        "{foo: bar}", {"tags": ["my-tag"], "callbacks": [cb]}
    )
    print(output)
    print(cb)

{'foo': 'bar'}
Tokens Used: 62
	Prompt Tokens: 56
	Completion Tokens: 6
Successful Requests: 1
Total Cost (USD): $9.6e-05


## 流式处理

:::note
[RunnableLambda](https://python.langchain.com/api_reference/core/runnables/langchain_core.runnables.base.RunnableLambda.html) 最适合不需要支持流式处理的代码。如果您需要支持流式处理（即能够处理输入块并产生输出块），请改用 [RunnableGenerator](https://python.langchain.com/api_reference/core/runnables/langchain_core.runnables.base.RunnableGenerator.html)，如下例所示。
:::

您可以在链中使用生成器函数（即使用 `yield` 关键字的函数，其行为类似迭代器）。

这些生成器的签名应该是 `Iterator[Input] -> Iterator[Output]`。或者对于异步生成器：`AsyncIterator[Input] -> AsyncIterator[Output]`。

这些功能很有用：
- 实现自定义输出解析器
- 修改上一步的输出，同时保留流式处理能力

以下是一个逗号分隔列表的自定义输出解析器示例。首先，我们创建一个生成此类列表作为文本的链：

In [7]:
from typing import Iterator, List

prompt = ChatPromptTemplate.from_template(
    "Write a comma-separated list of 5 animals similar to: {animal}. Do not include numbers"
)

str_chain = prompt | model | StrOutputParser()

for chunk in str_chain.stream({"animal": "bear"}):
    print(chunk, end="", flush=True)

lion, tiger, wolf, gorilla, panda

接下来，我们定义一个自定义函数，它将聚合当前流式输出，并在模型生成列表中的下一个逗号时将其产生：

In [8]:
# This is a custom parser that splits an iterator of llm tokens
# into a list of strings separated by commas
def split_into_list(input: Iterator[str]) -> Iterator[List[str]]:
    # hold partial input until we get a comma
    buffer = ""
    for chunk in input:
        # add current chunk to buffer
        buffer += chunk
        # while there are commas in the buffer
        while "," in buffer:
            # split buffer on comma
            comma_index = buffer.index(",")
            # yield everything before the comma
            yield [buffer[:comma_index].strip()]
            # save the rest for the next iteration
            buffer = buffer[comma_index + 1 :]
    # yield the last chunk
    yield [buffer.strip()]


list_chain = str_chain | split_into_list

for chunk in list_chain.stream({"animal": "bear"}):
    print(chunk, flush=True)

['lion']
['tiger']
['wolf']
['gorilla']
['raccoon']


调用它会得到一个完整的数值数组：

In [9]:
list_chain.invoke({"animal": "bear"})

['lion', 'tiger', 'wolf', 'gorilla', 'raccoon']

## 异步版本

如果你在 `async` 环境中工作，这里有一个上面示例的 `async` 版本：

In [10]:
from typing import AsyncIterator


async def asplit_into_list(
    input: AsyncIterator[str],
) -> AsyncIterator[List[str]]:  # async def
    buffer = ""
    async for (
        chunk
    ) in input:  # `input` is a `async_generator` object, so use `async for`
        buffer += chunk
        while "," in buffer:
            comma_index = buffer.index(",")
            yield [buffer[:comma_index].strip()]
            buffer = buffer[comma_index + 1 :]
    yield [buffer.strip()]


list_chain = str_chain | asplit_into_list

async for chunk in list_chain.astream({"animal": "bear"}):
    print(chunk, flush=True)

['lion']
['tiger']
['wolf']
['gorilla']
['panda']


In [11]:
await list_chain.ainvoke({"animal": "bear"})

['lion', 'tiger', 'wolf', 'gorilla', 'panda']

## 后续步骤

您已经了解了在链中使用自定义逻辑和实现流送的几种不同方法。

欲了解更多信息，请参阅本节中有关可运行对象的其他指南。