# LCEL

LangChain 表达式语言（LangChain Expression Language，LCEL）是一种声明式的方法，可以轻松地将"链"组合在一起。

## 1.基本原理

### 1.1 函数

(程序语言中的)函数一般看作是对象(object)间的态射(morphsim)，即输入到输出的变换。例如：

$$f: y\to z\qquad h: x\to y$$

※ 简单起见，多个输入参数，可以视为一个元组(tuple)类型的输入参数。

将函数也看作一个对象（object），那么函数和函数也是可以进行运算的。复合函数

$$h(x)=(f\cdot g)(x)=f(g(x))$$

$$h = f\cdot g$$

其中的 "$\cdot$" 就是函数的结合（composition）运算。


Langchain就是把各种函数都封装成`Runnable[Input, Output]`，然后使用运算符`|`进行结合运算。

※ `Runnable[Input, Output]`是Python的泛型写法。它和Java的`Function<T, R>`是一个意思。

### 1.2 运算符重载

LCEL的管道(pipe)`|`运算是基于Python中运算符重载实现的。

- `__or__`：左结合`|`运算
- `__ror__`：右结合`|`运算

※ 对于数字来说，它们通常表示位运算。但对于一般的其它类型，位运算没有意义，所以它可以拿来用做其他用途了。就比如LangChain，把它们用于函数的复合运算。

In [1]:
# __or__运算
class A:
    def __init__(self, value):
        self.value = value

    def __or__(self, other):
        print("A's __or__ method is called")
        return self.value | other.value


class B:
    def __init__(self, value):
        self.value = value

    def __or__(self, other):
        print("B's __or__ method is called")
        return self.value | other.value


objA = A(2)
objB = B(3)
result = objA | objB  # 这个 | 是左结合的，等价于调用左操作数(objA)的__or__方法
result = objA.__or__(objB)

result = objB | objA  # 这个 | 是左结合的，等价于调用左操作数(objB)的__or__方法
result = objB.__or__(objA)


# __ror__运算
class C:
    def __init__(self, value):
        self.value = value


class D:
    def __init__(self, value):
        self.value = value

    def __ror__(self, other):
        print("D's __ror__ method is called")
        return self.value | other.value


objC = C(4)
objD = D(5)
result = objC | objD  # 这个 | 是右结合的，等价于调用右操作数(objD)的__ror__方法
result = objD.__ror__(objC)


# 如下，调用objA的__or__方法？还是调用objD的__ror__方法？
result = objA | objD  # 左结合的优先级高，因此调用objA的__or__方法
result = objA.__or__(objD)

A's __or__ method is called
A's __or__ method is called
B's __or__ method is called
B's __or__ method is called
D's __ror__ method is called
D's __ror__ method is called
A's __or__ method is called
A's __or__ method is called


那么，函数的复合运算就容易实现了。

In [2]:
from abc import ABC, abstractmethod
from typing import Any

from typing_extensions import Self


# "函数"的抽象类，用于描述任意函数
class Func(ABC):
    # 函数原型 （为简化问题，不考虑参数类型，所以不需要泛型）
    @abstractmethod
    def call(self, x: Any) -> Any:
        pass

    # 复合运算
    def __or__(self, other: Self) -> Self:
        return FuncSeq([self, other])

    def __ror__(self, other: Self) -> Self:
        return FuncSeq([other, self])


# 复合函数， 例如 f*g
class FuncSeq(Func):
    # 需要结合运算的函数们，例如，f，g
    functions: list[Func] = []

    def __init__(self, functions: list[Func]):
        self.functions = functions

    # 复合函数运算规则：复合函数，等价于逐层调用各个函数
    def call(self, x: Any) -> Any:
        result = x
        for f in self.functions:
            result = f.call(result)
        return result


class Add(Func):
    def __init__(self, value):
        self.value = value

    def call(self, x):
        return self.value + x


class Mult(Func):
    def __init__(self, value):
        self.value = value

    def call(self, x):
        return self.value * x


add = Add(6)
print(add.call(5))  # 5 + 6 = 11

mult = Mult(7)
print(mult.call(5))  # 5 * 7 = 35

chain = add | mult
print(chain.call(5))  # (5 + 6) * 7 = 77

chain = mult | add
print(chain.call(5))  # 5 * 7 + 6 = 41

11
35
77
41


## 2.基本的类

### 2.1 Runnable基类

抽象类

```
Runnable[Input, Output]
```

任意可执行部件的基类。通过重载`__or__`和`__ror__`运算符，实现了管道运算。

子类需要实现下面一些接口，增加实际也业务逻辑。

- **invoke/ainvoke**: 将一个输入变换成输入
- **batch/abatch**: 批量地将多个输入变换成输出
- **stream/astream**: 将一个输入变换成输入，以流的形式返回
- **astream_log**: 将一个输入变换成输出，将输出结果和指定的中间结果以流的形式返回

In [3]:
from abc import ABC, abstractmethod
from operator import itemgetter
from typing import Any

from langchain_core.runnables import Runnable, RunnablePassthrough
from langchain_core.runnables.config import RunnableConfig
from typing_extensions import override


# 例如，一个自定义的Runnable子类，实现了"加一"功能
class CustomRunnable(Runnable[int, int]):

    @override
    def invoke(self, x: int, config: RunnableConfig | None = None, **kwargs) -> int:
        return x + 1


runnable = CustomRunnable()
print(runnable.invoke(1))  # 1+1应该输出2

runnable1 = CustomRunnable()
runnable2 = CustomRunnable()

runnable3 = runnable1 | runnable2  # 管道运算， runnable1"加1"结果传给runnable2,再"加1"
print(runnable3.invoke(1))  # 1+1+1应该输出3

# 等价于下面的调用
print(runnable2.invoke(runnable1.invoke(1)))  # 应该输出3


# 特别地，callable 会自动转换成Runnable的
# 例如，一个自定义的Runnable子类，实现了"加一"功能
class AnotherRunnable(Runnable[dict, dict]):

    @override
    def invoke(self, x: dict, config: RunnableConfig | None = None, **kwargs) -> dict:
        return {"counter": x["counter"] + 1}


runnable = AnotherRunnable()
print(runnable.invoke({"counter": 1}))  # 1+1应该输出2


# 函数counter是callable类型的
def counter(x: int) -> dict:
    return {"counter": x}


runnable1 = counter | runnable
print(runnable1.invoke(5))  # 5+1应该输出6

# dict中的字段也会处理
runnable1 = {"counter": lambda x: x + 1} | runnable
print(runnable1.invoke(7))  # 7+1+1应该输出9

runnable2 = {"counter": CustomRunnable()} | runnable
print(runnable2.invoke(6))  # 6+1+1应该输出8

2
3
3
{'counter': 2}
{'counter': 6}
{'counter': 9}
{'counter': 8}


#### 2.1.1 RunnableSerializable 子类

抽象类

```
Runnable -> RunnableSerializable
```

增加了序列化相关的功能。



In [4]:
from langchain_core.runnables import RunnablePassthrough, RunnableSerializable
from langchain_core.runnables.config import RunnableConfig


class SquareCalculator(RunnableSerializable):
    number: float = 0.0

    def invoke(self, config: RunnableConfig | None = None, **kwargs) -> float:
        # Calculate the square of the input number
        return self.number**2


square = SquareCalculator(number=5)
print(square.invoke({}))
print(square.to_json())

# 例如，后面要讲的RunnablePassthrough就是一个RunnableSerializable的子类
runnable = RunnablePassthrough()
print(isinstance(runnable, RunnableSerializable))
print(runnable.to_json())

25.0
{'lc': 1, 'type': 'not_implemented', 'id': ['__main__', 'SquareCalculator'], 'repr': 'SquareCalculator(number=5.0)', 'name': 'SquareCalculator', 'graph': {'nodes': [{'id': 0, 'type': 'schema', 'data': 'SquareCalculatorInput'}, {'id': 1, 'type': 'runnable', 'data': {'id': ['__main__', 'SquareCalculator'], 'name': 'SquareCalculator'}}, {'id': 2, 'type': 'schema', 'data': 'SquareCalculatorOutput'}], 'edges': [{'source': 0, 'target': 1}, {'source': 1, 'target': 2}]}}
True
{'lc': 1, 'type': 'constructor', 'id': ['langchain', 'schema', 'runnable', 'RunnablePassthrough'], 'kwargs': {}, 'name': 'RunnablePassthrough', 'graph': {'nodes': [{'id': 0, 'type': 'schema', 'data': 'PassthroughInput'}, {'id': 1, 'type': 'runnable', 'data': {'id': ['langchain', 'schema', 'runnable', 'RunnablePassthrough'], 'name': 'RunnablePassthrough'}}, {'id': 2, 'type': 'schema', 'data': 'PassthroughOutput'}], 'edges': [{'source': 0, 'target': 1}, {'source': 1, 'target': 2}]}}


### 2.2 两种基础的Runnable

两种基本的Runnable对应了函数和生成器。

- `RunnableLambda`：函数
- `RunnableGenerator`：生成器

#### 2.2.1 RunnableLambda

```
Runnable -> RunnableLambda
```

它包装一个Python的`Callable`对象。


In [5]:
from langchain_core.runnables import RunnableLambda

runnable = RunnableLambda(lambda name: f"Hello! {name}")

runnable.invoke("Alice")

'Hello! Alice'

In [6]:
class Dummy:
    def __init__(self, x):
        self.x = x

    def hello_x(self):
        return f"Hello {self.x}"


runnable = RunnableLambda(Dummy.hello_x)
dummy = Dummy("Bob")

print(runnable.invoke(dummy))
print(isinstance(runnable, Runnable))

Hello Bob
True


#### 2.2.2 RunnableGenerator

```
Runnable -> RunnableGenerator
```

包装的生成器

In [7]:
from langchain_community.chat_models.fake import FakeListChatModel
from langchain_core.runnables import RunnableGenerator

chat = FakeListChatModel(responses=["Hello", "How are you?"])

print(chat.invoke("hello"))
print(chat.invoke("hello"))
print(list(chat.stream("hello")))


def add_chunk_langth_generator(iterator):
    """generator of a tuple of chank length and chank."""
    for ai_message in iterator:
        print("Received: ", ai_message)  # for debug
        yield (len(ai_message.content), ai_message)


runnable_generator = RunnableGenerator(add_chunk_langth_generator)
chat1 = chat | runnable_generator
print(list(chat1.stream("hello")))

# 调用invoke的话，那么整条链每个Runnable都调用invoke。
# 所以chat调用invoke，返回的AIMessage而不是AIMessageChunk了
print(chat1.invoke("hello"))
print(chat1.invoke("hello"))

content='Hello' id='run-23847f65-0c22-4ddf-92a7-90c94d20bd71-0'
content='How are you?' id='run-8343b5a0-1e65-4daa-9804-c9a8933e8b40-0'
[AIMessageChunk(content='H', id='run-ec129d5a-0c61-43b7-a9b3-3e28b0fdad1c'), AIMessageChunk(content='e', id='run-ec129d5a-0c61-43b7-a9b3-3e28b0fdad1c'), AIMessageChunk(content='l', id='run-ec129d5a-0c61-43b7-a9b3-3e28b0fdad1c'), AIMessageChunk(content='l', id='run-ec129d5a-0c61-43b7-a9b3-3e28b0fdad1c'), AIMessageChunk(content='o', id='run-ec129d5a-0c61-43b7-a9b3-3e28b0fdad1c')]
Received:  content='H' id='run-76013003-915f-426e-896d-2eb205b02caf'
Received:  content='o' id='run-76013003-915f-426e-896d-2eb205b02caf'
Received:  content='w' id='run-76013003-915f-426e-896d-2eb205b02caf'
Received:  content=' ' id='run-76013003-915f-426e-896d-2eb205b02caf'
Received:  content='a' id='run-76013003-915f-426e-896d-2eb205b02caf'
Received:  content='r' id='run-76013003-915f-426e-896d-2eb205b02caf'
Received:  content='e' id='run-76013003-915f-426e-896d-2eb205b02caf'
R

## 3.RunnableSerializable的子类

### 3.1 Runnable组合运算

Runnable的结合方法有两种：

*   串联：`RunnableSequence`
*   并联: `RunnableParallel`



#### 3.1.1 RunnableSequence

```
Runnable -> RunnableSequence
```

管道运算的结果，都是`RunnableSequence`类型。

In [8]:
from langchain_core.runnables import RunnableLambda, RunnableSequence

r1 = RunnableLambda(lambda x: x)
r2 = RunnableLambda(lambda x: (x, 1))
r3 = RunnableLambda(lambda x: [x, 10])

seq = r1 | r2 | r3

print(type(seq))
print(isinstance(seq, RunnableSequence))
print(isinstance(r1, RunnableSequence))

print(seq.invoke(-1))

<class 'langchain_core.runnables.base.RunnableSequence'>
True
False
[(-1, 1), 10]


#### 3.1.2 RunnableParallel

并行执行多条链。

In [9]:
from langchain_community.chat_models.fake import FakeListChatModel
from langchain_core.runnables import RunnableGenerator, RunnableParallel

chat1 = FakeListChatModel(
    responses=[
        "Hello, I am Alice.",
    ]
)

chat2 = FakeListChatModel(
    responses=[
        "Hello, I am Bob.",
    ]
)

print(chat1.invoke("hello"))
print(chat2.invoke("hello"))

# 并行地执行
runnable = RunnableParallel(alice=chat1, bob=chat2)
print(runnable.invoke("hello"))

content='Hello, I am Alice.' id='run-6ca22a55-7c3c-4caf-b0ee-629246f3363f-0'
content='Hello, I am Bob.' id='run-8d354469-3903-4151-9ee5-05c38b3e7f0b-0'
{'alice': AIMessage(content='Hello, I am Alice.', id='run-a769a166-09dc-4014-aef6-9a12f065104e-0'), 'bob': AIMessage(content='Hello, I am Bob.', id='run-f1affb7e-7c70-4612-8c72-717166d523ef-0')}


##### RunnableMap

In [10]:
from langchain_core.runnables import RunnableMap

runnable = RunnableMap(alice=chat1, bob=chat2)
print(runnable.invoke("hello"))

{'alice': AIMessage(content='Hello, I am Alice.', id='run-427c8e71-0166-4971-9e1b-52e5db037e20-0'), 'bob': AIMessage(content='Hello, I am Bob.', id='run-047b2b1f-d90c-4f52-b262-59db82fbcc11-0')}


### 3.2 流程控制





#### 3.2.1 RunnableBranch

多条件分支



1.   必须有2个以上分支
2.   必须有default分支。default分支位于最后



In [11]:
from langchain_core.runnables import RunnableBranch

branch = RunnableBranch(
    (lambda x: x > 0, lambda x: f"{x} is positive!"),
    (lambda x: x < 0, lambda x: f"{x} is negative!"),
    lambda x: f"{x} is Zero!",
)

print(branch.invoke(1))
print(branch.invoke(-1))
print(branch.invoke(0))

1 is positive!
-1 is negative!
0 is Zero!


#### 3.2.2 RouterRunnable

多条件分支。


比`RunnableBranch`更简洁的写法，同时也限制的输入参数的格式。


输入参数必须是字典，有key和input两个键。`key`用来匹配条件，`input`则是匹配后，作为输出传递给相应的`Runnable`。

这个更接近常见编程语言中的`switch`语句。

In [12]:
from langchain_core.runnables.router import RouterRunnable

router = RouterRunnable(
    runnables={
        "x": RunnableLambda(lambda i: f"called x with input {i}"),
        "y": RunnableLambda(lambda i: f"called y with input {i}"),
    }
)

print(router.invoke({"key": "x", "input": "hello"}))
print(router.invoke({"key": "y", "input": "こんにちは"}))

called x with input hello
called y with input こんにちは


### 3.3 集合类型-字典

#### 3.3.1 RunnablePassthrough

任意输入都原文返回的`Runnable`。

In [13]:
from langchain_core.runnables import RunnablePassthrough

passthrough = RunnablePassthrough()

print(passthrough.invoke(1))
print(passthrough.invoke("hello"))

1
hello


#### 3.3.2 RunnableAssign

追加新的字段。如果字段存在，则会覆盖。

In [14]:
from langchain_core.runnables import RunnableAssign, RunnableLambda, RunnablePassthrough

# （非推荐）直接实例化一个RunnableAssign
assign = RunnableAssign({"x": RunnablePassthrough()})
print(assign.invoke({"human": 100}))

# 添加字段并不会修改原来的字典，而是会生成新的字典
dic = {"human": "hello"}
print(assign.invoke(dic))
print(dic)

dic = {"x": "hello"}
print(assign.invoke(dic))  # 新字典中，x: 'hello' 会被覆盖，但原dic不变
print(dic)

# RunnableAssign的输入参数必须是字典，否则是无法添加字段的
try:
    print(assign.invoke(2))
except AssertionError as e:
    print(e)

# 常见写法是在Runnable的基础上调用assign方法
## 先创建一个Runnable
runnable = RunnableLambda(lambda x: {"input": f"Input >> {x}"})
print(runnable.invoke("hello"))
## 调用assign方法生成RunnableAssign
runnable1 = runnable.assign(output=lambda dic: f'Output >> {dic["input"]}')
print(runnable1.invoke("hello"))

{'human': 100, 'x': {'human': 100}}
{'human': 'hello', 'x': {'human': 'hello'}}
{'human': 'hello'}
{'x': {'x': 'hello'}}
{'x': 'hello'}
The input to RunnablePassthrough.assign() must be a dict.
{'input': 'Input >> hello'}
{'input': 'Input >> hello', 'output': 'Output >> Input >> hello'}


#### 3.3.3 RunnablePick

字典中取出字段。

In [15]:
from langchain_core.runnables import RunnableLambda
from langchain_core.runnables.passthrough import RunnablePick

# 取出来部分字段，构成新的字典
runnable1 = RunnablePick(["human"])
print(runnable1.invoke({"human": "hi", "extra": "dummy"}))  # {'human': 'hi'}
print(runnable1.invoke({"no": "hi", "extra": "dummy"}))  # 不存在则返回None

# 取出某个字段的值
runnable2 = RunnablePick("human")
print(runnable2.invoke({"human": "hi", "extra": "dummy"}))  # hi
print(runnable2.invoke({"no": "hi", "extra": "dummy"}))  # 不存在则返回None

# 注意，输入参数也必须是字典才行，否则是没法从字典里面取值的。
try:
    print(runnable2.invoke(2))
except AssertionError as e:
    print(e)


def modify(dic):
    dic["test"] = 6
    return dic


runnable3 = RunnableLambda(modify)
print(runnable3.invoke({"human": "hi", "extra": "dummy"}))
runnable4 = runnable3.pick(["human"])
print(runnable4.invoke({"human": "hi", "extra": "dummy"}))
runnable5 = runnable3.pick(["test"])
print(runnable5.invoke({"human": "hi", "extra": "dummy"}))

{'human': 'hi'}
None
hi
None
The input to RunnablePassthrough.assign() must be a dict.
{'human': 'hi', 'extra': 'dummy', 'test': 6}
{'human': 'hi'}
{'test': 6}


### 3.4 集合类型-列表

#### 3.4.1 RunnableEachBase

##### RunnableEach

处理列表

In [16]:
from langchain_core.runnables.base import RunnableEach

each = RunnableEach(bound=chat)
each.invoke(["hello", "good morning"])

[AIMessage(content='Hello', id='run-7378a92b-c734-4265-b524-3fd66d3131ab-0'),
 AIMessage(content='How are you?', id='run-3668a7d8-88e4-4cb4-bee7-10a0b51a0306-0')]

### 3.5 异常处理

#### 3.5.1 RunnableWithFallbacks

异常处理

In [17]:
from langchain_core.runnables import RunnableLambda


def raise_exception(*args):
    raise Exception("Dummy")


runnable = RunnableLambda(raise_exception).with_fallbacks(
    [RunnableLambda(lambda x: "fallback with:" + str(x))]
)
print(
    runnable.invoke("hi")
)  # 注意，默认传递给fallback的值，不是Exception，而是原来Runnable的输入值

# 但是，可以使用exception_key来捕获异常，传递给fallback。注意，此时输入值必须为字典，否则exception_key添加到哪里呢？
runnable = RunnableLambda(raise_exception).with_fallbacks(
    [RunnableLambda(lambda x: "fallback with:" + str(x))], exception_key="err"
)
print(runnable.invoke({"question": "hi"}))

# 还可以控制捕获的异常类型
try:
    runnable = RunnableLambda(raise_exception).with_fallbacks(
        [RunnableLambda(lambda x: "fallback with:" + str(x))],
        exception_key="err",
        exceptions_to_handle=(OSError,),
    )
    print(runnable.invoke({"question": "hi"}))
except Exception as e:
    print(str(e) + " 没有被fallback捕获")


runnable_2nd_fallback = RunnableLambda(raise_exception).with_fallbacks(
    [
        RunnableLambda(raise_exception),
        RunnableLambda(lambda x: "fallback2"),
        RunnableLambda(raise_exception),
    ]
)
print(runnable_2nd_fallback.invoke("hello"))


# 注意：如果fallback无法处理异常，即fallback中仍然抛出异常，
# 那么最终抛出的异常是原Runnable的，而不是fallback的
def raise_1(*args):
    raise Exception("1")


def raise_2(*args):
    raise Exception("2")


def raise_3(*args):
    raise Exception("3")


runnable_all_error = RunnableLambda(raise_1).with_fallbacks(
    [RunnableLambda(raise_2), RunnableLambda(raise_3)]
)
try:
    runnable_all_error.invoke("hello")
except Exception as e:
    print(e)  # 不是2，不是3，而是1

fallback with:hi
fallback with:{'question': 'hi', 'err': Exception('Dummy')}
Dummy 没有被fallback捕获
fallback2
1


fallback只能抛出原来的runnable的异常，如果想在fallback中重新抛出新的异常怎么办？ 用Python的`try-except-finally`和`RunnableLambda`实现。

In [18]:
from langchain_core.runnables import RunnableLambda


def raise_1(n):
    if n < 7:
        raise Exception(f"{n} < 7")
    return n


chain1 = RunnableLambda(raise_1)


def raise_2(n):
    if n < 5:
        raise Exception(f"{n} < 5")
    return n


chain2 = RunnableLambda(raise_2)


def raise_3(n):
    if n < 3:
        raise Exception(f"{n} < 3")
    return n


chain3 = RunnableLambda(raise_3)


def full(n):
    try:
        return chain1.invoke(n)
    except Exception as e1:
        try:
            return chain2.invoke(n)
        except Exception as e2:
            if n < 2:
                try:
                    return chain3.invoke(n)
                except Exception as e3:
                    raise Exception("from chain3: " + str(e3))
            raise Exception("from chain2: " + str(e2))
        raise Exception("from chain1: " + str(e1))


fullchain = RunnableLambda(full)

try:
    print(fullchain.invoke(1))
except Exception as e:
    print(e)  # 此处是raise_3的异常时，重新raise的异常

try:
    print(fullchain.invoke(2))
except Exception as e:
    print(e)  # 此处是raise_2的异常时，重新raise的异常

from chain3: 1 < 3
from chain2: 2 < 5


### 3.6 动态链

#### 3.6.1 DynamicRunnable

##### RunnableConfigurableAlternatives

为链添加可供设置的参数

In [19]:
from langchain_core.runnables import ConfigurableField, RunnableLambda
from langchain_core.runnables.configurable import RunnableConfigurableAlternatives

chain1 = FakeListChatModel(
    responses=[
        "你好",
    ]
)
chain2 = FakeListChatModel(
    responses=[
        "hello",
    ]
)
chain3 = FakeListChatModel(
    responses=[
        "こんにちは",
    ]
)
# Add a new config "mode" with key in ['default', 'en', 'jp']
chain_configured = chain1.configurable_alternatives(
    ConfigurableField(id="mode"), default_key="default", en=chain2, jp=chain3
)

isinstance(chain_configured, RunnableConfigurableAlternatives)

# switch model demo
chain_configured.invoke("hello")


print(chain_configured.with_config(configurable={"mode": "default"}).invoke(""))

print(chain_configured.with_config(configurable={"mode": "en"}).invoke(""))

print(chain_configured.with_config(configurable={"mode": "jp"}).invoke(""))

content='你好' id='run-d3078158-63a2-4988-844e-b7b918a65a4f-0'
content='hello' id='run-685dae3e-ba5b-4ba6-9f9a-308721efeab0-0'
content='こんにちは' id='run-2bb03494-0766-4705-a633-e5badf9b91d9-0'


##### RunnableConfigurableFields

In [20]:
from langchain_core.runnables import ConfigurableField, RunnableLambda
from langchain_core.runnables.configurable import RunnableConfigurableAlternatives

chat = FakeListChatModel(
    responses=[
        "你好",
    ]
)

print(chat.__fields__.keys())

chat.configurable_fields(responses=ConfigurableField(id="responses")).with_config(
    responses=["xxxxxx"]
).invoke("")

dict_keys(['name', 'cache', 'verbose', 'callbacks', 'tags', 'metadata', 'custom_get_token_ids', 'callback_manager', 'responses', 'sleep', 'i'])


AIMessage(content='你好', id='run-553210c2-d851-4ddb-b8e7-aa6237d82f1f-0')

### 3.7 其它高级用法

#### 3.7.1 RunnableBindingBase

以委托的方式，通过`kwargs`为既存的`Runnable`添加功能。

In [21]:
from typing import Any, Type, get_args

from langchain_core.runnables import RunnableLambda
from langchain_core.runnables.base import RunnableBindingBase


# 首先，使用RunnableLambda创建一个底层的Runnbale。
def plus3(*args: Any, **kwargs: Any) -> int:
    return 3 + kwargs.get("n", 0)


runnable = RunnableLambda(plus3)


# 然后，这是一个可以向Runnable的kwargs中添加 n 的Runnable。
class CustomRunnableBinding(RunnableBindingBase):

    def set_custom_number(self, n):
        self.kwargs.update(n=n)


# 创建一个Runnable实例
runnable1 = CustomRunnableBinding(bound=runnable)
## 设定kwargs
runnable1.set_custom_number(7)
## 在RunnableBindingBase的invoke逻辑中，kwargs被传递给invoke了
print(runnable1.invoke({}))  # 因此，相当于 runnable.invoke({}, n = 7)

print(runnable.invoke({}, n=7))

10
10


#### 3.7.2 RunnableBinding

与`RunnableBindingBase`相比，提供了更简单的设定方法：直接通过构造函数指定`kwargs`即可。

In [22]:
from typing import Any, Type, get_args

from langchain_core.runnables import RunnableBinding, RunnableLambda


def plus3(*args: Any, **kwargs: Any) -> int:
    return 3 + kwargs.get("n", 0)


runnable = RunnableLambda(plus3)
print(runnable.invoke({}, n=4))  # 7

# （非推荐）直接实例化
runnable1 = RunnableBinding(bound=runnable, kwargs={"n": 4})  # 直接指定kwargs

print(runnable1.invoke({}))  # 7

# 推荐用法: Runnable基类的bind方法，其实就是调用RunnableBinding构造函数

## kwargs
runnable2 = runnable.bind(n=4)
print(type(runnable2))
print(runnable2.invoke({}))  # 7

# 除了kwargs以外，还有config, callback, types等可以设定
## config

## callback
runnable4 = runnable.with_listeners(
    on_start=lambda _: print("start"), on_end=lambda _: print("end")
)
print(type(runnable4))
print(runnable4.invoke({}))  # 3

## with_types
runnable5 = runnable.with_types(input_type=int)
print(type(runnable5))
print(runnable5.invoke({}))  # 3

echo = RunnableLambda(lambda x: "Input is" + x).with_types(output_type=str)

test = echo | runnable5

print(test.invoke("7"))  # 3

7
7
<class 'langchain_core.runnables.base.RunnableBinding'>
7
<class 'langchain_core.runnables.base.RunnableBinding'>
start
end
3
<class 'langchain_core.runnables.base.RunnableBinding'>
3
3


#### 3.7.3 RunnableWithMessageHistory

将`Runnable`给包装一层，添加历史记录管理功能。


首先，需要一个会话管理的函数，通过`get_session_history`可以根据会话ID取出该会话的历史记录管理部品（`BaseChatMessageHistory`，可以通过它在会话中查找消息，或向会话中添加消息）。相当于一个历史记录管理部品的工厂。


历史记录管理部品有以下接口：

- add_messages/aadd_messages: 向历史记录中批量添加消息
- messages/aget_messages: 取出所有历史记录
- clear/aclear: 清空历史记录


其次，被包装的`Runnable`应该满足以下条件：

*   输入
    * BaseMessage列表
    * 字典，其中指定key为所有消息的BaseMessage列表
    * 字典，其中一个key为最新的一条消息的字符串/BaseMessage/BaseMessage列表，另一个key为历史记录消息BaseMessage列表
       * 最新一条消息为字符串的话，自动被当作HumanMessage
*   输出
    * 作为AIMessage内容的字符串
    * 一条BaseMessage或 BaseMessage列表
    * 字典，其中指定key为一条BaseMessage或 BaseMessage列表


另外，与输出输出相关的三个比较重要的参数：
- input_messages_key：输入为字典时，必须指定该参数，取出消息或消息列表
- output_messages_key：输出为字典时，必须指定该参数，取出消息或消息列表
- history_messages_key：输入为字典，且历史记录使用单独的key管理时，必须指定该参数


主要逻辑就是:

```python
# 取出历史记录
history_chain: Runnable = RunnableLambda(
    self._enter_history, self._aenter_history
).with_config(run_name="load_history")
# 如果runnable需要的输入是字典的话，稍微调整一下格式
messages_key = history_messages_key or input_messages_key
if messages_key:
    history_chain = RunnablePassthrough.assign(
        **{messages_key: history_chain}
    ).with_config(run_name="insert_history")

# 历史记录传递给runnable执行，并且执行完时，触发_exit_history方法，更新保存历史记录
bound = (
    history_chain | runnable.with_listeners(on_end=self._exit_history)
).with_config(run_name="RunnableWithMessageHistory")
```

In [23]:
from langchain_core.chat_history import InMemoryChatMessageHistory
from langchain_core.language_models.fake_chat_models import ParrotFakeChatModel
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.runnables.history import RunnableWithMessageHistory

# 会话存储位置
global_store = {}


# 根据会话ID，创建或返回既有的历史记录管理部品
def get_session_history(session_id):
    if session_id not in global_store:
        # 历史记录管理使用InMemoryChatMessageHistory
        global_store[session_id] = InMemoryChatMessageHistory()
    return global_store[session_id]


prompt = ChatPromptTemplate.from_messages(
    [MessagesPlaceholder(variable_name="history"), ("human", "{input}")]
)
# 模拟一个LLM，单纯将用户的输入原文返回的
chat = ParrotFakeChatModel()

# add a memory to a chain with prompt and chat
chat_with_memory: Runnable = RunnableWithMessageHistory(
    prompt | chat,
    get_session_history,
    input_messages_key="input",
    history_messages_key="history",
)

res = chat_with_memory.invoke(
    {"input": "echo 1"}, config={"configurable": {"session_id": "abc123"}}
)
print(res)

res = chat_with_memory.invoke(
    {"input": "add 1 to the previous number"},
    config={"configurable": {"session_id": "abc123"}},
)
print(res)

res = chat_with_memory.invoke(
    {"input": "add 1 to the previous number"},
    config={"configurable": {"session_id": "abc123"}},
)
print(res)

res = chat_with_memory.invoke(
    {"input": "add 1 to the previous number"},
    config={"configurable": {"session_id": "abc123"}},
)
print(res)

print(global_store)

Parent run 185c2f68-3061-45cb-b7a3-0b3f68f71c35 not found for run 2254beb4-56d9-435c-a3b0-4befa0a959ff. Treating as a root run.
Parent run 258cdf03-9cbf-4054-abfa-cb5d119785db not found for run 60e98539-02a2-4bf3-a754-b6804045ef88. Treating as a root run.
Parent run b699011a-37d5-4a19-b1ef-aaeecb0c1b40 not found for run 166e68a3-d8fc-442a-a75f-892de913bcaa. Treating as a root run.


content='echo 1' id='run-0ff4fba1-ccdb-426f-a6c1-9f89d2c2f13e-0'
content='add 1 to the previous number' id='run-63477712-68fc-4b59-92e3-cd79447fd3ab-0'
content='add 1 to the previous number' id='run-e9066462-1d86-40ae-b3cc-e22d67884fc8-0'


Parent run 014ec37a-9146-422c-ab01-b47cca11f2d8 not found for run 770a6508-3a6b-446b-9949-5b653a382d7c. Treating as a root run.


content='add 1 to the previous number' id='run-71189361-0243-4242-8f15-65b4b18aa567-0'
{'abc123': InMemoryChatMessageHistory(messages=[HumanMessage(content='echo 1'), HumanMessage(content='echo 1', id='run-0ff4fba1-ccdb-426f-a6c1-9f89d2c2f13e-0'), HumanMessage(content='add 1 to the previous number'), HumanMessage(content='add 1 to the previous number', id='run-63477712-68fc-4b59-92e3-cd79447fd3ab-0'), HumanMessage(content='add 1 to the previous number'), HumanMessage(content='add 1 to the previous number', id='run-e9066462-1d86-40ae-b3cc-e22d67884fc8-0'), HumanMessage(content='add 1 to the previous number'), HumanMessage(content='add 1 to the previous number', id='run-71189361-0243-4242-8f15-65b4b18aa567-0')])}


###### RunnableRetry

失败时重试。可以设定失败次数，当哪些异常出现时进行重试等等。

In [24]:
class SuccessAfter2:
    def __init__(self):
        self.c = 0

    def __call__(self, *args, **kwargs):
        print("called")
        if self.c < 2:
            self.c += 1
            raise ValueError(f"self.c = {self.c} < 2")
        return "success"


from langchain_core.runnables.retry import RunnableRetry

# Without Retry
runnable = RunnableLambda(SuccessAfter2())

try:
    runnable.invoke(None)
except ValueError as e:
    print(e)
    try:
        runnable.invoke(None)
    except ValueError as e:
        print(e)
        res = runnable.invoke(None)
        print(res)


# With retry
runnable_with_retry = RunnableRetry(
    bound=RunnableLambda(SuccessAfter2()),
    max_attempt_number=3,  # How many times to retry ?
    retry_exception_types=(ValueError,),  # Which exceptions will be retried?
)
print(runnable_with_retry.invoke(None))

# 更常见的写法
runnable = RunnableLambda(SuccessAfter2())
runnable_with_retry = runnable.with_retry(
    stop_after_attempt=3, retry_if_exception_type=(ValueError,)
)
print(runnable_with_retry.invoke(None))

called
self.c = 1 < 2
called
self.c = 2 < 2
called
success
called
called
called
success
called
called
called
success


### 3.8 其它Langchain概念

基本结构就是“构造输入数据-->LLM模型-->解析输出结果”

`Prompot | Model | OutputParser`

特别的，`Retriever`也借用了`Runnable`的概念。

#### 3.8.1 BaseLanguageModel

继承自`RunnableSerializable`。大语言模型的基类。

In [25]:
from typing import List, Optional, Sequence

from langchain_core.callbacks import Callbacks
from langchain_core.language_models import BaseLanguageModel
from langchain_core.messages import BaseMessage
from langchain_core.outputs import Generation, LLMResult
from langchain_core.prompt_values import PromptValue


class MyLLM(BaseLanguageModel):

    class Config:
        arbitrary_types_allowed = True

    mock_output: str

    def __init__(self, mock_output: str):
        super(MyLLM, self).__init__(mock_output=mock_output)

    async def agenerate_prompt(self, *args, **kwargs) -> LLMResult:
        return LLMResult(generations=[[Generation(text=self.mock_output)]])

    def generate_prompt(self, *args, **kwargs) -> LLMResult:
        return LLMResult(generations=[[Generation(text=self.mock_output)]])

    async def apredict(self, *args, **kwargs) -> str:
        return self.mock_output

    def predict(self, *args, **kwargs) -> str:
        return self.mock_output

    async def apredict_messages(self, *args, **kwargs) -> BaseMessage:
        return BaseMessage(self.mock_output)

    def predict_messages(self, *args, **kwargs) -> BaseMessage:
        return BaseMessage(self.mock_output)

    def invoke(self, *args, **kwargs) -> str:
        return self.mock_output


llm = MyLLM(mock_output="输出")

print(llm.predict("测试"))

输出


`BaseLanguageModel`有两个子类，对应两种模式：LLM 和 聊天模式。

- `BaseLLM`：LLM 指的是纯文本 I/O 的模型，其包装的 API 将字符串提示作为输入，并输出字符串。
- `BaseChatModel`：聊天模型通常由 LLM 支持，但专门针对对话进行了调整，其 API 采用聊天消息列表作为输入，而不是单个字符串。通常，这些消息都标有角色（例如，“System”，“AI”，“Human”）。聊天模型会返回一条 AI 聊天消息作为输出。

##### BaseChatModel

##### BaseLLM

#### 3.8.2 BasePromptTemplate

继承自` RunnableSerializable`。Prompt模板的基类。

模板是一个构建prompt的可执行模块。它的输入是变量，输出是替换后得到的prompt。

##### BaseChatPromptTemplate

#### 3.8.3 BaseOutputParser

继承自`BaseLLMOutputParser`和`RunnableSerializable`。结果解析器的基类。

- `BaseLLMOutputParser`定义了解析函数的接口
- `RunnableSerializable`保证了可以链式串接的能力

In [26]:
from langchain_core.output_parsers import BaseOutputParser
from langchain_core.runnables import RunnableLambda


class CommaSeparatedListOutputParser(BaseOutputParser):
    def parse(self, text: str):
        return text.strip().split(", ")


result = CommaSeparatedListOutputParser().parse("A, B, C")
print(result)

chain = RunnableLambda(lambda x: x) | CommaSeparatedListOutputParser()

print(chain.invoke("1, 2, 3, 4, 5, 6"))

['A', 'B', 'C']
['1', '2', '3', '4', '5', '6']


In [27]:
from langchain_core.exceptions import OutputParserException
from langchain_core.output_parsers import BaseOutputParser
from langchain_core.runnables import RunnablePassthrough


class BooleanOutputParser(BaseOutputParser[bool]):
    true_val: str = "YES"
    false_val: str = "NO"

    def parse(self, text: str) -> bool:
        cleaned_text = text.strip().upper()
        if cleaned_text not in (self.true_val.upper(), self.false_val.upper()):
            raise OutputParserException(
                f"BooleanOutputParser expected output value to either be "
                f"{self.true_val} or {self.false_val} (case-insensitive). "
                f"Received {cleaned_text}."
            )
        return cleaned_text == self.true_val.upper()

    @property
    def _type(self) -> str:
        return "boolean_output_parser"


chain = RunnablePassthrough() | BooleanOutputParser()

print(chain.invoke("NO"))
print(chain.invoke("YES"))
print(chain.invoke("Yes"))
print(chain.invoke("No"))

False
True
True
False


#### 3.8.4 BaseRetriever

继承自`RunnableSerializable`。检索器的基类。

检索器借用了`Runnable`的概念。它是一个特殊的`Runnable`，输入为字符串，输出为`Document`列表。

In [28]:
from typing import List

from langchain_core.documents import Document
from langchain_core.retrievers import BaseRetriever


class SimpleRetriever(BaseRetriever):
    docs: List[Document]
    k: int = 5

    def _get_relevant_documents(self, query: str) -> List[Document]:
        """Return the first k documents from the list of documents"""
        return self.docs[: self.k]

    async def _aget_relevant_documents(self, query: str) -> List[Document]:
        """(Optional) async native implementation."""
        return self.docs[: self.k]


retriever = SimpleRetriever(
    docs=[
        Document(page_content="1.Introduction"),
        Document(page_content="2.Basic"),
        Document(page_content="3.Design"),
        Document(page_content="4.Implementation"),
        Document(page_content="5.Test"),
        Document(page_content="6.Conclusion"),
    ]
)

documents = retriever.invoke("任意问题")
for doc in documents:
    print(doc)

page_content='1.Introduction'
page_content='2.Basic'
page_content='3.Design'
page_content='4.Implementation'
page_content='5.Test'
