Клиент для pg_tasks.
Предоставляет классы Tasks и Worker для создания рабочих процессов.
pip install pg_tasks_pythonЗапустим расширение и создадим таблицу для задачи:
CRAETE EXTENSION pg_tasks;
CREATE TABLE tasks.do_something(
LIKE tasks.template
INCLUDING DEFAULTS
INCLUDING IDENTITY
INCLUDING INDEXES,
payload text
);
SELECT tasks.register('tasks', 'do_something');Опишем модуль на python:
from pg_tasks import Task, Worker
import psycopg
class DoSomething(Task):
def __init__(self, conn_factory):
super().__init__()
self.conn_factory = conn_factory
def run(self, id, **kwargs):
# Представим себе, что здесь полезный код))
print('DO HARD WORK')
# Подтверждение выполнения задачи
conn = None
try:
conn = self.conn_factory()
self.finish(conn, task_id=id)
finally:
if conn is not None:
conn.close()
# Композит
do_something = DoSomething(
lambda: psycopg.connect('dbname=test'),
)
worker = Worker('dbname=test')
worker.add(do_something)
worker.run()Базовый класс для задач. Метод run должен быть переопределен, и содержать
в себе саму задачу. Этот метод будет вызван при получении задач из БД.
Метод run принимает строку из БД в виде **kwargs. Все столбцы, описанные
в соответствующей таблице, будут переданы в задачу.
После успешного завершения бизнес-логики задачу необходимо пометить
как завершенную методом .finish(), передав в него id задачи
и время начала задачи. В большинстве случаев достаточно передать created_at
из kwargs.
В случае, когда задачу выполнить невозможно, и нужно повторить ее позже
(например, задача должна обратиться к внешнему сервису,
который сейчас недоступен), нужно вызвать метод retry, передав в него
id задачи и, опционально, время, после которого задача должна быть выполнена.
В случае, когда задачу выполнить невозможно, нужно выполнить метод cancel,
передав в него id задачи. Такая задача не будет доступна для повторения
другим воркерам, пока не будет сброшена.
Метод reset используется для сброса задач, принимает id задачи. Нужен,
в основном, для ручного перезапуска задач при авариях.
Метод clean принимает дату и время, и вычищает все задачи, успешно
выполненные до указанной даты.
Все методы принимают первым обязательным аргументом подключение к БД, через которое задача будет работать с БД.
В случае, когда задача работает в транзакции, крайне рекомендуется после завершения бизнес-логики оперировать задачей (подтверждать, отменять и т.д.) в той же транзакции.
Также в методы finish, cancel, и retry можно передать комментарий
к задаче в поле comment в виде строки, комментарий будет сохранен в БД.
Метод acquire нужен для вызова из Worker, напрямую он не используется.
Worker имеет 3 метода - add, remove и run.
Метод add используется для добавления новых типов задач.
Вторым аргументом можно задать количество рабочих потоков, по умолчанию - 1.
Если воркер уже запущен, то потоки будут запущены сразу, если нет - запуск
будет отложен до запуска воркера.
Метод remove используется для остановки определенного типа задач.
Метод run запускает worker. Метод блокирующий,
содержит в себе бесконечный цикл.
Внутри run делает несколько вещей.
Для каждого типа задачи запускается один или несколько рабочих потоков. Каждый рабочий поток циклически запрашивает пачку задач и последовательно выполняет их. Если получена пустая пачка, значит, задач к выполнению сейчас нет, и рабочий поток встает на паузу в ожидании уведомления о новой задаче. Если уведомление не приходит в интервал, заданный в задаче, то рабочий поток начинает цикл заново.
Также Worker создает поток-слушатель. Слушатель получает уведомления в канале pg_tasks, и уведомляет рабочие потоки о них. Также слушатель регулярно пингует DB, когда уведомлений нет больше определенного времени.