From a71bfe173b7a223413dd3205de9bc41c3fdea35c Mon Sep 17 00:00:00 2001 From: sksamuel Date: Sun, 18 Jun 2023 01:49:41 -0500 Subject: [PATCH] Added configure to KafkaContainerExtension --- .../testcontainers/kafka/KafkaContainerExtension.kt | 12 +++++------- .../extensions/testcontainers/ContainerExtension.kt | 7 +++++-- .../kotest/extensions/testcontainers/kafka/kafka.kt | 5 +++++ 3 files changed, 15 insertions(+), 9 deletions(-) diff --git a/kotest-extensions-testcontainers-kafka/src/main/kotlin/io/kotest/extensions/testcontainers/kafka/KafkaContainerExtension.kt b/kotest-extensions-testcontainers-kafka/src/main/kotlin/io/kotest/extensions/testcontainers/kafka/KafkaContainerExtension.kt index bd8d6d9..17dc7d4 100644 --- a/kotest-extensions-testcontainers-kafka/src/main/kotlin/io/kotest/extensions/testcontainers/kafka/KafkaContainerExtension.kt +++ b/kotest-extensions-testcontainers-kafka/src/main/kotlin/io/kotest/extensions/testcontainers/kafka/KafkaContainerExtension.kt @@ -13,7 +13,6 @@ import org.apache.kafka.common.serialization.BytesDeserializer import org.apache.kafka.common.serialization.BytesSerializer import org.apache.kafka.common.utils.Bytes import org.testcontainers.containers.KafkaContainer -import org.testcontainers.utility.DockerImageName import java.util.Properties class KafkaContainerExtension( @@ -21,8 +20,6 @@ class KafkaContainerExtension( ) : AfterProjectListener, MountableExtension { - constructor() : this(KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:5.4.3"))) - override suspend fun afterProject() { if (container.isRunning) withContext(Dispatchers.IO) { container.stop() } } @@ -34,21 +31,22 @@ class KafkaContainerExtension( } } -fun KafkaContainer.producer(): KafkaProducer { +fun KafkaContainer.producer(configure: Properties.() -> Unit): KafkaProducer { val props = Properties() props[CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG] = bootstrapServers props[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = BytesSerializer::class.java props[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = BytesSerializer::class.java + props.configure() return KafkaProducer(props) } - -fun KafkaContainer.consumer(consumerGroupId: String? = null): KafkaConsumer { +fun KafkaContainer.consumer(configure: Properties.() -> Unit): KafkaConsumer { val props = Properties() props[CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG] = bootstrapServers - props[ConsumerConfig.GROUP_ID_CONFIG] = consumerGroupId ?: ("kotest_consumer_" + System.currentTimeMillis()) + props[ConsumerConfig.GROUP_ID_CONFIG] = "kotest_consumer_" + System.currentTimeMillis() props[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = "earliest" props[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = BytesDeserializer::class.java props[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = BytesDeserializer::class.java + props.configure() return KafkaConsumer(props) } diff --git a/src/main/kotlin/io/kotest/extensions/testcontainers/ContainerExtension.kt b/src/main/kotlin/io/kotest/extensions/testcontainers/ContainerExtension.kt index 8ab8047..d0560f8 100644 --- a/src/main/kotlin/io/kotest/extensions/testcontainers/ContainerExtension.kt +++ b/src/main/kotlin/io/kotest/extensions/testcontainers/ContainerExtension.kt @@ -9,12 +9,15 @@ import org.testcontainers.containers.GenericContainer /** * A Kotest [MountableExtension] for [GenericContainer]s that are started the first time they are * installed in a spec, and then shared throughout the same test suite. The container is shutdown - * after all the test suite has completed. + * after the test suite has completed. * - * If no spec is executed that installs a particular container, then that container is never started. + * If no spec is executed that installs a particular container, + * then that container is never started. * * @param beforeStart a callback that is invoked only once, just before the container is started. + * If the container is never started, this callback will not be invoked. * @param afterStart a callback that is invoked only once, just after the container is started. + * If the container is never started, this callback will not be invoked. * @param beforeShutdown a callback that is invoked only once, just before the containuer is stopped. * If the container is never started, this callback will not be invoked. */ diff --git a/src/main/kotlin/io/kotest/extensions/testcontainers/kafka/kafka.kt b/src/main/kotlin/io/kotest/extensions/testcontainers/kafka/kafka.kt index 96900d1..d28449f 100644 --- a/src/main/kotlin/io/kotest/extensions/testcontainers/kafka/kafka.kt +++ b/src/main/kotlin/io/kotest/extensions/testcontainers/kafka/kafka.kt @@ -12,12 +12,14 @@ import org.testcontainers.containers.KafkaContainer import java.util.Properties import java.util.UUID +@Deprecated("Use the kafka module") fun KafkaContainer.createStringStringProducer( configure: Properties.() -> Unit = {}, ): KafkaProducer { return createProducer(StringSerializer(), StringSerializer(), configure) } +@Deprecated("Use the kafka module") fun KafkaContainer.createProducer( kserializer: Serializer, vserializer: Serializer, @@ -29,12 +31,14 @@ fun KafkaContainer.createProducer( return KafkaProducer(props, kserializer, vserializer) } +@Deprecated("Use the kafka module") fun KafkaContainer.createStringStringConsumer( configure: Properties.() -> Unit = {}, ): KafkaConsumer { return createConsumer(StringDeserializer(), StringDeserializer(), configure) } +@Deprecated("Use the kafka module") fun KafkaContainer.createConsumer( kserializer: Deserializer, vserializer: Deserializer, @@ -47,6 +51,7 @@ fun KafkaContainer.createConsumer( return KafkaConsumer(props, kserializer, vserializer) } +@Deprecated("Use the kafka module") fun KafkaContainer.createAdminClient(configure: Properties.() -> Unit = {}): AdminClient { val props = Properties() props[CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG] = bootstrapServers