# Celery

Celery - асинхронная распределенная очередь выполнения задач в фоновом режиме, то есть не блокируя выполнение основной программы. Celery - очень гибкий инструмент, вы можете настраивать чуть ли не все аспекты выполнения задач.

## 1. Что такое celery, основные понятия

### Основные компоненты Celery:

1) Задача (task) - это базовый элемент Celery, который представляет собой отдельную задачу, которая будет выполнена асинхронно. Каждая задача должна быть определена в отдельном модуле Python, который содержит описание задачи и ее реализацию.

2) Очередь (queue) - это очередь задач, которые будут выполнены асинхронно. Каждая задача помещается в очередь и ожидает, когда ее будет обработана.

3) Worker (worker) - это процесс, который запускает и обрабатывает задачи из очереди. 
Каждый worker запускается в отдельном процессе и может обрабатывать несколько задач одновременно.

4) Брокер (broker) - это посредник между задачами и worker'ами. Он предоставляет очередь задач и хранит информацию о задачах, которые должны быть выполнены. В качестве брокера может выступать RabbitMQ, Redis или другие брокеры сообщений.

### Примеры использования celery в разработке


1) Сбор продуктовых метрик. Например, вы хотите каждый день собирать метрики по своему продукту, запускать модели машинного обучения и мониторить результаты.

2) Отложенное выполнение задач. Например, вы хотите, чтобы в фоне ваше приложение отправило всем пользователем на почту в полночь уведомление о том, что им надо пройти тестирование Иннополис.

3) Обработка изображений. Загрузка изображения пользователем на веб-сайт может быть асинхронной. Celery обрабатывает загрузку, ресайз и сохранение изображения в фоновом режиме, не блокируя основной поток веб-сервера и обеспечивая мгновенный отклик пользователю.

4) Распределенные вычисления. Celery может быть использован для организации распределенных вычислений, позволяя разбивать задачу на подзадачи и обрабатывать их на разных машинах, значительно сокращая время выполнения.

### Окей, а что такое эти брокеры сообщений Redis и RabbitMQ?


Расскажу самую базу про них, но по-хорошему это отдельный доклад про каждый надо пилить.

Брокер сообщений представляет собой тип построения архитектуры, при котором элементы системы «общаются» друг с другом с помощью посредника. Благодаря его работе происходит снятие нагрузки с веб-сервисов, так как им не приходится заниматься пересылкой сообщений: всю сопутствующую этому процессу работу он берёт на себя.

Можно сказать, что в работе любого брокера сообщений используются две основные сущности: producer (издатель сообщений) и consumer (потребитель/подписчик).

Одна сущность занимается созданием сообщений и отправкой их другой сущности-потребителю. В процессе отправки есть ещё серединная точка, которая представляет собой папку файловой системы, где хранятся сообщения, полученные от продюсера.



<a href="https://ibb.org.ru/1/fi5sw7"><img src="https://ibb.org.ru/images/2024/12/05/image2dce336435a98e39.png" alt="image2dce336435a98e39.png" border="0"></a>


Если рассмотреть Celery, то в этом случае:

1) *Продюсер*: Это часть кода, которая добавляет задачи в очередь Celery. Например, продюсером будет часть кода, которая запускается при оформлении заказа на сайте. Она создает объект задачи Celery, описывающий, что нужно сделать (например, send_order_confirmation, process_payment, ship_order), и отправляет этот объект в брокер сообщений (очередь).

2) *Потребитель*: Это  процесс, которые извлекают задачи из очереди и выполняют их. В Celery, потребители называются воркерами. Они постоянно мониторят очередь на наличие новых задач. Когда задача появляется, воркер берет её, выполняет соответствующую функцию (например, send_order_confirmation) и сообщает брокеру о завершении. Воркеры запускаются отдельно от продюсера и могут работать на разных машинах.

3) *Очередь сообщений*:  Это промежуточное хранилище задач, связывающее продюсера и потребителя.  Celery может использовать различные брокеры, такие как Redis, RabbitMQ, Amazon SQS и другие.  Брокер  обеспечивает надежное хранение задач, даже если продюсер или потребитель временно недоступны.  В нашем примере мы используем Redis (broker='redis://localhost:6379/0').  Он хранит информацию о задачах (их ID, функцию, аргументы и др.) в своей структуре данных


---



Допустим, вы написали свое веб-приложение, в котором есть хендлер, который отправляет на почту письма 1 миллиону пользователей. Очевидно, что если это будет выполнять ваш веб-сервер, то он зависнет надолго. Это плохо, ведь он нужен для обработки еще кучи других запросов. Тогда вы хотите сделать так, чтобы на моменте получения информации, что надо отправить письма, ваш скрипт хендлера завершался и передавал свою задачу кому-то там другому, кто на фоне работает. 

Для этого придумали Celery: он посылает информацию о том, что надо разослать письма, в брокер сообщений, который, в свою очередь помещает это в очередь задач, откуда задачу забирает воркер (сущность - исполнитель). По завершении задачи воркер кидает информацию об этом на результирующий бэкенд. 

Что мы получили в итоге, внедрив Celery:

- наш веб-сервер не нагружен
- наш веб-сервер принимает больше полезных запросов
- ваша задача разослать письма выполнена
- данные о том, как задача выполнена сохранены

## 2. Базовое приложение celery 

### Запуск окружения и самого celery

Давайте наконец напишем наше первое приложение. Как говорилось выше, для работы необходим брокер сообщений. Для простоты мы будем использовать **Redis**. Чтобы не затруднять вас в установке этой программы, мы запустим его в Docker-контейнере! Что такое Docker и как его установить можно прочитать в интернете.

In [2]:
!pip install celery[redis]

Defaulting to user installation because normal site-packages is not writeable


Чтобы запустить докер-контейнер, необходимо выполнить команду:

```shell
docker-compose up -d
```

Чтобы запустить Celery, необходимо выполнить команду:

```shell
celery -A <расположение файла в котором находится базовое приложение celery> worker
```


В юпитер ноутбуке не будет выполняться код, который создает задачи - все задачи, которые мы будем рассматривать в статье уже объявлены в файле **celery_app.py**

Код задач будет продублирован в юпитер ноутбуке, но не будет вести ни к чему

Код который что-то делает, будет просто запускать задачи

### Создание приложение, подключение к Redis

Мы создаем объект класса Celery, куда мы укажем имя приложения, а также путь до брокера сообщений и бэкенда, куда будет сохраняться вывод задач.

In [3]:
from celery import Celery

app = Celery('app', broker='redis_url', backend='redis_url')

Теперь мы можем создать первую задачу. Это будет функция, которая возвращает 'Hello World' и выводит в поток вывода 'Hello Celery'.

Каждая задача в Celery оборачивается в декоратор task. Потом мы посмотрим, какие полезные аргументы можно прокинуть в этот декоратор.

Задачи иногда также называют сообщениями. По сути брокер сообщений - это нечто, что передает сообщения из одной системы в другую. В нашем случае сообщение представляет собой описание задачи: название (уникальный идентификатор), входные параметры, время ожидания, количество повторных попыток и тд.

В celery задача является классом. Таким образом, каждый раз, когда вы используете декоратор для функции, чтобы сделать ее задачей, под капотом создается класс. Это означает, что у каждой задачи есть self, к которому добавляется множество атрибутов. Если мы хотим получить доступ к этим атрибутам, то нужно указать параметр bind=True.

In [4]:
@app.task
def hello() -> str:
    print('Hello Celery!')
    return 'Hello World!'

В целом, это обычная советская функция, которую можно вызвать и декоратор никак не повлияет на ее поведение, но мы то хотим, чтобы все было с использованием Celery.

Чтобы вызвать функцию в Celery мы можем использовать методы **delay** и **apply_async**, второй предлагает бОльший функционал, поэтому всегда будем использовать его. 

In [None]:
from celery_app import hello

result = hello.apply_async()
print(result)

Мы получили просто айди задачи в брокере сообщений, то есть данная функция просто помещает нашу задачу в очередь задач в брокере сообщений, который перехватывает ответственность за нее, после чего ее заберет воркер и выполнит. 

Но куда же делось 'Hello World!' и 'Hello Celery!'? Ответ прост: все логи, то есть результаты принтов лежат в файле **celery.logs**, который мы указали при запуске Celery. 

Чтобы получить результат выполнения функции, необходимо обратиться к бэкенду, где они лежат. За это отвечает метод **get**. Он будет пытаться забрать из бэкенда результат выполнения задачи, пока не вылетит таймаут. Давайте выполним это

In [None]:
result.get()

Еще важный момент, который надо прочувствовать - какой метод будет выполняться долго в вашем коде

Для демонстрации допишем функцию hello, чтобы она еще спала 5 секунд

In [None]:
import time
@app.task
def sleepy_hello() -> str:
    time.sleep(5)
    print('Hello Celery!')
    return 'Hello World!'

Когда вы выполните код ниже, то заметите, что спал именно **get**

In [None]:
start_time = time.time()
result = sleepy_hello.apply_async()
end_time = time.time()
print(f'время выполнения apply_async = {round(end_time - start_time, 2)}')

start_time = time.time()
result.get()
end_time = time.time()
print(f'время выполнения get = {round(end_time - start_time, 2)}')

Так происходит потому, что apply_async не тратит никаких ресурсов кроме как на добавление задачи в брокер сообщений.

В свою очередь get будет требовать у бэкенда результат, но воркер не выполнит код, пока не пройдет 5 секунд, поэтому функция get тоже будет ждать.

## 3. Погружение в опции запуска задач

### Передача аргументов в задачу

Давайте для начала создадим простую задачу, которая принимает аргументы - сложение двух чисел.

In [6]:
@app.task
def addition(x: int, y: int) -> int:
    return x + y

Чтобы прокинуть аргументы при запуске задачи, нужно передать их как кортеж в параметр args

In [None]:
from celery_app import addition

result = addition.apply_async(args=(1, 1))
result.get()

### Отложенный запуск (простая версия)

В celery есть очень мощный инструмент для периодизации задач, но есть и что-то более простое. 

Сейчас мы рассмотрим обратный отсчет до выполнения, он передается как целое количество секунд до запуска функции

В чем отличие от обычного **time.sleep** внутри функции? Да просто операция засыпания будет блокировать поток выполнения в воркере, в то время как обратный отсчет - нет. То есть отложенный запуск освобождает воркер от торможения.

Количество секунд должно передаваться в аргумент **countdown**

In [None]:
result = addition.apply_async(countdown=5)

In [None]:
result.get()

Можно заметить, что поведение очень похоже на случай с time.sleep(5), но могут возникать отличия во времени выполнения даже исполняя один и тот же код несколько раз. Почему?

Время в ETA задачах (то есть с отложенным запуском) не является точным временем выполнения этой задачи. Вместо этого это самый ранний момент выполнения этой задачи. Как только наступит время ETA, задача должна дождаться освобождения обработчика. Если обработчик перегружен, то задача может выполниться не сразу. Короче, никто не гарантирует, что выполнится в срок.

### Повторный запуск задач

Бывает такое, что ваша задача падает от чего-то непревиденного, например, вы парсите сайт и он радномно вкидывает капчу вам, просто потому что хочет. При этом если через секунду повторить попытку, то все заработает. Или же вы ждете подключения к базе данных. Примеров масса. 

В таких случаях очень удобно будет сделать так, чтобы ваша задача в случае ошибки еще какое-то время сама пыталась повторить себя. 

Для этого в celery существует метод **retry**. Для обращения к нему необходимо обратиться к методам таски селери через self, по умолчанию такого нет, поэтому надо передать в декоратор аргумент bind=True. 

Для примера напишем функцию, которая генерирует ошибку во время выполнения случайным образом, чтобы на какой-то из попыток она заработала. 

In [7]:
@app.task(bind=True, max_retries=10)
def random_error(self) -> str:
    import random
    try:
        if random.randint(1, 20) > 10:
            print(1/0)
    except ZeroDivisionError as exc:
        self.retry(exc=exc, countdown=3)
    print(1)
    return 'Ура наконец-то выполнилось'

Так же можно настраивать повтор так, как вам хочется. Вот опции, которые можно добавить:

- max_retries (максимальное число попыток)
- autoretry_for (кортеж исключений, которые будут автоматически вызывать retry)
- countdown (сколько до повторной попытки секунд)

Есть и другие, но эти исчерпывают большинство кейсов использования)

Давайте посмотрим, как это работает:

In [None]:
from celery_app import random_error

result = random_error.apply_async()
result.get()

Если задача не выполнилась после последней попытки, то вылетет исключение, но опять же с основным потоком ничего не случится - ошибка будет на воркере, он внесет данные, что ошибка была получена и забудет об этом.

In [None]:
# задача которая никогда не будет успешно выполнена после ретраев
from celery_app import generate_error

result = generate_error.apply_async()

In [None]:
result.get()

Еще более подробный гайд по повторному запуску вы найдете здесь: 
http://www.ines-panker.com/2020/10/29/retry-celery-tasks.html

### Состояния задачи

Задачи Celery всегда имеют состояние. Если задача завершила выполнение успешно, ее состоянием является SUCCESS. Если выполнение задачи приводит к исключению, ее состоянием является FAILURE. Celery имеет шесть встроенных состояний:

- PENDING (ожидание выполнения или неизвестный идентификатор задачи)

- STARTED (задача запущена)

- SUCCESS (задача выполнена успешно)

- FAILURE (выполнение задачи привело к исключению)

- RETRY (задача выполняется повторно)

- REVOKED (задача была отозвана)

Вместе со статусом задачи идут метаданные. Мы можем добавлять свои кастомные статусы и писать к ним кастомные метаданные и даже ставить встроенные состояния напрямую. Например, если мы имеем задачу, которая состоит из нескольких частей, выполняющихся последовательно, то мы можем сделать новое состояние **PROGRES**, которое будет обозначать, что задача в работе и добавить метаданные к уже имеющимся о том, на каком этапе задача.

Давайте напишем такую задачу, которая бы кастомизировала метаданные и состояния. Для этого используем метод **update_state**

In [8]:
@app.task(bind=True)
def long_task(self):
    import time
    for i in range(5):
        # что-то делает в этом месте 2 секунды
        time.sleep(2)
        self.update_state(state='PROGRESS', meta={"data": f"process {i} done"})

Для визуализации того, как меняются статусы у задачи, напишет функцию, которая их мониторит

In [None]:
from celery.result import AsyncResult
def monitor_task(task_id):
    task = AsyncResult(task_id, app=app)
    while task.state != 'SUCCESS' and task.state != 'FAILURE' and task.state != 'REVOKED':
        print(f"Task state: {task.state}, meta: {task.info}")
        time.sleep(2)
    print(f"Final Task state: {task.state}, result: {task.result}, meta: {task.info}")

А теперь запустим все.

In [None]:
from celery_app import long_task

result = long_task.delay()
# Мониторим задачу
monitor_task(result.id)

Как говорилось выше, мы так же можем использовать дефолтные состояния, но изменять в них метаданные

Напишем и промониторим задачу, которая будет падать и ставить состояние в FAILURE, также добавим туда кастомизацию из примера выше

In [9]:
import celery.states as states
import random


@app.task(bind=True)
def complex_task(self, iterations: int = 5):
    total_iterations = iterations
    for i in range(1, iterations + 1):
        time.sleep(2)
        self.update_state(
            state="PROGRES",
            meta={
                'current': i,
                'progress': int((i / total_iterations) * 100),
            }
        )
        if random.random() < 0.3: # Симуляция непредвиденной ошибки
            try:
                print(1 / 0)
            except ZeroDivisionError:
                self.update_state(
                    state=states.FAILURE,
                    meta={
                        'current': i,
                        'message': f"Failed at iteration {i}."
                    }
                )
    return {"status": "success", "iterations": iterations}

In [None]:
from celery_app import complex_task

result = complex_task.delay()
# Мониторим задачу
monitor_task(result.id)

print(result.get())

### Ограничение времени выполнения

Иногда подозрительно, что задача работает слишком долго, например, если мы загружаем фотографию. Если это происходит больше 2-3 минут, то точно что-то не так и надо прервать выполнение, чтобы другие задачи могли выполняться без задержек.

Как прервать задачу по таймауту в Celery? Надо просто указать параметры

1. soft_time_limit - мягкое ограничение, после которого, например, можно кинуть алерт, что что-то не так
2. time_limit - жесткое ограничение, после которого задача точно будет завершена со статусом FAILURE

Прокидывать эти параметры можно как в аргументы **apply_async**, так и в аргументы декоратора **task**

Давайте напишем задачу, которая бы точно падала по таймауту:

In [None]:
from celery.exceptions import SoftTimeLimitExceeded

@app.task(bind=True, time_limit=10, soft_time_limit=5)
def time_limit_task(self):
    try:
        time.sleep(6)
    except SoftTimeLimitExceeded:
        print("АААА ЧТО-ТО НЕ ТАК ПОМОГИТЕ СПАСИТЕ")
        self.update_state(state="LONG_PROGRES", meta={"detail": "таска выполняется слишком долго"})
        time.sleep(6)       

Запуск задачи и ошибка по таймауту

In [None]:
from celery_app import time_limit_task

result = time_limit_task.apply_async()
monitor_task(result.id)
result.get()

На этом все по значимым аспектам запуска задач

## 4. Настройка воркеров

В целом, та команда, которой мы запускали celery, она активирует воркера и подключается к брокеру и бэкенду. Мы можем кастомизировать запуск воркеров, а также можем их останавливать.

При запуске celery по умолчанию создается один воркер. Этот обработчик является главным процессом (supervisor process), который будет порождать дочерние процессы или потоки, которые в свою очередь будут выполнять задачи. По умолчанию главный обработчик будет создавать дочерние процессы, а не потоки, и он создаст столько одновременных дочерних процессов, сколько ядер у процессора. Главный процесс будет следить за тем, что происходит с задачами и процессами/потоками, но он не будет запускать сами задачи. Эта группа дочерних процессов или потоков, которая ожидает выполнения задач, называется пулом выполнения (execution pool) или пулом потоков (thread pool)


### Привязка воркера к очереди

Очередей можно создать несколько и назвать их тоже можно по разному. По умолчанию есть только одна такая очередь. Все обработчики принимают задачи из одной очереди. Но вы также можете указать несколько таких очередей и назначить конкретные обработчики на определенные очереди. Очередь по умолчанию называется celery.

Чтобы создать дополнительную очередь необходимо указать аргумент **-Q** при инициализации воркера, после чего указать название(я). Да, можно пихать задачи в разные очереди.

### Запуск копий воркера в нескольких потоках

Это регулируется флагом **--concurency**, дальше указывается число копий воркера.

### Запуск нескольких воркеров

Когда у нас есть разные задачи, например, мы хотим, чтобы селери собирал продуктовые метрики, рассылал на почту напоминалки и загружал картинки, то разумно разделить эти задачи, чтобы не перегружать оперативную память одного воркера. Тогда мы можем создать нескольких воркеров, чтобы каждый занимался своим делом.

Одна команда запускает один воркер, поэтому для запуска трех мы должны три раза прописать команду, задав для каждой свои параметры

Чтобы заименовать воркера мы используем аргумент **-n**, после имени воркера идет кастомизация его названия, подробнее можно почитать в документации https://docs.celeryq.dev/en/stable/userguide/workers.html

```shell
celery -A app.tasks.celery worker -Q queue_aviasales -n worker_aviasales@%h -l INFO --concurrency=8 -f celery.logs
& celery -A app.tasks.celery worker -Q queue_flightradar -n worker_flightradar@%h -l INFO --concurrency=1 -f celery.logs
& celery -A app.tasks.celery worker -Q queue_influx -n worker_influx@%h -l INFO --concurrency=1 -f celery.logs
```

### Окей, а как задача поймет, кто ее воркер и очередь?

Сами задачи не знают, какому воркеру они принадлежат, но они точно знают в какой они очереди. Если не сказано иного, то задача принадлежит главной очереди. Чтобы указать другую надо явно прописать это при объявлении задачи, задав в декораторе **task** аргумент **queue**.

In [11]:
@app.task(name="some_task", queue="some_queue")
def some_task():
    ...

### rate limit 

Еще одна крутая штука, которая позволит не перегружать воркер это **rate_limit**. Данный параметр так же настраивается как аргумент декоратора при объявлении задачи. Он задает максимальную частоту выполнения задачи за определенный период времени. Он не позволит воркеру взять слишком большое количество задач данного типа в единицу времени.

Так же его можно настроить, извне, указав название задачи и воркеров.

In [12]:
# первый способ
@app.task(rate_limit='1/s')
def high_load_task():
    ...

# второй способ
app.control.rate_limit('myapp.mytask', '200/m', destination=['celery@worker1.example.com'])

## 5. Интеграция селери в веб-приложение (пример)

### А ТЕПЕРЬ РЕАЛЬНЫЙ ПРИМЕР!!!!

Сейчас мы напишем простое приложение на FastAPI, в нем будет только одна ручка, которая отправляет реквест на сайт https://github.com/olezha223, будем собирать данные о том, какие у этого пользователя (меня) есть репозитории, после чего данные запишем в json.

Делать она это будет с помощью celery, пусть данные запишутся в csv файл

In [13]:
! pip install fastapi

Defaulting to user installation because normal site-packages is not writeable


In [None]:
from bs4 import BeautifulSoup
import requests


@app.task(bind=True, time_limit=120)
def parse_repositories(self, username: str):
    try:
        url = f"https://github.com/{username}"
        response = requests.get(url)
        soup = BeautifulSoup(response.text, "html.parser")
        repositories = soup.find_all("li", class_="repo-list-item")
        self.update_state(state="PROGRES", meta={"detail": "спарсили репо"})
        repo_list = []
        for repository in repositories:
            repo_list.append(repository.find("a").text)
            self.update_state(state="PROGRES", meta={"detail": f'добавили {repository.find("a").text} в список'})

        import json
        with open("repositories.json", "w", encoding="utf-8") as file:
            json.dump({username: repo_list}, file, indent=4, ensure_ascii=False)
        self.update_state(state="PROGRES", meta={"detail": f'добавили данные в файл'})
    except Exception as e:
        self.retry(exc=e, countdown=3)

In [14]:
from fastapi import FastAPI
from celery_app import parse_repositories

fast_api_app = FastAPI()

@fast_api_app.get("/get_repositories/{username}")
def get_repositories(username: str) -> str:
    task_id = parse_repositories.delay(username)
    return task_id.id

In [None]:
# давайте протестируем наш код

from fastapi.testclient import TestClient

client = TestClient(base_url="http://localhost:8000", app=fast_api_app)
response = client.get("/get_repositories/olezha223")
assert response.status_code == 200

monitor_task(response.json())

## 6. Мониторинг задач с Flower (докер контейнер + примеры как выглядит) 

Следить за исполнением задач - важная составляющая работы с ними. Для этого вы можете использовать Flower - это веб-инструмент мониторинга и администрирования Celery в режиме реального времени.

Сейчас мы обернем веб-приложение, брокер-сообщений, селери и этот инструмент в докер контейнеры и запустим, посмотрим, как выглядит интерфейс и какие данные мы можем там получить.

Для этой задачи мы создадим отдельную директорию flower и будем писать код в ней

In [None]:
# нам надо остановить контейнер, который сейчас ранит наш брокер сообщений
! docker-compose down

Теперь выполните в терминале:

```shell
cd flower
docker-compose up -d
```

Дальше вам необходимо открыть localhost:8603 и посмотреть, что там будет

Вы должны увидеть примерно вот такой интерфейс:

** картинка flower **


## 7. Запуск периодических задач

```shell
celery -A celery_app beat
```