Skip to content

Commit

Permalink
Improve balance in the RoundRobinAssigner.
Browse files Browse the repository at this point in the history
  The RoundRobinAssigner now use the same round robin to assign partitions across all topics.
  Previously the RoundRobinAssigner would round robin partitions within each topic.
  If the consumer was subscribed to many topics with a single partition, this would lead to a
  situation in which all partitions of all topics were consumed by a single consumer.
  • Loading branch information
Ben Maraney committed Feb 5, 2020
1 parent 7fc1a14 commit 52f4238
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 19 deletions.
32 changes: 18 additions & 14 deletions src/consumer/assigners/roundRobinAssigner/index.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
const { MemberMetadata, MemberAssignment } = require('../../assignerProtocol')
const flatten = require('../../../utils/flatten')

/**
* RoundRobinAssigner
Expand All @@ -15,7 +16,6 @@ module.exports = ({ cluster }) => ({
* The members array contains information about each member, `memberMetadata` is the result of the
* `protocol` operation.
*
* This process can result in imbalanced assignments
* @param {array} members array of members, e.g:
[{ memberId: 'test-5f93f5a3', memberMetadata: Buffer }]
* @param {array} topics
Expand All @@ -25,14 +25,14 @@ module.exports = ({ cluster }) => ({
* memberId: 'test-5f93f5a3',
* memberAssignment: {
* 'topic-A': [0, 2, 4, 6],
* 'topic-B': [0, 2],
* 'topic-B': [1],
* },
* },
* {
* memberId: 'test-3d3d5341',
* memberAssignment: {
* 'topic-A': [1, 3, 5],
* 'topic-B': [1],
* 'topic-B': [0, 2],
* },
* }
* ]
Expand All @@ -42,20 +42,24 @@ module.exports = ({ cluster }) => ({
const sortedMembers = members.map(({ memberId }) => memberId).sort()
const assignment = {}

sortedMembers.forEach(memberId => {
assignment[memberId] = {}
const topicsPartionArrays = topics.map(topic => {
const partitionMetadata = cluster.findTopicPartitionMetadata(topic)
return partitionMetadata.map(m => ({ topic: topic, partitionId: m.partitionId }))
})
const topicsPartitions = flatten(topicsPartionArrays)

topics.forEach(topic => {
const partitionMetadata = cluster.findTopicPartitionMetadata(topic)
const partitions = partitionMetadata.map(m => m.partitionId)
sortedMembers.forEach((memberId, i) => {
if (!assignment[memberId][topic]) {
assignment[memberId][topic] = []
}
topicsPartitions.forEach((topicPartition, i) => {
const assignee = sortedMembers[i % membersCount]

if (!assignment[assignee]) {
assignment[assignee] = []
}

if (!assignment[assignee][topicPartition.topic]) {
assignment[assignee][topicPartition.topic] = []
}

assignment[memberId][topic].push(...partitions.filter(id => id % membersCount === i))
})
assignment[assignee][topicPartition.topic].push(topicPartition.partitionId)
})

return Object.keys(assignment).map(memberId => ({
Expand Down
10 changes: 5 additions & 5 deletions src/consumer/assigners/roundRobinAssigner/index.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ describe('Consumer > assigners > RoundRobinAssigner', () => {
})

describe('#assign', () => {
test('assign all partitions evenly', async () => {
test('assign all topic-partitions evenly', async () => {
metadata['topic-A'] = Array(14)
.fill()
.map((_, i) => ({ partitionId: i }))
Expand All @@ -37,7 +37,7 @@ describe('Consumer > assigners > RoundRobinAssigner', () => {
version: assigner.version,
assignment: {
'topic-A': [0, 4, 8, 12],
'topic-B': [0, 4],
'topic-B': [2],
},
}),
},
Expand All @@ -47,7 +47,7 @@ describe('Consumer > assigners > RoundRobinAssigner', () => {
version: assigner.version,
assignment: {
'topic-A': [1, 5, 9, 13],
'topic-B': [1],
'topic-B': [3],
},
}),
},
Expand All @@ -57,7 +57,7 @@ describe('Consumer > assigners > RoundRobinAssigner', () => {
version: assigner.version,
assignment: {
'topic-A': [2, 6, 10],
'topic-B': [2],
'topic-B': [0, 4],
},
}),
},
Expand All @@ -67,7 +67,7 @@ describe('Consumer > assigners > RoundRobinAssigner', () => {
version: assigner.version,
assignment: {
'topic-A': [3, 7, 11],
'topic-B': [3],
'topic-B': [1],
},
}),
},
Expand Down

0 comments on commit 52f4238

Please sign in to comment.