diff --git a/lib/kafka/async_producer.rb b/lib/kafka/async_producer.rb index b64e54958..82496c34d 100644 --- a/lib/kafka/async_producer.rb +++ b/lib/kafka/async_producer.rb @@ -100,7 +100,10 @@ def initialize(sync_producer:, max_queue_size: 1000, delivery_threshold: 0, deli def produce(value, topic:, **options) ensure_threads_running! - buffer_overflow(topic) if @queue.size >= @max_queue_size + if @queue.size >= @max_queue_size + buffer_overflow topic, + "Cannot produce to #{topic}, max queue size (#{@max_queue_size} messages) reached" + end args = [value, **options.merge(topic: topic)] @queue << [:produce, args] @@ -148,14 +151,12 @@ def ensure_threads_running! @timer_thread ||= Thread.new { @timer.run } end - def buffer_overflow(topic) + def buffer_overflow(topic, message) @instrumenter.instrument("buffer_overflow.async_producer", { topic: topic, }) - @logger.error "Cannot produce message to #{topic}, max queue size (#{@max_queue_size}) reached" - - raise BufferOverflow + raise BufferOverflow, message end class Timer