Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion docs/available-components/schedule-sources.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,13 @@ The format of the schedule label is the following:
@broker.task(
schedule=[
{
"cron": "* * * * *", # type: str, either cron or time should be specified.
"cron": "*/1 * * * *", # type: str, either cron or time should be specified.
"cron_offset": None # type: str | timedelta | None, can be omitted.
"time": None # type: datetime | None, either cron or time should be specified.
"args": [], # type List[Any] | None, can be omitted.
"kwargs": {}, # type: Dict[str, Any] | None, can be omitted.
"labels": {}, # type: Dict[str, Any] | None, can be omitted.
"schedule_id": "every_minute", # type: str | None, can be omitted.
}
]
)
Expand All @@ -55,6 +56,7 @@ Parameters:
- `args` - args to use, when invoking the task.
- `kwargs` - key-word arguments to use when invoking the task.
- `labels` - additional labels to use when invoking the task.
- `schedule_id` - unique identifier of the schedule. If not specified, a random uuid will be generated.

To enable this source, just add it to the list of sources:

Expand Down
5 changes: 5 additions & 0 deletions docs/extending-taskiq/schedule-sources.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,8 @@ Here's a minimal example of a schedule source:
@[code python](../examples/extending/schedule_source.py)

You can implement a schedule source that write schedules in the database and have delayed tasks in runtime.

::: info Cool tip!
You can also use `LabelScheduleSource` as a base class for your schedule source
if you want to parse schedules from task labels and don't want to implement logic for this from scratch.
:::
7 changes: 5 additions & 2 deletions taskiq/schedule_sources/label_based.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,11 @@ async def startup(self) -> None:
if "cron" not in schedule and "time" not in schedule:
continue
labels = schedule.get("labels", {})
labels.update(task.labels)
schedule_id = uuid.uuid4().hex

task_labels = {k: v for k, v in task.labels.items() if k != "schedule"}

labels.update(task_labels)
schedule_id = schedule.get("schedule_id", uuid.uuid4().hex)

self.schedules[schedule_id] = ScheduledTask(
task_name=task_name,
Expand Down
60 changes: 0 additions & 60 deletions tests/schedule_sources/test_label_based.py

This file was deleted.

16 changes: 14 additions & 2 deletions tests/scheduler/test_label_based_sched.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,14 @@
[
pytest.param([{"cron": "* * * * *"}], id="cron"),
pytest.param([{"time": datetime.now(pytz.UTC)}], id="time"),
pytest.param(
[{"time": datetime.now(pytz.UTC), "labels": {"foo": "bar"}}],
id="labels_inside_schedule",
),
pytest.param(
[{"cron": "*/1 * * * *", "schedule_id": "every_minute"}],
id="schedule_with_id",
),
],
)
async def test_label_discovery(schedule_label: List[Dict[str, Any]]) -> None:
Expand All @@ -37,16 +45,20 @@ def task() -> None:
schedules = await source.get_schedules()
assert schedules == [
ScheduledTask(
schedule_id=schedules[0].schedule_id,
schedule_id=schedule_label[0].get("schedule_id", schedules[0].schedule_id),
cron=schedule_label[0].get("cron"),
time=schedule_label[0].get("time"),
task_name="test_task",
labels={"schedule": schedule_label},
labels=schedule_label[0].get("labels", {}),
args=[],
kwargs={},
),
]

# check that labels of tasks are not changed after startup and discovery process
task_from_broker = next(iter(broker.get_all_tasks().values()))
assert task_from_broker.labels == {"schedule": schedule_label}


@pytest.mark.anyio
async def test_label_discovery_no_cron() -> None:
Expand Down