Skip to content

Commit

Permalink
Added string string producer / consumer
Browse files Browse the repository at this point in the history
  • Loading branch information
sksamuel committed Jun 18, 2023
1 parent 949718f commit d1c43e2
Showing 1 changed file with 22 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.common.serialization.BytesDeserializer
import org.apache.kafka.common.serialization.BytesSerializer
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.kafka.common.serialization.StringSerializer
import org.apache.kafka.common.utils.Bytes
import org.testcontainers.containers.KafkaContainer
import org.testcontainers.utility.DockerImageName
Expand Down Expand Up @@ -43,6 +45,15 @@ fun KafkaContainer.producer(configure: Properties.() -> Unit = {}): KafkaProduce
return KafkaProducer<Bytes, Bytes>(props)
}

fun KafkaContainer.stringStringProducer(configure: Properties.() -> Unit = {}): KafkaProducer<String, String> {
val props = Properties()
props[CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG] = bootstrapServers
props[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
props[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
props.configure()
return KafkaProducer<String, String>(props)
}

fun KafkaContainer.consumer(configure: Properties.() -> Unit = {}): KafkaConsumer<Bytes, Bytes> {
val props = Properties()
props[CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG] = bootstrapServers
Expand All @@ -53,3 +64,14 @@ fun KafkaContainer.consumer(configure: Properties.() -> Unit = {}): KafkaConsume
props.configure()
return KafkaConsumer<Bytes, Bytes>(props)
}

fun KafkaContainer.stringStringConsumer(configure: Properties.() -> Unit = {}): KafkaConsumer<String, String> {
val props = Properties()
props[CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG] = bootstrapServers
props[ConsumerConfig.GROUP_ID_CONFIG] = "kotest_consumer_" + System.currentTimeMillis()
props[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = "earliest"
props[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
props[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
props.configure()
return KafkaConsumer<String, String>(props)
}

0 comments on commit d1c43e2

Please sign in to comment.