diff --git a/lib/kafka/consumer.rb b/lib/kafka/consumer.rb index 35628608c..07c232a13 100644 --- a/lib/kafka/consumer.rb +++ b/lib/kafka/consumer.rb @@ -202,6 +202,15 @@ def each_message(min_bytes: 1, max_bytes: 10485760, max_wait_time: 1, automatica ) batches.each do |batch| + unless batch.empty? + @instrumenter.instrument("fetch_batch.consumer", { + topic: batch.topic, + partition: batch.partition, + offset_lag: batch.offset_lag, + highwater_mark_offset: batch.highwater_mark_offset, + message_count: batch.messages.count, + }) + end batch.messages.each do |message| notification = { topic: message.topic, diff --git a/spec/consumer_spec.rb b/spec/consumer_spec.rb index 379c7389e..fea327749 100644 --- a/spec/consumer_spec.rb +++ b/spec/consumer_spec.rb @@ -75,6 +75,18 @@ ] } + it "instruments" do + expect(instrumenter).to receive(:instrument).once.with('fetch_batch.consumer', anything) + expect(instrumenter).to receive(:instrument).once.with('start_process_message.consumer', anything) + expect(instrumenter).to receive(:instrument).once.with('process_message.consumer', anything) + + allow(instrumenter).to receive(:instrument).and_call_original + + consumer.each_message do |message| + consumer.stop + end + end + it "raises ProcessingError if the processing code fails" do expect { consumer.each_message do |message|