From 516d1ed222a427154de8308850ad0b4b2271d03c Mon Sep 17 00:00:00 2001 From: Simon Vergauwen Date: Fri, 1 Jul 2022 22:52:22 +0200 Subject: [PATCH 1/5] Add commitBatchWithin --- build.gradle.kts | 5 +- guide/example/example-admin-01.kt | 2 +- guide/example/example-consumer-01.kt | 2 +- guide/example/example-producer-01.kt | 2 +- guide/example/example-producer-02.kt | 2 +- guide/example/example-readme-01.kt | 2 +- guide/src/main/kotlin/KafkaContainer.kt | 32 +-- guide/src/main/kotlin/main.kt | 72 ++++++ guide/src/test/resources/log4j.properties | 3 - knit.code.include | 2 +- libs.versions.toml | 9 +- .../github/nomisRev/kafka/Admin.kt | 28 ++- .../io/github/nomisRev/kafka/Closeable.kt | 8 + .../github/nomisRev/kafka/Consumer.kt | 231 +++++++++++++----- .../github/nomisRev/kafka/KafkaFuture.kt | 2 +- .../github/nomisRev/kafka/Producer.kt | 38 +-- .../github/nomisRev/kafka/Serializers.kt | 2 +- .../kafka/internal/FlowTimeChunked.kt | 85 +++++++ .../io/github/nomisrev/kafka/ChunkSpec.kt | 80 ++++++ 19 files changed, 489 insertions(+), 118 deletions(-) create mode 100644 guide/src/main/kotlin/main.kt delete mode 100644 guide/src/test/resources/log4j.properties rename src/main/kotlin/{com => io}/github/nomisRev/kafka/Admin.kt (72%) create mode 100644 src/main/kotlin/io/github/nomisRev/kafka/Closeable.kt rename src/main/kotlin/{com => io}/github/nomisRev/kafka/Consumer.kt (53%) rename src/main/kotlin/{com => io}/github/nomisRev/kafka/KafkaFuture.kt (97%) rename src/main/kotlin/{com => io}/github/nomisRev/kafka/Producer.kt (84%) rename src/main/kotlin/{com => io}/github/nomisRev/kafka/Serializers.kt (98%) create mode 100644 src/main/kotlin/io/github/nomisRev/kafka/internal/FlowTimeChunked.kt create mode 100644 src/test/kotlin/io/github/nomisrev/kafka/ChunkSpec.kt diff --git a/build.gradle.kts b/build.gradle.kts index 0a6a9786..2d2b0089 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -38,10 +38,7 @@ dependencies { api(libs.kafka.streams) api(libs.kafka.connect) - 
testImplementation(libs.kotest.runner.junit5) - testImplementation(libs.kotest.property) - testImplementation(libs.kotest.framework) - testImplementation(libs.kotest.assertions) + testImplementation(libs.bundles.kotest) } configure { diff --git a/guide/example/example-admin-01.kt b/guide/example/example-admin-01.kt index 37abb1b6..767ff56a 100644 --- a/guide/example/example-admin-01.kt +++ b/guide/example/example-admin-01.kt @@ -1,6 +1,6 @@ package example.exampleAdmin01 -import com.github.nomisRev.kafka.* +import io.github.nomisRev.kafka.* import java.util.Properties import kotlinx.coroutines.runBlocking diff --git a/guide/example/example-consumer-01.kt b/guide/example/example-consumer-01.kt index fd0149c4..6823ba0e 100644 --- a/guide/example/example-consumer-01.kt +++ b/guide/example/example-consumer-01.kt @@ -1,6 +1,6 @@ package example.exampleConsumer01 -import com.github.nomisRev.kafka.* +import io.github.nomisRev.kafka.* import java.util.Properties import kotlinx.coroutines.runBlocking diff --git a/guide/example/example-producer-01.kt b/guide/example/example-producer-01.kt index abf08646..26a0460f 100644 --- a/guide/example/example-producer-01.kt +++ b/guide/example/example-producer-01.kt @@ -1,6 +1,6 @@ package example.exampleProducer01 -import com.github.nomisRev.kafka.* +import io.github.nomisRev.kafka.* import java.util.Properties import kotlinx.coroutines.runBlocking diff --git a/guide/example/example-producer-02.kt b/guide/example/example-producer-02.kt index be8bad39..5d13615b 100644 --- a/guide/example/example-producer-02.kt +++ b/guide/example/example-producer-02.kt @@ -1,6 +1,6 @@ package example.exampleProducer02 -import com.github.nomisRev.kafka.* +import io.github.nomisRev.kafka.* import java.util.Properties import kotlinx.coroutines.runBlocking diff --git a/guide/example/example-readme-01.kt b/guide/example/example-readme-01.kt index b0915775..4be6a80d 100644 --- a/guide/example/example-readme-01.kt +++ b/guide/example/example-readme-01.kt @@ 
-1,6 +1,6 @@ package example.exampleReadme01 -import com.github.nomisRev.kafka.* +import io.github.nomisRev.kafka.* import java.util.Properties import kotlinx.coroutines.runBlocking diff --git a/guide/src/main/kotlin/KafkaContainer.kt b/guide/src/main/kotlin/KafkaContainer.kt index 9f5ba59a..31c4cca4 100644 --- a/guide/src/main/kotlin/KafkaContainer.kt +++ b/guide/src/main/kotlin/KafkaContainer.kt @@ -1,7 +1,7 @@ import com.github.dockerjava.api.command.InspectContainerResponse -import com.github.nomisRev.kafka.Admin -import com.github.nomisRev.kafka.AdminSettings -import com.github.nomisRev.kafka.await +import io.github.nomisRev.kafka.Admin +import io.github.nomisRev.kafka.AdminSettings +import io.github.nomisRev.kafka.await import kotlinx.coroutines.runBlocking import org.testcontainers.containers.KafkaContainer import org.testcontainers.utility.DockerImageName @@ -33,7 +33,7 @@ class Kafka private constructor(imageName: DockerImageName) : KafkaContainer(ima companion object { val container: KafkaContainer by lazy { - Kafka(DockerImageName.parse("confluentinc/cp-kafka:6.2.1")) + Kafka(DockerImageName.parse("niciqy/cp-kafka-arm64:7.0.1").asCompatibleSubstituteFor("confluentinc/cp-kafka")) .withReuse(true) .withNetwork(null) .withLabel("io.github.nomisrev.kafka", "fqn-testcontainers-reuse") @@ -41,16 +41,16 @@ class Kafka private constructor(imageName: DockerImageName) : KafkaContainer(ima } } - override fun containerIsStarted(containerInfo: InspectContainerResponse?, reused: Boolean) { - super.containerIsStarted(containerInfo, reused) - // If we're reusing the container, we want to reset the state of the container. We do this by - // deleting all topics. 
- // if (reused) - runBlocking { - Admin(AdminSettings(bootstrapServers)).use { admin -> - val names = admin.listTopics().listings().await() - admin.deleteTopics(names.map { it.name() }).all().await() - } - } - } + // override fun containerIsStarted(containerInfo: InspectContainerResponse?, reused: Boolean) { + // super.containerIsStarted(containerInfo, reused) + // // If we're reusing the container, we want to reset the state of the container. We do this by + // // deleting all topics. + // // if (reused) + // runBlocking { + // Admin(AdminSettings(bootstrapServers)).use { admin -> + // val names = admin.listTopics().listings().await() + // admin.deleteTopics(names.map { it.name() }).all().await() + // } + // } + // } } diff --git a/guide/src/main/kotlin/main.kt b/guide/src/main/kotlin/main.kt new file mode 100644 index 00000000..1b7864a5 --- /dev/null +++ b/guide/src/main/kotlin/main.kt @@ -0,0 +1,72 @@ +package io.github.nomisRev.kafka + +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.coroutineScope +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.asFlow +import kotlinx.coroutines.flow.collect +import kotlinx.coroutines.flow.map +import kotlinx.coroutines.flow.take +import kotlinx.coroutines.launch +import kotlinx.coroutines.runBlocking +import org.apache.kafka.clients.admin.NewTopic +import org.apache.kafka.clients.producer.ProducerRecord +import org.apache.kafka.common.serialization.IntegerDeserializer +import org.apache.kafka.common.serialization.IntegerSerializer +import org.apache.kafka.common.serialization.StringDeserializer +import org.apache.kafka.common.serialization.StringSerializer +import java.util.UUID +import kotlin.time.Duration.Companion.milliseconds + +@JvmInline +value class Key(val index: Int) + +@JvmInline +value class Message(val content: String) + +fun main(): Unit = runBlocking(Dispatchers.Default) { + val topicName = "test-topic" + val msgCount = 10 + val kafka = Kafka.container + + 
Admin(AdminSettings(kafka.bootstrapServers)).use { client -> + client.createTopic(NewTopic(topicName, 1, 1)) + } + + coroutineScope { // Run produces and consumer in a single scope + launch(Dispatchers.IO) { // Send 20 messages, and then close the producer + val settings: ProducerSettings = ProducerSettings( + kafka.bootstrapServers, + IntegerSerializer().imap { key: Key -> key.index }, + StringSerializer().imap { msg: Message -> msg.content }, + Acks.All + ) + (1..msgCount) + .map { index -> ProducerRecord(topicName, Key(index), Message("msg: $index")) } + .asFlow() + .produce(settings) + .collect(::println) + } + + launch(Dispatchers.IO) { // Consume 20 messages as a stream, and then close the consumer + val settings: ConsumerSettings = ConsumerSettings( + kafka.bootstrapServers, + IntegerDeserializer().map(::Key), + StringDeserializer().map(::Message), + groupId = UUID.randomUUID().toString(), + autoOffsetReset = AutoOffsetReset.Earliest, + enableAutoCommit = false + ) + + KafkaConsumer(settings).asFlow() + .subscribeTo(topicName) + .tap { (key, value) -> println("$key -> $value") } + .commitBatchWithin(settings, 3, 10.milliseconds) + .take(4) + .collect() + } + } +} + +fun Flow.tap(also: suspend (A) -> Unit): Flow = + map { it.also { also(it) } } diff --git a/guide/src/test/resources/log4j.properties b/guide/src/test/resources/log4j.properties deleted file mode 100644 index f965cca8..00000000 --- a/guide/src/test/resources/log4j.properties +++ /dev/null @@ -1,3 +0,0 @@ -# Disable log4j logs in testing. 
-# Test Containers, and servers log a lot of stuff and it's not useful for docs-tests -log4j.rootLogger=OFF \ No newline at end of file diff --git a/knit.code.include b/knit.code.include index 282cc0f0..0da98978 100644 --- a/knit.code.include +++ b/knit.code.include @@ -1,5 +1,5 @@ package ${knit.package}.${knit.name} -import com.github.nomisRev.kafka.* +import io.github.nomisRev.kafka.* import java.util.Properties import kotlinx.coroutines.runBlocking \ No newline at end of file diff --git a/libs.versions.toml b/libs.versions.toml index 3914e021..8e79a34a 100644 --- a/libs.versions.toml +++ b/libs.versions.toml @@ -1,5 +1,4 @@ [versions] -# SNAPSHOT for resource DSL arrowGradleConfig = "0.10.2" kotest = "5.3.1" kafka = "3.2.0" @@ -23,6 +22,14 @@ kotlinx-coroutines-core = { module = "org.jetbrains.kotlinx:kotlinx-coroutines-c kotlinx-coroutines-jdk8 = { module = "org.jetbrains.kotlinx:kotlinx-coroutines-jdk8", version.ref = "kotlinx-coroutines" } testcontainers-kafka = "org.testcontainers:kafka:1.17.2" +[bundles] +kotest = [ + "kotest-assertions", + "kotest-framework", + "kotest-property", + "kotest-runner-junit5" +] + [plugins] kotlin-jvm = { id = "org.jetbrains.kotlin.jvm", version.ref = "kotlin" } arrowGradleConfig-kotlin = { id = "io.arrow-kt.arrow-gradle-config-kotlin", version.ref = "arrowGradleConfig" } diff --git a/src/main/kotlin/com/github/nomisRev/kafka/Admin.kt b/src/main/kotlin/io/github/nomisRev/kafka/Admin.kt similarity index 72% rename from src/main/kotlin/com/github/nomisRev/kafka/Admin.kt rename to src/main/kotlin/io/github/nomisRev/kafka/Admin.kt index 50e0bda8..1079f2da 100644 --- a/src/main/kotlin/com/github/nomisRev/kafka/Admin.kt +++ b/src/main/kotlin/io/github/nomisRev/kafka/Admin.kt @@ -1,4 +1,4 @@ -package com.github.nomisRev.kafka +package io.github.nomisRev.kafka import java.util.Properties import org.apache.kafka.clients.admin.Admin @@ -6,6 +6,7 @@ import org.apache.kafka.clients.admin.AdminClientConfig import 
org.apache.kafka.clients.admin.CreateTopicsOptions import org.apache.kafka.clients.admin.DeleteTopicsOptions import org.apache.kafka.clients.admin.DescribeTopicsOptions +import org.apache.kafka.clients.admin.ListTopicsOptions import org.apache.kafka.clients.admin.NewTopic import org.apache.kafka.clients.admin.TopicDescription @@ -41,18 +42,29 @@ import org.apache.kafka.clients.admin.TopicDescription */ public fun Admin(settings: AdminSettings): Admin = Admin.create(settings.properties()) -/** Extension method on [Admin] to create a single Topic in a suspending way. */ +/** Extension method on [Admin] to create a Topic in a suspending way. */ public suspend fun Admin.createTopic( topic: NewTopic, - option: CreateTopicsOptions = CreateTopicsOptions() + option: CreateTopicsOptions = CreateTopicsOptions(), +): Unit = createTopic(listOf(topic), option) + +/** Extension method on [Admin] to create Topics in a suspending way. */ +public suspend fun Admin.createTopic( + topic: Iterable, + option: CreateTopicsOptions = CreateTopicsOptions(), ): Unit { - createTopics(listOf(topic), option).all().await() + createTopics(topic.toList(), option).all().await() } +public suspend fun Admin.topicExists( + topic: NewTopic, + listTopicsOptions: ListTopicsOptions = ListTopicsOptions(), +): Boolean = listTopics(listTopicsOptions).names().await().contains(topic.name()) + /** Extension method on [Admin] to delete a single Topic in a suspending way. */ public suspend fun Admin.deleteTopic( name: String, - options: DeleteTopicsOptions = DeleteTopicsOptions() + options: DeleteTopicsOptions = DeleteTopicsOptions(), ): Unit { deleteTopics(listOf(name), options).all().await() } @@ -60,9 +72,9 @@ public suspend fun Admin.deleteTopic( /** Extension method to describe a single Topic */ public suspend fun Admin.describeTopic( name: String, - options: DescribeTopicsOptions = DescribeTopicsOptions() + options: DescribeTopicsOptions = DescribeTopicsOptions(), ): TopicDescription? 
= - describeTopics(listOf(name), options).values().getOrDefault(name, null)?.await() + describeTopics(listOf(name), options).topicNameValues().getOrDefault(name, null)?.await() /** * Typed data class for creating a valid [Admin] instance. The only required parameter is the @@ -75,7 +87,7 @@ public suspend fun Admin.describeTopic( */ public data class AdminSettings( val bootstrapServer: String, - private val props: Properties? = null + private val props: Properties? = null, ) { public fun properties(): Properties = Properties().apply { diff --git a/src/main/kotlin/io/github/nomisRev/kafka/Closeable.kt b/src/main/kotlin/io/github/nomisRev/kafka/Closeable.kt new file mode 100644 index 00000000..e5f6f696 --- /dev/null +++ b/src/main/kotlin/io/github/nomisRev/kafka/Closeable.kt @@ -0,0 +1,8 @@ +package io.github.nomisRev.kafka + +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.flow +import java.io.Closeable + +public fun A.asFlow(): Flow = + flow { use { emit(it) } } diff --git a/src/main/kotlin/com/github/nomisRev/kafka/Consumer.kt b/src/main/kotlin/io/github/nomisRev/kafka/Consumer.kt similarity index 53% rename from src/main/kotlin/com/github/nomisRev/kafka/Consumer.kt rename to src/main/kotlin/io/github/nomisRev/kafka/Consumer.kt index a3f20c52..48b8e17e 100644 --- a/src/main/kotlin/com/github/nomisRev/kafka/Consumer.kt +++ b/src/main/kotlin/io/github/nomisRev/kafka/Consumer.kt @@ -1,19 +1,26 @@ @file:JvmName("Consumer") -package com.github.nomisRev.kafka +package io.github.nomisRev.kafka + +import io.github.nomisRev.kafka.internal.chunked +import kotlinx.coroutines.CancellationException +import kotlinx.coroutines.CoroutineDispatcher import java.time.Duration import java.util.Properties import kotlin.coroutines.coroutineContext import kotlinx.coroutines.Dispatchers.IO +import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.FlowPreview import kotlinx.coroutines.Job import kotlinx.coroutines.ensureActive import 
kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.asFlow +import kotlinx.coroutines.flow.emitAll import kotlinx.coroutines.flow.flatMapConcat import kotlinx.coroutines.flow.flow +import kotlinx.coroutines.flow.mapNotNull import kotlinx.coroutines.runInterruptible import org.apache.kafka.clients.ClientDnsLookup -import org.apache.kafka.clients.admin.Admin import org.apache.kafka.clients.consumer.ConsumerConfig import org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG import org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_OFFSET_RESET_CONFIG @@ -51,11 +58,23 @@ import org.apache.kafka.clients.consumer.ConsumerConfig.SESSION_TIMEOUT_MS_CONFI import org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG import org.apache.kafka.clients.consumer.ConsumerRebalanceListener import org.apache.kafka.clients.consumer.ConsumerRecord +import org.apache.kafka.clients.consumer.ConsumerRecords import org.apache.kafka.clients.consumer.KafkaConsumer +import org.apache.kafka.clients.consumer.OffsetAndMetadata import org.apache.kafka.clients.consumer.RangeAssignor import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener import org.apache.kafka.common.metrics.Sensor import org.apache.kafka.common.serialization.Deserializer +import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.errors.InterruptException +import kotlin.coroutines.resume +import kotlin.coroutines.resumeWithException +import kotlin.coroutines.suspendCoroutine +import kotlin.time.Duration.Companion.milliseconds +import kotlin.time.toJavaDuration + +public fun KafkaConsumer(settings: ConsumerSettings): KafkaConsumer = + KafkaConsumer(settings.properties(), settings.keyDeserializer, settings.valueDeserializer) /** * - */ // TODO properly support interruptible `send`, whilst still working in a suspending way. 
+ */ public suspend fun KafkaProducer.sendAwait( record: ProducerRecord -): RecordMetadata = suspendCoroutine { cont -> - // Those can be a SerializationException when it fails to serialize the message, - // a BufferExhaustedException or TimeoutException if the buffer is full, - // or an InterruptException if the sending thread was interrupted. - send(record) { a, e -> - // null if an error occurred, see: org.apache.kafka.clients.producer.Callback - if (a != null) cont.resume(a) else cont.resumeWithException(e) +): RecordMetadata = + suspendCoroutine { cont -> + // Those can be a SerializationException when it fails to serialize the message, + // a BufferExhaustedException or TimeoutException if the buffer is full, + // or an InterruptException if the sending thread was interrupted. + send(record) { a, e -> + // null if an error occurred, see: org.apache.kafka.clients.producer.Callback + if (a != null) cont.resume(a) else cont.resumeWithException(e) + } } -} + +public fun KafkaProducer( + props: Properties, + keyDeserializer: Serializer, + valueDeserializer: Serializer +): KafkaProducer = + KafkaProducer(props, keyDeserializer, valueDeserializer) public fun kafkaProducer( props: Properties, @@ -113,7 +122,8 @@ public data class ProducerSettings( val bootstrapServers: String, val keyDeserializer: Serializer, val valueDeserializer: Serializer, - val acks: Acks = Acks.One + val acks: Acks = Acks.One, + val other: Properties? = null ) { public fun properties(): Properties = Properties().apply { @@ -121,12 +131,12 @@ public data class ProducerSettings( put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keyDeserializer::class.qualifiedName) put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueDeserializer::class.qualifiedName) put(ProducerConfig.ACKS_CONFIG, acks.value) + other?.let { putAll(other) } } /** - * // TODO Support Transactional behavior Kafka producer as a [Flow]. Will automatically close, - * and flush when finished streaming. 
The [Flow] will close when the [KafkaProducer] is consumed - * from the [Flow]. + * Will automatically close, and flush when finished streaming. + * The [Flow] will close when the [KafkaProducer] is consumed from the [Flow]. * * This means that the [KafkaProducer] will not be closed for a synchronous running stream, but * when running the [Flow] is offloaded in a separate Coroutine it's prone to be collected, closed diff --git a/src/main/kotlin/com/github/nomisRev/kafka/Serializers.kt b/src/main/kotlin/io/github/nomisRev/kafka/Serializers.kt similarity index 98% rename from src/main/kotlin/com/github/nomisRev/kafka/Serializers.kt rename to src/main/kotlin/io/github/nomisRev/kafka/Serializers.kt index 08196f31..c7183c73 100644 --- a/src/main/kotlin/com/github/nomisRev/kafka/Serializers.kt +++ b/src/main/kotlin/io/github/nomisRev/kafka/Serializers.kt @@ -1,4 +1,4 @@ -package com.github.nomisRev.kafka +package io.github.nomisRev.kafka import org.apache.kafka.common.header.Headers import org.apache.kafka.common.serialization.Deserializer diff --git a/src/main/kotlin/io/github/nomisRev/kafka/internal/FlowTimeChunked.kt b/src/main/kotlin/io/github/nomisRev/kafka/internal/FlowTimeChunked.kt new file mode 100644 index 00000000..61ac1f50 --- /dev/null +++ b/src/main/kotlin/io/github/nomisRev/kafka/internal/FlowTimeChunked.kt @@ -0,0 +1,85 @@ +@file:Suppress("INVISIBLE_MEMBER", "INVISIBLE_REFERENCE") + +/* + * Inspired by https://github.com/Kotlin/kotlinx.coroutines/pull/2378 + */ +package io.github.nomisRev.kafka.internal + +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.channels.ReceiveChannel +import kotlinx.coroutines.channels.getOrElse +import kotlinx.coroutines.channels.produce +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.internal.scopedFlow +import kotlinx.coroutines.selects.whileSelect +import kotlinx.coroutines.selects.onTimeout +import kotlin.time.Duration + +/** + * 
Chunk [Flow] until [size] elements are collected, or until a certain [duration] has passed. + * When the [Flow] completes, the remaining elements are emitted as a final chunk; an upstream exception is propagated to the collector. + * + * Groups emissions from this [Flow] into [List]s. Time based implementations + * collect upstream and emit to downstream in separate coroutines - concurrently, like the Flow.buffer() operator. + * Exact timing of emissions is not guaranteed, as it depends on collector coroutine availability. + * + * Size based chunking happens in a single coroutine and is purely sequential. + * + * Emissions always preserve order. + * Collects upstream into a buffer and emits its content as a list at every interval or when its buffer reaches + * maximum size. When upstream completes (or is empty), it will try to emit immediately what is left of + * a chunk, omitting the interval and maxSize constraints. + * + * @param duration Interval between emissions. Every emission happens only after the + * interval passes, unless the upstream Flow completes sooner or the maximum size of a chunk is reached. + * + * @param size Maximum size of a single chunk. If reached, it will try to emit a chunk, ignoring the + * interval constraint. If that happens, time-to-next-chunk gets reset to the interval value. 
+ */ +@ExperimentalCoroutinesApi +public fun Flow.chunked( + size: Int, + duration: Duration, +): Flow> { + require(size > 0) { "Cannot create chunks smaller than 0 but found $size" } + require(!duration.isNegative() && duration != Duration.ZERO) { "Chunk duration should be positive non-zero duration" } + return scopedFlow { downstream -> + val emitNowAndMaybeContinue = Channel(capacity = Channel.RENDEZVOUS) + val elements = produce(capacity = size) { + collect { element -> + val hasCapacity = channel.trySend(element).isSuccess + if (!hasCapacity) { + emitNowAndMaybeContinue.send(true) + channel.send(element) + } + } + emitNowAndMaybeContinue.send(false) + } + + whileSelect { + emitNowAndMaybeContinue.onReceive { shouldContinue -> + val chunk = elements.drain(maxElements = size) + if (chunk.isNotEmpty()) downstream.emit(chunk) + shouldContinue + } + + onTimeout(duration) { + val chunk: List = elements.drain(maxElements = size) + if (chunk.isNotEmpty()) downstream.emit(chunk) + true + } + } + } +} + +private tailrec fun ReceiveChannel.drain( + acc: MutableList = mutableListOf(), + maxElements: Int, +): List = + if (acc.size == maxElements) acc + else { + val nextValue = tryReceive().getOrElse { error: Throwable? 
-> error?.let { throw (it) } ?: return acc } + acc.add(nextValue) + drain(acc, maxElements) + } diff --git a/src/test/kotlin/io/github/nomisrev/kafka/ChunkSpec.kt b/src/test/kotlin/io/github/nomisrev/kafka/ChunkSpec.kt new file mode 100644 index 00000000..e0450dae --- /dev/null +++ b/src/test/kotlin/io/github/nomisrev/kafka/ChunkSpec.kt @@ -0,0 +1,80 @@ +package io.github.nomisrev.kafka + +import io.github.nomisRev.kafka.internal.chunked +import io.kotest.core.spec.style.StringSpec +import io.kotest.matchers.shouldBe +import io.kotest.property.Arb +import io.kotest.property.arbitrary.int +import io.kotest.property.arbitrary.list +import io.kotest.property.checkAll +import kotlinx.coroutines.delay +import kotlinx.coroutines.flow.asFlow +import kotlinx.coroutines.flow.map +import kotlinx.coroutines.flow.toList +import kotlinx.coroutines.test.runTest +import kotlin.math.absoluteValue +import kotlin.time.Duration.Companion.days +import kotlin.time.Duration.Companion.microseconds + +class ChunkSpec : StringSpec({ + + "should never lose any elements" { + runTest { + checkAll( + Arb.list(Arb.int()), + Arb.int(min = 1) // chunk needs to have minSize 1 + ) { source, maxGroupSize0 -> + val maxGroupSize = (maxGroupSize0 % 20).absoluteValue + 1 + + source.asFlow().map { i -> + delay(100.microseconds) + i + }.chunked(maxGroupSize, 2.days).toList() shouldBe source.chunked(maxGroupSize) + } + } + } + + // @Test + // fun testEmptyFlowTimeOrSizeBasedChunking() = runTest { + // val emptyFlow = emptyFlow() + // val result = measureTimedValue { + // emptyFlow.chunked(ChunkingMethod.ByTimeOrSize(intervalMs = 10 * 1000, maxSize = 5)).toList() + // } + // assertTrue(result.value.isEmpty()) + // assertTrue(result.duration < 500.milliseconds) + // } + // + // @Test + // fun testMultipleElementsFillingBufferWithTimeOrSizeBasedChunking() = runTest { + // val flow = flow { + // for (i in 1..10) { + // emit(i) + // } + // } + // val result = measureTimedValue { + // 
flow.chunked(ChunkingMethod.ByTimeOrSize(intervalMs = 10 * 1000, maxSize = 5)).toList() + // } + // assertEquals(2, result.value.size) + // assertEquals(5, result.value.first().size) + // assertEquals(5, result.value[1].size) + // assertTrue(result.duration < 500.milliseconds) + // } + // + // @Test + // fun testMultipleElementsNotFillingBufferWithTimeOrSizeBasedChunking() = withVirtualTime { + // val flow = flow { + // for (i in 1..5) { + // delay(500) + // emit(i) + // } + // } + // val result = flow.chunked(ChunkingMethod.ByTimeOrSize(intervalMs = 1100, maxSize = 500)).toList() + // + // assertEquals(3, result.size) + // assertEquals(2, result.first().size) + // assertEquals(2, result[1].size) + // assertEquals(1, result[2].size) + // + // finish(1) + // } +}) \ No newline at end of file From fa188a593f17e236d1e9d921e1879893b3cc0611 Mon Sep 17 00:00:00 2001 From: Simon Vergauwen Date: Fri, 1 Jul 2022 23:11:56 +0200 Subject: [PATCH 2/5] Update workflow --- .github/workflows/build.yaml | 15 ++++++++++++--- .github/workflows/pr.yaml | 23 +++++++++++------------ 2 files changed, 23 insertions(+), 15 deletions(-) diff --git a/.github/workflows/build.yaml b/.github/workflows/build.yaml index 1454faf7..11e1fe29 100644 --- a/.github/workflows/build.yaml +++ b/.github/workflows/build.yaml @@ -19,10 +19,19 @@ jobs: steps: - name: Checkout the repo - uses: actions/checkout@v3 + uses: actions/checkout@v3.0.2 + with: + fetch-depth: 0 + + - name: Set up Java + uses: actions/setup-java@v3.3.0 + with: + distribution: 'zulu' - - name: Run Check - run: ./gradlew check + - name: Build + uses: gradle/gradle-build-action@v2.1.6 + with: + arguments: build --scan --full-stacktrace - name: Bundle the build report if: failure() diff --git a/.github/workflows/pr.yaml b/.github/workflows/pr.yaml index b948c71a..b516818c 100644 --- a/.github/workflows/pr.yaml +++ b/.github/workflows/pr.yaml @@ -17,20 +17,19 @@ jobs: steps: - name: Checkout the repo - uses: actions/checkout@v3 + uses: 
actions/checkout@v3.0.2 + with: + fetch-depth: 0 + + - name: Set up Java + uses: actions/setup-java@v3.3.0 + with: + distribution: 'zulu' - - name: Restore Gradle cache - id: cache - uses: actions/cache@v3.0.4 + - name: Build + uses: gradle/gradle-build-action@v2.1.6 with: - path: | - ~/.gradle/caches - ~/.gradle/wrapper - key: ${{ runner.os }}-check-${{ hashFiles('**/*.gradle*', '**/gradle-wrapper.properties') }} - restore-keys: ${{ runner.os }}-check- - - - name: Run Check - run: ./gradlew check + arguments: build --scan --full-stacktrace - name: Bundle the build report if: failure() From bfaee4387e63503bb194626040bfc6383e4ecdc6 Mon Sep 17 00:00:00 2001 From: Simon Vergauwen Date: Sat, 2 Jul 2022 10:07:16 +0200 Subject: [PATCH 3/5] Add java-version: 11 to workflow --- .github/workflows/build.yaml | 1 + .github/workflows/pr.yaml | 1 + 2 files changed, 2 insertions(+) diff --git a/.github/workflows/build.yaml b/.github/workflows/build.yaml index 11e1fe29..06c510c7 100644 --- a/.github/workflows/build.yaml +++ b/.github/workflows/build.yaml @@ -27,6 +27,7 @@ jobs: uses: actions/setup-java@v3.3.0 with: distribution: 'zulu' + java-version: 11 - name: Build uses: gradle/gradle-build-action@v2.1.6 diff --git a/.github/workflows/pr.yaml b/.github/workflows/pr.yaml index b516818c..c93da2aa 100644 --- a/.github/workflows/pr.yaml +++ b/.github/workflows/pr.yaml @@ -25,6 +25,7 @@ jobs: uses: actions/setup-java@v3.3.0 with: distribution: 'zulu' + java-version: 11 - name: Build uses: gradle/gradle-build-action@v2.1.6 From e70779a5a5ddeb0414ac5e03e12aa49ecaafdf95 Mon Sep 17 00:00:00 2001 From: Simon Vergauwen Date: Sat, 2 Jul 2022 10:21:22 +0200 Subject: [PATCH 4/5] Fix container --- guide/src/main/kotlin/KafkaContainer.kt | 21 +++++++++------------ 1 file changed, 9 insertions(+), 12 deletions(-) diff --git a/guide/src/main/kotlin/KafkaContainer.kt b/guide/src/main/kotlin/KafkaContainer.kt index 31c4cca4..f6950720 100644 --- a/guide/src/main/kotlin/KafkaContainer.kt +++ 
b/guide/src/main/kotlin/KafkaContainer.kt @@ -1,10 +1,6 @@ -import com.github.dockerjava.api.command.InspectContainerResponse -import io.github.nomisRev.kafka.Admin -import io.github.nomisRev.kafka.AdminSettings -import io.github.nomisRev.kafka.await -import kotlinx.coroutines.runBlocking import org.testcontainers.containers.KafkaContainer import org.testcontainers.utility.DockerImageName +import java.lang.System.getProperty /** * A singleton `Kafka` Test Container. @@ -30,17 +26,18 @@ import org.testcontainers.utility.DockerImageName * @see https://pawelpluta.com/optimise-testcontainers-for-better-tests-performance/ */ class Kafka private constructor(imageName: DockerImageName) : KafkaContainer(imageName) { - + companion object { + private val image: DockerImageName = + if (getProperty("os.arch") == "aarch64") DockerImageName.parse("niciqy/cp-kafka-arm64:7.0.1") + .asCompatibleSubstituteFor("confluentinc/cp-kafka") + else DockerImageName.parse("confluentinc/cp-kafka:6.2.1") + val container: KafkaContainer by lazy { - Kafka(DockerImageName.parse("niciqy/cp-kafka-arm64:7.0.1").asCompatibleSubstituteFor("confluentinc/cp-kafka")) - .withReuse(true) - .withNetwork(null) - .withLabel("io.github.nomisrev.kafka", "fqn-testcontainers-reuse") - .also { it.start() } + Kafka(image).also { it.start() } } } - + // override fun containerIsStarted(containerInfo: InspectContainerResponse?, reused: Boolean) { // super.containerIsStarted(containerInfo, reused) // // If we're reusing the container, we want to reset the state of the container. 
We do this by From d7ef2632132b923c455c0980f3a5a7b56ed4a311 Mon Sep 17 00:00:00 2001 From: Simon Vergauwen Date: Sat, 2 Jul 2022 10:29:33 +0200 Subject: [PATCH 5/5] clean up println --- src/main/kotlin/io/github/nomisRev/kafka/Consumer.kt | 1 - 1 file changed, 1 deletion(-) diff --git a/src/main/kotlin/io/github/nomisRev/kafka/Consumer.kt b/src/main/kotlin/io/github/nomisRev/kafka/Consumer.kt index 48b8e17e..fcc70930 100644 --- a/src/main/kotlin/io/github/nomisRev/kafka/Consumer.kt +++ b/src/main/kotlin/io/github/nomisRev/kafka/Consumer.kt @@ -106,7 +106,6 @@ public fun kafkaConsumer(settings: ConsumerSettings): Flow