# Talk: Python + Celery

**Meetup Python Grenoble - 2019/10/30**

_Manage your asynchronous tasks with Celery!_

<img src="img/banner.png" width="400px"/>

# About me

<img src="img/me.jpg" width="200px" />

## Romain Clement

- Meetup Python Grenoble co-organizer
- Freelance Software Engineer
- CTO @ Sylha (FinTech startup)
- [Website](https://romain-clement.net)
- [GitHub](https://github.com/rclement)

# Why asynchronous tasks?

More and more use cases every day call for non-blocking processing:

- Sending e-mails (single, batch, scheduled)
- Data pipelines (ETL)
- Machine learning
- Micro-services communication
- Logging
- ...

# Is it that hard?

Mmmh, kinda, see for yourself:

- Concurrency model (threading, etc.)
- Result storage (databases, etc.)
- Scaling strategy (pool/cluster of workers)
- Scheduling (cron-jobs)
- Retry on failure strategies
- Distributing tasks across multiple services
- Task broadcasting
- ...
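
To make the list above concrete, here is a minimal hand-rolled sketch using only the standard library: a small pool of worker threads, an in-memory dict as result storage, and a naive retry loop. All names (`submit`, `worker`, `TASKS`, `RESULTS`, `MAX_RETRIES`) are hypothetical, and the design is deliberately simplistic: no persistence, no scheduling, no distribution across machines.

```python
import queue
import threading
import uuid
from typing import Any, Callable, Dict

# In-memory stand-ins for a message broker and a result backend (assumptions).
TASKS: queue.Queue = queue.Queue()
RESULTS: Dict[str, Any] = {}
MAX_RETRIES = 3


def submit(func: Callable[..., Any], *args: Any) -> str:
    """Enqueue a call and return an id to look the result up later."""
    task_id = str(uuid.uuid4())
    TASKS.put((task_id, func, args))
    return task_id


def worker() -> None:
    """Consume tasks forever, retrying each one a few times on failure."""
    while True:
        task_id, func, args = TASKS.get()
        for attempt in range(1, MAX_RETRIES + 1):
            try:
                RESULTS[task_id] = func(*args)
                break
            except Exception as exc:
                # Keep the last exception as the "result" once retries run out.
                if attempt == MAX_RETRIES:
                    RESULTS[task_id] = exc
        TASKS.task_done()


# A pool of two daemon threads plays the role of the worker cluster.
for _ in range(2):
    threading.Thread(target=worker, daemon=True).start()

task_id = submit(pow, 2, 10)
TASKS.join()  # block until every queued task has been processed
print(RESULTS[task_id])  # 1024
```

Even this toy version already has to decide how to store results, how many workers to run, and what to do on failure.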

# Why Celery?

Celery is a Python package abstracting task definitions and invocations, using a _message-broker_ and a _result-backend_ behind the scenes:

- Choose a message broker (Redis, RabbitMQ, etc.) and a result backend (Redis, SQLAlchemy, Mongo, etc.), if any
- Define your tasks using Python code
- Define your cron-jobs using Python code
- Define retry on failure strategies for each task
- Call your tasks (almost) as a function call!

# Celery Overview

<img src="img/overview.png" width="600px" />

# Show me some code!

# Initialize Celery

In [None]:
import os

from typing import Optional
from celery import Celery

celery_broker_url: Optional[str] = os.environ.get("CELERY_BROKER_URL")
celery_result_backend: Optional[str] = os.environ.get("CELERY_RESULT_BACKEND")

celery_app: Celery = Celery("example")

celery_app.conf.update(
    broker_url=celery_broker_url,
    result_backend=celery_result_backend,
)

# Define a simple task

In [None]:
def say_hi(name: str) -> None:
    print(f"Hi, {name}!")

In [None]:
celery_app.task(
    say_hi,
    name="tasks.say_hi",
    ignore_result=True,
)

# Call a simple task

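Start a Celery worker beforehand (the `--app` option is placed before the sub-command, which Celery 5 requires and Celery 4 also accepts):

```
celery --app example.celery:celery_app \
    worker --loglevel=info
```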

In [None]:
celery_app.send_task("tasks.say_hi", args=["Joe"])

# Get a result from a task

In [None]:
from datetime import datetime

def talking_clock() -> datetime:
    return datetime.now()

In [None]:
celery_app.task(
    talking_clock,
    name="tasks.talking_clock",
    ignore_result=False
)

In [None]:
from datetime import datetime
from celery.result import AsyncResult

talking_clock_task: AsyncResult = celery_app.send_task("tasks.talking_clock")
# store `talking_clock_task.id` somewhere for further use

In [None]:
talking_clock_task_result = AsyncResult(talking_clock_task.id, app=celery_app)
now: datetime = talking_clock_task_result.get()

print(f"result: {now}")

# Auto-retry failing tasks

In [None]:
def failing_task() -> None:
    raise RuntimeError

In [None]:
celery_app.task(
    failing_task,
    name="tasks.failing_task",
    ignore_result=True,
    autoretry_for=(RuntimeError,),
    max_retries=5,
    retry_backoff=True,
    retry_jitter=True,
)
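
The `retry_backoff` and `retry_jitter` options above spread retries out over time. The standalone sketch below shows the general idea of exponential backoff with full jitter. The function `backoff_delay` and its defaults (a 1-second factor and a 600-second cap) are assumptions chosen for illustration; this is not Celery's actual code.

```python
import random


def backoff_delay(retries: int, factor: int = 1, maximum: int = 600,
                  jitter: bool = True) -> int:
    """Return the delay in seconds before retry number `retries` (0-based)."""
    # Double the delay on each retry, but never exceed the cap.
    delay = min(maximum, factor * (2 ** retries))
    if jitter:
        # Full jitter: pick a random delay between 0 and the computed value,
        # so that many failing tasks do not all retry at the same instant.
        delay = random.randint(0, delay)
    return delay


# Without jitter the delays are deterministic.
print([backoff_delay(n, jitter=False) for n in range(5)])  # [1, 2, 4, 8, 16]
```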

In [None]:
celery_app.send_task("tasks.failing_task")

# Scheduling tasks

In [None]:
def send_report(email: str) -> None:
    print(f"Sending report to {email}")

In [None]:
celery_app.task(send_report, name="tasks.send_report", ignore_result=True)

In [None]:
from celery import Celery, schedules

celery_app.conf.update(
    enable_utc=True,
    timezone="UTC",
    beat_schedule={
        "send-report-minutely": {
            "task": "tasks.send_report",
            "kwargs": {"email": "admin@domain.com"},
            "options": {"queue" : "default"},
            "schedule": schedules.crontab(
                minute="*",
                hour="*",
                day_of_week="*",
                day_of_month="*",
                month_of_year="*",
            ),
        },
    }
)

Start a Celery beat process (it acts as an autonomous producer):

```
celery --app example.celery:celery_app \
    beat --loglevel=info
```

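**Warning**: run only one beat process per schedule! If beat is replicated, each replica sends the scheduled tasks, so they get executed several times (use an alternative scheduler implementation if you need a distributed beat).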

# Splitting tasks in queues

When starting a Celery worker, specify which queues it listens to:

```
celery --app example.celery:celery_app \
    worker --queues=mailing,broadcast \
    --loglevel=info
```

In [None]:
celery_app.send_task(
    "mailing.send_email",
    queue="mailing",
    kwargs={"to": "joe@doe.com"}
)

In [None]:
celery_app.conf.update(
    task_routes={
        "mailing.*": "mailing",
    },
)
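
The glob pattern in `task_routes` is matched against the task name. As a rough standalone illustration of that kind of matching, here is a sketch built on the standard `fnmatch` module. The function `resolve_queue` and the `"default"` fallback queue name are hypothetical, and this is not Celery's router.

```python
from fnmatch import fnmatchcase
from typing import Dict

ROUTES: Dict[str, str] = {"mailing.*": "mailing"}


def resolve_queue(task_name: str, routes: Dict[str, str],
                  default: str = "default") -> str:
    """Return the queue of the first pattern matching `task_name`."""
    for pattern, queue_name in routes.items():
        if fnmatchcase(task_name, pattern):
            return queue_name
    # No pattern matched: fall back to the (assumed) default queue.
    return default


print(resolve_queue("mailing.send_email", ROUTES))  # mailing
print(resolve_queue("tasks.say_hi", ROUTES))        # default
```

With such a route in place, callers no longer need to pass `queue="mailing"` on every `send_task` call.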

# Broadcasting tasks

# Pipelining tasks

Lots of possible workflows using the "canvas" feature, but the main ones are:
    
- `chain`: chain tasks (with or without passing arguments)
- `group`: parallelize tasks
- `chord`: parallelize tasks, then execute a finalizing task

**Eliminates the problem of a task needing to call or wait for other tasks!**
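
As a mental model only, the three primitives can be mimicked in plain Python: `chain` feeds each result into the next call, `group` runs calls in parallel, and `chord` runs a group and then passes the list of results to a callback. The helper names below (`run_chain`, `run_group`, `run_chord`) are hypothetical and run everything in-process with threads, whereas Celery's canvas does this across workers through the broker.

```python
from concurrent.futures import ThreadPoolExecutor
from functools import reduce
from typing import Any, Callable, List, Sequence


def run_chain(initial: Any, steps: Sequence[Callable[[Any], Any]]) -> Any:
    """Like `chain`: each step receives the previous step's result."""
    return reduce(lambda value, step: step(value), steps, initial)


def run_group(calls: Sequence[Callable[[], Any]]) -> List[Any]:
    """Like `group`: run independent calls in parallel, keep their order."""
    with ThreadPoolExecutor() as pool:
        futures = [pool.submit(call) for call in calls]
        return [future.result() for future in futures]


def run_chord(calls: Sequence[Callable[[], Any]],
              callback: Callable[[List[Any]], Any]) -> Any:
    """Like `chord`: run a group, then hand all results to a callback."""
    return callback(run_group(calls))


print(run_chain(2, [lambda x: x + 1, lambda x: x * 10]))  # 30
print(run_group([lambda: 1, lambda: 2, lambda: 3]))       # [1, 2, 3]
print(run_chord([lambda: 1, lambda: 2, lambda: 3], sum))  # 6
```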

In [None]:
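from dataclasses import dataclass
from typing import List

# A dataclass generates the `__init__` needed for `User(name=..., email=...)`.
@dataclass
class User:
    name: str
    email: str

# Note: `User` instances are not JSON-serializable, so returning them assumes
# a serializer that can handle them (for example `pickle`), or converting
# them to plain dicts first.
def get_all_users() -> List[User]:
    return [
        User(name="John Doe", email="john@doe.com"),
        User(name="Jane Doe", email="jane@doe.com"),
    ]

celery_app.task(
    get_all_users,
    name="users.get_all",
    ignore_result=False
)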

In [None]:
def send_newsletter_to_all(all_users: List[User]) -> None:
    for u in all_users:
        print(f"Sending newsletter to {u.name}: {u.email}")
    
celery_app.task(
    send_newsletter_to_all,
    name="mailing.send_newsletter_to_all",
    ignore_result=True
)

In [None]:
from celery import chain, signature

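def send_newsletter() -> None:
    # Signatures reference tasks by name, so this module does not need
    # to import the task functions themselves.
    user_sig = signature(
        "users.get_all",
        queue="users"
    )
    newsletter_sig = signature(
        "mailing.send_newsletter_to_all",
        queue="mailing"
    )
    # The result of `users.get_all` is passed as the first argument
    # of `mailing.send_newsletter_to_all`.
    task_chain = chain(user_sig, newsletter_sig)
    task_chain.apply_async()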

celery_app.task(
    send_newsletter,
    name="tasks.send_newsletter",
    ignore_result=True
)

# Things to consider

- Python only (experimental Node.js client with `node-celery`)
- No `asyncio` (`async` / `await`) support for task results (announced for `celery >= 5.0.0` at the time of this talk)
- Some regressions were recently introduced in `celery` or `kombu`, so be careful when upgrading
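
Regarding the missing `asyncio` support: one common workaround is to push the blocking wait into a thread so the event loop stays free. The sketch below uses only the standard library. The function `blocking_get` is a hypothetical stand-in for a blocking call such as `AsyncResult.get()`, so no broker is needed to run it.

```python
import asyncio
import time


def blocking_get() -> str:
    """Hypothetical stand-in for a blocking call like `AsyncResult.get()`."""
    time.sleep(0.1)
    return "done"


async def main() -> str:
    loop = asyncio.get_running_loop()
    # Run the blocking call in the default thread pool; awaiting the returned
    # future lets other coroutines run in the meantime.
    return await loop.run_in_executor(None, blocking_get)


result = asyncio.run(main())
print(result)  # done
```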

# Follow-up

- Storing task results in SQL/NoSQL database
- Testing Celery tasks (mocks)
- Monitoring (`flower`)

# Alternatives

- [NATS](https://nats.io)
- [Kafka](https://kafka.apache.org)

# Sources

- [Celery Documentation](https://docs.celeryproject.org/en/latest/)
- [Orchestrating Background Jobs](https://www.toptal.com/python/orchestrating-celery-python-background-jobs)
- [Building Data Flows with Celery and SQLAlchemy](https://www.youtube.com/watch?v=AhIoAMltzVw)

# Q&A