## 1. 什么是 LangChain 的"链"？

简单理解：链就是把多个处理步骤连接起来，数据从第一步流向最后一步，就像工厂的流水线一样。

用生活中的例子来理解
想象你要做一杯咖啡：

磨豆子 → 2. 冲泡 → 3. 加糖 → 4. 装杯
在 LangChain 中，链就是这样的流水线：

准备提示词 → 2. 调用LLM → 3. 解析结果 → 4. 返回数据

链还能支持 **异步调用**，**流式处理**，**批量处理**，**错误处理和重试** 等高级功能。

## 2. LCEL 语法

### 2.1 Python 魔术方法基础

在 Python 中，当你使用 | 操作符时，实际上调用的是对象的魔术方法：

```python
# 当你写 a | b 时，Python 实际执行：
result = a.__or__(b)

# 如果 a 没有 __or__ 方法，Python 会尝试：
result = b.__ror__(a)  # ror = reverse or
```

**LangChain 中的实现**

LangChain 的所有 Runnable 对象（包括 prompt、model、parser）都实现了这些魔术方法：

```python
class BaseRunnable:
    def __or__(self, other):
        """实现 self | other"""
        return RunnableSequence(first=self, last=other)
    
    def __ror__(self, other):
        """实现 other | self（当 other 没有 __or__ 时）"""
        return RunnableSequence(first=other, last=self)
```

**实际效果**


```python
# 这三种写法是等价的：
chain1 = prompt | model | parser
chain2 = prompt.__or__(model).__or__(parser)
chain3 = RunnableSequence(first=prompt, last=RunnableSequence(first=model, last=parser))
```


In [None]:
# prompt.__or__(model) 的伪代码实现

class PromptTemplate(BaseRunnable):
    def __init__(self, template, input_variables):
        self.template = template
        self.input_variables = input_variables
    
    def __or__(self, other):
        """
        实现 prompt | model 的核心逻辑
        """
        # 创建一个新的序列链
        return RunnableSequence(
            first=self,      # 当前的 prompt
            last=other       # 传入的 model
        )
    
    def invoke(self, inputs):
        """执行 prompt 的格式化"""
        return self.template.format(**inputs)

class RunnableSequence(BaseRunnable):
    def __init__(self, first, last):
        self.first = first    # 第一个组件（prompt）
        self.last = last      # 第二个组件（model）
    
    def invoke(self, inputs):
        """
        执行序列链的核心逻辑
        """
        # 步骤1：执行第一个组件
        intermediate_result = self.first.invoke(inputs)
        
        # 步骤2：将第一个组件的输出作为第二个组件的输入
        final_result = self.last.invoke(intermediate_result)
        
        return final_result
    
    def __or__(self, other):
        """
        支持继续链接：(prompt | model) | parser
        """
        return RunnableSequence(
            first=self,      # 当前的序列链
            last=other       # 新的组件
        )

# 使用示例的内部执行流程
prompt = PromptTemplate("分析：{text}", ["text"])
model = SomeLLM()

# 当执行 prompt | model 时：
chain = prompt.__or__(model)  # 返回 RunnableSequence(first=prompt, last=model)

# 当执行 chain.invoke({"text": "项目进度"}) 时：
# 1. 调用 RunnableSequence.invoke({"text": "项目进度"})
# 2. intermediate_result = prompt.invoke({"text": "项目进度"})  # 返回 "分析：项目进度"
# 3. final_result = model.invoke("分析：项目进度")  # 返回 LLM 的响应
# 4. 返回 final_result


In [None]:
# 完整的三步链：prompt | model | parser
def create_three_step_chain():
    prompt = PromptTemplate("分析：{text}", ["text"])
    model = SomeLLM()
    parser = SomeParser()
    
    # 第一步：prompt | model
    step1_chain = prompt.__or__(model)
    # 等价于：RunnableSequence(first=prompt, last=model)
    
    # 第二步：(prompt | model) | parser
    final_chain = step1_chain.__or__(parser)
    # 等价于：RunnableSequence(first=step1_chain, last=parser)
    
    return final_chain

# 执行时的内部流程
def execute_chain(inputs):
    # inputs = {"text": "项目进度"}
    
    # 第一层 RunnableSequence.invoke()
    # first = RunnableSequence(prompt, model)
    # last = parser
    
    # 执行 first.invoke(inputs)
    # 这会触发第二层 RunnableSequence.invoke()
    # first = prompt, last = model
    
    # 执行 prompt.invoke({"text": "项目进度"})
    prompt_result = "分析：项目进度"
    
    # 执行 model.invoke("分析：项目进度")
    model_result = "这是一个关于项目进度的分析..."
    
    # 第一层继续执行 last.invoke(model_result)
    # 执行 parser.invoke("这是一个关于项目进度的分析...")
    final_result = {"analysis": "项目进度良好"}
    
    return final_result


## 3 其他问题

为什么需要 __ror__？



In [None]:
# 假设你有一个自定义函数想要加入链中
def custom_preprocessor(text):
    return text.upper()

# 如果 custom_preprocessor 没有 __or__ 方法
# 但 prompt 有 __ror__ 方法，这样就能工作：
chain = custom_preprocessor | prompt | model

# Python 会调用：
# prompt.__ror__(custom_preprocessor)


## 3. Runnable 对象接口

Runnable 对象是一个可以被调用、异步执行、批处理、流式处理、并行处理的工作单元，通过schema属性、run方法定义。


1. Runnable 核心执行方法:

基础执行方法

```python
# 同步执行
result = runnable.invoke(input)

# 异步执行
result = await runnable.ainvoke(input)

# 批量执行
results = runnable.batch([input1, input2, input3])
results = await runnable.abatch([input1, input2, input3])

# 流式执行
for chunk in runnable.stream(input):
    print(chunk)

async for chunk in runnable.astream(input):
    print(chunk)
```

输入输出类型

```python
# Runnable 是泛型，定义输入输出类型
class MyRunnable(Runnable[Dict, str]):  # 输入Dict，输出str
    def invoke(self, input: Dict) -> str:
        return f"处理结果: {input}"
```

2. 链式组合操作
管道操作符 |

```python
# 串行组合
chain = prompt | model | parser

# 等价于
chain = RunnableSequence(first=prompt, last=RunnableSequence(first=model, last=parser))

### 3.1 schema 属性的作用

#### 每个 Runnable 都有这两个属性
runnable.input_schema   # 定义期望的输入格式
runnable.output_schema  # 定义输出的数据格式

#### 主要用途
- 类型检查: 验证输入数据是否符合预期格式
- 文档生成: 自动生成API文档
- IDE支持: 提供代码补全和类型提示
- 调试帮助: 快速定位数据格式问题

### 3.2 Schema 的数据结构
Schema 通常是 JSON Schema 格式或 Pydantic 模型：

```python
# JSON Schema 格式示例
{
    "type": "object",
    "properties": {
        "text": {"type": "string"},
        "temperature": {"type": "number", "minimum": 0, "maximum": 2}
    },
    "required": ["text"]
}

# Pydantic 模型格式
class InputModel(BaseModel):
    text: str
    temperature: float = 0.7
```
### 3.3 实际例子 PromptTemplate 的 Schema


In [None]:
from langchain_core.prompts import PromptTemplate

prompt = PromptTemplate(
    input_variables=["name", "age"],
    template="你好，我是{name}，今年{age}岁"
)

# 查看输入 schema
print("输入 Schema:")
print(prompt.input_schema.schema())

# 查看输出 schema
print("输出 Schema:")
print(prompt.output_schema.schema())


输入 Schema:
{'properties': {'age': {'title': 'Age', 'type': 'string'}, 'name': {'title': 'Name', 'type': 'string'}}, 'required': ['age', 'name'], 'title': 'PromptInput', 'type': 'object'}
输出 Schema:
{'$defs': {'AIMessage': {'additionalProperties': True, 'description': 'Message from an AI.\n\nAIMessage is returned from a chat model as a response to a prompt.\n\nThis message represents the output of the model and consists of both\nthe raw output as returned by the model together standardized fields\n(e.g., tool calls, usage metadata) added by the LangChain framework.', 'properties': {'content': {'anyOf': [{'type': 'string'}, {'items': {'anyOf': [{'type': 'string'}, {'additionalProperties': True, 'type': 'object'}]}, 'type': 'array'}], 'title': 'Content'}, 'additional_kwargs': {'additionalProperties': True, 'title': 'Additional Kwargs', 'type': 'object'}, 'response_metadata': {'additionalProperties': True, 'title': 'Response Metadata', 'type': 'object'}, 'type': {'const': 'ai', 'default': 

C:\Users\Administrator\AppData\Local\Temp\ipykernel_29948\2222964862.py:10: PydanticDeprecatedSince20: The `schema` method is deprecated; use `model_json_schema` instead. Deprecated in Pydantic V2.0 to be removed in V3.0. See Pydantic V2 Migration Guide at https://errors.pydantic.dev/2.11/migration/
  print(prompt.input_schema.schema())


**练习**
1. 掌握 LLM 的 Schema
2. 掌握链式组合的 Schema
3. 并行组合的 Schema （后面讲并行）

### 常见的应用场景

```python
# API 文档生成
def generate_api_docs(runnable):
    """根据 schema 生成 API 文档"""
    input_schema = runnable.input_schema.schema()
    output_schema = runnable.output_schema.schema()
    
    docs = f"""
    API 接口文档:
    
    输入格式:
    {json.dumps(input_schema, indent=2, ensure_ascii=False)}
    
    输出格式:
    {json.dumps(output_schema, indent=2, ensure_ascii=False)}
    """
    return docs

# 输入验证
def validate_input(runnable, input_data):
    """验证输入数据是否符合 schema"""
    try:
        # 使用 schema 验证输入
        validated = runnable.input_schema(**input_data)
        return True, validated
    except Exception as e:
        return False, str(e)

# 使用示例
is_valid, result = validate_input(analyzer, {
    "name": "张三",
    "age": 30,
    "email": "zhangsan@example.com"
})

# 类型安全的调用
def safe_invoke(runnable, input_data):
    """类型安全的调用方法"""
    # 验证输入
    is_valid, validated_input = validate_input(runnable, input_data)
    if not is_valid:
        raise ValueError(f"输入验证失败: {validated_input}")
    
    # 执行调用
    result = runnable.invoke(validated_input)
    
    # 可以进一步验证输出格式
    return result

# 调试
# 快速查看 schema
def inspect_runnable(runnable):
    print(f"组件类型: {type(runnable).__name__}")
    print(f"输入类型: {runnable.input_schema}")
    print(f"输出类型: {runnable.output_schema}")
    
    # 如果是链，递归查看每个组件
    if hasattr(runnable, 'steps'):
        for i, step in enumerate(runnable.steps):
            print(f"步骤 {i+1}: {type(step).__name__}")

# 测试数据生成
def generate_test_data(schema):
    """根据 schema 生成测试数据"""
    # 这里可以实现根据 schema 自动生成测试数据的逻辑
    pass

```

### ainvoke 异步方法

#### 1. ainvoke() 方法的基本原理
为什么需要 ainvoke()？

```python
# 同步调用 - 会阻塞当前线程
result = runnable.invoke(input)  # 等待几秒钟才返回

# 异步调用 - 不阻塞，可以并发执行其他任务
result = await runnable.ainvoke(input)  # 非阻塞执行
```

ainvoke() 的声明结构
```python
class BaseRunnable:
    async def ainvoke(self, input, config=None):
        """异步版本的 invoke 方法"""
        # 如果子类没有实现异步版本，就用线程池执行同步版本
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(
            None,  # 使用默认线程池
            self.invoke,  # 要执行的同步函数
            input,  # 传递给 invoke 的参数
            config
        )
```

#### 2. 执行流程详解

步骤1：获取事件循环  

loop = asyncio.get_running_loop()

作用：

- 获取当前正在运行的异步事件循环
- 事件循环是异步程序的核心，负责调度和执行异步任务
- 如果没有运行中的事件循环，会抛出 RuntimeError

步骤2：使用线程池执行器

await loop.run_in_executor(None, self.invoke, input, config)

参数解释：

- None：使用默认的 ThreadPoolExecutor
- self.invoke：要在线程池中执行的同步函数
- input, config：传递给函数的参数

步骤3：线程池执行

```python
# 内部执行过程（简化版）
def run_in_executor(executor, func, *args):
    # 1. 将同步函数提交到线程池
    future = executor.submit(func, *args)
    
    # 2. 将线程池的 Future 转换为 asyncio.Future
    asyncio_future = asyncio.wrap_future(future)
    
    # 3. 返回可等待的 Future 对象
    return asyncio_future
```

#### 3. 完整的执行示例

In [None]:
import asyncio
from concurrent.futures import ThreadPoolExecutor

class SimpleRunnable:
    def invoke(self, input):
        """同步方法 - 模拟耗时操作"""
        import time
        print(f"开始处理: {input}")
        time.sleep(2)  # 模拟耗时操作
        print(f"处理完成: {input}")
        return f"结果: {input}"
    
    async def ainvoke(self, input):
        """异步方法 - 使用线程池执行同步方法"""
        print(f"异步调用开始: {input}")
        
        # 步骤1: 获取事件循环
        loop = asyncio.get_running_loop()
        print(f"获取到事件循环: {loop}")
        
        # 步骤2: 在线程池中执行同步方法
        print(f"提交到线程池执行...")
        result = await loop.run_in_executor(
            None,           # 默认线程池
            self.invoke,    # 同步方法
            input          # 参数
        )
        
        print(f"异步调用完成: {input}")
        return result

# 使用示例
async def demo():
    runnable = SimpleRunnable()
    
    # 并发执行多个异步调用
    tasks = [
        runnable.ainvoke("任务1"),
        runnable.ainvoke("任务2"), 
        runnable.ainvoke("任务3")
    ]
    
    # 等待所有任务完成
    results = await asyncio.gather(*tasks)
    print(f"所有结果: {results}")

# 运行演示
asyncio.run(demo())


#### 4. 为什么使用 run_in_executor？

问题：同步代码在异步环境中的困境

```python
# 错误的做法 - 会阻塞整个事件循环
async def bad_example():
    result1 = some_sync_function()  # 阻塞2秒
    result2 = another_sync_function()  # 又阻塞2秒
    # 总共需要4秒，无法并发
```
解决方案：线程池执行
```python
# 正确的做法 - 使用线程池
async def good_example():
    loop = asyncio.get_running_loop()
    
    # 并发执行，总共只需要2秒
    task1 = loop.run_in_executor(None, some_sync_function)
    task2 = loop.run_in_executor(None, another_sync_function)
    
    result1, result2 = await asyncio.gather(task1, task2)
```
#### 5. 事件循环的工作原理
```python
# 事件循环的简化模型
class EventLoop:
    def __init__(self):
        self.tasks = []
        self.thread_pool = ThreadPoolExecutor()
    
    def run_in_executor(self, executor, func, *args):
        """在线程池中执行同步函数"""
        # 1. 提交到线程池
        future = self.thread_pool.submit(func, *args)
        
        # 2. 创建异步 Future
        async_future = asyncio.Future()
        
        # 3. 当线程池任务完成时，设置异步 Future 的结果
        def on_done(thread_future):
            try:
                result = thread_future.result()
                async_future.set_result(result)
            except Exception as e:
                async_future.set_exception(e)
        
        future.add_done_callback(on_done)
        return async_future
```


#### 6. 何时使用异步

```python
# LLM 调用通常是网络请求，天然适合异步
class AsyncLLM:
    async def ainvoke(self, prompt):
        # 直接使用异步 HTTP 客户端
        async with aiohttp.ClientSession() as session:
            response = await session.post(api_url, json={"prompt": prompt})
            return await response.json()

# 但某些组件可能只有同步实现
class SyncProcessor:
    def invoke(self, input):
        # 复杂的同步处理逻辑
        return process_data(input)
    
    async def ainvoke(self, input):
        # 使用线程池包装同步方法
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, self.invoke, input)
```

## stream方法
1. stream 方法返回迭代器：使用 yield 逐步返回结果
2. 实时显示：用 print(chunk, end="", flush=True) 实现逐字显示效果
3. 与 invoke 对比：invoke 一次性返回，stream 分步返回


```python
"""
Stream 方法
"""
from langchain_core.runnables import Runnable
from typing import Iterator
import time

class SimpleStreamRunnable(Runnable[str, str]):
    """简单的流式输出演示"""
    
    def invoke(self, input: str) -> str:
        """普通调用 - 一次性返回完整结果"""
        return f"完整处理结果: {input}"
    
    def stream(self, input: str) -> Iterator[str]:
        """流式调用 - 逐步返回部分结果"""
        words = f"逐步处理结果: {input}".split()
        
        for word in words:
            time.sleep(0.5)  # 模拟处理延迟
            yield word + " "  # 逐个返回单词

# 使用演示
def demo_stream():
    runnable = SimpleStreamRunnable()
    
    print("=== 普通调用 ===")
    result = runnable.invoke("测试文本")
    print(result)
    
    print("\n=== 流式调用 ===")
    for chunk in runnable.stream("测试文本"):
        print(chunk, end="", flush=True)  # 实时显示，不换行
    
    print("\n\n演示完成")

# 运行演示
demo_stream()
```

**astream 是 stream 的异步版本，astream的默认实现调用了ainvoke**

### batch方法

batch方法用于处理多个输入（批处理）

工程价值：
- 性能优化：批量处理比逐个处理快3-5倍
- 资源管理：分批处理避免内存溢出
- 错误处理：批次失败时自动降级为单个处理
- 监控报告：提供详细的处理统计和分析

适用场景：
- 大量文档批量分析
- 客户反馈批量处理
- 数据清洗和标注
- 内容审核和分类
- 报告自动生成

In [10]:
"""
LangChain 原生 batch 方法演示
"""
from langchain_core.runnables import Runnable
from langchain_core.prompts import PromptTemplate
from langchain_community.llms import Tongyi
from typing import Optional, Dict, Any

class SimpleAnalyzer(Runnable[str, str]):
    """简单分析器"""
    
    def invoke(self, input: str, config: Optional[Dict[str, Any]] = None) -> str:
        """单个分析"""
        return f"分析结果: {input} -> 类型:文档, 重要性:中等"

def demo_langchain_batch():
    """LangChain batch 方法演示"""
    
    # 1. 使用自定义 Runnable
    analyzer = SimpleAnalyzer()
    
    inputs = ["报告A", "合同B", "邮件C"]
    
    print("=== 使用自定义 Runnable ===")
    print(f"输入: {inputs}")
    
    # LangChain 的 batch 方法
    results = analyzer.batch(inputs)
    
    print("批量处理结果:")
    for result in results:
        print(f"  {result}")
    
    # 2. 使用 PromptTemplate
    print("\n=== 使用 PromptTemplate ===")
    
    prompt = PromptTemplate(
        input_variables=["text"],
        template="请分析: {text}"
    )
    
    texts = ["项目进展", "用户反馈", "市场分析"]
    inputs_dict = [{"text": text} for text in texts]
    
    print(f"输入: {texts}")
    
    # PromptTemplate 的 batch 方法
    prompt_results = prompt.batch(inputs_dict)
    
    print("提示词批量生成结果:")
    for result in prompt_results:
        print(f"  {result}")
    
    # 3. 链式 batch
    print("\n=== 链式 batch ===")
    
    # 创建简单链
    chain = prompt | analyzer
    
    # 链的 batch 方法
    chain_results = chain.batch(inputs_dict)
    
    print("链式批量处理结果:")
    for result in chain_results:
        print(f"  {result}")

if __name__ == "__main__":
    demo_langchain_batch()


=== 使用自定义 Runnable ===
输入: ['报告A', '合同B', '邮件C']
批量处理结果:
  分析结果: 报告A -> 类型:文档, 重要性:中等
  分析结果: 合同B -> 类型:文档, 重要性:中等
  分析结果: 邮件C -> 类型:文档, 重要性:中等

=== 使用 PromptTemplate ===
输入: ['项目进展', '用户反馈', '市场分析']
提示词批量生成结果:
  text='请分析: 项目进展'
  text='请分析: 用户反馈'
  text='请分析: 市场分析'

=== 链式 batch ===
链式批量处理结果:
  分析结果: text='请分析: 项目进展' -> 类型:文档, 重要性:中等
  分析结果: text='请分析: 用户反馈' -> 类型:文档, 重要性:中等
  分析结果: text='请分析: 市场分析' -> 类型:文档, 重要性:中等


### 并行组合

```python
from langchain_core.runnables import RunnableParallel

# 并行执行多个分支
parallel = RunnableParallel({
    "summary": summary_chain,
    "keywords": keyword_chain,
    "sentiment": sentiment_chain
})

# 输入会同时发送给所有分支
result = parallel.invoke({"text": "分析这段文本"})
# 输出: {"summary": "...", "keywords": [...], "sentiment": "positive"}
```



### 条件分支

RunnableBranch 是一种重要的路由机制，它根据给定的条件选择不同的处理路径。在运行过程中还能根据输入动态选择不同的执行路径。

```python
from langchain_core.runnables import RunnableBranch

# 根据条件选择不同处理路径
branch = RunnableBranch(
    (lambda x: x["type"] == "question", qa_chain),
    (lambda x: x["type"] == "summary", summary_chain),
    default_chain
)
```


In [11]:
"""
RunnableBranch 条件分支演示
"""
from langchain_core.runnables import RunnableBranch, RunnableLambda
from langchain_core.prompts import PromptTemplate
from langchain_community.llms import Tongyi
from typing import Dict, Any

# === 前面的准备工作 ===

# 1. 创建不同类型的处理链
# 问答链
qa_prompt = PromptTemplate(
    input_variables=["content"],
    template="请回答以下问题：{content}"
)
qa_llm = Tongyi(model_name="qwen-max", temperature=0.1)
qa_chain = qa_prompt | qa_llm

# 摘要链
summary_prompt = PromptTemplate(
    input_variables=["content"], 
    template="请总结以下内容：{content}"
)
summary_llm = Tongyi(model_name="qwen-max", temperature=0.3)
summary_chain = summary_prompt | summary_llm

# 默认处理链
default_prompt = PromptTemplate(
    input_variables=["content"],
    template="请分析以下内容：{content}"
)
default_llm = Tongyi(model_name="qwen-max", temperature=0.5)
default_chain = default_prompt | default_llm

# === 核心代码：创建条件分支 ===
branch = RunnableBranch(
    (lambda x: x["type"] == "question", qa_chain),
    (lambda x: x["type"] == "summary", summary_chain),
    default_chain  # 默认分支
)

# === 后面的使用方法 ===

# 使用示例
def demo_branch():
    """演示条件分支的使用"""
    
    # 测试数据
    test_cases = [
        {
            "type": "question",
            "content": "什么是人工智能？"
        },
        {
            "type": "summary", 
            "content": "人工智能是计算机科学的一个分支，致力于创建能够执行通常需要人类智能的任务的系统。它包括机器学习、深度学习、自然语言处理等多个子领域。"
        },
        {
            "type": "other",
            "content": "今天天气很好"
        }
    ]
    
    print("=== RunnableBranch 条件分支演示 ===")
    
    for i, test_case in enumerate(test_cases, 1):
        print(f"\n--- 测试用例 {i} ---")
        print(f"类型: {test_case['type']}")
        print(f"内容: {test_case['content']}")
        
        # 执行条件分支
        result = branch.invoke(test_case)
        print(f"处理结果: {result}")

# 更复杂的条件分支示例
def create_advanced_branch():
    """创建更复杂的条件分支"""
    
    # 定义更多条件
    advanced_branch = RunnableBranch(
        # 条件1：问题类型
        (lambda x: x["type"] == "question", qa_chain),
        
        # 条件2：摘要类型
        (lambda x: x["type"] == "summary", summary_chain),
        
        # 条件3：长文本（超过100字）
        (lambda x: len(x["content"]) > 100, 
         PromptTemplate(input_variables=["content"], template="这是长文本，请详细分析：{content}") | default_llm),
        
        # 条件4：包含特定关键词
        (lambda x: "紧急" in x["content"], 
         PromptTemplate(input_variables=["content"], template="紧急处理：{content}") | default_llm),
        
        # 默认分支
        default_chain
    )
    
    return advanced_branch

# 异步使用
async def demo_async_branch():
    """异步使用条件分支"""
    test_input = {
        "type": "question",
        "content": "如何使用 LangChain？"
    }
    
    # 异步调用
    result = await branch.ainvoke(test_input)
    print(f"异步结果: {result}")

# 批量处理
def demo_batch_branch():
    """批量处理演示"""
    batch_inputs = [
        {"type": "question", "content": "什么是机器学习？"},
        {"type": "summary", "content": "机器学习是人工智能的一个重要分支..."},
        {"type": "other", "content": "随机内容"}
    ]
    
    # 批量处理
    results = branch.batch(batch_inputs)
    
    for i, result in enumerate(results):
        print(f"批量结果 {i+1}: {result}")

if __name__ == "__main__":
    # 运行演示
    demo_branch()
    
    # 高级分支演示
    advanced_branch = create_advanced_branch()
    
    # 批量处理演示
    demo_batch_branch()


=== RunnableBranch 条件分支演示 ===

--- 测试用例 1 ---
类型: question
内容: 什么是人工智能？
处理结果: 人工智能（Artificial Intelligence，简称 AI）是指由人创造的能够感知环境、学习知识、逻辑推理并执行任务的智能体。它是一门研究如何让计算机模拟人类智能行为的科学技术，旨在使机器能够完成一些通常需要人类智能才能胜任的复杂任务。

人工智能的核心目标是让机器具备以下一些能力：

- **学习能力**：从数据中自动学习规律，并用于预测或决策（如机器学习、深度学习）。
- **推理能力**：根据已有知识进行逻辑推导（如专家系统）。
- **感知能力**：识别和理解图像、声音、语言等信息（如计算机视觉、语音识别）。
- **语言能力**：理解和生成人类语言（如自然语言处理）。
- **规划与决策能力**：在复杂环境中做出合理决策（如自动驾驶、游戏AI）。

### 人工智能的主要分支包括：

1. **机器学习（Machine Learning）**：通过数据训练模型，使计算机自动改进性能而无需显式编程。
2. **深度学习（Deep Learning）**：基于神经网络的机器学习方法，擅长处理图像、语音、文本等非结构化数据。
3. **自然语言处理（Natural Language Processing, NLP）**：让机器理解和生成人类语言。
4. **计算机视觉（Computer Vision）**：使机器能够“看懂”图像或视频。
5. **机器人学（Robotics）**：结合感知、决策与执行，使机器人能完成复杂任务。
6. **专家系统（Expert Systems）**：模拟人类专家知识解决特定问题。

### 人工智能的应用领域：

- 自动驾驶汽车
- 智能语音助手（如 Siri、Alexa）
- 推荐系统（如 Netflix、淘宝推荐）
- 医疗诊断辅助
- 金融风控与量化交易
- 工业自动化

### 人工智能的分类：

- **弱人工智能（Narrow AI）**：专注于特定任务的人工智能，如语音识别、图像识别。
- **强人工智能（General AI）**：理论上具备与人类相当的通用认知能力，目前尚未实现。

总结来说，人工智能是一门多学科交叉技术，融合了计算机科学、数学、

### 输入输出处理

RunnableLambda（自定义函数）
```python
from langchain_core.runnables import RunnableLambda

# 将普通函数转换为 Runnable，实现与 LCEL 组件兼容
def preprocess(text):
    return text.upper().strip()

preprocessor = RunnableLambda(preprocess)
chain = preprocessor | model
```


### 错误处理和容错
重试机制
```python
from langchain_core.runnables import RunnableRetry

# 自动重试
retry_runnable = RunnableRetry(
    bound=model,
    max_attempts=3,
    wait_exponential_jitter=True
)
```

回退机制
```python
# 主要方法失败时使用备用方法
fallback_chain = primary_model.with_fallbacks([backup_model, simple_template])
```

超时控制
```python
# 设置执行超时
with_timeout = model.with_timeout(30.0)  # 30秒超时
```


In [13]:
"""
企业级 LangChain 应用 - 智能客服系统
兼容当前版本，包含完整的错误处理和容错机制
"""
from langchain_core.runnables import RunnableBranch, RunnableLambda
from langchain_core.prompts import PromptTemplate
from langchain_community.llms import Tongyi
from langchain_core.output_parsers import BaseOutputParser
from langchain_core.exceptions import OutputParserException
from typing import Dict, List, Optional, Any
import json
import logging
import time
import asyncio
from functools import wraps

# 配置日志
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def retry_with_backoff(max_attempts=3, base_delay=1.0):
    """自定义重试装饰器，实现指数退避"""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(max_attempts):
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    if attempt == max_attempts - 1:
                        logger.error(f"重试失败，已达到最大尝试次数: {e}")
                        raise e
                    
                    delay = base_delay * (2 ** attempt)  # 指数退避
                    logger.warning(f"第{attempt + 1}次尝试失败，{delay}秒后重试: {e}")
                    time.sleep(delay)
            return None
        return wrapper
    return decorator

def timeout_handler(timeout_seconds=30.0):
    """超时控制装饰器"""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            import signal
            
            def timeout_signal_handler(signum, frame):
                raise TimeoutError(f"操作超时 ({timeout_seconds}秒)")
            
            # 设置超时信号
            old_handler = signal.signal(signal.SIGALRM, timeout_signal_handler)
            signal.alarm(int(timeout_seconds))
            
            try:
                result = func(*args, **kwargs)
                signal.alarm(0)  # 取消超时
                return result
            except TimeoutError:
                logger.error(f"操作超时: {timeout_seconds}秒")
                raise
            finally:
                signal.signal(signal.SIGALRM, old_handler)
        return wrapper
    return decorator

class CustomerServiceResponse(BaseOutputParser[Dict]):
    """客服响应解析器"""
    
    def parse(self, text: str) -> Dict:
        try:
            # 尝试解析JSON格式
            if '{' in text and '}' in text:
                start = text.find('{')
                end = text.rfind('}') + 1
                json_str = text[start:end]
                return json.loads(json_str)
            
            # 如果不是JSON，返回简单格式
            return {
                "response": text.strip(),
                "category": "general",
                "confidence": 0.8,
                "requires_human": False
            }
        except Exception as e:
            raise OutputParserException(f"解析失败: {e}")
    
    def get_format_instructions(self) -> str:
        return """请以JSON格式回复：
{
  "response": "回复内容",
  "category": "问题类别(technical/billing/general)",
  "confidence": 0.9,
  "requires_human": false
}"""

class EnterpriseCustomerService:
    """企业级客服系统"""
    
    def __init__(self):
        self.setup_models()
        self.setup_chains()
        self.setup_fallback_system()
        self.performance_stats = {
            "total_requests": 0,
            "successful_requests": 0,
            "failed_requests": 0,
            "average_response_time": 0.0
        }
    
    def setup_models(self):
        """设置模型"""
        # 主要模型 - 高性能
        self.primary_model = Tongyi(
            model_name="qwen-max",
            temperature=0.3,
            max_tokens=500
        )
        
        # 备用模型 - 稳定性优先
        self.backup_model = Tongyi(
            model_name="qwen-plus", 
            temperature=0.1,
            max_tokens=300
        )
    
    def setup_chains(self):
        """设置处理链"""
        self.parser = CustomerServiceResponse()
        
        # 技术问题处理链
        tech_prompt = PromptTemplate(
            input_variables=["question", "user_info"],
            template="""你是技术支持专家，请回答用户的技术问题。

用户信息：{user_info}
问题：{question}

{format_instructions}

请提供专业的技术解答。""",
            partial_variables={"format_instructions": self.parser.get_format_instructions()}
        )
        
        # 账单问题处理链
        billing_prompt = PromptTemplate(
            input_variables=["question", "user_info"],
            template="""你是账单客服专员，请处理用户的账单相关问题。

用户信息：{user_info}
问题：{question}

{format_instructions}

请提供准确的账单信息和解决方案。""",
            partial_variables={"format_instructions": self.parser.get_format_instructions()}
        )
        
        # 通用问题处理链
        general_prompt = PromptTemplate(
            input_variables=["question", "user_info"],
            template="""你是客服代表，请友好地回答用户问题。

用户信息：{user_info}
问题：{question}

{format_instructions}

请提供有帮助的回复。""",
            partial_variables={"format_instructions": self.parser.get_format_instructions()}
        )
        
        # 创建处理链
        self.tech_chain = tech_prompt | self.primary_model | self.parser
        self.billing_chain = billing_prompt | self.primary_model | self.parser
        self.general_chain = general_prompt | self.primary_model | self.parser
    
    def setup_fallback_system(self):
        """设置容错系统"""
        
        # 创建带有回退机制的处理函数
        def create_fallback_chain(primary_chain, chain_name):
            def fallback_processor(input_data):
                try:
                    # 第一层：主要链处理
                    return primary_chain.invoke(input_data)
                except Exception as e:
                    logger.warning(f"{chain_name} 主链失败，尝试备用模型: {e}")
                    try:
                        # 第二层：备用模型处理
                        backup_chain = primary_chain.first | self.backup_model | self.parser
                        return backup_chain.invoke(input_data)
                    except Exception as e2:
                        logger.error(f"{chain_name} 备用模型失败，使用简单响应: {e2}")
                        # 第三层：简单响应
                        return {
                            "response": "抱歉，系统暂时繁忙，请稍后重试或联系人工客服。",
                            "category": "system_error",
                            "confidence": 1.0,
                            "requires_human": True
                        }
            
            return RunnableLambda(fallback_processor)
        
        # 为每个链添加回退机制
        self.tech_chain_with_fallback = create_fallback_chain(self.tech_chain, "技术支持")
        self.billing_chain_with_fallback = create_fallback_chain(self.billing_chain, "账单服务")
        self.general_chain_with_fallback = create_fallback_chain(self.general_chain, "通用服务")
        
        # 创建智能路由分支
        self.smart_router = RunnableBranch(
            (self._is_technical_question, self.tech_chain_with_fallback),
            (self._is_billing_question, self.billing_chain_with_fallback),
            self.general_chain_with_fallback  # 默认分支
        )
    
    def _is_technical_question(self, x: Dict) -> bool:
        """判断是否为技术问题"""
        question = x.get("question", "").lower()
        tech_keywords = ["bug", "错误", "故障", "技术", "API", "代码", "系统", "登录", "密码"]
        return any(keyword in question for keyword in tech_keywords)
    
    def _is_billing_question(self, x: Dict) -> bool:
        """判断是否为账单问题"""
        question = x.get("question", "").lower()
        billing_keywords = ["账单", "费用", "付款", "充值", "退款", "价格", "订单"]
        return any(keyword in question for keyword in billing_keywords)
    
    @retry_with_backoff(max_attempts=3, base_delay=1.0)
    @timeout_handler(timeout_seconds=30.0)
    def _process_with_retry_and_timeout(self, input_data: Dict) -> Dict:
        """带重试和超时的处理方法"""
        return self.smart_router.invoke(input_data)
    
    def process_customer_inquiry(self, question: str, user_info: Dict) -> Dict:
        """处理客户咨询"""
        start_time = time.time()
        self.performance_stats["total_requests"] += 1
        
        try:
            logger.info(f"处理客户咨询: {question[:50]}...")
            
            # 准备输入
            input_data = {
                "question": question,
                "user_info": json.dumps(user_info, ensure_ascii=False)
            }
            
            # 执行带重试和超时的处理
            result = self._process_with_retry_and_timeout(input_data)
            
            # 添加处理时间和状态
            processing_time = round(time.time() - start_time, 2)
            result["processing_time"] = processing_time
            result["status"] = "success"
            
            # 更新性能统计
            self.performance_stats["successful_requests"] += 1
            self._update_average_response_time(processing_time)
            
            logger.info(f"处理完成，耗时: {processing_time}秒")
            return result
            
        except Exception as e:
            processing_time = round(time.time() - start_time, 2)
            self.performance_stats["failed_requests"] += 1
            
            logger.error(f"处理失败: {e}")
            return {
                "response": "系统出现异常，请联系技术支持。",
                "category": "system_error",
                "confidence": 0.0,
                "requires_human": True,
                "status": "error",
                "error": str(e),
                "processing_time": processing_time
            }
    
    def _update_average_response_time(self, new_time: float):
        """更新平均响应时间"""
        total_successful = self.performance_stats["successful_requests"]
        current_avg = self.performance_stats["average_response_time"]
        
        # 计算新的平均值
        new_avg = ((current_avg * (total_successful - 1)) + new_time) / total_successful
        self.performance_stats["average_response_time"] = round(new_avg, 2)
    
    def batch_process_inquiries(self, inquiries: List[Dict]) -> List[Dict]:
        """批量处理客户咨询"""
        logger.info(f"开始批量处理 {len(inquiries)} 个咨询")
        
        results = []
        for inquiry in inquiries:
            result = self.process_customer_inquiry(
                inquiry["question"],
                inquiry.get("user_info", {})
            )
            results.append(result)
        
        logger.info(f"批量处理完成")
        return results
    
    def get_performance_stats(self) -> Dict:
        """获取性能统计"""
        stats = self.performance_stats.copy()
        if stats["total_requests"] > 0:
            stats["success_rate"] = round(
                (stats["successful_requests"] / stats["total_requests"]) * 100, 2
            )
        else:
            stats["success_rate"] = 0.0
        
        return stats

def demo_enterprise_application():
    """企业应用演示"""
    print("=== 企业级 LangChain 客服系统演示 ===")
    
    # 初始化系统
    customer_service = EnterpriseCustomerService()
    
    # 测试用例
    test_cases = [
        {
            "question": "我的API调用出现500错误，怎么解决？",
            "user_info": {"user_id": "12345", "plan": "企业版", "region": "北京"}
        },
        {
            "question": "我想查看本月的账单详情",
            "user_info": {"user_id": "67890", "plan": "标准版", "region": "上海"}
        },
        {
            "question": "你们的服务怎么样？",
            "user_info": {"user_id": "11111", "plan": "免费版", "region": "深圳"}
        }
    ]
    
    # 单个处理演示
    print("\n--- 单个处理演示 ---")
    for i, case in enumerate(test_cases, 1):
        print(f"\n客户咨询 {i}:")
        print(f"问题: {case['question']}")
        print(f"用户信息: {case['user_info']}")
        
        result = customer_service.process_customer_inquiry(
            case["question"], 
            case["user_info"]
        )
        
        print(f"回复: {result['response']}")
        print(f"类别: {result['category']}")
        print(f"置信度: {result['confidence']}")
        print(f"需要人工: {result['requires_human']}")
        print(f"处理时间: {result['processing_time']}秒")
        print(f"状态: {result['status']}")
    
    # 批量处理演示
    print(f"\n--- 批量处理演示 ---")
    batch_results = customer_service.batch_process_inquiries(test_cases)
    
    print(f"批量处理了 {len(batch_results)} 个咨询")
    for i, result in enumerate(batch_results, 1):
        print(f"结果 {i}: {result['category']} - {result['response'][:50]}...")
    
    # 性能统计
    print(f"\n--- 性能统计 ---")
    stats = customer_service.get_performance_stats()
    print(f"总请求数: {stats['total_requests']}")
    print(f"成功请求数: {stats['successful_requests']}")
    print(f"失败请求数: {stats['failed_requests']}")
    print(f"成功率: {stats['success_rate']}%")
    print(f"平均响应时间: {stats['average_response_time']}秒")

if __name__ == "__main__":
    demo_enterprise_application()


INFO:__main__:处理客户咨询: 我的API调用出现500错误，怎么解决？...


=== 企业级 LangChain 客服系统演示 ===

--- 单个处理演示 ---

客户咨询 1:
问题: 我的API调用出现500错误，怎么解决？
用户信息: {'user_id': '12345', 'plan': '企业版', 'region': '北京'}


ERROR:__main__:重试失败，已达到最大尝试次数: module 'signal' has no attribute 'SIGALRM'
ERROR:__main__:处理失败: module 'signal' has no attribute 'SIGALRM'
INFO:__main__:处理客户咨询: 我想查看本月的账单详情...


回复: 系统出现异常，请联系技术支持。
类别: system_error
置信度: 0.0
需要人工: True
处理时间: 3.0秒
状态: error

客户咨询 2:
问题: 我想查看本月的账单详情
用户信息: {'user_id': '67890', 'plan': '标准版', 'region': '上海'}


ERROR:__main__:重试失败，已达到最大尝试次数: module 'signal' has no attribute 'SIGALRM'
ERROR:__main__:处理失败: module 'signal' has no attribute 'SIGALRM'
INFO:__main__:处理客户咨询: 你们的服务怎么样？...


回复: 系统出现异常，请联系技术支持。
类别: system_error
置信度: 0.0
需要人工: True
处理时间: 3.01秒
状态: error

客户咨询 3:
问题: 你们的服务怎么样？
用户信息: {'user_id': '11111', 'plan': '免费版', 'region': '深圳'}


ERROR:__main__:重试失败，已达到最大尝试次数: module 'signal' has no attribute 'SIGALRM'
ERROR:__main__:处理失败: module 'signal' has no attribute 'SIGALRM'
INFO:__main__:开始批量处理 3 个咨询
INFO:__main__:处理客户咨询: 我的API调用出现500错误，怎么解决？...


回复: 系统出现异常，请联系技术支持。
类别: system_error
置信度: 0.0
需要人工: True
处理时间: 3.01秒
状态: error

--- 批量处理演示 ---


ERROR:__main__:重试失败，已达到最大尝试次数: module 'signal' has no attribute 'SIGALRM'
ERROR:__main__:处理失败: module 'signal' has no attribute 'SIGALRM'
INFO:__main__:处理客户咨询: 我想查看本月的账单详情...
ERROR:__main__:重试失败，已达到最大尝试次数: module 'signal' has no attribute 'SIGALRM'
ERROR:__main__:处理失败: module 'signal' has no attribute 'SIGALRM'
INFO:__main__:处理客户咨询: 你们的服务怎么样？...
ERROR:__main__:重试失败，已达到最大尝试次数: module 'signal' has no attribute 'SIGALRM'
ERROR:__main__:处理失败: module 'signal' has no attribute 'SIGALRM'
INFO:__main__:批量处理完成


批量处理了 3 个咨询
结果 1: system_error - 系统出现异常，请联系技术支持。...
结果 2: system_error - 系统出现异常，请联系技术支持。...
结果 3: system_error - 系统出现异常，请联系技术支持。...

--- 性能统计 ---
总请求数: 6
成功请求数: 0
失败请求数: 6
成功率: 0.0%
平均响应时间: 0.0秒


### 5. 配置和定制
运行时配置

```python
# 传递配置参数
config = {"temperature": 0.7, "max_tokens": 100}
result = model.invoke(input, config=config)

# 绑定默认配置
bound_model = model.bind(temperature=0.7)
```


标签和元数据

```python
# 添加标签用于追踪
tagged_chain = chain.with_tags(["production", "v1.0"])

# 添加元数据
chain_with_metadata = chain.with_metadata({"version": "1.0", "author": "team"})
```

6. 调试和监控
中间结果获取

```python
# 获取每一步的输出
def debug_step(step_output):
    print(f"步骤输出: {step_output}")
    return step_output

debug_chain = prompt | RunnableLambda(debug_step) | model
```

性能监控

```python
import time

def timing_wrapper(runnable):
    def timed_invoke(input):
        start = time.time()
        result = runnable.invoke(input)
        print(f"执行时间: {time.time() - start:.2f}秒")
        return result
    return RunnableLambda(timed_invoke)
```

7. 自定义 Runnable
基本实现

```python
from langchain_core.runnables import Runnable

class CustomProcessor(Runnable[str, Dict]):
    def invoke(self, input: str, config=None) -> Dict:
        # 自定义处理逻辑
        return {"processed": input.upper(), "length": len(input)}
    
    async def ainvoke(self, input: str, config=None) -> Dict:
        # 异步版本
        return self.invoke(input, config)
```


完整实现模板
```python
class AdvancedRunnable(Runnable[InputType, OutputType]):
    def __init__(self, param1, param2):
        self.param1 = param1
        self.param2 = param2
    
    def invoke(self, input: InputType, config=None) -> OutputType:
        # 同步执行逻辑
        pass
    
    async def ainvoke(self, input: InputType, config=None) -> OutputType:
        # 异步执行逻辑
        pass
    
    def batch(self, inputs: List[InputType], config=None) -> List[OutputType]:
        # 批量处理逻辑
        return [self.invoke(inp, config) for inp in inputs]
    
    def stream(self, input: InputType, config=None):
        # 流式处理逻辑
        yield from self._stream_generator(input)

```

### 最佳实践

```python
# 1. 明确类型定义
class MyChain(Runnable[Dict[str, str], Dict[str, Any]]):
    pass

# 2. 合理使用并行
parallel_processing = RunnableParallel({
    "fast_analysis": quick_model,
    "detailed_analysis": detailed_model
})

# 3. 添加错误处理
robust_chain = (
    preprocessing |
    model.with_fallbacks([backup_model]) |
    postprocessing
)

# 4. 使用配置管理
configurable_chain = model.configurable_fields(
    temperature=ConfigurableField(id="temp", name="Temperature")
)
```