modern-python/faststream-concurrent-aiokafka


faststream-concurrent-aiokafka


Concurrent message processing middleware for FastStream with aiokafka.

By default FastStream processes Kafka messages sequentially — one message at a time per subscriber. This library turns each incoming message into an asyncio task so multiple messages are handled concurrently, while keeping offset commits correct and shutdown graceful.

Features

  • Concurrent message processing via asyncio tasks
  • Configurable concurrency limit (semaphore-based)
  • Batch offset committing per partition after each task completes
  • Graceful shutdown: waits up to 10 s for in-flight tasks before exiting
  • Signal handling (SIGTERM / SIGINT / SIGQUIT) triggers graceful shutdown
  • Background observer task to detect and discard stale completed tasks
  • Handler exceptions are logged but do not crash the consumer
  • Health check helper to probe handler status from a ContextRepo

📦 PyPI

📝 License

Installation

pip install faststream-concurrent-aiokafka

Quick Start

ack_policy=AckPolicy.MANUAL is required on every concurrent subscriber — the middleware enforces this at runtime. Without it, FastStream would commit offsets before processing tasks complete, causing silent message loss on crash. Subscribers that use other ack policies are automatically passed through without concurrent processing.

AsgiFastStream note: its lifespan receives an app-level ContextRepo separate from broker.context. Pass broker.context explicitly instead of the injected argument.

from contextlib import asynccontextmanager
from faststream import ContextRepo
from faststream.asgi import AsgiFastStream
from faststream.kafka import KafkaBroker
from faststream.middlewares import AckPolicy
from faststream_concurrent_aiokafka import (
    KafkaConcurrentProcessingMiddleware,
    initialize_concurrent_processing,
    stop_concurrent_processing,
)

broker = KafkaBroker(...)
# Register KCM on the broker before any other middleware (see DI note below)
broker.add_middleware(KafkaConcurrentProcessingMiddleware)

@asynccontextmanager
async def lifespan(_context: ContextRepo):
    await initialize_concurrent_processing(
        context=broker.context,
        concurrency_limit=20,         # max concurrent tasks (minimum: 1)
        commit_batch_size=100,        # commit after this many completed tasks
        commit_batch_timeout_sec=5.0, # or after this many seconds
    )
    try:
        yield
    finally:
        await stop_concurrent_processing(broker.context)

app = AsgiFastStream(broker, lifespan=lifespan)

@broker.subscriber("my-topic", group_id="my-group", ack_policy=AckPolicy.MANUAL)
async def handle(msg: str) -> None:
    ...

# Subscribers without AckPolicy.MANUAL are passed through unchanged
@broker.subscriber("other-topic", group_id="other-group")
async def handle_other(msg: str) -> None:
    ...

Core Concepts

KafkaConcurrentProcessingMiddleware

A FastStream BaseMiddleware subclass. Add it to your broker to enable concurrent processing. It wraps each incoming message in an asyncio task submitted to KafkaConcurrentHandler.

KafkaConcurrentHandler

The processing engine. Manages:

  • An asyncio.Semaphore to enforce concurrency_limit
  • A set of in-flight asyncio tasks
  • A background observer that periodically discards stale completed tasks
  • Signal handlers for graceful shutdown
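The observer's job can be sketched roughly as follows (function name and interval are illustrative, not the library's actual API — the real logic lives inside KafkaConcurrentHandler):

```python
import asyncio

async def observe_tasks(tasks: set[asyncio.Task], interval: float = 5.0) -> None:
    # Periodically drop references to completed tasks so the
    # in-flight set does not grow without bound.
    while True:
        await asyncio.sleep(interval)
        for task in [t for t in tasks if t.done()]:
            tasks.discard(task)
```

Discarding finished tasks also releases any exception they hold, so a crashed handler cannot keep memory alive indefinitely.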

KafkaBatchCommitter

Runs as a background asyncio task. Receives KafkaCommitTask objects, waits for each task's asyncio future to complete, then commits the max offset per partition to Kafka. Batching is triggered by size or timeout. If the committer's task dies, CommitterIsDeadError is raised to callers.
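The commit step reduces to "highest completed offset per partition, plus one". A minimal sketch of that reduction, assuming a batch of (partition, offset, future) triples as a stand-in for KafkaCommitTask objects (names are illustrative):

```python
import asyncio

async def flush_batch(consumer, batch) -> None:
    # Keep only the highest completed offset per partition.
    max_offsets: dict = {}
    for partition, offset, future in batch:
        await future  # wait for the processing task to finish
        if offset > max_offsets.get(partition, -1):
            max_offsets[partition] = offset
    # Commit offset + 1: Kafka stores the *next* offset to fetch.
    await consumer.commit({tp: off + 1 for tp, off in max_offsets.items()})
```

With aiokafka, the dict passed to consumer.commit maps TopicPartition to the offset the group should resume from after a restart.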

API Reference

initialize_concurrent_processing(context, ...)

Create and start the concurrent processing handler; store it in FastStream's context.

Parameter                 Default   Description
context                   required  FastStream ContextRepo instance
concurrency_limit         10        Max concurrent asyncio tasks (minimum: 1)
commit_batch_size         10        Max messages per commit batch
commit_batch_timeout_sec  10.0      Max seconds before flushing a batch

Returns the KafkaConcurrentHandler instance.

stop_concurrent_processing(context)

Flush pending commits, wait for in-flight tasks (up to 10 s), then stop the handler.

is_kafka_handler_healthy(context)

Returns True if the KafkaConcurrentHandler stored in context is running and healthy, False otherwise (not initialized, stopped, or observer task dead). Useful for readiness/liveness probes.
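One way to wire this into a readiness probe, using FastStream's ASGI route helpers on the broker and lifespan from the Quick Start (route path and response shape are illustrative):

```python
from faststream.asgi import AsgiFastStream, AsgiResponse, get

from faststream_concurrent_aiokafka import is_kafka_handler_healthy

@get
async def ready(scope) -> AsgiResponse:
    # 200 while the concurrent handler is running and healthy, 503 otherwise.
    healthy = is_kafka_handler_healthy(broker.context)
    return AsgiResponse(b"", status_code=200 if healthy else 503)

app = AsgiFastStream(broker, asgi_routes=[("/ready", ready)], lifespan=lifespan)
```

An orchestrator probing /ready will then stop routing traffic (or restart the pod) if the observer task dies or the handler is stopped.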

KafkaConcurrentProcessingMiddleware

FastStream middleware class. Register it via broker.add_middleware(...). See Quick Start for usage examples.

Must be outermost. consume_scope fires the handler as a background task and returns None immediately. Any middleware that wraps it on the outside will see that premature return and misfire — wrong timing, early cleanup, or missed exceptions. Middlewares added after it (i.e. inner in the chain) run correctly inside the background task.

DI framework compatibility (modern-di-faststream and similar)

DI frameworks like modern-di-faststream register a broker-level middleware that creates a REQUEST-scoped dependency container around each message. If that middleware is outer to KafkaConcurrentProcessingMiddleware, its scope closes as soon as consume_scope returns — before the background task runs — so any dependencies resolved inside the task (database sessions, repositories, …) are created from an already-closed container. Their finalizers never run, leaving connections unreturned to the pool.

Fix: call broker.add_middleware(KafkaConcurrentProcessingMiddleware) before setup_di(...) (or any equivalent DI bootstrap call). FastStream applies broker middlewares in registration order, so the first registered is outermost; adding KCM first ensures the DI middleware ends up inside the background task, where it can manage the scope lifetime correctly.

broker = KafkaBroker(...)
broker.add_middleware(KafkaConcurrentProcessingMiddleware)  # must come first
modern_di_faststream.setup_di(app, container=container)    # adds DI middleware after → inner to KCM

How It Works

  1. Message dispatch: On each incoming message, consume_scope calls handle_task(), which acquires a semaphore slot then fires the handler coroutine as a background asyncio.Task.

  2. Concurrency control: The semaphore blocks new tasks when concurrency_limit is reached. The slot is released via a done-callback when the task finishes or fails.

  3. Offset committing: Each dispatched task is paired with its Kafka offset and consumer reference and enqueued in KafkaBatchCommitter. Once the task completes, the committer groups offsets by partition and calls consumer.commit(partitions_to_offsets) with offset + 1 (Kafka's "next offset to fetch" convention).

  4. Graceful shutdown: stop_concurrent_processing sets the shutdown event, flushes the committer, cancels the observer task, and calls asyncio.gather with a 10-second timeout to wait for all in-flight tasks.
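Steps 1–2 boil down to a semaphore acquired before task creation and released in a done-callback. A self-contained sketch of that pattern (hypothetical class, not the library's actual KafkaConcurrentHandler):

```python
import asyncio
from collections.abc import Coroutine

class ConcurrentDispatcher:
    def __init__(self, concurrency_limit: int = 10) -> None:
        self._semaphore = asyncio.Semaphore(concurrency_limit)
        self._tasks: set[asyncio.Task] = set()

    async def handle_task(self, coro: Coroutine) -> asyncio.Task:
        # Blocks here once concurrency_limit tasks are in flight.
        await self._semaphore.acquire()
        task = asyncio.create_task(coro)
        self._tasks.add(task)
        # Release the slot (and drop the reference) whether the
        # task finishes or fails.
        task.add_done_callback(self._on_done)
        return task

    def _on_done(self, task: asyncio.Task) -> None:
        self._tasks.discard(task)
        self._semaphore.release()
```

Because the slot is released in the done-callback rather than inside the handler, a handler that raises still frees its slot, matching the "exceptions are logged but do not crash the consumer" behavior above.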

Requirements

  • Python >= 3.11
  • faststream[kafka]
