-
Notifications
You must be signed in to change notification settings - Fork 164
Consumer group (Again) #219
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
lib/kafka_ex/consumer_group.ex
Outdated
| join_response = %JoinGroupResponse{error_code: :no_error} = | ||
| KafkaEx.join_group(join_request, worker_name: worker_name, timeout: session_timeout + @session_timeout_padding) | ||
|
|
||
| Logger.debug("Joined consumer group #{group_name}") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Prefer lazy Logger calls.
lib/kafka_ex/consumer_group.ex
Outdated
| {:ok, new_state} | ||
|
|
||
| %HeartbeatResponse{error_code: :rebalance_in_progress} -> | ||
| Logger.debug("Rebalancing consumer group #{group_name}") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Prefer lazy Logger calls.
lib/kafka_ex/consumer_group.ex
Outdated
|
|
||
| %LeaveGroupResponse{error_code: :no_error} = KafkaEx.leave_group(leave_request, worker_name: worker_name) | ||
|
|
||
| Logger.debug("Left consumer group #{group_name}") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Prefer lazy Logger calls.
lib/kafka_ex/gen_consumer.ex
Outdated
| [%OffsetCommitResponse{topic: ^topic, partitions: [^partition]}] = | ||
| KafkaEx.offset_commit(worker_name, request) | ||
|
|
||
| Logger.debug("Committed offset #{topic}/#{partition}@#{offset} for #{group}") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Prefer lazy Logger calls.
i.e., instead of `handle_message/2` we have `handle_message_set/2` It's very easy for the user to iterate over the batch, and there is at least one use case for passing batches to the consumer: parallelized processing of messages. At Simpli.fi we have some applications where each message we receive from Kafka some amount of work that can be done asynchronously, and we've found a big performance increase by processing those in parallel. There are some delivery guarantee implications to that, but it should be up to the user to decide how to handle that.
Mostly just line length. Added an intermediate variable in a couple places to handle long case statement input.
This makes more intuitive sense - one wouldn't really start the manager process without the supervisor.
GenConsumer doesn't necessarily need to know anything about the way partitions are assigned - it makes more sense for the ConsumerGroup code to deal with this. I made it an optional callback that defaults to the round robin algorithm. I tried to update the docs where applicable.
* Group the gen_server_opts * Specify heartbeat and session timeout opts
I tried to add a test for this but had significant trouble getting the test to work. I BELIEVE the issues with the test are because of the number of connects/disconnects happening in rapid succession, but I'm not sure and I'd like to revisit that later.
This should be more robust and exercises some of the ops functions
Cleaned up the tests a bit
|
@bjhaid @joshuawscott I think this is finally ready for review. There are a couple things I might like to clean up later, but they can be in follow-up PRs. I mostly wanted to nail down the API first. @dcuddeback If you have time, I would greatly appreciate any review you can give here. This is still mostly your vision, but I changed a handful of things. This may not be an exhaustive list:
|
|
ack, this is some massive diff, I'll find sometime to review it before the end of the week |
| session_timeout: session_timeout, | ||
| } | ||
|
|
||
| join_response = %JoinGroupResponse{error_code: :no_error} = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This will crash if we get any error back from Kafka. I don't know if it's necessarily good to crash within a private function that's in an essentially private module. It will make it difficult to diagnose for a library user. Maybe we should crash with a helpful message here (like display a human-readable version of the error code that we got)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think what I'd like to do here is follow up on some of these with separate PRs. I'll make issues for them.
This one is #223
| assignments2 = ConsumerGroup.assignments(context[:consumer_group_pid2]) | ||
| assert 2 == length(assignments1) | ||
| assert 2 == length(assignments2) | ||
| refute assignments1 == assignments2 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe assert that none of the assignments in assignments1 are in assignments2
joshuawscott
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good to me!
|
@bjhaid @dcuddeback ping. Sorry for the size of the PR :( |
bjhaid
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
This is my continuation of #207 which is in turn a continuation of #195 .
It's still WIP - feel free to review as it goes, but I'll post when done.