feat: Kafka deliverBatch fire-flush-await — 13-41x throughput (KOJAK-73)#40

Open
endrju19 wants to merge 2 commits into main from feature/kojak-73-kafka-deliver-batch

endrju19 commented May 4, 2026

Summary

Overrides KafkaMessageDeliverer.deliverBatch with the fire-flush-await pattern. Replaces N sequential producer.send().get() round-trips with one batched network round-trip via producer.flush().

KOJAK-73 — Kafka deliverBatch
Builds on KOJAK-72 (deliverBatch interface).

Headline numbers

Smoke benchmark vs. the KOJAK-68 baseline on the same hardware (MacBook M3 Max, JDK 25 LTS, Postgres 16 + Kafka 3.8.1 via Testcontainers). JMH settings: fork=1, 1 warmup iteration of 10 s, 2 measurement iterations of 15 s.

| batchSize | Baseline | This PR | Improvement |
|---|---|---|---|
| 10 | ~109 msg/s | ~1,468 msg/s | 13.5× |
| 50 | ~115 msg/s | ~3,731 msg/s | 32.3× |
| 100 | ~115 msg/s | ~4,717 msg/s | 41.0× |

batchSize now matters. Before KOJAK-73, throughput was flat across batchSize values because the bottleneck was the per-record blocking producer.send().get(). After KOJAK-73, throughput scales with batchSize, which indicates that Kafka's internal record batching is being used.

Full results + interpretation: benchmarks/results-postopt-kojak-73.md.

Implementation

```kotlin
override fun deliverBatch(entries: List<OutboxEntry>): List<Pair<OutboxEntry, DeliveryResult>> {
    if (entries.isEmpty()) return emptyList()

    // Phase 1: fire all (non-blocking, records queue in producer buffer)
    val inflight = entries.map { it to fireOne(it) }

    // Phase 2: flush. Bypasses linger.ms, one batched RTT for all in-flight records.
    try {
        producer.flush()
    } catch (e: InterruptException) {
        Thread.currentThread().interrupt()
    }

    // Phase 3: collect. Future.get() does not block once flush has completed the futures.
    return inflight.map { (entry, outcome) -> entry to awaitOne(outcome) }
}
```

SendOutcome sealed type distinguishes synchronous send exceptions (e.g., BufferExhaustedException, SerializationException) from in-flight futures — one failing send never aborts the batch.
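
The bodies of `fireOne` and `awaitOne` and the exact shape of `SendOutcome` are not shown in this description. The following is a minimal sketch of how such a type could keep one failing send from aborting the batch. It is not the okapi code: the `send` lambda stands in for `producer.send()`, entries are plain strings, `DeliveryResult` is a simplified stand-in, and `deliverAll` is a hypothetical name. The flush step is left out because the fake futures are already settled.

```kotlin
import java.util.concurrent.CompletableFuture
import java.util.concurrent.ExecutionException
import java.util.concurrent.Future

// Simplified stand-in for the project's DeliveryResult (assumption).
sealed interface DeliveryResult {
    object Success : DeliveryResult
    data class Failure(val reason: String) : DeliveryResult
}

// Either the send threw synchronously, or a future is in flight.
sealed interface SendOutcome {
    class Sent(val future: Future<String>) : SendOutcome
    class Rejected(val error: Exception) : SendOutcome
}

// Fire phase: a synchronous exception is captured instead of rethrown,
// so the remaining entries are still sent.
fun fireOne(entry: String, send: (String) -> Future<String>): SendOutcome =
    try {
        SendOutcome.Sent(send(entry))
    } catch (e: Exception) {
        SendOutcome.Rejected(e)
    }

// Await phase: map each outcome to a per-entry result.
fun awaitOne(outcome: SendOutcome): DeliveryResult = when (outcome) {
    is SendOutcome.Rejected -> DeliveryResult.Failure(outcome.error.javaClass.simpleName)
    is SendOutcome.Sent -> try {
        outcome.future.get()
        DeliveryResult.Success
    } catch (e: ExecutionException) {
        DeliveryResult.Failure((e.cause ?: e).javaClass.simpleName)
    }
}

// Hypothetical driver: fire everything first, then collect in input order.
fun deliverAll(
    entries: List<String>,
    send: (String) -> Future<String>,
): List<Pair<String, DeliveryResult>> {
    val inflight = entries.map { it to fireOne(it, send) }
    return inflight.map { (entry, outcome) -> entry to awaitOne(outcome) }
}

fun main() {
    val send: (String) -> Future<String> = { entry ->
        when (entry) {
            "bad-sync" -> throw IllegalArgumentException("cannot serialize")
            "bad-async" -> CompletableFuture.failedFuture<String>(IllegalStateException("broker error"))
            else -> CompletableFuture.completedFuture(entry)
        }
    }
    deliverAll(listOf("a", "bad-sync", "bad-async", "b"), send)
        .forEach { (entry, result) -> println("$entry -> $result") }
}
```

In this sketch both failure kinds end up as a per-entry `Failure`, and the entries after the failing ones are still delivered in their original positions.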

deliver() refactored to share buildRecord / classifyException helpers with deliverBatch — no behavior change for single-entry path.

Test coverage

New unit tests in KafkaMessageDelivererBatchTest:

  • Empty input → empty output, producer untouched
  • All-success preserves input order
  • Single flush() call (verified via flush counter override)
  • Synchronous send exception → per-entry classification (Permanent + Retriable variants)
  • Future-based async exception (via MockProducer override that drives completeNext/errorNext from inside flush)

Integration tests in okapi-integration-tests continue to pass with real Kafka.

Sublinear scaling: KOJAK-75 motivation

Going from batchSize=50 → 100 gives 32× → 41× (only ~26% more, not 2×). At larger batches, the N individual updateAfterProcessing calls become significant relative to the now-fast Kafka path. This is exactly what KOJAK-75 (batch UPDATE via executeBatch) addresses — it'll be the next big lever once HTTP deliverBatch (KOJAK-74) lands.
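
The sublinear step can be checked with a little arithmetic on the table figures. The sketch below assumes a simple cost model that is not part of this PR: time per message = a fixed per-entry cost plus a per-batch cost divided by batchSize. Fitting it to the batchSize=50 and batchSize=100 rows gives roughly 0.16 ms per entry and about 5.6 ms per batch, which is consistent with a per-entry cost that does not amortize as batches grow.

```kotlin
// Throughput figures from the table above (msg/s).
val throughput = mapOf(10 to 1468.0, 50 to 3731.0, 100 to 4717.0)

// Assumed cost model (not from the PR):
//   msPerMessage(n) = perEntry + perBatch / n
// Solves for (perEntry, perBatch) from two measured batch sizes.
fun fit(n1: Int, n2: Int): Pair<Double, Double> {
    val t1 = 1000.0 / throughput.getValue(n1) // ms per message at n1
    val t2 = 1000.0 / throughput.getValue(n2) // ms per message at n2
    val perBatch = (t1 - t2) / (1.0 / n1 - 1.0 / n2)
    val perEntry = t2 - perBatch / n2
    return perEntry to perBatch
}

fun main() {
    val (perEntry, perBatch) = fit(50, 100)
    val speedup = throughput.getValue(100) / throughput.getValue(50)
    println("speedup 50 -> 100: $speedup")
    println("per-entry cost (ms): $perEntry")
    println("per-batch cost (ms): $perBatch")
    println("throughput ceiling under this model (msg/s): ${1000.0 / perEntry}")
}
```

Under this model the per-entry term sets a ceiling on throughput regardless of batchSize, so lowering it is what a batched UPDATE would target. The model is fitted to two smoke-run data points only and should be read as a rough illustration.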

Test plan

  • ./gradlew test ktlintCheck -x :okapi-integration-tests:test — all unit tests + ktlint pass
  • ./gradlew :okapi-integration-tests:test — full Testcontainers suite (Postgres + MySQL + Kafka + WireMock) passes
  • ./gradlew :okapi-benchmarks:jmhJar + smoke Kafka throughput run — captured numbers above

endrju19 added 2 commits May 4, 2026 11:45
Overrides KafkaMessageDeliverer.deliverBatch with the fire-flush-await pattern.
Replaces N sequential producer.send().get() round-trips per batch with one
batched network round-trip:

  Phase 1 (fire)   producer.send() per entry — non-blocking, records queue
                   in producer's internal buffer
  Phase 2 (flush)  one producer.flush() — bypasses linger.ms, drains all
                   in-flight records to broker in one batched RTT
  Phase 3 (await)  Future.get() per entry — non-blocking after flush, just
                   reads settled completion

Per-entry classification preserved via SendOutcome sealed type — synchronous
exceptions in send() (BufferExhaustedException, SerializationException) are
caught individually so one failing send never aborts the batch. flush()
InterruptException restores the interrupt flag and lets remaining futures
surface their own outcome.

deliver() refactored to share buildRecord/classifyException helpers with
deliverBatch — no behavior change for single-entry path.

Benchmark results vs KOJAK-68 baseline (smoke run, MacBook M3 Max,
Postgres 16 + Kafka 3.8.1 in Testcontainers):

  batchSize  baseline    KOJAK-73    improvement
  10         ~109 msg/s  ~1,468 msg/s  13.5x
  50         ~115 msg/s  ~3,731 msg/s  32.3x
  100        ~115 msg/s  ~4,717 msg/s  41.0x

Throughput now SCALES with batchSize (was flat ~115 msg/s pre-KOJAK-73),
proving Kafka's internal record batching is being exploited. Sublinear
scaling 50 -> 100 indicates DB UPDATE per entry is becoming the next
bottleneck — exactly what KOJAK-75 (batch UPDATE via executeBatch)
addresses next.

Tests:
- KafkaMessageDelivererBatchTest covers empty input, all-success ordering,
  flush-counted-once verification, synchronous Permanent + Retriable send
  exceptions, and future-based async exception (driven via MockProducer
  override that calls completeNext/errorNext per position inside flush)
- Existing KafkaMessageDelivererTest unchanged — single-entry path still
  works identically
- Integration tests (real Kafka via Testcontainers) pass

Documents results in benchmarks/results-postopt-kojak-73.md and bumps
README headline numbers to reflect new Kafka throughput.
…s (KOJAK-73)

Addresses cross-cutting findings from PR #40 review (5 agents converged on
the interrupt/flush error handling).

Critical fix — flush() / interrupt path:
- Broaden flush() exception handling: previously only InterruptException was
  caught, so a fatal KafkaException or IllegalStateException (e.g. closed
  producer) would propagate out of deliverBatch and abandon all in-flight
  futures uncollected, contradicting the documented per-entry classification
  contract. Now any exception from flush() is logged and swallowed; awaitOne
  classifies each entry from its own future state.
- Add explicit InterruptedException handling in awaitOne and deliver. Without
  this, an interrupt during flush() would translate into PermanentFailure on
  pending futures (since java.lang.InterruptedException is NOT a Kafka
  RetriableException), incorrectly marking transient interrupts as terminal.
  Now classified as RetriableFailure with the interrupt flag re-armed.
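
The two fixes above can be sketched as follows. This is an illustration under assumptions, not the committed code: `flushQuietly`, `Outcome`, and the simplified `awaitOne` are hypothetical names, `System.err` stands in for the SLF4J logger, and every `ExecutionException` is treated as permanent for brevity.

```kotlin
import java.util.concurrent.CompletableFuture
import java.util.concurrent.ExecutionException
import java.util.concurrent.Future

// Simplified stand-in for the project's result type (assumption).
sealed interface Outcome {
    object Success : Outcome
    data class Retriable(val reason: String) : Outcome
    data class Permanent(val reason: String) : Outcome
}

// Any flush failure is reported and swallowed so that the await phase
// still runs for every entry in the batch.
fun flushQuietly(flush: () -> Unit, batchSize: Int) {
    try {
        flush()
    } catch (e: Exception) {
        System.err.println("flush failed for batch of $batchSize: ${e.javaClass.simpleName}")
    }
}

// An interrupt while waiting is transient: classify it as retriable and
// re-arm the interrupt flag so the caller can still observe it.
fun awaitOne(future: Future<*>): Outcome =
    try {
        future.get()
        Outcome.Success
    } catch (e: InterruptedException) {
        Thread.currentThread().interrupt()
        Outcome.Retriable("InterruptedException")
    } catch (e: ExecutionException) {
        Outcome.Permanent((e.cause ?: e).javaClass.simpleName)
    }

fun main() {
    val pending = CompletableFuture<String>() // never completed: the flush "failed"
    flushQuietly(flush = {
        Thread.currentThread().interrupt()
        throw IllegalStateException("interrupted while flushing")
    }, batchSize = 1)
    println(awaitOne(pending))    // Retriable(reason=InterruptedException)
    println(Thread.interrupted()) // true (this call also clears the flag)
}
```

Because the interrupt flag is still set when `awaitOne` calls `get()` on the pending future, `get()` throws `InterruptedException` right away instead of blocking, and the entry is reported as retriable.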

Important — observability:
- Add SLF4J logger (implementation dep on slf4j-api added to okapi-kafka,
  consistent with okapi-core's pattern). Logs warn on flush failure (with
  batch size) and debug on synchronous send rejection (with outboxId).
- classifyException fallback for null exception messages now uses
  e.javaClass.simpleName instead of generic "Permanent/Retriable Kafka error",
  preserving debuggability when an exception's message is null.
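
A minimal sketch of the null-message fallback described above. The helper name `describe` is hypothetical; the real classifyException is not shown here.

```kotlin
// Prefer the exception's own message; fall back to its class name when
// the message is null, so the failure reason is never a generic string.
fun describe(e: Throwable): String = e.message ?: e.javaClass.simpleName

fun main() {
    println(describe(IllegalStateException("producer closed"))) // producer closed
    println(describe(IllegalStateException()))                  // IllegalStateException
}
```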

Type design polish:
- SendOutcome.Sent is now @JvmInline value class instead of data class —
  Future has reference-only equals which a data class would falsely advertise.
  Zero runtime cost, removes the misleading promise.

Comments:
- Condensed class-level KDoc, removed unverified "since Kafka 3.0" claim
  about exception hierarchy.
- Reworded deliverBatch KDoc to drop numbered-step restatement and clarify
  that flush() failures never abandon the batch.
- Trimmed redundant test inline comments per "no redundant comments" rule.

New tests (cover gaps the reviews flagged):
- mixed sync-throw + async outcomes in one batch with positional integrity
- poison-pill malformed deliveryMetadata in fire phase
- flush() throws non-Interrupt exception → per-entry classification preserved
- flush() throws InterruptException → pending futures classified as Retriable

Verification:
- All unit tests pass (4 new + existing batch + single-entry tests)
- All integration tests pass (real Postgres + Kafka via Testcontainers)
- ktlint clean
- Smoke benchmark Kafka batchSize=50: 0.260 ms/op vs 0.268 pre-review
  (within noise; logger overhead is on failure paths only, happy path
  unaffected). 32x improvement vs KOJAK-68 baseline preserved.