ISSUE-525 / 764: ruby-kafka does not support different topic subscriptions in the same consumer group #903

Merged
merged 3 commits into from
Jul 26, 2021
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -9,6 +9,7 @@ Changes and additions to the library will be listed here.
 - Add support for `murmur2` based partitioning.
 - Add `resolve_seed_brokers` option to support seed brokers' hostname with multiple addresses (#877).
 - Handle SyncGroup responses with a non-zero error and no assignments (#896).
+- Add support for non-identical topic subscriptions within the same consumer group (#525 / #764).

 ## 1.3.0

7 changes: 6 additions & 1 deletion lib/kafka/consumer_group.rb
@@ -189,9 +189,14 @@ def synchronize
       if group_leader?
         @logger.info "Chosen as leader of group `#{@group_id}`"

+        topics = Set.new
+        @members.each do |_member, metadata|
+          metadata.topics.each { |t| topics.add(t) }
+        end
+
         group_assignment = @assignor.assign(
           members: @members,
-          topics: @topics,
+          topics: topics,
         )
       end
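
The leader-side change above stops passing the leader's own `@topics` to the assignor and instead passes the union of every member's subscribed topics. A minimal standalone sketch of that union follows. `MemberMetadata` and the member ids are hypothetical stand-ins, not library classes; the only thing assumed about the real metadata objects is the `topics` reader used in the diff.

```ruby
require "set"

# Hypothetical stand-in for the metadata each group member reports;
# only the `topics` reader matters for this sketch.
MemberMetadata = Struct.new(:topics)

members = {
  "member-a" => MemberMetadata.new(["orders"]),
  "member-b" => MemberMetadata.new(["payments", "orders"]),
}

# Union of every member's subscription, mirroring the loop in the diff.
topics = Set.new
members.each do |_member_id, metadata|
  metadata.topics.each { |t| topics.add(t) }
end

# An equivalent formulation using Enumerable#flat_map.
same_topics = members.values.flat_map(&:topics).to_set
```

With only the leader's subscription, a topic that some other member subscribes to (here `payments` if `member-a` were the leader) would never reach the assignor, so its partitions would go unassigned.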

35 changes: 28 additions & 7 deletions lib/kafka/round_robin_assignment_strategy.rb
@@ -1,9 +1,9 @@
-# frozen_string_literal: true
-
 module Kafka

-  # A consumer group partition assignment strategy that assigns partitions to
-  # consumers in a round-robin fashion.
+  # A round robin assignment strategy inspired by the
+  # original Java client round robin assignor. It's capable
+  # of handling identical as well as different topic subscriptions
+  # across the same consumer group.
   class RoundRobinAssignmentStrategy
     def protocol_name
       "roundrobin"
@@ -19,13 +19,34 @@ def protocol_name
     # @return [Hash<String, Array<Kafka::ConsumerGroup::Assignor::Partition>] a hash
     # mapping member ids to partitions.
     def call(cluster:, members:, partitions:)
-      member_ids = members.keys
       partitions_per_member = Hash.new {|h, k| h[k] = [] }
-      partitions.each_with_index do |partition, index|
-        partitions_per_member[member_ids[index % member_ids.count]] << partition
+      relevant_partitions = valid_sorted_partitions(members, partitions)
+      members_ids = members.keys
+      iterator = (0...members.size).cycle
+      idx = iterator.next
+
+      relevant_partitions.each do |partition|
+        topic = partition.topic
+
+        while !members[members_ids[idx]].topics.include?(topic)
+          idx = iterator.next
+        end
+
+        partitions_per_member[members_ids[idx]] << partition
+        idx = iterator.next
       end

       partitions_per_member
     end
+
+    def valid_sorted_partitions(members, partitions)
+      subscribed_topics = members.map do |id, metadata|
+        metadata && metadata.topics
+      end.flatten.compact
+
+      partitions
+        .select { |partition| subscribed_topics.include?(partition.topic) }
+        .sort_by { |partition| partition.topic }
+    end
   end
 end
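
The behaviour of the new strategy can be illustrated outside the library with a small self-contained sketch. `Member`, `Partition`, and `round_robin_assign` are hypothetical names introduced only for illustration; they are not part of ruby-kafka. The sketch also sorts by topic and partition id to get a deterministic order, which is an assumption beyond the diff (the diff sorts by topic only), and it uses a modulo cursor instead of the `cycle` enumerator.

```ruby
# Hypothetical stand-ins for the library's member metadata and partition objects.
Member = Struct.new(:topics)
Partition = Struct.new(:topic, :partition_id)

# Assigns each partition to the next member, in round-robin order,
# that is actually subscribed to the partition's topic.
def round_robin_assign(members, partitions)
  subscribed = members.values.flat_map(&:topics).uniq

  # Drop partitions nobody subscribes to; this also guarantees the
  # inner loop below always finds a subscriber and terminates.
  relevant = partitions
    .select { |p| subscribed.include?(p.topic) }
    .sort_by { |p| [p.topic, p.partition_id] }

  member_ids = members.keys
  assignments = Hash.new { |h, k| h[k] = [] }
  cursor = 0

  relevant.each do |partition|
    # Skip members that are not subscribed to this partition's topic.
    until members[member_ids[cursor]].topics.include?(partition.topic)
      cursor = (cursor + 1) % member_ids.size
    end

    assignments[member_ids[cursor]] << partition
    cursor = (cursor + 1) % member_ids.size
  end

  assignments
end

members = {
  "c1" => Member.new(["t1"]),
  "c2" => Member.new(["t1", "t2"]),
}
partitions = [
  Partition.new("t2", 0), Partition.new("t1", 0),
  Partition.new("t1", 1), Partition.new("t2", 1),
  Partition.new("t3", 0), # no subscribers, so it is filtered out
]

result = round_robin_assign(members, partitions)
# c1 receives t1/0; c2 receives t1/1, t2/0 and t2/1.
```

Under the previous `index % member_ids.count` logic, a `t2` partition could be handed to `c1` even though `c1` never subscribed to `t2`; skipping non-subscribed members is what avoids that.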
62 changes: 62 additions & 0 deletions spec/functional/consumer_group_spec.rb
@@ -518,6 +518,68 @@ def call(cluster:, members:, partitions:)
     expect(received_messages.values.map(&:count)).to match_array [messages.count / 3, messages.count / 3 * 2]
   end

+  example "subscribing to different topics while in the same consumer group" do
+    topic1 = create_random_topic(num_partitions: 1)
+    topic2 = create_random_topic(num_partitions: 1)
+    messages = (1..500).to_a
+
+    begin
+      kafka = Kafka.new(kafka_brokers, client_id: "test")
+      producer = kafka.producer
+
+      messages[0..249].each do |i|
+        producer.produce(i.to_s, topic: topic1, partition: 0)
+      end
+
+      messages[250..500].each do |i|
+        producer.produce(i.to_s, topic: topic2, partition: 0)
+      end
+
+      producer.deliver_messages
+    end
+
+    group_id = "test#{rand(1000)}"
+
+    mutex = Mutex.new
+    received_messages = []
+
+    assignment_strategy_class = Kafka::RoundRobinAssignmentStrategy
+
+    consumers = [topic1, topic2].map do |topic|
+      assignment_strategy = assignment_strategy_class.new
+      kafka = Kafka.new(kafka_brokers, client_id: "test", logger: logger)
+      consumer = kafka.consumer(
+        group_id: group_id,
+        offset_retention_time: offset_retention_time,
+        assignment_strategy: assignment_strategy
+      )
+      consumer.subscribe(topic)
+      consumer
+    end
+
+    threads = consumers.map do |consumer|
+      t = Thread.new do
+        consumer.each_message do |message|
+          mutex.synchronize do
+            received_messages << message
+
+            if received_messages.count == messages.count
+              consumers.each(&:stop)
+            end
+          end
+        end
+      end
+
+      t.abort_on_exception = true
+
+      t
+    end
+
+    threads.each(&:join)
+
+    expect(received_messages.map(&:value).map(&:to_i)).to match_array messages
+  end
+
   def wait_until(timeout:)
     Timeout.timeout(timeout) do
       sleep 0.5 until yield