Skip to content

Commit

Permalink
Fix inconsistent beat scheduling and compatibility with db scheduler (#…
Browse files Browse the repository at this point in the history
…10185) (#10196)

This fixes the following bugs:
- `tick()` could decide to never schedule anything else than `send-sale-toggle-notifications` if `send-sale-toggle-notifications` doesn't return `is_due = False` (stuck forever until beat restart or a `is_due = True`)
- `tick()` was sometimes scheduling other schedulers such as observability to be ran every 5m instead of every 20s
- `is_due()` from `send-sale-toggle-notifications` was being invoked every 5s on django-celery-beat instead of every 60s
- `send-sale-toggle-notifications` would crash on django-celery-beat with `Cannot convert schedule type <saleor.core.schedules.sale_webhook_schedule object at 0x7fabfdaacb20> to model`

This fixes both the `shelve` and the database backends. Users should be able to add compatibility to other Celery beat backends using the same flow.

Tickets are still pending creation to notify Celery about the issues.
  • Loading branch information
NyanKiyoshi committed Jul 19, 2022
1 parent 5aab679 commit dbc821b
Show file tree
Hide file tree
Showing 9 changed files with 436 additions and 7 deletions.
14 changes: 13 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,19 @@

All notable, unreleased changes to this project will be documented in this file. For the released changes, please visit the [Releases](https://github.com/mirumee/saleor/releases) page.

# 3.5.0 [Unreleased]
# 3.5.1 [Unreleased]
- Fix inconsistent beat scheduling and compatibility with db scheduler - #10185 by @NyanKiyoshi<br/>
This fixes the following bugs:
- `tick()` could decide to never schedule anything else than `send-sale-toggle-notifications` if `send-sale-toggle-notifications` doesn't return `is_due = False` (stuck forever until beat restart or a `is_due = True`)
- `tick()` was sometimes scheduling other schedulers such as observability to be ran every 5m instead of every 20s
- `is_due()` from `send-sale-toggle-notifications` was being invoked every 5s on django-celery-beat instead of every 60s
- `send-sale-toggle-notifications` would crash on django-celery-beat with `Cannot convert schedule type <saleor.core.schedules.sale_webhook_schedule object at 0x7fabfdaacb20> to model`

Usage:
- Database backend: `celery --app saleor.celeryconf:app beat --scheduler saleor.schedulers.schedulers.DatabaseScheduler`
- Shelve backend: `celery --app saleor.celeryconf:app beat --scheduler saleor.schedulers.schedulers.PersistentScheduler`

# 3.5.0 [Released]

### Other changes
- Fix inaccurate tax calculations - #9799 by @IKarbowiak
Expand Down
15 changes: 12 additions & 3 deletions saleor/core/schedules.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,15 @@
from datetime import datetime

import pytz
from celery.schedules import BaseSchedule
from celery.utils.time import maybe_timedelta, remaining
from django.db.models import F, Q

from ..schedulers.customschedule import CustomSchedule

schedstate = namedtuple("schedstate", ("is_due", "next"))


class sale_webhook_schedule(BaseSchedule):
class sale_webhook_schedule(CustomSchedule):
"""Schedule for sale webhook periodic task.
The lowercase with an underscore is used for the name as all celery schedules
Expand All @@ -28,7 +29,12 @@ class sale_webhook_schedule(BaseSchedule):
def __init__(self, initial_timedelta=60, nowfun=None, app=None):
self.initial_timedelta = maybe_timedelta(initial_timedelta)
self.next_run = self.initial_timedelta
super().__init__(nowfun=nowfun, app=app)
super().__init__(
schedule=self,
nowfun=nowfun,
app=app,
import_path="saleor.core.schedules.initiated_sale_webhook_schedule",
)

def remaining_estimate(self, last_run_at):
"""Estimate of next run time.
Expand Down Expand Up @@ -101,3 +107,6 @@ def is_due(self, last_run_at):

self.next_run = min((next_upcoming_date - now), self.initial_timedelta)
return schedstate(is_due, self.next_run.total_seconds())


initiated_sale_webhook_schedule = sale_webhook_schedule()
Empty file added saleor/schedulers/__init__.py
Empty file.
22 changes: 22 additions & 0 deletions saleor/schedulers/customschedule.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
from celery import schedules


class CustomSchedule(schedules.BaseSchedule):
def __init__(
self,
import_path: str,
schedule: schedules.BaseSchedule,
nowfun=None,
app=None,
):
super().__init__(nowfun=nowfun, app=app)
self.schedule = schedule
self.import_path = import_path
if not import_path:
raise ValueError("Missing import path")

def remaining_estimate(self, last_run_at):
return self.schedule.remaining_estimate(last_run_at)

def is_due(self, last_run_at):
return self.schedule.is_due(last_run_at)
86 changes: 86 additions & 0 deletions saleor/schedulers/migrations/0001_initial.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
# Generated by Django 3.2.14 on 2022-07-18 07:44

from django.apps.registry import Apps
from django.db import migrations, models
import django.db.models.deletion


def delete_django_celery_beat_data(apps: Apps, _schema_editor):
"""Wipe all exiting data from django-celery-beat in db.
We need to do this otherwise we may get unique errors when trying to recreate
the tasks using the new models.
'PeriodicTasks' models doesn't need to be wiped as it doesn't have any relations.
"""
PeriodicTask = apps.get_model("django_celery_beat", "PeriodicTask")
PeriodicTask.objects.all().delete()


class Migration(migrations.Migration):

initial = True

dependencies = [
("django_celery_beat", "0016_alter_crontabschedule_timezone"),
]

operations = [
migrations.RunPython(delete_django_celery_beat_data, migrations.RunPython.noop),
migrations.CreateModel(
name="CustomSchedule",
fields=[
(
"id",
models.AutoField(
auto_created=True,
primary_key=True,
serialize=False,
verbose_name="ID",
),
),
(
"schedule_import_path",
models.CharField(
help_text=(
"The python import path where the Celery scheduler "
"is defined at"
),
max_length=255,
unique=True,
),
),
],
),
migrations.CreateModel(
name="CustomPeriodicTask",
fields=[
(
"periodictask_ptr",
models.OneToOneField(
auto_created=True,
on_delete=django.db.models.deletion.CASCADE,
parent_link=True,
primary_key=True,
serialize=False,
to="django_celery_beat.periodictask",
),
),
(
"custom",
models.ForeignKey(
blank=True,
help_text=(
"Custom Schedule to run the task on. Set only one "
"schedule type, leave the others null."
),
null=True,
on_delete=django.db.models.deletion.CASCADE,
to="schedulers.customschedule",
verbose_name="Custom Schedule",
),
),
],
bases=("django_celery_beat.periodictask",),
),
]
Empty file.
125 changes: 125 additions & 0 deletions saleor/schedulers/models.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
import importlib
from typing import Dict

from celery.schedules import BaseSchedule
from django.core.exceptions import SuspiciousOperation, ValidationError
from django.db import models
from django.db.models import signals
from django_celery_beat import models as base_models

from . import customschedule


class CustomSchedule(models.Model):
"""Defines the db model storing the details of a custom Celery beat schedulers.
This model keeps track of the Python import path of the custom Celery beat scheduler
(class MyCustomScheduler(celery.schedules.BaseScheduler)).
Then uses the import path to invoke the custom scheduler when the time is due
to invoke it.
Import path should be pointing to the initialized object (variable), like so:
>>> # ./my_pkg/scheduler.py
>>> class MyScheduler(BaseSchedule):
... # Do something
... pass
...
>>> my_scheduler = MyScheduler()
>>> import_path = "my_pkg.scheduler.my_scheduler"
"""

no_changes = False

CACHED_SCHEDULES: Dict[str, BaseSchedule] = {}
schedule_import_path = models.CharField(
max_length=255,
help_text="The python import path where the Celery scheduler is defined at",
unique=True,
)

@property
def schedule(self):
"""Return the custom Celery scheduler from cache or from the import path."""
obj = self.CACHED_SCHEDULES.get(self.schedule_import_path)
if obj is None:
module_path, class_name = self.schedule_import_path.rsplit(".", 1)
module = importlib.import_module(module_path)
obj = getattr(module, class_name)
if not isinstance(obj, BaseSchedule):
raise SuspiciousOperation(
f"Expected type of {self.schedule_import_path!r} to be inheriting "
f"from BaseScheduler but found: "
f"{type(obj)!r} ({obj.__class__.__bases__!r})",
)
self.CACHED_SCHEDULES[module_path] = obj
return obj

@classmethod
def from_schedule(cls, schedule: customschedule.CustomSchedule):
spec = {
"schedule_import_path": schedule.import_path,
}
try:
return cls.objects.get(**spec)
except cls.DoesNotExist:
return cls(**spec)

def __str__(self):
return f"{self.schedule_import_path=}"


class CustomPeriodicTask(base_models.PeriodicTask):
no_changes = False

custom = models.ForeignKey(
CustomSchedule,
on_delete=models.CASCADE,
null=True,
blank=True,
verbose_name="Custom Schedule",
help_text=(
"Custom Schedule to run the task on. "
"Set only one schedule type, leave the others null."
),
)

def validate_unique(self, *args, **kwargs):
models.Model.validate_unique(self, *args, **kwargs)

# Schedule types list is hard-coded in the super-method
schedule_types = ["interval", "crontab", "solar", "clocked", "custom"]
selected_schedule_types = [s for s in schedule_types if getattr(self, s)]

if len(selected_schedule_types) == 0:
raise ValidationError(
"One of clocked, interval, crontab, solar, or custom must be set."
)

err_msg = "Only one of clocked, interval, crontab, solar, or custom must be set"
if len(selected_schedule_types) > 1:
error_info = {}
for selected_schedule_type in selected_schedule_types:
error_info[selected_schedule_type] = [err_msg]
raise ValidationError(error_info)

# clocked must be one off task
if self.clocked and not self.one_off:
err_msg = "clocked must be one off, one_off must set True"
raise ValidationError(err_msg)

@property
def schedule(self):
if self.custom:
return self.custom.schedule
return super().schedule


# The hooks are needed by django-celery-beat in order to detect other Python modules
# dynamically changing the model data
# CustomPeriodicTask
signals.pre_delete.connect(base_models.PeriodicTasks.changed, sender=CustomPeriodicTask)
signals.pre_save.connect(base_models.PeriodicTasks.changed, sender=CustomPeriodicTask)

# CustomSchedule
signals.pre_delete.connect(base_models.PeriodicTasks.changed, sender=CustomSchedule)
signals.pre_save.connect(base_models.PeriodicTasks.changed, sender=CustomSchedule)

0 comments on commit dbc821b

Please sign in to comment.