Automatic REST API generation for Celery tasks with FastAPI. This package seamlessly bridges Celery and FastAPI, automatically creating REST endpoints for all your registered Celery tasks.
- π Automatic endpoint generation - REST APIs created automatically for all Celery tasks
- π§ Zero configuration - Works out of the box with sensible defaults
- π Task monitoring - Built-in endpoints for task status, revocation, and worker info
- π― App-scoped operations - Only manages tasks from your specific Celery app, not the entire cluster
- π₯οΈ CLI support - Run as a standalone server from command line
- π¦ Modular design - Use as a library or standalone application
- π Queue-aware routing - Respects Celery queue assignments
- π OpenAPI documentation - Full Swagger/ReDoc support
- π Production ready - Full uvicorn/gunicorn support with SSL, workers, and all options
- β‘ Full Celery options - All task options (countdown, eta, priority, etc.)
- π Pool support - Compatible with eventlet, gevent, prefork, and solo pools
- Python 3.11+
- FastAPI 0.100.0+
- Celery 5.3.0+
# Basic installation
pip install celery-fastapi
# With CLI support
pip install celery-fastapi[cli]
# With uvicorn server
pip install celery-fastapi[server]
# With gunicorn for production
pip install celery-fastapi[gunicorn]
# With Redis broker
pip install celery-fastapi[redis]
# With RabbitMQ broker
pip install celery-fastapi[rabbitmq]
# With eventlet/gevent concurrency
pip install celery-fastapi[eventlet]
pip install celery-fastapi[gevent]
# All extras (recommended for production)
pip install celery-fastapi[all]Or with Poetry:
poetry add celery-fastapi
poetry add celery-fastapi --extras cli # for CLI supportfrom celery import Celery
from celery_fastapi import CeleryFastAPIBridge, create_app
# Your existing Celery app
celery_app = Celery('tasks', broker='redis://localhost:6379/0')
@celery_app.task
def add(x, y):
return x + y
@celery_app.task
def multiply(x, y):
return x * y
# Option 1: Using create_app factory
app = create_app(celery_app)
# Option 2: Using the Bridge class for more control
from fastapi import FastAPI
fastapi_app = FastAPI(title="My Task API")
bridge = CeleryFastAPIBridge(celery_app, fastapi_app)
bridge.register_routes()Run with uvicorn:
uvicorn myapp:app --reload# Start the server (development)
celery-fastapi serve myapp.celery:celery_app --port 8000 --reload
# Production with multiple workers
celery-fastapi serve myapp.celery:celery_app -w 4 --host 0.0.0.0
# With SSL
celery-fastapi serve myapp.celery:celery_app --ssl-keyfile key.pem --ssl-certfile cert.pem
# Using gunicorn (production)
celery-fastapi serve-gunicorn myapp.celery:celery_app -w 4 -k uvicorn.workers.UvicornWorker
# List available routes
celery-fastapi routes myapp.celery:celery_app
# List registered tasks
celery-fastapi tasks myapp.celery:celery_app
# Show active workers
celery-fastapi workers myapp.celery:celery_appOnce running, your Celery tasks are available as REST endpoints:
# Execute a task with basic args
POST /{task_name_with_slashes}
Content-Type: application/json
{
"args": [1, 2],
"kwargs": {}
}
# Execute with advanced Celery options
POST /myapp/process_data
Content-Type: application/json
{
"args": ["data.csv"],
"kwargs": {"output_format": "json"},
"countdown": 60,
"priority": 5,
"queue": "high_priority",
"time_limit": 300,
"soft_time_limit": 280
}
# Response
{
"task_id": "abc123-def456-...",
"status": "PENDING"
}# Get task status
GET /tasks/{task_id}
# Response
{
"task_id": "abc123-def456-...",
"state": "SUCCESS",
"result": 3,
"traceback": null,
"date_done": "2024-01-15T10:30:00Z"
}# Revoke a task
POST /tasks/{task_id}/revoke
Content-Type: application/json
{
"terminate": true,
"signal": "SIGTERM"
}
# Get task result only
GET /tasks/{task_id}/result
# List active workers (filtered to this app's tasks)
GET /workers
# List available tasks in THIS app
GET /available-tasks
# Response
{
"app_name": "my_tasks",
"task_count": 4,
"tasks": [
{"name": "my_tasks.add", "queue": "default", ...},
{"name": "my_tasks.multiply", "queue": "default", ...}
]
}
# List queues
GET /queues
# Purge tasks from a queue
POST /purge# List active, scheduled, reserved, and revoked tasks (filtered to this app only)
GET /tasks
# Response
{
"active": {...},
"scheduled": {...},
"reserved": {...},
"revoked": {...}
}bridge = CeleryFastAPIBridge(
celery_app=celery_app,
fastapi_app=fastapi_app, # Optional, creates new if not provided
prefix="/api/v1", # URL prefix for all endpoints
include_status_endpoints=True, # Include /tasks endpoints
task_filter=lambda name: not name.startswith("internal."), # Filter tasks
)app = create_app(
celery_app, # Celery instance or module path string
title="My API",
description="Task API",
version="1.0.0",
prefix="/api",
include_status_endpoints=True,
fastapi_kwargs={"docs_url": "/swagger"},
)from fastapi import FastAPI
from celery_fastapi import CeleryFastAPIBridge
from myapp import celery_app
app = FastAPI()
# Your existing routes
@app.get("/health")
def health_check():
return {"status": "healthy"}
# Add Celery task endpoints under /celery prefix
bridge = CeleryFastAPIBridge(
celery_app,
app,
prefix="/celery",
)
bridge.register_routes()celery-fastapi --help
Commands:
serve Start the FastAPI server with uvicorn
serve-gunicorn Start the FastAPI server with Gunicorn
routes List all generated routes
tasks List all registered Celery tasks
workers Show active Celery workers
# Serve options (uvicorn)
celery-fastapi serve myapp:celery_app \
--host 0.0.0.0 \
--port 8000 \
--reload \
--workers 4 \
--prefix /api \
--log-level info \
--ssl-keyfile key.pem \
--ssl-certfile cert.pem \
--proxy-headers \
--forwarded-allow-ips '*'
# Serve options (gunicorn)
celery-fastapi serve-gunicorn myapp:celery_app \
--bind 0.0.0.0:8000 \
--workers 4 \
--worker-class uvicorn.workers.UvicornWorker \
--timeout 30 \
--daemon \
--pid /var/run/celery-fastapi.pid# Clone the repository
git clone https://github.com/karailker/celery-fastapi.git
cd celery-fastapi
# Install dependencies
poetry install --extras all
# Run tests
poetry run pytest
# Run linting
poetry run ruff check .
poetry run mypy celery_fastapi
# Format code
poetry run ruff format .MIT License - see LICENSE file for details.
Contributions are welcome! Please feel free to submit a Pull Request.
- Fork the repository
- Create your feature branch (
git checkout -b feature/amazing-feature) - Commit your changes (
git commit -m 'Add amazing feature') - Push to the branch (
git push origin feature/amazing-feature) - Open a Pull Request