# 5. LangChain Expression Language（LCEL）徹底解説


In [None]:
import os
from google.colab import userdata

os.environ["OPENAI_API_KEY"] = userdata.get("OPENAI_API_KEY")
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_ENDPOINT"] = "https://api.smith.langchain.com"
os.environ["LANGCHAIN_API_KEY"] = userdata.get("LANGCHAIN_API_KEY")
os.environ["LANGCHAIN_PROJECT"] = "agent-book"

In [None]:
!pip install langchain-core==0.3.0 langchain-openai==0.2.0 langchain-community==0.3.0 pydantic==2.10.6

## 5.1. Runnable と RunnableSequence―LCEL の最も基本的な構成要素


In [None]:
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI

prompt = ChatPromptTemplate.from_messages(
    [
        ("system", "ユーザーが入力した料理のレシピを考えてください。"),
        ("human", "{dish}"),
    ]
)

model = ChatOpenAI(model="gpt-4o-mini", temperature=0)

output_parser = StrOutputParser()

In [None]:
prompt_value = prompt.invoke({"dish": "カレー"})
ai_message = model.invoke(prompt_value)
output = output_parser.invoke(ai_message)

print(output)

In [None]:
chain = prompt | model | output_parser

In [None]:
output = chain.invoke({"dish": "カレー"})
print(output)

### Runnable の実行方法―invoke・stream・batch


In [None]:
chain = prompt | model | output_parser

for chunk in chain.stream({"dish": "カレー"}):
    print(chunk, end="", flush=True)

In [None]:
chain = prompt | model | output_parser

outputs = chain.batch([{"dish": "カレー"}, {"dish": "うどん"}])
print(outputs)

### LCEL の「|」で様々な Runnable を連鎖させる


In [None]:
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI

model = ChatOpenAI(model="gpt-4o-mini", temperature=0)

output_parser = StrOutputParser()

In [None]:
cot_prompt = ChatPromptTemplate.from_messages(
    [
        ("system", "ユーザーの質問にステップバイステップで回答してください。"),
        ("human", "{question}"),
    ]
)

cot_chain = cot_prompt | model | output_parser

In [None]:
summarize_prompt = ChatPromptTemplate.from_messages(
    [
        ("system", "ステップバイステップで考えた回答から結論だけ抽出してください。"),
        ("human", "{text}"),
    ]
)

summarize_chain = summarize_prompt | model | output_parser

In [None]:
cot_summarize_chain = cot_chain | summarize_chain
output = cot_summarize_chain.invoke({"question": "10 + 2 * 3"})
print(output)

## 5.2. RunnableLambda―任意の関数を Runnable にする


In [None]:
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI

prompt = ChatPromptTemplate.from_messages(
    [
        ("system", "You are a helpful assistant."),
        ("human", "{input}"),
    ]
)

model = ChatOpenAI(model="gpt-4o-mini", temperature=0)

output_parser = StrOutputParser()

In [None]:
from langchain_core.runnables import RunnableLambda


def upper(text: str) -> str:
    return text.upper()


chain = prompt | model | output_parser | RunnableLambda(upper)

ai_message = chain.invoke({"input": "Hello!"})
print(ai_message)

### chain デコレーターを使った RunnableLambda の実装


In [None]:
from langchain_core.runnables import chain


@chain
def upper(text: str) -> str:
    return text.upper()


chain = prompt | model | output_parser | upper

ai_message = chain.invoke({"input": "Hello!"})
print(ai_message)

### RunnableLambda への自動変換


In [None]:
def upper(text: str) -> str:
    return text.upper()


chain = prompt | model | output_parser | upper

In [None]:
ai_message = chain.invoke({"input": "Hello!"})
print(ai_message)

### Runnable の入力の型と出力の型に注意


In [None]:
def upper(text: str) -> str:
    return text.upper()


chain = prompt | model | upper

# 以下のコードを実行するとエラーになります
# output = chain.invoke({"input": "Hello!"})

In [None]:
chain = prompt | model | StrOutputParser() | upper

In [None]:
output = chain.invoke({"input": "Hello!"})
print(output)

### （コラム）独自の関数を stream に対応させたい場合


In [None]:
from typing import Iterator


def upper(input_stream: Iterator[str]) -> Iterator[str]:
    for text in input_stream:
        yield text.upper()


chain = prompt | model | StrOutputParser() | upper

for chunk in chain.stream({"input": "Hello!"}):
    print(chunk, end="", flush=True)

## 5.3. RunnableParallel―複数の Runnable を並列で処理する


In [None]:
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI

model = ChatOpenAI(model="gpt-4o-mini", temperature=0)
output_parser = StrOutputParser()

In [None]:
optimistic_prompt = ChatPromptTemplate.from_messages(
    [
        ("system", "あなたは楽観主義者です。ユーザーの入力に対して楽観的な意見をください。"),
        ("human", "{topic}"),
    ]
)
optimistic_chain = optimistic_prompt | model | output_parser

In [None]:
pessimistic_prompt = ChatPromptTemplate.from_messages(
    [
        ("system", "あなたは悲観主義者です。ユーザーの入力に対して悲観的な意見をください。"),
        ("human", "{topic}"),
    ]
)
pessimistic_chain = pessimistic_prompt | model | output_parser

In [None]:
import pprint
from langchain_core.runnables import RunnableParallel

parallel_chain = RunnableParallel(
    {
        "optimistic_opinion": optimistic_chain,
        "pessimistic_opinion": pessimistic_chain,
    }
)

output = parallel_chain.invoke({"topic": "生成AIの進化について"})
pprint.pprint(output)

### RunnableParallel の出力を Runnable の入力に連結する


In [None]:
synthesize_prompt = ChatPromptTemplate.from_messages(
    [
        ("system", "あなたは客観的AIです。2つの意見をまとめてください。"),
        ("human", "楽観的意見: {optimistic_opinion}\n悲観的意見: {pessimistic_opinion}"),
    ]
)

In [None]:
synthesize_chain = (
    RunnableParallel(
        {
            "optimistic_opinion": optimistic_chain,
            "pessimistic_opinion": pessimistic_chain,
        }
    )
    | synthesize_prompt
    | model
    | output_parser
)

output = synthesize_chain.invoke({"topic": "生成AIの進化について"})
print(output)

### RunnableParallel への自動変換


In [None]:
synthesize_chain = (
    {
        "optimistic_opinion": optimistic_chain,
        "pessimistic_opinion": pessimistic_chain,
    }
    | synthesize_prompt
    | model
    | output_parser
)

In [None]:
output = synthesize_chain.invoke({"topic": "生成AIの進化について"})
print(output)

### RunnableLambda との組み合わせ―itemgetter を使う例


In [None]:
from operator import itemgetter

topic_getter = itemgetter("topic")
topic = topic_getter({"topic": "生成AIの進化について"})
print(topic)

In [None]:
from operator import itemgetter

synthesize_prompt = ChatPromptTemplate.from_messages(
    [
        (
            "system",
            "あなたは客観的AIです。{topic}について2つの意見をまとめてください。",
        ),
        (
            "human",
            "楽観的意見: {optimistic_opinion}\n悲観的意見: {pessimistic_opinion}",
        ),
    ]
)

synthesize_chain = (
    {
        "optimistic_opinion": optimistic_chain,
        "pessimistic_opinion": pessimistic_chain,
        "topic": itemgetter("topic"),
    }
    | synthesize_prompt
    | model
    | output_parser
)

output = synthesize_chain.invoke({"topic": "生成AIの進化について"})
print(output)

## 5.4. RunnablePassthrough―入力をそのまま出力する


In [None]:
import os
from google.colab import userdata

os.environ["TAVILY_API_KEY"] = userdata.get("TAVILY_API_KEY")

In [None]:
!pip install tavily-python==0.5.0

In [None]:
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI

prompt = ChatPromptTemplate.from_template('''\
以下の文脈だけを踏まえて質問に回答してください。

文脈: """
{context}
"""

質問: {question}
''')

model = ChatOpenAI(model_name="gpt-4o-mini", temperature=0)

In [None]:
from langchain_community.retrievers import TavilySearchAPIRetriever

retriever = TavilySearchAPIRetriever(k=3)

In [None]:
from langchain_core.runnables import RunnablePassthrough

chain = (
    {"context": retriever, "question": RunnablePassthrough()}
    | prompt
    | model
    | StrOutputParser()
)

output = chain.invoke("東京の今日の天気は？")
print(output)

### assign―RunnableParallel に値を追加する


In [None]:
import pprint

chain = {
    "question": RunnablePassthrough(),
    "context": retriever,
} | RunnablePassthrough.assign(answer=prompt | model | StrOutputParser())

output = chain.invoke("東京の今日の天気は？")
pprint.pprint(output)

In [None]:
from langchain_core.runnables import RunnableParallel

chain = RunnableParallel(
    {
        "question": RunnablePassthrough(),
        "context": retriever,
    }
).assign(answer=prompt | model | StrOutputParser())

In [None]:
output = chain.invoke("東京の今日の天気は？")
print(output)

#### ＜補足：pick ＞


In [None]:
chain = (
    RunnableParallel(
        {
            "question": RunnablePassthrough(),
            "context": retriever,
        }
    )
    .assign(answer=prompt | model | StrOutputParser())
    .pick(["context", "answer"])
)

In [None]:
output = chain.invoke("東京の今日の天気は？")
print(output)

### （コラム）astream_events


In [None]:
# Google Colabでは次のコードの「async」の箇所に「Use of "async" not allowed outside of async function」と表示されますが、エラーなく実行できます

In [None]:
chain = (
    {"context": retriever, "question": RunnablePassthrough()}
    | prompt
    | model
    | StrOutputParser()
)

async for event in chain.astream_events("東京の今日の天気は？", version="v2"):
    print(event, flush=True)

In [None]:
async for event in chain.astream_events("東京の今日の天気は？", version="v2"):
    event_kind = event["event"]

    if event_kind == "on_retriever_end":
        print("=== 検索結果 ===")
        documents = event["data"]["output"]
        for document in documents:
            print(document)

    elif event_kind == "on_parser_start":
        print("=== 最終出力 ===")

    elif event_kind == "on_parser_stream":
        chunk = event["data"]["chunk"]
        print(chunk, end="", flush=True)

### （コラム）Chat history と Memory


In [None]:
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_openai import ChatOpenAI

model = ChatOpenAI(model="gpt-4o-mini", temperature=0)

prompt = ChatPromptTemplate.from_messages(
    [
        ("system", "You are a helpful assistant."),
        MessagesPlaceholder("chat_history", optional=True),
        ("human", "{input}"),
    ]
)

chain = prompt | model | StrOutputParser()

In [None]:
from langchain_community.chat_message_histories import SQLChatMessageHistory


def respond(session_id: str, human_message: str) -> str:
    chat_message_history = SQLChatMessageHistory(
        session_id=session_id, connection="sqlite:///sqlite.db"
    )

    ai_message = chain.invoke(
        {
            "chat_history": chat_message_history.get_messages(),
            "input": human_message,
        }
    )

    chat_message_history.add_user_message(human_message)
    chat_message_history.add_ai_message(ai_message)

    return ai_message

In [None]:
from uuid import uuid4

session_id = uuid4().hex

output1 = respond(
    session_id=session_id,
    human_message="こんにちは！私はジョンと言います！",
)
print(output1)

output2 = respond(
    session_id=session_id,
    human_message="私の名前が分かりますか？",
)
print(output2)