```{contents}
```
## Async Workers 


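**Async workers** are background execution units that process tasks **concurrently** without blocking the code that handles incoming requests (in Python's `asyncio`, the event loop).
They let the system handle **many long-running, I/O-bound jobs** efficiently. CPU-heavy work still has to be offloaded to threads or processes, or it will stall the loop.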

They are fundamental for:

* High-throughput APIs
* LLM pipelines
* Data processing systems
* Event-driven architectures

---

### Why Async Workers Matter

Without async workers:

```
Request → Blocking work → Server stalls → Timeouts
```

With async workers:

```
Request → Enqueue job → Immediate response; workers process jobs concurrently in the background
```
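To make the difference concrete, here is a minimal sketch that times five simulated I/O-bound jobs run one after another versus concurrently on a single event loop. `asyncio.sleep` stands in for real I/O (HTTP, database, LLM calls), and the job count and 0.2 s delay are arbitrary choices for illustration.

```python
import asyncio
import time

async def handle(job: int) -> int:
    # Stand-in for an I/O-bound call (HTTP request, DB query, LLM call)
    await asyncio.sleep(0.2)
    return job

async def run_sequentially(n: int) -> float:
    start = time.perf_counter()
    for i in range(n):
        await handle(i)  # each job waits for the previous one to finish
    return time.perf_counter() - start

async def run_concurrently(n: int) -> float:
    start = time.perf_counter()
    await asyncio.gather(*(handle(i) for i in range(n)))  # all waits overlap
    return time.perf_counter() - start

seq_time = asyncio.run(run_sequentially(5))
con_time = asyncio.run(run_concurrently(5))
print(f"sequential: {seq_time:.2f}s, concurrent: {con_time:.2f}s")
```

The sequential run takes roughly the sum of the delays (about 1 s), while the concurrent run takes roughly the longest single delay (about 0.2 s), because the event loop switches to another job whenever one is waiting.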

---

### Where Async Workers Fit

```
Client → API → Task Queue → Async Workers → Database / LLM / Storage
```
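The flow above can be simulated in one process to show the "respond now, process later" contract. In this sketch an `asyncio.Queue` plays the task queue and a plain dictionary plays the database; `submit`, `worker` and `jobs` are hypothetical names, and the client gets a job ID back immediately and looks up the status later.

```python
import asyncio
import uuid

# In-memory stand-in for the database (an assumption for illustration)
jobs: dict[str, dict] = {}

async def submit(queue: asyncio.Queue, payload: str) -> str:
    # "API" step: record the job, enqueue it, and return at once
    job_id = str(uuid.uuid4())
    jobs[job_id] = {"status": "queued", "result": None}
    await queue.put((job_id, payload))
    return job_id

async def worker(queue: asyncio.Queue) -> None:
    while True:
        job_id, payload = await queue.get()
        try:
            jobs[job_id]["status"] = "running"
            await asyncio.sleep(0.05)  # stand-in for DB / LLM / storage I/O
            jobs[job_id].update(status="done", result=payload.upper())
        finally:
            queue.task_done()

async def main() -> list[str]:
    queue: asyncio.Queue = asyncio.Queue()
    workers = [asyncio.create_task(worker(queue)) for _ in range(2)]
    ids = [await submit(queue, f"job-{i}") for i in range(4)]
    await queue.join()  # wait until every queued job has been processed
    for w in workers:
        w.cancel()
    await asyncio.gather(*workers, return_exceptions=True)
    return ids

job_ids = asyncio.run(main())
print([jobs[j] for j in job_ids])
```

In a real deployment the dictionary would be a database row or cache entry and the queue an external broker, but the contract stays the same: enqueue, return an ID, and let workers update the stored status.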

---

### Native Python Async Workers (asyncio)

#### Demonstration

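```python
import asyncio

async def worker(name, queue):
    while True:
        job = await queue.get()
        try:
            print(f"{name} processing {job}")
            await asyncio.sleep(2)  # simulated I/O-bound work
        finally:
            queue.task_done()  # always mark done, or queue.join() never returns

async def main():
    queue = asyncio.Queue()

    # Start three workers that consume from the same queue
    workers = [asyncio.create_task(worker(f"W{i}", queue)) for i in range(3)]

    for i in range(10):
        await queue.put(f"job-{i}")

    await queue.join()  # block until every job has been marked done

    # Workers loop forever, so cancel them and wait for the cancellation to finish
    for w in workers:
        w.cancel()
    await asyncio.gather(*workers, return_exceptions=True)

asyncio.run(main())
```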
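The demo works because `asyncio.sleep` yields to the event loop. A blocking call such as `time.sleep` or a synchronous library would stall all three workers at once. One way to keep such calls off the loop is `asyncio.to_thread` (Python 3.9+), sketched here with a hypothetical synchronous `blocking_parse` function.

```python
import asyncio
import time

def blocking_parse(job: str) -> str:
    # Hypothetical synchronous function (e.g. a library without async support)
    time.sleep(0.2)
    return job.upper()

async def worker(queue: asyncio.Queue, results: list) -> None:
    while True:
        job = await queue.get()
        try:
            # Runs in the default thread pool, so the event loop stays free
            results.append(await asyncio.to_thread(blocking_parse, job))
        finally:
            queue.task_done()

async def main() -> tuple[list, float]:
    queue: asyncio.Queue = asyncio.Queue()
    results: list = []
    workers = [asyncio.create_task(worker(queue, results)) for _ in range(3)]
    for i in range(6):
        queue.put_nowait(f"job-{i}")
    start = time.perf_counter()
    await queue.join()
    elapsed = time.perf_counter() - start
    for w in workers:
        w.cancel()
    await asyncio.gather(*workers, return_exceptions=True)
    print(f"{len(results)} jobs in {elapsed:.2f}s")
    return results, elapsed

results, elapsed = asyncio.run(main())
```

Six 0.2 s jobs across three workers finish in roughly 0.4 s instead of 1.2 s. Threads suit blocking I/O; CPU-bound Python code that holds the GIL usually needs a process pool instead (for example `loop.run_in_executor` with a `concurrent.futures.ProcessPoolExecutor`).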

---

### Async Workers in FastAPI

#### Demonstration

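```python
import asyncio
from contextlib import asynccontextmanager

from fastapi import FastAPI

queue = asyncio.Queue()

async def worker():
    while True:
        job = await queue.get()
        try:
            await asyncio.sleep(3)  # stand-in for real async I/O
            print("Processed:", job)
        finally:
            queue.task_done()

@asynccontextmanager
async def lifespan(app: FastAPI):
    # Keep references to the tasks so they are not garbage-collected mid-run
    workers = [asyncio.create_task(worker()) for _ in range(2)]
    yield
    # On shutdown, stop the workers cleanly
    for w in workers:
        w.cancel()
    await asyncio.gather(*workers, return_exceptions=True)

app = FastAPI(lifespan=lifespan)  # replaces the deprecated @app.on_event("startup")

@app.post("/submit")
async def submit_job(data: str):  # a bare `str` parameter is read from the query string
    await queue.put(data)
    return {"status": "queued"}
```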

---

### Async Worker with LLM Tasks

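```python
import asyncio

async def llm_worker(queue: asyncio.Queue, llm, store):
    # `llm`: any client with an async `ainvoke(prompt)` method (e.g. a LangChain
    # chat model); `store`: your own persistence function
    while True:
        prompt = await queue.get()
        try:
            result = await llm.ainvoke(prompt)
            store(result)
        except Exception as exc:  # one failed call should not kill the worker
            print(f"LLM call failed for {prompt!r}: {exc}")
        finally:
            queue.task_done()
```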
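The snippet above relies on an `llm` object and a `store` function that it does not define. To run the same loop end to end, this sketch swaps in a hypothetical `FakeLLM` stub with an async `ainvoke` method, replaces `store` with a results dictionary, and records failures instead of letting them kill the worker.

```python
import asyncio

class FakeLLM:
    """Hypothetical stand-in exposing an async `ainvoke(prompt)` method."""

    async def ainvoke(self, prompt: str) -> str:
        await asyncio.sleep(0.05)  # simulated network latency
        if "fail" in prompt:
            raise RuntimeError("simulated provider error")
        return f"answer to {prompt!r}"

async def llm_worker(queue: asyncio.Queue, llm: FakeLLM, results: dict, errors: dict) -> None:
    while True:
        prompt = await queue.get()
        try:
            results[prompt] = await llm.ainvoke(prompt)
        except Exception as exc:  # record the failure; the worker keeps running
            errors[prompt] = str(exc)
        finally:
            queue.task_done()

async def main() -> tuple[dict, dict]:
    queue: asyncio.Queue = asyncio.Queue()
    results: dict = {}
    errors: dict = {}
    llm = FakeLLM()
    workers = [asyncio.create_task(llm_worker(queue, llm, results, errors)) for _ in range(2)]
    for prompt in ["hello", "please fail", "summarize"]:
        queue.put_nowait(prompt)
    await queue.join()
    for w in workers:
        w.cancel()
    await asyncio.gather(*workers, return_exceptions=True)
    return results, errors

results, errors = asyncio.run(main())
print(results)
print(errors)
```

Swapping `FakeLLM` for a real async client leaves the worker loop unchanged.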

---

### Distributed Async Workers (Production Pattern)

```
FastAPI → Redis / RabbitMQ → Worker Fleet → Database / LLM
```

#### Example (Conceptual)

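```python
from celery import Celery

celery = Celery("jobs", broker="redis://localhost:6379/0")  # assumed local Redis broker

@celery.task
def process_job(data):
    return llm.invoke(data)  # `llm` assumed defined elsewhere; enqueue with process_job.delay(data)
```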

---

### Scaling Strategy

| Layer   | How it scales                                 |
| ------- | --------------------------------------------- |
| Workers | Horizontally (add worker processes/machines)  |
| Queue   | Distributed broker                            |
| LLM     | Rate-limited by provider quotas               |
| Storage | Sharded                                       |
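Because the LLM layer is rate-limited, workers usually cap how many calls they have in flight. Within a single process, one common approach is an `asyncio.Semaphore`. In this sketch the limit of 2 and the `call_llm` helper are assumptions, and `asyncio.sleep` stands in for the real request.

```python
import asyncio

in_flight = 0  # calls currently running
peak = 0       # highest concurrency observed

async def call_llm(prompt: str, slots: asyncio.Semaphore) -> str:
    global in_flight, peak
    async with slots:  # at most `limit` calls run at once; the rest wait here
        in_flight += 1
        peak = max(peak, in_flight)
        try:
            await asyncio.sleep(0.05)  # stand-in for the real LLM request
            return f"done: {prompt}"
        finally:
            in_flight -= 1

async def main(limit: int = 2) -> list[str]:
    slots = asyncio.Semaphore(limit)  # hypothetical limit; match your provider quota
    prompts = [f"p{i}" for i in range(6)]
    return await asyncio.gather(*(call_llm(p, slots) for p in prompts))

outputs = asyncio.run(main())
print(outputs, "peak concurrency:", peak)
```

A semaphore bounds concurrent calls per process, not requests per minute, and it does not coordinate across a distributed worker fleet; those cases need a shared limiter or the provider's own quota handling.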

---

### Mental Model

```
Async Workers = Factory workers processing jobs from a conveyor belt
```

---

### Key Takeaways

* Enable concurrency without blocking
* Essential for scalable AI systems
* Combine asyncio for I/O + worker queues for throughput
* Production systems use external queues and worker pools