Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Disconnect after leaving the group #817

Merged
merged 1 commit into from
Apr 20, 2020

Conversation

abicky
Copy link
Contributor

@abicky abicky commented Mar 24, 2020

This PR removes an unnecessary reconnection after Kafka::Consumer#stop is called.

@cluster.disconnect closes the connections to all brokers including the coordinator in spite of the fact that @group.leave requires a connection to the coordinator, so call @cluster.disconnect after @group.leave.

You can confirm the behavior by executing the following script:

require "kafka"

CLIENT_ID = ENV["CLIENT_ID"]
GROUP_ID = ENV["GROUP_ID"]
TOPIC = ENV["TOPIC"]

$stdout.sync = true
logger = Logger.new($stdout)
logger.level = Logger::DEBUG
kafka = Kafka.new(["localhost:9092"], client_id: CLIENT_ID, logger: logger)

consumer = kafka.consumer(group_id: GROUP_ID)
consumer.subscribe(TOPIC)
Thread.new do
  # Wait for rebalance to end
  sleep 5
  consumer.stop
end

consumer.each_message do |message|
  puts message.topic, message.partition
  puts message.offset, message.key, message.value
end

Before

-- snip --
D, [2020-03-25T05:35:02.636289 #32996] DEBUG -- : [[g] {test: 0, 2, 1}:] Handling fetcher command: stop
I, [2020-03-25T05:35:02.636329 #32996]  INFO -- : [g] {test: 0, 2, 1}: Fetcher thread exited.
I, [2020-03-25T05:35:02.636470 #32996]  INFO -- : Disconnecting broker 0
D, [2020-03-25T05:35:02.636522 #32996] DEBUG -- : Closing socket to localhost:9092
I, [2020-03-25T05:35:03.633512 #32996]  INFO -- : [[g] {}:] Leaving group `g`
D, [2020-03-25T05:35:03.633671 #32996] DEBUG -- : [[g] {}:] [leave_group] Opening connection to localhost:9092 with client id c...
D, [2020-03-25T05:35:03.634838 #32996] DEBUG -- : [[g] {}:] [leave_group] Sending leave_group API request 1 to localhost:9092
D, [2020-03-25T05:35:03.635018 #32996] DEBUG -- : [[g] {}:] [leave_group] Waiting for response 1 from localhost:9092
D, [2020-03-25T05:35:03.636950 #32996] DEBUG -- : [[g] {}:] [leave_group] Received response 1 from localhost:9092

After

-- snip --
D, [2020-03-25T05:36:31.482775 #33053] DEBUG -- : [[g] {test: 0, 2, 1}:] Handling fetcher command: stop
I, [2020-03-25T05:36:31.482809 #33053]  INFO -- : [g] {test: 0, 2, 1}: Fetcher thread exited.
I, [2020-03-25T05:36:32.480502 #33053]  INFO -- : [[g] {}:] Leaving group `g`
D, [2020-03-25T05:36:32.480612 #33053] DEBUG -- : [[g] {}:] [leave_group] Sending leave_group API request 6 to localhost:9092
D, [2020-03-25T05:36:32.480760 #33053] DEBUG -- : [[g] {}:] [leave_group] Waiting for response 6 from localhost:9092
D, [2020-03-25T05:36:32.482325 #33053] DEBUG -- : [[g] {}:] [leave_group] Received response 6 from localhost:9092
I, [2020-03-25T05:36:32.482380 #33053]  INFO -- : [[g] {}:] Disconnecting broker 0
D, [2020-03-25T05:36:32.482403 #33053] DEBUG -- : [[g] {}:] Closing socket to localhost:9092

As you can see above, the message "Opening connection to localhost:9092" disappeared in the log after the change.

This commit removes an unnecessary reconnection after
`Kafka::Consumer#stop` is called.
`@cluster.disconnect` closes the connections to all brokers
including the coordinator in spite of the fact that
`@group.leave` requires a connection to the coordinator,
so call `@cluster.disconnect` after `@group.leave`.
@dasch dasch merged commit 8660d05 into zendesk:master Apr 20, 2020
@dasch
Copy link
Collaborator

dasch commented Apr 20, 2020

Thanks!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants