In [None]:
!pip install -qU langchain langchain_openai

## Setting Up the Environment

Before running the examples, you need to install LangChain and the OpenAI integration package used in the streaming example. You can install both with the following command:

```bash
pip install -U langchain langchain_openai
```

## Runnable Interface
The Runnable interface in LangChain is a standard interface that many LangChain components implement, including chat models, LLMs, output parsers, and more.

### Key Features of the Runnable Protocol
- **Standard Interface**: The Runnable protocol provides a consistent way to interact with different components.
- **Core Methods**: Runnables can be run with `invoke`, `stream`, and `batch`, plus their asynchronous counterparts `ainvoke`, `astream`, and `abatch`.

| Method    | Description                                                               |
|-----------|---------------------------------------------------------------------------|
| `invoke`  | Calls the runnable on a single input and returns its output.              |
| `stream`  | Yields chunks of the output as they are produced.                         |
| `batch`   | Runs the runnable on a list of inputs and returns a list of outputs.      |
| `ainvoke` | Asynchronous version of `invoke`.                                         |
| `astream` | Asynchronous version of `stream`.                                         |
| `abatch`  | Asynchronous version of `batch`.                                          |



## **Input and output types**
Every `Runnable` is characterized by an input and output type. These input and output types can be any Python object, and are defined by the Runnable itself.

Runnable methods that result in the execution of the Runnable (e.g., `invoke`, `batch`, `stream`, `astream_events`) work with these input and output types:

- `invoke`: Accepts an input and returns an output.
- `batch`: Accepts a list of inputs and returns a list of outputs.
- `stream`: Accepts an input and returns a generator that yields outputs.

The input type and output type vary by component:

| Component     | Input Type                                      | Output Type             |
|---------------|-------------------------------------------------|-------------------------|
| Prompt        | dictionary                                      | PromptValue             |
| ChatModel     | a string, list of chat messages, or a PromptValue | ChatMessage             |
| LLM           | a string, list of chat messages, or a PromptValue | String                  |
| OutputParser  | the output of an LLM or ChatModel                | Depends on the parser   |
| Retriever     | a string                                         | List of Documents       |
| Tool          | a string or dictionary, depending on the tool    | Depends on the tool     |


## Practical Examples

### 1. Invoking a Runnable

Let's create a simple example where we use a RunnableLambda to invoke a function.

```python
from langchain_core.runnables import RunnableLambda

# Define a simple function
runnable = RunnableLambda(lambda x: f'Hello {x}')

# Invoke the function
result = runnable.invoke('World')
print(result)  # Expected output: 'Hello World'
```


### Example: Cleaning Up LLM Output
The next cell uses `RunnableLambda` for a more practical task:

- **Post-Processing**: The `post_process` function removes Markdown code fence markers (the triple-backtick lines, with or without a `python` or `text` language tag) from a model response.
- **RunnableLambda**: We use `RunnableLambda` to turn the `post_process` function into a runnable that we can invoke with the LLM's raw output.
- **Output**: The fence markers are removed, leaving only the code. Surrounding whitespace and newlines are kept as they are.



In [None]:
from langchain_core.runnables import RunnableLambda

# Define the post-processing function to clean up code snippets
def post_process(text):
    # Remove language-tagged fence markers first, then any bare fence markers
    return (
        text.replace('```python', '')
        .replace('```text', '')
        .replace('```', '')
    )

# Wrap the function in a RunnableLambda so it can be invoked like any other runnable
runnable = RunnableLambda(post_process)

# Simulated LLM response with unwanted formatting
llm_response = """
```python
def greet(name):
    print(f"Hello, {name}!")
    ```"""


runnable.invoke(llm_response)

'\n\ndef greet(name):\n    print(f"Hello, {name}!")\n    '

## Synchronous vs Asynchronous Programming

### Synchronous Programming:
In synchronous programming, tasks are executed one after the other, in a sequence. This means that the program waits for each task to finish before moving on to the next one. If a task takes time (such as an API call or reading from a file), the entire program halts until that task is completed.

**Example:**



In [None]:
import time
def task1():
    print("Task 1 started")
    # Simulate a time-consuming task
    time.sleep(2)
    print("Task 1 finished")

def task2():
    print("Task 2 started")
    time.sleep(1)
    print("Task 2 finished")

# Synchronous execution
task1()
task2()

Task 1 started
Task 1 finished
Task 2 started
Task 2 finished


### Asynchronous Programming:


In asynchronous programming, the program can start a task and, while that task is waiting (for example, on a network response), move on to other tasks instead of blocking. When the awaited operation completes, the task resumes where it left off. This improves throughput when operations spend most of their time waiting, such as file I/O or API calls.

## Why Asynchronous Programming is Important

Asynchronous programming allows you to execute multiple operations without blocking the main thread. This is particularly useful when dealing with long-running tasks, such as API calls to language models. By leveraging async capabilities, developers can manage multiple tasks concurrently, allowing for smoother user experiences and reduced wait times.

### Benefits of Asynchronous Programming:

- **Improved Performance**: Lets I/O-bound tasks run concurrently, so total wait time approaches that of the slowest task rather than the sum of all tasks.
- **Responsive User Interfaces**: Keeps the UI interactive during time-consuming operations.
- **Scalability**: Handles more users and requests without significant delays.
- **Better Resource Utilization**: Async chains can help in making better use of system resources, especially in applications that require high throughput.



**Example**:

In [None]:
import asyncio
import nest_asyncio

nest_asyncio.apply()

async def task1():
    print("Task 1 started")
    await asyncio.sleep(2)
    print("Task 1 finished")

async def task2():
    print("Task 2 started")
    await asyncio.sleep(1)
    print("Task 2 finished")

# Asynchronous execution
async def main():
    await asyncio.gather(task1(), task2())

await main()

Task 1 started
Task 2 started
Task 2 finished
Task 1 finished
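
To make the difference measurable, here is a small timing sketch that uses only the standard library. `fake_api_call` is a hypothetical stand-in for a slow network request. The script uses `asyncio.run`, which suits a plain Python file; in a notebook cell you would `await` the coroutines directly, as in the example above.

```python
import asyncio
import time

async def fake_api_call(name, delay):
    # Hypothetical stand-in for a slow I/O operation such as an LLM request
    await asyncio.sleep(delay)
    return f"{name} done"

async def run_sequentially():
    # Each call is awaited before the next one starts
    first = await fake_api_call("a", 0.2)
    second = await fake_api_call("b", 0.2)
    return [first, second]

async def run_concurrently():
    # Both calls are started together and awaited as a group
    return await asyncio.gather(fake_api_call("a", 0.2), fake_api_call("b", 0.2))

def timed(coroutine_function):
    start = time.perf_counter()
    result = asyncio.run(coroutine_function())
    return result, time.perf_counter() - start

seq_result, seq_time = timed(run_sequentially)
con_result, con_time = timed(run_concurrently)

print(f"sequential: {seq_time:.2f}s, concurrent: {con_time:.2f}s")
```

The sequential run should take roughly the sum of the delays, while the concurrent run should take roughly the longest single delay.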


## **How to call a runnable asynchronously?**



In [None]:
# General pattern: await some_runnable.ainvoke(some_input)

from langchain_core.runnables import RunnableLambda

# Define the post-processing function to clean up code snippets
def post_process(text):
    # Remove language-tagged fence markers first, then any bare fence markers
    return (
        text.replace('```python', '')
        .replace('```text', '')
        .replace('```', '')
    )

# Wrap the function in a RunnableLambda so it can be invoked like any other runnable
runnable = RunnableLambda(post_process)

# Simulated LLM response with unwanted formatting
llm_response = """
```python
def greet(name):
    print(f"Hello, {name}!")
    ```"""


await runnable.ainvoke(llm_response)

'\n\ndef greet(name):\n    print(f"Hello, {name}!")\n    '

## **When to Use Batch Processing and Its Benefits**

### Overview

Batch processing is ideal for jobs that do not require immediate responses. You can submit a group of requests together instead of issuing them one at a time, which offers several benefits over sequential calls. This method is particularly useful for use cases that involve large datasets or require high throughput, but can afford a delay in processing.

### Use Cases for Batch Processing
Batch processing is often used in scenarios like:

- **Running evaluations**: Processing multiple models or tasks that require evaluation.
- **Classifying large datasets**: Handling large volumes of data that need to be categorized or labeled.
- **Embedding content repositories**: Creating embeddings for large collections of documents or data.

### When to Use the Batch API

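- **Non-Urgent Tasks**: When your job doesn't need an immediate response but you want to process a large amount of data efficiently.
- **High Throughput**: When you need to handle large volumes of requests and sending them one at a time would be too slow. Provider rate limits still apply, so cap concurrency (for example with the `max_concurrency` config key) where needed.
- **Cost-Effective Solutions**: When your provider offers discounted batch pricing, grouping large datasets or evaluations into batches can reduce API costs. By default, `Runnable.batch()` itself runs the inputs concurrently on the client side and does not change per-request pricing.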




You can also process multiple inputs at once using `Runnable.batch()`.

```python
from langchain_core.runnables import RunnableLambda

# Define a simple runnable that doubles its input
runnable = RunnableLambda(lambda x: x * 2)

# Use batch processing
results = runnable.batch([1, 2, 3, 4])
print(results)  # Expected output: [2, 4, 6, 8]
```

In [None]:
runnable = RunnableLambda(lambda x: x * 2)

# Use batch processing
results = runnable.batch([1, 2, 3, 4])
print(results)  # Expected output: [2, 4, 6, 8]

[2, 4, 6, 8]


## Stream a Runnable

`Runnable.stream()` and `Runnable.astream()` are useful for cases where you want to process and retrieve data incrementally or in chunks, without waiting for the entire operation to complete. This is particularly beneficial when you're working with large datasets, long-running tasks, or making repeated calls to external services like LLMs.

### Use Case for Streaming

Streaming is particularly meaningful when you're working with LLMs (large language models) and you need to process the results as they are generated, especially for long texts or when multiple API calls are required. Instead of waiting for the entire response to be generated, you can consume the output incrementally, which lets you show or process partial results in real time.

For example, consider a scenario where you need to generate a long text from an LLM that involves multiple steps or calls. Instead of waiting for the entire output to be returned, you can process the output piece by piece as the model generates it.

### Code Example: Streaming with an LLM Request

Here’s how you might use `Runnable.stream()` to process the output of an LLM incrementally:



In [None]:
from langchain_core.runnables import RunnableLambda

# A function that simulates a long-running LLM call and streams the response
def generate_text_from_llm(prompt):
    # Simulate streaming the response by yielding parts of the output
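    parts = [
        "Once upon a time, ",
        "there was a brave knight who ventured into the forest. ",
        # No comma after the next literal: Python joins the two adjacent
        # strings, so they are yielded together as a single chunk
        "Along the way, he encountered many challenges, "
        "but he always remained determined. "
    ]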

    for part in parts:
        yield part  # Yield parts of the text incrementally

# Create a RunnableLambda that wraps our function
runnable = RunnableLambda(generate_text_from_llm)

# Simulate generating a long text using the LLM
prompt = "Tell me a story about a brave knight."

# Stream and print each part of the text incrementally
for chunk in runnable.stream(prompt):
    print(chunk)


Once upon a time, 
there was a brave knight who ventured into the forest. 
Along the way, he encountered many challenges, but he always remained determined. 


In [None]:
from google.colab import userdata
from langchain_openai import ChatOpenAI

# Read the API key from Colab's secret storage
openai_api_key = userdata.get('openai_api_key')

llm = ChatOpenAI(openai_api_key=openai_api_key, model='gpt-4o', temperature=0)
prompt = "Tell me a polite joke. At most 20 words."

# Accumulate the streamed chunks and print the text received so far after each one
text = ""
for chunk in llm.stream(prompt):
    text += chunk.content
    print(text)


Why
Why did
Why did the
Why did the scare
Why did the scarecrow
Why did the scarecrow win
Why did the scarecrow win an
Why did the scarecrow win an award
Why did the scarecrow win an award?
Why did the scarecrow win an award? Because
Why did the scarecrow win an award? Because he
Why did the scarecrow win an award? Because he was
Why did the scarecrow win an award? Because he was outstanding
Why did the scarecrow win an award? Because he was outstanding in
Why did the scarecrow win an award? Because he was outstanding in his
Why did the scarecrow win an award? Because he was outstanding in his field
Why did the scarecrow win an award? Because he was outstanding in his field!
Why did the scarecrow win an award? Because he was outstanding in his field!
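
The cell above reprints the accumulated text on every chunk. If you would rather render the text progressively on one line, you can print only the new chunk each time. The sketch below shows the idea without an API key; `fake_llm_stream` is a hypothetical stand-in for `llm.stream(prompt)` that yields plain strings rather than message chunks.

```python
def fake_llm_stream(text):
    # Hypothetical stand-in for llm.stream(prompt): yields one word at a time
    for word in text.split(" "):
        yield word + " "

pieces = []
for chunk in fake_llm_stream("Why did the scarecrow win an award?"):
    # Print only the new chunk, without a newline, so the text grows in place
    print(chunk, end="", flush=True)
    pieces.append(chunk)
print()

full_text = "".join(pieces).strip()
```

With a real chat model you would print `chunk.content` instead of `chunk`.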


### 4. Composing Runnables

You can chain runnables together using the pipe operator `|`.


In [None]:
# Define preprocessing and postprocessing steps
preprocess = RunnableLambda(lambda x: x.lower().strip())
postprocess = RunnableLambda(lambda x: f'Processed: {x}')

# Chain the runnables
pipeline = preprocess | postprocess

# Invoke the pipeline
result = pipeline.invoke('   Hello World    ')
print(result)  # Expected output: 'Processed: hello world'

Processed: hello world


### 5. Parallel Execution

You can execute tasks in parallel using `RunnableParallel`.


In [None]:
from langchain_core.runnables import RunnableLambda, RunnableParallel

runnable1 = RunnableLambda(lambda x: {"foo": x})
runnable2 = RunnableLambda(lambda x: [x] * 2)

chain = RunnableParallel(first=runnable1, second=runnable2)

chain.invoke(2)

{'first': {'foo': 2}, 'second': [2, 2]}

### 6. Compose Runnables Using the `.pipe` Method




In [None]:
runnable1 | runnable2

In [None]:

from langchain_core.runnables import RunnableLambda

# Define the first runnable that returns a dictionary
runnable1 = RunnableLambda(lambda x: {"foo": x})

# Define the second runnable that processes the output of the first runnable
runnable2 = RunnableLambda(lambda x: [x] * 3)

# Chain the runnables together using the .pipe method (equivalent to runnable1 | runnable2)
chain = runnable1.pipe(runnable2)

# Invoke the composed chain
result = chain.invoke(2)

# Output the result
print(result)  # Expected output: [{'foo': 2}, {'foo': 2}, {'foo': 2}]


[{'foo': 2}, {'foo': 2}, {'foo': 2}]


### 7. Creating Custom Runnables

You can turn any function into a runnable using `RunnableLambda`.

```python
from langchain_core.runnables import RunnableLambda

# Define a custom function
custom_runnable = RunnableLambda(lambda x: f'Square: {x ** 2}')

# Invoke the custom runnable
result = custom_runnable.invoke(5)
print(result)  # Expected output: 'Square: 25'
```

## Include Input Dictionary in Output Dictionary Using `RunnablePassthrough`

In some scenarios, you may want to include the entire input dictionary in the output alongside the results of transformations or computations. The `RunnablePassthrough` class passes its input through unchanged, so you can place it next to other transformations inside a `RunnableParallel`. This is useful when you want to retain the original input alongside new or modified data in the output.

In this example, we use `RunnablePassthrough` to pass the input dictionary through unchanged, while another `RunnableLambda` performs a transformation on the `"foo"` key. The `RunnableParallel` is then used to run both tasks in parallel, combining the original input with the new result.


In [None]:
from langchain_core.runnables import (
    RunnableLambda,
    RunnableParallel,
    RunnablePassthrough,
)

# Define a runnable that adds 7 to the value of the "foo" key
runnable1 = RunnableLambda(lambda x: x["foo"] + 7)

# Use RunnableParallel to run the transformation and passthrough in parallel
chain = RunnableParallel(bar=runnable1, baz=RunnablePassthrough())

# Invoke the chain with an input dictionary
result = chain.invoke({"foo": 10})

# Output the result
print(result)  # Expected output: {'bar': 17, 'baz': {'foo': 10}}

{'bar': 17, 'baz': {'foo': 10}}


## Merge Input and Output Dictionaries Using `RunnablePassthrough.assign`

In many cases, you may want to keep the input dictionary and add new keys computed from it. `RunnablePassthrough.assign` passes the input dictionary through and merges in the outputs of the runnables you assign. This is useful when you need to add or overwrite a few keys while preserving the rest of the input.

In this example, we use `RunnablePassthrough.assign` to merge the input dictionary with the output of a transformation. The transformation function (`runnable1`) adds `7` to the value associated with the `"foo"` key in the input dictionary. We then assign the result to a new key, `"bar"`, in the output dictionary.



In [None]:

from langchain_core.runnables import RunnableLambda, RunnablePassthrough

runnable1 = RunnableLambda(lambda x: x["foo"] + 7)

chain = RunnablePassthrough.assign(bar=runnable1)

chain.invoke({"foo": 10})

{'foo': 10, 'bar': 17}