Scheduler silently stops dispatching cron tasks if a send task hangs #626

@Rohit-Ekbote

Summary

SchedulerLoop.run in taskiq/cli/scheduler/run.py skips cron ticks for any schedule_id whose previous send task is still in running_schedules. If a send task ever hangs (e.g., broker.kick blocks on a wedged Redis/Sentinel socket), the add_done_callback never fires, the entry stays in running_schedules forever, and every subsequent tick for that schedule is silently dropped — no error, no log, no recovery. The scheduler's main loop continues to iterate normally.

We hit this in production on taskiq==0.11.20 with taskiq-redis==1.0.x (ListQueueSentinelBroker) and verified the same logic is unchanged at HEAD (0.12.4).

Reproduction (conceptual)

@broker.task(schedule=[{"cron": "*/5 * * * *"}])
async def my_periodic_task() -> None:
    ...
  1. Scheduler tick fires → spawns send_task = loop.create_task(send(scheduler, source, task)).
  2. send awaits scheduler.on_ready → broker.kicker().kiq(...) → broker.kick(...).
  3. broker.kick hits a stalled connection (sentinel failover, network blip without TCP RST, kernel half-open socket).
  4. kick never returns. send_task never completes. The done_callback never runs. running_schedules[task.schedule_id] is permanently retained.
  5. On every subsequent main-loop iteration, the check at taskiq/cli/scheduler/run.py:337 (HEAD) evaluates to False for this task:
    if is_ready_to_send and task.schedule_id not in running_schedules:
    so no new send_task is spawned. The cron is silently skipped forever until process restart.
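
The steps above can be simulated without taskiq or Redis. This is a minimal sketch, not the scheduler's actual code: `hanging_send` and `simulate_ticks` are hypothetical names, the hang is modelled by awaiting an event that is never set, and the tick loop is reduced to the `running_schedules` check alone.

```python
import asyncio


async def hanging_send() -> None:
    # Stand-in for send(...) when broker.kick never returns.
    await asyncio.Event().wait()


async def simulate_ticks(ticks: int) -> tuple[int, int]:
    """Return (dispatched, skipped) for a simplified version of the tick loop."""
    running_schedules: dict[str, asyncio.Task[None]] = {}
    schedule_id = "my_periodic_task"
    dispatched = 0
    skipped = 0

    for _ in range(ticks):
        # Same guard as the scheduler: do not dispatch while a send is tracked.
        if schedule_id not in running_schedules:
            send_task = asyncio.create_task(
                hanging_send(),
                name=f"schedule_{schedule_id}",
            )
            running_schedules[schedule_id] = send_task
            send_task.add_done_callback(
                lambda fut: running_schedules.pop(
                    fut.get_name().removeprefix("schedule_"),
                    None,
                ),
            )
            dispatched += 1
        else:
            skipped += 1
        # Yield to the event loop, standing in for the wait between ticks.
        await asyncio.sleep(0)

    # Cancel the stuck task so the simulation can exit cleanly.
    stuck = list(running_schedules.values())
    for task in stuck:
        task.cancel()
    await asyncio.gather(*stuck, return_exceptions=True)
    return dispatched, skipped


if __name__ == "__main__":
    print(asyncio.run(simulate_ticks(5)))  # (1, 4)
```

Only the first tick dispatches. The remaining four are skipped because the hung task never completes, so its done callback never removes the entry.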

Observed in production

State of the taskiq scheduler ... process while wedged:

  • Event loop alive (epoll_wait, _run_once timer advances every minute per py-spy)
  • Zero log output
  • All scheduled tasks stopped firing simultaneously after one initial successful burst at startup
  • Workers consuming non-scheduled traffic normally (worker process is separate)
  • Recovery requires a manual pod restart

Root cause

taskiq/cli/scheduler/run.py (0.12.4 HEAD), inside SchedulerLoop.run:

running_schedules: dict[ScheduleId, asyncio.Task[Any]] = {}
...
for source, task_list in self.scheduled_tasks:
    for task in task_list:
        is_ready_to_send: bool = self._is_schedule_ready_to_send(task=task, now=now)
        if is_ready_to_send and task.schedule_id not in running_schedules:
            send_task = self._event_loop.create_task(
                send(self.scheduler, source, task),
                name=f"schedule_{task.schedule_id}",
            )
            running_schedules[task.schedule_id] = send_task
            send_task.add_done_callback(
                lambda task_future: running_schedules.pop(
                    task_future.get_name().removeprefix("schedule_"),
                ),
            )

This is the same pattern as 0.11.20's delayed_send / running_schedules. There is no timeout on send_task, no eviction policy, and no observability when a send runs past a threshold. The only path out of running_schedules is the done_callback, which requires send_task to complete.

Why user-level wrappers don't fix this from the outside

We tried subclassing TaskiqScheduler and overriding on_ready with asyncio.wait_for(super().on_ready(...), timeout=...). py-spy showed the asyncio loop was alive and iterating, but our wrapper never engaged for the hung tick, because the scheduler tracks the send_task itself, not the inner on_ready call. For wait_for inside on_ready to cancel a wedged broker kick, the inner code has to yield so the cancellation can be delivered, which it doesn't reliably do for a stalled socket connection.

The only place a timeout works reliably is on the send_task itself — and that's owned by the scheduler.

Proposed fix

Add an optional send_timeout: float | None to SchedulerLoop.run (and a --send-timeout CLI argument on SchedulerArgs). When set, the spawned send_task is wrapped in asyncio.wait_for(send(...), timeout=send_timeout). On timeout, log a warning and return cleanly so the done_callback clears running_schedules and the next tick can re-fire.

Default: None (no timeout — preserves existing behavior, opt-in only).
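
A rough sketch of the wrapping described above, written standalone so it runs without taskiq. `send_with_timeout` and `demo` are hypothetical names for illustration only, and the real change would live inside SchedulerLoop.run. The hung send is again modelled by an event that is never set.

```python
from __future__ import annotations

import asyncio
import logging
from collections.abc import Awaitable, Callable

logger = logging.getLogger("scheduler_sketch")


async def send_with_timeout(
    send: Callable[[], Awaitable[None]],
    schedule_id: str,
    send_timeout: float | None,
) -> bool:
    """Await send(); log and return False if it exceeds send_timeout.

    send_timeout=None means no timeout, matching the proposed default.
    """
    try:
        await asyncio.wait_for(send(), timeout=send_timeout)
    except asyncio.TimeoutError:
        logger.warning(
            "Send for schedule %s timed out after %s seconds",
            schedule_id,
            send_timeout,
        )
        return False
    return True


async def demo() -> tuple[bool, bool]:
    """Return (send completed, schedule still tracked) for a hung send."""
    running_schedules: dict[str, asyncio.Task[bool]] = {}

    async def hanging_send() -> None:
        # Stand-in for a broker.kick call that never returns.
        await asyncio.Event().wait()

    send_task = asyncio.create_task(
        send_with_timeout(hanging_send, "demo", send_timeout=0.05),
        name="schedule_demo",
    )
    running_schedules["demo"] = send_task
    send_task.add_done_callback(
        lambda fut: running_schedules.pop(
            fut.get_name().removeprefix("schedule_"),
            None,
        ),
    )
    completed = await send_task
    return completed, "demo" in running_schedules


if __name__ == "__main__":
    print(asyncio.run(demo()))  # (False, False)
```

Because the wrapper returns instead of raising, the task finishes normally, the done callback removes the entry, and the next tick is free to dispatch again. Note that asyncio.wait_for cancels the wrapped awaitable and waits for that cancellation to finish, so this sketch only covers a hung send that responds to cancellation.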

PR follows. Happy to iterate on the API shape if maintainers prefer a different name / default / mechanism.

Environment

  • taskiq: 0.11.20 (in production); confirmed unchanged in HEAD 0.12.4
  • taskiq-redis: 1.0.x (ListQueueSentinelBroker)
  • Python: 3.12
  • Deployed in Kubernetes; the trigger correlates with redis-sentinel pod restarts. When one sentinel node goes down for ~10s, the broker's existing socket goes stale silently, the next kick hangs, and the scheduler enters the silent-skip state.
