Skip to content

Conversation

@dantswain
Copy link
Collaborator

I introduced a minor bug in #251 - the correlation id was not being properly incremented for each request. This is not really a big issue with KafkaEx because we don't really use the correlation id to correlate responses, but I still wanted to fix it.

Part of my fix here is to refactor the consumer group sync requests to a single function. The behavior is the same for all of the calls:

  • Convert a request struct to a wire request binary
  • Do the sync network request
  • Parse the response binary back into a struct
  • Repeat the request if we get a "not group coordinator" error message

I would like to explore using protocols for this, but wanted to get the fix done first.

The correlation id wasn't getting correctly implemented every time in
my previous fix.  Refactoring seemed like a good idea to hopefully get
better consistency.
@client_id,
join_group_request
def kafka_server_join_group(request, network_timeout, state_in) do
true = consumer_group?(state_in)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it seems like we could drop this true = consumer_group?(state_in) in each of these functions and just pattern match in consumer_group_sync_request/4? This would also raise a match error, rather than giving a decent failure, which we could do in the shared function.
e.g.

def consumer_group_sync_request(_req, _mod, _timeout, %State{consumer_group: :no_consumer_group}) do
  raise "You are trying to consumer group but you aren't consumer grouping"
end

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good idea. At first I had this in the shared function, but I wanted it to error out in the parent so that you would see that in the stacktrace. Making it raise an error and decorating the error with info from the request struct is even better 👍

Copy link
Member

@joshuawscott joshuawscott left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

looks good other than the one comment. I'm +1 on using protocols for this at some point.

@dantswain
Copy link
Collaborator Author

I think I'm going to merge this PR (unless @bjhaid objects soon) and do a new PR with the exception, since the exception would show up in more locations than what is changed here.

@bjhaid
Copy link
Member

bjhaid commented Nov 29, 2017

I have not reviewed, but I am not objecting to merging, I can look tomorrow morning, but if you don't get feedback from me you can just merge :)

@dantswain dantswain merged commit c87d5b2 into master Nov 29, 2017
@dantswain dantswain deleted the fix_correlation_id branch November 29, 2017 14:25
dantswain added a commit that referenced this pull request Nov 29, 2017
robotarmy pushed a commit to RAM9/kafka_ex that referenced this pull request Apr 7, 2025
Fix correlation id bug, refactor consumer group network requests
robotarmy pushed a commit to RAM9/kafka_ex that referenced this pull request Apr 7, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants