In [2]:
from langchain_openai import ChatOpenAI
from langchain_core.prompts import PromptTemplate
from langchain_core.output_parsers import StrOutputParser

In [3]:
import os

llm = ChatOpenAI(
    model = "DMXAPI-DeepSeek-V3-Fast",
    api_key = os.getenv("DMX_API_KEY"),
    base_url = os.getenv("DMX_BASE_URL")
)

In [4]:
prompt = PromptTemplate.from_template("10乘以{num}等于多少？")

In [5]:
chain = prompt | llm

In [6]:
chain.invoke({"num" : 10})


AIMessage(content='10乘以10等于100。  \n\n计算过程如下：  \n\\[ 10 \\times 10 = 100 \\]  \n\n这是一个基本的乘法运算，表示两个10相加的总和（即 \\(10 + 10 + 10 + \\ldots\\) 共10次）。', additional_kwargs={'refusal': None}, response_metadata={'token_usage': {'completion_tokens': 56, 'prompt_tokens': 9, 'total_tokens': 65, 'completion_tokens_details': None, 'prompt_tokens_details': None}, 'model_name': 'DeepSeek-V3-Fast', 'system_fingerprint': None, 'id': 'chatcmpl-2c582c422da64a808e736a0150531a68', 'service_tier': None, 'finish_reason': 'stop', 'logprobs': None}, id='run--173e98da-09be-41b0-a0b1-f4cdd5123383-0', usage_metadata={'input_tokens': 9, 'output_tokens': 56, 'total_tokens': 65, 'input_token_details': {}, 'output_token_details': {}})

In [None]:
chain.invoke(8) # 如果模板中只有一个变量，也可以直接传递该变量的值

AIMessage(content='要计算 \\( 10 \\times 8 \\)，可以按照以下步骤进行：\n\n1. **理解乘法**：乘法是重复相加的简便方法。\\( 10 \\times 8 \\) 表示将 10 加 8 次，或者将 8 加 10 次。\n\n2. **计算**：\n   \\[\n   10 \\times 8 = 80\n   \\]\n\n3. **验证**：\n   - 将 10 加 8 次：\\( 10 + 10 + 10 + 10 + 10 + 10 + 10 + 10 = 80 \\)\n   - 或者将 8 加 10 次：\\( 8 + 8 + 8 + 8 + 8 + 8 + 8 + 8 + 8 + 8 = 80 \\)\n\n因此，最终答案为：\n\n\\[\n\\boxed{80}\n\\]', additional_kwargs={'refusal': None}, response_metadata={'token_usage': {'completion_tokens': 198, 'prompt_tokens': 9, 'total_tokens': 207, 'completion_tokens_details': None, 'prompt_tokens_details': None}, 'model_name': 'DeepSeek-V3-Fast', 'system_fingerprint': None, 'id': 'chatcmpl-e36f0b8c99ea4922baee5d47952eddf1', 'service_tier': None, 'finish_reason': 'stop', 'logprobs': None}, id='run--af98cb30-4c7b-4f16-9979-bccf993bd8b1-0', usage_metadata={'input_tokens': 9, 'output_tokens': 198, 'total_tokens': 207, 'input_token_details': {}, 'output_token_details': {}})

RunnablePassthrough 是一个 runnable 类型的对象，具有以下特征：

1. 基本操作

- 执行一个简单的传递函数，将输入值直接传递给输出

- 可以独立使用 invoke() 方法执行

2. 用例

- "在需要通过链式步骤传递数据且无需修改时非常方便"

- 可与其他组件结合，构建复杂的数据管道

- 特别有助于在添加新字段时保持原始输入的完整性

3. 输入管理

- 接受字典类型输入

- 也可以处理单个值
 
- 在整个链条中保持数据结构的完整性

In [8]:
from langchain_core.runnables import RunnablePassthrough

RunnablePassthrough().invoke({"num" : 100})

{'num': 100}

In [None]:
# 使用 RunnablePassthrough 创建链
runnable_chain = {"num" :RunnablePassthrough()} | prompt | llm

In [10]:
runnable_chain.invoke(5)

AIMessage(content='10乘以5的计算过程如下：\n\n\\[ 10 \\times 5 = 50 \\]\n\n因此，**10乘以5等于50**。', additional_kwargs={'refusal': None}, response_metadata={'token_usage': {'completion_tokens': 30, 'prompt_tokens': 9, 'total_tokens': 39, 'completion_tokens_details': None, 'prompt_tokens_details': None}, 'model_name': 'DeepSeek-V3-Fast', 'system_fingerprint': None, 'id': 'chatcmpl-5a4ff8ca42564031b6530ee9f14bc232', 'service_tier': None, 'finish_reason': 'stop', 'logprobs': None}, id='run--1e619270-047f-4a54-8021-383079f9c563-0', usage_metadata={'input_tokens': 9, 'output_tokens': 30, 'total_tokens': 39, 'input_token_details': {}, 'output_token_details': {}})

RunnablePassthrough.assign()

将输入中的键/值对与分配的新键/值对进行合并。

1. 输入数据: {"num": 1}

2. lambda函数处理:

函数接收字典 x = {"num": 1}

计算 x["num"] * 3 = 1 * 3 = 3

3. 字段添加:

使用 assign() 方法将新字段 "new_num": 3 添加到原始数据中

4. 最终结果:

{'num': 1, 'new_num': 3}

In [12]:
RunnablePassthrough.assign(new_num = lambda x : x["num"] * 3).invoke({"num" : 1})

{'num': 1, 'new_num': 3}


使用RunnableParallel高效并行执行

" RunnableParallel 是一个用于同时执行多个 Runnable 对象的工具，旨在简化需要并行处理的流程。它将输入数据分配到不同组件中，收集各组件的结果，并将这些结果整合为统一输出。这一功能使其成为优化那些任务可独立且并行执行的工作流程的强大工具。"

1. 并行处理

同时执行多个 Runnable 对象，减少并行化任务所需的时间。

2. 统一输出管理系统

将所有并行执行的结果整合成一个统一的输出，从而简化后续处理流程。

3. 灵活

可处理多种输入类型，高效分配工作负载，从而支持复杂工作流程。

In [17]:
from langchain_core.runnables import RunnableParallel

runnableparallel = RunnableParallel(
    default = RunnablePassthrough(),
    extend = RunnablePassthrough().assign(mult = lambda x : x["num"] * 3),
    modified = lambda x : x["num"] + 1
)

runnableparallel.invoke({"num" : 1})

{'default': {'num': 1}, 'extend': {'num': 1, 'mult': 3}, 'modified': 2}

链条同样适用于RunnableParallel

In [19]:
chain_1 = {"country" : RunnablePassthrough()} | PromptTemplate.from_template("{country}首都是哪里？") | llm

chain_2 = {"country" : RunnablePassthrough()} | PromptTemplate.from_template("{country}面积多大？") | llm

parallel_combined = RunnableParallel(capital = chain_1,area = chain_2)

parallel_combined.invoke({"country" : "中国"})

{'capital': AIMessage(content='中国的首都是**北京**。', additional_kwargs={'refusal': None}, response_metadata={'token_usage': {'completion_tokens': 8, 'prompt_tokens': 13, 'total_tokens': 21, 'completion_tokens_details': None, 'prompt_tokens_details': None}, 'model_name': 'DeepSeek-V3-Fast', 'system_fingerprint': None, 'id': 'chatcmpl-22250ed0fc034b4fb325dbb252d91b36', 'service_tier': None, 'finish_reason': 'stop', 'logprobs': None}, id='run--c6f7aa75-690c-4efa-809a-6b8b3bc182f9-0', usage_metadata={'input_tokens': 13, 'output_tokens': 8, 'total_tokens': 21, 'input_token_details': {}, 'output_token_details': {}}),
 'area': AIMessage(content='中国的国土面积约为 **960万平方公里**，是世界第三大国家（仅次于俄罗斯和加拿大）。这一数据包括陆地面积和内陆水域（如湖泊、河流），但不包括领海和专属经济区。  \n\n若包含领海和争议区域（如台湾地区、南海诸岛等），中国的总面积可达到约 **1,040万平方公里**（不同统计口径可能略有差异）。  \n\n需要说明的是，台湾地区是中国领土不可分割的一部分，中国政府对其拥有主权。', additional_kwargs={'refusal': None}, response_metadata={'token_usage': {'completion_tokens': 100, 'prompt_tokens': 12, 'total_tokens': 112, 'completion_tokens_deta

动态处理 RunnableLambda 函数

" RunnableLambda 是一个灵活的工具，能让开发人员利用 lambda 函数定义自定义数据转换逻辑。它通过实现快速简便的自定义处理工作流程，简化了定制数据管道的创建，同时确保了最小的设置开销。"

1. 可定制数据转换功能

允许用户通过 lambda 函数自定义转换输入数据的逻辑，具有极高的灵活性。

2. 轻巧且简便

提供了一种简便的方法来实现临时处理，无需定义大量的类或函数。

In [None]:
from datetime import datetime
# 必须定义一个参数，即使这个参数用不到，这是langchain框架的要求
def get_today(a):
    return datetime.today().strftime("%b-%d")



In [28]:
from langchain_core.runnables import RunnableLambda
from langchain_core.output_parsers import StrOutputParser

prompt_lambda = PromptTemplate.from_template("给我{n}个在{today}出生的历史名人，要写出出生日期。")

chain_lambda = ({"today" : RunnableLambda(get_today),"n" : RunnablePassthrough()}) | prompt_lambda | llm | StrOutputParser()

chain_lambda.invoke({"n" : 3})

"以下是3位于8月2日出生的历史名人及其出生日期：  \n\n1. **皮埃尔·德·顾拜旦（Pierre de Coubertin）**（1863年8月2日）  \n   - 法国教育家、历史学家，现代奥林匹克运动会的创始人，被誉为“奥林匹克之父”。  \n\n2. **詹姆斯·鲍德温（James Baldwin）**（1924年8月2日）  \n   - 美国著名作家、社会活动家，代表作《向苍天呼吁》（*Go Tell It on the Mountain*），关注种族、性别和社会正义问题。  \n\n3. **彼得·奥图尔（Peter O'Toole）**（1932年8月2日）  \n   - 爱尔兰裔英国演员，因在《阿拉伯的劳伦斯》（*Lawrence of Arabia*）中的出色表演而闻名，多次获得奥斯卡提名。  \n\n如果需要更多名人或补充信息，请告诉我！"