Change ruby-kafka to rdkafka-ruby #97
Conversation
@@ -237,8 +237,6 @@ end
The consumers will checkpoint their positions from time to time in order to be able to recover from failures. This is called _committing offsets_, since it's done by tracking the offset reached in each partition being processed, and committing those offset numbers to the Kafka offset storage API. If you can tolerate more double-processing after a failure, you can increase the interval between commits in order to improve performance. You can also do the opposite if you prefer less chance of double-processing.

* `offset_commit_interval` – How often to save the consumer's position in Kafka. Default is every 10 seconds.
* `offset_commit_threshold` – How many messages to process before forcing a checkpoint. Default is 0, which means there's no limit. Setting this to e.g. 100 makes the consumer stop every 100 messages to checkpoint its position.
* `offset_retention_time` – How long committed offsets will be retained. Defaults to the broker setting.
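For illustration, these options are set in racecar's standard configuration block; the values below are arbitrary examples, not recommendations:

```ruby
# config/racecar.rb — hypothetical example values, not recommendations
Racecar.configure do |config|
  # Commit offsets at most every 5 seconds instead of the 10-second default.
  config.offset_commit_interval = 5

  # Force a checkpoint every 100 processed messages (0 = no limit).
  config.offset_commit_threshold = 100

  # Keep committed offsets for 7 days (in seconds); defaults to the broker setting.
  config.offset_retention_time = 7 * 24 * 60 * 60
end
```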
Those configuration values are not supported by rdkafka. Or did I miss just them?
`offset_commit_threshold` is a ruby-kafka thing, but `offset_retention_time` should be supported – it's part of the protocol.
Okay, I tried to dig a bit deeper there. It seems like the underlying C library librdkafka does not expose offset retention as a configuration option. Looking at the current Apache Kafka documentation, I can only find it as a broker configuration value.
Personally I don't think we need that option to be settable from the consumer side, so I would vote for ditching it in order to make progress.
It's actually something I use in production :/ it's nice for knowing how long a consumer group can be suspended for before it loses its offsets...
Okay, I've put it onto my TODO list for going from alpha to beta. This one might be tricky to implement through all the layers involved, though.
I found this issue related to that topic. They once thought about implementing it per commit, until it was removed from the protocol definition.
Hmm. It's still supported by ruby-kafka...
Looks like it is part of the latest `OffsetCommit` request (version 4), but not part of `TxnOffsetCommit`. http://kafka.apache.org/protocol.html#The_Messages_OffsetCommit
Good initiative 👍!
lib/racecar/consumer_set.rb
Outdated
@consumers = @config.subscriptions.map do |subscription|
  consumer = Rdkafka::Config.new(rdkafka_config(subscription)).consumer
  consumer.subscribe subscription.topic
  consumer
Does an Rdkafka consumer not support multiple subscriptions?
Ah, is it because rdkafka-ruby doesn't support per-subscription config? That's supported by the protocol though – does librdkafka not support that?
It does, but not with different configuration options (like `auto.offset.reset`, which is quite handy to define in the handler for different topics).
👍
We'll see if we run into performance issues with many subscriptions then.
That's correct, there can be only one config per client. This is a limitation imposed by the C lib.
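A minimal sketch of why one client per subscription is needed: each subscription's overrides get merged into the shared defaults, producing one distinct config hash per consumer. The names below are illustrative only, not racecar's actual internals:

```ruby
# Illustrative sketch only — not racecar's actual code. Each subscription
# gets its own merged config, so each needs its own rdkafka consumer.
base_config = {
  "bootstrap.servers" => "localhost:9092",
  "group.id"          => "my-group",
}

subscriptions = [
  { topic: "orders",  overrides: { "auto.offset.reset" => "earliest" } },
  { topic: "metrics", overrides: { "auto.offset.reset" => "latest" } },
]

per_consumer_configs = subscriptions.map do |sub|
  [sub[:topic], base_config.merge(sub[:overrides])]
end.to_h
```

Since the C library allows only one config per client, two topics with different `auto.offset.reset` values force two consumers here.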
to avoid duplication in providing allowed_values in config
Awesome that you're making this move. I'm a maintainer of rdkafka-ruby.
Took a quick look at d00dbc7, but I'm not sure how this works exactly. What are you achieving with these batches?
@thijsc I tried to explain my use case in an issue on rdkafka-ruby: karafka/rdkafka-ruby#57
@thijsc we've got a ruby-kafka + racecar + delivery boy Slack channel; do you want to join so that we can discuss things there?
* master:
  * Add note to changelog
  * Add current dir to LOAD_PATH only when --require option is used
  * Update version to v0.5.0.beta2
  * Remove alpha warning for producing messages
  * Add `command` to the k8s example
  * Add example of configuring consumer in k8s
  * Describe how to deploy with Kubernetes
  * RSpec: enable zero-monkey patch mode
  * Allow using Bundler 2
  * Allow configuring `sasl_over_ssl`
  * Replace #present? with #nil? to work outside of Rails
  * Fix group_id/group_id_prefix documentation
  * v0.5.0.beta1
  * Describe manual heartbeats in the README
  * Allow triggering heartbeats manually from a consumer
  * Mention message headers
* wip: initial implementation
* wip: add basic tests & fix bugs
* bump rdkafka version to ensure pause is available
* refactor pause class, use rdkafka's assignment over subscription -- latter doesn't list partitions at all
* document exponential backoff; fix disabling pause
* seek back in order not to skip over broken batches
* explain how heartbeats/long running processing works with librdkafka
* improve long-running task docs
What is remaining here, or what are the blockers? Is there anything we could do to push this along? Using native librdkafka would have immense benefits.
Features

I consider this PR almost complete:
Stability

We are running this PR in production for some weeks now using Kafka 2.2.

Breaking Changes

This change requires a major release, as it introduces some breaking changes. Those need to get documented carefully and sorted out with the core maintainers of this gem. See also https://github.com/zendesk/racecar/pull/97/files#diff-4ac32a78649ca5bdd8e0ba38b7006a1e

Documentation

The breaking changes require some re-writing of the README. Those texts also need to get approved by the core maintainers of this gem. See also https://github.com/zendesk/racecar/pull/97/files#diff-04c6e90faac2675aa89e2176d2eec7d8

As we already run this code in production, we consider it good enough for our use case. Obviously we would also benefit when this code re-joins the main branch, and we would happily accept help to make this happen.

@dasch did I conclude correctly here, or have I missed something from your point of view?
I just released rdkafka-ruby with all the latest additions (pause was already in 0.5.0, by the way). I haven't made any progress on the instrumentation support. It's not something we have a need for ourselves, so I haven't felt a lot of urgency. If it's a blocker to get this pull request shipped, we should get it done though. I'll take a look at it this week.
@thijsc I have work in progress on something similar for Waterdrop and Karafka; the instrumentation hooks would be really helpful.
FYI: here we have described the use case that led to this PR: https://tech.xing.com/asynchronous-state-replication-9e504aeac0de
@JHK I am returning to work next week and will give this a review. Thanks for all your hard work!
@last_poll_read_nil_message = true if msg.nil?
end

# XXX: messages are not guaranteed to be from the same partition
I think that could break existing consumers.
Oh, it certainly did. However, `ConsumerSet` is an entirely new class, and we fixed this issue in `Racecar::Runner` in this commit: c8902cf#diff-2b2ca276543198a74d7942069ba0b48bR45

On `master`, `Racecar::Runner#consumer` is public, but in this branch it's actually private, so it would already break existing custom users, I think. Additionally, the methods are named `each_batch` (master) and `batch_poll` (this branch).

I'm aware the behaviour change is not ideal, especially because it is potentially performance degrading (see linked commit message). If someone decides to use `ConsumerSet#batch_poll` directly, they might be in for a surprise because it's not necessarily visible in testing. Naming the method something like `batch_poll_from_subscribed_partitions` would make it clearer, but it's not an exactly nice name. I'm fine with renaming this because it's not used often, but ideally this would be solved directly within librdkafka.
@thijsc would this be something that librdkafka could handle?
I asked in `librdkafka`'s channel if there is a recommended approach to handle this in general (https://gitter.im/edenhill/librdkafka?at=5d4d71d21db76d0ffea12263). Let's see what they recommend; maybe I'm just missing some obvious batch poll method that supports this more or less out of the box.
from the chat:

> @breunigs what you can do is extract the per-partition queue, see queue_get_partition() and forward it to NULL and poll each partition queue explicitly.
Header for `queue_get_partition` is here: https://github.com/edenhill/librdkafka/blob/675e764b0fb013835c3ead74415bdb62b0aa7903/src/rdkafka.h#L2859-L2872

I took a brief look, and this seems to allow a very fine-grained setup of how messages are consumed. Since there's a rebalance callback, old queues can probably be destroyed through that, but maybe `queue_get_partition` → `poll` → `queue_destroy` in the main loop is easier.

librdkafka also works on all assigned partitions semi-simultaneously, so we'd need something similar to switch queues if we stall on a single partition, or introduce a max time to ensure partitions are worked off more or less evenly. There'd also need to be a call to determine how many partitions a topic has, or at least how many we're currently subscribed to.

Apart from handling the details, exposing this function through rdkafka-ruby sounds reasonable, but reimplementing the polling/queue-switching handling is probably on racecar's side.
Judging from gut feeling only, this seems like a lot of effort and not a good trade-off. It took me a couple of iterations until I had all the bugs worked out on another feature that works with the lower-level APIs, because it requires a very good understanding of Kafka and librdkafka. However, for us this non-optimal batching only incurs a small performance penalty, so it was much easier to just start an additional process.
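The queue-switching idea from the discussion could look roughly like this: a round-robin loop over per-partition queues with a per-partition budget, so a single stalled partition can't starve the others. This is a toy model of the control flow only, not librdkafka's API (plain arrays stand in for its per-partition queues):

```ruby
# Toy model: drain per-partition queues round-robin with a per-partition
# budget, so a slow partition cannot starve the others. Arrays stand in
# for librdkafka's per-partition queues; a real version would use a time
# budget rather than a message count.
def drain_round_robin(queues, per_partition_budget: 2)
  processed = []
  until queues.values.all?(&:empty?)
    queues.each do |partition, queue|
      # Take at most `per_partition_budget` messages, then move on.
      per_partition_budget.times do
        msg = queue.shift or break
        processed << [partition, msg]
      end
    end
  end
  processed
end

queues = { 0 => %w[a b c d], 1 => %w[x] }
result = drain_round_robin(queues)
# Partition 1's single message is interleaved early instead of waiting
# for partition 0 to drain completely.
```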
Can you rebase and resolve the conflicts? I think the best plan for rolling this out is to cut a [...]

@JHK will you be available for helping solve the many issues that will undoubtedly crop up? I unfortunately have very limited bandwidth.
+1 on the rollout/branch naming approach.

Regarding availability: yes, we do have some time to support that. We don't have dedicated time for this, so I can offer best effort here, but I can't guarantee a fixed number of hours per week. Also, most of the support would come from me, not @JHK.
OK, I will probably start that work next week.
Can you rebase and move the changelog entries under [...]?
Is there a reason for this switch?
Yup – keeping ruby-kafka up to date with all the Kafka changes while avoiding regressions is too much work, and that's not even counting the features we're still missing, such as background heartbeats. By building upon rdkafka we can focus on ergonomic features rather than low-level stuff, and we should gain a lot of performance at the same time.
This PR will change the underlying library to rdkafka-ruby, which wraps the C library librdkafka. It is still a work in progress, as some things are not updated yet: