Investigate consumer group behavior #15

Closed
sdball opened this issue Feb 17, 2017 · 4 comments

@sdball
Contributor

sdball commented Feb 17, 2017

When @objectuser spun up multiple dynos in the same consumer group, we saw that offsets were not advancing as expected. In fact, the consumers appeared to be overwriting each other's offsets.

I suspected that consumer groups required more explicit coordination on our side rather than being handled by Kafka, but that turns out not to be the case. This issue summarizes my results.

@sdball
Contributor Author

sdball commented Feb 17, 2017

Partition assignment in a consumer group

When a consumer wants to join a group, it sends a JoinGroup request to the group coordinator. The first consumer to join the group becomes the group leader. The leader receives a list of all the active consumers in the group from the group coordinator. The leader is responsible for assigning a subset of partitions to each consumer and uses an implementation of the PartitionAssignor interface (or a language/framework-appropriate equivalent) to decide which partitions should be handled by each consumer.

After deciding on the partition assignment, the group leader sends the list of assignments back to the group coordinator, which then sends the information to all consumers. Each consumer only sees its own assignment; the leader is the only client process that has the full list of consumers in the group.

This partition assignment process happens whenever there is a rebalance event.
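To make the leader's job concrete, here is a minimal sketch of a round-robin assignment in Python. It is only an illustration: the function name and the `node-a`/`node-b` member ids are made up, and I am assuming a simple round-robin strategy because it matches the even/odd split that shows up in the logs further down, not because I have confirmed it is the strategy in use.

```python
from itertools import cycle


def assign_round_robin(members, partitions):
    """Deal partitions out to members one at a time, in sorted order.

    Hypothetical helper for illustration only; it is not a Kafka or Kaffe API.
    """
    assignments = {member: [] for member in members}
    # cycle() repeats the member list; zip() stops once partitions run out.
    for member, partition in zip(cycle(sorted(members)), sorted(partitions)):
        assignments[member].append(partition)
    return assignments


partitions = range(10)

# One member in the group: it owns every partition.
one_member = assign_round_robin(["node-a"], partitions)

# Two members: partitions alternate between them.
two_members = assign_round_robin(["node-a", "node-b"], partitions)
print(two_members)
```

With two members this hands the even partitions to one node and the odd partitions to the other. In the real protocol only the leader runs this step, and each member is then told just its own slice.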

Consumers maintain their membership in a consumer group, and their ownership of the partitions assigned to them, by sending heartbeats to the Kafka broker designated as the group coordinator.

The Group Coordinator broker can be different for different consumer groups.

If a consumer stops sending heartbeats for long enough, its session times out; the group coordinator then considers it dead and triggers a rebalance.
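The heartbeat and session timeout rule can be sketched as a toy model. This is not how the broker is implemented; the class, its method names, and the use of plain numbers as timestamps are all assumptions made to keep the example small. The generation counter is bumped on every membership change to stand in for a rebalance.

```python
class ToyGroupCoordinator:
    """Toy model of heartbeat tracking; hypothetical, not a Kafka API."""

    def __init__(self, session_timeout):
        self.session_timeout = session_timeout
        self.last_heartbeat = {}  # member id -> time of last heartbeat
        self.generation = 0       # incremented on every rebalance

    def join(self, member, now):
        # A new member changes the group, so a rebalance is triggered.
        self.last_heartbeat[member] = now
        self.generation += 1

    def heartbeat(self, member, now):
        # Heartbeats from unknown (already expired) members are ignored.
        if member in self.last_heartbeat:
            self.last_heartbeat[member] = now

    def expire_dead_members(self, now):
        # Any member silent for longer than the session timeout is dropped.
        dead = [
            member
            for member, seen in self.last_heartbeat.items()
            if now - seen > self.session_timeout
        ]
        for member in dead:
            del self.last_heartbeat[member]
        if dead:
            self.generation += 1  # losing a member also triggers a rebalance
        return dead


coordinator = ToyGroupCoordinator(session_timeout=10)
coordinator.join("node-a", now=0)
coordinator.join("node-b", now=1)

# node-b keeps sending heartbeats; node-a goes silent.
coordinator.heartbeat("node-b", now=8)
dead = coordinator.expire_dead_members(now=12)
print(dead, coordinator.generation)
```

Here `node-a` has been silent for 12 time units against a timeout of 10, so it is expired and the generation moves from 2 to 3, while `node-b` stays in the group.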

@sdball
Contributor Author

sdball commented Feb 17, 2017

Consumer Group behavior in Kaffe

I spun up a simple Kaffe project to see how it handles consumer groups in nodes that are completely isolated from each other except for the connection to Kafka. Things look good!

I changed my local whitelist topic to have 10 partitions and joined a single kafkatest node to the kafkatest1 consumer group. I then joined a second node to the same consumer group. Finally, I killed the original node (the group leader) to verify that the second node took over its partitions.

single consumer node started

$ iex -S mix
Erlang/OTP 18 [erts-7.2.1] [source] [64-bit] [smp:4:4] [async-threads:10] [hipe] [kernel-poll:false]

Compiling 1 file (.ex)
Generated kafkatest app
Interactive Elixir (1.3.4) - press Ctrl+C to exit (type h() ENTER for help)
iex(1)>
12:23:16.290 [info]  group coordinator (groupId=kafkatest1,memberId=,generation=0,pid=#PID<0.171.0>):
failed to join group
reason::GroupCoordinatorNotAvailable

12:23:16.290 [info]  group coordinator (groupId=kafkatest1,memberId=,generation=0,pid=#PID<0.171.0>):
re-joining group, reason::GroupCoordinatorNotAvailable

12:23:16.329 [info]  group coordinator (groupId=kafkatest1,memberId=,generation=0,pid=#PID<0.171.0>):
failed to join group
reason::GroupCoordinatorNotAvailable

12:23:17.334 [info]  group coordinator (groupId=kafkatest1,memberId=,generation=0,pid=#PID<0.171.0>):
re-joining group, reason::GroupCoordinatorNotAvailable

12:23:17.343 [info]  group coordinator (groupId=kafkatest1,memberId=,generation=0,pid=#PID<0.171.0>):
connected to group coordinator kafka:9092

12:23:17.363 [info]  group coordinator (groupId=kafkatest1,memberId=nonode@nohost/<0.171.0>-9a3f7132-aa86-4925-836b-2bd31566492a,generation=1,pid=#PID<0.171.0>):
elected=true

12:23:17.418 [info]  group coordinator (groupId=kafkatest1,memberId=nonode@nohost/<0.171.0>-9a3f7132-aa86-4925-836b-2bd31566492a,generation=1,pid=#PID<0.171.0>):
assignments received:
whitelist:
    partition=0 begin_offset=undefined
    partition=1 begin_offset=undefined
    partition=2 begin_offset=undefined
    partition=3 begin_offset=undefined
    partition=4 begin_offset=undefined
    partition=5 begin_offset=undefined
    partition=6 begin_offset=undefined
    partition=7 begin_offset=undefined
    partition=8 begin_offset=undefined
    partition=9 begin_offset=undefined

12:23:17.423 [info]  client :kafkatest1 connected to kafka:9092

second consumer node started

12:24:10.323 [info]  group coordinator (groupId=kafkatest1,memberId=nonode@nohost/<0.171.0>-9a3f7132-aa86-4925-836b-2bd31566492a,generation=1,pid=#PID<0.171.0>):
re-joining group, reason::RebalanceInProgress

12:24:10.327 [info]  group coordinator (groupId=kafkatest1,memberId=nonode@nohost/<0.171.0>-9a3f7132-aa86-4925-836b-2bd31566492a,generation=2,pid=#PID<0.171.0>):
elected=true

12:24:10.333 [info]  group coordinator (groupId=kafkatest1,memberId=nonode@nohost/<0.171.0>-9a3f7132-aa86-4925-836b-2bd31566492a,generation=2,pid=#PID<0.171.0>):
assignments received:
whitelist:
    partition=0 begin_offset=undefined
    partition=2 begin_offset=undefined
    partition=4 begin_offset=undefined
    partition=6 begin_offset=undefined
    partition=8 begin_offset=undefined

$ iex -S mix
Erlang/OTP 18 [erts-7.2.1] [source] [64-bit] [smp:4:4] [async-threads:10] [hipe] [kernel-poll:false]

Interactive Elixir (1.3.4) - press Ctrl+C to exit (type h() ENTER for help)
iex(1)>
12:24:09.544 [info]  group coordinator (groupId=kafkatest1,memberId=,generation=0,pid=#PID<0.142.0>):
connected to group coordinator kafka:9092

12:24:10.328 [info]  group coordinator (groupId=kafkatest1,memberId=nonode@nohost/<0.142.0>-c306fad6-2bbf-4081-b80c-d4e3539c2b6d,generation=2,pid=#PID<0.142.0>):
elected=false

12:24:10.330 [info]  group coordinator (groupId=kafkatest1,memberId=nonode@nohost/<0.142.0>-c306fad6-2bbf-4081-b80c-d4e3539c2b6d,generation=2,pid=#PID<0.142.0>):
assignments received:
whitelist:
    partition=1 begin_offset=undefined
    partition=3 begin_offset=undefined
    partition=5 begin_offset=undefined
    partition=7 begin_offset=undefined
    partition=9 begin_offset=undefined

12:24:10.340 [info]  client :kafkatest1 connected to kafka:9092

group leader (original node) quit

12:25:25.716 [info]  group coordinator (groupId=kafkatest1,memberId=nonode@nohost/<0.142.0>-c306fad6-2bbf-4081-b80c-d4e3539c2b6d,generation=2,pid=#PID<0.142.0>):
re-joining group, reason::RebalanceInProgress

12:25:25.719 [info]  group coordinator (groupId=kafkatest1,memberId=nonode@nohost/<0.142.0>-c306fad6-2bbf-4081-b80c-d4e3539c2b6d,generation=3,pid=#PID<0.142.0>):
elected=true

12:25:25.722 [info]  group coordinator (groupId=kafkatest1,memberId=nonode@nohost/<0.142.0>-c306fad6-2bbf-4081-b80c-d4e3539c2b6d,generation=3,pid=#PID<0.142.0>):
assignments received:
whitelist:
    partition=0 begin_offset=undefined
    partition=1 begin_offset=undefined
    partition=2 begin_offset=undefined
    partition=3 begin_offset=undefined
    partition=4 begin_offset=undefined
    partition=5 begin_offset=undefined
    partition=6 begin_offset=undefined
    partition=7 begin_offset=undefined
    partition=8 begin_offset=undefined
    partition=9 begin_offset=undefined

It appears that group coordination is indeed correctly happening through Kafka. 👍

@rwdaigle
Contributor

Good background info (not that we doubted Kafka's ability to coordinate consumers)!

@sdball
Contributor Author

sdball commented Feb 17, 2017

I had some doubts. 😄


2 participants