Skip to content

feat: timer mechanics (activate_in/activate_at, timer_id dedup, cancel_timer)#5

Merged
lesnik512 merged 1 commit intomainfrom
feat/timers
May 7, 2026
Merged

feat: timer mechanics (activate_in/activate_at, timer_id dedup, cancel_timer)#5
lesnik512 merged 1 commit intomainfrom
feat/timers

Conversation

@lesnik512
Copy link
Copy Markdown
Member

Add scheduled / delayed delivery to OutboxBroker for parity with the faststream-redis-timers API surface:

  • publish(activate_in=timedelta) / publish(activate_at=datetime) set next_attempt_at so the row is invisible to fetch until the gate opens. publish_batch accepts the same params (applied to every row).
  • publish(timer_id="x") dedupes per (queue, timer_id) via pg_insert ON CONFLICT DO NOTHING, backed by a partial unique index on the outbox table. Re-publishing the same id is a silent no-op (returns None).
  • cancel_timer(queue, timer_id, session) deletes a not-yet-leased row; the acquired_token IS NULL guard preserves the lease invariant so in-flight handlers are never clobbered.
  • NOTIFY is skipped when next_attempt_at is in the future or when an ON CONFLICT made the insert a no-op — listeners can't act on either.

The fetch CTE already gated on next_attempt_at <= now(), so no subscriber-side change was needed. Coverage stays at 100%.

…l_timer)

Add scheduled / delayed delivery to OutboxBroker for parity with the
faststream-redis-timers API surface:

- publish(activate_in=timedelta) / publish(activate_at=datetime) set
  next_attempt_at so the row is invisible to fetch until the gate opens.
  publish_batch accepts the same params (applied to every row).
- publish(timer_id="x") dedupes per (queue, timer_id) via pg_insert
  ON CONFLICT DO NOTHING, backed by a partial unique index on the
  outbox table. Re-publishing the same id is a silent no-op (returns None).
- cancel_timer(queue, timer_id, session) deletes a not-yet-leased row;
  the acquired_token IS NULL guard preserves the lease invariant so
  in-flight handlers are never clobbered.
- NOTIFY is skipped when next_attempt_at is in the future or when an
  ON CONFLICT made the insert a no-op — listeners can't act on either.

The fetch CTE already gated on next_attempt_at <= now(), so no
subscriber-side change was needed. Coverage stays at 100%.
@lesnik512 lesnik512 self-assigned this May 7, 2026
@lesnik512 lesnik512 merged commit a6a1683 into main May 7, 2026
3 checks passed
@lesnik512 lesnik512 deleted the feat/timers branch May 7, 2026 15:17
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant