Skip to content

Commit

Permalink
Added configure to KafkaContainerExtension
Browse files Browse the repository at this point in the history
  • Loading branch information
sksamuel committed Jun 18, 2023
1 parent 347b43c commit a71bfe1
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,13 @@ import org.apache.kafka.common.serialization.BytesDeserializer
import org.apache.kafka.common.serialization.BytesSerializer
import org.apache.kafka.common.utils.Bytes
import org.testcontainers.containers.KafkaContainer
import org.testcontainers.utility.DockerImageName
import java.util.Properties

class KafkaContainerExtension(
private val container: KafkaContainer,
) : AfterProjectListener,
MountableExtension<KafkaContainer, KafkaContainer> {

constructor() : this(KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:5.4.3")))

override suspend fun afterProject() {
if (container.isRunning) withContext(Dispatchers.IO) { container.stop() }
}
Expand All @@ -34,21 +31,22 @@ class KafkaContainerExtension(
}
}

fun KafkaContainer.producer(): KafkaProducer<Bytes, Bytes> {
fun KafkaContainer.producer(configure: Properties.() -> Unit): KafkaProducer<Bytes, Bytes> {
val props = Properties()
props[CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG] = bootstrapServers
props[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = BytesSerializer::class.java
props[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = BytesSerializer::class.java
props.configure()
return KafkaProducer<Bytes, Bytes>(props)
}


fun KafkaContainer.consumer(consumerGroupId: String? = null): KafkaConsumer<Bytes, Bytes> {
fun KafkaContainer.consumer(configure: Properties.() -> Unit): KafkaConsumer<Bytes, Bytes> {
val props = Properties()
props[CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG] = bootstrapServers
props[ConsumerConfig.GROUP_ID_CONFIG] = consumerGroupId ?: ("kotest_consumer_" + System.currentTimeMillis())
props[ConsumerConfig.GROUP_ID_CONFIG] = "kotest_consumer_" + System.currentTimeMillis()
props[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = "earliest"
props[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = BytesDeserializer::class.java
props[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = BytesDeserializer::class.java
props.configure()
return KafkaConsumer<Bytes, Bytes>(props)
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,15 @@ import org.testcontainers.containers.GenericContainer
/**
* A Kotest [MountableExtension] for [GenericContainer]s that are started the first time they are
* installed in a spec, and then shared throughout the same test suite. The container is shutdown
* after all the test suite has completed.
* after the test suite has completed.
*
* If no spec is executed that installs a particular container, then that container is never started.
* If no spec is executed that installs a particular container,
* then that container is never started.
*
* @param beforeStart a callback that is invoked only once, just before the container is started.
* If the container is never started, this callback will not be invoked.
* @param afterStart a callback that is invoked only once, just after the container is started.
* If the container is never started, this callback will not be invoked.
* @param beforeShutdown a callback that is invoked only once, just before the containuer is stopped.
* If the container is never started, this callback will not be invoked.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,14 @@ import org.testcontainers.containers.KafkaContainer
import java.util.Properties
import java.util.UUID

@Deprecated("Use the kafka module")
fun KafkaContainer.createStringStringProducer(
configure: Properties.() -> Unit = {},
): KafkaProducer<String, String> {
return createProducer(StringSerializer(), StringSerializer(), configure)
}

@Deprecated("Use the kafka module")
fun <K, V> KafkaContainer.createProducer(
kserializer: Serializer<K>,
vserializer: Serializer<V>,
Expand All @@ -29,12 +31,14 @@ fun <K, V> KafkaContainer.createProducer(
return KafkaProducer(props, kserializer, vserializer)
}

@Deprecated("Use the kafka module")
fun KafkaContainer.createStringStringConsumer(
configure: Properties.() -> Unit = {},
): KafkaConsumer<String, String> {
return createConsumer(StringDeserializer(), StringDeserializer(), configure)
}

@Deprecated("Use the kafka module")
fun <K, V> KafkaContainer.createConsumer(
kserializer: Deserializer<K>,
vserializer: Deserializer<V>,
Expand All @@ -47,6 +51,7 @@ fun <K, V> KafkaContainer.createConsumer(
return KafkaConsumer(props, kserializer, vserializer)
}

@Deprecated("Use the kafka module")
fun KafkaContainer.createAdminClient(configure: Properties.() -> Unit = {}): AdminClient {
val props = Properties()
props[CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG] = bootstrapServers
Expand Down

0 comments on commit a71bfe1

Please sign in to comment.