Skip to content

Commit

Permalink
GH-1074: Use AdminClient to check topic presence
Browse files Browse the repository at this point in the history
Resolves #1074

Use the admin client instead of a `Consumer`.
  • Loading branch information
garyrussell authored and artembilan committed Apr 24, 2019
1 parent f27a3ae commit f20f0d6
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 19 deletions.
Expand Up @@ -16,16 +16,20 @@

package org.springframework.kafka.listener;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.DescribeTopicsResult;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;
Expand Down Expand Up @@ -280,31 +284,47 @@ public final void start() {

protected void checkTopics() {
if (this.containerProperties.isMissingTopicsFatal() && this.containerProperties.getTopicPattern() == null) {
try (Consumer<K, V> consumer =
this.consumerFactory.createConsumer(this.containerProperties.getGroupId(),
this.containerProperties.getClientId(), null,
this.containerProperties.getConsumerProperties())) {
if (consumer != null) {
Map<String, Object> configs = this.consumerFactory.getConfigurationProperties()
.entrySet()
.stream()
.filter(entry -> AdminClientConfig.configNames().contains(entry.getKey()))
.collect(Collectors.toMap(Entry::getKey, Entry::getValue));
List<String> missing = null;
try (AdminClient client = AdminClient.create(configs)) {
if (client != null) {
String[] topics = this.containerProperties.getTopics();
if (topics == null) {
topics = Arrays.stream(this.containerProperties.getTopicPartitions())
.map(TopicPartitionInitialOffset::topic)
.toArray(String[]::new);
}
List<String> missing = new ArrayList<>();
for (String topic : topics) {
if (consumer.partitionsFor(topic) == null) {
missing.add(topic);
}
}
if (missing.size() > 0) {
throw new IllegalStateException(
"Topic(s) " + missing.toString()
+ " is/are not present and missingTopicsFatal is true");
}
DescribeTopicsResult result = client.describeTopics(Arrays.asList(topics));
missing = result.values()
.entrySet()
.stream()
.filter(entry -> {
try {
entry.getValue().get(30, TimeUnit.SECONDS);
return false;
}
catch (@SuppressWarnings("unused") Exception e) {
return true;
}
})
.map(Entry::getKey)
.collect(Collectors.toList());
}
}
catch (Exception e) {
this.logger.error("Failed to check topic existence", e);
}
if (missing != null && missing.size() > 0) {
throw new IllegalStateException(
"Topic(s) " + missing.toString()
+ " is/are not present and missingTopicsFatal is true");
}
}

}

public void checkGroupId() {
Expand Down
Expand Up @@ -152,7 +152,7 @@ public class EnableKafkaIntegrationTests {
"annotated22reply", "annotated23", "annotated23reply", "annotated24", "annotated24reply",
"annotated25", "annotated25reply1", "annotated25reply2", "annotated26", "annotated27", "annotated28",
"annotated29", "annotated30", "annotated30reply", "annotated31", "annotated32", "annotated33",
"annotated34", "annotated35", "annotated36", "annotated37");
"annotated34", "annotated35", "annotated36", "annotated37", "foo", "manualStart");

private static EmbeddedKafkaBroker embeddedKafka = embeddedKafkaRule.getEmbeddedKafka();

Expand Down
Expand Up @@ -1876,6 +1876,7 @@ public void testBadListenerType() {
Map<String, Object> props = KafkaTestUtils.consumerProps("testStatic", "false", embeddedKafka);
DefaultKafkaConsumerFactory<Integer, Foo1> cf = new DefaultKafkaConsumerFactory<>(props);
ContainerProperties containerProps = new ContainerProperties("foo");
containerProps.setMissingTopicsFatal(false);
KafkaMessageListenerContainer<Integer, Foo1> badContainer =
new KafkaMessageListenerContainer<>(cf, containerProps);
assertThatIllegalStateException().isThrownBy(() -> badContainer.start())
Expand All @@ -1897,6 +1898,7 @@ public void testBadAckMode() {
Map<String, Object> props = KafkaTestUtils.consumerProps("testStatic", "true", embeddedKafka);
DefaultKafkaConsumerFactory<Integer, Foo1> cf = new DefaultKafkaConsumerFactory<>(props);
ContainerProperties containerProps = new ContainerProperties("foo");
containerProps.setMissingTopicsFatal(false);
containerProps.setAckMode(AckMode.MANUAL);
KafkaMessageListenerContainer<Integer, Foo1> badContainer =
new KafkaMessageListenerContainer<>(cf, containerProps);
Expand All @@ -1912,6 +1914,7 @@ public void testBadErrorHandler() {
Map<String, Object> props = KafkaTestUtils.consumerProps("testStatic", "false", embeddedKafka);
DefaultKafkaConsumerFactory<Integer, Foo1> cf = new DefaultKafkaConsumerFactory<>(props);
ContainerProperties containerProps = new ContainerProperties("foo");
containerProps.setMissingTopicsFatal(false);
KafkaMessageListenerContainer<Integer, Foo1> badContainer =
new KafkaMessageListenerContainer<>(cf, containerProps);
badContainer.setBatchErrorHandler(new BatchLoggingErrorHandler());
Expand All @@ -1927,6 +1930,7 @@ public void testBadBatchErrorHandler() {
Map<String, Object> props = KafkaTestUtils.consumerProps("testStatic", "false", embeddedKafka);
DefaultKafkaConsumerFactory<Integer, Foo1> cf = new DefaultKafkaConsumerFactory<>(props);
ContainerProperties containerProps = new ContainerProperties("foo");
containerProps.setMissingTopicsFatal(false);
KafkaMessageListenerContainer<Integer, Foo1> badContainer =
new KafkaMessageListenerContainer<>(cf, containerProps);
badContainer.setErrorHandler(new LoggingErrorHandler());
Expand Down

0 comments on commit f20f0d6

Please sign in to comment.