From a64a8bedcccab09f845e7aa14e372ce993d3998e Mon Sep 17 00:00:00 2001 From: Kyle Hargraves Date: Thu, 8 Feb 2018 21:29:22 -0600 Subject: [PATCH] Round-robin partition assignments across multiple topics This is closer to the behavior of Kafka's Java consumer, as described here: https://github.com/apache/kafka/blob/15bc405/clients/src/main/java/org/apache/kafka/clients/consumer/RoundRobinAssignor.java#L31-L55 This is valuable in situations where you have highly disparate partition counts. Given three group members subscribed to many topics, only a few of which have more than one partition, the first member will receive far more assignments than the others. --- balancer.go | 170 +++++++++++++++++++------------- balancer_test.go | 247 ++++++++++++++++++++++++++++++++++++++--------- 2 files changed, 308 insertions(+), 109 deletions(-) diff --git a/balancer.go b/balancer.go index 0f9b445..9fade63 100644 --- a/balancer.go +++ b/balancer.go @@ -1,6 +1,7 @@ package cluster import ( + "container/ring" "math" "sort" @@ -70,89 +71,53 @@ func (n *Notification) success(current map[string][]int32) *Notification { // -------------------------------------------------------------------- +type balancer struct { + client sarama.Client + memberIDs []string + topics map[string]*topicInfo +} + type topicInfo struct { Partitions []int32 MemberIDs []string } -func (info topicInfo) Perform(s Strategy) map[string][]int32 { - if s == StrategyRoundRobin { - return info.RoundRobin() - } - return info.Ranges() -} - -func (info topicInfo) Ranges() map[string][]int32 { - sort.Strings(info.MemberIDs) - - mlen := len(info.MemberIDs) - plen := len(info.Partitions) - res := make(map[string][]int32, mlen) - - for pos, memberID := range info.MemberIDs { - n, i := float64(plen)/float64(mlen), float64(pos) - min := int(math.Floor(i*n + 0.5)) - max := int(math.Floor((i+1)*n + 0.5)) - sub := info.Partitions[min:max] - if len(sub) > 0 { - res[memberID] = sub - } - } - return res -} - -func (info topicInfo) RoundRobin() map[string][]int32 { - sort.Strings(info.MemberIDs) - - mlen := len(info.MemberIDs) - res := make(map[string][]int32, mlen) - for i, pnum := range info.Partitions { - memberID := info.MemberIDs[i%mlen] - res[memberID] = append(res[memberID], pnum) - } - return res -} - -// -------------------------------------------------------------------- - -type balancer struct { - client sarama.Client - topics map[string]topicInfo -} - func newBalancerFromMeta(client sarama.Client, members map[string]sarama.ConsumerGroupMemberMetadata) (*balancer, error) { - balancer := newBalancer(client) + balancer := &balancer{ + client: client, + memberIDs: make([]string, 0, len(members)), + topics: make(map[string]*topicInfo), + } for memberID, meta := range members { + balancer.memberIDs = append(balancer.memberIDs, memberID) for _, topic := range meta.Topics { - if err := balancer.Topic(topic, memberID); err != nil { + if err := balancer.Topic(memberID, topic); err != nil { return nil, err } } } - return balancer, nil -} -func newBalancer(client sarama.Client) *balancer { - return &balancer{ - client: client, - topics: make(map[string]topicInfo), - } + sort.Strings(balancer.memberIDs) + return balancer, nil } -func (r *balancer) Topic(name string, memberID string) error { - topic, ok := r.topics[name] +func (r *balancer) Topic(memberID string, name string) error { + info, ok := r.topics[name] if !ok { nums, err := r.client.Partitions(name) if err != nil { return err } - topic = topicInfo{ + + r.topics[name] = &topicInfo{ + MemberIDs: []string{memberID}, Partitions: nums, - MemberIDs: make([]string, 0, 1), } + + return nil } - topic.MemberIDs = append(topic.MemberIDs, memberID) - r.topics[name] = topic + + info.MemberIDs = append(info.MemberIDs, memberID) return nil } @@ -161,14 +126,87 @@ func (r *balancer) Perform(s Strategy) map[string]map[string][]int32 { return nil } - res := make(map[string]map[string][]int32, 1) - for topic, info := range r.topics { - for memberID, partitions := range info.Perform(s) { - if _, ok := res[memberID]; !ok { - res[memberID] = make(map[string][]int32, 1) + switch s { + case StrategyRoundRobin: + return assignRoundRobin(r.memberIDs, r.topics) + default: + return assignRange(r.memberIDs, r.topics) + } +} + +func assignRange(_ []string, topics map[string]*topicInfo) map[string]map[string][]int32 { + tlen := len(topics) + res := make(map[string]map[string][]int32) + + for topic, info := range topics { + mlen := len(info.MemberIDs) + plen := len(info.Partitions) + + sort.Strings(info.MemberIDs) + for pos, memberID := range info.MemberIDs { + n, i := float64(plen)/float64(mlen), float64(pos) + min := int(math.Floor(i*n + 0.5)) + max := int(math.Floor((i+1)*n + 0.5)) + sub := info.Partitions[min:max] + if len(sub) <= 0 { + continue + } + + assigned, ok := res[memberID] + if !ok { + assigned = make(map[string][]int32, tlen) + res[memberID] = assigned } - res[memberID][topic] = partitions + assigned[topic] = sub } } + + return res +} + +func assignRoundRobin(memberIDs []string, topics map[string]*topicInfo) map[string]map[string][]int32 { + sort.Strings(memberIDs) + + r := ring.New(len(memberIDs)) + for i := 0; i < r.Len(); i++ { + r.Value = memberIDs[i] + r = r.Next() + } + + isSubscribed := func(memberID string, topic string) bool { + info, ok := topics[topic] + if !ok { + return false + } + + for _, subscriber := range info.MemberIDs { + if memberID == subscriber { + return true + } + } + + return false + } + + tlen := len(topics) + res := make(map[string]map[string][]int32, r.Len()) + + for topic, info := range topics { + for i := range info.Partitions { + for ; !isSubscribed(r.Value.(string), topic); r = r.Next() { + continue + } + + memberID := r.Value.(string) + assigned, ok := res[memberID] + if !ok { + assigned = make(map[string][]int32, tlen) + res[memberID] = assigned + } + assigned[topic] = append(assigned[topic], info.Partitions[i]) + r = r.Next() + } + } + return res } diff --git a/balancer_test.go b/balancer_test.go index 0334c18..4a88be8 100644 --- a/balancer_test.go +++ b/balancer_test.go @@ -56,74 +56,235 @@ var _ = Describe("balancer", func() { }) It("should parse from meta data", func() { + Expect(subject.memberIDs).To(Equal([]string{"a", "b"})) Expect(subject.topics).To(HaveLen(3)) }) - It("should perform", func() { - Expect(subject.Perform(StrategyRange)).To(Equal(map[string]map[string][]int32{ - "a": {"one": {0, 1}, "two": {0, 1, 2}}, - "b": {"one": {2, 3}, "three": {0, 1}}, - })) - - Expect(subject.Perform(StrategyRoundRobin)).To(Equal(map[string]map[string][]int32{ - "a": {"one": {0, 2}, "two": {0, 1, 2}}, - "b": {"one": {1, 3}, "three": {0, 1}}, - })) - }) - }) -var _ = Describe("topicInfo", func() { +var _ = Describe("partition assignment", func() { DescribeTable("Ranges", - func(memberIDs []string, partitions []int32, expected map[string][]int32) { - info := topicInfo{MemberIDs: memberIDs, Partitions: partitions} - Expect(info.Ranges()).To(Equal(expected)) + func(memberIDs []string, topics map[string]*topicInfo, expected map[string]map[string][]int32) { + assignments := assignRange(memberIDs, topics) + Expect(assignments).To(Equal(expected)) }, - Entry("three members, three partitions", []string{"M1", "M2", "M3"}, []int32{0, 1, 2}, map[string][]int32{ - "M1": {0}, "M2": {1}, "M3": {2}, + Entry("three members, three partitions", []string{"M1", "M2", "M3"}, map[string]*topicInfo{ + "t1": &topicInfo{ + Partitions: []int32{0, 1, 2}, + MemberIDs: []string{"M1", "M2", "M3"}, + }, + }, map[string]map[string][]int32{ + "M1": {"t1": {0}}, + "M2": {"t1": {1}}, + "M3": {"t1": {2}}, }), - Entry("member ID order", []string{"M3", "M1", "M2"}, []int32{0, 1, 2}, map[string][]int32{ - "M1": {0}, "M2": {1}, "M3": {2}, + + Entry("member ID order", []string{"M3", "M1", "M2"}, map[string]*topicInfo{ + "t1": &topicInfo{ + Partitions: []int32{0, 1, 2}, + MemberIDs: []string{"M1", "M2", "M3"}, + }, + }, map[string]map[string][]int32{ + "M1": {"t1": {0}}, + "M2": {"t1": {1}}, + "M3": {"t1": {2}}, + }), + + Entry("more members than partitions", []string{"M1", "M2", "M3"}, map[string]*topicInfo{ + "t1": &topicInfo{ + Partitions: []int32{0, 1}, + MemberIDs: []string{"M1", "M2", "M3"}, + }, + }, map[string]map[string][]int32{ + "M1": {"t1": {0}}, + "M3": {"t1": {1}}, + }), + + Entry("far more members than partitions", []string{"M1", "M2", "M3"}, map[string]*topicInfo{ + "t1": &topicInfo{ + Partitions: []int32{0}, + MemberIDs: []string{"M1", "M2", "M3"}, + }, + }, map[string]map[string][]int32{ + "M2": {"t1": {0}}, }), - Entry("more members than partitions", []string{"M1", "M2", "M3"}, []int32{0, 1}, map[string][]int32{ - "M1": {0}, "M3": {1}, + + Entry("fewer members than partitions", []string{"M1", "M2", "M3"}, map[string]*topicInfo{ + "t1": &topicInfo{ + Partitions: []int32{0, 1, 2, 3}, + MemberIDs: []string{"M1", "M2", "M3"}, + }, + }, map[string]map[string][]int32{ + "M1": {"t1": {0}}, + "M2": {"t1": {1, 2}}, + "M3": {"t1": {3}}, }), - Entry("far more members than partitions", []string{"M1", "M2", "M3"}, []int32{0}, map[string][]int32{ - "M2": {0}, + + Entry("uneven members/partitions ratio", []string{"M1", "M2", "M3"}, map[string]*topicInfo{ + "t1": &topicInfo{ + Partitions: []int32{0, 2, 4, 6, 8}, + MemberIDs: []string{"M1", "M2", "M3"}, + }, + }, map[string]map[string][]int32{ + "M1": {"t1": {0, 2}}, + "M2": {"t1": {4}}, + "M3": {"t1": {6, 8}}, }), - Entry("fewer members than partitions", []string{"M1", "M2", "M3"}, []int32{0, 1, 2, 3}, map[string][]int32{ - "M1": {0}, "M2": {1, 2}, "M3": {3}, + + Entry("multiple topics", []string{"M1", "M2", "M3"}, map[string]*topicInfo{ + "t1": &topicInfo{ + Partitions: []int32{0, 1, 2}, + MemberIDs: []string{"M1", "M2", "M3"}, + }, + "t2": &topicInfo{ + Partitions: []int32{0, 1}, + MemberIDs: []string{"M1", "M2", "M3"}, + }, + "t3": &topicInfo{ + Partitions: []int32{0}, + MemberIDs: []string{"M1", "M2", "M3"}, + }, + }, map[string]map[string][]int32{ + "M1": {"t1": {0}, "t2": {0}}, + "M2": {"t1": {1}, "t3": {0}}, + "M3": {"t1": {2}, "t2": {1}}, }), - Entry("uneven members/partitions ratio", []string{"M1", "M2", "M3"}, []int32{0, 2, 4, 6, 8}, map[string][]int32{ - "M1": {0, 2}, "M2": {4}, "M3": {6, 8}, + + Entry("different subscriptions", []string{"M1", "M2", "M3"}, map[string]*topicInfo{ + "t1": &topicInfo{ + Partitions: []int32{0, 1, 2}, + MemberIDs: []string{"M1", "M2", "M3"}, + }, + "t2": &topicInfo{ + Partitions: []int32{0, 1}, + MemberIDs: []string{"M2", "M3"}, + }, + "t3": &topicInfo{ + Partitions: []int32{0}, + MemberIDs: []string{"M1"}, + }, + }, map[string]map[string][]int32{ + "M1": {"t1": {0}, "t3": {0}}, + "M2": {"t1": {1}, "t2": {0}}, + "M3": {"t1": {2}, "t2": {1}}, }), ) DescribeTable("RoundRobin", - func(memberIDs []string, partitions []int32, expected map[string][]int32) { - info := topicInfo{MemberIDs: memberIDs, Partitions: partitions} - Expect(info.RoundRobin()).To(Equal(expected)) + func(memberIDs []string, topics map[string]*topicInfo, expected map[string]map[string][]int32) { + assignments := assignRoundRobin(memberIDs, topics) + Expect(assignments).To(Equal(expected)) }, - Entry("three members, three partitions", []string{"M1", "M2", "M3"}, []int32{0, 1, 2}, map[string][]int32{ - "M1": {0}, "M2": {1}, "M3": {2}, + Entry("three members, three partitions", []string{"M1", "M2", "M3"}, map[string]*topicInfo{ + "t1": &topicInfo{ + Partitions: []int32{0, 1, 2}, + MemberIDs: []string{"M1", "M2", "M3"}, + }, + }, map[string]map[string][]int32{ + "M1": {"t1": {0}}, + "M2": {"t1": {1}}, + "M3": {"t1": {2}}, }), - Entry("member ID order", []string{"M3", "M1", "M2"}, []int32{0, 1, 2}, map[string][]int32{ - "M1": {0}, "M2": {1}, "M3": {2}, + + Entry("member ID order", []string{"M3", "M1", "M2"}, map[string]*topicInfo{ + "t1": &topicInfo{ + Partitions: []int32{0, 1, 2}, + MemberIDs: []string{"M1", "M2", "M3"}, + }, + }, map[string]map[string][]int32{ + "M1": {"t1": {0}}, + "M2": {"t1": {1}}, + "M3": {"t1": {2}}, }), - Entry("more members than partitions", []string{"M1", "M2", "M3"}, []int32{0, 1}, map[string][]int32{ - "M1": {0}, "M2": {1}, + + Entry("more members than partitions", []string{"M1", "M2", "M3"}, map[string]*topicInfo{ + "t1": &topicInfo{ + Partitions: []int32{0, 1}, + MemberIDs: []string{"M1", "M2", "M3"}, + }, + }, map[string]map[string][]int32{ + "M1": {"t1": {0}}, + "M2": {"t1": {1}}, }), - Entry("far more members than partitions", []string{"M1", "M2", "M3"}, []int32{0}, map[string][]int32{ - "M1": {0}, + + Entry("far more members than partitions", []string{"M1", "M2", "M3"}, map[string]*topicInfo{ + "t1": &topicInfo{ + Partitions: []int32{0}, + MemberIDs: []string{"M1", "M2", "M3"}, + }, + }, map[string]map[string][]int32{ + "M1": {"t1": {0}}, }), - Entry("fewer members than partitions", []string{"M1", "M2", "M3"}, []int32{0, 1, 2, 3}, map[string][]int32{ - "M1": {0, 3}, "M2": {1}, "M3": {2}, + + Entry("fewer members than partitions", []string{"M1", "M2", "M3"}, map[string]*topicInfo{ + "t1": &topicInfo{ + Partitions: []int32{0, 1, 2, 3}, + MemberIDs: []string{"M1", "M2", "M3"}, + }, + }, map[string]map[string][]int32{ + "M1": {"t1": {0, 3}}, + "M2": {"t1": {1}}, + "M3": {"t1": {2}}, + }), + + Entry("uneven members/partitions ratio", []string{"M1", "M2", "M3"}, map[string]*topicInfo{ + "t1": &topicInfo{ + Partitions: []int32{0, 2, 4, 6, 8}, + MemberIDs: []string{"M1", "M2", "M3"}, + }, + }, map[string]map[string][]int32{ + "M1": {"t1": {0, 6}}, + "M2": {"t1": {2, 8}}, + "M3": {"t1": {4}}, }), - Entry("uneven members/partitions ratio", []string{"M1", "M2", "M3"}, []int32{0, 2, 4, 6, 8}, map[string][]int32{ - "M1": {0, 6}, "M2": {2, 8}, "M3": {4}, + + Entry("multiple topics", []string{"M1", "M2"}, map[string]*topicInfo{ + "t1": &topicInfo{ + Partitions: []int32{0, 1, 2}, + MemberIDs: []string{"M1", "M2"}, + }, + "t2": &topicInfo{ + Partitions: []int32{0, 1, 2}, + MemberIDs: []string{"M1", "M2"}, + }, + }, map[string]map[string][]int32{ + "M1": { + "t1": {0, 2}, + "t2": {1}, + }, + "M2": { + "t1": {1}, + "t2": {0, 2}, + }, + }), + + Entry("different subscriptions", []string{"M1", "M2", "M3"}, map[string]*topicInfo{ + "t1": &topicInfo{ + Partitions: []int32{0}, + MemberIDs: []string{"M1", "M2", "M3"}, + }, + "t2": &topicInfo{ + Partitions: []int32{0, 1}, + MemberIDs: []string{"M2", "M3"}, + }, + "t3": &topicInfo{ + Partitions: []int32{0, 1, 2}, + MemberIDs: []string{"M3"}, + }, + }, map[string]map[string][]int32{ + "M1": { + "t1": {0}, + }, + "M2": { + "t2": {0}, + }, + "M3": { + "t2": {1}, + "t3": {0, 1, 2}, + }, }), )