From db06ba1ba97017ea7746bb21f78d96f0c9d4b763 Mon Sep 17 00:00:00 2001 From: Anfimov Dima Date: Wed, 22 Oct 2025 21:19:33 +0200 Subject: [PATCH 1/2] fix: recursive dict in labels during schedule discovery in LabelScheduleSource --- taskiq/schedule_sources/label_based.py | 7 ++- tests/schedule_sources/test_label_based.py | 60 ---------------------- tests/scheduler/test_label_based_sched.py | 16 +++++- 3 files changed, 19 insertions(+), 64 deletions(-) delete mode 100644 tests/schedule_sources/test_label_based.py diff --git a/taskiq/schedule_sources/label_based.py b/taskiq/schedule_sources/label_based.py index 94fd42d..4765408 100644 --- a/taskiq/schedule_sources/label_based.py +++ b/taskiq/schedule_sources/label_based.py @@ -46,8 +46,11 @@ async def startup(self) -> None: if "cron" not in schedule and "time" not in schedule: continue labels = schedule.get("labels", {}) - labels.update(task.labels) - schedule_id = uuid.uuid4().hex + + task_labels = {k: v for k, v in task.labels.items() if k != "schedule"} + + labels.update(task_labels) + schedule_id = schedule.get("schedule_id", uuid.uuid4().hex) self.schedules[schedule_id] = ScheduledTask( task_name=task_name, diff --git a/tests/schedule_sources/test_label_based.py b/tests/schedule_sources/test_label_based.py deleted file mode 100644 index fa621b0..0000000 --- a/tests/schedule_sources/test_label_based.py +++ /dev/null @@ -1,60 +0,0 @@ -from datetime import datetime -from typing import Any, Dict, List - -import pytest -import pytz - -from taskiq.brokers.inmemory_broker import InMemoryBroker -from taskiq.schedule_sources.label_based import LabelScheduleSource -from taskiq.scheduler.scheduled_task import ScheduledTask - - -@pytest.mark.anyio -@pytest.mark.parametrize( - "schedule_label", - [ - pytest.param([{"cron": "* * * * *"}], id="cron"), - pytest.param([{"time": datetime.now(pytz.UTC)}], id="time"), - ], -) -async def test_label_discovery(schedule_label: List[Dict[str, Any]]) -> None: - broker = InMemoryBroker() - - @broker.task( - task_name="test_task", - schedule=schedule_label, - ) - def task() -> None: - pass - - source = LabelScheduleSource(broker) - await source.startup() - schedules = await source.get_schedules() - assert schedules == [ - ScheduledTask( - schedule_id=schedules[0].schedule_id, - cron=schedule_label[0].get("cron"), - time=schedule_label[0].get("time"), - task_name="test_task", - labels={"schedule": schedule_label}, - args=[], - kwargs={}, - ), - ] - - -@pytest.mark.anyio -async def test_label_discovery_no_cron() -> None: - broker = InMemoryBroker() - - @broker.task( - task_name="test_task", - schedule=[{"args": ["* * * * *"]}], - ) - def task() -> None: - pass - - source = LabelScheduleSource(broker) - await source.startup() - schedules = await source.get_schedules() - assert schedules == [] diff --git a/tests/scheduler/test_label_based_sched.py b/tests/scheduler/test_label_based_sched.py index 506990b..6eddc94 100644 --- a/tests/scheduler/test_label_based_sched.py +++ b/tests/scheduler/test_label_based_sched.py @@ -20,6 +20,14 @@ [ pytest.param([{"cron": "* * * * *"}], id="cron"), pytest.param([{"time": datetime.now(pytz.UTC)}], id="time"), + pytest.param( + [{"time": datetime.now(pytz.UTC), "labels": {"foo": "bar"}}], + id="labels_inside_schedule", + ), + pytest.param( + [{"cron": "*/1 * * * *", "schedule_id": "every_minute"}], + id="schedule_with_id", + ), ], ) async def test_label_discovery(schedule_label: List[Dict[str, Any]]) -> None: @@ -37,16 +45,20 @@ def task() -> None: schedules = await source.get_schedules() assert schedules == [ ScheduledTask( - schedule_id=schedules[0].schedule_id, + schedule_id=schedule_label[0].get("schedule_id", schedules[0].schedule_id), cron=schedule_label[0].get("cron"), time=schedule_label[0].get("time"), task_name="test_task", - labels={"schedule": schedule_label}, + labels=schedule_label[0].get("labels", {}), args=[], kwargs={}, ), ] + # check that labels of tasks are not changed after startup and discovery process + task_from_broker = next(iter(broker.get_all_tasks().values())) + assert task_from_broker.labels == {"schedule": schedule_label} + @pytest.mark.anyio async def test_label_discovery_no_cron() -> None: From a74fc75f1ab79b6cbea1d1d3ae47d96163205c9b Mon Sep 17 00:00:00 2001 From: Anfimov Dima Date: Wed, 22 Oct 2025 22:07:30 +0200 Subject: [PATCH 2/2] docs: add schedule_id as a parameter for schedule in LabelScheduleSource usage example --- docs/available-components/schedule-sources.md | 4 +++- docs/extending-taskiq/schedule-sources.md | 5 +++++ 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/docs/available-components/schedule-sources.md b/docs/available-components/schedule-sources.md index 4502ac0..c391048 100644 --- a/docs/available-components/schedule-sources.md +++ b/docs/available-components/schedule-sources.md @@ -34,12 +34,13 @@ The format of the schedule label is the following: @broker.task( schedule=[ { - "cron": "* * * * *", # type: str, either cron or time should be specified. + "cron": "*/1 * * * *", # type: str, either cron or time should be specified. "cron_offset": None # type: str | timedelta | None, can be omitted. "time": None # type: datetime | None, either cron or time should be specified. "args": [], # type List[Any] | None, can be omitted. "kwargs": {}, # type: Dict[str, Any] | None, can be omitted. "labels": {}, # type: Dict[str, Any] | None, can be omitted. + "schedule_id": "every_minute", # type: str | None, can be omitted. } ] ) @@ -55,6 +56,7 @@ Parameters: - `args` - args to use, when invoking the task. - `kwargs` - key-word arguments to use when invoking the task. - `labels` - additional labels to use when invoking the task. +- `schedule_id` - unique identifier of the schedule. If not specified, a random uuid will be generated. To enable this source, just add it to the list of sources: diff --git a/docs/extending-taskiq/schedule-sources.md b/docs/extending-taskiq/schedule-sources.md index c4a130a..3d28cbd 100644 --- a/docs/extending-taskiq/schedule-sources.md +++ b/docs/extending-taskiq/schedule-sources.md @@ -12,3 +12,8 @@ Here's a minimal example of a schedule source: @[code python](../examples/extending/schedule_source.py) You can implement a schedule source that write schedules in the database and have delayed tasks in runtime. + +::: info Cool tip! +You can also use `LabelScheduleSource` as a base class for your schedule source +if you want to parse schedules from task labels and don't want to implement logic for this from scratch. +:::