Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use RestartSink for SimpleTopicProducer #13

Merged
merged 1 commit into from Aug 21, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
10 changes: 10 additions & 0 deletions README.md
Expand Up @@ -126,6 +126,16 @@ foo-topic.producer {
# Possible values: "dropHead", "backpressure", "dropBuffer", "dropNew", "dropTail", "fail".
overflow-strategy = "dropHead"

# Minimum (initial) duration until the child actor will started again, if it is terminated.
min-backoff = 3s

# The exponential back-off is capped to this duration.
max-backoff = 30s

# After calculation of the exponential back-off an additional random delay based on this factor is added,
# e.g. 0.2 adds up to 20% delay. In order to skip this additional delay pass in 0.
random-factor = 0.2

# Properties defined by org.apache.kafka.clients.producer.ProducerConfig
# can be defined in this configuration section.
kafka-clients {
Expand Down
Expand Up @@ -6,6 +6,7 @@ import akka.kafka.javadsl.Producer
import akka.stream.Materializer
import akka.stream.OverflowStrategy
import akka.stream.QueueOfferResult
import akka.stream.javadsl.RestartSink
import akka.stream.javadsl.Source
import akka.stream.javadsl.SourceQueueWithComplete
import com.lightbend.lagom.javadsl.api.ServiceLocator
Expand All @@ -15,6 +16,7 @@ import com.typesafe.config.Config
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.Serializer
import org.apache.kafka.common.serialization.StringSerializer
import java.time.Duration
import java.util.concurrent.CompletionStage
import java.util.concurrent.TimeUnit

Expand All @@ -39,6 +41,9 @@ class SimpleTopicProducer<T> internal constructor(
val producerConfigPath = "${topicId.value()}.producer"
val bufferSizePath = "${topicId.value()}.producer.buffer-size"
val overflowStrategyPath = "${topicId.value()}.producer.overflow-strategy"
val minBackoffPath = "${topicId.value()}.producer.min-backoff"
ihostage marked this conversation as resolved.
Show resolved Hide resolved
val maxBackoffPath = "${topicId.value()}.producer.max-backoff"
val randomFactorPath = "${topicId.value()}.producer.random-factor"
val bufferSize = if (config.hasPath(bufferSizePath)) config.getInt(bufferSizePath) else 100
val producerSettings = if (config.hasPath(producerConfigPath)) {
ProducerSettings.create(
Expand Down Expand Up @@ -70,10 +75,19 @@ class SimpleTopicProducer<T> internal constructor(
else -> throw IllegalArgumentException("Unknown value overflow-strategy, " +
"expected [dropHead, backpressure, dropBuffer, dropNew, dropTail, fail]")
}
val minBackoff = if (config.hasPath(minBackoffPath))
config.getDuration(minBackoffPath)
else Duration.ofSeconds(3)
val maxBackoff = if (config.hasPath(maxBackoffPath))
config.getDuration(maxBackoffPath)
else Duration.ofSeconds(30)
val randomFactor = if (config.hasPath(randomFactorPath))
config.getDouble(randomFactorPath)
else 0.2

this.source = Source.queue<T>(bufferSize, overflowStrategy)
.map { ProducerRecord<String, T>(topicId.value(), partitionKeyStrategy?.computePartitionKey(it), it) }
.to(Producer.plainSink(this.producerSettings))
.to(RestartSink.withBackoff(minBackoff, maxBackoff, randomFactor) { Producer.plainSink(this.producerSettings) })
.run(materializer)
}

Expand Down