Skip to content

Commit

Permalink
Merge pull request #153 from nomisRev/fix-auto-offset-reset
Browse files Browse the repository at this point in the history
Add publish plugin
  • Loading branch information
nomisRev committed Oct 7, 2023
2 parents 6747026 + af5cb6d commit 6344476
Show file tree
Hide file tree
Showing 14 changed files with 73 additions and 39 deletions.
9 changes: 9 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,15 @@ dependencies {

<!--- INCLUDE
import arrow.continuations.SuspendApp
import io.github.nomisRev.kafka.Acks
import io.github.nomisRev.kafka.Admin
import io.github.nomisRev.kafka.AdminSettings
import io.github.nomisRev.kafka.ProducerSettings
import io.github.nomisRev.kafka.createTopic
import io.github.nomisRev.kafka.imap
import io.github.nomisRev.kafka.map
import io.github.nomisRev.kafka.produce
import io.github.nomisRev.kafka.receiver.AutoOffsetReset
import io.github.nomisRev.kafka.receiver.KafkaReceiver
import io.github.nomisRev.kafka.receiver.ReceiverSettings
import kotlinx.coroutines.Dispatchers
Expand Down
1 change: 1 addition & 0 deletions build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ plugins {
alias(libs.plugins.dokka)
alias(libs.plugins.spotless)
alias(libs.plugins.knit)
alias(libs.plugins.publish)
}

repositories {
Expand Down
13 changes: 8 additions & 5 deletions guide/example/example-admin-01.kt
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
package example.exampleAdmin01

import io.github.nomisRev.kafka.*
import java.util.Properties
import kotlinx.coroutines.runBlocking

import arrow.continuations.SuspendApp
import io.github.nomisRev.kafka.Admin
import io.github.nomisRev.kafka.AdminSettings
import io.github.nomisRev.kafka.await
import io.github.nomisRev.kafka.createTopic
import io.github.nomisRev.kafka.deleteTopic
import org.apache.kafka.clients.ClientDnsLookup
import org.apache.kafka.clients.admin.AdminClientConfig.CLIENT_DNS_LOOKUP_CONFIG
import org.apache.kafka.clients.admin.NewTopic
import java.util.Properties

fun main() = runBlocking {
fun main() = SuspendApp {
val settings = AdminSettings(
Kafka.container.bootstrapServers,
Properties().apply {
Expand Down
12 changes: 6 additions & 6 deletions guide/example/example-consumer-01.kt
Original file line number Diff line number Diff line change
@@ -1,18 +1,18 @@
package example.exampleConsumer01

import io.github.nomisRev.kafka.*
import java.util.Properties
import kotlinx.coroutines.runBlocking

import arrow.continuations.SuspendApp
import io.github.nomisRev.kafka.ConsumerSettings
import io.github.nomisRev.kafka.kafkaConsumer
import io.github.nomisRev.kafka.map
import io.github.nomisRev.kafka.subscribeTo
import java.util.UUID
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.collect
import org.apache.kafka.common.serialization.IntegerDeserializer
import org.apache.kafka.common.serialization.StringDeserializer
@JvmInline value class Key(val index: Int)
@JvmInline value class Message(val content: String)

fun main() = runBlocking {
fun main() = SuspendApp {
val settings: ConsumerSettings<Key, Message> = ConsumerSettings(
Kafka.container.bootstrapServers,
IntegerDeserializer().map(::Key),
Expand Down
12 changes: 6 additions & 6 deletions guide/example/example-producer-01.kt
Original file line number Diff line number Diff line change
@@ -1,18 +1,18 @@
package example.exampleProducer01

import io.github.nomisRev.kafka.*
import java.util.Properties
import kotlinx.coroutines.runBlocking

import arrow.continuations.SuspendApp
import io.github.nomisRev.kafka.Acks
import io.github.nomisRev.kafka.ProducerSettings
import io.github.nomisRev.kafka.imap
import io.github.nomisRev.kafka.produce
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.collect
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.IntegerSerializer
import org.apache.kafka.common.serialization.StringSerializer
@JvmInline value class Key(val index: Int)
@JvmInline value class Message(val content: String)

fun main() = runBlocking {
fun main() = SuspendApp {
val settings: ProducerSettings<Key, Message> = ProducerSettings(
Kafka.container.bootstrapServers,
IntegerSerializer().imap { key: Key -> key.index },
Expand Down
9 changes: 4 additions & 5 deletions guide/example/example-producer-02.kt
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
package example.exampleProducer02

import io.github.nomisRev.kafka.*
import java.util.Properties
import kotlinx.coroutines.runBlocking

import arrow.continuations.SuspendApp
import io.github.nomisRev.kafka.sendAwait
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.StringSerializer
import java.util.Properties

fun main() = runBlocking<Unit> {
fun main() = SuspendApp {
KafkaProducer(Properties(), StringSerializer(), StringSerializer()).use { producer ->
producer.sendAwait(ProducerRecord("topic-name", "message #1"))
producer.sendAwait(ProducerRecord("topic-name", "message #2"))
Expand Down
13 changes: 9 additions & 4 deletions guide/example/example-readme-01.kt
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
package example.exampleReadme01

import io.github.nomisRev.kafka.*
import java.util.Properties
import kotlinx.coroutines.runBlocking

import arrow.continuations.SuspendApp
import io.github.nomisRev.kafka.Acks
import io.github.nomisRev.kafka.Admin
import io.github.nomisRev.kafka.AdminSettings
import io.github.nomisRev.kafka.ProducerSettings
import io.github.nomisRev.kafka.createTopic
import io.github.nomisRev.kafka.imap
import io.github.nomisRev.kafka.map
import io.github.nomisRev.kafka.produce
import io.github.nomisRev.kafka.receiver.AutoOffsetReset
import io.github.nomisRev.kafka.receiver.KafkaReceiver
import io.github.nomisRev.kafka.receiver.ReceiverSettings
import kotlinx.coroutines.Dispatchers
Expand Down
4 changes: 0 additions & 4 deletions knit.code.include
Original file line number Diff line number Diff line change
@@ -1,5 +1 @@
package ${knit.package}.${knit.name}

import io.github.nomisRev.kafka.*
import java.util.Properties
import kotlinx.coroutines.runBlocking
9 changes: 8 additions & 1 deletion src/main/kotlin/io/github/nomisRev/kafka/Admin.kt
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,19 @@ import org.apache.kafka.clients.admin.TopicDescription
* [kotlin.use], or arrow.fx.coroutines.Resource.
*
* <!--- INCLUDE
* import arrow.continuations.SuspendApp
* import io.github.nomisRev.kafka.Admin
* import io.github.nomisRev.kafka.AdminSettings
* import io.github.nomisRev.kafka.await
* import io.github.nomisRev.kafka.createTopic
* import io.github.nomisRev.kafka.deleteTopic
* import org.apache.kafka.clients.ClientDnsLookup
* import org.apache.kafka.clients.admin.AdminClientConfig.CLIENT_DNS_LOOKUP_CONFIG
* import org.apache.kafka.clients.admin.NewTopic
* import java.util.Properties
* -->
* ```kotlin
* fun main() = runBlocking {
* fun main() = SuspendApp {
* val settings = AdminSettings(
* Kafka.container.bootstrapServers,
* Properties().apply {
Expand Down
8 changes: 6 additions & 2 deletions src/main/kotlin/io/github/nomisRev/kafka/Consumer.kt
Original file line number Diff line number Diff line change
Expand Up @@ -86,16 +86,20 @@ public fun <K, V> KafkaConsumer(settings: ConsumerSettings<K, V>): KafkaConsumer

/**
* <!--- INCLUDE
* import arrow.continuations.SuspendApp
* import io.github.nomisRev.kafka.ConsumerSettings
* import io.github.nomisRev.kafka.kafkaConsumer
* import io.github.nomisRev.kafka.map
* import io.github.nomisRev.kafka.subscribeTo
* import java.util.UUID
* import kotlinx.coroutines.flow.map
* import kotlinx.coroutines.flow.collect
* import org.apache.kafka.common.serialization.IntegerDeserializer
* import org.apache.kafka.common.serialization.StringDeserializer
* @JvmInline value class Key(val index: Int)
* @JvmInline value class Message(val content: String)
* -->
* ```kotlin
* fun main() = runBlocking {
* fun main() = SuspendApp {
* val settings: ConsumerSettings<Key, Message> = ConsumerSettings(
* Kafka.container.bootstrapServers,
* IntegerDeserializer().map(::Key),
Expand Down
16 changes: 12 additions & 4 deletions src/main/kotlin/io/github/nomisRev/kafka/Producer.kt
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

package io.github.nomisRev.kafka

import kotlinx.coroutines.ExperimentalCoroutinesApi
import java.util.Properties
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException
Expand All @@ -24,16 +25,20 @@ import org.apache.kafka.common.serialization.Serializer
* all [ProducerRecord] are published *in-order* in a synchronous way.
*
* <!--- INCLUDE
* import arrow.continuations.SuspendApp
* import io.github.nomisRev.kafka.Acks
* import io.github.nomisRev.kafka.ProducerSettings
* import io.github.nomisRev.kafka.imap
* import io.github.nomisRev.kafka.produce
* import kotlinx.coroutines.flow.asFlow
* import kotlinx.coroutines.flow.collect
* import org.apache.kafka.clients.producer.ProducerRecord
* import org.apache.kafka.common.serialization.IntegerSerializer
* import org.apache.kafka.common.serialization.StringSerializer
* @JvmInline value class Key(val index: Int)
* @JvmInline value class Message(val content: String)
* -->
* ```kotlin
* fun main() = runBlocking {
* fun main() = SuspendApp {
* val settings: ProducerSettings<Key, Message> = ProducerSettings(
* Kafka.container.bootstrapServers,
* IntegerSerializer().imap { key: Key -> key.index },
Expand All @@ -49,7 +54,7 @@ import org.apache.kafka.common.serialization.Serializer
* ```
* <!--- KNIT example-producer-01.kt -->
*/
@OptIn(FlowPreview::class)
@OptIn(ExperimentalCoroutinesApi::class)
public fun <A, B> Flow<ProducerRecord<A, B>>.produce(
settings: ProducerSettings<A, B>,
): Flow<RecordMetadata> =
Expand All @@ -61,12 +66,15 @@ public fun <A, B> Flow<ProducerRecord<A, B>>.produce(
* Sends a record to a Kafka topic in a suspending way.
*
* <!--- INCLUDE
* import arrow.continuations.SuspendApp
* import io.github.nomisRev.kafka.sendAwait
* import org.apache.kafka.clients.producer.KafkaProducer
* import org.apache.kafka.clients.producer.ProducerRecord
* import org.apache.kafka.common.serialization.StringSerializer
* import java.util.Properties
* -->
* ```kotlin
* fun main() = runBlocking<Unit> {
* fun main() = SuspendApp {
* KafkaProducer(Properties(), StringSerializer(), StringSerializer()).use { producer ->
* producer.sendAwait(ProducerRecord("topic-name", "message #1"))
* producer.sendAwait(ProducerRecord("topic-name", "message #2"))
Expand Down
3 changes: 1 addition & 2 deletions src/test/kotlin/io/github/nomisrev/kafka/KafkaSpec.kt
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package io.github.nomisrev.kafka

import io.github.nomisRev.kafka.Admin
import io.github.nomisRev.kafka.AdminSettings
import io.github.nomisRev.kafka.AutoOffsetReset
import io.github.nomisRev.kafka.receiver.AutoOffsetReset
import io.github.nomisRev.kafka.ProducerSettings
import io.github.nomisRev.kafka.createTopic
import io.github.nomisRev.kafka.deleteTopic
Expand All @@ -11,7 +11,6 @@ import io.github.nomisRev.kafka.produce
import io.github.nomisRev.kafka.receiver.KafkaReceiver
import io.github.nomisRev.kafka.receiver.ReceiverSettings
import io.kotest.core.spec.style.StringSpec
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.map
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ class ProducerSettingSpec : StringSpec({
)

assertSoftly(settings.properties()) {
@Suppress("UNCHECKED_CAST")
toMap().shouldContainAll(map as Map<Any, Any>)
getProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG) shouldBe bootstrapServers
getProperty(ProducerConfig.ACKS_CONFIG) shouldBe acks.value
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import io.kotest.assertions.assertSoftly
import io.kotest.matchers.collections.shouldContainExactlyInAnyOrder
import io.kotest.matchers.shouldBe
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.collectIndexed
Expand All @@ -21,6 +22,7 @@ import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.yield
import org.apache.kafka.clients.producer.ProducerRecord

@OptIn(ExperimentalCoroutinesApi::class)
class KafakReceiverSpec : KafkaSpec({

val depth = 100
Expand Down

0 comments on commit 6344476

Please sign in to comment.