From 1ebe99c1ca3bf098426effb7c9fd8c8797a7d558 Mon Sep 17 00:00:00 2001 From: Warren Green Date: Thu, 23 Mar 2017 18:08:46 -0700 Subject: [PATCH] Produce message with key to allow ordered message delivery across clusters (#55) Currently ProduceService produces messages without a key to a specific partition. Messages sent to the same partition carry an incrementing sequence number in the payload, which allows ConsumeService to detect message loss and out-of-order message delivery. However, when we monitor a pipeline of clusters connected by MirrorMaker (MM), the messages in a given partition of the destination cluster may not have incrementing sequence numbers, since MM produces messages without a key. This patch addresses the problem by producing each message with a key that maps to the given partition under the specified partitioner class. The partitioner class needs to be the same partitioner used by MM. It is configured as org.apache.kafka.clients.producer.internals.DefaultPartitioner by default. --- build.gradle | 6 ++-- .../kmf/apps/MirrorPipelineMonitor.java | 1 + .../java/com/linkedin/kmf/common/Utils.java | 2 +- .../kmf/partitioner/KMPartitioner.java | 16 ++++++++++ .../kmf/partitioner/NewKMPartitioner.java | 24 ++++++++++++++ .../kmf/partitioner/OldKMPartitioner.java | 17 ++++++++++ .../linkedin/kmf/services/ProduceService.java | 31 ++++++++++++++++--- .../configs/ProduceServiceConfig.java | 9 ++++++ 8 files changed, 98 insertions(+), 8 deletions(-) create mode 100644 src/main/java/com/linkedin/kmf/partitioner/KMPartitioner.java create mode 100644 src/main/java/com/linkedin/kmf/partitioner/NewKMPartitioner.java create mode 100644 src/main/java/com/linkedin/kmf/partitioner/OldKMPartitioner.java diff --git a/build.gradle b/build.gradle index 46012df8..e1d21bde 100644 --- a/build.gradle +++ b/build.gradle @@ -16,8 +16,8 @@ allprojects { compile 'net.sourceforge.argparse4j:argparse4j:0.5.0' compile 'org.slf4j:slf4j-log4j12:1.7.6' compile 'org.apache.avro:avro:1.4.0' - compile
'org.apache.kafka:kafka_2.11:0.10.0.1' - compile 'org.apache.kafka:kafka-clients:0.10.0.1' + compile 'org.apache.kafka:kafka_2.11:0.10.1.1' + compile 'org.apache.kafka:kafka-clients:0.10.1.1' compile 'org.testng:testng:6.8.8' compile 'org.eclipse.jetty:jetty-server:8.1.19.v20160209' compile 'org.json:json:20140107' @@ -53,7 +53,7 @@ allprojects { checkstyle { configFile = new File(rootDir, "checkstyle/checkstyle.xml") } - + test.dependsOn('checkstyleMain', 'checkstyleTest') test { diff --git a/src/main/java/com/linkedin/kmf/apps/MirrorPipelineMonitor.java b/src/main/java/com/linkedin/kmf/apps/MirrorPipelineMonitor.java index 508da9e1..185e097a 100644 --- a/src/main/java/com/linkedin/kmf/apps/MirrorPipelineMonitor.java +++ b/src/main/java/com/linkedin/kmf/apps/MirrorPipelineMonitor.java @@ -70,6 +70,7 @@ private List createTopicManagementServices(Map topicManagementProps = createTopicManagementServiceProps(props, topic); + topicManagementProps.put(CommonServiceConfig.ZOOKEEPER_CONNECT_CONFIG, ""); TopicManagementServiceConfig config = new TopicManagementServiceConfig(topicManagementProps); double partitionsToBrokerRatio = config.getDouble(TopicManagementServiceConfig.PARTITIONS_TO_BROKER_RATIO_THRESHOLD); diff --git a/src/main/java/com/linkedin/kmf/common/Utils.java b/src/main/java/com/linkedin/kmf/common/Utils.java index ab67f376..cf9994cf 100644 --- a/src/main/java/com/linkedin/kmf/common/Utils.java +++ b/src/main/java/com/linkedin/kmf/common/Utils.java @@ -16,7 +16,6 @@ import java.util.Properties; import kafka.admin.AdminUtils; import kafka.admin.RackAwareMode; -import kafka.common.TopicExistsException; import kafka.server.KafkaConfig; import kafka.utils.ZkUtils; import org.apache.avro.generic.GenericData; @@ -24,6 +23,7 @@ import org.apache.avro.generic.GenericRecord; import org.apache.avro.io.Encoder; import org.apache.avro.io.JsonEncoder; +import org.apache.kafka.common.errors.TopicExistsException; import org.apache.kafka.common.security.JaasUtils; import 
org.json.JSONObject; import org.slf4j.Logger; diff --git a/src/main/java/com/linkedin/kmf/partitioner/KMPartitioner.java b/src/main/java/com/linkedin/kmf/partitioner/KMPartitioner.java new file mode 100644 index 00000000..839e3d77 --- /dev/null +++ b/src/main/java/com/linkedin/kmf/partitioner/KMPartitioner.java @@ -0,0 +1,16 @@ +/** + * Copyright 2016 LinkedIn Corp. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this + * file except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + */ +package com.linkedin.kmf.partitioner; + +public interface KMPartitioner { + + int partition(String key, int numPartitions); + +} diff --git a/src/main/java/com/linkedin/kmf/partitioner/NewKMPartitioner.java b/src/main/java/com/linkedin/kmf/partitioner/NewKMPartitioner.java new file mode 100644 index 00000000..e73a4119 --- /dev/null +++ b/src/main/java/com/linkedin/kmf/partitioner/NewKMPartitioner.java @@ -0,0 +1,24 @@ +/** + * Copyright 2016 LinkedIn Corp. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this + * file except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ */ +package com.linkedin.kmf.partitioner; + +import static org.apache.kafka.common.utils.Utils.murmur2; + +public class NewKMPartitioner implements KMPartitioner { + + public int partition(String key, int partitionNum) { + byte[] keyBytes = key.getBytes(); + return toPositive(murmur2(keyBytes)) % partitionNum; + } + + private static int toPositive(int number) { + return number & 0x7fffffff; + } +} diff --git a/src/main/java/com/linkedin/kmf/partitioner/OldKMPartitioner.java b/src/main/java/com/linkedin/kmf/partitioner/OldKMPartitioner.java new file mode 100644 index 00000000..fa356eff --- /dev/null +++ b/src/main/java/com/linkedin/kmf/partitioner/OldKMPartitioner.java @@ -0,0 +1,17 @@ +/** + * Copyright 2016 LinkedIn Corp. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this + * file except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ */ +package com.linkedin.kmf.partitioner; + +public class OldKMPartitioner implements KMPartitioner { + + public int partition(String key, int partitionNum) { + return Math.abs(key.hashCode()) % partitionNum; + } +} diff --git a/src/main/java/com/linkedin/kmf/services/ProduceService.java b/src/main/java/com/linkedin/kmf/services/ProduceService.java index 8f0137fe..3aefc7a9 100644 --- a/src/main/java/com/linkedin/kmf/services/ProduceService.java +++ b/src/main/java/com/linkedin/kmf/services/ProduceService.java @@ -10,6 +10,7 @@ package com.linkedin.kmf.services; import com.linkedin.kmf.common.Utils; +import com.linkedin.kmf.partitioner.KMPartitioner; import com.linkedin.kmf.producer.BaseProducerRecord; import com.linkedin.kmf.producer.KMBaseProducer; import com.linkedin.kmf.producer.NewProducer; @@ -44,7 +45,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; - public class ProduceService implements Service { private static final Logger LOG = LoggerFactory.getLogger(ProduceService.class); private static final String METRIC_GROUP_NAME = "produce-service"; @@ -55,6 +55,7 @@ public class ProduceService implements Service { private final String _name; private final ProduceMetrics _sensors; private KMBaseProducer _producer; + private KMPartitioner _partitioner; private ScheduledExecutorService _produceExecutor; private final ScheduledExecutorService _handleNewPartitionsExecutor; private final int _produceDelayMs; @@ -80,6 +81,7 @@ public ProduceService(Map props, String name) throws Exception { _brokerList = config.getString(ProduceServiceConfig.BOOTSTRAP_SERVERS_CONFIG); String producerClass = config.getString(ProduceServiceConfig.PRODUCER_CLASS_CONFIG); + _partitioner = config.getConfiguredInstance(ProduceServiceConfig.PARTITIONER_CLASS_CONFIG, KMPartitioner.class); _threadsNum = config.getInt(ProduceServiceConfig.PRODUCE_THREAD_NUM_CONFIG); _topic = config.getString(ProduceServiceConfig.TOPIC_CONFIG); _producerId = 
config.getString(ProduceServiceConfig.PRODUCER_ID_CONFIG); @@ -154,17 +156,36 @@ public void start() { } private void initializeStateForPartitions() { + Map keyMapping = generateKeyMappings(); int partitionNum = _partitionNum.get(); for (int partition = 0; partition < partitionNum; partition++) { + String key = keyMapping.get(partition); //This is what preserves sequence numbers across restarts if (!_nextIndexPerPartition.containsKey(partition)) { _nextIndexPerPartition.put(partition, new AtomicLong(0)); _sensors.addPartitionSensors(partition); } - _produceExecutor.scheduleWithFixedDelay(new ProduceRunnable(partition), _produceDelayMs, _produceDelayMs, TimeUnit.MILLISECONDS); + _produceExecutor.scheduleWithFixedDelay(new ProduceRunnable(partition, key), _produceDelayMs, _produceDelayMs, TimeUnit.MILLISECONDS); } } + private Map generateKeyMappings() { + int partitionNum = _partitionNum.get(); + HashMap keyMapping = new HashMap<>(); + + int nextInt = 0; + while (keyMapping.size() < partitionNum) { + String key = Integer.toString(nextInt); + int partition = _partitioner.partition(key, partitionNum); + if (!keyMapping.containsKey(partition)) { + keyMapping.put(partition, key); + } + nextInt++; + } + + return keyMapping; + } + @Override public void stop() { if (_running.compareAndSet(true, false)) { @@ -257,16 +278,18 @@ void addPartitionSensors(int partition) { */ private class ProduceRunnable implements Runnable { private final int _partition; + private final String _key; - ProduceRunnable(int partition) { + ProduceRunnable(int partition, String key) { _partition = partition; + _key = key; } public void run() { try { long nextIndex = _nextIndexPerPartition.get(_partition).get(); String message = Utils.jsonFromFields(_topic, nextIndex, System.currentTimeMillis(), _producerId, _recordSize); - BaseProducerRecord record = new BaseProducerRecord(_topic, _partition, null, message); + BaseProducerRecord record = new BaseProducerRecord(_topic, _partition, _key, message); 
RecordMetadata metadata = _producer.send(record, _sync); _sensors._recordsProduced.record(); _sensors._recordsProducedPerPartition.get(_partition).record(); diff --git a/src/main/java/com/linkedin/kmf/services/configs/ProduceServiceConfig.java b/src/main/java/com/linkedin/kmf/services/configs/ProduceServiceConfig.java index 7c5e9157..1f22befb 100644 --- a/src/main/java/com/linkedin/kmf/services/configs/ProduceServiceConfig.java +++ b/src/main/java/com/linkedin/kmf/services/configs/ProduceServiceConfig.java @@ -9,6 +9,7 @@ */ package com.linkedin.kmf.services.configs; +import com.linkedin.kmf.partitioner.NewKMPartitioner; import com.linkedin.kmf.producer.NewProducer; import java.util.Map; import org.apache.kafka.common.config.AbstractConfig; @@ -33,6 +34,9 @@ public class ProduceServiceConfig extends AbstractConfig { public static final String PRODUCER_CLASS_DOC = "Producer class that will be instantiated as producer in the produce service. " + "It can be NewProducer, or full class name of any class that implements the KMBaseProducer interface. "; + public static final String PARTITIONER_CLASS_CONFIG = "produce.partitioner.class"; + public static final String PARTITIONER_CLASS_DOC = "KMPartitioner class that corresponds to the partitioner used in the target cluster."; + public static final String PRODUCE_SYNC_CONFIG = "produce.sync"; public static final String PRODUCE_SYNC_DOC = "If true, and if this is supported by the producer class, messages are sent synchronously"; @@ -70,6 +74,11 @@ public class ProduceServiceConfig extends AbstractConfig { NewProducer.class.getCanonicalName(), ConfigDef.Importance.LOW, PRODUCER_CLASS_DOC) + .define(PARTITIONER_CLASS_CONFIG, + ConfigDef.Type.STRING, + NewKMPartitioner.class.getCanonicalName(), + ConfigDef.Importance.HIGH, + PARTITIONER_CLASS_DOC) .define(PRODUCE_SYNC_CONFIG, ConfigDef.Type.BOOLEAN, true,