Skip to content

Commit

Permalink
Fix stop-all.sh and apply java google style (#31)
Browse files Browse the repository at this point in the history
* Apply java google style

* bump version

* update

* update

* fix stop-all.sh

* update stop-all.sh

* apply mvn license:format

* apply 120 wide column
  • Loading branch information
xhl1988 committed Jul 24, 2017
1 parent 0d9f23c commit 6a3ea45
Show file tree
Hide file tree
Showing 58 changed files with 333 additions and 415 deletions.
29 changes: 17 additions & 12 deletions bin/pkg/stop-all.sh
@@ -1,22 +1,27 @@
#!/bin/bash -e

echo "EXECUTING: stop controller"
PID=`pgrep -f "Dapp_name=uReplicator-Controller"`
kill -9 $PID
PID1=`pgrep -f "Dapp_name=uReplicator-Controller"` || true
if [ -n "$PID1" ]; then
kill -9 ${PID1}
fi

echo "EXECUTING: stop worker"
PID=`pgrep -f "Dapp_name=uReplicator-Worker"`
kill -9 $PID
PID2=`pgrep -f "Dapp_name=uReplicator-Worker"` || true
if [ -n "$PID2" ]; then
kill -9 ${PID2}
fi

echo "EXECUTING: stop producing"
PID=`pgrep -f "./bin/produce-data-to-kafka-topic-dummyTopic.sh"`
kill -9 $PID
PID3=`pgrep -f "./bin/produce-data-to-kafka-topic-dummyTopic.sh"` || true
if [ -n "$PID3" ]; then
kill -9 ${PID3}
fi

echo "EXECUTING: stop consuming"
PID=`pgrep -f "kafka.tools.ConsoleConsumer"`
for i in "${PID[@]}"
do
kill -9 $i
done
PID4=$(ps ax | grep -i 'kafka.tools.ConsoleConsumer' | grep java | grep -v grep | awk '{print $1}')
if [ -n "$PID4" ]; then
kill -9 ${PID4}
fi

bin/grid stop all
bin/grid stop all
1 change: 1 addition & 0 deletions bin/produce-data-to-kafka-topic-dummyTopic.sh
@@ -1,4 +1,5 @@
#!/bin/bash -e

DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
BASE_DIR=$(dirname $DIR)
ZOOKEEPER=localhost:2181/cluster1
Expand Down
1 change: 1 addition & 0 deletions config/test-log4j.properties
Expand Up @@ -12,6 +12,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

log4j.rootLogger=INFO, stdout

log4j.appender.stdout=org.apache.log4j.ConsoleAppender
Expand Down
1 change: 1 addition & 0 deletions config/zookeeper.properties
Expand Up @@ -12,6 +12,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# the directory where the snapshot is stored.
dataDir=/tmp/zookeeper
# the port at which the clients will connect
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Expand Up @@ -4,7 +4,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>com.uber.uReplicator</groupId>
<artifactId>uReplicator</artifactId>
<version>1.0.1</version>
<version>1.0.2</version>
<packaging>pom</packaging>
<name>uReplicator</name>

Expand Down
14 changes: 6 additions & 8 deletions uReplicator-Controller/pom.xml
@@ -1,21 +1,19 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">

xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.uber.uReplicator</groupId>
<artifactId>uReplicator-Controller</artifactId>
<packaging>jar</packaging>
<name>uReplicator-Controller</name>
<parent>
<artifactId>uReplicator</artifactId>
<groupId>com.uber.uReplicator</groupId>
<version>1.0.1</version>
<artifactId>uReplicator</artifactId>
<version>1.0.2</version>
</parent>
<description>
Helix kafka mirror maker is built for resolving high level consumers rebalancing pain point.
Helix provides a good interface to interact with zookeeper and manages/rebalances a stateful mapping for
instance to simple consumer mapping.
Kafka MirrorMaker with Helix is built for resolving high level consumers rebalancing pain point.
Helix provides a good interface to interact with zookeeper and manages/rebalances a stateful
mapping for instance to simple consumer mapping.
</description>

<properties>
Expand Down
Expand Up @@ -18,7 +18,6 @@
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Iterator;

import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Options;
import org.apache.commons.configuration.PropertiesConfiguration;
Expand Down Expand Up @@ -321,8 +320,7 @@ public static Options constructControllerOptions() {
.addOption("autoRebalanceDelayInSeconds", true, "Auto Rebalance Delay in seconds")
.addOption("refreshTimeInSeconds", true, "Controller Refresh Time in seconds")
.addOption("initWaitTimeInSeconds", true, "Controller Init Delay in seconds")
.addOption("backUpToGit", true,
"Backup controller metadata to git (true) or local file (false)")
.addOption("backUpToGit", true, "Backup controller metadata to git (true) or local file (false)")
.addOption("remoteBackupRepo", true, "Remote Backup Repo to store cluster state")
.addOption("localGitRepoClonePath", true, "Clone location of the remote git backup repo")
.addOption("localBackupFilePath", true, "Local backup file location");
Expand Down
Expand Up @@ -15,9 +15,18 @@
*/
package com.uber.stream.kafka.mirrormaker.controller;

import com.uber.stream.kafka.mirrormaker.controller.core.AutoTopicWhitelistingManager;
import com.uber.stream.kafka.mirrormaker.controller.core.ClusterInfoBackupManager;
import com.uber.stream.kafka.mirrormaker.controller.core.FileBackUpHandler;
import com.uber.stream.kafka.mirrormaker.controller.core.GitBackUpHandler;
import com.uber.stream.kafka.mirrormaker.controller.core.HelixMirrorMakerManager;
import com.uber.stream.kafka.mirrormaker.controller.core.KafkaBrokerTopicObserver;
import com.uber.stream.kafka.mirrormaker.controller.reporter.HelixKafkaMirrorMakerMetricsReporter;
import com.uber.stream.kafka.mirrormaker.controller.rest.ControllerRestApplication;
import com.uber.stream.kafka.mirrormaker.controller.validation.SourceKafkaClusterValidationManager;
import com.uber.stream.kafka.mirrormaker.controller.validation.ValidationManager;
import java.util.HashMap;
import java.util.Map;

import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.DefaultParser;
Expand All @@ -29,19 +38,9 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.uber.stream.kafka.mirrormaker.controller.core.AutoTopicWhitelistingManager;
import com.uber.stream.kafka.mirrormaker.controller.core.ClusterInfoBackupManager;
import com.uber.stream.kafka.mirrormaker.controller.core.FileBackUpHandler;
import com.uber.stream.kafka.mirrormaker.controller.core.GitBackUpHandler;
import com.uber.stream.kafka.mirrormaker.controller.core.HelixMirrorMakerManager;
import com.uber.stream.kafka.mirrormaker.controller.core.KafkaBrokerTopicObserver;
import com.uber.stream.kafka.mirrormaker.controller.reporter.HelixKafkaMirrorMakerMetricsReporter;
import com.uber.stream.kafka.mirrormaker.controller.rest.ControllerRestApplication;
import com.uber.stream.kafka.mirrormaker.controller.validation.SourceKafkaClusterValidationManager;
import com.uber.stream.kafka.mirrormaker.controller.validation.ValidationManager;

/**
* The main entry point for everything.
*
* @author xiangfu
*/
public class ControllerStarter {
Expand Down
Expand Up @@ -15,6 +15,12 @@
*/
package com.uber.stream.kafka.mirrormaker.controller.core;

import com.codahale.metrics.Counter;
import com.codahale.metrics.Meter;
import com.codahale.metrics.Timer;
import com.codahale.metrics.Timer.Context;
import com.uber.stream.kafka.mirrormaker.controller.reporter.HelixKafkaMirrorMakerMetricsReporter;
import com.uber.stream.kafka.mirrormaker.controller.utils.HelixUtils;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
Expand All @@ -24,7 +30,6 @@
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixManager;
import org.apache.helix.LiveInstanceChangeListener;
Expand All @@ -34,19 +39,13 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.codahale.metrics.Counter;
import com.codahale.metrics.Meter;
import com.codahale.metrics.Timer;
import com.codahale.metrics.Timer.Context;
import com.uber.stream.kafka.mirrormaker.controller.reporter.HelixKafkaMirrorMakerMetricsReporter;
import com.uber.stream.kafka.mirrormaker.controller.utils.HelixUtils;

/**
* We only considering add or remove box(es), not considering the replacing.
* For replacing, we just need to bring up a new box and give the old instanceId no auto-balancing
* needed.
*/
public class AutoRebalanceLiveInstanceChangeListener implements LiveInstanceChangeListener {

private static final Logger LOGGER =
LoggerFactory.getLogger(AutoRebalanceLiveInstanceChangeListener.class);

Expand Down Expand Up @@ -91,7 +90,7 @@ public void onLiveInstanceChange(final List<LiveInstance> liveInstances,
@Override
public void run() {
try {
rebalanceCurrentCluster(_helixMirrorMakerManager.getCurrentLiveInstances());
rebalanceCurrentCluster(_helixMirrorMakerManager.getCurrentLiveInstances());
} catch (Exception e) {
LOGGER.error("Got exception during rebalance the whole cluster! ", e);
}
Expand Down Expand Up @@ -168,9 +167,8 @@ private static Set<InstanceTopicPartitionHolder> rescaleInstanceToTopicPartition
}
LOGGER.info("Trying to rescale cluster with new instances - " + Arrays.toString(
newInstances.toArray(new String[0])) + " and removed instances - " + Arrays.toString(
removedInstances.toArray(new String[0])));
TreeSet<InstanceTopicPartitionHolder> orderedSet =
new TreeSet<>(InstanceTopicPartitionHolder.getComparator());
removedInstances.toArray(new String[0])));
TreeSet<InstanceTopicPartitionHolder> orderedSet = new TreeSet<>(InstanceTopicPartitionHolder.getComparator());
Set<TopicPartition> tpiNeedsToBeAssigned = new HashSet<TopicPartition>();
tpiNeedsToBeAssigned.addAll(unassignedTopicPartitions);
for (String instanceName : instanceToTopicPartitionMap.keySet()) {
Expand Down
Expand Up @@ -15,6 +15,8 @@
*/
package com.uber.stream.kafka.mirrormaker.controller.core;

import com.codahale.metrics.Counter;
import com.uber.stream.kafka.mirrormaker.controller.reporter.HelixKafkaMirrorMakerMetricsReporter;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
Expand All @@ -24,18 +26,13 @@
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import kafka.utils.ZKStringSerializer$;
import kafka.utils.ZkUtils;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.exception.ZkNodeExistsException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.codahale.metrics.Counter;
import com.uber.stream.kafka.mirrormaker.controller.reporter.HelixKafkaMirrorMakerMetricsReporter;

import kafka.utils.ZKStringSerializer$;
import kafka.utils.ZkUtils;

/**
* AutoTopicWhitelistingManager will look at both source and destination Kafka brokers and pick
* the topics in the intersection to add to the whitelist.
Expand Down Expand Up @@ -86,26 +83,24 @@ public AutoTopicWhitelistingManager(KafkaBrokerTopicObserver srcKafkaTopicObserv
String patternToExcludeTopics,
int refreshTimeInSec) {
this(srcKafkaTopicObserver, destKafkaTopicObserver, helixMirrorMakerManager,
patternToExcludeTopics, refreshTimeInSec, 120);
patternToExcludeTopics, refreshTimeInSec, 120);
}

public AutoTopicWhitelistingManager(KafkaBrokerTopicObserver srcKafkaTopicObserver,
KafkaBrokerTopicObserver destKafkaTopicObserver,
HelixMirrorMakerManager helixMirrorMakerManager,
String patternToExcludeTopics,
int refreshTimeInSec,
int initWaitTimeInSec) {
KafkaBrokerTopicObserver destKafkaTopicObserver,
HelixMirrorMakerManager helixMirrorMakerManager,
String patternToExcludeTopics,
int refreshTimeInSec,
int initWaitTimeInSec) {
_srcKafkaTopicObserver = srcKafkaTopicObserver;
_destKafkaTopicObserver = destKafkaTopicObserver;
_helixMirrorMakerManager = helixMirrorMakerManager;
_patternToExcludeTopics = patternToExcludeTopics;
_refreshTimeInSec = refreshTimeInSec;
_initWaitTimeInSec = initWaitTimeInSec;
_zkClient = new ZkClient(_helixMirrorMakerManager.getHelixZkURL(), 30000, 30000,
ZKStringSerializer$.MODULE$);
_zkClient = new ZkClient(_helixMirrorMakerManager.getHelixZkURL(), 30000, 30000, ZKStringSerializer$.MODULE$);
_zkUtils = ZkUtils.apply(_zkClient, false);
_blacklistedTopicsZPath =
String.format("/%s/BLACKLISTED_TOPICS", _helixMirrorMakerManager.getHelixClusterName());
_blacklistedTopicsZPath = String.format("/%s/BLACKLISTED_TOPICS", _helixMirrorMakerManager.getHelixClusterName());
}

public void start() {
Expand Down Expand Up @@ -244,7 +239,7 @@ public void addIntoBlacklist(String topic) {
}

public void removeFromBlacklist(String topic) {
_zkUtils.deletePath( _blacklistedTopicsZPath + "/" + topic);
_zkUtils.deletePath(_blacklistedTopicsZPath + "/" + topic);
LOGGER.info("topic={} is removed from blacklist on zk", topic);
}

Expand Down
Expand Up @@ -15,6 +15,9 @@
*/
package com.uber.stream.kafka.mirrormaker.controller.core;

import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.uber.stream.kafka.mirrormaker.controller.ControllerConf;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
Expand All @@ -28,17 +31,12 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.uber.stream.kafka.mirrormaker.controller.ControllerConf;

/**
* This manager schedules a periodic backup task once every 24 hrs to take the
* backup of the mirror maker controller cluster state and dump the ideal state, parition assignment
* to two different files either to a remote git repo or a local backup file based on the config
*
* @author naveencherukuri
*
* @author naveencherukuri
*/
public class ClusterInfoBackupManager {

Expand All @@ -54,7 +52,8 @@ public class ClusterInfoBackupManager {
private final ControllerConf _config;
private String envInfo = "default";

public ClusterInfoBackupManager(HelixMirrorMakerManager helixMirrorMakerManager, BackUpHandler handler,
public ClusterInfoBackupManager(HelixMirrorMakerManager helixMirrorMakerManager,
BackUpHandler handler,
ControllerConf config) {
_helixMirrorMakerManager = helixMirrorMakerManager;
_handler = handler;
Expand Down Expand Up @@ -82,7 +81,6 @@ public synchronized void dumpState() throws Exception {
return;
}


LOGGER.info("Backing up the CurrentState and the IdealState!");
StringBuilder idealState = new StringBuilder();
StringBuilder partitionAssignment = new StringBuilder();
Expand All @@ -92,7 +90,6 @@ public synchronized void dumpState() throws Exception {
return;
}


JSONArray resultList = new JSONArray();

for (String topicName : topicLists) {
Expand All @@ -103,10 +100,8 @@ public synchronized void dumpState() throws Exception {
resultList.add(resultJson);
}


idealState.append(new StringRepresentation(resultList.toJSONString()));


resultList = new JSONArray();

for (String topicName : topicLists) {
Expand Down Expand Up @@ -167,7 +162,6 @@ public synchronized void dumpState() throws Exception {
resultJson.put("serverToPartitionMapping", serverToPartitionMappingJson);
resultJson.put("serverToNumPartitionsMapping", serverToNumPartitionsMappingJson);
resultList.add(resultJson);

}

partitionAssignment.append(new StringRepresentation(resultList.toJSONString()));
Expand Down
Expand Up @@ -19,7 +19,6 @@
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down
Expand Up @@ -20,7 +20,6 @@
import java.io.FileWriter;
import java.io.IOException;
import java.util.Date;

import org.apache.commons.io.FileUtils;
import org.eclipse.jgit.api.Git;
import org.eclipse.jgit.api.errors.GitAPIException;
Expand Down Expand Up @@ -108,8 +107,9 @@ public void writeToFile(String fileName, String data) throws Exception {
} finally {
output.close();
git.close();
if (result != null)
if (result != null) {
result.getRepository().close();
}
backupRepo.close();
}
}
Expand Down

0 comments on commit 6a3ea45

Please sign in to comment.