Skip to content

Commit

Permalink
Round-robin partition assignments across multiple topics
Browse files Browse the repository at this point in the history
This is closer to the behavior of Kafka's Java consumer, as described
here:

https://github.com/apache/kafka/blob/15bc405/clients/src/main/java/org/apache/kafka/clients/consumer/RoundRobinAssignor.java#L31-L55

This is valuable in situations where you have highly disparate
partition counts. Given three group members subscribed to many topics,
only a few of which have more than one partition, the first member
will receive far more assignments than the others.
  • Loading branch information
pd committed Feb 9, 2018
1 parent cf455bc commit a64a8be
Show file tree
Hide file tree
Showing 2 changed files with 308 additions and 109 deletions.
170 changes: 104 additions & 66 deletions balancer.go
@@ -1,6 +1,7 @@
package cluster

import (
"container/ring"
"math"
"sort"

Expand Down Expand Up @@ -70,89 +71,53 @@ func (n *Notification) success(current map[string][]int32) *Notification {

// --------------------------------------------------------------------

type balancer struct {
client sarama.Client
memberIDs []string
topics map[string]*topicInfo
}

type topicInfo struct {
Partitions []int32
MemberIDs []string
}

func (info topicInfo) Perform(s Strategy) map[string][]int32 {
if s == StrategyRoundRobin {
return info.RoundRobin()
}
return info.Ranges()
}

func (info topicInfo) Ranges() map[string][]int32 {
sort.Strings(info.MemberIDs)

mlen := len(info.MemberIDs)
plen := len(info.Partitions)
res := make(map[string][]int32, mlen)

for pos, memberID := range info.MemberIDs {
n, i := float64(plen)/float64(mlen), float64(pos)
min := int(math.Floor(i*n + 0.5))
max := int(math.Floor((i+1)*n + 0.5))
sub := info.Partitions[min:max]
if len(sub) > 0 {
res[memberID] = sub
}
}
return res
}

func (info topicInfo) RoundRobin() map[string][]int32 {
sort.Strings(info.MemberIDs)

mlen := len(info.MemberIDs)
res := make(map[string][]int32, mlen)
for i, pnum := range info.Partitions {
memberID := info.MemberIDs[i%mlen]
res[memberID] = append(res[memberID], pnum)
}
return res
}

// --------------------------------------------------------------------

type balancer struct {
client sarama.Client
topics map[string]topicInfo
}

func newBalancerFromMeta(client sarama.Client, members map[string]sarama.ConsumerGroupMemberMetadata) (*balancer, error) {
balancer := newBalancer(client)
balancer := &balancer{
client: client,
memberIDs: make([]string, 0, len(members)),
topics: make(map[string]*topicInfo),
}
for memberID, meta := range members {
balancer.memberIDs = append(balancer.memberIDs, memberID)
for _, topic := range meta.Topics {
if err := balancer.Topic(topic, memberID); err != nil {
if err := balancer.Topic(memberID, topic); err != nil {
return nil, err
}
}
}
return balancer, nil
}

func newBalancer(client sarama.Client) *balancer {
return &balancer{
client: client,
topics: make(map[string]topicInfo),
}
sort.Strings(balancer.memberIDs)
return balancer, nil
}

func (r *balancer) Topic(name string, memberID string) error {
topic, ok := r.topics[name]
func (r *balancer) Topic(memberID string, name string) error {
info, ok := r.topics[name]
if !ok {
nums, err := r.client.Partitions(name)
if err != nil {
return err
}
topic = topicInfo{

r.topics[name] = &topicInfo{
MemberIDs: []string{memberID},
Partitions: nums,
MemberIDs: make([]string, 0, 1),
}

return nil
}
topic.MemberIDs = append(topic.MemberIDs, memberID)
r.topics[name] = topic

info.MemberIDs = append(info.MemberIDs, memberID)
return nil
}

Expand All @@ -161,14 +126,87 @@ func (r *balancer) Perform(s Strategy) map[string]map[string][]int32 {
return nil
}

res := make(map[string]map[string][]int32, 1)
for topic, info := range r.topics {
for memberID, partitions := range info.Perform(s) {
if _, ok := res[memberID]; !ok {
res[memberID] = make(map[string][]int32, 1)
switch s {
case StrategyRoundRobin:
return assignRoundRobin(r.memberIDs, r.topics)
default:
return assignRange(r.memberIDs, r.topics)
}
}

func assignRange(_ []string, topics map[string]*topicInfo) map[string]map[string][]int32 {
tlen := len(topics)
res := make(map[string]map[string][]int32)

for topic, info := range topics {
mlen := len(info.MemberIDs)
plen := len(info.Partitions)

sort.Strings(info.MemberIDs)
for pos, memberID := range info.MemberIDs {
n, i := float64(plen)/float64(mlen), float64(pos)
min := int(math.Floor(i*n + 0.5))
max := int(math.Floor((i+1)*n + 0.5))
sub := info.Partitions[min:max]
if len(sub) <= 0 {
continue
}

assigned, ok := res[memberID]
if !ok {
assigned = make(map[string][]int32, tlen)
res[memberID] = assigned
}
res[memberID][topic] = partitions
assigned[topic] = sub
}
}

return res
}

func assignRoundRobin(memberIDs []string, topics map[string]*topicInfo) map[string]map[string][]int32 {
sort.Strings(memberIDs)

r := ring.New(len(memberIDs))
for i := 0; i < r.Len(); i++ {
r.Value = memberIDs[i]
r = r.Next()
}

isSubscribed := func(memberID string, topic string) bool {
info, ok := topics[topic]
if !ok {
return false
}

for _, subscriber := range info.MemberIDs {
if memberID == subscriber {
return true
}
}

return false
}

tlen := len(topics)
res := make(map[string]map[string][]int32, r.Len())

for topic, info := range topics {
for i := range info.Partitions {
for ; !isSubscribed(r.Value.(string), topic); r = r.Next() {
continue
}

memberID := r.Value.(string)
assigned, ok := res[memberID]
if !ok {
assigned = make(map[string][]int32, tlen)
res[memberID] = assigned
}
assigned[topic] = append(assigned[topic], info.Partitions[i])
r = r.Next()
}
}

return res
}

0 comments on commit a64a8be

Please sign in to comment.