Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 6 additions & 5 deletions lib/kafka/async_producer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,10 @@ def initialize(sync_producer:, max_queue_size: 1000, delivery_threshold: 0, deli
def produce(value, topic:, **options)
ensure_threads_running!

buffer_overflow(topic) if @queue.size >= @max_queue_size
if @queue.size >= @max_queue_size
buffer_overflow topic,
"Cannot produce to #{topic}, max queue size (#{@max_queue_size} messages) reached"
end

args = [value, **options.merge(topic: topic)]
@queue << [:produce, args]
Expand Down Expand Up @@ -148,14 +151,12 @@ def ensure_threads_running!
@timer_thread ||= Thread.new { @timer.run }
end

def buffer_overflow(topic)
def buffer_overflow(topic, message)
@instrumenter.instrument("buffer_overflow.async_producer", {
topic: topic,
})

@logger.error "Cannot produce message to #{topic}, max queue size (#{@max_queue_size}) reached"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd like to keep the log statement.

Copy link
Contributor Author

@michaelsauter michaelsauter Nov 17, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I removed it because the sync producer doesn't have it either, and the exception already has the message. If you want exception AND logging, then I suggest we add the logging to the sync producer as well for consistency. Should I do that?


raise BufferOverflow
raise BufferOverflow, message
end

class Timer
Expand Down