diff --git a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/configuring-topics.adoc b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/configuring-topics.adoc index dfe5b58d31..ab6923cb95 100644 --- a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/configuring-topics.adoc +++ b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/configuring-topics.adoc @@ -73,11 +73,13 @@ The context then fails to initialize. NOTE: If the broker supports it (1.0.0 or higher), the admin increases the number of partitions if it is found that an existing topic has fewer partitions than the `NewTopic.numPartitions`. Starting with version 2.7, the `KafkaAdmin` provides methods to create and examine topics at runtime. +Starting with version 4.0, it also provides a method to delete topics. * `createOrModifyTopics` * `describeTopics` +* `deleteTopics` (since 4.0) -For more advanced features, you can use the `AdminClient` directly. +For more advanced administrative features, you can use the `AdminClient` directly. The following example shows how to do so: [source, java] diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaAdmin.java b/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaAdmin.java index 2ea11bf568..6f96a0c97e 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaAdmin.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaAdmin.java @@ -81,6 +81,7 @@ * @author Anders Swanson * @author Omer Celik * @author Choi Wang Gyu + * @author Go Beom Jun * * @since 1.3 */ @@ -391,6 +392,35 @@ public Map describeTopics(String... topicNames) { } } + /** + * Delete topics from the Kafka cluster. + * @param topicNames the topic names to delete. + * @throws KafkaException if the operation fails. + * @since 4.0 + */ + @Override + public void deleteTopics(String... topicNames) { + if (topicNames.length == 0) { + return; + } + try (Admin admin = createAdmin()) { + admin.deleteTopics(Arrays.asList(topicNames)) + .all() + .get(this.operationTimeout, TimeUnit.SECONDS); + LOGGER.debug(() -> "Deleted topics: " + Arrays.toString(topicNames)); + } + catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + throw new KafkaException("Interrupted while deleting topics", ex); + } + catch (TimeoutException ex) { + throw new KafkaException("Timed out waiting to delete topics", ex); + } + catch (ExecutionException ex) { + throw new KafkaException("Failed to delete topics", ex.getCause()); + } + } + /** * Creates a new {@link Admin} client instance using the {@link AdminClient} class. * @return the new {@link Admin} client instance. diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaAdminOperations.java b/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaAdminOperations.java index ca6e3d4871..aae7e3d490 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaAdminOperations.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaAdminOperations.java @@ -50,6 +50,13 @@ public interface KafkaAdminOperations { */ Map describeTopics(String... topicNames); + /** + * Delete topics from the Kafka cluster. + * @param topicNames the topic names to delete. + * @since 4.0 + */ + void deleteTopics(String... topicNames); + /** * Return the cluster id, if available. * @return the describe cluster id. diff --git a/spring-kafka/src/test/java/org/springframework/kafka/core/KafkaAdminTests.java b/spring-kafka/src/test/java/org/springframework/kafka/core/KafkaAdminTests.java index d154b270f2..686d15392e 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/core/KafkaAdminTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/core/KafkaAdminTests.java @@ -339,6 +339,65 @@ void getAdminConfigWithApplicationNameAsClientId() { assertThat(kafkaAdmin.getAdminConfig()).containsOnly(Map.entry(AdminClientConfig.CLIENT_ID_CONFIG, "appname-admin-0")); } + @Test + void testDeleteTopics() { + NewTopic testTopic1 = TopicBuilder.name("test-delete-1") + .partitions(1) + .replicas(1) + .build(); + NewTopic testTopic2 = TopicBuilder.name("test-delete-2") + .partitions(1) + .replicas(1) + .build(); + + this.admin.createOrModifyTopics(testTopic1, testTopic2); + + await().atMost(10, TimeUnit.SECONDS).until(() -> { + try { + Map topics = + this.admin.describeTopics("test-delete-1", "test-delete-2"); + return topics.size() == 2; + } + catch (Exception e) { + return false; + } + }); + + Map beforeDelete = this.admin.describeTopics("test-delete-1", "test-delete-2"); + assertThat(beforeDelete).hasSize(2); + assertThat(beforeDelete).containsKeys("test-delete-1", "test-delete-2"); + + this.admin.deleteTopics("test-delete-1", "test-delete-2"); + + await().atMost(10, TimeUnit.SECONDS).until(() -> { + try (AdminClient adminClient = AdminClient.create(this.admin.getConfigurationProperties())) { + DescribeTopicsResult result = adminClient.describeTopics(Arrays.asList("test-delete-1", "test-delete-2")); + try { + result.allTopicNames().get(5, TimeUnit.SECONDS); + return false; + } + catch (ExecutionException ex) { + return ex.getCause() instanceof UnknownTopicOrPartitionException; + } + } + catch (InterruptedException | TimeoutException e) { + return false; + } + }); + } + + @Test + void testDeleteNonExistentTopic() { + assertThat(org.assertj.core.api.Assertions.catchThrowable(() -> + this.admin.deleteTopics("non-existent-topic-12345") + )).isInstanceOf(org.springframework.kafka.KafkaException.class); + } + + @Test + void testDeleteTopicsWithEmptyArray() { + this.admin.deleteTopics(); + } + @Configuration public static class Config {