Exactly-once delivery with per-sink independent checkpoints#62
Merged
Conversation
Kafka transactional producer (begin/commit per batch), NATS Nats-Msg-Id server-side dedup, Redis idempotency_key for consumer-side dedup. Per-sink checkpoints: each sink advances independently, source replays from min(sink checkpoints). Legacy checkpoint fallback for seamless migration. Source::compare_checkpoints trait method for correct ordering (MySQL file+pos, Postgres LSN, Turso change_id). Coordinator: policy gate before checkpoint commits, parallel per-sink commits via join_all, SinkError::Fatal for producer fencing. Benchmarks (tuned, single dev machine): - MySQL: 151K events/s (at-least-once) / 134K (exactly-once) — 11% overhead - Postgres: 57K events/s (at-least-once) / 53K (exactly-once) — 7% overhead - Default max_bytes bumped 3MB → 16MB to prevent batch byte-capping Chaos: exactly-once crash recovery scenario, configurable drain target/ writers/timeout/max_bytes, effective config dump before each run. Infra: Docker Compose profiles (base/mysql-infra/pg-infra/kafka-infra), Loki+Promtail log aggregation, Grafana dashboard with template variables (instance/pipeline/source/sink/processor), per-sink checkpoint and transaction commit/abort panels. Docs: updated performance guide, Kafka/NATS/Redis sink references, per-sink checkpoint architecture, delivery guarantee tiers.
recv_many blocked indefinitely when no events were available, preventing the select! loop's ticker branch from flushing the partial batch. Wrap recv_many in tokio::time::timeout(tick_ms) so control returns to the timer when the source goes idle. Adds regression test: test_partial_batch_flushed_by_timer sends 3 events to a coordinator with max_events=10000, keeps the channel open, and asserts the timer flushes the partial batch within max_ms.
Update pipeline_e2e bench to register commit_fn per sink (new API). Remove needless borrow in chaos harness.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
exactly_once: true) —begin_transaction/commit_transactionper batch, producer fencing detection (SinkError::Fatal),transaction.timeout.ms=60000Nats-Msg-Idheader on every message for server-side dedup withinduplicate_windowidempotency_keyfield in every XADD payload for consumer-side dedupSource::compare_checkpoints— required trait method for correct checkpoint ordering (MySQL file+pos, Postgres LSN→u64, Turso change_id)join_all, partial batch flush fix (recv_manytimeout)max_bytesbumped 3MB → 16MB — old default silently capped batches, causing disproportionate exactly-once overheadBenchmarks (tuned, single dev machine, Docker containers)
Chaos & Testing
--scenario exactly-oncecrash recovery withread_committedconsumer verification--drain-target,--drain-writers,--drain-timeout,--drain-max-bytes,--exactly-onceInfrastructure
base,mysql-infra,pg-infra,kafka-infra(start/stop DeltaForge without touching infra)deltaforge_sink_txn_commits_total,deltaforge_sink_txn_aborts_total, per-sink checkpoint status/age gaugesDocs
max_bytestuning guidanceNats-Msg-Id,duplicate_window)idempotency_keyfield)Test plan
cargo test --workspace --lib— 365+ tests passcargo clippy --all-targets --all-features -- -D warnings— cleancargo test -p sinks --test kafka_sink_tests -- --include-ignored— 4 new EO tests pass (requires Docker)--scenario exactly-once --source mysqlchaos crash recovery--scenario all --source mysqlregression (existing scenarios still pass)Checklist
cargo test)cargo fmt)