From 6a3ea45f251d9e764e77a68be1d832d3b33b29ae Mon Sep 17 00:00:00 2001 From: xhl1988 Date: Mon, 24 Jul 2017 11:51:44 -0700 Subject: [PATCH] Fix stop-all.sh and apply java google style (#31) * Apply java google style * bump version * update * update * fix stop-all.sh * update stop-all.sh * apply mvn license:format * apply 120 wide column --- bin/pkg/stop-all.sh | 29 +++++---- bin/produce-data-to-kafka-topic-dummyTopic.sh | 1 + config/test-log4j.properties | 1 + config/zookeeper.properties | 1 + pom.xml | 2 +- uReplicator-Controller/pom.xml | 14 ++-- .../controller/ControllerConf.java | 4 +- .../controller/ControllerStarter.java | 23 ++++--- ...toRebalanceLiveInstanceChangeListener.java | 22 +++---- .../core/AutoTopicWhitelistingManager.java | 31 ++++----- .../core/ClusterInfoBackupManager.java | 18 ++--- .../controller/core/FileBackUpHandler.java | 1 - .../controller/core/GitBackUpHandler.java | 4 +- .../core/HelixMirrorMakerManager.java | 12 ++-- .../controller/core/IdealStateBuilder.java | 1 - .../core/InstanceTopicPartitionHolder.java | 3 +- .../core/KafkaBrokerTopicObserver.java | 25 ++++--- .../core/OnlineOfflineStateModel.java | 23 +------ .../controller/core/TopicPartition.java | 3 - .../HelixKafkaMirrorMakerMetricsReporter.java | 26 ++++---- .../rest/ControllerRestApplication.java | 9 ++- .../rest/resources/AdminRestletResource.java | 7 +- .../TopicManagementRestletResource.java | 15 ++--- .../resources/ValidationRestletResource.java | 5 +- .../controller/utils/HelixSetupUtils.java | 4 +- .../controller/utils/HelixUtils.java | 13 ++-- .../SourceKafkaClusterValidationManager.java | 14 ++-- .../validation/ValidationManager.java | 10 ++- .../controller/TestControllerConf.java | 7 +- ...MirrorMakerManagerCustomEmptyFullTest.java | 10 ++- ...rrorMakerManagerCustomEmptyWorkerTest.java | 10 ++- ...HelixMirrorMakerManagerCustomFullTest.java | 10 ++- ...lixMirrorMakerManagerCustomSimpleTest.java | 10 ++- .../TestAutoTopicWhitelistingManager.java | 10 ++- .../core/TestKafkaBrokerTopicObserver.java | 8 +-- ...MirrorMakerManagerCustomEmptyFullTest.java | 12 ++-- ...rrorMakerManagerCustomEmptyWorkerTest.java | 12 ++-- ...HelixMirrorMakerManagerCustomFullTest.java | 12 ++-- ...lixMirrorMakerManagerCustomSimpleTest.java | 12 ++-- ...tHelixKafkaMirrorMakerMetricsReporter.java | 8 +-- .../rest/ControllerStarterTest.java | 32 +++++---- .../utils/ControllerRequestURLBuilder.java | 8 +-- .../controller/utils/FakeInstance.java | 1 + .../controller/utils/KafkaStarterUtils.java | 14 ++-- .../TestOnlineOfflineStateModelFactory.java | 2 + .../controller/utils/ZkStarter.java | 10 ++- ...stSourceKafkaClusterValidationManager.java | 14 ++-- .../validation/TestValidationManager.java | 21 +++--- uReplicator-Distribution/pom.xml | 8 +-- .../starter/MirrorMakerStarter.java | 10 +-- uReplicator-Worker/pom.xml | 14 ++-- .../CompactConsumerFetcherManager.scala | 65 ++++++++++--------- .../CompactConsumerFetcherThread.scala | 28 ++++---- ...WorkerOnlineOfflineStateModelFactory.scala | 4 +- .../kafka/mirrormaker/KafkaConnector.scala | 14 ++-- .../kafka/mirrormaker/MirrorMakerWorker.scala | 37 +++++------ .../mirrormaker/PartitionTopicInfo.scala | 15 +++-- .../mirrormaker/MirrorMakerWorkerTest.scala | 9 +-- 58 files changed, 333 insertions(+), 415 deletions(-) diff --git a/bin/pkg/stop-all.sh b/bin/pkg/stop-all.sh index fcc935d9..f694e6bf 100755 --- a/bin/pkg/stop-all.sh +++ b/bin/pkg/stop-all.sh @@ -1,22 +1,27 @@ #!/bin/bash -e echo "EXECUTING: stop controller" -PID=`pgrep -f "Dapp_name=uReplicator-Controller"` -kill -9 $PID +PID1=`pgrep -f "Dapp_name=uReplicator-Controller"` || true +if [ -n "$PID1" ]; then + kill -9 ${PID1} +fi echo "EXECUTING: stop worker" -PID=`pgrep -f "Dapp_name=uReplicator-Worker"` -kill -9 $PID +PID2=`pgrep -f "Dapp_name=uReplicator-Worker"` || true +if [ -n "$PID2" ]; then + kill -9 ${PID2} +fi echo "EXECUTING: stop producing" -PID=`pgrep -f "./bin/produce-data-to-kafka-topic-dummyTopic.sh"` -kill -9 $PID +PID3=`pgrep -f "./bin/produce-data-to-kafka-topic-dummyTopic.sh"` || true +if [ -n "$PID3" ]; then + kill -9 ${PID3} +fi echo "EXECUTING: stop consuming" -PID=`pgrep -f "kafka.tools.ConsoleConsumer"` -for i in "${PID[@]}" -do - kill -9 $i -done +PID4=$(ps ax | grep -i 'kafka.tools.ConsoleConsumer' | grep java | grep -v grep | awk '{print $1}') +if [ -n "$PID4" ]; then + kill -9 ${PID4} +fi -bin/grid stop all \ No newline at end of file +bin/grid stop all diff --git a/bin/produce-data-to-kafka-topic-dummyTopic.sh b/bin/produce-data-to-kafka-topic-dummyTopic.sh index 6932b755..9a0ee14c 100755 --- a/bin/produce-data-to-kafka-topic-dummyTopic.sh +++ b/bin/produce-data-to-kafka-topic-dummyTopic.sh @@ -1,4 +1,5 @@ #!/bin/bash -e + DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )" BASE_DIR=$(dirname $DIR) ZOOKEEPER=localhost:2181/cluster1 diff --git a/config/test-log4j.properties b/config/test-log4j.properties index e0bbc134..f98b3491 100644 --- a/config/test-log4j.properties +++ b/config/test-log4j.properties @@ -12,6 +12,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. + log4j.rootLogger=INFO, stdout log4j.appender.stdout=org.apache.log4j.ConsoleAppender diff --git a/config/zookeeper.properties b/config/zookeeper.properties index 74cbf904..76d48137 100644 --- a/config/zookeeper.properties +++ b/config/zookeeper.properties @@ -12,6 +12,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. + # the directory where the snapshot is stored. dataDir=/tmp/zookeeper # the port at which the clients will connect diff --git a/pom.xml b/pom.xml index c5b72d48..f8ee76e0 100644 --- a/pom.xml +++ b/pom.xml @@ -4,7 +4,7 @@ 4.0.0 com.uber.uReplicator uReplicator - 1.0.1 + 1.0.2 pom uReplicator diff --git a/uReplicator-Controller/pom.xml b/uReplicator-Controller/pom.xml index e732e33f..d399f9b4 100644 --- a/uReplicator-Controller/pom.xml +++ b/uReplicator-Controller/pom.xml @@ -1,21 +1,19 @@ - + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> 4.0.0 - com.uber.uReplicator uReplicator-Controller jar uReplicator-Controller - uReplicator com.uber.uReplicator - 1.0.1 + uReplicator + 1.0.2 - Helix kafka mirror maker is built for resolving high level consumers rebalancing pain point. - Helix provides a good interface to interact with zookeeper and manages/rebalances a stateful mapping for - instance to simple consumer mapping. + Kafka MirrorMaker with Helix is built for resolving high level consumers rebalancing pain point. + Helix provides a good interface to interact with zookeeper and manages/rebalances a stateful + mapping for instance to simple consumer mapping. diff --git a/uReplicator-Controller/src/main/java/com/uber/stream/kafka/mirrormaker/controller/ControllerConf.java b/uReplicator-Controller/src/main/java/com/uber/stream/kafka/mirrormaker/controller/ControllerConf.java index 0277c789..40f1abf0 100644 --- a/uReplicator-Controller/src/main/java/com/uber/stream/kafka/mirrormaker/controller/ControllerConf.java +++ b/uReplicator-Controller/src/main/java/com/uber/stream/kafka/mirrormaker/controller/ControllerConf.java @@ -18,7 +18,6 @@ import java.net.InetAddress; import java.net.UnknownHostException; import java.util.Iterator; - import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.Options; import org.apache.commons.configuration.PropertiesConfiguration; @@ -321,8 +320,7 @@ public static Options constructControllerOptions() { .addOption("autoRebalanceDelayInSeconds", true, "Auto Rebalance Delay in seconds") .addOption("refreshTimeInSeconds", true, "Controller Refresh Time in seconds") .addOption("initWaitTimeInSeconds", true, "Controller Init Delay in seconds") - .addOption("backUpToGit", true, - "Backup controller metadata to git (true) or local file (false)") + .addOption("backUpToGit", true, "Backup controller metadata to git (true) or local file (false)") .addOption("remoteBackupRepo", true, "Remote Backup Repo to store cluster state") .addOption("localGitRepoClonePath", true, "Clone location of the remote git backup repo") .addOption("localBackupFilePath", true, "Local backup file location"); diff --git a/uReplicator-Controller/src/main/java/com/uber/stream/kafka/mirrormaker/controller/ControllerStarter.java b/uReplicator-Controller/src/main/java/com/uber/stream/kafka/mirrormaker/controller/ControllerStarter.java index 5c753295..178cd07d 100644 --- a/uReplicator-Controller/src/main/java/com/uber/stream/kafka/mirrormaker/controller/ControllerStarter.java +++ b/uReplicator-Controller/src/main/java/com/uber/stream/kafka/mirrormaker/controller/ControllerStarter.java @@ -15,9 +15,18 @@ */ package com.uber.stream.kafka.mirrormaker.controller; +import com.uber.stream.kafka.mirrormaker.controller.core.AutoTopicWhitelistingManager; +import com.uber.stream.kafka.mirrormaker.controller.core.ClusterInfoBackupManager; +import com.uber.stream.kafka.mirrormaker.controller.core.FileBackUpHandler; +import com.uber.stream.kafka.mirrormaker.controller.core.GitBackUpHandler; +import com.uber.stream.kafka.mirrormaker.controller.core.HelixMirrorMakerManager; +import com.uber.stream.kafka.mirrormaker.controller.core.KafkaBrokerTopicObserver; +import com.uber.stream.kafka.mirrormaker.controller.reporter.HelixKafkaMirrorMakerMetricsReporter; +import com.uber.stream.kafka.mirrormaker.controller.rest.ControllerRestApplication; +import com.uber.stream.kafka.mirrormaker.controller.validation.SourceKafkaClusterValidationManager; +import com.uber.stream.kafka.mirrormaker.controller.validation.ValidationManager; import java.util.HashMap; import java.util.Map; - import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.CommandLineParser; import org.apache.commons.cli.DefaultParser; @@ -29,19 +38,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.uber.stream.kafka.mirrormaker.controller.core.AutoTopicWhitelistingManager; -import com.uber.stream.kafka.mirrormaker.controller.core.ClusterInfoBackupManager; -import com.uber.stream.kafka.mirrormaker.controller.core.FileBackUpHandler; -import com.uber.stream.kafka.mirrormaker.controller.core.GitBackUpHandler; -import com.uber.stream.kafka.mirrormaker.controller.core.HelixMirrorMakerManager; -import com.uber.stream.kafka.mirrormaker.controller.core.KafkaBrokerTopicObserver; -import com.uber.stream.kafka.mirrormaker.controller.reporter.HelixKafkaMirrorMakerMetricsReporter; -import com.uber.stream.kafka.mirrormaker.controller.rest.ControllerRestApplication; -import com.uber.stream.kafka.mirrormaker.controller.validation.SourceKafkaClusterValidationManager; -import com.uber.stream.kafka.mirrormaker.controller.validation.ValidationManager; - /** * The main entry point for everything. + * * @author xiangfu */ public class ControllerStarter { diff --git a/uReplicator-Controller/src/main/java/com/uber/stream/kafka/mirrormaker/controller/core/AutoRebalanceLiveInstanceChangeListener.java b/uReplicator-Controller/src/main/java/com/uber/stream/kafka/mirrormaker/controller/core/AutoRebalanceLiveInstanceChangeListener.java index 425e2091..f67bde34 100644 --- a/uReplicator-Controller/src/main/java/com/uber/stream/kafka/mirrormaker/controller/core/AutoRebalanceLiveInstanceChangeListener.java +++ b/uReplicator-Controller/src/main/java/com/uber/stream/kafka/mirrormaker/controller/core/AutoRebalanceLiveInstanceChangeListener.java @@ -15,6 +15,12 @@ */ package com.uber.stream.kafka.mirrormaker.controller.core; +import com.codahale.metrics.Counter; +import com.codahale.metrics.Meter; +import com.codahale.metrics.Timer; +import com.codahale.metrics.Timer.Context; +import com.uber.stream.kafka.mirrormaker.controller.reporter.HelixKafkaMirrorMakerMetricsReporter; +import com.uber.stream.kafka.mirrormaker.controller.utils.HelixUtils; import java.util.Arrays; import java.util.HashSet; import java.util.List; @@ -24,7 +30,6 @@ import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; - import org.apache.helix.HelixAdmin; import org.apache.helix.HelixManager; import org.apache.helix.LiveInstanceChangeListener; @@ -34,19 +39,13 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.codahale.metrics.Counter; -import com.codahale.metrics.Meter; -import com.codahale.metrics.Timer; -import com.codahale.metrics.Timer.Context; -import com.uber.stream.kafka.mirrormaker.controller.reporter.HelixKafkaMirrorMakerMetricsReporter; -import com.uber.stream.kafka.mirrormaker.controller.utils.HelixUtils; - /** * We only considering add or remove box(es), not considering the replacing. * For replacing, we just need to bring up a new box and give the old instanceId no auto-balancing * needed. */ public class AutoRebalanceLiveInstanceChangeListener implements LiveInstanceChangeListener { + private static final Logger LOGGER = LoggerFactory.getLogger(AutoRebalanceLiveInstanceChangeListener.class); @@ -91,7 +90,7 @@ public void onLiveInstanceChange(final List liveInstances, @Override public void run() { try { - rebalanceCurrentCluster(_helixMirrorMakerManager.getCurrentLiveInstances()); + rebalanceCurrentCluster(_helixMirrorMakerManager.getCurrentLiveInstances()); } catch (Exception e) { LOGGER.error("Got exception during rebalance the whole cluster! ", e); } @@ -168,9 +167,8 @@ private static Set rescaleInstanceToTopicPartition } LOGGER.info("Trying to rescale cluster with new instances - " + Arrays.toString( newInstances.toArray(new String[0])) + " and removed instances - " + Arrays.toString( - removedInstances.toArray(new String[0]))); - TreeSet orderedSet = - new TreeSet<>(InstanceTopicPartitionHolder.getComparator()); + removedInstances.toArray(new String[0]))); + TreeSet orderedSet = new TreeSet<>(InstanceTopicPartitionHolder.getComparator()); Set tpiNeedsToBeAssigned = new HashSet(); tpiNeedsToBeAssigned.addAll(unassignedTopicPartitions); for (String instanceName : instanceToTopicPartitionMap.keySet()) { diff --git a/uReplicator-Controller/src/main/java/com/uber/stream/kafka/mirrormaker/controller/core/AutoTopicWhitelistingManager.java b/uReplicator-Controller/src/main/java/com/uber/stream/kafka/mirrormaker/controller/core/AutoTopicWhitelistingManager.java index ee21f515..8ebe265e 100644 --- a/uReplicator-Controller/src/main/java/com/uber/stream/kafka/mirrormaker/controller/core/AutoTopicWhitelistingManager.java +++ b/uReplicator-Controller/src/main/java/com/uber/stream/kafka/mirrormaker/controller/core/AutoTopicWhitelistingManager.java @@ -15,6 +15,8 @@ */ package com.uber.stream.kafka.mirrormaker.controller.core; +import com.codahale.metrics.Counter; +import com.uber.stream.kafka.mirrormaker.controller.reporter.HelixKafkaMirrorMakerMetricsReporter; import java.util.Arrays; import java.util.Collections; import java.util.HashSet; @@ -24,18 +26,13 @@ import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; - +import kafka.utils.ZKStringSerializer$; +import kafka.utils.ZkUtils; import org.I0Itec.zkclient.ZkClient; import org.I0Itec.zkclient.exception.ZkNodeExistsException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.codahale.metrics.Counter; -import com.uber.stream.kafka.mirrormaker.controller.reporter.HelixKafkaMirrorMakerMetricsReporter; - -import kafka.utils.ZKStringSerializer$; -import kafka.utils.ZkUtils; - /** * AutoTopicWhitelistingManager will look at both source and destination Kafka brokers and pick * the topics in the intersection to add to the whitelist. @@ -86,26 +83,24 @@ public AutoTopicWhitelistingManager(KafkaBrokerTopicObserver srcKafkaTopicObserv String patternToExcludeTopics, int refreshTimeInSec) { this(srcKafkaTopicObserver, destKafkaTopicObserver, helixMirrorMakerManager, - patternToExcludeTopics, refreshTimeInSec, 120); + patternToExcludeTopics, refreshTimeInSec, 120); } public AutoTopicWhitelistingManager(KafkaBrokerTopicObserver srcKafkaTopicObserver, - KafkaBrokerTopicObserver destKafkaTopicObserver, - HelixMirrorMakerManager helixMirrorMakerManager, - String patternToExcludeTopics, - int refreshTimeInSec, - int initWaitTimeInSec) { + KafkaBrokerTopicObserver destKafkaTopicObserver, + HelixMirrorMakerManager helixMirrorMakerManager, + String patternToExcludeTopics, + int refreshTimeInSec, + int initWaitTimeInSec) { _srcKafkaTopicObserver = srcKafkaTopicObserver; _destKafkaTopicObserver = destKafkaTopicObserver; _helixMirrorMakerManager = helixMirrorMakerManager; _patternToExcludeTopics = patternToExcludeTopics; _refreshTimeInSec = refreshTimeInSec; _initWaitTimeInSec = initWaitTimeInSec; - _zkClient = new ZkClient(_helixMirrorMakerManager.getHelixZkURL(), 30000, 30000, - ZKStringSerializer$.MODULE$); + _zkClient = new ZkClient(_helixMirrorMakerManager.getHelixZkURL(), 30000, 30000, ZKStringSerializer$.MODULE$); _zkUtils = ZkUtils.apply(_zkClient, false); - _blacklistedTopicsZPath = - String.format("/%s/BLACKLISTED_TOPICS", _helixMirrorMakerManager.getHelixClusterName()); + _blacklistedTopicsZPath = String.format("/%s/BLACKLISTED_TOPICS", _helixMirrorMakerManager.getHelixClusterName()); } public void start() { @@ -244,7 +239,7 @@ public void addIntoBlacklist(String topic) { } public void removeFromBlacklist(String topic) { - _zkUtils.deletePath( _blacklistedTopicsZPath + "/" + topic); + _zkUtils.deletePath(_blacklistedTopicsZPath + "/" + topic); LOGGER.info("topic={} is removed from blacklist on zk", topic); } diff --git a/uReplicator-Controller/src/main/java/com/uber/stream/kafka/mirrormaker/controller/core/ClusterInfoBackupManager.java b/uReplicator-Controller/src/main/java/com/uber/stream/kafka/mirrormaker/controller/core/ClusterInfoBackupManager.java index 522c87ed..de12a7c3 100644 --- a/uReplicator-Controller/src/main/java/com/uber/stream/kafka/mirrormaker/controller/core/ClusterInfoBackupManager.java +++ b/uReplicator-Controller/src/main/java/com/uber/stream/kafka/mirrormaker/controller/core/ClusterInfoBackupManager.java @@ -15,6 +15,9 @@ */ package com.uber.stream.kafka.mirrormaker.controller.core; +import com.alibaba.fastjson.JSONArray; +import com.alibaba.fastjson.JSONObject; +import com.uber.stream.kafka.mirrormaker.controller.ControllerConf; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -28,17 +31,12 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.alibaba.fastjson.JSONArray; -import com.alibaba.fastjson.JSONObject; -import com.uber.stream.kafka.mirrormaker.controller.ControllerConf; - /** * This manager schedules a periodic backup task once every 24 hrs to take the * backup of the mirror maker controller cluster state and dump the ideal state, parition assignment * to two different files either to a remote git repo or a local backup file based on the config - * - * @author naveencherukuri * + * @author naveencherukuri */ public class ClusterInfoBackupManager { @@ -54,7 +52,8 @@ public class ClusterInfoBackupManager { private final ControllerConf _config; private String envInfo = "default"; - public ClusterInfoBackupManager(HelixMirrorMakerManager helixMirrorMakerManager, BackUpHandler handler, + public ClusterInfoBackupManager(HelixMirrorMakerManager helixMirrorMakerManager, + BackUpHandler handler, ControllerConf config) { _helixMirrorMakerManager = helixMirrorMakerManager; _handler = handler; @@ -82,7 +81,6 @@ public synchronized void dumpState() throws Exception { return; } - LOGGER.info("Backing up the CurrentState and the IdealState!"); StringBuilder idealState = new StringBuilder(); StringBuilder partitionAssignment = new StringBuilder(); @@ -92,7 +90,6 @@ public synchronized void dumpState() throws Exception { return; } - JSONArray resultList = new JSONArray(); for (String topicName : topicLists) { @@ -103,10 +100,8 @@ public synchronized void dumpState() throws Exception { resultList.add(resultJson); } - idealState.append(new StringRepresentation(resultList.toJSONString())); - resultList = new JSONArray(); for (String topicName : topicLists) { @@ -167,7 +162,6 @@ public synchronized void dumpState() throws Exception { resultJson.put("serverToPartitionMapping", serverToPartitionMappingJson); resultJson.put("serverToNumPartitionsMapping", serverToNumPartitionsMappingJson); resultList.add(resultJson); - } partitionAssignment.append(new StringRepresentation(resultList.toJSONString())); diff --git a/uReplicator-Controller/src/main/java/com/uber/stream/kafka/mirrormaker/controller/core/FileBackUpHandler.java b/uReplicator-Controller/src/main/java/com/uber/stream/kafka/mirrormaker/controller/core/FileBackUpHandler.java index cfee06f2..9f0eaa85 100644 --- a/uReplicator-Controller/src/main/java/com/uber/stream/kafka/mirrormaker/controller/core/FileBackUpHandler.java +++ b/uReplicator-Controller/src/main/java/com/uber/stream/kafka/mirrormaker/controller/core/FileBackUpHandler.java @@ -19,7 +19,6 @@ import java.io.File; import java.io.FileWriter; import java.io.IOException; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/uReplicator-Controller/src/main/java/com/uber/stream/kafka/mirrormaker/controller/core/GitBackUpHandler.java b/uReplicator-Controller/src/main/java/com/uber/stream/kafka/mirrormaker/controller/core/GitBackUpHandler.java index 14f1bc47..d2f1351d 100644 --- a/uReplicator-Controller/src/main/java/com/uber/stream/kafka/mirrormaker/controller/core/GitBackUpHandler.java +++ b/uReplicator-Controller/src/main/java/com/uber/stream/kafka/mirrormaker/controller/core/GitBackUpHandler.java @@ -20,7 +20,6 @@ import java.io.FileWriter; import java.io.IOException; import java.util.Date; - import org.apache.commons.io.FileUtils; import org.eclipse.jgit.api.Git; import org.eclipse.jgit.api.errors.GitAPIException; @@ -108,8 +107,9 @@ public void writeToFile(String fileName, String data) throws Exception { } finally { output.close(); git.close(); - if (result != null) + if (result != null) { result.getRepository().close(); + } backupRepo.close(); } } diff --git a/uReplicator-Controller/src/main/java/com/uber/stream/kafka/mirrormaker/controller/core/HelixMirrorMakerManager.java b/uReplicator-Controller/src/main/java/com/uber/stream/kafka/mirrormaker/controller/core/HelixMirrorMakerManager.java index 759e09b2..1c416640 100644 --- a/uReplicator-Controller/src/main/java/com/uber/stream/kafka/mirrormaker/controller/core/HelixMirrorMakerManager.java +++ b/uReplicator-Controller/src/main/java/com/uber/stream/kafka/mirrormaker/controller/core/HelixMirrorMakerManager.java @@ -15,13 +15,15 @@ */ package com.uber.stream.kafka.mirrormaker.controller.core; +import com.uber.stream.kafka.mirrormaker.controller.ControllerConf; +import com.uber.stream.kafka.mirrormaker.controller.utils.HelixSetupUtils; +import com.uber.stream.kafka.mirrormaker.controller.utils.HelixUtils; import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.PriorityQueue; import java.util.Set; - import org.apache.helix.HelixAdmin; import org.apache.helix.HelixDataAccessor; import org.apache.helix.HelixManager; @@ -36,10 +38,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.uber.stream.kafka.mirrormaker.controller.ControllerConf; -import com.uber.stream.kafka.mirrormaker.controller.utils.HelixSetupUtils; -import com.uber.stream.kafka.mirrormaker.controller.utils.HelixUtils; - /** * Main logic for Helix Controller. Provided all necessary APIs for topics management. * Have two modes auto/custom: @@ -215,11 +213,11 @@ public List getCurrentLiveInstances() { } public String getHelixZkURL() { - return _helixZkURL; + return _helixZkURL; } public String getHelixClusterName() { - return _helixClusterName; + return _helixClusterName; } } diff --git a/uReplicator-Controller/src/main/java/com/uber/stream/kafka/mirrormaker/controller/core/IdealStateBuilder.java b/uReplicator-Controller/src/main/java/com/uber/stream/kafka/mirrormaker/controller/core/IdealStateBuilder.java index 20f7a1ad..7eeecfbe 100644 --- a/uReplicator-Controller/src/main/java/com/uber/stream/kafka/mirrormaker/controller/core/IdealStateBuilder.java +++ b/uReplicator-Controller/src/main/java/com/uber/stream/kafka/mirrormaker/controller/core/IdealStateBuilder.java @@ -16,7 +16,6 @@ package com.uber.stream.kafka.mirrormaker.controller.core; import java.util.PriorityQueue; - import org.apache.helix.model.IdealState; import org.apache.helix.model.builder.CustomModeISBuilder; diff --git a/uReplicator-Controller/src/main/java/com/uber/stream/kafka/mirrormaker/controller/core/InstanceTopicPartitionHolder.java b/uReplicator-Controller/src/main/java/com/uber/stream/kafka/mirrormaker/controller/core/InstanceTopicPartitionHolder.java index 88155ef7..f5fe2592 100644 --- a/uReplicator-Controller/src/main/java/com/uber/stream/kafka/mirrormaker/controller/core/InstanceTopicPartitionHolder.java +++ b/uReplicator-Controller/src/main/java/com/uber/stream/kafka/mirrormaker/controller/core/InstanceTopicPartitionHolder.java @@ -15,13 +15,12 @@ */ package com.uber.stream.kafka.mirrormaker.controller.core; +import com.google.common.collect.ImmutableSet; import java.util.Collection; import java.util.Comparator; import java.util.HashSet; import java.util.Set; -import com.google.common.collect.ImmutableSet; - /** * InstanceTopicPartitionHolder is a wrapper for instance and the topicPartitionSet it's holding. */ diff --git a/uReplicator-Controller/src/main/java/com/uber/stream/kafka/mirrormaker/controller/core/KafkaBrokerTopicObserver.java b/uReplicator-Controller/src/main/java/com/uber/stream/kafka/mirrormaker/controller/core/KafkaBrokerTopicObserver.java index 9cbf243d..be2e7a72 100644 --- a/uReplicator-Controller/src/main/java/com/uber/stream/kafka/mirrormaker/controller/core/KafkaBrokerTopicObserver.java +++ b/uReplicator-Controller/src/main/java/com/uber/stream/kafka/mirrormaker/controller/core/KafkaBrokerTopicObserver.java @@ -15,6 +15,12 @@ */ package com.uber.stream.kafka.mirrormaker.controller.core; +import com.codahale.metrics.Counter; +import com.codahale.metrics.Timer; +import com.codahale.metrics.Timer.Context; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import com.uber.stream.kafka.mirrormaker.controller.reporter.HelixKafkaMirrorMakerMetricsReporter; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -24,21 +30,12 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; - +import kafka.utils.ZKStringSerializer$; +import kafka.utils.ZkUtils; import org.I0Itec.zkclient.IZkChildListener; import org.I0Itec.zkclient.ZkClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - -import com.codahale.metrics.Counter; -import com.codahale.metrics.Timer; -import com.codahale.metrics.Timer.Context; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableSet; -import com.uber.stream.kafka.mirrormaker.controller.reporter.HelixKafkaMirrorMakerMetricsReporter; - -import kafka.utils.ZKStringSerializer$; -import kafka.utils.ZkUtils; import scala.collection.JavaConversions; import scala.collection.Seq; @@ -108,7 +105,8 @@ public void handleChildChange(String parentPath, List currentChilds) } } scala.collection.mutable.Map>> partitionAssignmentForTopics = - _zkUtils.getPartitionAssignmentForTopics(JavaConversions.asScalaBuffer(ImmutableList.copyOf(newAddedTopics))); + _zkUtils.getPartitionAssignmentForTopics( + JavaConversions.asScalaBuffer(ImmutableList.copyOf(newAddedTopics))); for (String topic : newAddedTopics) { try { @@ -142,7 +140,8 @@ private synchronized void refreshCache() { } scala.collection.mutable.Map>> partitionAssignmentForTopics = - _zkUtils.getPartitionAssignmentForTopics(JavaConversions.asScalaBuffer(ImmutableList.copyOf(servingTopics))); + _zkUtils.getPartitionAssignmentForTopics( + JavaConversions.asScalaBuffer(ImmutableList.copyOf(servingTopics))); for (String topic : servingTopics) { try { diff --git a/uReplicator-Controller/src/main/java/com/uber/stream/kafka/mirrormaker/controller/core/OnlineOfflineStateModel.java b/uReplicator-Controller/src/main/java/com/uber/stream/kafka/mirrormaker/controller/core/OnlineOfflineStateModel.java index ca29613f..0448d2c6 100644 --- a/uReplicator-Controller/src/main/java/com/uber/stream/kafka/mirrormaker/controller/core/OnlineOfflineStateModel.java +++ b/uReplicator-Controller/src/main/java/com/uber/stream/kafka/mirrormaker/controller/core/OnlineOfflineStateModel.java @@ -15,30 +15,10 @@ */ package com.uber.stream.kafka.mirrormaker.controller.core; -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; - import org.apache.helix.HelixDefinedState; import org.apache.helix.ZNRecord; import org.apache.helix.model.StateModelDefinition; @@ -47,6 +27,7 @@ * Helix built-in Online-offline state model definition */ public final class OnlineOfflineStateModel extends StateModelDefinition { + public static final String name = "OnlineOffline"; public enum States { @@ -60,7 +41,6 @@ public OnlineOfflineStateModel() { /** * Build OnlineOffline state model definition - * @return */ public static StateModelDefinition build() { StateModelDefinition.Builder builder = new StateModelDefinition.Builder(name); @@ -89,7 +69,6 @@ public static StateModelDefinition build() { /** * Generate OnlineOffline state model definition * Replaced by OnlineOfflineSMD#build() - * @return */ @Deprecated public static ZNRecord generateConfigForOnlineOffline() { diff --git a/uReplicator-Controller/src/main/java/com/uber/stream/kafka/mirrormaker/controller/core/TopicPartition.java b/uReplicator-Controller/src/main/java/com/uber/stream/kafka/mirrormaker/controller/core/TopicPartition.java index 39b3920f..5a0fd20a 100644 --- a/uReplicator-Controller/src/main/java/com/uber/stream/kafka/mirrormaker/controller/core/TopicPartition.java +++ b/uReplicator-Controller/src/main/java/com/uber/stream/kafka/mirrormaker/controller/core/TopicPartition.java @@ -36,9 +36,6 @@ public TopicPartition(String topic, int numPartitions) { /** * This is used only for POST and PUT call to create the pojo. - * - * @param jsonRequest - * @return */ public static TopicPartition init(String jsonRequest) { JSONObject jsonObject = JSON.parseObject(jsonRequest); diff --git a/uReplicator-Controller/src/main/java/com/uber/stream/kafka/mirrormaker/controller/reporter/HelixKafkaMirrorMakerMetricsReporter.java b/uReplicator-Controller/src/main/java/com/uber/stream/kafka/mirrormaker/controller/reporter/HelixKafkaMirrorMakerMetricsReporter.java index cc59fb45..51001212 100644 --- a/uReplicator-Controller/src/main/java/com/uber/stream/kafka/mirrormaker/controller/reporter/HelixKafkaMirrorMakerMetricsReporter.java +++ b/uReplicator-Controller/src/main/java/com/uber/stream/kafka/mirrormaker/controller/reporter/HelixKafkaMirrorMakerMetricsReporter.java @@ -15,11 +15,6 @@ */ package com.uber.stream.kafka.mirrormaker.controller.reporter; -import java.net.InetSocketAddress; -import java.util.concurrent.TimeUnit; - -import org.apache.log4j.Logger; - import com.codahale.metrics.JmxReporter; import com.codahale.metrics.MetricFilter; import com.codahale.metrics.MetricRegistry; @@ -28,6 +23,9 @@ import com.google.common.base.Preconditions; import com.google.common.io.Closeables; import com.uber.stream.kafka.mirrormaker.controller.ControllerConf; +import java.net.InetSocketAddress; +import java.util.concurrent.TimeUnit; +import org.apache.log4j.Logger; /** * Holds a singleton MetricRegistry to be shared across MirrorMaker. @@ -35,6 +33,7 @@ * config). Note: There is no need to explicitly close this. */ public class HelixKafkaMirrorMakerMetricsReporter { + private static final String KAFKA_MIRROR_MAKER_METRICS_REPORTER_PREFIX_FORMAT = "stats.%s.counter.kafka-mirror-maker-controller.%s.%s"; @@ -95,10 +94,12 @@ public class HelixKafkaMirrorMakerMetricsReporter { Runtime.getRuntime().addShutdownHook(new Thread() { public void run() { try { - if (enabledJmxReporting) + if (enabledJmxReporting) { Closeables.close(_jmxReporter, true); - if (enabledGraphiteReporting) + } + if (enabledGraphiteReporting) { Closeables.close(_graphiteReporter, true); + } } catch (Exception e) { LOGGER.error("Error while closing Jmx and Graphite reporters.", e); } @@ -107,11 +108,13 @@ public void run() { } private String[] parse(String environment) { - if (environment == null || environment.trim().length() <= 0) + if (environment == null || environment.trim().length() <= 0) { return null; + } String[] res = environment.split("\\."); - if (res == null || res.length != 2) + if (res == null || res.length != 2) { return null; + } return res; } @@ -131,12 +134,13 @@ static Graphite getGraphite(ControllerConf config) { /** * This function must be called before calling the get() method, because of * the dependency on the config object. - * + * * @param config Specifies config pertaining to Metrics */ public static synchronized void init(ControllerConf config) { - if (DID_INIT) + if (DID_INIT) { return; + } METRICS_REPORTER_INSTANCE = new HelixKafkaMirrorMakerMetricsReporter(config); DID_INIT = true; } diff --git a/uReplicator-Controller/src/main/java/com/uber/stream/kafka/mirrormaker/controller/rest/ControllerRestApplication.java b/uReplicator-Controller/src/main/java/com/uber/stream/kafka/mirrormaker/controller/rest/ControllerRestApplication.java index f371b7af..b7fe0fac 100644 --- a/uReplicator-Controller/src/main/java/com/uber/stream/kafka/mirrormaker/controller/rest/ControllerRestApplication.java +++ b/uReplicator-Controller/src/main/java/com/uber/stream/kafka/mirrormaker/controller/rest/ControllerRestApplication.java @@ -15,17 +15,16 @@ */ package com.uber.stream.kafka.mirrormaker.controller.rest; +import com.uber.stream.kafka.mirrormaker.controller.rest.resources.AdminRestletResource; +import com.uber.stream.kafka.mirrormaker.controller.rest.resources.HealthCheckRestletResource; +import com.uber.stream.kafka.mirrormaker.controller.rest.resources.TopicManagementRestletResource; +import com.uber.stream.kafka.mirrormaker.controller.rest.resources.ValidationRestletResource; import org.restlet.Application; import org.restlet.Context; import org.restlet.Restlet; import org.restlet.routing.Router; import org.restlet.routing.Template; -import com.uber.stream.kafka.mirrormaker.controller.rest.resources.AdminRestletResource; -import com.uber.stream.kafka.mirrormaker.controller.rest.resources.HealthCheckRestletResource; -import com.uber.stream.kafka.mirrormaker.controller.rest.resources.TopicManagementRestletResource; -import com.uber.stream.kafka.mirrormaker.controller.rest.resources.ValidationRestletResource; - /** * Register different REST endpoints */ diff --git a/uReplicator-Controller/src/main/java/com/uber/stream/kafka/mirrormaker/controller/rest/resources/AdminRestletResource.java b/uReplicator-Controller/src/main/java/com/uber/stream/kafka/mirrormaker/controller/rest/resources/AdminRestletResource.java index 2a5d78c2..7fc07089 100644 --- a/uReplicator-Controller/src/main/java/com/uber/stream/kafka/mirrormaker/controller/rest/resources/AdminRestletResource.java +++ b/uReplicator-Controller/src/main/java/com/uber/stream/kafka/mirrormaker/controller/rest/resources/AdminRestletResource.java @@ -1,5 +1,6 @@ package com.uber.stream.kafka.mirrormaker.controller.rest.resources; +import com.uber.stream.kafka.mirrormaker.controller.core.HelixMirrorMakerManager; import org.restlet.representation.Representation; import org.restlet.representation.StringRepresentation; import org.restlet.resource.Get; @@ -7,15 +8,13 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.uber.stream.kafka.mirrormaker.controller.core.HelixMirrorMakerManager; - /** * AdminRestletResource is used to control auto balancing enable/disalbe. - * - * @author xiangfu * + * @author xiangfu */ public class AdminRestletResource extends ServerResource { + private static final Logger LOGGER = LoggerFactory.getLogger(AdminRestletResource.class); diff --git a/uReplicator-Controller/src/main/java/com/uber/stream/kafka/mirrormaker/controller/rest/resources/TopicManagementRestletResource.java b/uReplicator-Controller/src/main/java/com/uber/stream/kafka/mirrormaker/controller/rest/resources/TopicManagementRestletResource.java index a06a684c..07f1d7eb 100644 --- a/uReplicator-Controller/src/main/java/com/uber/stream/kafka/mirrormaker/controller/rest/resources/TopicManagementRestletResource.java +++ b/uReplicator-Controller/src/main/java/com/uber/stream/kafka/mirrormaker/controller/rest/resources/TopicManagementRestletResource.java @@ -1,13 +1,17 @@ package com.uber.stream.kafka.mirrormaker.controller.rest.resources; +import com.alibaba.fastjson.JSONArray; +import com.alibaba.fastjson.JSONObject; +import com.uber.stream.kafka.mirrormaker.controller.core.AutoTopicWhitelistingManager; +import com.uber.stream.kafka.mirrormaker.controller.core.HelixMirrorMakerManager; +import com.uber.stream.kafka.mirrormaker.controller.core.KafkaBrokerTopicObserver; +import com.uber.stream.kafka.mirrormaker.controller.core.TopicPartition; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; - -import com.uber.stream.kafka.mirrormaker.controller.core.AutoTopicWhitelistingManager; import org.apache.helix.model.ExternalView; import org.apache.helix.model.IdealState; import org.restlet.data.MediaType; @@ -23,16 +27,11 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.alibaba.fastjson.JSONArray; -import com.alibaba.fastjson.JSONObject; -import com.uber.stream.kafka.mirrormaker.controller.core.HelixMirrorMakerManager; -import com.uber.stream.kafka.mirrormaker.controller.core.KafkaBrokerTopicObserver; -import com.uber.stream.kafka.mirrormaker.controller.core.TopicPartition; - /** * Rest API for topic management */ public class TopicManagementRestletResource extends ServerResource { + private static final Logger LOGGER = LoggerFactory.getLogger(TopicManagementRestletResource.class); diff --git a/uReplicator-Controller/src/main/java/com/uber/stream/kafka/mirrormaker/controller/rest/resources/ValidationRestletResource.java b/uReplicator-Controller/src/main/java/com/uber/stream/kafka/mirrormaker/controller/rest/resources/ValidationRestletResource.java index 64a21ea2..e0601603 100644 --- a/uReplicator-Controller/src/main/java/com/uber/stream/kafka/mirrormaker/controller/rest/resources/ValidationRestletResource.java +++ b/uReplicator-Controller/src/main/java/com/uber/stream/kafka/mirrormaker/controller/rest/resources/ValidationRestletResource.java @@ -1,5 +1,7 @@ package com.uber.stream.kafka.mirrormaker.controller.rest.resources; +import com.uber.stream.kafka.mirrormaker.controller.validation.SourceKafkaClusterValidationManager; +import com.uber.stream.kafka.mirrormaker.controller.validation.ValidationManager; import org.restlet.data.MediaType; import org.restlet.representation.Representation; import org.restlet.representation.StringRepresentation; @@ -9,9 +11,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.uber.stream.kafka.mirrormaker.controller.validation.SourceKafkaClusterValidationManager; -import com.uber.stream.kafka.mirrormaker.controller.validation.ValidationManager; - /** * Validate idealState and externalView also update related metrics. */ diff --git a/uReplicator-Controller/src/main/java/com/uber/stream/kafka/mirrormaker/controller/utils/HelixSetupUtils.java b/uReplicator-Controller/src/main/java/com/uber/stream/kafka/mirrormaker/controller/utils/HelixSetupUtils.java index 3804f535..cae418e2 100644 --- a/uReplicator-Controller/src/main/java/com/uber/stream/kafka/mirrormaker/controller/utils/HelixSetupUtils.java +++ b/uReplicator-Controller/src/main/java/com/uber/stream/kafka/mirrormaker/controller/utils/HelixSetupUtils.java @@ -15,9 +15,9 @@ */ package com.uber.stream.kafka.mirrormaker.controller.utils; +import com.uber.stream.kafka.mirrormaker.controller.core.OnlineOfflineStateModel; import java.util.HashMap; import java.util.Map; - import org.apache.helix.HelixAdmin; import org.apache.helix.HelixManager; import org.apache.helix.controller.HelixControllerMain; @@ -31,8 +31,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.uber.stream.kafka.mirrormaker.controller.core.OnlineOfflineStateModel; - /** * HelixSetupUtils handles how to create or get a helixCluster in controller. */ diff --git a/uReplicator-Controller/src/main/java/com/uber/stream/kafka/mirrormaker/controller/utils/HelixUtils.java b/uReplicator-Controller/src/main/java/com/uber/stream/kafka/mirrormaker/controller/utils/HelixUtils.java index 4afc2538..8a1594af 100644 --- a/uReplicator-Controller/src/main/java/com/uber/stream/kafka/mirrormaker/controller/utils/HelixUtils.java +++ b/uReplicator-Controller/src/main/java/com/uber/stream/kafka/mirrormaker/controller/utils/HelixUtils.java @@ -15,6 +15,10 @@ */ package com.uber.stream.kafka.mirrormaker.controller.utils; +import com.google.common.collect.ImmutableList; +import com.uber.stream.kafka.mirrormaker.controller.core.InstanceTopicPartitionHolder; +import com.uber.stream.kafka.mirrormaker.controller.core.OnlineOfflineStateModel; +import com.uber.stream.kafka.mirrormaker.controller.core.TopicPartition; import java.util.Arrays; import java.util.HashMap; import java.util.HashSet; @@ -22,7 +26,6 @@ import java.util.Map; import java.util.PriorityQueue; import java.util.Set; - import org.apache.commons.lang.StringUtils; import org.apache.helix.HelixAdmin; import org.apache.helix.HelixDataAccessor; @@ -36,12 +39,8 @@ import org.apache.helix.model.builder.CustomModeISBuilder; import org.apache.helix.store.zk.ZkHelixPropertyStore; -import com.google.common.collect.ImmutableList; -import com.uber.stream.kafka.mirrormaker.controller.core.InstanceTopicPartitionHolder; -import com.uber.stream.kafka.mirrormaker.controller.core.OnlineOfflineStateModel; -import com.uber.stream.kafka.mirrormaker.controller.core.TopicPartition; - public class HelixUtils { + public static String getAbsoluteZkPathForHelix(String zkBaseUrl) { zkBaseUrl = StringUtils.chomp(zkBaseUrl, "/"); return zkBaseUrl; @@ -67,7 +66,7 @@ public static List liveInstances(HelixManager helixManager) { /** * From IdealStates. - * @param helixManager + * * @return InstanceToNumTopicPartitionMap */ public static Map> getInstanceToTopicPartitionsMap( diff --git a/uReplicator-Controller/src/main/java/com/uber/stream/kafka/mirrormaker/controller/validation/SourceKafkaClusterValidationManager.java b/uReplicator-Controller/src/main/java/com/uber/stream/kafka/mirrormaker/controller/validation/SourceKafkaClusterValidationManager.java index a5669b04..3ee180ee 100644 --- a/uReplicator-Controller/src/main/java/com/uber/stream/kafka/mirrormaker/controller/validation/SourceKafkaClusterValidationManager.java +++ b/uReplicator-Controller/src/main/java/com/uber/stream/kafka/mirrormaker/controller/validation/SourceKafkaClusterValidationManager.java @@ -15,6 +15,12 @@ */ package com.uber.stream.kafka.mirrormaker.controller.validation; +import com.alibaba.fastjson.JSONObject; +import com.codahale.metrics.Counter; +import com.uber.stream.kafka.mirrormaker.controller.core.HelixMirrorMakerManager; +import com.uber.stream.kafka.mirrormaker.controller.core.KafkaBrokerTopicObserver; +import com.uber.stream.kafka.mirrormaker.controller.core.TopicPartition; +import com.uber.stream.kafka.mirrormaker.controller.reporter.HelixKafkaMirrorMakerMetricsReporter; import java.util.HashMap; import java.util.HashSet; import java.util.Map; @@ -22,17 +28,9 @@ import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.alibaba.fastjson.JSONObject; -import com.codahale.metrics.Counter; -import com.uber.stream.kafka.mirrormaker.controller.core.HelixMirrorMakerManager; -import com.uber.stream.kafka.mirrormaker.controller.core.KafkaBrokerTopicObserver; -import com.uber.stream.kafka.mirrormaker.controller.core.TopicPartition; -import com.uber.stream.kafka.mirrormaker.controller.reporter.HelixKafkaMirrorMakerMetricsReporter; - /** * Validate idealstates and source kafka cluster info and update related metrics. */ diff --git a/uReplicator-Controller/src/main/java/com/uber/stream/kafka/mirrormaker/controller/validation/ValidationManager.java b/uReplicator-Controller/src/main/java/com/uber/stream/kafka/mirrormaker/controller/validation/ValidationManager.java index ff988a54..86c736cb 100644 --- a/uReplicator-Controller/src/main/java/com/uber/stream/kafka/mirrormaker/controller/validation/ValidationManager.java +++ b/uReplicator-Controller/src/main/java/com/uber/stream/kafka/mirrormaker/controller/validation/ValidationManager.java @@ -15,22 +15,20 @@ */ package com.uber.stream.kafka.mirrormaker.controller.validation; +import com.alibaba.fastjson.JSONObject; +import com.codahale.metrics.Counter; +import com.uber.stream.kafka.mirrormaker.controller.core.HelixMirrorMakerManager; +import com.uber.stream.kafka.mirrormaker.controller.reporter.HelixKafkaMirrorMakerMetricsReporter; import java.util.HashMap; import java.util.Map; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; - import org.apache.helix.model.ExternalView; import org.apache.helix.model.IdealState; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.alibaba.fastjson.JSONObject; -import com.codahale.metrics.Counter; -import com.uber.stream.kafka.mirrormaker.controller.core.HelixMirrorMakerManager; -import com.uber.stream.kafka.mirrormaker.controller.reporter.HelixKafkaMirrorMakerMetricsReporter; - /** * Validate every one minute and update related metrics. */ diff --git a/uReplicator-Controller/src/test/java/com/uber/stream/kafka/mirrormaker/controller/TestControllerConf.java b/uReplicator-Controller/src/test/java/com/uber/stream/kafka/mirrormaker/controller/TestControllerConf.java index 785c2a5d..9e115f9f 100644 --- a/uReplicator-Controller/src/test/java/com/uber/stream/kafka/mirrormaker/controller/TestControllerConf.java +++ b/uReplicator-Controller/src/test/java/com/uber/stream/kafka/mirrormaker/controller/TestControllerConf.java @@ -17,7 +17,6 @@ import java.net.InetAddress; import java.net.UnknownHostException; - import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.CommandLineParser; import org.apache.commons.cli.DefaultParser; @@ -44,7 +43,7 @@ public void testDefaultControllerConf() throws UnknownHostException { @Test public void testCmdControllerConf() throws ParseException { - String[] args = new String[] { + String[] args = new String[]{ "-helixClusterName", "testHelixClusterName", "-zookeeper", "localhost:2181", "-port", "9090", @@ -89,7 +88,7 @@ public void testCmdControllerConf() throws ParseException { @Test public void testAnotherCmdControllerConf() throws ParseException { - String[] args = new String[] { + String[] args = new String[]{ "-helixClusterName", "testHelixClusterName", "-zookeeper", "localhost:2181", "-port", "9090", @@ -131,7 +130,7 @@ public void testAnotherCmdControllerConf() throws ParseException { @Test public void testNoGitBackupCmdControllerConf() throws ParseException, UnknownHostException { - String[] args = new String[] { + String[] args = new String[]{ "-helixClusterName", "testHelixClusterName", "-zookeeper", "localhost:2181", "-port", "9090", diff --git a/uReplicator-Controller/src/test/java/com/uber/stream/kafka/mirrormaker/controller/core/HelixMirrorMakerManagerCustomEmptyFullTest.java b/uReplicator-Controller/src/test/java/com/uber/stream/kafka/mirrormaker/controller/core/HelixMirrorMakerManagerCustomEmptyFullTest.java index 4b56b081..9297da9e 100644 --- a/uReplicator-Controller/src/test/java/com/uber/stream/kafka/mirrormaker/controller/core/HelixMirrorMakerManagerCustomEmptyFullTest.java +++ b/uReplicator-Controller/src/test/java/com/uber/stream/kafka/mirrormaker/controller/core/HelixMirrorMakerManagerCustomEmptyFullTest.java @@ -15,10 +15,13 @@ */ package com.uber.stream.kafka.mirrormaker.controller.core; +import com.uber.stream.kafka.mirrormaker.controller.ControllerConf; +import com.uber.stream.kafka.mirrormaker.controller.utils.ControllerTestUtils; +import com.uber.stream.kafka.mirrormaker.controller.utils.FakeInstance; +import com.uber.stream.kafka.mirrormaker.controller.utils.ZkStarter; import java.util.HashMap; import java.util.List; import java.util.Map; - import org.apache.helix.model.ExternalView; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -27,11 +30,6 @@ import org.testng.annotations.BeforeTest; import org.testng.annotations.Test; -import com.uber.stream.kafka.mirrormaker.controller.ControllerConf; -import com.uber.stream.kafka.mirrormaker.controller.utils.ControllerTestUtils; -import com.uber.stream.kafka.mirrormaker.controller.utils.FakeInstance; -import com.uber.stream.kafka.mirrormaker.controller.utils.ZkStarter; - public class HelixMirrorMakerManagerCustomEmptyFullTest { private static final Logger LOGGER = diff --git a/uReplicator-Controller/src/test/java/com/uber/stream/kafka/mirrormaker/controller/core/HelixMirrorMakerManagerCustomEmptyWorkerTest.java b/uReplicator-Controller/src/test/java/com/uber/stream/kafka/mirrormaker/controller/core/HelixMirrorMakerManagerCustomEmptyWorkerTest.java index 6832bb0c..72e520f7 100644 --- a/uReplicator-Controller/src/test/java/com/uber/stream/kafka/mirrormaker/controller/core/HelixMirrorMakerManagerCustomEmptyWorkerTest.java +++ b/uReplicator-Controller/src/test/java/com/uber/stream/kafka/mirrormaker/controller/core/HelixMirrorMakerManagerCustomEmptyWorkerTest.java @@ -15,10 +15,13 @@ */ package com.uber.stream.kafka.mirrormaker.controller.core; +import com.uber.stream.kafka.mirrormaker.controller.ControllerConf; +import com.uber.stream.kafka.mirrormaker.controller.utils.ControllerTestUtils; +import com.uber.stream.kafka.mirrormaker.controller.utils.FakeInstance; +import com.uber.stream.kafka.mirrormaker.controller.utils.ZkStarter; import java.util.HashMap; import java.util.List; import java.util.Map; - import org.apache.helix.model.ExternalView; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -27,11 +30,6 @@ import org.testng.annotations.BeforeTest; import org.testng.annotations.Test; -import com.uber.stream.kafka.mirrormaker.controller.ControllerConf; -import com.uber.stream.kafka.mirrormaker.controller.utils.ControllerTestUtils; -import com.uber.stream.kafka.mirrormaker.controller.utils.FakeInstance; -import com.uber.stream.kafka.mirrormaker.controller.utils.ZkStarter; - public class HelixMirrorMakerManagerCustomEmptyWorkerTest { private static final Logger LOGGER = diff --git a/uReplicator-Controller/src/test/java/com/uber/stream/kafka/mirrormaker/controller/core/HelixMirrorMakerManagerCustomFullTest.java b/uReplicator-Controller/src/test/java/com/uber/stream/kafka/mirrormaker/controller/core/HelixMirrorMakerManagerCustomFullTest.java index 9cd7c18b..21fbb98c 100644 --- a/uReplicator-Controller/src/test/java/com/uber/stream/kafka/mirrormaker/controller/core/HelixMirrorMakerManagerCustomFullTest.java +++ b/uReplicator-Controller/src/test/java/com/uber/stream/kafka/mirrormaker/controller/core/HelixMirrorMakerManagerCustomFullTest.java @@ -15,10 +15,13 @@ */ package com.uber.stream.kafka.mirrormaker.controller.core; +import com.uber.stream.kafka.mirrormaker.controller.ControllerConf; +import com.uber.stream.kafka.mirrormaker.controller.utils.ControllerTestUtils; +import com.uber.stream.kafka.mirrormaker.controller.utils.FakeInstance; +import com.uber.stream.kafka.mirrormaker.controller.utils.ZkStarter; import java.util.HashMap; import java.util.List; import java.util.Map; - import org.apache.helix.model.ExternalView; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -27,11 +30,6 @@ import org.testng.annotations.BeforeTest; import org.testng.annotations.Test; -import com.uber.stream.kafka.mirrormaker.controller.ControllerConf; -import com.uber.stream.kafka.mirrormaker.controller.utils.ControllerTestUtils; -import com.uber.stream.kafka.mirrormaker.controller.utils.FakeInstance; -import com.uber.stream.kafka.mirrormaker.controller.utils.ZkStarter; - public class HelixMirrorMakerManagerCustomFullTest { private static final Logger LOGGER = diff --git a/uReplicator-Controller/src/test/java/com/uber/stream/kafka/mirrormaker/controller/core/HelixMirrorMakerManagerCustomSimpleTest.java b/uReplicator-Controller/src/test/java/com/uber/stream/kafka/mirrormaker/controller/core/HelixMirrorMakerManagerCustomSimpleTest.java index 742ef247..fee962ee 100644 --- a/uReplicator-Controller/src/test/java/com/uber/stream/kafka/mirrormaker/controller/core/HelixMirrorMakerManagerCustomSimpleTest.java +++ b/uReplicator-Controller/src/test/java/com/uber/stream/kafka/mirrormaker/controller/core/HelixMirrorMakerManagerCustomSimpleTest.java @@ -15,19 +15,17 @@ */ package com.uber.stream.kafka.mirrormaker.controller.core; +import com.uber.stream.kafka.mirrormaker.controller.ControllerConf; +import com.uber.stream.kafka.mirrormaker.controller.utils.ControllerTestUtils; +import com.uber.stream.kafka.mirrormaker.controller.utils.FakeInstance; +import com.uber.stream.kafka.mirrormaker.controller.utils.ZkStarter; import java.util.List; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testng.annotations.AfterTest; import org.testng.annotations.BeforeTest; import org.testng.annotations.Test; -import com.uber.stream.kafka.mirrormaker.controller.ControllerConf; -import com.uber.stream.kafka.mirrormaker.controller.utils.ControllerTestUtils; -import com.uber.stream.kafka.mirrormaker.controller.utils.FakeInstance; -import com.uber.stream.kafka.mirrormaker.controller.utils.ZkStarter; - public class HelixMirrorMakerManagerCustomSimpleTest { private static final Logger LOGGER = diff --git a/uReplicator-Controller/src/test/java/com/uber/stream/kafka/mirrormaker/controller/core/TestAutoTopicWhitelistingManager.java b/uReplicator-Controller/src/test/java/com/uber/stream/kafka/mirrormaker/controller/core/TestAutoTopicWhitelistingManager.java index d023eca2..c89310fc 100644 --- a/uReplicator-Controller/src/test/java/com/uber/stream/kafka/mirrormaker/controller/core/TestAutoTopicWhitelistingManager.java +++ b/uReplicator-Controller/src/test/java/com/uber/stream/kafka/mirrormaker/controller/core/TestAutoTopicWhitelistingManager.java @@ -15,6 +15,10 @@ */ package com.uber.stream.kafka.mirrormaker.controller.core; +import com.uber.stream.kafka.mirrormaker.controller.ControllerConf; +import com.uber.stream.kafka.mirrormaker.controller.utils.KafkaStarterUtils; +import com.uber.stream.kafka.mirrormaker.controller.utils.ZkStarter; +import kafka.server.KafkaServerStartable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testng.Assert; @@ -22,12 +26,6 @@ import org.testng.annotations.BeforeTest; import org.testng.annotations.Test; -import com.uber.stream.kafka.mirrormaker.controller.ControllerConf; -import com.uber.stream.kafka.mirrormaker.controller.utils.KafkaStarterUtils; -import com.uber.stream.kafka.mirrormaker.controller.utils.ZkStarter; - -import kafka.server.KafkaServerStartable; - public class TestAutoTopicWhitelistingManager { private static final Logger LOGGER = diff --git a/uReplicator-Controller/src/test/java/com/uber/stream/kafka/mirrormaker/controller/core/TestKafkaBrokerTopicObserver.java b/uReplicator-Controller/src/test/java/com/uber/stream/kafka/mirrormaker/controller/core/TestKafkaBrokerTopicObserver.java index 75a00e84..1ff317e4 100644 --- a/uReplicator-Controller/src/test/java/com/uber/stream/kafka/mirrormaker/controller/core/TestKafkaBrokerTopicObserver.java +++ b/uReplicator-Controller/src/test/java/com/uber/stream/kafka/mirrormaker/controller/core/TestKafkaBrokerTopicObserver.java @@ -15,6 +15,9 @@ */ package com.uber.stream.kafka.mirrormaker.controller.core; +import com.uber.stream.kafka.mirrormaker.controller.utils.KafkaStarterUtils; +import com.uber.stream.kafka.mirrormaker.controller.utils.ZkStarter; +import kafka.server.KafkaServerStartable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testng.Assert; @@ -22,11 +25,6 @@ import org.testng.annotations.BeforeTest; import org.testng.annotations.Test; -import com.uber.stream.kafka.mirrormaker.controller.utils.KafkaStarterUtils; -import com.uber.stream.kafka.mirrormaker.controller.utils.ZkStarter; - -import kafka.server.KafkaServerStartable; - public class TestKafkaBrokerTopicObserver { private static final Logger LOGGER = diff --git a/uReplicator-Controller/src/test/java/com/uber/stream/kafka/mirrormaker/controller/integration/HelixMirrorMakerManagerCustomEmptyFullTest.java b/uReplicator-Controller/src/test/java/com/uber/stream/kafka/mirrormaker/controller/integration/HelixMirrorMakerManagerCustomEmptyFullTest.java index 2a9ea16f..1e78e47a 100644 --- a/uReplicator-Controller/src/test/java/com/uber/stream/kafka/mirrormaker/controller/integration/HelixMirrorMakerManagerCustomEmptyFullTest.java +++ b/uReplicator-Controller/src/test/java/com/uber/stream/kafka/mirrormaker/controller/integration/HelixMirrorMakerManagerCustomEmptyFullTest.java @@ -15,10 +15,14 @@ */ package com.uber.stream.kafka.mirrormaker.controller.integration; +import com.uber.stream.kafka.mirrormaker.controller.ControllerConf; +import com.uber.stream.kafka.mirrormaker.controller.core.HelixMirrorMakerManager; +import com.uber.stream.kafka.mirrormaker.controller.utils.ControllerTestUtils; +import com.uber.stream.kafka.mirrormaker.controller.utils.FakeInstance; +import com.uber.stream.kafka.mirrormaker.controller.utils.ZkStarter; import java.util.HashMap; import java.util.List; import java.util.Map; - import org.apache.helix.model.ExternalView; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -27,12 +31,6 @@ import org.testng.annotations.BeforeTest; import org.testng.annotations.Test; -import com.uber.stream.kafka.mirrormaker.controller.ControllerConf; -import com.uber.stream.kafka.mirrormaker.controller.core.HelixMirrorMakerManager; -import com.uber.stream.kafka.mirrormaker.controller.utils.ControllerTestUtils; -import com.uber.stream.kafka.mirrormaker.controller.utils.FakeInstance; -import com.uber.stream.kafka.mirrormaker.controller.utils.ZkStarter; - public class HelixMirrorMakerManagerCustomEmptyFullTest { private static final Logger LOGGER = diff --git a/uReplicator-Controller/src/test/java/com/uber/stream/kafka/mirrormaker/controller/integration/HelixMirrorMakerManagerCustomEmptyWorkerTest.java b/uReplicator-Controller/src/test/java/com/uber/stream/kafka/mirrormaker/controller/integration/HelixMirrorMakerManagerCustomEmptyWorkerTest.java index 57d0361a..71fd2d6d 100644 --- a/uReplicator-Controller/src/test/java/com/uber/stream/kafka/mirrormaker/controller/integration/HelixMirrorMakerManagerCustomEmptyWorkerTest.java +++ b/uReplicator-Controller/src/test/java/com/uber/stream/kafka/mirrormaker/controller/integration/HelixMirrorMakerManagerCustomEmptyWorkerTest.java @@ -15,10 +15,14 @@ */ package com.uber.stream.kafka.mirrormaker.controller.integration; +import com.uber.stream.kafka.mirrormaker.controller.ControllerConf; +import com.uber.stream.kafka.mirrormaker.controller.core.HelixMirrorMakerManager; +import com.uber.stream.kafka.mirrormaker.controller.utils.ControllerTestUtils; +import com.uber.stream.kafka.mirrormaker.controller.utils.FakeInstance; +import com.uber.stream.kafka.mirrormaker.controller.utils.ZkStarter; import java.util.HashMap; import java.util.List; import java.util.Map; - import org.apache.helix.model.ExternalView; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -27,12 +31,6 @@ import org.testng.annotations.BeforeTest; import org.testng.annotations.Test; -import com.uber.stream.kafka.mirrormaker.controller.ControllerConf; -import com.uber.stream.kafka.mirrormaker.controller.core.HelixMirrorMakerManager; -import com.uber.stream.kafka.mirrormaker.controller.utils.ControllerTestUtils; -import com.uber.stream.kafka.mirrormaker.controller.utils.FakeInstance; -import com.uber.stream.kafka.mirrormaker.controller.utils.ZkStarter; - public class HelixMirrorMakerManagerCustomEmptyWorkerTest { private static final Logger LOGGER = diff --git a/uReplicator-Controller/src/test/java/com/uber/stream/kafka/mirrormaker/controller/integration/HelixMirrorMakerManagerCustomFullTest.java b/uReplicator-Controller/src/test/java/com/uber/stream/kafka/mirrormaker/controller/integration/HelixMirrorMakerManagerCustomFullTest.java index b31382a3..c77ccc90 100644 --- a/uReplicator-Controller/src/test/java/com/uber/stream/kafka/mirrormaker/controller/integration/HelixMirrorMakerManagerCustomFullTest.java +++ b/uReplicator-Controller/src/test/java/com/uber/stream/kafka/mirrormaker/controller/integration/HelixMirrorMakerManagerCustomFullTest.java @@ -15,10 +15,14 @@ */ package com.uber.stream.kafka.mirrormaker.controller.integration; +import com.uber.stream.kafka.mirrormaker.controller.ControllerConf; +import com.uber.stream.kafka.mirrormaker.controller.core.HelixMirrorMakerManager; +import com.uber.stream.kafka.mirrormaker.controller.utils.ControllerTestUtils; +import com.uber.stream.kafka.mirrormaker.controller.utils.FakeInstance; +import com.uber.stream.kafka.mirrormaker.controller.utils.ZkStarter; import java.util.HashMap; import java.util.List; import java.util.Map; - import org.apache.helix.model.ExternalView; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -27,12 +31,6 @@ import org.testng.annotations.BeforeTest; import org.testng.annotations.Test; -import com.uber.stream.kafka.mirrormaker.controller.ControllerConf; -import com.uber.stream.kafka.mirrormaker.controller.core.HelixMirrorMakerManager; -import com.uber.stream.kafka.mirrormaker.controller.utils.ControllerTestUtils; -import com.uber.stream.kafka.mirrormaker.controller.utils.FakeInstance; -import com.uber.stream.kafka.mirrormaker.controller.utils.ZkStarter; - public class HelixMirrorMakerManagerCustomFullTest { private static final Logger LOGGER = diff --git a/uReplicator-Controller/src/test/java/com/uber/stream/kafka/mirrormaker/controller/integration/HelixMirrorMakerManagerCustomSimpleTest.java b/uReplicator-Controller/src/test/java/com/uber/stream/kafka/mirrormaker/controller/integration/HelixMirrorMakerManagerCustomSimpleTest.java index e3b877d4..d13a0e83 100644 --- a/uReplicator-Controller/src/test/java/com/uber/stream/kafka/mirrormaker/controller/integration/HelixMirrorMakerManagerCustomSimpleTest.java +++ b/uReplicator-Controller/src/test/java/com/uber/stream/kafka/mirrormaker/controller/integration/HelixMirrorMakerManagerCustomSimpleTest.java @@ -15,20 +15,18 @@ */ package com.uber.stream.kafka.mirrormaker.controller.integration; +import com.uber.stream.kafka.mirrormaker.controller.ControllerConf; +import com.uber.stream.kafka.mirrormaker.controller.core.HelixMirrorMakerManager; +import com.uber.stream.kafka.mirrormaker.controller.utils.ControllerTestUtils; +import com.uber.stream.kafka.mirrormaker.controller.utils.FakeInstance; +import com.uber.stream.kafka.mirrormaker.controller.utils.ZkStarter; import java.util.List; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testng.annotations.AfterTest; import org.testng.annotations.BeforeTest; import org.testng.annotations.Test; -import com.uber.stream.kafka.mirrormaker.controller.ControllerConf; -import com.uber.stream.kafka.mirrormaker.controller.core.HelixMirrorMakerManager; -import com.uber.stream.kafka.mirrormaker.controller.utils.ControllerTestUtils; -import com.uber.stream.kafka.mirrormaker.controller.utils.FakeInstance; -import com.uber.stream.kafka.mirrormaker.controller.utils.ZkStarter; - public class HelixMirrorMakerManagerCustomSimpleTest { private static final Logger LOGGER = diff --git a/uReplicator-Controller/src/test/java/com/uber/stream/kafka/mirrormaker/controller/reporter/TestHelixKafkaMirrorMakerMetricsReporter.java b/uReplicator-Controller/src/test/java/com/uber/stream/kafka/mirrormaker/controller/reporter/TestHelixKafkaMirrorMakerMetricsReporter.java index 9207dedd..6a1c7d6b 100644 --- a/uReplicator-Controller/src/test/java/com/uber/stream/kafka/mirrormaker/controller/reporter/TestHelixKafkaMirrorMakerMetricsReporter.java +++ b/uReplicator-Controller/src/test/java/com/uber/stream/kafka/mirrormaker/controller/reporter/TestHelixKafkaMirrorMakerMetricsReporter.java @@ -15,17 +15,15 @@ */ package com.uber.stream.kafka.mirrormaker.controller.reporter; -import java.lang.reflect.Field; - -import org.testng.Assert; -import org.testng.annotations.Test; - import com.codahale.metrics.Counter; import com.codahale.metrics.Meter; import com.codahale.metrics.Timer; import com.codahale.metrics.Timer.Context; import com.uber.stream.kafka.mirrormaker.controller.ControllerConf; import com.uber.stream.kafka.mirrormaker.controller.ControllerStarter; +import java.lang.reflect.Field; +import org.testng.Assert; +import org.testng.annotations.Test; public class TestHelixKafkaMirrorMakerMetricsReporter { diff --git a/uReplicator-Controller/src/test/java/com/uber/stream/kafka/mirrormaker/controller/rest/ControllerStarterTest.java b/uReplicator-Controller/src/test/java/com/uber/stream/kafka/mirrormaker/controller/rest/ControllerStarterTest.java index 712a05ef..e0f0cc3e 100644 --- a/uReplicator-Controller/src/test/java/com/uber/stream/kafka/mirrormaker/controller/rest/ControllerStarterTest.java +++ b/uReplicator-Controller/src/test/java/com/uber/stream/kafka/mirrormaker/controller/rest/ControllerStarterTest.java @@ -15,9 +15,19 @@ */ package com.uber.stream.kafka.mirrormaker.controller.rest; +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; +import com.uber.stream.kafka.mirrormaker.controller.ControllerConf; +import com.uber.stream.kafka.mirrormaker.controller.ControllerStarter; +import com.uber.stream.kafka.mirrormaker.controller.core.KafkaBrokerTopicObserver; +import com.uber.stream.kafka.mirrormaker.controller.utils.ControllerRequestURLBuilder; +import com.uber.stream.kafka.mirrormaker.controller.utils.ControllerTestUtils; +import com.uber.stream.kafka.mirrormaker.controller.utils.FakeInstance; +import com.uber.stream.kafka.mirrormaker.controller.utils.KafkaStarterUtils; +import com.uber.stream.kafka.mirrormaker.controller.utils.ZkStarter; import java.util.ArrayList; import java.util.List; - +import kafka.server.KafkaServerStartable; import org.I0Itec.zkclient.ZkClient; import org.restlet.Client; import org.restlet.Request; @@ -31,20 +41,8 @@ import org.testng.annotations.BeforeTest; import org.testng.annotations.Test; -import com.alibaba.fastjson.JSON; -import com.alibaba.fastjson.JSONObject; -import com.uber.stream.kafka.mirrormaker.controller.ControllerConf; -import com.uber.stream.kafka.mirrormaker.controller.ControllerStarter; -import com.uber.stream.kafka.mirrormaker.controller.core.KafkaBrokerTopicObserver; -import com.uber.stream.kafka.mirrormaker.controller.utils.ControllerRequestURLBuilder; -import com.uber.stream.kafka.mirrormaker.controller.utils.ControllerTestUtils; -import com.uber.stream.kafka.mirrormaker.controller.utils.FakeInstance; -import com.uber.stream.kafka.mirrormaker.controller.utils.KafkaStarterUtils; -import com.uber.stream.kafka.mirrormaker.controller.utils.ZkStarter; - -import kafka.server.KafkaServerStartable; - public class ControllerStarterTest { + private static final Logger LOGGER = LoggerFactory.getLogger(ControllerStarterTest.class); public static String CONTROLLER_PORT = "9999"; public static Client HTTP_CLIENT = new Client(Protocol.HTTP); @@ -103,7 +101,7 @@ public void shutdown() { kafkaBrokerTopicObserver.stop(); KafkaStarterUtils.stopServer(kafkaStarter); - + ZK_CLIENT.deleteRecursive("/" + HELIX_CLUSTER_NAME); ZK_CLIENT.close(); ZkStarter.stopLocalZkServer(); @@ -120,7 +118,7 @@ public ControllerStarter startController(String helixClusterName, String port) { conf.setEnableAutoTopicExpansion("true"); conf.setSrcKafkaZkPath(KafkaStarterUtils.DEFAULT_ZK_STR); conf.setDestKafkaZkPath(KafkaStarterUtils.DEFAULT_ZK_STR); - + final ControllerStarter starter = new ControllerStarter(conf); try { starter.start(); @@ -139,7 +137,7 @@ public void testGet() { Assert.assertEquals(response.getStatus(), Status.SUCCESS_OK); Assert.assertEquals(response.getEntityAsText(), "No topic is added in MirrorMaker Controller!"); System.out.println(response.getEntityAsText()); - + // Create topic request = ControllerRequestURLBuilder.baseUrl(REQUEST_URL) .getTopicCreationRequestUrl("testTopic0", 8); diff --git a/uReplicator-Controller/src/test/java/com/uber/stream/kafka/mirrormaker/controller/utils/ControllerRequestURLBuilder.java b/uReplicator-Controller/src/test/java/com/uber/stream/kafka/mirrormaker/controller/utils/ControllerRequestURLBuilder.java index 534f04ec..d5f608bd 100644 --- a/uReplicator-Controller/src/test/java/com/uber/stream/kafka/mirrormaker/controller/utils/ControllerRequestURLBuilder.java +++ b/uReplicator-Controller/src/test/java/com/uber/stream/kafka/mirrormaker/controller/utils/ControllerRequestURLBuilder.java @@ -15,14 +15,14 @@ */ package com.uber.stream.kafka.mirrormaker.controller.utils; +import com.uber.stream.kafka.mirrormaker.controller.core.TopicPartition; import org.apache.commons.lang.StringUtils; import org.restlet.Request; import org.restlet.data.MediaType; import org.restlet.data.Method; -import com.uber.stream.kafka.mirrormaker.controller.core.TopicPartition; - public class ControllerRequestURLBuilder { + private final String _baseUrl; private ControllerRequestURLBuilder(String baseUrl) { @@ -34,7 +34,7 @@ public static ControllerRequestURLBuilder baseUrl(String baseUrl) { } public Request getTopicExternalViewRequestUrl(String topic) { - String requestUrl = StringUtils.join(new String[] { + String requestUrl = StringUtils.join(new String[]{ _baseUrl, "/topics/", topic }); @@ -43,7 +43,7 @@ public Request getTopicExternalViewRequestUrl(String topic) { } public Request getTopicDeleteRequestUrl(String topic) { - String requestUrl = StringUtils.join(new String[] { + String requestUrl = StringUtils.join(new String[]{ _baseUrl, "/topics/", topic }); diff --git a/uReplicator-Controller/src/test/java/com/uber/stream/kafka/mirrormaker/controller/utils/FakeInstance.java b/uReplicator-Controller/src/test/java/com/uber/stream/kafka/mirrormaker/controller/utils/FakeInstance.java index 349b52e2..4bd8a705 100644 --- a/uReplicator-Controller/src/test/java/com/uber/stream/kafka/mirrormaker/controller/utils/FakeInstance.java +++ b/uReplicator-Controller/src/test/java/com/uber/stream/kafka/mirrormaker/controller/utils/FakeInstance.java @@ -61,6 +61,7 @@ public String toString() { } public static class ShutdownHook extends Thread { + private final FakeInstance _fakeInstance; public ShutdownHook(FakeInstance fakeInstance) { diff --git a/uReplicator-Controller/src/test/java/com/uber/stream/kafka/mirrormaker/controller/utils/KafkaStarterUtils.java b/uReplicator-Controller/src/test/java/com/uber/stream/kafka/mirrormaker/controller/utils/KafkaStarterUtils.java index 318e5fa8..4cccccc9 100644 --- a/uReplicator-Controller/src/test/java/com/uber/stream/kafka/mirrormaker/controller/utils/KafkaStarterUtils.java +++ b/uReplicator-Controller/src/test/java/com/uber/stream/kafka/mirrormaker/controller/utils/KafkaStarterUtils.java @@ -15,6 +15,8 @@ */ package com.uber.stream.kafka.mirrormaker.controller.utils; +import java.io.File; +import java.util.Properties; import kafka.admin.TopicCommand; import kafka.server.KafkaConfig; import kafka.server.KafkaServerStartable; @@ -23,15 +25,12 @@ import org.apache.commons.io.FileUtils; import org.apache.kafka.common.errors.TopicExistsException; -import java.io.File; -import java.util.Properties; - /** * Utilities to start Kafka during unit tests. - * */ public class KafkaStarterUtils { + public static final int DEFAULT_KAFKA_PORT = 19092; public static final int DEFAULT_BROKER_ID = 0; public static final String DEFAULT_ZK_STR = ZkStarter.DEFAULT_ZK_STR + "/kafka"; @@ -41,8 +40,8 @@ public static Properties getDefaultKafkaConfiguration() { return new Properties(); } - public static KafkaServerStartable startServer(final int port, final int brokerId, final String zkStr, - final Properties configuration) { + public static KafkaServerStartable startServer(final int port, final int brokerId, + final String zkStr, final Properties configuration) { // Create the ZK nodes for Kafka, if needed int indexOfFirstSlash = zkStr.indexOf('/'); if (indexOfFirstSlash != -1) { @@ -100,7 +99,8 @@ public static void stopServer(KafkaServerStartable serverStartable) { public static void createTopic(String kafkaTopic, String zkStr) { // TopicCommand.main() will call System.exit() finally, which will break maven-surefire-plugin try { - String[] args = new String[]{"--create", "--zookeeper", zkStr, "--replication-factor", "1", "--partitions", "1", "--topic", kafkaTopic}; + String[] args = new String[]{"--create", "--zookeeper", zkStr, "--replication-factor", "1", + "--partitions", "1", "--topic", kafkaTopic}; ZkUtils zkUtils = ZkUtils.apply(zkStr, 30000, 30000, false); TopicCommand.TopicCommandOptions opts = new TopicCommand.TopicCommandOptions(args); TopicCommand.createTopic(zkUtils, opts); diff --git a/uReplicator-Controller/src/test/java/com/uber/stream/kafka/mirrormaker/controller/utils/TestOnlineOfflineStateModelFactory.java b/uReplicator-Controller/src/test/java/com/uber/stream/kafka/mirrormaker/controller/utils/TestOnlineOfflineStateModelFactory.java index 31c34bb9..620a15cb 100644 --- a/uReplicator-Controller/src/test/java/com/uber/stream/kafka/mirrormaker/controller/utils/TestOnlineOfflineStateModelFactory.java +++ b/uReplicator-Controller/src/test/java/com/uber/stream/kafka/mirrormaker/controller/utils/TestOnlineOfflineStateModelFactory.java @@ -21,6 +21,7 @@ import org.apache.helix.participant.statemachine.StateModelFactory; public class TestOnlineOfflineStateModelFactory extends StateModelFactory { + int _delay; final String _instanceId; @@ -37,6 +38,7 @@ public StateModel createNewStateModel(String resourceName, String stateUnitKey) } public static class TestOnlineOfflineStateModel extends StateModel { + int _transDelay = 0; final String _instanceId; diff --git a/uReplicator-Controller/src/test/java/com/uber/stream/kafka/mirrormaker/controller/utils/ZkStarter.java b/uReplicator-Controller/src/test/java/com/uber/stream/kafka/mirrormaker/controller/utils/ZkStarter.java index 4d3398ba..4c7f0a2e 100644 --- a/uReplicator-Controller/src/test/java/com/uber/stream/kafka/mirrormaker/controller/utils/ZkStarter.java +++ b/uReplicator-Controller/src/test/java/com/uber/stream/kafka/mirrormaker/controller/utils/ZkStarter.java @@ -19,7 +19,6 @@ import java.io.IOException; import java.util.Random; import java.util.concurrent.TimeUnit; - import org.I0Itec.zkclient.ZkClient; import org.apache.zookeeper.server.ZooKeeperServerMain; import org.apache.zookeeper.server.quorum.QuorumPeerConfig; @@ -27,9 +26,10 @@ import org.slf4j.LoggerFactory; public class ZkStarter { + private static final Logger LOGGER = LoggerFactory.getLogger(ZkStarter.class); public static final int DEFAULT_ZK_TEST_PORT = new Random().nextInt(10000) + 10000; - public static final String DEFAULT_ZK_STR = "localhost:" + DEFAULT_ZK_TEST_PORT;; + public static final String DEFAULT_ZK_STR = "localhost:" + DEFAULT_ZK_TEST_PORT; private static PublicZooKeeperServerMain _zookeeperServerMain = null; private static String _zkDataDir = null; @@ -38,6 +38,7 @@ public class ZkStarter { * Silly class to make protected methods public. */ static class PublicZooKeeperServerMain extends ZooKeeperServerMain { + @Override public void initializeAndRun(String[] args) throws QuorumPeerConfig.ConfigException, IOException { @@ -65,6 +66,7 @@ public static void startLocalZkServer() { /** * Starts a local Zk instance with a generated empty data directory + * * @param port The port to listen on */ public static void startLocalZkServer(final int port) { @@ -74,6 +76,7 @@ public static void startLocalZkServer(final int port) { /** * Starts a local Zk instance + * * @param port The port to listen on * @param dataDirPath The path for the Zk data directory */ @@ -87,7 +90,7 @@ public synchronized static void startLocalZkServer(final int port, final String _zookeeperServerMain = new PublicZooKeeperServerMain(); LOGGER.info("Zookeeper data path - " + dataDirPath); _zkDataDir = dataDirPath; - final String[] args = new String[] { + final String[] args = new String[]{ Integer.toString(port), dataDirPath }; new Thread() { @@ -126,6 +129,7 @@ public static void stopLocalZkServer() { /** * Stops a local Zk instance. + * * @param deleteDataDir Whether or not to delete the data directory */ public synchronized static void stopLocalZkServer(final boolean deleteDataDir) { diff --git a/uReplicator-Controller/src/test/java/com/uber/stream/kafka/mirrormaker/controller/validation/TestSourceKafkaClusterValidationManager.java b/uReplicator-Controller/src/test/java/com/uber/stream/kafka/mirrormaker/controller/validation/TestSourceKafkaClusterValidationManager.java index a6b50750..64860870 100644 --- a/uReplicator-Controller/src/test/java/com/uber/stream/kafka/mirrormaker/controller/validation/TestSourceKafkaClusterValidationManager.java +++ b/uReplicator-Controller/src/test/java/com/uber/stream/kafka/mirrormaker/controller/validation/TestSourceKafkaClusterValidationManager.java @@ -15,21 +15,19 @@ */ package com.uber.stream.kafka.mirrormaker.controller.validation; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.testng.Assert; -import org.testng.annotations.AfterTest; -import org.testng.annotations.BeforeTest; -import org.testng.annotations.Test; - import com.alibaba.fastjson.JSONObject; import com.uber.stream.kafka.mirrormaker.controller.ControllerConf; import com.uber.stream.kafka.mirrormaker.controller.core.HelixMirrorMakerManager; import com.uber.stream.kafka.mirrormaker.controller.core.KafkaBrokerTopicObserver; import com.uber.stream.kafka.mirrormaker.controller.utils.KafkaStarterUtils; import com.uber.stream.kafka.mirrormaker.controller.utils.ZkStarter; - import kafka.server.KafkaServerStartable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.Assert; +import org.testng.annotations.AfterTest; +import org.testng.annotations.BeforeTest; +import org.testng.annotations.Test; public class TestSourceKafkaClusterValidationManager { diff --git a/uReplicator-Controller/src/test/java/com/uber/stream/kafka/mirrormaker/controller/validation/TestValidationManager.java b/uReplicator-Controller/src/test/java/com/uber/stream/kafka/mirrormaker/controller/validation/TestValidationManager.java index 969e5e02..53abe899 100644 --- a/uReplicator-Controller/src/test/java/com/uber/stream/kafka/mirrormaker/controller/validation/TestValidationManager.java +++ b/uReplicator-Controller/src/test/java/com/uber/stream/kafka/mirrormaker/controller/validation/TestValidationManager.java @@ -15,18 +15,6 @@ */ package com.uber.stream.kafka.mirrormaker.controller.validation; -import org.testng.annotations.Test; - -import java.util.ArrayList; -import java.util.List; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.testng.Assert; -import org.testng.annotations.AfterTest; -import org.testng.annotations.BeforeTest; -import org.testng.annotations.Test; - import com.alibaba.fastjson.JSONObject; import com.uber.stream.kafka.mirrormaker.controller.ControllerConf; import com.uber.stream.kafka.mirrormaker.controller.core.HelixMirrorMakerManager; @@ -35,8 +23,15 @@ import com.uber.stream.kafka.mirrormaker.controller.utils.FakeInstance; import com.uber.stream.kafka.mirrormaker.controller.utils.KafkaStarterUtils; import com.uber.stream.kafka.mirrormaker.controller.utils.ZkStarter; - +import java.util.ArrayList; +import java.util.List; import kafka.server.KafkaServerStartable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.Assert; +import org.testng.annotations.AfterTest; +import org.testng.annotations.BeforeTest; +import org.testng.annotations.Test; public class TestValidationManager { diff --git a/uReplicator-Distribution/pom.xml b/uReplicator-Distribution/pom.xml index c0cba10e..add2ab36 100644 --- a/uReplicator-Distribution/pom.xml +++ b/uReplicator-Distribution/pom.xml @@ -1,16 +1,14 @@ - + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> 4.0.0 - com.uber.uReplicator uReplicator-Distribution jar uReplicator-Distribution - uReplicator com.uber.uReplicator - 1.0.1 + uReplicator + 1.0.2 A distribution for this project. diff --git a/uReplicator-Distribution/src/main/java/com/uber/stream/kafka/mirrormaker/starter/MirrorMakerStarter.java b/uReplicator-Distribution/src/main/java/com/uber/stream/kafka/mirrormaker/starter/MirrorMakerStarter.java index c526a09a..c42febbb 100644 --- a/uReplicator-Distribution/src/main/java/com/uber/stream/kafka/mirrormaker/starter/MirrorMakerStarter.java +++ b/uReplicator-Distribution/src/main/java/com/uber/stream/kafka/mirrormaker/starter/MirrorMakerStarter.java @@ -15,15 +15,12 @@ */ package com.uber.stream.kafka.mirrormaker.starter; +import com.uber.stream.kafka.mirrormaker.controller.ControllerStarter; import java.util.Arrays; - +import kafka.mirrormaker.MirrorMakerWorker; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.uber.stream.kafka.mirrormaker.controller.ControllerStarter; - -import kafka.mirrormaker.MirrorMakerWorker; - /** * This is the entry point to start mirror maker controller and worker. * The 1st parameter indicates the module to start: @@ -48,8 +45,7 @@ public static void main(String[] args) throws Exception { + " to start as the first parameter! Current args: {}", Arrays.toString(args)); } } else { - LOGGER.error( - "Start script doesn't provide enough parameters! Current args: {}.", Arrays.toString(args)); + LOGGER.error("Start script doesn't provide enough parameters! Current args: {}.", Arrays.toString(args)); } } diff --git a/uReplicator-Worker/pom.xml b/uReplicator-Worker/pom.xml index ab29efed..9b03e7de 100644 --- a/uReplicator-Worker/pom.xml +++ b/uReplicator-Worker/pom.xml @@ -1,20 +1,18 @@ - 4.0.0 - com.uber.uReplicator uReplicator-Worker uReplicator-Worker - uReplicator com.uber.uReplicator - 1.0.1 + uReplicator + 1.0.2 - Kafka mirror maker with Helix is built for resolving high level consumers rebalancing pain point. - Helix provides a good interface to dynamically interact with zookeeper and manages/rebalances a stateful mapping for - instance to simple consumer mapping. + Kafka MirrorMaker with Helix is built for resolving high level consumers rebalancing pain point. + Helix provides a good interface to interact with zookeeper and manages/rebalances a stateful + mapping for instance to simple consumer mapping. diff --git a/uReplicator-Worker/src/main/scala/kafka/mirrormaker/CompactConsumerFetcherManager.scala b/uReplicator-Worker/src/main/scala/kafka/mirrormaker/CompactConsumerFetcherManager.scala index 3bcd3725..da8d33e3 100644 --- a/uReplicator-Worker/src/main/scala/kafka/mirrormaker/CompactConsumerFetcherManager.scala +++ b/uReplicator-Worker/src/main/scala/kafka/mirrormaker/CompactConsumerFetcherManager.scala @@ -44,9 +44,9 @@ import scala.collection.{JavaConversions, Map, Set, mutable} * @param config * @param zkClient */ -class CompactConsumerFetcherManager (private val consumerIdString: String, - private val config: ConsumerConfig, - private val zkClient : ZkClient) +class CompactConsumerFetcherManager(private val consumerIdString: String, + private val config: ConsumerConfig, + private val zkClient: ZkClient) extends Logging with KafkaMetricsGroup { protected val name: String = "CompactConsumerFetcherManager-%d".format(System.currentTimeMillis) private val clientId: String = config.clientId @@ -88,23 +88,23 @@ class CompactConsumerFetcherManager (private val consumerIdString: String, ) newGauge( - "MinFetchRate", { - new Gauge[Double] { - // current min fetch rate across all fetchers/topics/partitions - def value = { - val headRate: Double = - fetcherThreadMap.headOption.map(_._2.fetcherStats.requestRate.oneMinuteRate).getOrElse(0) - - fetcherThreadMap.foldLeft(headRate)((curMinAll, fetcherThreadMapEntry) => { - fetcherThreadMapEntry._2.fetcherStats.requestRate.oneMinuteRate.min(curMinAll) - }) + "MinFetchRate", { + new Gauge[Double] { + // current min fetch rate across all fetchers/topics/partitions + def value = { + val headRate: Double = + fetcherThreadMap.headOption.map(_._2.fetcherStats.requestRate.oneMinuteRate).getOrElse(0) + + fetcherThreadMap.foldLeft(headRate)((curMinAll, fetcherThreadMapEntry) => { + fetcherThreadMapEntry._2.fetcherStats.requestRate.oneMinuteRate.min(curMinAll) + }) + } } - } - }, - Map("clientId" -> clientId) + }, + Map("clientId" -> clientId) ) - private def getFetcherId(topic: String, partitionId: Int) : Int = { + private def getFetcherId(topic: String, partitionId: Int): Int = { Utils.abs(31 * topic.hashCode() + partitionId) % numFetchers } @@ -117,8 +117,9 @@ class CompactConsumerFetcherManager (private val consumerIdString: String, def addFetcherForPartitions(partitionAndOffsets: Map[TopicAndPartition, BrokerAndInitialOffset]) { mapLock synchronized { debug("addFetcherForPartitions get lock") - val partitionsPerFetcher = partitionAndOffsets.groupBy{ case(topicAndPartition, brokerAndInitialOffset) => - BrokerAndFetcherId(brokerAndInitialOffset.broker, getFetcherId(topicAndPartition.topic, topicAndPartition.partition))} + val partitionsPerFetcher = partitionAndOffsets.groupBy { case (topicAndPartition, brokerAndInitialOffset) => + BrokerAndFetcherId(brokerAndInitialOffset.broker, getFetcherId(topicAndPartition.topic, topicAndPartition.partition)) + } for ((brokerAndFetcherId, partitionAndOffsets) <- partitionsPerFetcher) { var fetcherThread: CompactConsumerFetcherThread = null fetcherThreadMap.get(brokerAndFetcherId) match { @@ -132,14 +133,16 @@ class CompactConsumerFetcherManager (private val consumerIdString: String, fetcherThreadMap(brokerAndFetcherId).addPartitions(partitionAndOffsets.map { case (topicAndPartition, brokerAndInitOffset) => topicAndPartition -> brokerAndInitOffset.initOffset }) - info("Fetcher Thread for topic partitions: %s is %s".format(partitionAndOffsets.map{ case (topicAndPartition, brokerAndInitialOffset) => - "[" + topicAndPartition + ", InitialOffset " + brokerAndInitialOffset.initOffset + "] "}, fetcherThread.name)) + info("Fetcher Thread for topic partitions: %s is %s".format(partitionAndOffsets.map { case (topicAndPartition, brokerAndInitialOffset) => + "[" + topicAndPartition + ", InitialOffset " + brokerAndInitialOffset.initOffset + "] " + }, fetcherThread.name)) } debug("addFetcherForPartitions releasing lock") } - info("Added fetcher for partitions %s".format(partitionAndOffsets.map{ case (topicAndPartition, brokerAndInitialOffset) => - "[" + topicAndPartition + ", initOffset " + brokerAndInitialOffset.initOffset + " to broker " + brokerAndInitialOffset.broker + "] "})) + info("Added fetcher for partitions %s".format(partitionAndOffsets.map { case (topicAndPartition, brokerAndInitialOffset) => + "[" + topicAndPartition + ", initOffset " + brokerAndInitialOffset.initOffset + " to broker " + brokerAndInitialOffset.broker + "] " + })) } def removeFetcherForPartitions(partitions: Set[TopicAndPartition]) { @@ -158,7 +161,7 @@ class CompactConsumerFetcherManager (private val consumerIdString: String, def closeAllFetchers() { mapLock synchronized { - for ( (_, fetcher) <- fetcherThreadMap) { + for ((_, fetcher) <- fetcherThreadMap) { fetcher.shutdown() } fetcherThreadMap.clear() @@ -240,7 +243,7 @@ class CompactConsumerFetcherManager (private val consumerIdString: String, partitionInfoMap.get(tp).deleted.set(true) } - def getPartitionInfoMapSize():Int = { + def getPartitionInfoMapSize(): Int = { var count = 0 val piItr = partitionInfoMap.values().iterator() while (piItr.hasNext) { @@ -311,12 +314,12 @@ class CompactConsumerFetcherManager (private val consumerIdString: String, config.clientId, config.socketTimeoutMs, correlationId.getAndIncrement).topicsMetadata - if(logger.isDebugEnabled) topicsMetadata.foreach(topicMetadata => debug(topicMetadata.toString())) + if (logger.isDebugEnabled) topicsMetadata.foreach(topicMetadata => debug(topicMetadata.toString())) topicsMetadata.foreach { tmd => val topic = tmd.topic tmd.partitionsMetadata.foreach { pmd => val topicAndPartition = TopicAndPartition(topic, pmd.partitionId) - if(pmd.leader.isDefined && noLeaderPartitionSet.contains(topicAndPartition)) { + if (pmd.leader.isDefined && noLeaderPartitionSet.contains(topicAndPartition)) { info("Try find leader for topic: %s, partition:%d".format(topicAndPartition.topic, topicAndPartition.partition)) val leaderBroker = pmd.leader.get leaderForPartitionsMap.put(topicAndPartition, leaderBroker) @@ -336,10 +339,10 @@ class CompactConsumerFetcherManager (private val consumerIdString: String, } try { - addFetcherForPartitions(leaderForPartitionsMap.map{ + addFetcherForPartitions(leaderForPartitionsMap.map { case (topicAndPartition, broker) => - topicAndPartition -> BrokerAndInitialOffset(broker, partitionInfoMap.get(topicAndPartition).getFetchOffset())} - ) + topicAndPartition -> BrokerAndInitialOffset(broker, partitionInfoMap.get(topicAndPartition).getFetchOffset()) + }) } catch { case t: Throwable => { if (!isRunning.get()) @@ -355,5 +358,5 @@ class CompactConsumerFetcherManager (private val consumerIdString: String, Thread.sleep(config.refreshLeaderBackoffMs) } } -} +} diff --git a/uReplicator-Worker/src/main/scala/kafka/mirrormaker/CompactConsumerFetcherThread.scala b/uReplicator-Worker/src/main/scala/kafka/mirrormaker/CompactConsumerFetcherThread.scala index 17999cba..1685b5a7 100644 --- a/uReplicator-Worker/src/main/scala/kafka/mirrormaker/CompactConsumerFetcherThread.scala +++ b/uReplicator-Worker/src/main/scala/kafka/mirrormaker/CompactConsumerFetcherThread.scala @@ -15,8 +15,8 @@ */ package kafka.mirrormaker -import java.util.concurrent.{ConcurrentHashMap, TimeUnit} import java.util.concurrent.locks.ReentrantLock +import java.util.concurrent.{ConcurrentHashMap, TimeUnit} import kafka.api._ import kafka.cluster.BrokerEndPoint @@ -54,9 +54,10 @@ class CompactConsumerFetcherThread(name: String, private val fetchBackOffMs = config.refreshLeaderBackoffMs private var lastDumpTime = 0L; - private final val DUMP_INTERVAL_MS = 5*60*1000; + private final val DUMP_INTERVAL_MS = 5 * 60 * 1000; - private val partitionMap = new mutable.HashMap[TopicAndPartition, PartitionFetchState] // a (topic, partition) -> partitionFetchState map + // a (topic, partition) -> partitionFetchState map + private val partitionMap = new mutable.HashMap[TopicAndPartition, PartitionFetchState] private val partitionAddMap = new ConcurrentHashMap[TopicAndPartition, PartitionFetchState] private val partitionDeleteMap = new ConcurrentHashMap[TopicAndPartition, Boolean] private val updateMapLock = new ReentrantLock @@ -103,7 +104,7 @@ class CompactConsumerFetcherThread(name: String, // handle a partition whose offset is out of range and return a new fetch offset def handleOffsetOutOfRange(topicAndPartition: TopicAndPartition): Long = { - var startTimestamp : Long = 0 + var startTimestamp: Long = 0 config.autoOffsetReset match { case OffsetRequest.SmallestTimeString => startTimestamp = OffsetRequest.EarliestTime case OffsetRequest.LargestTimeString => startTimestamp = OffsetRequest.LatestTime @@ -122,15 +123,16 @@ class CompactConsumerFetcherThread(name: String, consumerFetcherManager.addPartitionsWithError(partitions) } - def logTopicPartitionInfo(): Unit ={ + def logTopicPartitionInfo(): Unit = { if ((System.currentTimeMillis() - lastDumpTime) > DUMP_INTERVAL_MS) { - info("Topic partitions dump in fetcher thread: %s".format(partitionMap.map{ case (topicAndPartition, partitionFetchState) => - "[" + topicAndPartition + ", Offset " + partitionFetchState.offset + "] "})) + info("Topic partitions dump in fetcher thread: %s".format(partitionMap.map { case (topicAndPartition, partitionFetchState) => + "[" + topicAndPartition + ", Offset " + partitionFetchState.offset + "] " + })) lastDumpTime = System.currentTimeMillis() } } - override def shutdown(){ + override def shutdown() { initiateShutdown() inLock(partitionMapLock) { partitionMapCond.signalAll() @@ -170,8 +172,8 @@ class CompactConsumerFetcherThread(name: String, } partitionMap.foreach { - case((topicAndPartition, partitionFetchState)) => - if(partitionFetchState.isActive) + case ((topicAndPartition, partitionFetchState)) => + if (partitionFetchState.isActive) fetchRequestBuilder.addFetch(topicAndPartition.topic, topicAndPartition.partition, partitionFetchState.offset, fetchSize) } @@ -184,7 +186,7 @@ class CompactConsumerFetcherThread(name: String, logTopicPartitionInfo() } - if(!fetchRequest.requestInfo.isEmpty) + if (!fetchRequest.requestInfo.isEmpty) processFetchRequest(fetchRequest) } @@ -212,7 +214,7 @@ class CompactConsumerFetcherThread(name: String, // process fetched data inLock(partitionMapLock) { response.data.foreach { - case(topicAndPartition, partitionData) => + case (topicAndPartition, partitionData) => val (topic, partitionId) = topicAndPartition.asTuple partitionMap.get(topicAndPartition).foreach(currentPartitionFetchState => { // we append to the log if the current offset is defined and it is the same as the offset requested during fetch @@ -273,7 +275,7 @@ class CompactConsumerFetcherThread(name: String, } } - if(partitionsWithError.size > 0) { + if (partitionsWithError.size > 0) { info("handling partitions with error for %s".format(partitionsWithError)) handlePartitionsWithErrors(partitionsWithError) } diff --git a/uReplicator-Worker/src/main/scala/kafka/mirrormaker/HelixWorkerOnlineOfflineStateModelFactory.scala b/uReplicator-Worker/src/main/scala/kafka/mirrormaker/HelixWorkerOnlineOfflineStateModelFactory.scala index 6484f5de..8eb82d52 100644 --- a/uReplicator-Worker/src/main/scala/kafka/mirrormaker/HelixWorkerOnlineOfflineStateModelFactory.scala +++ b/uReplicator-Worker/src/main/scala/kafka/mirrormaker/HelixWorkerOnlineOfflineStateModelFactory.scala @@ -28,10 +28,10 @@ import org.apache.helix.participant.statemachine.{StateModel, StateModelFactory} */ class HelixWorkerOnlineOfflineStateModelFactory(final val instanceId: String, final val connector: KafkaConnector) extends StateModelFactory[StateModel] with Logging { - override def createNewStateModel( partitionName:String) = new OnlineOfflineStateModel(instanceId, connector) + override def createNewStateModel(partitionName: String) = new OnlineOfflineStateModel(instanceId, connector) // register mm instance - class OnlineOfflineStateModel (final val instanceId: String, final val connectors: KafkaConnector) extends StateModel { + class OnlineOfflineStateModel(final val instanceId: String, final val connectors: KafkaConnector) extends StateModel { def onBecomeOnlineFromOffline(message: Message, context: NotificationContext) = { info("OnlineOfflineStateModel.onBecomeOnlineFromOffline for topic: " diff --git a/uReplicator-Worker/src/main/scala/kafka/mirrormaker/KafkaConnector.scala b/uReplicator-Worker/src/main/scala/kafka/mirrormaker/KafkaConnector.scala index 74fc26e8..dfb4683e 100644 --- a/uReplicator-Worker/src/main/scala/kafka/mirrormaker/KafkaConnector.scala +++ b/uReplicator-Worker/src/main/scala/kafka/mirrormaker/KafkaConnector.scala @@ -22,19 +22,19 @@ import kafka.common.{AppInfo, OffsetAndMetadata, OffsetMetadataAndError, TopicAn import kafka.consumer._ import kafka.metrics.{KafkaMetricsGroup, KafkaMetricsReporter} import kafka.serializer.DefaultDecoder -import kafka.utils.{ZkUtils, Pool, ZKGroupTopicDirs} +import kafka.utils.{Pool, ZKGroupTopicDirs, ZkUtils} import org.I0Itec.zkclient.ZkClient import scala.collection.JavaConverters._ import scala.collection.mutable /** - * This class handles the consumers interaction with zookeeper - * Stores the consumer offsets to Zookeeper. - * - * @param consumerIdString - * @param config - */ + * This class handles the consumers interaction with zookeeper + * Stores the consumer offsets to Zookeeper. + * + * @param consumerIdString + * @param config + */ class KafkaConnector(private val consumerIdString: String, private val config: ConsumerConfig) extends KafkaMetricsGroup { private val zkClient: ZkClient = ZkUtils.createZkClient(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs) diff --git a/uReplicator-Worker/src/main/scala/kafka/mirrormaker/MirrorMakerWorker.scala b/uReplicator-Worker/src/main/scala/kafka/mirrormaker/MirrorMakerWorker.scala index 7a1782bb..f97c3cc7 100644 --- a/uReplicator-Worker/src/main/scala/kafka/mirrormaker/MirrorMakerWorker.scala +++ b/uReplicator-Worker/src/main/scala/kafka/mirrormaker/MirrorMakerWorker.scala @@ -37,22 +37,22 @@ import org.apache.kafka.common.utils.Utils import scala.io.Source /** - * The mirror maker has the following architecture: - * - There is one mirror maker thread uses one KafkaConnector and owns a Kafka stream. - * - All the mirror maker threads share one producer. - * - Mirror maker thread periodically flushes the producer and then commits all offsets. - * - * @note For mirror maker, the following settings are set by default to make sure there is no data loss: - * 1. use new producer with following settings - * acks=all - * retries=max integer - * block.on.buffer.full=true - * max.in.flight.requests.per.connection=1 - * 2. Consumer Settings - * auto.commit.enable=false - * 3. Mirror Maker Setting: - * abort.on.send.failure=true - */ + * The mirror maker has the following architecture: + * - There is one mirror maker thread uses one KafkaConnector and owns a Kafka stream. + * - All the mirror maker threads share one producer. + * - Mirror maker thread periodically flushes the producer and then commits all offsets. + * + * @note For mirror maker, the following settings are set by default to make sure there is no data loss: + * 1. use new producer with following settings + * acks=all + * retries=max integer + * block.on.buffer.full=true + * max.in.flight.requests.per.connection=1 + * 2. Consumer Settings + * auto.commit.enable=false + * 3. Mirror Maker Setting: + * abort.on.send.failure=true + */ object MirrorMakerWorker extends Logging with KafkaMetricsGroup { private var helixClusterName: String = null @@ -157,7 +157,6 @@ object MirrorMakerWorker extends Logging with KafkaMetricsGroup { instanceId = helixProps.getProperty("instanceId", "HelixMirrorMaker-" + System.currentTimeMillis) helixClusterName = helixProps.getProperty("helixClusterName", "testMirrorMaker") - // Create consumer connector val consumerConfigProps = Utils.loadProps(options.valueOf(consumerConfigOpt)) // Disable consumer auto offsets commit to prevent data loss. @@ -412,8 +411,8 @@ object MirrorMakerWorker extends Logging with KafkaMetricsGroup { } /** - * If message.handler.args is specified. A constructor that takes in a String as argument must exist. - */ + * If message.handler.args is specified. A constructor that takes in a String as argument must exist. + */ trait MirrorMakerMessageHandler { def handle(record: MessageAndMetadata[Array[Byte], Array[Byte]]): util.List[ProducerRecord[Array[Byte], Array[Byte]]] } diff --git a/uReplicator-Worker/src/main/scala/kafka/mirrormaker/PartitionTopicInfo.scala b/uReplicator-Worker/src/main/scala/kafka/mirrormaker/PartitionTopicInfo.scala index 82275a84..7236e581 100644 --- a/uReplicator-Worker/src/main/scala/kafka/mirrormaker/PartitionTopicInfo.scala +++ b/uReplicator-Worker/src/main/scala/kafka/mirrormaker/PartitionTopicInfo.scala @@ -40,13 +40,14 @@ class PartitionTopicInfo(override val topic: String, private val fetchSize: AtomicInteger, private val clientId: String) extends kafka.consumer.PartitionTopicInfo(topic = topic, - partitionId = partitionId, - chunkQueue = chunkQueue, - consumedOffset = consumedOffset, - fetchedOffset = fetchedOffset, - fetchSize = fetchSize, - clientId = clientId) with Logging { + partitionId = partitionId, + chunkQueue = chunkQueue, + consumedOffset = consumedOffset, + fetchedOffset = fetchedOffset, + fetchSize = fetchSize, + clientId = clientId) with Logging { + + val deleted: AtomicBoolean = new AtomicBoolean(false) - val deleted:AtomicBoolean = new AtomicBoolean(false) def getDeleted() = deleted.get() } diff --git a/uReplicator-Worker/src/test/kafka/mirrormaker/MirrorMakerWorkerTest.scala b/uReplicator-Worker/src/test/kafka/mirrormaker/MirrorMakerWorkerTest.scala index 4e11b52f..7e3a6186 100644 --- a/uReplicator-Worker/src/test/kafka/mirrormaker/MirrorMakerWorkerTest.scala +++ b/uReplicator-Worker/src/test/kafka/mirrormaker/MirrorMakerWorkerTest.scala @@ -15,17 +15,13 @@ */ package kafka.mirrormaker - import java.util.Properties -import kafka.api.IntegrationTestHarness import kafka.consumer.SimpleConsumer -import kafka.integration.KafkaServerTestHarness import kafka.producer.Producer -import kafka.security.auth.SimpleAclAuthorizer import kafka.serializer.StringEncoder import kafka.server.KafkaConfig -import kafka.utils.{StaticPartitioner, TestUtils, Logging} +import kafka.utils.Logging import org.junit.{After, Before} import org.scalatest.junit.JUnitSuite @@ -81,6 +77,3 @@ class MirrorMakerWorkerTest extends JUnitSuite with IntegrationTestHarness with override def generateConfigs() = Seq(brokerConfig1, brokerConfig2) } - - -