Skip to content

Commit

Permalink
added removal of topics for reuse true
Browse files Browse the repository at this point in the history
  • Loading branch information
lpandzic committed Aug 9, 2023
1 parent bf136e4 commit ec87ca8
Showing 1 changed file with 18 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,23 +35,38 @@ public void initialize(ConfigurableApplicationContext applicationContext) {
var url = replaceHostAndPortPlaceholders(bootstrapServers, container, KafkaContainerWrapper.KAFKA_PORT);

Optional.ofNullable(environment.getProperty("testcontainers.kafka.topics", String[].class))
.ifPresent(topics -> createTestKafkaTopics(url, topics));
.ifPresent(topics -> createTestKafkaTopics(container, url, topics));
var values = TestPropertyValues.of(
"spring.kafka.bootstrap-servers=" + url);
values.applyTo(applicationContext);

registerContainerAsBean(applicationContext);
}

private static void createTestKafkaTopics(String bootstrapServers, String[] topics) {
private static void createTestKafkaTopics(KafkaContainerWrapper container, String bootstrapServers, String[] topics) {

try (var client = AdminClient.create(Collections.singletonMap("bootstrap.servers", bootstrapServers))) {
if(container.isShouldBeReused()) {
deleteTopics(client, topics);
}
createTopics(client, topics);
} catch (Exception e) {
throw new RuntimeException(e);
}
}

private static void deleteTopics(AdminClient client, String[] topics) throws ExecutionException,
InterruptedException, TimeoutException {
var existingTopics = client.listTopics().names().get(60, TimeUnit.SECONDS);
var deleteTopics = Stream.of(topics)
.map(topic -> topic.split(":"))
.filter(topic -> !existingTopics.contains(topic[0]))
.map(topicParts -> topicParts[0])
.toList();

client.deleteTopics(deleteTopics);
}

private static void createTopics(AdminClient client, String[] topics) throws InterruptedException,
ExecutionException, TimeoutException {
var existingTopics = client.listTopics().names().get(60, TimeUnit.SECONDS);
Expand All @@ -60,7 +75,7 @@ private static void createTopics(AdminClient client, String[] topics) throws Int
.filter(topic -> !existingTopics.contains(topic[0]))
.map(topicParts -> new NewTopic(topicParts[0], parseInt(topicParts[1]),
parseShort(topicParts[2])))
.collect(Collectors.toList());
.toList();

client.createTopics(newTopics)
.all()
Expand Down

0 comments on commit ec87ca8

Please sign in to comment.