## Runnable Interface 介绍与使用

为了尽可能简化创建自定义链的过程，Langchain 实现了一个 **[Runnable](https://api.python.langchain.com/en/stable/runnables/langchain_core.runnables.base.Runnable.html#langchain_core.runnables.base.Runnable)** 协议。

许多 LangChain 组件都实现了 Runnable 协议，包括 chat models, LLMs, output parsers, retrievers, prompt templates等等。此外，还有几个用于处理可运行对象的[有用原语](https://python.langchain.com/v0.1/docs/expression_language/primitives/)。

Runnable 是一个标准接口，包括：

- stream：流式返回生成内容（chunk）
- invoke：对输入调用该链
- batch：对输入列表调用该链

不同组件的输入和输出类型有所差异:

| 组件    | 输入类型                                           | 输出类型           |
| ------------ | ----------------------------------------------------- | --------------------- |
| Prompt       | Dictionary                                            | PromptValue           |
| ChatModel    | Single string, list of chat messages or a PromptValue | ChatMessage           |
| LLM          | Single string, list of chat messages or a PromptValue | String                |
| OutputParser | The output of an LLM or ChatModel                     | Depends on the parser |
| Retriever    | Single string                                         | List of Documents     |
| Tool         | Single string or dictionary, depending on the tool    | Depends on the tool   |


所有 Runnable 对象都显式描述输入和输出 Schema，以检查输入和输出格式：

- input_schema：从Runnable的结构自动生成的输入Pydantic模型
- output_schema：从Runnable的结构自动生成的输出Pydantic模型 


### Input Schema

为了演示如何使用，下面我们创建一个超级简单的PromptTemplate + ChatModel链。

In [2]:
from langchain_openai import ChatOpenAI, AzureChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
import os

openai_enable = True
if os.getenv('AZURE_OPENAI_ENDPOINT'):
    print('Azure mode')
    openai_enable = False
else:
    print('OpenAI mode')    


if openai_enable:
    model = ChatOpenAI(model="gpt-4o-mini")
else:
    model = AzureChatOpenAI(
        azure_deployment="checkGPT35_16K",
        api_version="2024-05-01-preview",
        temperature=0,
        max_tokens=None,
        timeout=None,
        max_retries=2
    )
prompt = ChatPromptTemplate.from_template("讲个关于 {topic} 的笑话吧")
chain = prompt | model

Azure mode


#### schema 方法

一个描述 Runnable 接受的输入的说明。这是根据任何 Runnable 结构动态生成的 Pydantic 模型。您可以调用 .schema() 来获取 `JSONSchema` 表示。

In [3]:
# 查看 Chain 的输入类型
chain.input_schema.schema()

{'title': 'PromptInput',
 'type': 'object',
 'properties': {'topic': {'title': 'Topic', 'type': 'string'}}}

In [4]:
# 查看 Prompt 的输入类型（Chain的输入从 Prompt 开始，因此输入类型一致）
prompt.input_schema.schema()

{'title': 'PromptInput',
 'type': 'object',
 'properties': {'topic': {'title': 'Topic', 'type': 'string'}}}

In [5]:
# 查看 Model 的输入类型
model.input_schema.schema()

{'title': 'AzureChatOpenAIInput',
 'anyOf': [{'type': 'string'},
  {'$ref': '#/definitions/StringPromptValue'},
  {'$ref': '#/definitions/ChatPromptValueConcrete'},
  {'type': 'array',
   'items': {'anyOf': [{'$ref': '#/definitions/AIMessage'},
     {'$ref': '#/definitions/HumanMessage'},
     {'$ref': '#/definitions/ChatMessage'},
     {'$ref': '#/definitions/SystemMessage'},
     {'$ref': '#/definitions/FunctionMessage'},
     {'$ref': '#/definitions/ToolMessage'}]}}],
 'definitions': {'StringPromptValue': {'title': 'StringPromptValue',
   'description': 'String prompt value.',
   'type': 'object',
   'properties': {'text': {'title': 'Text', 'type': 'string'},
    'type': {'title': 'Type',
     'default': 'StringPromptValue',
     'enum': ['StringPromptValue'],
     'type': 'string'}},
   'required': ['text']},
  'ToolCall': {'title': 'ToolCall',
   'type': 'object',
   'properties': {'name': {'title': 'Name', 'type': 'string'},
    'args': {'title': 'Args', 'type': 'object'},
    'i

### Output Schema

输出类型仍然可以调用 .schema() 来获取其 `JSONSchema` 表示。

In [6]:
# 查看 Chain 的输出类型
chain.output_schema.schema()

{'title': 'AzureChatOpenAIOutput',
 'anyOf': [{'$ref': '#/definitions/AIMessage'},
  {'$ref': '#/definitions/HumanMessage'},
  {'$ref': '#/definitions/ChatMessage'},
  {'$ref': '#/definitions/SystemMessage'},
  {'$ref': '#/definitions/FunctionMessage'},
  {'$ref': '#/definitions/ToolMessage'}],
 'definitions': {'ToolCall': {'title': 'ToolCall',
   'type': 'object',
   'properties': {'name': {'title': 'Name', 'type': 'string'},
    'args': {'title': 'Args', 'type': 'object'},
    'id': {'title': 'Id', 'type': 'string'}},
   'required': ['name', 'args', 'id']},
  'InvalidToolCall': {'title': 'InvalidToolCall',
   'type': 'object',
   'properties': {'name': {'title': 'Name', 'type': 'string'},
    'args': {'title': 'Args', 'type': 'string'},
    'id': {'title': 'Id', 'type': 'string'},
    'error': {'title': 'Error', 'type': 'string'}},
   'required': ['name', 'args', 'id', 'error']},
  'AIMessage': {'title': 'AIMessage',
   'description': 'Message from an AI.',
   'type': 'object',
   'p

### Stream

使用 .stream() 方法查看（同步）流式输出结果

In [7]:
for s in chain.stream({"topic": "程序员"}):
    print(s.content, end="", flush=True)

当程序员去超市买牛奶时，他们会选择最新鲜的牛奶。为了确保牛奶的新鲜度，他们会检查牛奶上的生产日期。

有一天，一个程序员拿起一瓶牛奶，看到上面写着：“生产日期：2022年01月01日”。

他立刻放下牛奶，走到超市工作人员面前，抱怨道：“这瓶牛奶已经过期了！”

工作人员看了看牛奶，然后笑着说：“先生，今天是2021年12月31日。”

程序员尴尬地笑了笑，然后说：“哦，对不起，我忘记了我是在计算机世界里。”

### Invoke
使用 .invoke() 方法单次（同步）调用

In [8]:
chain.invoke({"topic": "程序员"})

AIMessage(content='当程序员去超市买牛奶时，他们会选择最新鲜的牛奶。为了确保牛奶的新鲜度，他们会检查牛奶上的生产日期。\n\n有一天，一个程序员拿起一瓶牛奶，看到上面写着：“生产日期：2022年01月01日”。\n\n他立刻放下牛奶，走到超市工作人员面前，抱怨道：“这瓶牛奶已经过期了！”\n\n工作人员看了看牛奶，然后笑着说：“先生，今天是2021年12月31日。”\n\n程序员尴尬地笑了笑，然后说：“哦，对不起，我忘记了我是在计算机世界里。”', response_metadata={'token_usage': {'completion_tokens': 213, 'prompt_tokens': 22, 'total_tokens': 235}, 'model_name': 'gpt-35-turbo-16k', 'system_fingerprint': None, 'prompt_filter_results': [{'prompt_index': 0, 'content_filter_results': {}}], 'finish_reason': 'stop', 'logprobs': None, 'content_filter_results': {'hate': {'filtered': False, 'severity': 'safe'}, 'self_harm': {'filtered': False, 'severity': 'safe'}, 'sexual': {'filtered': False, 'severity': 'safe'}, 'violence': {'filtered': False, 'severity': 'safe'}}}, id='run-9154e665-740e-4015-a5f8-5af5b06e5dbe-0')

### Batch
使用 .batch() 方法（同步）批量调用

In [9]:
chain.batch([{"topic": "程序员"}, {"topic": "产品经理"}, {"topic": "测试经理"}])

[AIMessage(content='当程序员去超市买牛奶时，他们会选择最新鲜的牛奶。为了确保牛奶的新鲜度，他们会检查牛奶上的生产日期。\n\n有一天，一个程序员拿起一瓶牛奶，看到上面写着：“生产日期：2022年01月01日”。\n\n他立刻放下牛奶，走到超市工作人员面前，抱怨道：“这瓶牛奶已经过期了！”\n\n工作人员看了看牛奶，然后笑着对程序员说：“先生，今天是2021年12月31日。”', response_metadata={'token_usage': {'completion_tokens': 178, 'prompt_tokens': 22, 'total_tokens': 200}, 'model_name': 'gpt-35-turbo-16k', 'system_fingerprint': None, 'prompt_filter_results': [{'prompt_index': 0, 'content_filter_results': {}}], 'finish_reason': 'stop', 'logprobs': None, 'content_filter_results': {'hate': {'filtered': False, 'severity': 'safe'}, 'self_harm': {'filtered': False, 'severity': 'safe'}, 'sexual': {'filtered': False, 'severity': 'safe'}, 'violence': {'filtered': False, 'severity': 'safe'}}}, id='run-30b6d47b-d92f-4f14-9ae1-b6dde7dd54ca-0'),
 AIMessage(content='好的，这是一个关于产品经理的笑话：\n\n有一天，一个产品经理走进一家餐厅，点了一份牛排。他等了很久，终于上来了一盘烤面包。\n\n产品经理很生气地问：“这是什么？我点的是牛排啊！”\n\n服务员尴尬地回答：“对不起先生，我们的厨师是个产品经理，他觉得烤面包比牛排更好。”\n\n产品经理愤怒地说：“那你们为什么还要雇佣一个产品经理做厨师？”\n\n服务员耸耸肩说：“我们的老板是个产品经理，他觉得这样能提高效率

In [12]:
messages = chain.batch([{"topic": "程序员"}, {"topic": "产品经理"}, {"topic": "测试经理"}])

In [13]:
# 使用 StrOutputParser 来处理 Batch 批量输出
from langchain_core.output_parsers import StrOutputParser

output_parser = StrOutputParser()

for idx, m in enumerate(messages):
    print(f"笑话{idx}:\n")
    print(output_parser.invoke(m))
    print("\n")

笑话0:

当程序员去超市买牛奶时，他们会选择最新鲜的牛奶。为了确保牛奶的新鲜度，他们会检查牛奶上的生产日期。

有一天，一个程序员拿起一瓶牛奶，看到上面写着：“生产日期：2022年01月01日”。

他立刻放下牛奶，走到超市工作人员面前，抱怨道：“这瓶牛奶已经过期了！”

工作人员看了看牛奶，然后笑着说：“先生，今天是2021年12月31日。”

程序员尴尬地笑了笑，然后说：“哦，对不起，我忘记了我是在计算机世界里。”


笑话1:

好的，这是一个关于产品经理的笑话：

有一天，一个产品经理走进一家餐厅，点了一份牛排。他等了很久，终于上来了一盘烤面包。

产品经理很生气地问：“这是什么？我点的是牛排啊！”

服务员尴尬地回答：“对不起先生，我们的厨师是个产品经理，他觉得牛排不够用户友好，所以改成了烤面包。”

产品经理无奈地叹了口气：“这就是为什么产品经理不能做厨师的原因。”


笑话2:

好的，这是一个关于测试经理的笑话：

有一天，一位测试经理走进了一家餐厅。他点了一份牛排，并告诉服务员：“请确保这份牛排完全煮熟，我是一个测试经理，对细节非常敏感。”

服务员点了点头，然后把订单交给了厨师。不久后，厨师将一份完美煮熟的牛排端到了测试经理的桌子上。

测试经理拿起刀叉，准备享用美味的牛排。然而，他突然停下来，仔细地观察了一会儿。

服务员看到了这一幕，好奇地问道：“怎么了？牛排有问题吗？”

测试经理微笑着回答：“不，牛排煮得非常完美。我只是在测试一下刀叉的质量。”




## 异步操作

这些方法也有相应的异步方法，应与 `asyncio` 的 `await` 语法一起使用以进行并发操作：

- astream：异步地流式返回生成内容（chunk）
- ainvoke：异步地对输入调用该链
- abatch：异步地对输入列表调用该链
- astream_log: 在发生时会返回中间步骤，并且最终返回结果之外。
- astream_events: beta 流式传输事件，在 langchain-core 0.1.14 中引入


### Async Stream

In [11]:
async for s in chain.astream({"topic": "程序员"}):
    print(s.content, end="", flush=True)

当然可以！这是一个关于程序员的笑话：

为什么程序员总是喜欢在海边工作？

因为那里有很多“海量数据”！  

希望你喜欢这个笑话！如果还想听更多，随时告诉我！

### Async Invoke

In [12]:
await chain.ainvoke({"topic": "程序员"})

AIMessage(content='当然可以！这是一个关于程序员的笑话：\n\n为什么程序员总是喜欢在海边工作？\n\n因为他们总是想在“代码”中找到“海洋”！ \n\n（“海洋”在这里代表“大量的数据”）希望你喜欢这个笑话！', response_metadata={'token_usage': {'completion_tokens': 60, 'prompt_tokens': 18, 'total_tokens': 78}, 'model_name': 'gpt-4o-mini', 'system_fingerprint': 'fp_f33667828e', 'finish_reason': 'stop', 'logprobs': None}, id='run-b84ea289-a442-48d6-9319-969535f23133-0')

### Async Batch

In [13]:
await chain.abatch([{"topic": "程序员"}, {"topic": "产品经理"}, {"topic": "测试经理"}])

[AIMessage(content='当然可以！这是一个关于程序员的笑话：\n\n有一天，程序员去买水果。他走到水果摊前，看见老板在称西瓜。\n\n程序员问：“这个西瓜多少钱？”\n\n老板回答：“每个10块。”\n\n程序员思考了一下，问：“那如果我买两个，能不能给我打个折？”\n\n老板笑了笑：“当然可以，两个20块！”\n\n程序员愣了一下，接着说：“我明白了，您是只支持‘零和’交易的！”\n\n老板一脸懵：“什么是‘零和’交易？”\n\n程序员微微一笑：“就是我买两个，您就没得赚了！”\n\n希望这个笑话能让你开心！', response_metadata={'token_usage': {'completion_tokens': 156, 'prompt_tokens': 18, 'total_tokens': 174}, 'model_name': 'gpt-4o-mini', 'system_fingerprint': 'fp_f33667828e', 'finish_reason': 'stop', 'logprobs': None}, id='run-b05d03a7-de24-40cb-b127-bcc7557a9bcc-0'),
 AIMessage(content='当然可以！这是一个关于产品经理的笑话：\n\n有一天，一个产品经理走进一家咖啡店，点了一杯咖啡。店员问他：“要加糖吗？” \n\n产品经理回答：“请把这个选项放到用户调查中再决定。” \n\n店员无奈地说：“好吧，那我给你一杯无糖的咖啡。” \n\n产品经理愣了一下：“等等，这样用户就没有选择了！” \n\n店员笑着说：“没关系，反正他们也喝不到！” \n\n希望这个笑话能让你开心！', response_metadata={'token_usage': {'completion_tokens': 127, 'prompt_tokens': 16, 'total_tokens': 143}, 'model_name': 'gpt-4o-mini', 'system_fingerprint': 'fp_f33667828e', 'finish_reason': 'stop', 'logprobs': None}, id='run-d10cb1d9-8a7f-4e91-92b6-1