Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -73,11 +73,13 @@ The context then fails to initialize.
NOTE: If the broker supports it (1.0.0 or higher), the admin increases the number of partitions if it is found that an existing topic has fewer partitions than the `NewTopic.numPartitions`.

Starting with version 2.7, the `KafkaAdmin` provides methods to create and examine topics at runtime.
Starting with version 4.0, it also provides a method to delete topics.

* `createOrModifyTopics`
* `describeTopics`
* `deleteTopics` (since 4.0)

For more advanced features, you can use the `AdminClient` directly.
For more advanced administrative features, you can use the `AdminClient` directly.
The following example shows how to do so:

[source, java]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@
* @author Anders Swanson
* @author Omer Celik
* @author Choi Wang Gyu
* @author Go Beom Jun
*
* @since 1.3
*/
Expand Down Expand Up @@ -391,6 +392,35 @@ public Map<String, TopicDescription> describeTopics(String... topicNames) {
}
}

/**
* Delete topics from the Kafka cluster.
* @param topicNames the topic names to delete.
* @throws KafkaException if the operation fails.
* @since 4.0
*/
@Override
public void deleteTopics(String... topicNames) {
if (topicNames.length == 0) {
return;
}
try (Admin admin = createAdmin()) {
admin.deleteTopics(Arrays.asList(topicNames))
.all()
.get(this.operationTimeout, TimeUnit.SECONDS);
LOGGER.debug(() -> "Deleted topics: " + Arrays.toString(topicNames));
}
catch (InterruptedException ex) {
Thread.currentThread().interrupt();
throw new KafkaException("Interrupted while deleting topics", ex);
}
catch (TimeoutException ex) {
throw new KafkaException("Timed out waiting to delete topics", ex);
}
catch (ExecutionException ex) {
throw new KafkaException("Failed to delete topics", ex.getCause());
}
}

/**
* Creates a new {@link Admin} client instance using the {@link AdminClient} class.
* @return the new {@link Admin} client instance.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,13 @@ public interface KafkaAdminOperations {
*/
Map<String, TopicDescription> describeTopics(String... topicNames);

/**
* Delete topics from the Kafka cluster.
* @param topicNames the topic names to delete.
* @since 4.0
*/
void deleteTopics(String... topicNames);

/**
* Return the cluster id, if available.
* @return the describe cluster id.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,65 @@ void getAdminConfigWithApplicationNameAsClientId() {
assertThat(kafkaAdmin.getAdminConfig()).containsOnly(Map.entry(AdminClientConfig.CLIENT_ID_CONFIG, "appname-admin-0"));
}

@Test
void testDeleteTopics() {
NewTopic testTopic1 = TopicBuilder.name("test-delete-1")
.partitions(1)
.replicas(1)
.build();
NewTopic testTopic2 = TopicBuilder.name("test-delete-2")
.partitions(1)
.replicas(1)
.build();

this.admin.createOrModifyTopics(testTopic1, testTopic2);

await().atMost(10, TimeUnit.SECONDS).until(() -> {
try {
Map<String, TopicDescription> topics =
this.admin.describeTopics("test-delete-1", "test-delete-2");
return topics.size() == 2;
}
catch (Exception e) {
return false;
}
});

Map<String, TopicDescription> beforeDelete = this.admin.describeTopics("test-delete-1", "test-delete-2");
assertThat(beforeDelete).hasSize(2);
assertThat(beforeDelete).containsKeys("test-delete-1", "test-delete-2");

this.admin.deleteTopics("test-delete-1", "test-delete-2");

await().atMost(10, TimeUnit.SECONDS).until(() -> {
try (AdminClient adminClient = AdminClient.create(this.admin.getConfigurationProperties())) {
DescribeTopicsResult result = adminClient.describeTopics(Arrays.asList("test-delete-1", "test-delete-2"));
try {
result.allTopicNames().get(5, TimeUnit.SECONDS);
return false;
}
catch (ExecutionException ex) {
return ex.getCause() instanceof UnknownTopicOrPartitionException;
}
}
catch (InterruptedException | TimeoutException e) {
return false;
}
});
}

@Test
void testDeleteNonExistentTopic() {
assertThat(org.assertj.core.api.Assertions.catchThrowable(() ->
this.admin.deleteTopics("non-existent-topic-12345")
)).isInstanceOf(org.springframework.kafka.KafkaException.class);
}

@Test
void testDeleteTopicsWithEmptyArray() {
this.admin.deleteTopics();
}

@Configuration
public static class Config {

Expand Down