Skip to content

Commit

Permalink
Switch from using .all() to .values() for alltopics config fetching i…
Browse files Browse the repository at this point in the history
…n the KafkaAdminTopicConfigProvider

Signed-off-by: Thomas Cooper <code@tomcooper.dev>
  • Loading branch information
tomncooper committed Jun 3, 2021
1 parent 5ca0999 commit 858b2a9
Showing 1 changed file with 25 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.config.ConfigResource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -70,7 +71,9 @@ public Properties topicConfigs(String topic) {

@Override
public Map<String, Properties> allTopicConfigs() {
Map<ConfigResource, Config> topicConfigs = null;

// Request a map of futures for the config of each topic on the Kafka cluster
Map<ConfigResource, KafkaFuture<Config>> topicConfigs = null;
try {
LOG.debug("Requesting configurations for all topics");
topicConfigs = _adminClient
Expand All @@ -79,24 +82,37 @@ public Map<String, Properties> allTopicConfigs() {
.thenApply(
topicNameSet -> _adminClient.describeConfigs(
topicNameSet.stream().map(name -> new ConfigResource(ConfigResource.Type.TOPIC, name)).collect(Collectors.toList())
).all()
).values()
)
.get()
.get();
} catch (InterruptedException | ExecutionException e) {
LOG.warn("Config check for all topics failed due to failure to describe their configs.", e);
}

Map<String, Properties> propsMap = new HashMap<>();
if (topicConfigs != null) {
LOG.debug("Converting {} Topic Configs into Properties", topicConfigs.size());
for (Map.Entry<ConfigResource, Config> entry : topicConfigs.entrySet()) {
propsMap.put(entry.getKey().name(), convertTopicConfigToProperties(entry.getValue()));

// Set a method to run when each topic config future completes which either logs any error or adds the config to the properties map
for (Map.Entry<ConfigResource, KafkaFuture<Config>> entry : topicConfigs.entrySet()) {

entry.getValue().whenComplete((config, error) -> {
if (error != null) {
LOG.warn("Topic configurations for topic '{}' on the cluster could not be retrieved due to: {}", entry.getKey(), error.getMessage());
} else {
propsMap.put(entry.getKey().name(), convertTopicConfigToProperties(config));
}
});

}

//Block on all the config futures completing
try {
KafkaFuture.allOf(topicConfigs.values().toArray(new KafkaFuture[0])).get();
} catch (InterruptedException | ExecutionException e) {
LOG.warn("Config check for all topics failed due to failure to describe their configs.", e);
}
LOG.debug("Topic Config conversion complete");
} else {
LOG.error("Topic configurations for all topics on the cluster could not be retrieved");
}

return propsMap;
}

Expand Down

0 comments on commit 858b2a9

Please sign in to comment.