Skip to content

Commit

Permalink
Add secure zookeeper support (#787)
Browse files Browse the repository at this point in the history
  • Loading branch information
rahuldhev authored and efeg committed Jun 21, 2019
1 parent a33298f commit 46135ff
Show file tree
Hide file tree
Showing 12 changed files with 71 additions and 11 deletions.
23 changes: 23 additions & 0 deletions config/cruise_control_jaas.conf_template
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
//Rename this file to cruise_control_jaas.conf when using secured zookeepers
//For detailed instructions, see /docs/wiki/User Guide/Secure-zookeeper-configuration.md

//Enter appropriate Client entry for secured zookeeper client connections
Client {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
keyTab="/path/to/zookeeper_client.keytab"
storeKey=true
useTicketCache=false
principal="zookeeper_client@<REALM>";
};

//Enter appropriate KafkaClient entry if using the SASL protocol, remove if not
KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
keyTab="/path/to/kafka_client.keytab"
storeKey=true
useTicketCache=false
serviceName="kafka"
principal="kafka_client@<REALM>";
};
3 changes: 3 additions & 0 deletions config/cruisecontrol.properties
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,9 @@ num.proposal.precompute.threads=1
# The zookeeper connect of the Kafka cluster
zookeeper.connect=localhost:2181/

# If true, appropriate zookeeper Client { .. } entry required in jaas file located at $base_dir/config/cruise_control_jaas.conf
zookeeper.security.enabled=false

# The max number of partitions to move in/out on a given broker at a given time.
num.concurrent.partition.movements.per.broker=10

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
public class KafkaCruiseControlUtils {
public static final int ZK_SESSION_TIMEOUT = 30000;
public static final int ZK_CONNECTION_TIMEOUT = 30000;
public static final boolean IS_ZK_SECURITY_ENABLED = false;
public static final String DATE_FORMAT = "YYYY-MM-dd_HH:mm:ss z";
public static final String DATE_FORMAT2 = "dd/MM/yyyy HH:mm:ss";
public static final String TIME_ZONE = "UTC";
Expand Down Expand Up @@ -93,8 +92,8 @@ public static String getRequiredConfig(Map<String, ?> configs, String configName
return value;
}

public static ZkUtils createZkUtils(String zkConnect) {
return ZkUtils.apply(zkConnect, ZK_SESSION_TIMEOUT, ZK_CONNECTION_TIMEOUT, IS_ZK_SECURITY_ENABLED);
public static ZkUtils createZkUtils(String zkConnect, boolean zkSecurityEnabled) {
return ZkUtils.apply(zkConnect, ZK_SESSION_TIMEOUT, ZK_CONNECTION_TIMEOUT, zkSecurityEnabled);
}

public static void closeZkUtilsWithTimeout(ZkUtils zkUtils, long timeoutMs) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -455,6 +455,12 @@ public class KafkaCruiseControlConfig extends AbstractConfig {
public static final String ZOOKEEPER_CONNECT_CONFIG = "zookeeper.connect";
private static final String ZOOKEEPER_CONNECT_DOC = "The zookeeper path used by the Kafka cluster.";

/**
* <code>zookeeper.security.enabled</code>
*/
public static final String ZOOKEEPER_SECURITY_ENABLED_CONFIG = "zookeeper.security.enabled";
private static final String ZOOKEEPER_SECURITY_ENABLED_DOC = "Specify if zookeeper is secured, true or false";

/**
* <code>num.concurrent.partition.movements.per.broker</code>
*/
Expand Down Expand Up @@ -1241,6 +1247,11 @@ public class KafkaCruiseControlConfig extends AbstractConfig {
ConfigDef.Importance.LOW,
NUM_PROPOSAL_PRECOMPUTE_THREADS_DOC)
.define(ZOOKEEPER_CONNECT_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, ZOOKEEPER_CONNECT_DOC)
.define(ZOOKEEPER_SECURITY_ENABLED_CONFIG,
ConfigDef.Type.BOOLEAN,
false,
ConfigDef.Importance.HIGH,
ZOOKEEPER_SECURITY_ENABLED_DOC)
.define(NUM_CONCURRENT_PARTITION_MOVEMENTS_PER_BROKER_CONFIG,
ConfigDef.Type.INT,
5,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,10 @@ public BrokerFailureDetector(KafkaCruiseControlConfig config,
KafkaCruiseControl kafkaCruiseControl,
List<String> selfHealingGoals) {
String zkUrl = config.getString(KafkaCruiseControlConfig.ZOOKEEPER_CONNECT_CONFIG);
boolean zkSecurityEnabled = config.getBoolean(KafkaCruiseControlConfig.ZOOKEEPER_SECURITY_ENABLED_CONFIG);
ZkConnection zkConnection = new ZkConnection(zkUrl, ZK_SESSION_TIMEOUT);
_zkClient = new ZkClient(zkConnection, ZK_CONNECTION_TIMEOUT, new ZkStringSerializer());
// Do not support secure ZK at this point.
_zkUtils = new ZkUtils(_zkClient, zkConnection, false);
_zkUtils = new ZkUtils(_zkClient, zkConnection, zkSecurityEnabled);
_failedBrokers = new HashMap<>();
_failedBrokersZkPath = config.getString(KafkaCruiseControlConfig.FAILED_BROKERS_ZK_PATH_CONFIG);
_loadMonitor = loadMonitor;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,8 @@ public Executor(KafkaCruiseControlConfig config,

_time = time;
String zkConnect = config.getString(KafkaCruiseControlConfig.ZOOKEEPER_CONNECT_CONFIG);
_zkUtils = KafkaCruiseControlUtils.createZkUtils(zkConnect);
boolean zkSecurityEnabled = config.getBoolean(KafkaCruiseControlConfig.ZOOKEEPER_SECURITY_ENABLED_CONFIG);
_zkUtils = KafkaCruiseControlUtils.createZkUtils(zkConnect, zkSecurityEnabled);
_executionTaskManager =
new ExecutionTaskManager(config.getInt(KafkaCruiseControlConfig.NUM_CONCURRENT_PARTITION_MOVEMENTS_PER_BROKER_CONFIG),
config.getInt(KafkaCruiseControlConfig.NUM_CONCURRENT_LEADER_MOVEMENTS_CONFIG),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,8 @@ protected KafkaConsumer<byte[], byte[]> createConsumer(Map<String, ?> config) {

private void ensureTopicsCreated(Map<String, ?> config) {
String zkConnect = (String) config.get(KafkaCruiseControlConfig.ZOOKEEPER_CONNECT_CONFIG);
ZkUtils zkUtils = KafkaCruiseControlUtils.createZkUtils(zkConnect);
boolean zkSecurityEnabled = Boolean.parseBoolean((String) config.get(KafkaCruiseControlConfig.ZOOKEEPER_SECURITY_ENABLED_CONFIG));
ZkUtils zkUtils = KafkaCruiseControlUtils.createZkUtils(zkConnect, zkSecurityEnabled);
try {
Map<String, List<PartitionInfo>> topics = _consumers.get(0).listTopics();
long partitionSampleWindowMs = Long.parseLong((String) config.get(KafkaCruiseControlConfig.PARTITION_METRICS_WINDOW_MS_CONFIG));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ private BrokerFailureDetector createBrokerFailureDetector(Queue<Anomaly> anomali
EasyMock.replay(mockLoadMonitor);
Properties props = KafkaCruiseControlUnitTestUtils.getKafkaCruiseControlProperties();
props.setProperty(KafkaCruiseControlConfig.ZOOKEEPER_CONNECT_CONFIG, zookeeper().getConnectionString());
props.setProperty(KafkaCruiseControlConfig.ZOOKEEPER_SECURITY_ENABLED_CONFIG, "false");
KafkaCruiseControlConfig kafkaCruiseControlConfig = new KafkaCruiseControlConfig(props);
return new BrokerFailureDetector(kafkaCruiseControlConfig,
mockLoadMonitor,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public void tearDown() {

@Test
public void testBasicBalanceMovement() throws InterruptedException {
ZkUtils zkUtils = KafkaCruiseControlUtils.createZkUtils(zookeeper().getConnectionString());
ZkUtils zkUtils = KafkaCruiseControlUtils.createZkUtils(zookeeper().getConnectionString(), false);
Collection<ExecutionProposal> proposals = getBasicProposals();

try {
Expand All @@ -93,7 +93,7 @@ public void testBasicBalanceMovement() throws InterruptedException {

@Test
public void testMoveNonExistingPartition() throws InterruptedException {
ZkUtils zkUtils = KafkaCruiseControlUtils.createZkUtils(zookeeper().getConnectionString());
ZkUtils zkUtils = KafkaCruiseControlUtils.createZkUtils(zookeeper().getConnectionString(), false);

Map<String, TopicDescription> topicDescriptions = createTopics();
int initialLeader0 = topicDescriptions.get(TOPIC0).partitions().get(0).leader().id();
Expand Down Expand Up @@ -127,7 +127,7 @@ public void testMoveNonExistingPartition() throws InterruptedException {

@Test
public void testBrokerDiesWhenMovePartitions() throws Exception {
ZkUtils zkUtils = KafkaCruiseControlUtils.createZkUtils(zookeeper().getConnectionString());
ZkUtils zkUtils = KafkaCruiseControlUtils.createZkUtils(zookeeper().getConnectionString(), false);

Map<String, TopicDescription> topicDescriptions = createTopics();
int initialLeader0 = topicDescriptions.get(TOPIC0).partitions().get(0).leader().id();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public class LoadMonitorTaskRunnerTest extends AbstractKafkaIntegrationTestHarne
@Before
public void setUp() {
super.setUp();
ZkUtils zkUtils = KafkaCruiseControlUtils.createZkUtils(zookeeper().getConnectionString());
ZkUtils zkUtils = KafkaCruiseControlUtils.createZkUtils(zookeeper().getConnectionString(), false);
for (int i = 0; i < NUM_TOPICS; i++) {
AdminUtils.createTopic(zkUtils, "topic-" + i, NUM_PARTITIONS, 1, new Properties(), RackAwareMode.Safe$.MODULE$);
}
Expand Down
15 changes: 15 additions & 0 deletions docs/wiki/User Guide/Secure-zookeeper-configuration.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
#Secure zookeeper configuration

Cruise Control uses zookeeper clients for its operation. If the zookeeper is secured, the following
steps are to be taken care of so that the zookeeper client authenticates successfully.

* Set the config _zookeeper.security.enabled_ in "$base_dir/config/cruisecontrol.properties" to _true_.
* Rename the file "$base_dir/config/cruise_control_jaas.conf_template" to "$base_dir/config/cruise_control_jaas.conf".
* In the file cruise_control_jaas.conf, enter the appropriate _Client{ .. }_ entry for the zookeeper client.

Cruise Control will export the "$base_dir/config/cruise_control_jaas.conf" configuration file only if it is present.
Please ensure that the jaas file contains the correct entry for successful authentication. The authentication failure/success
messages will appear in the Cruise Control logs on startup.

NOTE: If using the SASL protocol, you could enter the _KafkaClient{ .. }_ entry here as this configuration file will be
exported.
6 changes: 6 additions & 0 deletions kafka-cruise-control-start.sh
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,12 @@ if [ -z "$KAFKA_JVM_PERFORMANCE_OPTS" ]; then
KAFKA_JVM_PERFORMANCE_OPTS="-server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+DisableExplicitGC -Djava.awt.headless=true"
fi

#Add jaas file to KAFKA_OPTS if present
if [ -f $base_dir/config/cruise_control_jaas.conf ]
then
KAFKA_OPTS="-Djava.security.auth.login.config=$base_dir/config/cruise_control_jaas.conf $KAFKA_OPTS"
fi

DAEMON_NAME="kafka-cruise-control"
CONSOLE_OUTPUT_FILE="${LOG_DIR}"/"${DAEMON_NAME}".out
while [ $# -gt 0 ]; do
Expand Down

0 comments on commit 46135ff

Please sign in to comment.