diff --git a/lib/kafka/statsd.rb b/lib/kafka/statsd.rb index bff29d01a..102f1a893 100644 --- a/lib/kafka/statsd.rb +++ b/lib/kafka/statsd.rb @@ -97,6 +97,7 @@ def process_message(event) end def process_batch(event) + lag = event.payload.fetch(:offset_lag) messages = event.payload.fetch(:message_count) client = event.payload.fetch(:client_id) group_id = event.payload.fetch(:group_id) @@ -109,6 +110,8 @@ def process_batch(event) timing("consumer.#{client}.#{group_id}.#{topic}.#{partition}.process_batch.latency", event.duration) count("consumer.#{client}.#{group_id}.#{topic}.#{partition}.messages", messages) end + + gauge("consumer.#{client}.#{group_id}.#{topic}.#{partition}.lag", lag) end def join_group(event)