Skip to content

Commit

Permalink
KAFKA-4402: make the KafkaProducer true round robin per topic
Browse files Browse the repository at this point in the history
Author: yaojuncn <yaojuncn@users.noreply.github.com>

Reviewers: Guozhang Wang <wangguoz@gmail.com>, Vahid Hashemian <vahidhashemian@us.ibm.com>, Konstantin <konstantin@tubemogul.com>, Ismael Juma <ismael@juma.me.uk>, Ewen Cheslack-Postava <ewen@confluent.io>

Closes apache#2128 from yaojuncn/KAFKA-4402-client-producer-round-robin-fix
  • Loading branch information
yaojuncn authored and soenkeliebau committed Feb 7, 2017
1 parent 4c981d3 commit bafb020
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.kafka.clients.producer.Partitioner;
Expand All @@ -35,7 +37,7 @@
*/
public class DefaultPartitioner implements Partitioner {

private final AtomicInteger counter = new AtomicInteger(new Random().nextInt());
private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap<>();

public void configure(Map<String, ?> configs) {}

Expand All @@ -53,7 +55,7 @@ public int partition(String topic, Object key, byte[] keyBytes, Object value, by
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
if (keyBytes == null) {
int nextValue = counter.getAndIncrement();
int nextValue = nextValue(topic);
List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
if (availablePartitions.size() > 0) {
int part = Utils.toPositive(nextValue) % availablePartitions.size();
Expand All @@ -68,6 +70,18 @@ public int partition(String topic, Object key, byte[] keyBytes, Object value, by
}
}

private int nextValue(String topic) {
AtomicInteger counter = topicCounterMap.get(topic);
if (null == counter) {
counter = new AtomicInteger(new Random().nextInt());
AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, counter);
if (currentCounter != null) {
counter = currentCounter;
}
}
return counter.getAndIncrement();
}

public void close() {}

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,9 @@
import org.junit.Test;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;

import java.util.Map;
import static java.util.Arrays.asList;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
Expand Down Expand Up @@ -62,4 +63,35 @@ public void testRoundRobinWithUnavailablePartitions() {
}
assertEquals("The distribution between two available partitions should be even", countForPart0, countForPart2);
}

@Test
public void testRoundRobin() throws InterruptedException {
final String topicA = "topicA";
final String topicB = "topicB";

List<PartitionInfo> allPartitions = asList(new PartitionInfo(topicA, 0, node0, nodes, nodes),
new PartitionInfo(topicA, 1, node1, nodes, nodes),
new PartitionInfo(topicA, 2, node2, nodes, nodes),
new PartitionInfo(topicB, 0, node0, nodes, nodes)
);
Cluster testCluster = new Cluster("clusterId", asList(node0, node1, node2), allPartitions,
Collections.<String>emptySet(), Collections.<String>emptySet());

final Map<Integer, Integer> partitionCount = new HashMap<>();

for (int i = 0; i < 30; ++i) {
int partition = partitioner.partition(topicA, null, null, null, null, testCluster);
Integer count = partitionCount.get(partition);
if (null == count) count = 0;
partitionCount.put(partition, count + 1);

if (i % 5 == 0) {
partitioner.partition(topicB, null, null, null, null, testCluster);
}
}

assertEquals(10, (int) partitionCount.get(0));
assertEquals(10, (int) partitionCount.get(1));
assertEquals(10, (int) partitionCount.get(2));
}
}

0 comments on commit bafb020

Please sign in to comment.