
**LangChain Expression Language (LCEL) Cheatsheet**

*   **Purpose**: This document is a quick reference for the most important primitives in the LangChain Expression Language (LCEL). It gives a concise overview; for more advanced use cases, see the how-to guides and the full API reference.

*   **Key Concepts**:
    *   **Runnables**: The core building blocks of LCEL. Runnables can be any function, a chain of functions, or other LangChain components. They can be invoked, batched, streamed, composed, and configured in various ways.
    *   **Composability**: LCEL allows you to compose runnables in a declarative way, chaining them together using the pipe operator `|` or by using `RunnableParallel` for parallel execution.

*   **Basic Runnable Operations**:
    *   **`Runnable.invoke()` / `Runnable.ainvoke()`**: Executes a runnable with a given input. `invoke()` is synchronous, while `ainvoke()` is asynchronous.
    *   **`Runnable.batch()` / `Runnable.abatch()`**: Executes a runnable on a list of inputs and returns a list of outputs in the same order. `batch()` is synchronous, while `abatch()` is asynchronous.
    *   **`Runnable.stream()` / `Runnable.astream()`**: Streams the output of a runnable in chunks. `stream()` is synchronous, while `astream()` is asynchronous.

*   **Composing Runnables**:
    *   **Pipe Operator `|`**: Chains runnables together sequentially, passing the output of one runnable as the input to the next.
    *   **`RunnableParallel`**: Executes multiple runnables in parallel, combining their outputs into a dictionary.

*   **Creating Runnables**:
    *   **`RunnableLambda`**: Turns any Python function into a runnable, allowing it to be used as part of an LCEL chain.

*   **Manipulating Inputs and Outputs**:
    *   **`RunnablePassthrough.assign()`**: Merges input and output dictionaries, adding new fields to the input based on the results of a runnable.
    *   **`RunnablePassthrough`**: Passes its input through unchanged. Used as a branch of a `RunnableParallel`, it includes the original input in the output dictionary.
    *   **`Runnable.pick()`**: Returns a subset of the output dictionary, allowing you to select specific fields.

*   **Configuring Runnables**:
    *   **`Runnable.bind()`**: Adds default invocation arguments to a runnable.
    *   **`Runnable.with_fallbacks()`**: Adds fallback runnables that are executed if the primary runnable fails.
    *   **`Runnable.with_retry()`**: Adds retry logic to a runnable, allowing it to be executed multiple times if it fails.
    *   **`RunnableConfig`**: Used to configure the execution of a runnable, such as setting `max_concurrency` for parallel execution.
    *   **`Runnable.with_config()`**: Adds default configuration to a runnable.
    *   **`Runnable.configurable_fields()`**: Makes specific attributes of a runnable configurable at invocation time.
    *   **`Runnable.configurable_alternatives()`**: Makes chain components configurable, allowing you to switch between different runnables dynamically.

*   **Advanced Features**:
    *   **Dynamic Chain Building**: Runnables can be created dynamically based on the input.
    *   **`Runnable.astream_events()`**: Generates a stream of events, providing detailed information about the execution of a runnable, including the start and end of each step, as well as intermediate outputs.
    *   **`Runnable.batch_as_completed()` / `Runnable.abatch_as_completed()`**: Yields batched outputs as they are completed, rather than waiting for all batches to finish.
    *   **`Runnable.map()`**: Declaratively creates a batched version of a runnable, executing it on each element of a list.
    *   **`Runnable.get_graph()`**: Generates a graph representation of a runnable, which can be useful for visualizing the structure of complex chains.
    *   **`Runnable.get_prompts()`**: Gets all prompts in a chain, useful for inspecting or modifying prompts in the chain.
    *   **`Runnable.with_listeners()`**: Adds lifecycle listeners to a runnable, allowing you to monitor the start and end of execution.

*   **Key Takeaways**:
    *   LCEL provides a flexible and powerful way to construct chains of operations.
    *   Runnables can be composed, configured, and extended to handle a wide variety of use cases.
    *   LCEL supports synchronous and asynchronous execution modes, along with streaming of results.
    *   LCEL enables declarative construction of chains of operations and dynamic behavior based on input.

This summary gives an overview of the LangChain Expression Language: its key concepts, the methods for constructing and configuring runnables, and its advanced features. The notebook cells below illustrate each primitive with a short example.


In [1]:
from langchain_core.runnables import RunnableLambda

runnable = RunnableLambda(lambda x: str(x))
runnable.invoke(5)

# Async variant:
# await runnable.ainvoke(5)

'5'

In [2]:
from langchain_core.runnables import RunnableLambda

runnable = RunnableLambda(lambda x: str(x))
runnable.batch([7, 8, 9])

# Async variant:
# await runnable.abatch([7, 8, 9])

['7', '8', '9']

In [3]:
from langchain_core.runnables import RunnableLambda


def func(x):
    for y in x:
        yield str(y)


runnable = RunnableLambda(func)

for chunk in runnable.stream(range(5)):
    print(chunk)

# Async variant:
# async for chunk in runnable.astream(range(5)):
#     print(chunk)

0
1
2
3
4


In [4]:
from langchain_core.runnables import RunnableLambda

runnable1 = RunnableLambda(lambda x: {"foo": x})
runnable2 = RunnableLambda(lambda x: [x] * 2)

chain = runnable1 | runnable2

chain.invoke(2)

[{'foo': 2}, {'foo': 2}]
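To make the mechanics of `|` concrete, here is a minimal, dependency-free sketch of sequential composition through Python's `__or__` operator. `MiniRunnable` is a hypothetical toy class written for illustration; it is not LangChain's implementation and omits batching, streaming, and configuration.

```python
from typing import Any, Callable


class MiniRunnable:
    """Toy stand-in for a runnable: wraps a single-argument function."""

    def __init__(self, func: Callable[[Any], Any]) -> None:
        self.func = func

    def invoke(self, value: Any) -> Any:
        return self.func(value)

    def __or__(self, other: "MiniRunnable") -> "MiniRunnable":
        # `a | b` builds a new runnable that feeds a's output into b.
        return MiniRunnable(lambda value: other.invoke(self.invoke(value)))


to_dict = MiniRunnable(lambda x: {"foo": x})
duplicate = MiniRunnable(lambda x: [x] * 2)

mini_chain = to_dict | duplicate
result = mini_chain.invoke(2)
print(result)  # [{'foo': 2}, {'foo': 2}]
```

The composed object is itself a `MiniRunnable`, which is why chains can be piped into further steps.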

In [5]:
from langchain_core.runnables import RunnableLambda, RunnableParallel

runnable1 = RunnableLambda(lambda x: {"foo": x})
runnable2 = RunnableLambda(lambda x: [x] * 2)

chain = RunnableParallel(first=runnable1, second=runnable2)

chain.invoke(2)

{'first': {'foo': 2}, 'second': [2, 2]}

In [6]:
chain = RunnableParallel(f=runnable1, s=runnable2)

chain.invoke(2)

{'f': {'foo': 2}, 's': [2, 2]}
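Conceptually, the parallel step above sends the same input to every branch and gathers the results under the branch names. The following is a rough stdlib-only sketch of that fan-out pattern; `run_parallel` is a hypothetical helper, and a thread pool is assumed here only as one plausible way to run the branches concurrently.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, Dict


def run_parallel(steps: Dict[str, Callable[[Any], Any]], value: Any) -> Dict[str, Any]:
    """Call every step with the same input and collect the results by key."""
    with ThreadPoolExecutor() as pool:
        # Submit all branches first so they can run concurrently.
        futures = {key: pool.submit(step, value) for key, step in steps.items()}
        # Then wait for each branch and keep the original key order.
        return {key: future.result() for key, future in futures.items()}


parallel_result = run_parallel(
    {"first": lambda x: {"foo": x}, "second": lambda x: [x] * 2},
    2,
)
print(parallel_result)  # {'first': {'foo': 2}, 'second': [2, 2]}
```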

In [10]:
from langchain_core.runnables import RunnableLambda


def func(x):
    return x + 5


runnable = RunnableLambda(func)
runnable.invoke(2)

7

In [11]:
from langchain_core.runnables import RunnableLambda, RunnablePassthrough

runnable1 = RunnableLambda(lambda x: x["foo"] + 7)

chain = RunnablePassthrough.assign(bar=runnable1)

chain.invoke({"foo": 10})

{'foo': 10, 'bar': 17}

In [12]:
from langchain_core.runnables import (
    RunnableLambda,
    RunnableParallel,
    RunnablePassthrough,
)

runnable1 = RunnableLambda(lambda x: x["foo"] + 7)

chain = RunnableParallel(bar=runnable1, baz=RunnablePassthrough())

chain.invoke({"foo": 10})

{'bar': 17, 'baz': {'foo': 10}}

In [13]:
from typing import Optional

from langchain_core.runnables import RunnableLambda


def func(main_arg: dict, other_arg: Optional[str] = None) -> dict:
    if other_arg:
        return {**main_arg, **{"foo": other_arg}}
    return main_arg


runnable1 = RunnableLambda(func)
bound_runnable1 = runnable1.bind(other_arg="bye")

bound_runnable1.invoke({"bar": "hello"})

{'bar': 'hello', 'foo': 'bye'}

In [14]:
from langchain_core.runnables import RunnableLambda

runnable1 = RunnableLambda(lambda x: x + "foo")
runnable2 = RunnableLambda(lambda x: str(x) + "foo")

chain = runnable1.with_fallbacks([runnable2])

chain.invoke(5)

'5foo'
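In the cell above, the primary runnable raises a `TypeError` (it tries to add an `int` and a `str`), so the fallback produces the result. The control flow can be sketched in plain Python as follows; `invoke_with_fallbacks` is a hypothetical helper, and re-raising the first error when every candidate fails is a choice made for this sketch.

```python
from typing import Any, Callable, Optional, Sequence


def invoke_with_fallbacks(
    primary: Callable[[Any], Any],
    fallbacks: Sequence[Callable[[Any], Any]],
    value: Any,
) -> Any:
    """Try the primary callable, then each fallback in order."""
    first_error: Optional[Exception] = None
    for candidate in [primary, *fallbacks]:
        try:
            return candidate(value)
        except Exception as error:
            # Remember the first failure so it can be reported if all fail.
            if first_error is None:
                first_error = error
    if first_error is None:
        raise ValueError("no callables were provided")
    raise first_error


fallback_result = invoke_with_fallbacks(
    lambda x: x + "foo",          # fails for int input
    [lambda x: str(x) + "foo"],   # succeeds
    5,
)
print(fallback_result)  # 5foo
```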

In [15]:
from langchain_core.runnables import RunnableLambda

counter = -1


def func(x):
    global counter
    counter += 1
    print(f"attempt with {counter=}")
    return x / counter


chain = RunnableLambda(func).with_retry(stop_after_attempt=2)

chain.invoke(2)

attempt with counter=0
attempt with counter=1


2.0
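The output above follows from the retry: the first attempt divides by zero and fails, and the second attempt divides by one and succeeds. A minimal plain-Python sketch of that loop is shown here; `invoke_with_retry` is a hypothetical helper that retries immediately and omits any waiting between attempts.

```python
from typing import Any, Callable


def invoke_with_retry(
    func: Callable[[Any], Any], value: Any, stop_after_attempt: int = 2
) -> Any:
    """Call func(value), retrying on any exception up to the attempt limit."""
    for attempt in range(1, stop_after_attempt + 1):
        try:
            return func(value)
        except Exception:
            if attempt == stop_after_attempt:
                raise  # out of attempts: surface the last error
    raise ValueError("stop_after_attempt must be at least 1")


attempts = {"count": -1}


def flaky_divide(x: float) -> float:
    attempts["count"] += 1
    return x / attempts["count"]  # ZeroDivisionError on the first call


retry_result = invoke_with_retry(flaky_divide, 2)
print(retry_result)  # 2.0
```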

In [17]:
from langchain_core.runnables import RunnableLambda, RunnableParallel

runnable1 = RunnableLambda(lambda x: {"foo": x})
runnable2 = RunnableLambda(lambda x: [x] * 2)
runnable3 = RunnableLambda(lambda x: str(x))

chain = RunnableParallel(first=runnable1, second=runnable2, third=runnable3)

chain.invoke(7, config={"max_concurrency": 2})

{'first': {'foo': 7}, 'second': [7, 7], 'third': '7'}

In [18]:
from langchain_core.runnables import RunnableLambda, RunnableParallel

runnable1 = RunnableLambda(lambda x: {"foo": x})
runnable2 = RunnableLambda(lambda x: [x] * 2)
runnable3 = RunnableLambda(lambda x: str(x))

chain = RunnableParallel(first=runnable1, second=runnable2, third=runnable3)
configured_chain = chain.with_config(max_concurrency=2)

configured_chain.invoke(7)

{'first': {'foo': 7}, 'second': [7, 7], 'third': '7'}

In [19]:
from typing import Any, Optional

from langchain_core.runnables import (
    ConfigurableField,
    RunnableConfig,
    RunnableSerializable,
)


class FooRunnable(RunnableSerializable[dict, dict]):
    output_key: str

    def invoke(
        self, input: Any, config: Optional[RunnableConfig] = None, **kwargs: Any
    ) -> dict:
        return self._call_with_config(self.subtract_seven, input, config, **kwargs)

    def subtract_seven(self, input: dict) -> dict:
        return {self.output_key: input["foo"] - 7}


runnable1 = FooRunnable(output_key="bar")
configurable_runnable1 = runnable1.configurable_fields(
    output_key=ConfigurableField(id="output_key")
)

configurable_runnable1.invoke(
    {"foo": 10}, config={"configurable": {"output_key": "not bar"}}
)

{'not bar': 3}

In [2]:
from typing import Any, Optional

from langchain_core.runnables import (
    ConfigurableField,
    RunnableConfig,
    RunnableLambda,
    RunnableSerializable,
)


class ListRunnable(RunnableSerializable[Any, list]):
    def invoke(
        self, input: Any, config: Optional[RunnableConfig] = None, **kwargs: Any
    ) -> list:
        return self._call_with_config(self.listify, input, config, **kwargs)

    def listify(self, input: Any) -> list:
        return [input]


class StrRunnable(RunnableSerializable[Any, str]):
    def invoke(
        self, input: Any, config: Optional[RunnableConfig] = None, **kwargs: Any
    ) -> str:
        return self._call_with_config(self.strify, input, config, **kwargs)

    def strify(self, input: Any) -> str:
        return str(input)


runnable1 = RunnableLambda(lambda x: {"foo": x})

configurable_runnable = ListRunnable().configurable_alternatives(
    ConfigurableField(id="second_step"), default_key="list", string=StrRunnable()
)
chain = runnable1 | configurable_runnable

chain.invoke(7, config={"configurable": {"second_step": "string"}})

"{'foo': 7}"

In [3]:
chain.invoke(7, config={"configurable": {"second_step": "list"}})

[{'foo': 7}]

In [4]:
chain.invoke(7)

[{'foo': 7}]

In [5]:
from langchain_core.runnables import RunnableLambda

runnable1 = RunnableLambda(lambda x: {"foo": x})
runnable2 = RunnableLambda(lambda x: [x] * 2)

chain = RunnableLambda(lambda x: runnable1 if x > 6 else runnable2)

chain.invoke(7)

{'foo': 7}

In [6]:
chain.invoke(5)

[5, 5]

In [7]:
import nest_asyncio

nest_asyncio.apply()

In [8]:
from langchain_core.runnables import RunnableLambda

runnable1 = RunnableLambda(lambda x: {"foo": x}, name="first")


async def func(x):
    for _ in range(5):
        yield x


runnable2 = RunnableLambda(func, name="second")

chain = runnable1 | runnable2

async for event in chain.astream_events("bar", version="v2"):
    print(f"event={event['event']} | name={event['name']} | data={event['data']}")

event=on_chain_start | name=RunnableSequence | data={'input': 'bar'}
event=on_chain_start | name=first | data={}
event=on_chain_stream | name=first | data={'chunk': {'foo': 'bar'}}
event=on_chain_start | name=second | data={}
event=on_chain_end | name=first | data={'output': {'foo': 'bar'}, 'input': 'bar'}
event=on_chain_stream | name=second | data={'chunk': {'foo': 'bar'}}
event=on_chain_stream | name=RunnableSequence | data={'chunk': {'foo': 'bar'}}
event=on_chain_stream | name=second | data={'chunk': {'foo': 'bar'}}
event=on_chain_stream | name=RunnableSequence | data={'chunk': {'foo': 'bar'}}
event=on_chain_stream | name=second | data={'chunk': {'foo': 'bar'}}
event=on_chain_stream | name=RunnableSequence | data={'chunk': {'foo': 'bar'}}
event=on_chain_stream | name=second | data={'chunk': {'foo': 'bar'}}
event=on_chain_stream | name=RunnableSequence | data={'chunk': {'foo': 'bar'}}
event=on_chain_stream | name=second | data={'chunk': {'foo': 'bar'}}
event=on_chain_stream | name=Ru

In [9]:
import time

from langchain_core.runnables import RunnableLambda

runnable1 = RunnableLambda(lambda x: time.sleep(x) or print(f"slept {x}"))

for idx, result in runnable1.batch_as_completed([5, 1]):
    print(idx, result)

# Async variant:
# async for idx, result in runnable1.abatch_as_completed([5, 1]):
#     print(idx, result)

slept 1
1 None
slept 5
0 None
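The output order above differs from the input order because the shorter sleep finishes first, and each result is paired with its original index. The same idea can be sketched with the standard library's `concurrent.futures.as_completed`; the helper name and the sleep durations below are illustrative assumptions, not LangChain code.

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Any, Callable, Iterator, Sequence, Tuple


def batch_as_completed(
    func: Callable[[Any], Any], inputs: Sequence[Any]
) -> Iterator[Tuple[int, Any]]:
    """Yield (input_index, result) pairs in the order the calls finish."""
    with ThreadPoolExecutor(max_workers=max(1, len(inputs))) as pool:
        future_to_index = {
            pool.submit(func, item): index for index, item in enumerate(inputs)
        }
        for future in as_completed(future_to_index):
            yield future_to_index[future], future.result()


def slow_label(seconds: float) -> str:
    time.sleep(seconds)
    return f"slept {seconds}"


completed = list(batch_as_completed(slow_label, [0.3, 0.05]))
for index, label in completed:
    print(index, label)  # the shorter sleep (index 1) is expected first
```

Returning the index alongside each result lets the caller match results to inputs even though completion order is not input order.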


In [10]:
from langchain_core.runnables import RunnableLambda, RunnablePassthrough

runnable1 = RunnableLambda(lambda x: x["baz"] + 5)
chain = RunnablePassthrough.assign(foo=runnable1).pick(["foo", "bar"])

chain.invoke({"bar": "hi", "baz": 2})

{'foo': 7, 'bar': 'hi'}
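For dictionary inputs, the assign-then-pick pattern above amounts to merging computed keys into the input and then keeping a subset of keys. A plain-Python sketch of those two steps, using hypothetical `assign` and `pick` helpers that operate directly on dicts, could look like this:

```python
from typing import Any, Callable, Dict, List


def assign(data: Dict[str, Any], **steps: Callable[[Dict[str, Any]], Any]) -> Dict[str, Any]:
    """Return the input dict plus one new key per step, computed from the input."""
    return {**data, **{key: step(data) for key, step in steps.items()}}


def pick(data: Dict[str, Any], keys: List[str]) -> Dict[str, Any]:
    """Keep only the requested keys, in the requested order."""
    return {key: data[key] for key in keys}


assigned = assign({"bar": "hi", "baz": 2}, foo=lambda d: d["baz"] + 5)
picked = pick(assigned, ["foo", "bar"])
print(assigned)  # {'bar': 'hi', 'baz': 2, 'foo': 7}
print(picked)    # {'foo': 7, 'bar': 'hi'}
```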

In [12]:
from langchain_core.runnables import RunnableLambda

runnable1 = RunnableLambda(lambda x: list(range(x)))
runnable2 = RunnableLambda(lambda x: x + 5)

chain = runnable1 | runnable2.map()

chain.invoke(3)

[5, 6, 7]

In [14]:
from langchain_core.runnables import RunnableLambda, RunnableParallel

runnable1 = RunnableLambda(lambda x: {"foo": x})
runnable2 = RunnableLambda(lambda x: [x] * 2)
runnable3 = RunnableLambda(lambda x: str(x))

chain = runnable1 | RunnableParallel(second=runnable2, third=runnable3)

chain.get_graph().print_ascii()

        +-------------+          
        | LambdaInput |          
        +-------------+          
                *                
                *                
                *                
           +--------+            
           | Lambda |            
           +--------+            
                *                
                *                
                *                
+-----------------------------+  
| Parallel<second,third>Input |  
+-----------------------------+  
           *         *           
         **           **         
        *               *        
 +--------+          +--------+  
 | Lambda |          | Lambda |  
 +--------+          +--------+  
           *         *           
            **     **            
              *   *              
+------------------------------+ 
| Parallel<second,third>Output | 
+------------------------------+ 


In [15]:
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnableLambda

prompt1 = ChatPromptTemplate.from_messages(
    [("system", "good ai"), ("human", "{input}")]
)
prompt2 = ChatPromptTemplate.from_messages(
    [
        ("system", "really good ai"),
        ("human", "{input}"),
        ("ai", "{ai_output}"),
        ("human", "{input2}"),
    ]
)
fake_llm = RunnableLambda(lambda prompt: "i am good ai")
chain = prompt1.assign(ai_output=fake_llm) | prompt2 | fake_llm

for i, prompt in enumerate(chain.get_prompts()):
    print(f"**prompt {i=}**\n")
    print(prompt.pretty_repr())
    print("\n" * 3)

**prompt i=0**


good ai


{input}




**prompt i=1**


really good ai


{input}


{ai_output}


{input2}






In [16]:
import time

from langchain_core.runnables import RunnableLambda
from langchain_core.tracers.schemas import Run


def on_start(run_obj: Run):
    print("start_time:", run_obj.start_time)


def on_end(run_obj: Run):
    print("end_time:", run_obj.end_time)


runnable1 = RunnableLambda(lambda x: time.sleep(x))
chain = runnable1.with_listeners(on_start=on_start, on_end=on_end)
chain.invoke(2)

start_time: 2024-12-25 05:33:23.314988+00:00
end_time: 2024-12-25 05:33:25.315518+00:00
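The listener pattern above can be approximated in plain Python by wrapping a function with callbacks that fire before and after the call. In this sketch, `with_listeners` is a hypothetical helper whose callbacks receive the raw input and output; the LangChain listeners shown above receive a `Run` object instead.

```python
from typing import Any, Callable, List, Optional, Tuple


def with_listeners(
    func: Callable[[Any], Any],
    on_start: Optional[Callable[[Any], None]] = None,
    on_end: Optional[Callable[[Any], None]] = None,
) -> Callable[[Any], Any]:
    """Wrap func so that on_start runs before it and on_end after it succeeds."""

    def wrapped(value: Any) -> Any:
        if on_start is not None:
            on_start(value)
        output = func(value)
        if on_end is not None:
            on_end(output)
        return output

    return wrapped


events: List[Tuple[str, Any]] = []
listened = with_listeners(
    lambda x: x * 2,
    on_start=lambda value: events.append(("start", value)),
    on_end=lambda output: events.append(("end", output)),
)
listened_result = listened(21)
print(events)  # [('start', 21), ('end', 42)]
```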
