From 187dfeed56eed4976d1fe0a944b529a4a679966d Mon Sep 17 00:00:00 2001 From: atyutin Date: Thu, 20 Aug 2020 02:09:55 +0300 Subject: [PATCH] Add RestartSink for starting again when it fails or completes. --- README.md | 10 ++++++++++ .../lagom/javadsl/broker/SimpleTopicProducer.kt | 16 +++++++++++++++- 2 files changed, 25 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index b7952b9..822fbbe 100644 --- a/README.md +++ b/README.md @@ -126,6 +126,16 @@ foo-topic.producer { # Possible values: "dropHead", "backpressure", "dropBuffer", "dropNew", "dropTail", "fail". overflow-strategy = "dropHead" + # Minimum (initial) duration until the child actor will started again, if it is terminated. + min-backoff = 3s + + # The exponential back-off is capped to this duration. + max-backoff = 30s + + # After calculation of the exponential back-off an additional random delay based on this factor is added, + # e.g. 0.2 adds up to 20% delay. In order to skip this additional delay pass in 0. + random-factor = 0.2 + # Properties defined by org.apache.kafka.clients.producer.ProducerConfig # can be defined in this configuration section. kafka-clients { diff --git a/java/src/main/kotlin/org/taymyr/lagom/javadsl/broker/SimpleTopicProducer.kt b/java/src/main/kotlin/org/taymyr/lagom/javadsl/broker/SimpleTopicProducer.kt index 5746dcb..254e004 100644 --- a/java/src/main/kotlin/org/taymyr/lagom/javadsl/broker/SimpleTopicProducer.kt +++ b/java/src/main/kotlin/org/taymyr/lagom/javadsl/broker/SimpleTopicProducer.kt @@ -6,6 +6,7 @@ import akka.kafka.javadsl.Producer import akka.stream.Materializer import akka.stream.OverflowStrategy import akka.stream.QueueOfferResult +import akka.stream.javadsl.RestartSink import akka.stream.javadsl.Source import akka.stream.javadsl.SourceQueueWithComplete import com.lightbend.lagom.javadsl.api.ServiceLocator @@ -15,6 +16,7 @@ import com.typesafe.config.Config import org.apache.kafka.clients.producer.ProducerRecord import org.apache.kafka.common.serialization.Serializer import org.apache.kafka.common.serialization.StringSerializer +import java.time.Duration import java.util.concurrent.CompletionStage import java.util.concurrent.TimeUnit @@ -39,6 +41,9 @@ class SimpleTopicProducer internal constructor( val producerConfigPath = "${topicId.value()}.producer" val bufferSizePath = "${topicId.value()}.producer.buffer-size" val overflowStrategyPath = "${topicId.value()}.producer.overflow-strategy" + val minBackoffPath = "${topicId.value()}.producer.min-backoff" + val maxBackoffPath = "${topicId.value()}.producer.max-backoff" + val randomFactorPath = "${topicId.value()}.producer.random-factor" val bufferSize = if (config.hasPath(bufferSizePath)) config.getInt(bufferSizePath) else 100 val producerSettings = if (config.hasPath(producerConfigPath)) { ProducerSettings.create( @@ -70,10 +75,19 @@ class SimpleTopicProducer internal constructor( else -> throw IllegalArgumentException("Unknown value overflow-strategy, " + "expected [dropHead, backpressure, dropBuffer, dropNew, dropTail, fail]") } + val minBackoff = if (config.hasPath(minBackoffPath)) + config.getDuration(minBackoffPath) + else Duration.ofSeconds(3) + val maxBackoff = if (config.hasPath(maxBackoffPath)) + config.getDuration(maxBackoffPath) + else Duration.ofSeconds(30) + val randomFactor = if (config.hasPath(randomFactorPath)) + config.getDouble(randomFactorPath) + else 0.2 this.source = Source.queue(bufferSize, overflowStrategy) .map { ProducerRecord(topicId.value(), partitionKeyStrategy?.computePartitionKey(it), it) } - .to(Producer.plainSink(this.producerSettings)) + .to(RestartSink.withBackoff(minBackoff, maxBackoff, randomFactor) { Producer.plainSink(this.producerSettings) }) .run(materializer) }