omer9564/acelery

acelery

A Celery plugin that enables seamless execution of asyncio coroutines as Celery tasks with proper event-loop management and asyncio-native result handling.

It lets you write your tasks as async def functions and consume their results from any asyncio caller without blocking the event loop.

Why?

Celery is synchronous by design: tasks are executed inside worker processes that don't know about asyncio, and AsyncResult.get() blocks the calling thread. That's painful when:

  • Your task logic naturally awaits I/O (HTTP, DB, queues, gRPC).
  • Your caller is an asyncio application (e.g. FastAPI / aiohttp) and can't afford a blocking .get().
  • You want to deduplicate "the same task with the same args" across concurrent callers so you only run the work once.

acelery addresses all three: a worker-side Runner that hosts a persistent event loop, an AIOResult that awaits results over Redis Pub/Sub instead of polling, and a trigger_or_join_task primitive that coalesces duplicate invocations.

Features

  • Async task decorator — write async def and decorate with @async_task() to run it inside a Celery worker.
  • Worker event-loop management — a shared asyncio runner is set up on worker_process_init and torn down on worker_process_shutdown.
  • Dedicated runners — opt into a per-task asyncio.run for stronger isolation when you need it.
  • Asyncio-native result handling — AIOResult.get() is a coroutine, backed by Redis Pub/Sub with periodic ticks to avoid races.
  • Task deduplication — AIOResult.trigger_or_join_task(...) either starts a new task or joins an existing in-flight one keyed by (name, args, kwargs).
  • Leftover-coroutine cleanup — orphan tasks left behind by your coroutine are detected, logged, and cancelled.
  • Retry on timeout — built on tenacity, configurable per call.
  • Pydantic-typed results — declare return_type=... and the raw backend payload is validated/parsed for you (Pydantic v1 and v2 supported).
  • Backends
    • Redis — full support (Pub/Sub-driven awaiting).
    • Other backends are not currently supported.

Requirements

  • Python 3.10+
  • A Celery broker (RabbitMQ, Redis, etc. — Celery's defaults apply)
  • Redis as the result backend (required by AIOResult)

Installation

Using uv:

uv add acelery

Using poetry:

poetry add acelery

Using pip:

pip install acelery

Quick Start

Configure Celery

AIOResult reads task metadata over Redis Pub/Sub, so configure Celery with a Redis result backend:

from celery import Celery

app = Celery(
    "myapp",
    broker="amqp://guest:guest@localhost:5672//",
    backend="redis://localhost:6379/0",
)

Define an async task

import asyncio
from acelery import async_task

@app.task()
@async_task()
async def greet(name: str) -> str:
    await asyncio.sleep(0.1)
    return f"Hello, {name}!"

The decorator order matters: @app.task() must wrap @async_task(). The inner decorator turns the coroutine function into a synchronous callable that Celery can run; the outer one registers it as a Celery task.
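
To make the "coroutine function in, synchronous callable out" step concrete, here is a minimal standard-library sketch. The name sync_from_async is hypothetical and this is not how acelery implements @async_task() (it uses asyncio.run for brevity, whereas the library defaults to a shared runner); it only illustrates why the outer decorator ends up seeing an ordinary function.

```python
import asyncio
import functools
from typing import Any, Callable, Coroutine, TypeVar

T = TypeVar("T")


def sync_from_async(
    func: Callable[..., Coroutine[Any, Any, T]],
) -> Callable[..., T]:
    """Hypothetical stand-in for the inner decorator: wrap a coroutine
    function in a plain synchronous callable."""

    @functools.wraps(func)
    def wrapper(*args: Any, **kwargs: Any) -> T:
        # asyncio.run creates a fresh event loop, runs the coroutine to
        # completion, and closes the loop again.
        return asyncio.run(func(*args, **kwargs))

    return wrapper


@sync_from_async
async def greet_sketch(name: str) -> str:
    await asyncio.sleep(0.01)
    return f"Hello, {name}!"


# The decorated function is now an ordinary blocking call, which is the
# shape a task registry that expects sync functions can work with.
message = greet_sketch("World")
```

If the order were reversed, the registering decorator would receive the raw coroutine function and calling it would only produce an un-awaited coroutine object.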

Dedicated runner (per-task isolation)

By default tasks share the worker's global asyncio runner. Pass dedicated_runner=True for tasks that need their own asyncio.run invocation (at the cost of ~10µs of overhead per call):

@app.task()
@async_task(dedicated_runner=True)
async def isolated_task() -> str:
    await asyncio.sleep(1)
    return "isolated"
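
The difference between the two modes can be seen with plain asyncio. The sketch below is an illustration under assumptions, not acelery's code: it stands in for the shared runner with a manually created loop and for the dedicated runner with asyncio.run, and shows that the first reuses one loop across calls while the second builds and closes a loop every time.

```python
import asyncio


async def running_loop() -> asyncio.AbstractEventLoop:
    # Report which event loop this coroutine is executing on.
    return asyncio.get_running_loop()


# Shared style: one long-lived loop serves several calls in a row.
shared_loop = asyncio.new_event_loop()
try:
    first = shared_loop.run_until_complete(running_loop())
    second = shared_loop.run_until_complete(running_loop())
finally:
    shared_loop.close()

# Dedicated style: asyncio.run creates a loop, runs one coroutine,
# and closes the loop before returning.
dedicated = asyncio.run(running_loop())

shared_reused = first is second          # same loop object both times
dedicated_is_separate = dedicated is not first
dedicated_closed = dedicated.is_closed()  # already torn down
```

Loop-bound resources (connection pools, clients created inside a task) survive between calls only in the shared style, which is the trade-off behind choosing dedicated_runner=True.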

Awaiting results from asyncio

Wrap the standard Celery AsyncResult in AIOResult to await it without blocking:

from acelery import AIOResult

async def main() -> None:
    result = AIOResult(greet.delay("World"), return_type=str)
    value = await result.get(timeout=10)
    print(value)  # "Hello, World!"

get() accepts:

| Argument | Default | Purpose |
| --- | --- | --- |
| timeout | 60 | Hard upper bound for the whole wait. |
| propagate | True | Re-raise exceptions stored in the backend instead of returning them. |
| forget | True | Extend the result key's TTL to forget_cooldown after a successful read (so other late readers can still see it). |
| forget_cooldown | 90 | Seconds to retain the result before Redis evicts it. |
| interrupt_every | 5.0 | How often to fall back from Pub/Sub to a direct backend read (defends against missed messages). |
| max_pending_interrupts | 3 | Maximum consecutive "still pending" ticks before raising MaxPendingInterruptsError. |
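
How timeout, interrupt_every and max_pending_interrupts might interact is easier to see in code. The following is a standard-library sketch, not acelery's implementation: a dict stands in for the Redis backend, an asyncio.Event stands in for the Pub/Sub notification, and TooManyPendingTicks is a hypothetical substitute for MaxPendingInterruptsError. Treating "key absent" as "still pending" is also an assumption made for the sketch.

```python
import asyncio
from typing import Dict


class TooManyPendingTicks(Exception):
    """Hypothetical stand-in for the library's pending-tick error."""


async def wait_for_result(
    store: Dict[str, str],
    notified: asyncio.Event,
    key: str,
    timeout: float = 1.0,
    interrupt_every: float = 0.05,
    max_pending_interrupts: int = 3,
) -> str:
    loop = asyncio.get_running_loop()
    deadline = loop.time() + timeout
    pending_ticks = 0
    while True:
        # Direct read: also covers a result stored before we started waiting.
        if key in store:
            return store[key]
        remaining = deadline - loop.time()
        if remaining <= 0:
            raise asyncio.TimeoutError(key)
        try:
            # Wait for a notification, but never longer than one tick.
            await asyncio.wait_for(
                notified.wait(), min(interrupt_every, remaining)
            )
            notified.clear()
        except asyncio.TimeoutError:
            # Tick fired without a notification; re-check on the next pass.
            pending_ticks += 1
            if pending_ticks >= max_pending_interrupts and key not in store:
                raise TooManyPendingTicks(key)


async def demo() -> str:
    store: Dict[str, str] = {}
    notified = asyncio.Event()

    async def finish_later() -> None:
        await asyncio.sleep(0.02)
        store["task-1"] = "done"
        # notified.set() is deliberately skipped to simulate a lost message.

    producer = asyncio.create_task(finish_later())
    value = await wait_for_result(store, notified, "task-1")
    await producer
    return value


result_value = asyncio.run(demo())
```

Even though the notification never arrives, the first tick picks the value up with a direct read, which is the failure mode the periodic fallback is meant to cover.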

Trigger-or-join: deduplicate concurrent calls

If multiple callers fire the same task with the same args, you usually want one execution and many readers. trigger_or_join_task does exactly that, keyed by the task name together with its args and kwargs:

import asyncio
from acelery import AIOResult

@app.task(track_started=True)
@async_task()
async def sleeping_task(sleep_time: float) -> str:
    await asyncio.sleep(sleep_time)
    return "OK"

async def main() -> None:
    aio_result, value = await AIOResult.trigger_or_join_task(
        sleeping_task.s(2.0),
        timeout=30,
        return_type=str,
        track_started=True,  # set this whenever the task itself sets track_started=True
    )
    print(value)  # "OK"

Notes:

  • The first caller publishes the task and stores task_id under the dedupe key.
  • Subsequent callers find the existing task_id and await its result instead of re-publishing.
  • Retries on timeout are on by default (retry_on_timeout=True, max_retries_on_timeout=3).
  • If the task declares track_started=True, pass track_started=True here too so a PENDING state isn't mis-read as "still alive".

Calling Celery primitives from asyncio

AIOExecutable wraps a Signature / Task and exposes async delay / apply_async / apply, executed in a thread pool so they don't block your event loop:

from acelery import AIOExecutable

async def main() -> None:
    aio_result = await AIOExecutable(greet.s("World"), return_type=str).apply_async()
    print(await aio_result.get(timeout=10))
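
For readers unfamiliar with the thread-offloading idea, this standard-library sketch shows the general technique with asyncio.to_thread. The function blocking_publish is a hypothetical stand-in for a blocking broker call; the sketch does not claim that AIOExecutable uses asyncio.to_thread internally.

```python
import asyncio
import threading
import time
from typing import List, Tuple


def blocking_publish(payload: str) -> Tuple[str, bool]:
    """Hypothetical blocking call, e.g. publishing a message to a broker."""
    time.sleep(0.05)
    ran_on_main = threading.current_thread() is threading.main_thread()
    return payload, ran_on_main


async def heartbeat(ticks: List[float]) -> None:
    # Keeps ticking while the blocking call runs on a worker thread.
    for _ in range(3):
        ticks.append(asyncio.get_running_loop().time())
        await asyncio.sleep(0.01)


async def demo() -> Tuple[str, bool, int]:
    ticks: List[float] = []
    (payload, ran_on_main), _ = await asyncio.gather(
        asyncio.to_thread(blocking_publish, "job"),
        heartbeat(ticks),
    )
    return payload, ran_on_main, len(ticks)


payload, ran_on_main, tick_count = asyncio.run(demo())
```

The heartbeat completes all of its ticks while the blocking call sleeps, which is the property that matters for an asyncio caller.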

How it works

  • On worker_process_init, a module-level Runner (asyncio.Runner on 3.11+, polyfilled on 3.10) is started; on worker_process_shutdown it is closed cleanly.
  • @async_task runs the coroutine inside that shared runner, copies the current contextvars context for proper propagation, and after the coroutine returns it cancels any asyncio.Tasks left behind by the user code (with a warning log).
  • AIOResult.get() opens a Redis Pub/Sub subscription on the result key and races it against a periodic "tick" that reads the backend directly — this closes the race where the task finishes before the subscriber is fully attached.
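
The context propagation and leftover-task cleanup described in the second bullet can be approximated with the standard library. This is a sketch under assumptions, not acelery's code: run_and_clean and run_in_copied_context are hypothetical names, and asyncio.run stands in for the shared runner.

```python
import asyncio
import contextvars
import logging
from typing import Any, Coroutine, List

logger = logging.getLogger("leftover_sketch")
request_id: contextvars.ContextVar[str] = contextvars.ContextVar(
    "request_id", default="unset"
)
orphans: List["asyncio.Task[Any]"] = []


async def run_and_clean(coro: Coroutine[Any, Any, Any]) -> Any:
    """Await coro, then cancel any tasks it spawned and left running."""
    before = asyncio.all_tasks()
    try:
        return await coro
    finally:
        leftovers = [t for t in asyncio.all_tasks() - before if not t.done()]
        for task in leftovers:
            logger.warning("cancelling leftover task %s", task.get_name())
            task.cancel()
        # Let the cancellations land before the loop goes away.
        await asyncio.gather(*leftovers, return_exceptions=True)


async def user_code() -> str:
    async def forever() -> None:
        await asyncio.sleep(3600)

    # Fire-and-forget task that the coroutine never awaits.
    orphans.append(asyncio.create_task(forever(), name="orphan"))
    await asyncio.sleep(0)
    return request_id.get()


def run_in_copied_context() -> str:
    request_id.set("req-42")
    # Run inside a copy of the caller's context so the value set above
    # is visible in the coroutine.
    ctx = contextvars.copy_context()
    return ctx.run(asyncio.run, run_and_clean(user_code()))


seen_request_id = run_in_copied_context()
orphan_cancelled = orphans[0].cancelled()
```

The coroutine sees the context value set by its caller, and the task it abandoned is cancelled with a warning instead of lingering on the loop.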

License

See LICENSE.

Contributing

Issues, bug reports, and PRs are welcome. See CONTRIBUTING.md for development setup, testing, and submission guidelines.
