Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ jobs:
steps:
- uses: actions/checkout@v5
- name: Set up Redis instance and Redis cluster
run: docker compose up -d
run: docker compose up -d --wait
- name: Set up uv and enable cache
id: setup-uv
uses: astral-sh/setup-uv@v7
Expand Down
14 changes: 14 additions & 0 deletions taskiq_redis/list_schedule_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,9 @@ def _get_cron_key(self) -> str:
"""Get the key for a cron-based schedule."""
return f"{self._prefix}:cron"

def _get_interval_key(self) -> str:
return f"{self._prefix}:interval"

def _get_data_key(self, schedule_id: str) -> str:
"""Get the key for a schedule data."""
return f"{self._prefix}:data:{schedule_id}"
Expand Down Expand Up @@ -150,6 +153,8 @@ async def delete_schedule(self, schedule_id: str) -> None:
elif schedule.time is not None:
time_key = self._get_time_key(schedule.time)
await redis.lrem(time_key, 0, schedule_id) # type: ignore[misc]
elif schedule.interval:
await redis.lrem(self._get_interval_key(), 0, schedule_id) # type: ignore[misc]

async def add_schedule(self, schedule: "ScheduledTask") -> None:
"""Add a schedule to the source."""
Expand All @@ -169,6 +174,11 @@ async def add_schedule(self, schedule: "ScheduledTask") -> None:
self._get_time_key(schedule.time),
schedule.schedule_id,
)
elif schedule.interval:
await redis.rpush( # type: ignore[misc]
self._get_interval_key(),
schedule.schedule_id,
)

async def post_send(self, task: ScheduledTask) -> None:
"""Delete a task after it's completed."""
Expand Down Expand Up @@ -199,6 +209,10 @@ async def get_schedules(self) -> list["ScheduledTask"]:
logger.debug("Got %d cron schedules", len(crons))
if crons:
buffer.extend(crons)
intervals = await redis.lrange(self._get_interval_key(), 0, -1) # type: ignore[misc]
logger.debug("Got %d interval schedules", len(intervals))
if intervals:
buffer.extend(intervals)
timed.extend(await redis.lrange(self._get_time_key(current_time), 0, -1)) # type: ignore[misc]
logger.debug("Got %d timed schedules", len(timed))
if timed:
Expand Down
49 changes: 45 additions & 4 deletions tests/test_list_schedule_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,24 @@ async def test_schedule_cron(redis_url: str) -> None:
assert scehdules == [schedule]


@pytest.mark.anyio
@freeze_time("2025-01-01 00:00:00")
async def test_schedule_interval(redis_url: str) -> None:
"""Test adding a cron schedule."""
prefix = uuid.uuid4().hex
source = ListRedisScheduleSource(redis_url, prefix=prefix)
schedule = ScheduledTask(
task_name="test_task",
labels={},
args=[],
kwargs={},
interval=datetime.timedelta(seconds=5),
)
await source.add_schedule(schedule)
scehdules = await source.get_schedules()
assert scehdules == [schedule]


@pytest.mark.anyio
@freeze_time("2025-01-01 00:00:00")
async def test_schedule_from_past(redis_url: str) -> None:
Expand Down Expand Up @@ -56,7 +74,7 @@ async def test_schedule_from_past(redis_url: str) -> None:

@pytest.mark.anyio
@freeze_time("2025-01-01 00:00:00")
async def test_schedule_removal(redis_url: str) -> None:
async def test_removal_time(redis_url: str) -> None:
"""Test adding a cron schedule."""
prefix = uuid.uuid4().hex
source = ListRedisScheduleSource(redis_url, prefix=prefix)
Expand All @@ -81,16 +99,39 @@ async def test_schedule_removal(redis_url: str) -> None:

@pytest.mark.anyio
@freeze_time("2025-01-01 00:00:00")
async def test_deletion(redis_url: str) -> None:
"""Test adding a cron schedule."""
async def test_removal_cron(redis_url: str) -> None:
"""Test removing cron schedules."""
prefix = uuid.uuid4().hex
source = ListRedisScheduleSource(redis_url, prefix=prefix)
schedule = ScheduledTask(
task_name="test_task",
labels={},
args=[],
kwargs={},
cron="* * * * *",
)
await source.add_schedule(schedule)
# When running for the first time, the scheduler will get all the
# schedules that are in the past.
scehdules = await source.get_schedules()
assert scehdules == [schedule]
await source.delete_schedule(schedule.schedule_id)
scehdules = await source.get_schedules()
assert scehdules == []


@pytest.mark.anyio
@freeze_time("2025-01-01 00:00:00")
async def test_removal_interval(redis_url: str) -> None:
"""Test removing cron schedules."""
prefix = uuid.uuid4().hex
source = ListRedisScheduleSource(redis_url, prefix=prefix)
schedule = ScheduledTask(
task_name="test_task",
labels={},
args=[],
kwargs={},
time=datetime.datetime.now(datetime.timezone.utc),
interval=datetime.timedelta(seconds=30),
)
await source.add_schedule(schedule)
# When running for the first time, the scheduler will get all the
Expand Down