```{contents}
```
## **Batch Node in LangGraph**

A **Batch Node** in LangGraph is an execution pattern in which a single node **processes multiple independent inputs**, optionally concurrently, within one graph step. It suits **high-throughput, cost-sensitive production systems** such as document processing pipelines, data labeling systems, and large-scale inference workflows.

---

### **1. Motivation**

Without batching, each item is processed sequentially:

```
item1 → node → result1  
item2 → node → result2  
item3 → node → result3
```

With a Batch Node:

```
[item1, item2, item3] → Batch Node → [result1, result2, result3]
```

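This can improve:

| Metric          | Improvement                                                  |
| --------------- | ------------------------------------------------------------ |
| Throughput      | More items handled per graph step                            |
| Latency         | Lower total time per batch when items run concurrently       |
| Cost            | Fewer round trips where the provider supports batched calls  |
| GPU utilization | Better for local models that run batched inference           |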

---

### **2. Conceptual Model**

A Batch Node is still a **single node**, but its internal function consumes and produces **lists of state fragments**.

```
Graph Step
   └── Batch Node
           ├─ Task 1
           ├─ Task 2
           ├─ Task 3
           └─ Task N
```

---

### **3. State Design for Batching**

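The state must hold collections explicitly: one list for the inputs and one for the outputs.

```python
from typing import TypedDict

class State(TypedDict):
    items: list[str]    # inputs to process in one step
    results: list[str]  # one output per input, in the same order
```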
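
If `results` is written by more than one step or by parallel branches (for example, one update per chunk), each update replaces the previous value by default. LangGraph lets a state key declare a reducer through `Annotated`. The following is a minimal sketch, assuming list concatenation is the merge you want; the name `AccumulatingState` is illustrative and not part of the original example.

```python
import operator
from typing import Annotated, TypedDict

class AccumulatingState(TypedDict):
    items: list[str]
    # The reducer (operator.add) asks LangGraph to concatenate each
    # update onto the existing list instead of replacing it.
    results: Annotated[list[str], operator.add]

# What the reducer does with two successive node updates, in plain Python:
first_update = ["A", "B"]
second_update = ["C"]
merged = operator.add(first_update, second_update)
print(merged)  # ['A', 'B', 'C']
```

With a reducer in place, each node returns only its new results and the graph keeps the running list.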

---

### **4. Implementing a Batch Node**

```python
from typing import TypedDict, List
from langgraph.graph import StateGraph, END

class State(TypedDict):
    items: List[str]
    results: List[str]

def batch_processor(state: State):
    # Process every item in this single graph step (sequentially here).
    outputs = [text.upper() for text in state["items"]]
    # Return only the keys this node updates.
    return {"results": outputs}

builder = StateGraph(State)
builder.add_node("batch", batch_processor)
builder.set_entry_point("batch")
builder.add_edge("batch", END)

graph = builder.compile()

result = graph.invoke({
    "items": ["doc one", "doc two", "doc three"]
})
print(result)
```

---

### **5. Parallel Batch Execution (High Performance)**

```python
import asyncio

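async def async_batch_processor(state):
    async def process(item):
        # Replace with an awaitable I/O-bound call (e.g. an API request).
        return item.upper()

    # gather runs the coroutines concurrently on one event loop and
    # returns results in input order. This helps I/O-bound work; it
    # does not parallelize CPU-bound work.
    tasks = [process(x) for x in state["items"]]
    outputs = await asyncio.gather(*tasks)
    return {"results": list(outputs)}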
```

Register the coroutine like any other node, and run the compiled graph with `ainvoke`:

```python
builder.add_node("batch", async_batch_processor)
```
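
An unbounded `asyncio.gather` starts every item at once, which can trip provider rate limits on large batches. The sketch below caps the number of in-flight items with `asyncio.Semaphore`. The names `bounded_batch_processor` and `MAX_CONCURRENCY` are assumptions for illustration, and `asyncio.sleep` stands in for a real I/O call.

```python
import asyncio

MAX_CONCURRENCY = 2  # hypothetical limit; tune to your provider's rate limits

async def bounded_batch_processor(state: dict) -> dict:
    semaphore = asyncio.Semaphore(MAX_CONCURRENCY)

    async def process(item: str) -> str:
        async with semaphore:  # at most MAX_CONCURRENCY items in flight
            await asyncio.sleep(0.01)  # stand-in for an I/O-bound call
            return item.upper()

    # gather preserves input order, so results[i] corresponds to items[i]
    outputs = await asyncio.gather(*(process(x) for x in state["items"]))
    return {"results": list(outputs)}

demo = asyncio.run(bounded_batch_processor({"items": ["a", "b", "c"]}))
print(demo)  # {'results': ['A', 'B', 'C']}
```

Inside a notebook, where an event loop is already running, use `await bounded_batch_processor(...)` instead of `asyncio.run`.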

---

### **6. LLM Batch Node Example**

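```python
def summarize_batch(state):
    # `llm` is assumed to be a LangChain model defined elsewhere.
    responses = llm.batch(state["items"])
    # Chat models return message objects with `.content`; plain LLMs return strings.
    return {"results": [getattr(r, "content", r) for r in responses]}
```

This replaces **N separate `invoke` calls with 1 `batch` call**. Note that LangChain's default `batch` runs the N invocations concurrently in a thread pool rather than merging them into a single API request, so it mainly cuts wall-clock time. The number of provider requests drops only when the integration implements native batching.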
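
When a model client has no native batch endpoint, a batch call generally amounts to a concurrent fan-out of per-item calls. Here is a minimal stdlib sketch of that idea; `fake_llm` and `fan_out_batch` are hypothetical names for illustration, not LangChain or LangGraph APIs.

```python
from concurrent.futures import ThreadPoolExecutor

def fake_llm(prompt: str) -> str:
    """Hypothetical stand-in for a single model call."""
    return f"Summary: {prompt[:20]}"

def fan_out_batch(prompts: list[str], max_workers: int = 4) -> list[str]:
    # executor.map issues one call per prompt and returns results in input order
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        return list(executor.map(fake_llm, prompts))

summaries = fan_out_batch(["doc one", "doc two"])
print(summaries)  # ['Summary: doc one', 'Summary: doc two']
```

The `max_workers` cap plays the same role as a batch-size limit: it bounds how many requests hit the provider at once.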

---

### **7. When to Use Batch Nodes**

| Use Case               | Benefit         |
| ---------------------- | --------------- |
| Document summarization | Massive speedup |
| Embedding generation   | Cost reduction  |
| Data labeling          | Throughput      |
| Evaluation pipelines   | Scalability     |
| ETL workflows          | Efficiency      |

---

### **8. Production Concerns**

| Concern         | Handling                |
| --------------- | ----------------------- |
| Memory pressure | Chunk batches           |
| Error isolation | Per-item try/except     |
| Retry           | Retry failed items only |
| Observability   | Log per item            |
| Backpressure    | Limit batch size        |

---

### **9. Variants of Batch Nodes**

| Variant            | Description                 |
| ------------------ | --------------------------- |
| Static Batch       | Fixed-size input            |
| Dynamic Batch      | Accumulates until threshold |
| Streaming Batch    | Processes rolling windows   |
| Micro-Batch        | Low-latency mini batches    |
| Hierarchical Batch | Batch inside subgraphs      |

---

### **10. Mental Model**

A Batch Node lets LangGraph act as a **data-processing engine** as well as a **control system**: the graph still decides what runs next, while each batch step does the bulk work. In effect, it behaves like:

> **Distributed data pipeline + intelligent orchestrator**


### **11. Demonstration**

```python
import asyncio
from typing import TypedDict, List

class State(TypedDict):
    documents: List[str]
    summaries: List[str]

# Synchronous variant: one mock summary per document, in order.
def batch_summarizer(state: State):
    results = []
    for doc in state["documents"]:
        # mock LLM call
        results.append(f"Summary: {doc[:40]}...")
    return {"summaries": results}

# Asynchronous variant: all documents are summarized concurrently.
async def async_batch_summarizer(state: State):
    async def summarize(text):
        await asyncio.sleep(0.1)  # simulate LLM latency
        return f"Summary: {text[:40]}..."

    tasks = [summarize(d) for d in state["documents"]]
    outputs = await asyncio.gather(*tasks)
    return {"summaries": outputs}
```


```python
from langgraph.graph import StateGraph, END

builder = StateGraph(State)
builder.add_node("batch", async_batch_summarizer)
builder.set_entry_point("batch")
builder.add_edge("batch", END)

graph = builder.compile()
```


```python
import warnings
warnings.filterwarnings("ignore")

input_data = {
    "documents": [
        "LangGraph enables cyclic workflows for LLM systems.",
        "Batch nodes improve throughput and reduce cost.",
        "Production graphs require fault tolerance."
    ]
}

# Top-level await works in a notebook; in a script, wrap this in asyncio.run().
result = await graph.ainvoke(input_data)
print(result["summaries"])
```


Output:

```
['Summary: LangGraph enables cyclic workflows for L...', 'Summary: Batch nodes improve throughput and reduc...', 'Summary: Production graphs require fault toleranc...']
```
