使用 LCEL 的原因：


1. 统一接口:每个LCEL对象实现Runnable接口，该接口定义了一组通用的调用方法(invoke、batch、stream、ainvoke等)。这使得LCEL对象链也可以自动支持这些调用。也就是说，每个LCEL对象链本身都是一个LCEL对象。

2. 组合原语:LCEL提供了许多原语，可以轻松地组合链、并行化组件、添加回退、动态配置内部链等等。


In [3]:
import os
import sys

# NOTE: '__file__' is a quoted string literal here, not the module attribute.
# realpath() resolves it against the current working directory, so dirname()
# yields that directory.
current_dir = os.path.dirname(os.path.realpath('__file__'))
# Add the parent directory to sys.path — presumably that is where the
# `apikey` module imported below lives (verify against the repo layout).
parent_dir = os.path.abspath(os.path.join(current_dir, os.pardir))
sys.path.append(parent_dir)

from apikey import apikey, langchain_apikey

# Export the credentials and turn on LangChain tracing through environment variables.
os.environ['OPENAI_API_KEY'] = apikey
os.environ['LANGCHAIN_API_KEY'] = langchain_apikey
os.environ["LANGCHAIN_TRACING_V2"] = "true"


## Invoke

In [2]:
# without LCEL

from typing import List

import openai


# Prompt shared by the hand-written ("without LCEL") helpers; `{topic}` is
# filled in with str.format(). An empty template would send an empty prompt,
# so use the same text the LCEL prompt is built from.
prompt_template = "Tell me a short joke about {topic}"
# Synchronous OpenAI client used by the hand-written helpers.
client = openai.OpenAI()

def call_chat_model(messages: List[dict]) -> str:
    """Send ``messages`` to the chat completions endpoint and return the reply text.

    Args:
        messages: Chat messages as ``{"role": ..., "content": ...}`` dicts.

    Returns:
        The message content of the first choice in the response.
    """
    request = {"model": "gpt-3.5-turbo", "messages": messages}
    completion = client.chat.completions.create(**request)
    first_choice = completion.choices[0]
    return first_choice.message.content

def invoke_chain(topic: str) -> str:
    """Fill ``prompt_template`` with ``topic`` and return the chat model's reply."""
    user_message = {
        "role": "user",
        "content": prompt_template.format(topic=topic),
    }
    return call_chat_model([user_message])

# Run the hand-written chain once with a sample topic.
invoke_chain("ice cream")




In [4]:
# with LCEL

from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough

# Prompt with a single `{topic}` placeholder.
prompt = ChatPromptTemplate.from_template(
    "Tell me a short joke about {topic}"
)
output_parser = StrOutputParser()
model = ChatOpenAI(model="gpt-3.5-turbo")
# Compose the steps with `|`. The leading dict supplies the `topic` key the
# prompt needs; see the RunnablePassthrough docs linked below for how the
# chain input reaches it.
chain = (
    {"topic": RunnablePassthrough()}
    | prompt
    | model
    | output_parser
)

chain.invoke("ice cream")

# cc: how should the leading {"topic": RunnablePassthrough} in the chain be understood?
# What is RunnablePassthrough used for? Official docs: https://python.langchain.com/docs/expression_language/how_to/passthrough


'Why did the ice cream go to therapy? Because it had too many sprinkles of anxiety!'

In [9]:
chain

{
  topic: RunnablePassthrough()
}
| ChatPromptTemplate(input_variables=['topic'], messages=[HumanMessagePromptTemplate(prompt=PromptTemplate(input_variables=['topic'], template='Tell me a short joke about {topic}'))])
| ChatOpenAI(client=<openai.resources.chat.completions.Completions object at 0x11b9dd610>, async_client=<openai.resources.chat.completions.AsyncCompletions object at 0x11bac2d50>, openai_api_key=SecretStr('**********'), openai_proxy='')
| StrOutputParser()

## Stream

In [None]:
# without LCEL

from typing import Iterator

def stream_chat_model(messages: List[dict]) -> Iterator[str]:
    """Yield the chat model's reply to ``messages`` piece by piece.

    The request is made with ``stream=True``; chunks whose delta content is
    ``None`` are skipped.
    """
    events = client.chat.completions.create(
        stream=True,
        messages=messages,
        model="gpt-3.5-turbo",
    )
    for event in events:
        piece = event.choices[0].delta.content
        if piece is None:
            continue
        yield piece

def stream_chain(topic: str) -> Iterator[str]:
    """Stream the chat model's reply to the joke prompt for ``topic``.

    The prompt is built from the plain-string ``prompt_template``, the same
    one ``invoke_chain`` formats, so this "without LCEL" helper does not
    depend on the LangChain ``prompt`` object.

    Returns:
        An iterator over the text chunks produced by ``stream_chat_model``.
    """
    prompt_value = prompt_template.format(topic=topic)
    return stream_chat_model([{"role": "user", "content": prompt_value}])

# Print each chunk as it arrives; end="" keeps the pieces on one line and
# flush=True pushes every piece to the output immediately.
for chunk in stream_chain("ice cream"):
    print(chunk, end="", flush=True)

    

In [10]:
# with LCEL

# Stream the LCEL chain's output, printing each chunk as soon as it arrives.
for chunk in chain.stream("ice cream"):
    print(chunk, end="", flush=True)

# cc: once you have the chain, you have "everything"

Why did the ice cream truck break down? It couldn't handle the rocky road!

## Batch

In [None]:
# without LCEL

from concurrent.futures import ThreadPoolExecutor

def batch_chain(topics: list) -> list:
    """Run ``invoke_chain`` for every topic on a pool of at most 5 threads.

    Returns:
        The replies as a list, in the order ``executor.map`` yields them
        (the order of ``topics``).
    """
    with ThreadPoolExecutor(max_workers=5) as pool:
        replies = pool.map(invoke_chain, topics)
        return list(replies)
    
# Run the thread-pool batch helper over three sample topics.
batch_chain(["ice cream", "spaghetti", "dumplings"])



In [None]:
# with LCEL

chain.batch(["ice cream", "spaghetti", "dumplings"])

# cc: once you have the chain, you have "everything"

## Async

In [None]:
# without LCEL

# Asynchronous OpenAI client used by the async hand-written helpers.
async_client = openai.AsyncOpenAI()


async def acall_chat_model(messages: List[dict]) -> str:
    """Asynchronously send ``messages`` to the chat model and return the reply text.

    Uses the same model as the synchronous ``call_chat_model``; an empty
    model name is not a valid request.

    Args:
        messages: Chat messages as ``{"role": ..., "content": ...}`` dicts.

    Returns:
        The message content of the first choice in the response.
    """
    response = await async_client.chat.completions.create(
        model="gpt-3.5-turbo",
        messages=messages,
    )
    return response.choices[0].message.content

async def ainvoke_chain(topic: str) -> str:
    """Async counterpart of ``invoke_chain``: format the prompt and await the reply."""
    user_message = {
        "role": "user",
        "content": prompt_template.format(topic=topic),
    }
    reply = await acall_chat_model([user_message])
    return reply

# Top-level await, as used in notebook cells.
await ainvoke_chain("ice cream")





In [None]:
# with LCEL

# The same chain object also exposes an async entry point.
await chain.ainvoke("ice cream")

# cc: once you have the chain, you have "everything"


## Async Batch

In [None]:
# without LCEL

import openai
import asyncio

async def abatch_chain(topics: list) -> list:
    """Run ``ainvoke_chain`` for all topics concurrently.

    Args:
        topics: Topics to generate a reply for.

    Returns:
        The list produced by ``asyncio.gather`` for the per-topic coroutines;
        an empty list when ``topics`` is empty.
    """
    coros = [ainvoke_chain(topic) for topic in topics]
    # asyncio.gather is the stdlib function for awaiting several coroutines
    # together; asyncio has no `lgather`.
    return await asyncio.gather(*coros)

# Top-level await, as used in notebook cells.
await abatch_chain(["ice cream", "spaghetti", "dumplings"])

In [None]:
# with LCEL
# Async batch over three sample topics using the same LCEL chain.
await chain.abatch(["ice cream", "spaghetti", "dumplings"])


## LLM instead of chat model

In [None]:
# without LCEL

def call_llm(prompt_value: str) -> str:
    """Complete ``prompt_value`` with the instruct model via the completions endpoint.

    Returns:
        The ``text`` of the first choice in the response.
    """
    completion = client.completions.create(
        prompt=prompt_value,
        model="gpt-3.5-turbo-instruct",
    )
    first_choice = completion.choices[0]
    return first_choice.text

def invoke_llm_chain(topic: str) -> str:
    """Format ``prompt_template`` with ``topic`` and return ``call_llm``'s completion."""
    return call_llm(prompt_template.format(topic=topic))

# Call the function; subscripting it with [...] raises TypeError because
# functions are not subscriptable.
invoke_llm_chain("ice cream")



In [None]:
from langchain_openai import OpenAI

# Same pipeline as `chain`, with the chat model replaced by the OpenAI wrapper.
llm = OpenAI()
llm_chain = (
    {"topic": RunnablePassthrough()}
    | prompt
    | llm
    | output_parser
)

llm_chain.invoke("ice cream")

# cc: swapping out a component of the chain is also very convenient

## Different model provider

In [1]:
# without LCEL
import anthropic

# Wrap the shared prompt template in Human/Assistant markers. The f-string
# inlines `prompt_template` now, so its `{topic}` placeholder (if present at
# this point) remains for a later str.format() call.
anthropic_template = f"Human:\n\n{prompt_template}\n\nAssistant:"
# Anthropic SDK client used by the hand-written helpers.
anthropic_client = anthropic.Anthropic()

def call_anthropic(prompt_value: str) -> str:
    """Send ``prompt_value`` to Claude through the text-completions API.

    Args:
        prompt_value: The fully formatted prompt string.

    Returns:
        The ``completion`` text of the response.
    """
    # The client attribute is `completions` and the length limit is
    # `max_tokens_to_sample`; the previous spellings do not exist in the SDK.
    response = anthropic_client.completions.create(
        model="claude-2",
        prompt=prompt_value,
        max_tokens_to_sample=256,
    )
    return response.completion

def invoke_anthropic_chain(topic: str) -> str:
    """Format ``anthropic_template`` with ``topic`` and return Claude's completion."""
    return call_anthropic(anthropic_template.format(topic=topic))

# Run the hand-written Anthropic chain once with a sample topic.
invoke_anthropic_chain("ice cream")



In [6]:
# with LCEL

from langchain_anthropic import ChatAnthropic

# NOTE(review): this rebinds the name `anthropic`, which an earlier cell
# imported as the SDK module; later uses of the module need a fresh import.
anthropic = ChatAnthropic(model="claude-2")

# Same pipeline as `chain`, with the model step swapped for the Anthropic chat model.
anthropic_chain = (
    {"topic": RunnablePassthrough()}
    | prompt
    | anthropic
    | output_parser
)

anthropic_chain.invoke("ice cream")

## Runtime configurability

如果我们想让聊天模型或 LLM 在运行时可配置:



In [None]:
# without LCEL
def invoke_configurable_chain(topic: str, *, model: str = "chat_openai") -> str:
    """Dispatch ``topic`` to the hand-written chain selected by ``model``.

    Args:
        topic: Topic to generate a reply for.
        model: One of ``"chat_openai"``, ``"openai"`` or ``"anthropic"``.

    Raises:
        ValueError: If ``model`` is not one of the supported names.
    """
    if model == "chat_openai":
        return invoke_chain(topic)
    if model == "openai":
        return invoke_llm_chain(topic)
    if model == "anthropic":
        return invoke_anthropic_chain(topic)
    message = (
        f"Received invalid model '{model}'."
        " Expected one of chat_openai, openai, anthropic"
    )
    raise ValueError(message)
    
def stream_configurable_chain(topic: str, *, model: str = "chat_openai") -> Iterator[str]:
    """Return the streaming iterator of the hand-written chain selected by ``model``.

    Args:
        topic: Topic to generate a reply for.
        model: One of ``"chat_openai"``, ``"openai"`` or ``"anthropic"``.

    Raises:
        ValueError: If ``model`` is not one of the supported names. Raised at
            call time, since this function is not itself a generator.
    """
    if model == "chat_openai":
        return stream_chain(topic)
    if model == "openai":
        # Note we haven't implemented this yet.
        return stream_llm_chain(topic)
    if model == "anthropic":
        # Note we haven't implemented this yet
        return stream_anthropic_chain(topic)
    message = (
        f"Received invalid model '{model}'."
        " Expected one of chat_openai, openai, anthropic"
    )
    raise ValueError(message)

def batch_configurable_chain(topics: List[str], *, model: str = "chat_openai") -> List[str]:
    """Placeholder for the batch variant of the configurable chain.

    Not implemented: the body does nothing and the call returns ``None``.
    """
    # You get the idea
    return None


async def abatch_configurable_chain(topics: List[str], *, model: str = "chat_openai") -> List[str]:
    """Placeholder for the async batch variant of the configurable chain.

    Not implemented: awaiting the call yields ``None``.
    """
    return None


# Pick the backing model per call through the keyword-only `model` argument.
invoke_configurable_chain("ice cream", model="openai")
# Use the same topic text ("ice cream") as every other example; the
# underscore variant would be formatted into the prompt verbatim.
stream = stream_configurable_chain(
    "ice cream",
    model="anthropic",
)
for chunk in stream:
    print(chunk, end="", flush=True)



In [None]:
# with LCEL

from langchain_core.runnables import ConfigurableField

# Make the model step swappable at runtime through a configurable field with
# id "model". "chat_openai" is the key for the default (`model` itself);
# "openai" and "anthropic" are the keys for the alternatives `llm` and `anthropic`.
configurable_model = model.configurable_alternatives(
    ConfigurableField(id="model"),
    default_key="chat_openai",
    openai=llm,
    anthropic=anthropic,
)

# Same pipeline as `chain`, with the configurable model as the model step.
configurable_chain = (
    {"topic": RunnablePassthrough()}
    | prompt
    | configurable_model
    | output_parser
)

# Values for a ConfigurableField are read from the "configurable" entry of the
# run config, keyed by the field id ("model"). A top-level "model" key is not a
# recognised config key, so it would not select the alternative.
configurable_chain.invoke(
    "ice cream",
    config={"configurable": {"model": "openai"}},
)

stream = configurable_chain.stream(
    "ice cream",
    config={"configurable": {"model": "anthropic"}},
)

for chunk in stream:
    print(chunk, end="", flush=True)


# No config given, so the default alternative ("chat_openai") is used.
configurable_chain.batch(["ice cream", "spaghetti", "dumplings"])


## logging

In [None]:
# without LCEL

def invoke_anthropic_chain_with_logging(topic: str) -> str:
    """Run the hand-written Anthropic chain, printing each intermediate value.

    Prints the input topic, the formatted prompt and the model output (in that
    order, each before the next step runs), then returns the output.
    """
    print(f"Input: {topic}")
    prompt_value = anthropic_template.format(topic=topic)
    print(f"Formatted prompt: {prompt_value}")
    output = call_anthropic(prompt_value)
    print(f"Output: {output}")
    return output

# Run the logging variant once with a sample topic.
invoke_anthropic_chain_with_logging("ice cream")



In [None]:
# with LCEL

import os

# NOTE(review): "..." is a placeholder value. Running this line as-is
# overwrites any LANGCHAIN_API_KEY that was set earlier in the session.
os.environ["LANGCHAIN_API_KEY"] = "..."
os.environ["LANGCHAIN_TRACING_V2"] = "true"

# No logging code is added to the chain; tracing is switched on by the
# environment variables above.
anthropic_chain.invoke("ice cream")

# cc: LangSmith is really nice to use

## Fallbacks

如果我们想添加回退逻辑，以防一个模型 API 出现故障


In [None]:
# without LCEL
def invoke_chain_with_fallback(topic: str) -> str:
    """Return ``invoke_chain``'s reply, or the Anthropic chain's if that call raises."""
    try:
        reply = invoke_chain(topic)
    except Exception:
        reply = invoke_anthropic_chain(topic)
    return reply

async def ainvoke_chain_with_fallback(topic: str) -> str:
    """Await ``ainvoke_chain``; if it raises, await the Anthropic async chain instead."""
    try:
        reply = await ainvoke_chain(topic)
    except Exception:
        # Note: we haven't actually implemented this.
        reply = await ainvoke_anthropic_chain(topic)
    return reply

def batch_chain_with_fallback(topics: List[str]) -> List[str]:
    """Batch ``topics`` through the OpenAI chain, falling back to Anthropic on error.

    This is a plain (synchronous) function: it only calls the synchronous
    batch helpers and awaits nothing, and it is called without ``await``.
    Declared ``async``, such a call would only create a coroutine that never
    runs.

    Args:
        topics: Topics to generate replies for.

    Returns:
        Whatever the batch helper that succeeded returns (a list of replies).
    """
    try:
        return batch_chain(topics)
    except Exception:
        # Note: we haven't actually implemented this.
        return batch_anthropic_chain(topics)

# Exercise the single-call and batch fallback helpers; the async variant is
# left commented out.
invoke_chain_with_fallback("ice cream")
# await ainvoke_chain_with_fallback("ice cream")
batch_chain_with_fallback(["ice cream", "spaghetti", "dumplings"])

In [None]:
# with LCEL
# Build a chain that uses `anthropic_chain` as the fallback for `chain`.
fallback_chain = chain.with_fallbacks([anthropic_chain])


# The same fallback chain object is used for single and batch calls
# (the async call is left commented out).
fallback_chain.invoke("ice cream")
# await fallback_chain.ainvoke("ice cream")
fallback_chain.batch(["ice cream", "spaghetti", "dumplings"])


## Full code comparison

这对比真的是很明显了~

In [7]:
# without LCEL
from concurrent.futures import ThreadPoolExecutor
from typing import Iterator, List, Tuple

import anthropic
import openai

# Prompt templates
prompt_template = "Tell me a short joke about {topic}"
# The f-string inlines the text above, leaving `{topic}` for a later str.format().
anthropic_template = f"Human:\n\n{prompt_template}\n\nAssistant:"
# Model clients
client = openai.OpenAI()
async_client = openai.AsyncOpenAI()
anthropic_client = anthropic.Anthropic()

# Shared helper for calling the OpenAI chat endpoint
def call_chat_model(messages: List[dict]) -> str:
    """Send ``messages`` to the chat completions endpoint and return the reply text.

    Returns:
        The message content of the first choice in the response.
    """
    completion = client.chat.completions.create(
        messages=messages,
        model="gpt-3.5-turbo",
    )
    first_choice = completion.choices[0]
    return first_choice.message.content

# Get the OpenAI chat model's reply
def invoke_chain(topic: str) -> str:
    """Format the prompt for ``topic``, call the chat model and return its reply.

    Prints the input, the formatted prompt and the output along the way.
    """
    print(f"Input: {topic}")
    prompt_value = prompt_template.format(topic=topic)
    print(f"Formatted prompt: {prompt_value}")
    output = call_chat_model([{"role": "user", "content": prompt_value}])
    print(f"Output: {output}")
    return output

# Shared helper for streaming replies from the OpenAI chat endpoint
def stream_chat_model(messages: List[dict]) -> Iterator[str]:
    """Yield the chat model's reply to ``messages`` piece by piece.

    The request is made with ``stream=True``; chunks whose delta content is
    ``None`` are skipped.
    """
    events = client.chat.completions.create(
        stream=True,
        messages=messages,
        model="gpt-3.5-turbo",
    )
    for event in events:
        piece = event.choices[0].delta.content
        if piece is None:
            continue
        yield piece

# Get the OpenAI chat model's streamed reply
def stream_chain(topic: str) -> Iterator[str]:
    """Stream the chat model's reply to the joke prompt for ``topic``.

    The prompt is built from the plain-string ``prompt_template`` defined in
    this cell, as the other hand-written helpers do, rather than from a
    LangChain ``prompt`` object that this "without LCEL" code does not define.

    Prints the input, the formatted prompt and every token before yielding it.

    Yields:
        The text chunks produced by ``stream_chat_model``.
    """
    print(f"Input: {topic}")
    prompt_value = prompt_template.format(topic=topic)
    print(f"Formatted prompt: {prompt_value}")
    stream = stream_chat_model([{"role": "user", "content": prompt_value}])
    for chunk in stream:
        print(f"Token: {chunk}", end="")
        yield chunk

# Run the OpenAI requests in parallel
def batch_chain(topics: list) -> list:
    """Run ``invoke_chain`` for every topic on a pool of at most 5 threads.

    Returns:
        The replies as a list, in the order ``executor.map`` yields them.
    """
    with ThreadPoolExecutor(max_workers=5) as pool:
        replies = pool.map(invoke_chain, topics)
        return list(replies)

# Swap in a different OpenAI model (non-chat completions endpoint)
def call_llm(prompt_value: str) -> str:
    """Complete ``prompt_value`` with the instruct model and return the first choice's text."""
    completion = client.completions.create(
        prompt=prompt_value,
        model="gpt-3.5-turbo-instruct",
    )
    first_choice = completion.choices[0]
    return first_choice.text

# Get the OpenAI LLM's reply
def invoke_llm_chain(topic: str) -> str:
    """Format the prompt for ``topic``, call ``call_llm`` and return its output.

    Prints the input, the formatted prompt and the output, each before the
    next step runs.
    """
    print(f"Input: {topic}")
    prompt_value = prompt_template.format(topic=topic)
    print(f"Formatted prompt: {prompt_value}")
    output = call_llm(prompt_value)
    print(f"Output: {output}")
    return output

# Call the Claude model
def call_anthropic(prompt_value: str) -> str:
    """Send ``prompt_value`` to Claude and return the response's ``completion`` text."""
    request = {
        "model": "claude-2",
        "prompt": prompt_value,
        "max_tokens_to_sample": 256,
    }
    result = anthropic_client.completions.create(**request)
    return result.completion

# Get Claude's reply
def invoke_anthropic_chain(topic: str) -> str:
    """Format the Anthropic prompt for ``topic``, call Claude and return its output.

    Prints the input, the formatted prompt and the output, each before the
    next step runs.
    """
    print(f"Input: {topic}")
    prompt_value = anthropic_template.format(topic=topic)
    print(f"Formatted prompt: {prompt_value}")
    output = call_anthropic(prompt_value)
    print(f"Output: {output}")
    return output

# Async / parallel request variants, analogous to the OpenAI ones
async def ainvoke_anthropic_chain(topic: str) -> str:
    """Placeholder for the async Anthropic chain; not implemented, awaits to ``None``."""
    return None

def stream_anthropic_chain(topic: str) -> Iterator[str]:
    """Placeholder for the streaming Anthropic chain; not implemented, returns ``None``."""
    return None

def batch_anthropic_chain(topics: List[str]) -> List[str]:
    """Placeholder for the batch Anthropic chain; not implemented, returns ``None``."""
    return None


# Return the reply of whichever model is configured
def invoke_configurable_chain(
    topic: str, 
    *, 
    model: str = "chat_openai"
) -> str:
    """Dispatch ``topic`` to the hand-written chain selected by ``model``.

    Args:
        topic: Topic to generate a reply for.
        model: One of ``"chat_openai"``, ``"openai"`` or ``"anthropic"``.

    Raises:
        ValueError: If ``model`` is not one of the supported names.
    """
    if model == "chat_openai":
        return invoke_chain(topic)
    if model == "openai":
        return invoke_llm_chain(topic)
    if model == "anthropic":
        return invoke_anthropic_chain(topic)
    message = (
        f"Received invalid model '{model}'."
        " Expected one of chat_openai, openai, anthropic"
    )
    raise ValueError(message)

# Return the streamed reply of whichever model is configured
def stream_configurable_chain(
    topic: str, 
    *, 
    model: str = "chat_openai"
) -> Iterator[str]:
    """Return the streaming iterator of the hand-written chain selected by ``model``.

    Args:
        topic: Topic to generate a reply for.
        model: One of ``"chat_openai"``, ``"openai"`` or ``"anthropic"``.

    Raises:
        ValueError: If ``model`` is not one of the supported names. Raised at
            call time, since this function is not itself a generator.
    """
    if model == "chat_openai":
        return stream_chain(topic)
    if model == "openai":
        # Note we haven't implemented this yet.
        return stream_llm_chain(topic)
    if model == "anthropic":
        # Note we haven't implemented this yet
        return stream_anthropic_chain(topic)
    message = (
        f"Received invalid model '{model}'."
        " Expected one of chat_openai, openai, anthropic"
    )
    raise ValueError(message)

def batch_configurable_chain(
    topics: List[str], 
    *, 
    model: str = "chat_openai"
) -> List[str]:
    """Placeholder for the batch variant of the configurable chain.

    Not implemented: the call returns ``None``.
    """
    return None

async def abatch_configurable_chain(
    topics: List[str], 
    *, 
    model: str = "chat_openai"
) -> List[str]:
    """Placeholder for the async batch variant of the configurable chain.

    Not implemented: awaiting the call yields ``None``.
    """
    return None

# Fallback logic
def invoke_chain_with_fallback(topic: str) -> str:
    """Return ``invoke_chain``'s reply, or the Anthropic chain's if that call raises."""
    try:
        reply = invoke_chain(topic)
    except Exception:
        reply = invoke_anthropic_chain(topic)
    return reply

async def ainvoke_chain_with_fallback(topic: str) -> str:
    """Await ``ainvoke_chain``; if it raises, await the Anthropic async chain instead."""
    try:
        reply = await ainvoke_chain(topic)
    except Exception:
        reply = await ainvoke_anthropic_chain(topic)
    return reply

def batch_chain_with_fallback(topics: List[str]) -> List[str]:
    """Batch ``topics`` through the OpenAI chain, falling back to Anthropic on error.

    This is a plain (synchronous) function: it only calls the synchronous
    batch helpers and awaits nothing, so there is no reason for it to be a
    coroutine function. Declared ``async``, a plain call would only create a
    coroutine that never runs.

    Args:
        topics: Topics to generate replies for.

    Returns:
        Whatever the batch helper that succeeded returns (a list of replies).
    """
    try:
        return batch_chain(topics)
    except Exception:
        return batch_anthropic_chain(topics)


In [None]:
# with LCEL

import os

# Model wrappers
from langchain_anthropic import ChatAnthropic
from langchain_openai import ChatOpenAI
from langchain_openai import OpenAI

# LangChain core building blocks
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
# ConfigurableField is used below, so import it here to keep this cell
# runnable on its own.
from langchain_core.runnables import ConfigurableField
from langchain_core.runnables import RunnablePassthrough

# Environment variables, mainly so LangSmith can trace/log the runs.
# "..." is a placeholder for the real API key.
os.environ["LANGCHAIN_API_KEY"] = "..."
os.environ["LANGCHAIN_TRACING_V2"] = "true"

prompt = ChatPromptTemplate.from_template(
    "Tell me a short joke about {topic}"
)

chat_openai = ChatOpenAI(model="gpt-3.5-turbo")
# NOTE(review): `openai` and `anthropic` rebind names that other cells import
# as SDK modules; the names are kept unchanged here because other cells
# refer to them.
openai = OpenAI(model="gpt-3.5-turbo-instruct")
anthropic = ChatAnthropic(model="claude-2")
# Default model: the chat model with `anthropic` as its fallback. The
# configurable field with id "model" can select the "openai" or "anthropic"
# alternative instead.
model = (
    chat_openai.with_fallbacks([anthropic]).configurable_alternatives(
        ConfigurableField(id="model"),
        default_key="chat_openai",
        openai=openai,
        anthropic=anthropic,
    )
)

chain = (
    {"topic": RunnablePassthrough()}
    | prompt
    | model
    | StrOutputParser()
)

