You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
I use this library in my Ktor project, and runs the KafkaReceiver with IO dispatcher, and sets commitStrategy = CommitStrategy.BySize(100), it throws java.util.ConcurrentModificationException. The full stacktrace is
java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access. currentThread(name: kotlin-kafka-test-group-id-1 @coroutine#8, id: 31) otherThread(id: 33)
at org.apache.kafka.clients.consumer.internals.LegacyKafkaConsumer.acquire(LegacyKafkaConsumer.java:1226)
at org.apache.kafka.clients.consumer.internals.LegacyKafkaConsumer.acquireAndEnsureOpen(LegacyKafkaConsumer.java:1207)
at org.apache.kafka.clients.consumer.internals.LegacyKafkaConsumer.commitAsync(LegacyKafkaConsumer.java:740)
at org.apache.kafka.clients.consumer.KafkaConsumer.commitAsync(KafkaConsumer.java:1117)
at io.github.nomisRev.kafka.receiver.internals.EventLoop.commitAsync(EventLoop.kt:320)
at io.github.nomisRev.kafka.receiver.internals.EventLoop.commit(EventLoop.kt:299)
at io.github.nomisRev.kafka.receiver.internals.EventLoop.access$commit(EventLoop.kt:63)
at io.github.nomisRev.kafka.receiver.internals.EventLoop$scheduleCommitIfRequired$1.invokeSuspend(EventLoop.kt:287)
at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:104)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
I can also reproduce this exception with the following test code:
@Test
fun`All acknowledged messages are committed on flow completion`() = withTopic {
publishToKafka(topic, produced)
withContext(Dispatchers.IO) { // Make it runs in IO dispatcherval receiver =KafkaReceiver(
receiverSetting().copy(
commitStrategy =CommitStrategy.BySize(count)
)
)
receiver.receive(topic.name())
.take(count)
.collectIndexed { index, value ->if (index == lastIndex) {
assertEquals(0, receiver.committedCount(topic.name()))
value.offset.acknowledge()
} else value.offset.acknowledge()
}
assertEquals(receiver.committedCount(topic.name()), count.toLong())
}
}
The text was updated successfully, but these errors were encountered:
tKe
added a commit
to tKe/kotlin-kafka
that referenced
this issue
Sep 6, 2024
I use this library in my Ktor project, and runs the
KafkaReceiver
with IO dispatcher, and setscommitStrategy = CommitStrategy.BySize(100)
, it throwsjava.util.ConcurrentModificationException
. The full stacktrace isI can also reproduce this exception with the following test code:
The text was updated successfully, but these errors were encountered: