diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index ffbf37b..85d8209 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -42,7 +42,7 @@ jobs: steps: - uses: actions/checkout@v5 - name: Set up Redis instance and Redis cluster - run: docker compose up -d + run: docker compose up -d --wait - name: Set up uv and enable cache id: setup-uv uses: astral-sh/setup-uv@v7 diff --git a/taskiq_redis/list_schedule_source.py b/taskiq_redis/list_schedule_source.py index 4c0f5e2..977a16d 100644 --- a/taskiq_redis/list_schedule_source.py +++ b/taskiq_redis/list_schedule_source.py @@ -85,6 +85,9 @@ def _get_cron_key(self) -> str: """Get the key for a cron-based schedule.""" return f"{self._prefix}:cron" + def _get_interval_key(self) -> str: + return f"{self._prefix}:interval" + def _get_data_key(self, schedule_id: str) -> str: """Get the key for a schedule data.""" return f"{self._prefix}:data:{schedule_id}" @@ -150,6 +153,8 @@ async def delete_schedule(self, schedule_id: str) -> None: elif schedule.time is not None: time_key = self._get_time_key(schedule.time) await redis.lrem(time_key, 0, schedule_id) # type: ignore[misc] + elif schedule.interval: + await redis.lrem(self._get_interval_key(), 0, schedule_id) # type: ignore[misc] async def add_schedule(self, schedule: "ScheduledTask") -> None: """Add a schedule to the source.""" @@ -169,6 +174,11 @@ async def add_schedule(self, schedule: "ScheduledTask") -> None: self._get_time_key(schedule.time), schedule.schedule_id, ) + elif schedule.interval: + await redis.rpush( # type: ignore[misc] + self._get_interval_key(), + schedule.schedule_id, + ) async def post_send(self, task: ScheduledTask) -> None: """Delete a task after it's completed.""" @@ -199,6 +209,10 @@ async def get_schedules(self) -> list["ScheduledTask"]: logger.debug("Got %d cron schedules", len(crons)) if crons: buffer.extend(crons) + intervals = await redis.lrange(self._get_interval_key(), 0, -1) # type: ignore[misc] + logger.debug("Got %d interval schedules", len(intervals)) + if intervals: + buffer.extend(intervals) timed.extend(await redis.lrange(self._get_time_key(current_time), 0, -1)) # type: ignore[misc] logger.debug("Got %d timed schedules", len(timed)) if timed: diff --git a/tests/test_list_schedule_source.py b/tests/test_list_schedule_source.py index 1038fed..c21486b 100644 --- a/tests/test_list_schedule_source.py +++ b/tests/test_list_schedule_source.py @@ -27,6 +27,24 @@ async def test_schedule_cron(redis_url: str) -> None: assert scehdules == [schedule] +@pytest.mark.anyio +@freeze_time("2025-01-01 00:00:00") +async def test_schedule_interval(redis_url: str) -> None: + """Test adding a cron schedule.""" + prefix = uuid.uuid4().hex + source = ListRedisScheduleSource(redis_url, prefix=prefix) + schedule = ScheduledTask( + task_name="test_task", + labels={}, + args=[], + kwargs={}, + interval=datetime.timedelta(seconds=5), + ) + await source.add_schedule(schedule) + scehdules = await source.get_schedules() + assert scehdules == [schedule] + + @pytest.mark.anyio @freeze_time("2025-01-01 00:00:00") async def test_schedule_from_past(redis_url: str) -> None: @@ -56,7 +74,7 @@ async def test_schedule_from_past(redis_url: str) -> None: @pytest.mark.anyio @freeze_time("2025-01-01 00:00:00") -async def test_schedule_removal(redis_url: str) -> None: +async def test_removal_time(redis_url: str) -> None: """Test adding a cron schedule.""" prefix = uuid.uuid4().hex source = ListRedisScheduleSource(redis_url, prefix=prefix) @@ -81,8 +99,31 @@ async def test_schedule_removal(redis_url: str) -> None: @pytest.mark.anyio @freeze_time("2025-01-01 00:00:00") -async def test_deletion(redis_url: str) -> None: - """Test adding a cron schedule.""" +async def test_removal_cron(redis_url: str) -> None: + """Test removing cron schedules.""" + prefix = uuid.uuid4().hex + source = ListRedisScheduleSource(redis_url, prefix=prefix) + schedule = ScheduledTask( + task_name="test_task", + labels={}, + args=[], + kwargs={}, + cron="* * * * *", + ) + await source.add_schedule(schedule) + # When running for the first time, the scheduler will get all the + # schedules that are in the past. + scehdules = await source.get_schedules() + assert scehdules == [schedule] + await source.delete_schedule(schedule.schedule_id) + scehdules = await source.get_schedules() + assert scehdules == [] + + +@pytest.mark.anyio +@freeze_time("2025-01-01 00:00:00") +async def test_removal_interval(redis_url: str) -> None: + """Test removing cron schedules.""" prefix = uuid.uuid4().hex source = ListRedisScheduleSource(redis_url, prefix=prefix) schedule = ScheduledTask( @@ -90,7 +131,7 @@ async def test_deletion(redis_url: str) -> None: labels={}, args=[], kwargs={}, - time=datetime.datetime.now(datetime.timezone.utc), + interval=datetime.timedelta(seconds=30), ) await source.add_schedule(schedule) # When running for the first time, the scheduler will get all the