Skip to content

Commit

Permalink
Produce message with key to allow ordered message delivery across clu…
Browse files Browse the repository at this point in the history
…sters (#55)

Currently ProduceService produces messages without a key to a specific
partition. Messages sent to the same partition carry an incrementing sequence number
in the payload, which allows ConsumeService to detect message loss and out-of-order
message delivery. However, when we monitor a pipeline of clusters connected by
MirrorMaker, the messages in a given partition of the destination cluster may
not have incrementing sequence numbers, since MM produces messages without a key.

This patch addresses the problem by producing each message with a key that maps to
the given partition under the specified partitioner class. The partitioner
class needs to be the same as the partitioner used by MM. It is configured as
org.apache.kafka.clients.producer.internals.DefaultPartitioner by default.
  • Loading branch information
WarrenGreen authored and lindong28 committed Mar 24, 2017
1 parent b13b43c commit 1ebe99c
Show file tree
Hide file tree
Showing 8 changed files with 98 additions and 8 deletions.
6 changes: 3 additions & 3 deletions build.gradle
Expand Up @@ -16,8 +16,8 @@ allprojects {
compile 'net.sourceforge.argparse4j:argparse4j:0.5.0'
compile 'org.slf4j:slf4j-log4j12:1.7.6'
compile 'org.apache.avro:avro:1.4.0'
compile 'org.apache.kafka:kafka_2.11:0.10.0.1'
compile 'org.apache.kafka:kafka-clients:0.10.0.1'
compile 'org.apache.kafka:kafka_2.11:0.10.1.1'
compile 'org.apache.kafka:kafka-clients:0.10.1.1'
compile 'org.testng:testng:6.8.8'
compile 'org.eclipse.jetty:jetty-server:8.1.19.v20160209'
compile 'org.json:json:20140107'
Expand Down Expand Up @@ -53,7 +53,7 @@ allprojects {
checkstyle {
configFile = new File(rootDir, "checkstyle/checkstyle.xml")
}

test.dependsOn('checkstyleMain', 'checkstyleTest')

test {
Expand Down
Expand Up @@ -70,6 +70,7 @@ private List<TopicManagementService> createTopicManagementServices(Map<String, O
String topic,
String name) throws Exception {
Map<String, Object> topicManagementProps = createTopicManagementServiceProps(props, topic);
topicManagementProps.put(CommonServiceConfig.ZOOKEEPER_CONNECT_CONFIG, "");
TopicManagementServiceConfig config = new TopicManagementServiceConfig(topicManagementProps);
double partitionsToBrokerRatio = config.getDouble(TopicManagementServiceConfig.PARTITIONS_TO_BROKER_RATIO_THRESHOLD);

Expand Down
2 changes: 1 addition & 1 deletion src/main/java/com/linkedin/kmf/common/Utils.java
Expand Up @@ -16,14 +16,14 @@
import java.util.Properties;
import kafka.admin.AdminUtils;
import kafka.admin.RackAwareMode;
import kafka.common.TopicExistsException;
import kafka.server.KafkaConfig;
import kafka.utils.ZkUtils;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.Encoder;
import org.apache.avro.io.JsonEncoder;
import org.apache.kafka.common.errors.TopicExistsException;
import org.apache.kafka.common.security.JaasUtils;
import org.json.JSONObject;
import org.slf4j.Logger;
Expand Down
16 changes: 16 additions & 0 deletions src/main/java/com/linkedin/kmf/partitioner/KMPartitioner.java
@@ -0,0 +1,16 @@
/**
* Copyright 2016 LinkedIn Corp. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this
* file except in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
package com.linkedin.kmf.partitioner;

/**
 * Strategy for mapping a string key to a partition index.
 */
public interface KMPartitioner {

/**
 * Chooses a partition for the given key.
 *
 * @param key the key to place
 * @param numPartitions the number of partitions to choose among
 * @return the partition index selected for {@code key}; presumably expected to fall in
 *         {@code [0, numPartitions)} — confirm against implementations and callers
 */
int partition(String key, int numPartitions);

}
24 changes: 24 additions & 0 deletions src/main/java/com/linkedin/kmf/partitioner/NewKMPartitioner.java
@@ -0,0 +1,24 @@
/**
* Copyright 2016 LinkedIn Corp. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this
* file except in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
package com.linkedin.kmf.partitioner;

import static org.apache.kafka.common.utils.Utils.murmur2;

/**
 * {@link KMPartitioner} that hashes the key bytes with murmur2, clears the sign bit of the hash, and reduces it
 * modulo the number of partitions.
 */
public class NewKMPartitioner implements KMPartitioner {

  /**
   * Computes the partition for {@code key}.
   *
   * @param key the key to place; must not be null
   * @param partitionNum the number of partitions; must be positive
   * @return {@code (murmur2(utf8(key)) & 0x7fffffff) % partitionNum}
   */
  public int partition(String key, int partitionNum) {
    // Encode explicitly as UTF-8: the no-argument getBytes() uses the JVM's default charset, which would make
    // the hash (and therefore the chosen partition) depend on the host configuration.
    // NOTE(review): presumably this should match the bytes produced by the producer's key serializer — confirm
    // that serializer also encodes as UTF-8.
    byte[] keyBytes = key.getBytes(java.nio.charset.StandardCharsets.UTF_8);
    return toPositive(murmur2(keyBytes)) % partitionNum;
  }

  /** Clears the sign bit so the result is never negative (unlike Math.abs for Integer.MIN_VALUE). */
  private static int toPositive(int number) {
    return number & 0x7fffffff;
  }
}
17 changes: 17 additions & 0 deletions src/main/java/com/linkedin/kmf/partitioner/OldKMPartitioner.java
@@ -0,0 +1,17 @@
/**
* Copyright 2016 LinkedIn Corp. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this
* file except in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
package com.linkedin.kmf.partitioner;

/**
 * {@link KMPartitioner} that derives the partition from the absolute value of {@link String#hashCode()} of the key,
 * reduced modulo the number of partitions.
 */
public class OldKMPartitioner implements KMPartitioner {

  /**
   * Computes the partition for {@code key}.
   *
   * @param key the key to place; must not be null
   * @param partitionNum the number of partitions; must be positive
   * @return a non-negative partition index smaller than {@code partitionNum}
   */
  public int partition(String key, int partitionNum) {
    int hash = key.hashCode();
    // Math.abs(Integer.MIN_VALUE) overflows and stays negative, which would yield a negative partition.
    // Map that single value to 0; every other hash keeps the same result as plain Math.abs.
    int nonNegativeHash = (hash == Integer.MIN_VALUE) ? 0 : Math.abs(hash);
    return nonNegativeHash % partitionNum;
  }
}
31 changes: 27 additions & 4 deletions src/main/java/com/linkedin/kmf/services/ProduceService.java
Expand Up @@ -10,6 +10,7 @@
package com.linkedin.kmf.services;

import com.linkedin.kmf.common.Utils;
import com.linkedin.kmf.partitioner.KMPartitioner;
import com.linkedin.kmf.producer.BaseProducerRecord;
import com.linkedin.kmf.producer.KMBaseProducer;
import com.linkedin.kmf.producer.NewProducer;
Expand Down Expand Up @@ -44,7 +45,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


public class ProduceService implements Service {
private static final Logger LOG = LoggerFactory.getLogger(ProduceService.class);
private static final String METRIC_GROUP_NAME = "produce-service";
Expand All @@ -55,6 +55,7 @@ public class ProduceService implements Service {
private final String _name;
private final ProduceMetrics _sensors;
private KMBaseProducer _producer;
private KMPartitioner _partitioner;
private ScheduledExecutorService _produceExecutor;
private final ScheduledExecutorService _handleNewPartitionsExecutor;
private final int _produceDelayMs;
Expand All @@ -80,6 +81,7 @@ public ProduceService(Map<String, Object> props, String name) throws Exception {
_brokerList = config.getString(ProduceServiceConfig.BOOTSTRAP_SERVERS_CONFIG);
String producerClass = config.getString(ProduceServiceConfig.PRODUCER_CLASS_CONFIG);

_partitioner = config.getConfiguredInstance(ProduceServiceConfig.PARTITIONER_CLASS_CONFIG, KMPartitioner.class);
_threadsNum = config.getInt(ProduceServiceConfig.PRODUCE_THREAD_NUM_CONFIG);
_topic = config.getString(ProduceServiceConfig.TOPIC_CONFIG);
_producerId = config.getString(ProduceServiceConfig.PRODUCER_ID_CONFIG);
Expand Down Expand Up @@ -154,17 +156,36 @@ public void start() {
}

/**
 * Prepares per-partition state and schedules a recurring produce task for every partition index from 0 up to
 * (but excluding) the current value of _partitionNum.
 */
private void initializeStateForPartitions() {
// One key per partition, as computed by generateKeyMappings(); handed to the task scheduled for that partition.
Map<Integer, String> keyMapping = generateKeyMappings();
int partitionNum = _partitionNum.get();
for (int partition = 0; partition < partitionNum; partition++) {
String key = keyMapping.get(partition);
//This is what preserves sequence numbers across restarts
// (an existing counter for the partition is left untouched; only unseen partitions get a counter and sensors)
if (!_nextIndexPerPartition.containsKey(partition)) {
_nextIndexPerPartition.put(partition, new AtomicLong(0));
_sensors.addPartitionSensors(partition);
}
// NOTE(review): the next two statements look like the before/after pair of a diff hunk (the first uses the
// one-argument ProduceRunnable constructor, the second the keyed one) — confirm that only the keyed call
// is meant to remain.
_produceExecutor.scheduleWithFixedDelay(new ProduceRunnable(partition), _produceDelayMs, _produceDelayMs, TimeUnit.MILLISECONDS);
_produceExecutor.scheduleWithFixedDelay(new ProduceRunnable(partition, key), _produceDelayMs, _produceDelayMs, TimeUnit.MILLISECONDS);
}
}

/**
 * Finds, for every partition index in {@code [0, partitionNum)}, a key that the configured partitioner maps to
 * that partition. Candidate keys are the decimal strings "0", "1", "2", ... tried in order; the first candidate
 * that lands on a partition is the one kept for it.
 *
 * @return map from partition index to a key that the partitioner assigns to that partition
 */
private Map<Integer, String> generateKeyMappings() {
  int partitionNum = _partitionNum.get();
  Map<Integer, String> keyMapping = new HashMap<>();

  int nextInt = 0;
  // NOTE: this loop ends only once every partition has a key, so it relies on the partitioner being able to
  // reach each partition index.
  while (keyMapping.size() < partitionNum) {
    String key = Integer.toString(nextInt);
    int partition = _partitioner.partition(key, partitionNum);
    // Skip results outside the valid range: recording them would count toward the map size and end the search
    // while a real partition still has no key.
    boolean inRange = partition >= 0 && partition < partitionNum;
    if (inRange && !keyMapping.containsKey(partition)) {
      keyMapping.put(partition, key);
    }
    nextInt++;
  }

  return keyMapping;
}

@Override
public void stop() {
if (_running.compareAndSet(true, false)) {
Expand Down Expand Up @@ -257,16 +278,18 @@ void addPartitionSensors(int partition) {
*/
private class ProduceRunnable implements Runnable {
private final int _partition;
private final String _key;

ProduceRunnable(int partition) {
ProduceRunnable(int partition, String key) {
_partition = partition;
_key = key;
}

public void run() {
try {
long nextIndex = _nextIndexPerPartition.get(_partition).get();
String message = Utils.jsonFromFields(_topic, nextIndex, System.currentTimeMillis(), _producerId, _recordSize);
BaseProducerRecord record = new BaseProducerRecord(_topic, _partition, null, message);
BaseProducerRecord record = new BaseProducerRecord(_topic, _partition, _key, message);
RecordMetadata metadata = _producer.send(record, _sync);
_sensors._recordsProduced.record();
_sensors._recordsProducedPerPartition.get(_partition).record();
Expand Down
Expand Up @@ -9,6 +9,7 @@
*/
package com.linkedin.kmf.services.configs;

import com.linkedin.kmf.partitioner.NewKMPartitioner;
import com.linkedin.kmf.producer.NewProducer;
import java.util.Map;
import org.apache.kafka.common.config.AbstractConfig;
Expand All @@ -33,6 +34,9 @@ public class ProduceServiceConfig extends AbstractConfig {
public static final String PRODUCER_CLASS_DOC = "Producer class that will be instantiated as producer in the produce service. "
+ "It can be NewProducer, or full class name of any class that implements the KMBaseProducer interface. ";

public static final String PARTITIONER_CLASS_CONFIG = "produce.partitioner.class";
public static final String PARTITIONER_CLASS_DOC = "KMPartitioner class that corresponds to the partitioner used in the target cluster.";

public static final String PRODUCE_SYNC_CONFIG = "produce.sync";
public static final String PRODUCE_SYNC_DOC = "If true, and if this is supported by the producer class, messages are sent synchronously";

Expand Down Expand Up @@ -70,6 +74,11 @@ public class ProduceServiceConfig extends AbstractConfig {
NewProducer.class.getCanonicalName(),
ConfigDef.Importance.LOW,
PRODUCER_CLASS_DOC)
.define(PARTITIONER_CLASS_CONFIG,
ConfigDef.Type.STRING,
NewKMPartitioner.class.getCanonicalName(),
ConfigDef.Importance.HIGH,
PARTITIONER_CLASS_DOC)
.define(PRODUCE_SYNC_CONFIG,
ConfigDef.Type.BOOLEAN,
true,
Expand Down

0 comments on commit 1ebe99c

Please sign in to comment.