Improve consumer doc
thijsc committed Nov 14, 2019
1 parent c055f01 commit d07c959
Showing 1 changed file with 11 additions and 1 deletion.
12 changes: 11 additions & 1 deletion lib/rdkafka/consumer.rb
@@ -5,6 +5,9 @@ module Rdkafka
   #
   # To create a consumer set up a {Config} and call {Config#consumer consumer} on that. It is
   # mandatory to set `:"group.id"` in the configuration.
+  #
+  # Consumer implements `Enumerable`, so you can use `each` to consume messages, or for example
+  # `each_slice` to consume batches of messages.
   class Consumer
     include Enumerable
 
@@ -313,7 +316,14 @@ def seek(message)
       end
     end
 
-    # Commit the current offsets of this consumer
+    # Manually commit the current offsets of this consumer.
+    #
+    # To use this set `enable.auto.commit` to `false` to disable automatic triggering
+    # of commits.
+    #
+    # If `enable.auto.offset.store` is set to `true` the offset of the last consumed
+    # message for every partition is used. If set to `false` you can use {store_offset} to
+    # indicate when a message has been fully processed.
     #
     # @param list [TopicPartitionList,nil] The topic with partitions to commit
     # @param async [Boolean] Whether to commit async or wait for the commit to finish
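For readers skimming the diff, here is a minimal sketch of what the documented usage looks like when put together: `Enumerable` consumption via `each_slice`, with `enable.auto.commit` disabled so `commit` is triggered manually. The broker address, group id and topic name are placeholder values, not part of this commit.

  require "rdkafka"

  # Auto commit is disabled so offsets are only committed by the explicit
  # `commit` call below ("localhost:9092" and "events" are example values).
  config = Rdkafka::Config.new(
    :"bootstrap.servers"  => "localhost:9092",
    :"group.id"           => "example-group",
    :"enable.auto.commit" => false
  )

  consumer = config.consumer
  consumer.subscribe("events")

  # Consumer includes Enumerable, so each_slice yields arrays of messages.
  consumer.each_slice(100) do |messages|
    messages.each { |message| puts message.payload }
    # Commit the offsets of everything consumed so far once the batch is processed.
    consumer.commit
  end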

1 comment on commit d07c959

@KTSCode

I've come across an issue with using `each_slice`. If I publish 93392 messages and run code similar to the following:

  total = 0
  consumer.each_slice(1000) do |messages|
    total += messages.count
    Job::DoThing.perform_async(messages)
    consumer.commit
  end

The last 392 messages never get yielded: `each_slice` just keeps buffering until it has accumulated 1000 messages again. My messages don't arrive as a constant stream, they come in bursts, and I can't let messages sit until the next burst. I don't recommend using `each_slice` as a batching solution unless you have a constant stream of messages and can afford to let them go stale when there is a gap in message production.

Also, if you are trying to drain a topic to transition to a different topic/cluster, it is impossible unless the number of messages is an exact multiple of your slice size.
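A rough sketch of the kind of workaround this implies: polling directly and flushing the batch on either size or elapsed time, so a partial batch is not left sitting until the next burst. `process_batch`, the 1000-message limit and the 5-second deadline are placeholders, not rdkafka API.

  batch = []
  deadline = Time.now + 5

  loop do
    # poll returns nil when no message arrives within the timeout (in ms).
    message = consumer.poll(250)
    batch << message if message

    if batch.size >= 1000 || (!batch.empty? && Time.now >= deadline)
      process_batch(batch)   # hypothetical application method
      consumer.commit
      batch = []
      deadline = Time.now + 5
    end
  end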
