
Consume In Batches API #99

Closed
KTSCode opened this issue Nov 13, 2019 · 6 comments

Comments

@KTSCode

KTSCode commented Nov 13, 2019

I know that it's currently possible to consume in batches (see the poll-in-batches issue); however, I believe the API for this could be significantly improved. One of the main draws of this library was the simplicity with which a Kafka consumer could be implemented. Consuming messages individually has become too slow for my current use case. I began the transition to consuming in batches, and have found the work required to be more challenging than I feel is strictly necessary.

I think the users of this library would find an API similar to the code below far superior to the current interface.

consumer.each_in_batches(<batch size>, <timeout>) do |batch|
  batch.each do |message|
    puts "Message received: #{message}"
  end
end
  • batch size -> The maximum number of messages to be consumed before the block of code is run.
  • timeout -> The time in milliseconds to wait before a partial batch (more than 0 but fewer than batch size messages) is passed to the code block.
  • Any errors that occur would be expected to propagate up, and the offsets for those messages would not be committed.
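A rough sketch of how such an each_in_batches helper could behave on top of a poll-style loop (the method name and the poll contract here are assumptions for illustration; poll(timeout_ms) is taken to return the next message, or nil on timeout):

```ruby
# Hypothetical helper sketching the proposed API; not part of rdkafka-ruby.
# Collects up to batch_size messages, or whatever arrived within timeout_ms,
# and yields them as one batch. Stops once a whole window passes with no
# messages (a simplification so the sketch terminates).
def each_in_batches(consumer, batch_size, timeout_ms)
  loop do
    batch = []
    deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout_ms / 1000.0
    while batch.size < batch_size
      remaining_ms = ((deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)) * 1000).floor
      break if remaining_ms <= 0
      message = consumer.poll(remaining_ms) # assumed: next message or nil
      batch << message if message
    end
    break if batch.empty?
    yield batch
  end
end
```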
@thijsc
Collaborator

thijsc commented Nov 13, 2019

Thanks for your feedback on this!

I think it's important to understand that the high-level consumer API that replaces the now legacy simple consumer API does not do batching in the same way. The high-level consumer always works in batches, it just does not expose the workings of that in its public API. Adding some kind of batching layer cannot improve performance, it will even be a little slower because extra work needs to be done to re-batch messages on the Ruby side.

When you consume a message in the naive way you are actually using batches. It is possible to tune the batching behaviour using config options such as fetch.max.bytes which are documented here.
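For example (the option names below are real librdkafka settings, but the values are placeholders for illustration, not recommendations):

```ruby
# Tuning librdkafka's internal fetch batching via consumer config.
# fetch.max.bytes and fetch.wait.max.ms are librdkafka options; the
# values shown are illustrative only.
config = {
  :"bootstrap.servers" => "localhost:9092",
  :"group.id"          => "group",
  :"fetch.max.bytes"   => 52_428_800, # max bytes returned per fetch request
  :"fetch.wait.max.ms" => 100         # max broker wait to accumulate a fetch
}
```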

Personally I feel that using store_offset and controlling the timing of a commit is the best way to handle this. It maps directly to what the driver is actually doing. If we were to implement a wrapper similar to the example above, we'd use those exact techniques under the hood, since this is the only possible way of achieving this behaviour with the high-level consumer.
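That approach might be sketched roughly like this (a simplification, assuming enable.auto.offset.store is set to false so that commit covers only explicitly stored offsets; the processing block is hypothetical):

```ruby
# Sketch of batching with store_offset plus a manual commit: offsets are
# stored only after a batch has been processed, so an error before commit
# means those messages are redelivered rather than lost.
def process_in_stored_batches(consumer, batch_size)
  batch = []
  consumer.each do |message|
    batch << message
    next if batch.size < batch_size
    yield batch                                 # e.g. a bulk database write
    batch.each { |m| consumer.store_offset(m) } # mark batch as processed
    consumer.commit                             # commit the stored offsets
    batch.clear
  end
end
```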

In my mind, the argument for doing this would have to be that it's a substantially better API for a lot of people. I'd love to hear more if you think that's the case.

@KTSCode
Author

KTSCode commented Nov 13, 2019

Thank you for the quick response @thijsc!

I only have my personal experience to go off of, but I would like to make rdkafka-ruby the default ruby Kafka library at the company I work for. I would feel much better telling other teams to use this tool if I knew there was a simple batch consumption API that could be quickly and easily implemented.

Regarding performance, I'm trying to increase the performance of the database writes by doing mass insertions rather than individual insertions. For this, I need to be able to pass batches of messages to the Sidekiq jobs that will be writing them to the database. The speed of consumption from the topic far exceeds the speed at which I can write to my database.

After a good amount of reading the code and documentation, I was able to understand that when a Consumer has subscribe called on it with a topic list, it consumes those messages in batches, makes them available through poll and each, and automatically commits its offset to the Kafka topics (though I'm not exactly sure what prompts the automatic commits). I am fairly certain that I could (with some effort) implement batched consumption and then store the offset of the last message using store_offset. That being said, I don't think users should be required to understand how librdkafka and this wrapper work internally in order to implement batched consumption of messages.

I think the best libraries are deep with simpler (smaller) APIs rather than shallow with wide APIs (A Philosophy of Software Design).

To me, the each method on the Consumer class is an extremely elegant message consumption implementation, and I feel that a good portion of existing and new users would greatly benefit from an extension of that API to facilitate easy message batching.

@KTSCode
Author

KTSCode commented Nov 13, 2019

I've put up a PR with a naive implementation of the functionality I'm asking for, though I'm confident it could be done in a much better way.

@thijsc
Collaborator

thijsc commented Nov 14, 2019

Thanks for explaining your use case.

I think the best libraries are deep with simpler (smaller) APIs rather than shallow with wide APIs

This is actually exactly why I'm very hesitant to add extra layers and helpers to this gem. The API is in my opinion already deep and simple. It does require thinking about the problem in a different way. The consumer implements Enumerable, so Ruby actually lets you do a lot of cool stuff with it out of the box.

We do a lot of batching for database writes and such. That code roughly looks like this:

require 'enumerator'

config = {
  :"bootstrap.servers" => "localhost:9092",
  :"group.id" => "group",
  :"enable.partition.eof" => false,
  :"enable.auto.commit" => false
}
consumer = Rdkafka::Config.new(config).consumer
consumer.subscribe("topic")

consumer.each_slice(1000) do |messages|
  # Write to database
  db.write(messages.map(&:to_database_thing))

  # Commit consumer
  consumer.commit
end

You can implement something similar based on time. This assumes that the commit succeeds; if a double insert may never happen, it might be wise to store the offset in the database within the same transaction.
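A time-based variant might look roughly like this (a sketch under the same assumptions as the example above; note that it only flushes when a new message arrives, which a production version would need to address):

```ruby
# Flush whatever has accumulated once `interval` seconds have elapsed,
# rather than waiting for a fixed batch size.
def each_in_intervals(consumer, interval)
  batch = []
  deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + interval
  consumer.each do |message|
    batch << message
    next if Process.clock_gettime(Process::CLOCK_MONOTONIC) < deadline
    yield batch # e.g. write to database, then commit the consumer
    batch = []
    deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + interval
  end
end
```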

I don't see a need to provide any wrappers around Enumerable in this gem; I think that's a concern on a more general level.

@KTSCode
Author

KTSCode commented Nov 14, 2019

Thank you for that example, it helps me a lot in understanding the best approach for my use case. I still don't know that I fully understand the difference between commit and store_offset and why it is necessary to have both. I will dig into the documentation to try to answer that.

I think many users, myself included, would benefit greatly from having a de facto way to pull batches of messages from the consumer, with an example in the documentation somewhere. I will close this issue. Would it be alright if I opened another issue for adding a batch consumption example to the documentation?

Thank you again for the quick responses and insight, I really appreciate it.

@KTSCode KTSCode closed this as completed Nov 14, 2019
@thijsc
Collaborator

thijsc commented Nov 14, 2019

This could be clearer in the docs, added some more context: d07c959.

Further doc improvements are very welcome indeed!
