Closed
Description
Expected Behavior
KafkaAdmin should provide a deleteTopics() method to programmatically delete topics, completing the topic lifecycle management API alongside the existing createOrModifyTopics() method.
@Configuration
public class KafkaConfig {

    @Autowired
    private KafkaAdmin kafkaAdmin;

    public void manageTopicLifecycle() {
        // Create topics
        kafkaAdmin.createOrModifyTopics(new NewTopic("my-topic", 10, (short) 3));

        // Delete topics - Expected to work like this
        kafkaAdmin.deleteTopics("my-topic");
    }
}
Current Behavior
KafkaAdmin only supports topic creation and modification through createOrModifyTopics() and topic inspection via describeTopics(). To delete topics programmatically, users must create and manage an AdminClient themselves:
// Current workaround - manual AdminClient management required
try (Admin admin = AdminClient.create(configs)) {
    admin.deleteTopics(Arrays.asList("my-topic")).all().get();
} catch (Exception e) {
    // Handle exceptions
}
This creates an inconsistency where the Spring-managed KafkaAdmin bean handles creation but not deletion.
Context
This gap affects several common use cases:
Test cleanup with @EmbeddedKafka:
@SpringBootTest
@EmbeddedKafka(topics = {"test-input", "test-output"})
class MyIntegrationTest {

    @Autowired
    private KafkaAdmin kafkaAdmin;

    @AfterEach
    void cleanup() {
        // Currently need to manually create AdminClient
        // Would prefer: kafkaAdmin.deleteTopics("test-input", "test-output");
    }
}
Topic recreation with new configuration:
// Want to change partition count
kafkaAdmin.deleteTopics("my-topic"); // Not available
kafkaAdmin.createOrModifyTopics(new NewTopic("my-topic", 20, (short) 3));
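One caveat for the delete-then-recreate case: Kafka topic deletion is asynchronous, so brokers may keep reporting a topic for a few seconds after the delete call returns, and an immediate recreate can fail. A minimal sketch of a polling helper follows. TopicDeletionWaiter and awaitAbsent are hypothetical names, not part of Spring Kafka or the Kafka client, and the topic listing is passed in as a plain Supplier so the sketch does not assume any particular admin API.

```java
import java.time.Duration;
import java.util.Set;
import java.util.function.Supplier;

// Hypothetical helper, not part of Spring Kafka or kafka-clients.
final class TopicDeletionWaiter {

    private TopicDeletionWaiter() {
    }

    /**
     * Polls the supplied topic listing until the topic is no longer reported
     * or the timeout elapses.
     *
     * @return true if the topic disappeared in time, false on timeout or interrupt
     */
    static boolean awaitAbsent(String topic, Supplier<Set<String>> listTopics,
            Duration timeout, Duration pollInterval) {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (true) {
            if (!listTopics.get().contains(topic)) {
                return true;
            }
            if (System.nanoTime() - deadline >= 0) {
                return false;
            }
            try {
                Thread.sleep(pollInterval.toMillis());
            } catch (InterruptedException e) {
                // Restore the interrupt flag and give up waiting.
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }
}
```

In practice the supplier could wrap a call such as admin.listTopics().names().get() on a Kafka Admin client, with the recreate step running only once awaitAbsent returns true.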
Multi-tenant applications where tenant-specific topics need cleanup when a tenant is removed.
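For the multi-tenant case, the topics to delete first have to be selected from whatever exists on the cluster. The sketch below assumes a purely hypothetical naming convention, tenant.<tenantId>.<name>; the TenantTopics class and that convention are illustrative and do not come from Spring Kafka.

```java
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical helper; assumes topics are named "tenant.<tenantId>.<name>".
final class TenantTopics {

    private TenantTopics() {
    }

    /** Returns the topics owned by the given tenant, sorted by name. */
    static List<String> topicsForTenant(Set<String> existingTopics, String tenantId) {
        // The trailing dot keeps tenant "acme" from matching "acme2".
        String prefix = "tenant." + tenantId + ".";
        return existingTopics.stream()
                .filter(name -> name.startsWith(prefix))
                .sorted()
                .collect(Collectors.toList());
    }
}
```

The resulting list could be passed to the requested kafkaAdmin.deleteTopics(...) if it existed, or today to Admin.deleteTopics(Collection<String>) as in the workaround.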