# 链的高级使用

In [42]:
import os
from dotenv import load_dotenv

# 加载 .env 文件中的环境变量
load_dotenv(override=True)  # 使用 override=True 确保加载最新的 .env 数据

True

In [3]:
from langchain_openai import ChatOpenAI

llm = ChatOpenAI(
    model=os.environ.get("OPENAPI_MODEL"),
    base_url=os.environ.get("OPENAPI_API_BASE"),
    api_key=os.environ.get("OPENAPI_API_KEY"),
    temperature=0,
)

## 使用@chain修饰符快速将函数变为链

In [None]:
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import chain

# 创建第一个提示词模板：请求关于特定主题的笑话
prompt1 = ChatPromptTemplate.from_template("Tell me a joke about {topic}.")
# 创建第二个提示词模板：询问笑话的主题时什么
prompt2 = ChatPromptTemplate.from_template("What is the topic of this joke: {joke}?")


# 使用@chain装饰器定义一个自定义链
@chain
def custom_chain(text):
    # 步骤1：将输入文本填充到第一个提示词模板中
    prompt_val1 = prompt1.invoke({"topic": text})
    # 步骤2：将第一个提示词模板的输出传递给语言模型
    output1 = llm.invoke(prompt_val1)
    # 步骤3：将语言模型的输出进行解析为字符串
    parsed_output1 = StrOutputParser().invoke(output1)

    # 步骤4：创建第二个处理链，用于分析笑话主题
    # 这个链将提示词模板、模型和字符串解析器串联起来
    chain2 = prompt2 | llm | StrOutputParser()

    # 步骤5：将第一个链的输出传递给第二个链
    output2 = chain2.invoke({"joke": parsed_output1})
    return output2


# 调用自定义链，输入主题"bears"(熊)
# 整个过程：
# 1. 先生成一个关于熊的笑话
# 2. 然后分析这个笑话的主题
# 3. 返回分析结果
custom_chain.invoke("bears")


'The topic of this joke is a **pun** that plays on the double meaning of the words "bare" and "bear."  \n\n- **Literal meaning**: Bears don\'t wear shoes, so their feet are uncovered (*bare feet*).  \n- **Pun twist**: Since they\'re bears, their feet are also *bear feet*.  \n\nThe humor comes from the homophones (words that sound the same but have different meanings) "bare" and "bear." The joke is lighthearted and centers on wordplay related to bears. 🐻😄'

## 在链中使用lambda函数

In [4]:
from operator import itemgetter
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnableLambda

# 定义函数


def length_function(text):
    return len(text)


def _multiple_length_function(text1, text2):
    return len(text1) * len(text2)


def multiple_length_function(_dict):
    return _multiple_length_function(_dict["text1"], _dict["text2"])


# 创建一个简单的聊天提示模板，询问a和b的和
prompt = ChatPromptTemplate.from_template("What is the sum of {a} and {b}?")

# 构建一个复杂的处理链
chain = (
    {
        # 处理"a"参数：
        # 1. 从输入字典中提取“foo"键的值
        # 2. 将提取的值传递给length_function函数，计算其长度
        "a": itemgetter("foo") | RunnableLambda(length_function),
        # 处理"b"参数：
        # 1. 创建一个包含两个键值对的字典：
        #   - "text1": 从输入字典中提取“foo"键的值
        #   - "text2": 从输入字典中提取“bar"键的值
        # 2. 将这个新字典传递给multiple_length_function函数
        "b": {"text1": itemgetter("foo"), "text2": itemgetter("bar")}
        | RunnableLambda(multiple_length_function),
    }
    | prompt  # 将处理后的"a"和"b"传递给提示模板
    | llm  # 将提示模板的输出传递给语言模型
)

# 调用链处理流程，输入一个包含"foo"和"bar"键的字典
chain.invoke({"foo": "hello", "bar": "world!"})


AIMessage(content='To find the sum of **5** and **30**, follow these simple steps:\n\n1. **Identify the numbers to be added:**\n   \n   \\[\n   5 \\quad \\text{and} \\quad 30\n   \\]\n\n2. **Add the numbers together:**\n   \n   \\[\n   5 + 30\n   \\]\n\n3. **Calculate the sum:**\n   \n   \\[\n   5 + 30 = 35\n   \\]\n\n**Final Answer:**\n\n\\[\n\\boxed{35}\n\\]', additional_kwargs={'refusal': None}, response_metadata={'token_usage': {'completion_tokens': 109, 'prompt_tokens': 14, 'total_tokens': 123, 'completion_tokens_details': {'accepted_prediction_tokens': None, 'audio_tokens': None, 'reasoning_tokens': 0, 'rejected_prediction_tokens': None}, 'prompt_tokens_details': {'audio_tokens': None, 'cached_tokens': 0}}, 'model_name': 'deepseek-v3-250324', 'system_fingerprint': None, 'id': '021757570940697076ea4837f74e966daea642e85370365fa4f81', 'service_tier': 'default', 'finish_reason': 'stop', 'logprobs': None}, id='run--4a19aa82-5266-4dd4-a6e0-cf21c6e90eed-0', usage_metadata={'input_tokens

## 在链中自定义支持流输出的函数
+ 当链被使用stream或astream调用的时候
+ 如果在链中增加自定义函数

In [9]:
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser

# 创建一个聊天提示模板，要求生成5个与给定动物相似的动物名称，以逗号分隔
prompt = ChatPromptTemplate.from_template(
    "请列出5个与一下动物相似的动物名称，用逗号分隔：{animal}。不要包含数字"
)

# 创建一个处理链：提示词模板 -> 语言模型 -> 字符串解析器
str_chain = prompt | llm | StrOutputParser()

# 流式输出结果，输入为“熊”
for chunk in str_chain.stream({"animal": "熊"}):
    print(chunk, end="", flush=True)


棕熊,北极熊,熊猫,马来熊,眼镜熊

### 增加自定义函数
+ 聚合当前流传输的输出
+ 在生产下一个逗号的时候组合
+ 注意：使用yield

In [8]:
# 定义自定义解析器，将LLM输出的标记迭代器
# 按逗号分隔转换为字符串列表
from typing import Iterator, List


def split_into_list(input: Iterator[str]) -> Iterator[List[str]]:
    # 保存部分输入直到遇到逗号
    buffer = ""
    for chunk in input:
        # 将当前块添加到缓冲区
        buffer += chunk
        # 当缓存区中有逗号时
        while "," in buffer:
            # 分割缓存区，逗号前的部分作为一个完整的输出
            comma_index = buffer.index(",")
            # 输出逗号之前的所有内容
            yield [buffer[:comma_index].strip()]
            # 保存剩余部分用于下一次迭代
            buffer = buffer[comma_index + 1 :]

    # 输出最后剩余的部分（如果有的话）
    yield [buffer.strip()]


list_chain = str_chain | split_into_list

for chunk in list_chain.stream({"animal": "熊"}):
    print(chunk, end="", flush=True)

['棕熊']['北极熊']['熊猫']['马来熊']['眼镜熊']

### yield 与 return 区别
+ return函数立即计算并返回所有结果，而yield函数按需计算结果
+ return函数返回一个数据结构（如列表），yield函数返回一个生成器对象
+ yield函数可以处理潜在的无限序列，而return函数必须在有限时间内完成
+ 生成器对象是一次性的，遍历完后就被消耗完毕，而return返回的数据结构可以重复使用
+ yield 特别适合处理大数据集或流式数据，因为它不需要一次性将所有数据加载到内容中

In [None]:
# 使用return
def get_squares_return(n):
    """返回包含0到n-1的平方的列表"""
    result = []
    for i in range(n):
        result.append(i * i)
    return result  # 一次性返回所有结果


# 使用 return 函数
squares = get_squares_return(5)
print("使用 return 函数:", squares)  # 输出: [0, 1, 4, 9, 16]
print("返回类型：", type(squares))  # 输出: <class 'list'>

# 遍历结果
for num in squares:
    print("遍历结果:", num)

使用 return 函数: [0, 1, 4, 9, 16]
类型： <class 'list'>
遍历结果: 0
遍历结果: 1
遍历结果: 4
遍历结果: 9
遍历结果: 16


In [12]:
# 使用 yield
def get_squares_yield(n):
    """生成包含0到n-1的平方的值"""
    for i in range(n):
        yield i * i  # 按需生成结果


# 使用 yield 函数
squares_gen = get_squares_yield(5)
print(
    "使用 yield 函数:", squares_gen
)  # 输出: <generator object get_squares_yield at 0x...>
print("返回类型：", type(squares_gen))  # 输出: <class 'generator'>

# 遍历结果
for num in squares_gen:
    print("遍历结果:", num)

# 再次遍历生成器
print("再次遍历")
for num in squares_gen:
    print("再次遍历结果:", num)  # 不会输出任何内容，因为生成器已经被消耗完毕

使用 yield 函数: <generator object get_squares_yield at 0x7c128db31cb0>
返回类型： <class 'generator'>
遍历结果: 0
遍历结果: 1
遍历结果: 4
遍历结果: 9
遍历结果: 16
再次遍历


## 使用RunnablePassthrough来传递值

In [None]:
from langchain_core.runnables import RunnableParallel, RunnablePassthrough

# 创建一个可以并行运行的处理流程
runnable = RunnableParallel(
    passed=RunnablePassthrough(),  # 第一个处理器：直接传入输入，不做修改
    modified=lambda x: x["num"] + 1,  # 第二个处理器：取出输入中的“num”值并+1
)

# 执行这个处理流程，输入时一个包含“num”字段的字典
runnable.invoke({"num": 1})


{'passed': {'num': 1}, 'modified': 2}

## LCEL支持在运行时候对链进行配置
+ 动态改写模型的温度
+ 动态切换提示词

### 动态改写模型温度

In [16]:
from langchain_core.prompts import PromptTemplate
from langchain_core.runnables import ConfigurableField
from langchain_openai import ChatOpenAI

llm = ChatOpenAI(
    model=os.environ.get("OPENAPI_MODEL"),
    base_url=os.environ.get("OPENAPI_API_BASE"),
    api_key=os.environ.get("OPENAPI_API_KEY"),
    temperature=0,
).configurable_fields(
    temperature=ConfigurableField(
        id="llm_temperature",
        name="LLM Temperature",
        description="Temperature for response randomness",
    )
)

# 未修改temperature
response1 = llm.invoke("随意挑选一个随机数，输出为一个整数")
print(response1.content)

# 动态修改temperature
response2 = llm.with_config(configurable={"llm_temperature": 0.9}).invoke(
    "随意挑选一个随机数，输出为一个整数"
)
print(response2.content)

好的，我将随机挑选一个整数。以下是结果：

**随机整数：** 42
好的，以下是一个随机挑选的整数：

**42**  

（*注：虽然这个数字看起来像是“生命、宇宙以及任何事情的终极答案”，但它确实是随机生成的！如果需要其他随机数，可以告诉我范围或数量哦~*）


### 链的提示词动态切换

In [18]:
from langchain.runnables.hub import HubRunnable

prompt = HubRunnable("rlm/rag-prompt").configurable_fields(
    owner_repo_commit=ConfigurableField(
        id="hub_commit",
        name="Hub Commit",
        description="The Hub commit to pull from",
    )
)

# 未切换提示词
prompt_value1 = prompt.invoke({"question": "foo", "context": "bar"})
print(prompt_value1)

# 动态切换提示词
prompt_value2 = prompt.with_config(
    configurable={"hub_commit": "rlm/rag-prompt-llama"}
).invoke({"question": "foo", "context": "bar"})
print(prompt_value2)

messages=[HumanMessage(content="You are an assistant for question-answering tasks. Use the following pieces of retrieved context to answer the question. If you don't know the answer, just say that you don't know. Use three sentences maximum and keep the answer concise.\nQuestion: foo \nContext: bar \nAnswer:", additional_kwargs={}, response_metadata={})]
messages=[HumanMessage(content="[INST]<<SYS>> You are an assistant for question-answering tasks. Use the following pieces of retrieved context to answer the question. If you don't know the answer, just say that you don't know. Use three sentences maximum and keep the answer concise.<</SYS>> \nQuestion: foo \nContext: bar \nAnswer: [/INST]", additional_kwargs={}, response_metadata={})]


## 为链添加记忆能力
+ 注意：简单的链的记忆添加可以使用v0.2方式，复杂的记忆官方推荐使用LangGraph
+ 短时记忆：InMemoryHistory
+ 长时记忆：RunnableWithMessageHistory


### 短时记忆：InMemoryHistory

In [24]:
from typing import List
from pydantic import BaseModel, Field
from langchain_core.chat_history import BaseChatMessageHistory
from langchain_core.messages import BaseMessage, HumanMessage, AIMessage


class InMemoryHistory(BaseChatMessageHistory, BaseModel):
    """内存中实现的聊天消息历史记录."""

    messages: List[BaseMessage] = Field(
        default_factory=list
    )  # 使用空列表作为默认值存储消息

    def add_message(self, message: List[BaseMessage]) -> None:
        """添加一组消息到存储中."""
        self.messages.append(message)

    def clear(self) -> None:
        """Clear the history."""
        self.messages = []


# 这里我们使用全局遍历来存储聊天消息历史
# 这样可以更容易地检查它以查看底层结果
store = {}  # 创建空字典用于存储不同会话的历史记录


def get_by_session_id(session_id: str) -> BaseChatMessageHistory:
    """
    根据会话ID获取历史记录.
    """
    if session_id not in store:
        store[session_id] = InMemoryHistory()  # 为新会话创建新的历史记录对象
    return store[session_id]


# 获取会话ID为“1”的历史记录
history = get_by_session_id("1")
# 添加一条AI消息到历史记录
history.add_message(AIMessage(content="你好"))
# 打印存储的所有历史记录
print(store)

{'1': InMemoryHistory(messages=[AIMessage(content='你好', additional_kwargs={}, response_metadata={})])}


### 在链中增加短期记忆

In [25]:
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.runnables.history import RunnableWithMessageHistory

llm = ChatOpenAI(
    model=os.environ.get("OPENAPI_MODEL"),
    base_url=os.environ.get("OPENAPI_API_BASE"),
    api_key=os.environ.get("OPENAPI_API_KEY"),
    temperature=0,
)

# 创建聊天提示模板，包含系统提示，历史记录和用户问题
prompt = ChatPromptTemplate.from_messages(
    [
        (
            "system",
            "你是一个擅长{ability}的助手",
        ),  # 系统角色提示，使用ability遍历定义助手专长
        MessagesPlaceholder(variable_name="history"),  # 放置历史消息的占位符
        ("human", "{input}"),  # 用户问题的占位符
    ]
)

# 将提示词模板和大模型连接成一个链
chain = prompt | llm

# 创建带有消息历史功能的可运行链
chain_with_history = RunnableWithMessageHistory(
    chain,
    get_by_session_id,  # 获取历史记录的函数
    input_messages_key="input",  # 用户输入的键名
    history_messages_key="history",  # 历史消息的键名
)

# 首次调用链，询问余弦的含义
response = chain_with_history.invoke(
    {"ability": "数学", "input": "余弦的含义是什么？"},
    config={
        "configurable": {
            "session_id": "1",  # 配置会话ID
        }
    },
)
print(response)

# 打印存储中的历史记录
print(store)

content='余弦（cosine）是三角函数中的一种基本函数，通常表示为 \\(\\cos \\theta\\)，其中 \\(\\theta\\) 是一个角度。余弦函数的定义和含义可以从多个角度理解：\n\n### 1. **直角三角形中的定义**\n在直角三角形中，余弦表示**邻边与斜边的比值**：\n\\[\n\\cos \\theta = \\frac{\\text{邻边}}{\\text{斜边}}\n\\]\n例如，若角 \\(\\theta\\) 的邻边长度为 \\(a\\)，斜边长度为 \\(c\\)，则 \\(\\cos \\theta = \\frac{a}{c}\\)。\n\n### 2. **单位圆上的定义**\n在直角坐标系中，以原点为中心、半径为1的单位圆上，任意角度 \\(\\theta\\) 对应的终边与单位圆的交点坐标为 \\((\\cos \\theta, \\sin \\theta)\\)。此时：\n- **余弦值**是该点的横坐标（\\(x\\) 坐标）。\n- 当角度 \\(\\theta\\) 变化时，\\(\\cos \\theta\\) 的值在 \\([-1, 1]\\) 之间周期性波动。\n\n### 3. **周期性函数**\n余弦函数是周期为 \\(2\\pi\\)（或 \\(360^\\circ\\)）的周期函数，满足：\n\\[\n\\cos (\\theta + 2\\pi) = \\cos \\theta\n\\]\n其图像为一条连续的波浪线（余弦曲线），关于 \\(y\\) 轴对称（偶函数）。\n\n### 4. **级数展开**\n余弦可以通过无限级数（泰勒级数）表示：\n\\[\n\\cos \\theta = 1 - \\frac{\\theta^2}{2!} + \\frac{\\theta^4}{4!} - \\frac{\\theta^6}{6!} + \\cdots \\quad (\\theta \\text{为弧度})\n\\]\n\n### 5. **应用领域**\n- **几何学**：计算角度或边长。\n- **物理学**：描述振动、波动（如简谐运动）。\n- **工程学**：信号处理、傅里叶分析。\n- **计算机图形学**：旋转和坐标变换。\n\n### 示例\n若 \\(\\

In [27]:
store.clear()  # 清空存储以便下次测试

In [28]:
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.runnables import ConfigurableFieldSpec
from langchain_core.runnables.history import RunnableWithMessageHistory

llm = ChatOpenAI(
    model=os.environ.get("OPENAPI_MODEL"),
    base_url=os.environ.get("OPENAPI_API_BASE"),
    api_key=os.environ.get("OPENAPI_API_KEY"),
    temperature=0,
)


def get_session_history(user_id: str, conversation_id: str) -> BaseChatMessageHistory:
    """
    根据用户ID和对话ID获取聊天历史记录
    如果不存在则创建新的历史记录对象

    参数:
        user_id: 用户的唯一标识符
        conversation_id: 对话的唯一标识符

    返回:
        对应的聊天历史记录对象
    """
    if (user_id, conversation_id) not in store:
        store[(user_id, conversation_id)] = (
            InMemoryHistory()
        )  # 为新会话创建新的历史记录对象
    return store[(user_id, conversation_id)]


# 创建聊天提示模板，包含系统提示，历史记录和用户问题
prompt = ChatPromptTemplate.from_messages(
    [
        (
            "system",
            "你是一个擅长{ability}的助手",
        ),  # 系统角色提示，使用ability遍历定义助手专长
        MessagesPlaceholder(variable_name="history"),  # 放置历史消息的占位符
        ("human", "{input}"),  # 用户问题的占位符
    ]
)

# 将提示词模板和大模型连接成一个链
chain = prompt | llm

# 创建带有消息历史功能的可运行链
chain_with_history = RunnableWithMessageHistory(
    chain,
    get_session_history=get_session_history,  # 获取历史记录的函数
    input_messages_key="input",  # 用户输入的键名
    history_messages_key="history",  # 历史消息的键名
    history_factory_config=[  # 历史记录工厂配置
        ConfigurableFieldSpec(
            id="user_id",  # 配置字段ID
            annotation=str,  # 类型注解
            name="用户ID",  # 字段名称
            description="用户的唯一标识符",  # 字段描述
            default="",  # 默认值
            is_shared=True,  # 是否在多个调用间共享
        ),
        ConfigurableFieldSpec(
            id="conversation_id",  # 配置字段ID
            annotation=str,  # 类型注解
            name="对话ID",  # 字段名称
            description="对话的唯一标识符",  # 字段描述
            default="",  # 默认值
            is_shared=True,  # 是否在多个调用间共享
        ),
    ],
)

# 首次调用链，询问余弦的含义
response = chain_with_history.invoke(
    {"ability": "数学", "input": "余弦的含义是什么？"},
    config={
        "configurable": {
            # "session_id": "1",  # 配置会话ID
            "user_id": "123",
            "conversation_id": "1",
        }
    },
)
print(response)

# 打印存储中的历史记录
print(store)

content='余弦（cosine）是三角函数之一，通常用于描述直角三角形中角度与边长之间的关系，或在单位圆中表示角度与坐标的对应关系。以下是其核心含义和定义：\n\n### 1. **直角三角形中的定义**\n在直角三角形中，余弦值定义为某一锐角的邻边长度与斜边长度的比值：\n\\[\n\\cos \\theta = \\frac{\\text{邻边}}{\\text{斜边}}\n\\]\n例如，若角θ的邻边长为3，斜边为5，则 \\(\\cos \\theta = \\frac{3}{5}\\)。\n\n### 2. **单位圆中的定义**\n在直角坐标系中，以原点为中心、半径为1的单位圆上，任意角度θ对应的终边与圆交于点 \\((x, y)\\)，则：\n\\[\n\\cos \\theta = x \\text{（横坐标）}\n\\]\n此时，余弦值直接反映角度θ在单位圆上的水平投影。\n\n### 3. **周期性函数**\n余弦函数是周期为 \\(2\\pi\\)（360°）的周期函数，满足：\n\\[\n\\cos(\\theta + 2\\pi) = \\cos \\theta\n\\]\n其图像为连续的波浪形曲线（余弦波），在 \\([0, \\pi]\\) 单调递减，值域为 \\([-1, 1]\\)。\n\n### 4. **扩展定义（任意角）**\n通过单位圆定义，余弦可推广到任意角度（包括负角和大于360°的角）。例如：\n- \\(\\cos 0° = 1\\)（终边与x轴正方向重合）\n- \\(\\cos 90° = 0\\)（终边与y轴正方向重合）\n- \\(\\cos 180° = -1\\)\n\n### 5. **与其他函数的关系**\n- **与正弦的关系**：\\(\\cos \\theta = \\sin\\left(\\frac{\\pi}{2} - \\theta\\right)\\)\n- **勾股定理**：\\(\\sin^2 \\theta + \\cos^2 \\theta = 1\\)\n- **导数**：\\(\\frac{d}{d\\theta} \\cos \\theta = -\\sin \\theta\\)\n\n### 6. **应用场景**\n- **物理学**：描述简谐振动、波动现象。\

### 长时记忆：RunnableWithMessageHistory
使用Redis构建长期记忆
+ 安装redis（推荐安装Redis Stack）
+ 运行redis服务
+ 配置长期记忆

In [29]:
! pip install -qU langchain-redis langchain-openai redis

#### 测试Redis连接正常

In [31]:
import os

REDIS_URL = "redis://localhost:6379"
print(f"Connection to Redis at: {REDIS_URL}")

Connection to Redis at: redis://localhost:6379


In [32]:
# 简单使用Redis来存储聊天消息
from langchain_core.chat_history import BaseChatMessageHistory
from langchain_core.messages import HumanMessage, AIMessage
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.runnables.history import RunnableWithMessageHistory
from langchain_redis import RedisChatMessageHistory

history = RedisChatMessageHistory(session_id="user_123", redis_url=REDIS_URL)
history.clear()  # 首先清空历史记录

# 向历史记录中添加消息
history.add_user_message("你好, AI助手")  # 添加用户消息
history.add_ai_message("你好! 我今天能为你提供什么帮助？")  # 添加AI消息

# 检索并显示历史消息
print("聊天历史：")
for message in history.messages:
    # 打印每条消息的类型和内容
    print(f"{type(message).__name__}: {message.content}")

ConnectionError: Error 111 connecting to localhost:6379. Connection refused.

In [None]:
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.runnables import ConfigurableFieldSpec
from langchain_core.runnables.history import RunnableWithMessageHistory

llm = ChatOpenAI(
    model=os.environ.get("OPENAPI_MODEL"),
    base_url=os.environ.get("OPENAPI_API_BASE"),
    api_key=os.environ.get("OPENAPI_API_KEY"),
    temperature=0,
)


## 使用LCEL来自定义路由链

In [36]:
from langchain_openai import ChatOpenAI
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import PromptTemplate
from langchain_core.runnables import RunnableLambda

llm = ChatOpenAI(
    model=os.environ.get("OPENAPI_MODEL"),
    base_url=os.environ.get("OPENAPI_API_BASE"),
    api_key=os.environ.get("OPENAPI_API_KEY"),
    temperature=0,
)

# 创建分类链 - 用于确定问题类型
chain = (
    # 创建提示模板，要求模型将问题分类为LangChain、Anthropic或Other
    PromptTemplate.from_template(
        """根据下面的用户问题，将其分类为`LangChain`、`Anthropic`、`Other`
        请只回复一个词作为答案。
        <question>
        {question}
        </question>

        分类结果："""
    )
    | llm
    | StrOutputParser()
)

# 创建LangChain专家链 - 模拟Harrison Chase(LangChain创始人)的回答风格
langchain_chain = (
    PromptTemplate.from_template(
        """你将扮演一个LangChain专家，请以他的视角回答问题。\
        你的回答必须以“正如Harrison Chase告诉我的”开头，否则你会受到惩罚。 \
        请回答以下问题：

        问题：{question}
        回答："""
    )
    | llm
)

# 创建Anthropic专家链 - 模拟Dario Amodei(Anthropic创始人)的回答风格
anthropic_chain = (
    PromptTemplate.from_template(
        """你将扮演一个Anthropic专家，请以他的视角回答问题。\
        你的回答必须以“正如Dario Amode告诉我的”开头，否则你会受到惩罚。 \
        请回答以下问题：

        问题：{question}
        回答："""
    )
    | llm
)

# 创建通用回答链 - 用于处理其他类型的问题
general_chain = (
    PromptTemplate.from_template(
        """
        请回答以下问题：

        问题：{question}
        回答："""
    )
    | llm
)


# 自定义路由函数 - 根据问题分类结果选择合适的回答链
def route(info):
    print(info)  # 打印分类结果
    if "anthropic" in info["topic"].lower():  # 如果问题与Anthropic相关
        print("claude")
        return anthropic_chain.invoke(info)  # 使用Anthropic专家链
    elif "langchain" in info["topic"].lower():  # 如果问题与LangChain相关
        print("langchain")
        return langchain_chain.invoke(info)  # 使用LangChain专家链
    else:  # 其他类型的问题
        print("general")
        return general_chain.invoke(info)  # 使用通用回答链


# 创建完整的处理链
# 1.首先将问题分类并保留原始问题
# 2.然后根据分类结果路由到相应的专家链处理
full_chain = {"topic": chain, "question": lambda x: x["question"]} | RunnableLambda(
    route
)

# 调用完整链处理用户问题
full_chain.invoke({"question": "我该如何使用claude?"})

16:19:14 httpx INFO   HTTP Request: POST https://ark.cn-beijing.volces.com/api/v3/chat/completions "HTTP/1.1 200 OK"
{'topic': 'Anthropic', 'question': '我该如何使用claude?'}
claude
16:19:20 httpx INFO   HTTP Request: POST https://ark.cn-beijing.volces.com/api/v3/chat/completions "HTTP/1.1 200 OK"


AIMessage(content='正如Dario Amodei告诉我的，使用Claude非常简单。您可以通过以下方式使用：\n\n1. 直接访问Claude的官方网站或集成平台\n2. 像使用其他AI助手一样输入您的问题或请求\n3. Claude会以自然语言与您对话，帮助完成各种任务\n\n关键是要清晰表达您的需求，Claude擅长理解自然语言指令。建议您：\n- 明确说明任务类型（写作、分析、编程等）\n- 提供足够的背景信息\n- 必要时可以要求分步解答\n\n记住，Claude的设计初衷是成为安全、有帮助的AI助手。如果您有任何使用上的疑问，随时可以继续询问。', additional_kwargs={'refusal': None}, response_metadata={'token_usage': {'completion_tokens': 138, 'prompt_tokens': 55, 'total_tokens': 193, 'completion_tokens_details': {'accepted_prediction_tokens': None, 'audio_tokens': None, 'reasoning_tokens': 0, 'rejected_prediction_tokens': None}, 'prompt_tokens_details': {'audio_tokens': None, 'cached_tokens': 0}}, 'model_name': 'deepseek-v3-250324', 'system_fingerprint': None, 'id': '021757578754626472b6d1fbda1b6afdc8beaf9e8f97c72849ee9', 'service_tier': 'default', 'finish_reason': 'stop', 'logprobs': None}, id='run--403e9911-b8a2-41ae-bc53-13934d7ac6c4-0', usage_metadata={'input_tokens': 55, 'output_tokens': 138, 'total_tokens': 193, 'input_token_details': 

## 回退机制
模型API速率限制

In [43]:
# 导入必要的库
from unittest.mock import patch  # 导入mock库,用于模拟函数行为
from langchain_anthropic import ChatAnthropic  # 导入Anthropic的语言模型接囗
from langchain_openai import ChatOpenAI  # 导入OpenAI的语言模型接囗
from langchain_deepseek import ChatDeepSeek
import httpx  # HTTP客户端库
from openai import RateLimitError  # OpenAI的速率限制错误类

# 创建模拟HTTP请求和响应对象，用于构造模拟的API错误
request = httpx.Request("GET", "/")  # 创建一个GET请求
response = httpx.Response(200, request=request)  # 创建一个状态码为200的响应
# 创建一个openAI速率限制错误对象，用于模拟API调用超出速率限制的情况
error = RateLimitError("rate limit", response=response, body="")

# 初始化主模型 (OpenAI)
# 注意: 设置max retries=0是为了避免在遇到速率限制等错误时自动重试
openai_llm = ChatOpenAI(
    model=os.environ.get("OPENAPI_MODEL"),
    base_url=os.environ.get("OPENAPI_API_BASE"),
    api_key=os.environ.get("OPENAPI_API_KEY"),
    temperature=0,
)

# 初始化备用模型 (DeepSeek)
deepseek_llm = ChatDeepSeek(
    model=os.environ.get("DEEPSEEK_MODEL"),
    api_base=os.environ.get("DEEPSEEK_API_BASE"),
    api_key=os.environ.get("DEEPSEEK_API_KEY"),
    temperature=0.0,
)

# 创建带有备用选项的语言模型
# 如果主模型调用失败,将自动尝试使用备用模型
llm_with_fallbacks = openai_llm.with_fallbacks([deepseek_llm])

# 测试使用备用机
# 使用patch模拟OpenAI API调用失败(抛出速率限制错误)
with patch("openai.resources.chat.completions.Completions.create", side_effect=error):
    try:
        # 尝试调用语言模型回答中文问题
        # 由于OpenAI被模拟为失败,应该自动切换到DeepSeek模型
        print(llm_with_fallbacks.invoke("为什么程序员要学会python?"))
    except RateLimitError as e:
        # 如果仍然遇到错误(备用机制失败),则打印错误信息
        print(f"仍然遇到错误: {e}")
        print("Hit error")

仍然遇到错误: rate limit
Hit error
