O1b: Batched retry updates (follow-up to O1a) #25

@lesnik512

Problem

Sibling of #24 (O1a, batched terminal deletes). After O1a ships, OutboxSubscriber._flush_retry (faststream_outbox/subscriber/usecase.py:448-476) is the last per-row write left in the worker loop: every retried row still costs one Postgres round-trip plus its own BEGIN/COMMIT. Under workloads where retries are frequent (a failing downstream, exponential backoff actively rescheduling), the retry path alone can dominate wall time.

Retries are meaningfully harder to batch than deletes: every row carries its own delay_seconds, attempts_count, first_attempt_at, and last_attempt_at, so a single UPDATE … WHERE id IN (…), which applies the same SET values to every matched row, doesn't fit.

Proposed design

UPDATE … FROM (VALUES …) AS v(id, token, delay, attempts, first_at, last_at): Postgres' UPDATE … FROM form lets us carry per-row values inline and join on (id, token) for the lease guard.

New client method

# faststream_outbox/client.py

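import datetime as _dt
import uuid
from collections.abc import Sequence
from dataclasses import dataclass

from sqlalchemy import BigInteger, DateTime, Float, Uuid, column, func, update, values
from sqlalchemy.ext.asyncio import AsyncConnection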

@dataclass(frozen=True)
class _RetryEntry:
    message_id: int
    acquired_token: uuid.UUID
    delay_seconds: float
    attempts_count: int
    first_attempt_at: _dt.datetime
    last_attempt_at: _dt.datetime

class OutboxClient(AbstractOutboxClient):
    async def mark_pending_batch_with_lease(
        self,
        conn: "AsyncConnection | None",
        entries: "Sequence[_RetryEntry]",
    ) -> set[int]:
        """
        Reschedule all entries in one round-trip. Returns the set of IDs whose lease
        still held (analogous to delete_batch_with_lease's return shape). The lease
        guard is preserved by joining on (id, acquired_token) so a row whose token
        changed underneath matches nothing.

        Per-row next_attempt_at is computed server-side as ``now() + delay`` exactly
        like the per-row form, so retry timing stays DB-clock-anchored.
        """
        if conn is None:
            msg = "OutboxClient.mark_pending_batch_with_lease requires a live AsyncConnection (got None)"
            raise TypeError(msg)
        if not entries:
            return set()
        t = self._table
        # values() construct: a VALUES list usable as a FROM target.
        v = values(
            column("id", BigInteger),
            column("token", Uuid),
            column("delay", Float),
            column("attempts", BigInteger),
            column("first_at", DateTime(timezone=True)),
            column("last_at", DateTime(timezone=True)),
            name="v",
        ).data([
            (e.message_id, e.acquired_token, max(0.0, e.delay_seconds),
             e.attempts_count, e.first_attempt_at, e.last_attempt_at)
            for e in entries
        ])
        stmt = (
            update(t)
            .values(
                next_attempt_at=func.now() + func.make_interval(
                    0, 0, 0, 0, 0, 0, v.c.delay,
                ),
                attempts_count=v.c.attempts,
                first_attempt_at=v.c.first_at,
                last_attempt_at=v.c.last_at,
                acquired_at=None,
                acquired_token=None,
            )
            .where(t.c.id == v.c.id, t.c.acquired_token == v.c.token)
            .returning(t.c.id)
        )
        # Consume the RETURNING rows inside the transaction, before COMMIT.
        async with conn.begin():
            result = await conn.execute(stmt)
            held = {row[0] for row in result.all()}
        return held

AbstractOutboxClient gains the matching abstract method; FakeOutboxClient walks _rows in Python.
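
As a rough illustration of what "walks _rows in Python" could look like, here is a minimal in-memory sketch. The names RetryEntry, FakeRow, and fake_mark_pending_batch are hypothetical stand-ins (the real FakeOutboxClient row shape is not shown in this issue), and the clock is passed in explicitly so the result is deterministic; the actual fake would presumably use _utcnow().

```python
import datetime as dt
import uuid
from dataclasses import dataclass
from typing import Dict, List, Optional, Set


@dataclass(frozen=True)
class RetryEntry:  # mirrors the proposed _RetryEntry
    message_id: int
    acquired_token: uuid.UUID
    delay_seconds: float
    attempts_count: int
    first_attempt_at: dt.datetime
    last_attempt_at: dt.datetime


@dataclass
class FakeRow:  # hypothetical stand-in for an entry in the fake's _rows
    id: int
    acquired_token: Optional[uuid.UUID]
    acquired_at: Optional[dt.datetime] = None
    next_attempt_at: Optional[dt.datetime] = None
    attempts_count: int = 0
    first_attempt_at: Optional[dt.datetime] = None
    last_attempt_at: Optional[dt.datetime] = None


def fake_mark_pending_batch(
    rows: Dict[int, FakeRow], entries: List[RetryEntry], now: dt.datetime
) -> Set[int]:
    """Apply each entry whose (id, token) still matches; return the updated ids."""
    held: Set[int] = set()
    for e in entries:
        row = rows.get(e.message_id)
        # Lease guard: a missing row or a changed token matches nothing.
        if row is None or row.acquired_token != e.acquired_token:
            continue
        # Client-side equivalent of now() + delay, with negative delays clamped to 0.
        row.next_attempt_at = now + dt.timedelta(seconds=max(0.0, e.delay_seconds))
        row.attempts_count = e.attempts_count
        row.first_attempt_at = e.first_attempt_at
        row.last_attempt_at = e.last_attempt_at
        row.acquired_at = None
        row.acquired_token = None  # release the lease
        held.add(row.id)
    return held
```

An empty entries list falls straight through the loop and returns an empty set, which matches the empty-batch fast path of the real client.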

Subscriber integration

Mirror O1a's shape but for the retry path. New knobs (in addition to O1a's):

@dataclass(kw_only=True)
class OutboxSubscriberConfig(SubscriberUsecaseConfig):
    ...
    retry_batch_size: int = 1            # default 1 = no batching (back-compat)
    retry_flush_interval_ms: float = 100.0

The subscriber gains a _retry_buffer: list[_RetryEntry] and a _maybe_flush_retry_batch method, parallel to O1a's _terminal_buffer and _maybe_flush_terminal_batch. For terminal=False, _safe_flush constructs the entry from the row and appends it to the buffer; lease-lost reporting follows the same set[int]-diff pattern.
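
The size-or-deadline flush rule can be sketched in isolation. This is a minimal illustration, not the subscriber code: RetryBuffer, its flush callback, and the injectable clock are all hypothetical names, and the deadline is assumed to be measured from the oldest buffered entry.

```python
import time
from typing import Awaitable, Callable, Generic, List, Optional, TypeVar

T = TypeVar("T")


class RetryBuffer(Generic[T]):  # hypothetical helper, not part of the proposal
    def __init__(
        self,
        flush: Callable[[List[T]], Awaitable[None]],
        batch_size: int = 1,
        flush_interval_ms: float = 100.0,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self._flush = flush
        self._batch_size = batch_size
        self._interval_s = flush_interval_ms / 1000.0
        self._clock = clock
        self._entries: List[T] = []
        self._oldest_at: Optional[float] = None  # when the oldest buffered entry arrived

    async def add(self, entry: T) -> None:
        if not self._entries:
            self._oldest_at = self._clock()
        self._entries.append(entry)
        await self.maybe_flush()

    async def maybe_flush(self, *, force: bool = False) -> bool:
        """Flush if the buffer is full, overdue, or forced; return True if it flushed."""
        if not self._entries:
            return False
        full = len(self._entries) >= self._batch_size
        overdue = self._clock() - (self._oldest_at or 0.0) >= self._interval_s
        if not (force or full or overdue):
            return False
        # Swap the buffer out before awaiting so new entries start a fresh batch.
        batch, self._entries = self._entries, []
        self._oldest_at = None
        await self._flush(batch)
        return True
```

With batch_size=1 every add() flushes immediately, which is how the default is meant to preserve today's per-row behaviour. A worker loop would also need to call maybe_flush() periodically (and with force=True on shutdown) so a partially filled buffer does not wait indefinitely.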

Non-obvious traps

SQLAlchemy values() parameter typing. The per-row delay, attempts, first_at, last_at carry through as bound parameters via the VALUES construct. Each column must be typed explicitly (Float, BigInteger, DateTime(timezone=True)) — without the timezone-aware DateTime, asyncpg silently converts naïve datetimes on the wire and the comparison breaks. Tz-aware enforcement at the API boundary (already true via OutboxInnerMessage._record_attempt (message.py:104-109) using _utcnow()) keeps this safe.
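
The proposal relies on _utcnow() producing aware datetimes. If an explicit guard were wanted when building entries, one option is a small check like the following sketch; require_aware is a hypothetical helper name, not an existing function in the codebase.

```python
import datetime as dt


def require_aware(value: dt.datetime, field: str) -> dt.datetime:
    """Return value unchanged if it is timezone-aware, else raise ValueError."""
    # A datetime is aware only if tzinfo is set AND utcoffset() returns a value.
    if value.tzinfo is None or value.utcoffset() is None:
        raise ValueError(f"{field} must be timezone-aware, got naive {value!r}")
    return value
```

Calling it on first_attempt_at and last_attempt_at before constructing a _RetryEntry would surface a naive datetime as an immediate error rather than a wrong timestamp in the table.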

make_interval with a column reference. The per-row delay_seconds is a column on the VALUES alias, not a bind param. func.make_interval(0, 0, 0, 0, 0, 0, v.c.delay) should be correct, but it is a different shape from the existing per-row form, which uses bindparam("delay", type_=Float) (client.py:257). Test this form against real Postgres before assuming it composes.

Empty-batch fast path. entries=[] must return set() immediately — SQLAlchemy's values().data([]) is an error on most dialects.

Drift between fake and real. FakeOutboxClient.mark_pending_with_lease (testing.py) currently computes next_attempt_at client-side as _utcnow() + delay. The batch fake should do the same per entry; document that prod's server-side now() vs fake's _utcnow() is the same drift that already exists in the per-row path (intentional; see CLAUDE.md "Test broker" section on next_attempt_at recording in sync mode).

Edge cases handled

  • All entries lose their leases: set() returned; one event=lease_lost WARNING per entry.
  • Mixed lease-loss: diff against input set, exactly like O1a.
  • retry_batch_size=1: behaves identically to today's per-row path.
  • Mid-batch worker cancel: conn.begin() rolls back the whole batch; rows' leases expire and are reclaimed.
  • Test broker: fake walks _rows; sync dispatch_one flushes per row (batching is a loop-mode concern only — sync mode has one row at a time).
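
The set-diff reporting used for the lease-loss cases above can be sketched as follows. The function name, the logger name, and the message_id log field are assumptions; only the event=lease_lost and phase=retry keys come from this issue.

```python
import logging
from typing import Iterable, List, Set

logger = logging.getLogger("outbox.sketch")  # hypothetical logger name


def report_lease_lost(
    submitted_ids: Iterable[int], held_ids: Set[int], phase: str = "retry"
) -> List[int]:
    """Log one WARNING per submitted id missing from held_ids; return those ids sorted."""
    lost = sorted(set(submitted_ids) - held_ids)
    for message_id in lost:
        logger.warning("event=lease_lost phase=%s message_id=%s", phase, message_id)
    return lost
```

When every lease is lost, held_ids is empty and each submitted id produces its own warning; when none is lost, the function logs nothing and returns an empty list.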

Test plan

Unit (tests/test_unit.py)

  • test_mark_pending_batch_with_lease_per_row_delays_respected — three entries with different delay_seconds, assert each row's next_attempt_at reflects its own delay (mock func.now() to a fixed value via the fake).
  • test_mark_pending_batch_with_lease_lease_loss_diff — three entries, one with wrong token, assert returned set has 2 ids and one event=lease_lost, phase=retry log.
  • test_retry_buffer_flushes_on_size_and_deadline — mirrors O1a's tests.
  • test_retry_batch_size_1_matches_per_row_semantics — parameterized regression.

Integration (tests/test_integration.py)

  • test_mark_pending_batch_with_lease_against_postgres — three rows with different delay_seconds, batch update, assert each row's next_attempt_at = now() + its delay (within a tolerance).
  • test_retry_batch_drain_under_failing_handler — handler always raises, max_attempts=10, retry_batch_size=20, publish 100 rows, assert all 100 reach a terminal state under the retry strategy and total wall time is lower than the per-row baseline.

Files to change

| File | Change |
|---|---|
| faststream_outbox/client.py | _RetryEntry dataclass; mark_pending_batch_with_lease on Abstract + real client |
| faststream_outbox/testing.py | Fake implementation walking _rows |
| faststream_outbox/subscriber/config.py | retry_batch_size, retry_flush_interval_ms fields |
| faststream_outbox/registrator.py + subscriber/factory.py | Thread new kwargs |
| faststream_outbox/subscriber/usecase.py | _retry_buffer, _maybe_flush_retry_batch, _safe_flush edits |
| Tests + docs | as above |

Verification

just lint
just test tests/test_unit.py tests/test_fake.py
just test

Carved out of plan2.md O1 (retry-update scope; deletes-batch tracked in #24).

Blocks on: #24 (O1a) — shares the buffer + drain scaffolding.
