v2.10.2

mensfeld released this 15 Jun 15:53
176f7ac
  • [Feature] Expose Producer#current_variant as a public method. It returns the variant active for the current dispatch on the current fiber - the custom variant while inside a #with/#variant-wrapped call, otherwise the producer's default variant - so middleware and instrumentation listeners running synchronously within a dispatch can read the effective per-dispatch settings (topic_config, max_wait_timeout, default?). The lookup is fiber-local and dispatch-scoped: outside a variant-wrapped call (or from an asynchronous delivery callback) it returns the default variant.
  • [Enhancement] Stop allocating one interpolated string per message in LoggerListener batch produce handlers. The quoted topic strings were only ever counted (quoting is a 1:1 mapping), never displayed, so counting the raw topic values yields the identical number with zero string allocations - relevant for large produce_many_* batches with the default logger listener attached.
  • [Enhancement] Use Array#concat in Producer#buffer_many instead of appending messages one by one.
  • [Enhancement] Skip building the message.acknowledged instrumentation payload in the delivery callback when nothing is subscribed to that event. The notifications bus already short-circuits on empty listeners, but only after the payload hash was allocated - once per delivered message on the polling thread. Mirrors the listener guard already used by the statistics callback. Late subscribers keep working as the check happens on each emission.
  • [Enhancement] Resolve the fiber-local variant once per #produce call and once per #produce_many_sync wait phase instead of re-resolving it for every usage and for every waited delivery handle. For a 1,000-message sync batch this removes ~2,000 redundant fiber-local lookups.
  • [Enhancement] Do not allocate the fiber-local variants hash on the Producer#current_variant read path. Previously every fiber that produced messages got a Hash pinned to it for the fiber's lifetime (per producer use), even when variants were never used - wasteful under fiber-per-request servers (Falcon, async). The hash is now only created by variant wrapper methods that actually need to write to it.
  • [Enhancement] Cache the variant validation contract in a constant instead of instantiating a new Contracts::Variant on every Producer#with / Producer#variant call (mirrors the existing Transactions::CONTRACT pattern).
  • [Enhancement] Cache the tombstone validation contract in a constant instead of instantiating a new Contracts::Tombstone per tombstone message, removing per-message allocations in the tombstone_* APIs (mirrors the existing Transactions::CONTRACT pattern).
  • [Enhancement] Replace explicit Warning[:performance] opt-in with a dynamic approach using Warning.categories (available since Ruby 3.4) to automatically enable all stable opt-in warning categories in the test suite, including :strict_unused_block introduced in Ruby 4.0.
  • [Fix] Prevent a deadlock between a transactional single-message dispatch and #close. A single produce_sync/produce_async on a transactional producer incremented the operations counter (which #close drains while holding @transaction_mutex) before acquiring @transaction_mutex for its per-message transaction - an inverted lock order. A dispatch that had counted itself but not yet taken @transaction_mutex could deadlock a concurrent #close permanently (the close wait loop has no timeout). Transactional dispatches now take @transaction_mutex before the operation is counted, matching #close's lock order (@transaction_mutex -> @operating_mutex -> operations counter).
  • [Fix] Prevent a deadlock (ThreadError: deadlock; recursive locking) when closing an idempotent producer (with reload_on_idempotent_fatal_error enabled) that has buffered messages whose final flush surfaces a fatal librdkafka error. #close performs the final flush while already holding @operating_mutex, and the idempotent fatal-error reload tried to re-acquire that same mutex, leaving the producer stuck in :closing with the native client leaked. The idempotent reload is now skipped on the closing path, and the final buffer flush is best-effort so client teardown always completes.
  • [Fix] Make concurrent idempotent fatal-error reload thread-safe. When several threads shared an idempotent producer (with reload_on_idempotent_fatal_error enabled), a single fatal librdkafka condition failed all their in-flight produces at once and each entered the reload path; the second reload ran reload! after the first had already reset @client to nil, raising NoMethodError. The idempotent reload now bails out if another thread already reloaded (mirroring the transactional path's return if @status.configured? guard). Additionally, Status#active? now classifies the lifecycle from a single atomic read and Producer#ensure_active! branches on one snapshot, so a concurrent configured -> connected transition during a reload can no longer make ensure_active! raise StatusInvalidError for a valid, active producer.
  • [Fix] Stop #flush_async / #flush_sync from silently dropping valid buffered messages when the dispatch fails. #flush removes the batch from the internal buffer before dispatching it, and a failure (a single invalid message failing validation before anything is sent, or a mid-batch inline error such as queue full) previously discarded the entire taken batch - the removed messages were never restored. A failed flush now re-buffers the messages that never reached librdkafka (the whole batch on validation failure or on a transactional rollback, the unsent remainder otherwise) so they can be retried instead of being lost.
  • [Fix] Make Producer#close fork-safe so the GC finalizer inherited by a forked child can no longer close the parent's client. #client registers an ObjectSpace finalizer that calls #close, and that finalizer is inherited across fork. A child that inherited a used producer, never touched it, and exited normally would run #close in the child, flushing and closing a client owned by the parent. With the real rdkafka client this means calling rd_kafka_destroy on a fork-inherited handle, which is undefined behavior. #close now detects when it runs in a process other than the one that built the client, drops the inherited references and finalizer, and returns without touching the native client (matching the existing fork guard on the #client path).
  • [Fix] Guard the internal buffer appends in Producer#buffer and Producer#buffer_many with @buffer_mutex. The appends mutated the shared @messages buffer without the lock that flush/purge/close hold while swapping it for a fresh array, so a concurrent swap landing between reading @messages and appending could drop the message into an orphaned array that is never dispatched - silently losing buffered messages in the documented "buffer in one thread, flush in another" pattern.
  • [Fix] Stop a nested same-producer variant call from clobbering the outer variant inside a variant transaction block. transaction is the only variant-wrapped method that yields user code, so a variant call nested inside it (another variant.produce_*, or a raw producer dispatch in the same scope) used to delete the shared Fiber.current.waterdrop_clients entry on return, making the rest of the block silently fall back to the default variant and dispatch with default topic_config (timeouts, compression, partitioner) instead of the altered one. The wrapper now saves and restores the previous entry instead of unconditionally deleting it (still deleting when there was none, so the fiber-local hash does not accumulate stale keys).
  • [Fix] Stop ConnectionPool#shutdown and #reload from silently dropping in-flight messages. Both closed every pooled producer with close! (force), which flushes for the max wait timeout and then purges whatever has not drained - so on a slow or unreachable broker, queued produce_async messages were cancelled and lost with no delivery report. They now close producers gracefully by default (#reload always; #shutdown unless called with the new force: true), letting messages flush instead of being purged. Pass pool.shutdown(force: true) to keep the old force-and-purge behavior.
  • [Fix] Close a race in the FD poller where a producer registered while the last one was being torn down could be left permanently unpolled (sync produces hang until timeout, async deliveries are never acknowledged). The poller thread decided to exit (last producer unregistered) and cleared its thread reference in two separate, unsynchronized steps, so a register landing in that gap saw the still-alive exiting thread, skipped starting a fresh one, and then had its producer's state closed by the exiting thread's cleanup. The thread now decides to stop and clears its reference in a single mutex section, so a racing register either keeps it running or starts a fresh thread; and the exit cleanup runs only on an abnormal exit, since a normal exit always leaves an empty registry and so can never close a producer registered in the gap.
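
The nested-variant fix above comes down to a save-and-restore pattern around a fiber-local entry. The following is a minimal plain-Ruby sketch of that pattern, not WaterDrop's implementation: the `VariantScope` module, the `:sketch_variants` key, and the `:p1` producer id are hypothetical names chosen for illustration. It relies only on the fact that `Thread#[]` and `Thread#[]=` are fiber-local in Ruby.

```ruby
# Hypothetical stand-in for a per-fiber variant registry keyed by producer id.
module VariantScope
  KEY = :sketch_variants # illustrative key, not the one WaterDrop uses

  module_function

  # Read path: returns the variant active on this fiber, or the default.
  # It never allocates the per-fiber hash.
  def current(producer_id, default)
    store = Thread.current[KEY]
    return default unless store

    store.fetch(producer_id, default)
  end

  # Write path: installs a variant for the duration of the block, then
  # restores whatever was there before instead of deleting unconditionally.
  def with(producer_id, variant)
    store = (Thread.current[KEY] ||= {})
    had_previous = store.key?(producer_id)
    previous = store[producer_id]
    store[producer_id] = variant
    yield
  ensure
    if had_previous
      store[producer_id] = previous # nested call: hand back the outer variant
    else
      store.delete(producer_id)     # outermost call: leave no stale key behind
    end
  end
end

seen = []
VariantScope.with(:p1, :outer) do
  seen << VariantScope.current(:p1, :default)
  VariantScope.with(:p1, :inner) do
    seen << VariantScope.current(:p1, :default)
  end
  # With an unconditional delete this read would fall back to :default.
  seen << VariantScope.current(:p1, :default)
end
seen << VariantScope.current(:p1, :default)
```

In this sketch the read after the nested call still sees the outer variant, and the key is removed only once the outermost call returns.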
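
The buffer-related fixes above (appends guarded by the buffer mutex, and re-buffering after a failed flush) can be sketched in plain Ruby as follows. This is an illustration under stated assumptions, not WaterDrop's code: `SketchBuffer` is a hypothetical class, the dispatch step is represented by a block, and on failure the sketch simply puts the whole taken batch back at the front of the buffer, whereas the release notes describe restoring either the whole batch or only the unsent remainder depending on the failure.

```ruby
# Hypothetical buffer illustrating the two patterns; not WaterDrop's class.
class SketchBuffer
  def initialize
    @messages = []
    @buffer_mutex = Mutex.new
  end

  # Appends take the same lock as the swap in #flush, so a message can
  # never land in an array that has already been swapped out.
  def buffer(message)
    @buffer_mutex.synchronize { @messages << message }
  end

  def buffer_many(messages)
    @buffer_mutex.synchronize { @messages.concat(messages) }
  end

  # Swaps the buffer for a fresh array, then hands the batch to the block.
  # If the block raises, the batch is restored ahead of newer messages.
  def flush
    batch = @buffer_mutex.synchronize do
      taken = @messages
      @messages = []
      taken
    end
    yield batch
    batch.size
  rescue StandardError
    @buffer_mutex.synchronize { @messages = batch + @messages } if batch
    raise
  end
end

# Buffer in several threads while another thread flushes.
buf = SketchBuffer.new
sent = []
writers = 4.times.map do |t|
  Thread.new { 1000.times { |i| buf.buffer([t, i]) } }
end
flusher = Thread.new do
  50.times do
    buf.flush { |b| sent.concat(b) }
    Thread.pass
  end
end
(writers + [flusher]).each(&:join)
buf.flush { |b| sent.concat(b) } # drain whatever is left

# A failed flush keeps the messages available for a retry.
retry_buf = SketchBuffer.new
retry_buf.buffer_many([1, 2, 3])
begin
  retry_buf.flush { |_batch| raise "queue full" }
rescue RuntimeError
  # the dispatch failed; the batch was put back
end
retried = nil
retry_buf.flush { |b| retried = b }
```

Because both the append and the swap run under one mutex, every buffered message ends up in exactly one flushed batch, and the retry after the simulated failure receives the original three messages.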