diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/KafkaAdminTopicConfigProvider.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/KafkaAdminTopicConfigProvider.java index 875380142..acaff18ed 100644 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/KafkaAdminTopicConfigProvider.java +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/KafkaAdminTopicConfigProvider.java @@ -14,6 +14,7 @@ import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.Config; import org.apache.kafka.clients.admin.ConfigEntry; +import org.apache.kafka.common.KafkaFuture; import org.apache.kafka.common.config.ConfigResource; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -70,7 +71,9 @@ public Properties topicConfigs(String topic) { @Override public Map allTopicConfigs() { - Map topicConfigs = null; + + // Request a map of futures for the config of each topic on the Kafka cluster + Map> topicConfigs = null; try { LOG.debug("Requesting configurations for all topics"); topicConfigs = _adminClient @@ -79,9 +82,8 @@ public Map allTopicConfigs() { .thenApply( topicNameSet -> _adminClient.describeConfigs( topicNameSet.stream().map(name -> new ConfigResource(ConfigResource.Type.TOPIC, name)).collect(Collectors.toList()) - ).all() + ).values() ) - .get() .get(); } catch (InterruptedException | ExecutionException e) { LOG.warn("Config check for all topics failed due to failure to describe their configs.", e); @@ -89,14 +91,28 @@ public Map allTopicConfigs() { Map propsMap = new HashMap<>(); if (topicConfigs != null) { - LOG.debug("Converting {} Topic Configs into Properties", topicConfigs.size()); - for (Map.Entry entry : topicConfigs.entrySet()) { - propsMap.put(entry.getKey().name(), convertTopicConfigToProperties(entry.getValue())); + + // Set a method to run when each topic config future completes which either logs any error or adds the config to the properties map + for (Map.Entry> entry : topicConfigs.entrySet()) { + + entry.getValue().whenComplete((config, error) -> { + if (error != null) { + LOG.warn("Topic configurations for topic '{}' on the cluster could not be retrieved due to: {}", entry.getKey(), error.getMessage()); + } else { + propsMap.put(entry.getKey().name(), convertTopicConfigToProperties(config)); + } + }); + + } + + //Block on all the config futures completing + try { + KafkaFuture.allOf(topicConfigs.values().toArray(new KafkaFuture[0])).get(); + } catch (InterruptedException | ExecutionException e) { + LOG.warn("Config check for all topics failed due to failure to describe their configs.", e); } - LOG.debug("Topic Config conversion complete"); - } else { - LOG.error("Topic configurations for all topics on the cluster could not be retrieved"); } + return propsMap; }