From 211a3e30836914a64751b10c5bb4b6c8d82d88e1 Mon Sep 17 00:00:00 2001
From: sergeygrigorev
Date: Wed, 18 Oct 2017 12:56:10 +0300
Subject: [PATCH] add the ability for BroadcastHub to replay the last N records to every new consumer #23816

* add a new parameter to BroadcastHub that controls replaying recent records
* add a unit test covering the new feature
* update the documentation to describe the feature
---
 .../paradox/scala/stream/stream-dynamic.md    | 17 +++++
 .../test/java/jdocs/stream/HubDocTest.java    | 35 +++++++++++
 .../test/scala/docs/stream/HubsDocSpec.scala  | 24 ++++++-
 .../scala/akka/stream/scaladsl/HubSpec.scala  | 22 +++++++
 .../main/scala/akka/stream/javadsl/Hub.scala  | 34 +++++++++-
 .../main/scala/akka/stream/scaladsl/Hub.scala | 62 +++++++++++++++++--
 6 files changed, 187 insertions(+), 7 deletions(-)

diff --git a/akka-docs/src/main/paradox/scala/stream/stream-dynamic.md b/akka-docs/src/main/paradox/scala/stream/stream-dynamic.md
index f408514ccd32..88932da58566 100644
--- a/akka-docs/src/main/paradox/scala/stream/stream-dynamic.md
+++ b/akka-docs/src/main/paradox/scala/stream/stream-dynamic.md
@@ -120,6 +120,43 @@ backpressure the upstream producer until subscribers arrive. This behavior can b
 are no other subscribers, this will ensure that the producer is kept drained (dropping all elements) and once a new
 subscriber arrives it will adaptively slow down, ensuring no more messages are dropped.
+
+`BroadcastHub` can also be configured to replay the most recently consumed elements to new consumers. Instead of a
+new sink only seeing elements that arrive after it attaches, you can set an upper bound on how many of the latest
+elements should be retained and re-sent to every new sink.
+
+Scala
+: @@snip [HubsDocSpec.scala]($code$/scala/docs/stream/HubsDocSpec.scala) { #broadcast-hub-retry }
+
+Java
+: @@snip [HubDocTest.java]($code$/java/jdocs/stream/HubDocTest.java) { #broadcast-hub-retry }
+
+@@@ note
+
+If the upstream completes while no sink is attached, the hub completes as well and the retained elements are lost.
+Replaying the last N messages is therefore only possible while the stream is running, which makes the feature most
+useful for streams that do not complete.
+
+@@@
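+
+As a minimal inline sketch of these semantics (illustrative only, complementing the snippets above; it assumes an
+implicit materializer is in scope): with `retryBufferSize = 2` a consumer that attaches late first receives the two
+most recently consumed elements.
+
+```scala
+import scala.concurrent.duration._
+import akka.stream.scaladsl.{ BroadcastHub, Keep, Sink, Source }
+
+// A producer that never completes, as the note above requires for replay
+val fromProducer = Source
+  .tick(1.second, 1.second, "tick")
+  .toMat(BroadcastHub.sink(bufferSize = 8, retryBufferSize = 2))(Keep.right)
+  .run()
+
+// A consumer attached later first sees up to the 2 most recently consumed ticks
+fromProducer.runWith(Sink.foreach(println))
+```
+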
 ### Combining dynamic stages to build a simple Publish-Subscribe service
 
 The features provided by the Hub implementations are limited by default. This is by design, as various combinations
diff --git a/akka-docs/src/test/java/jdocs/stream/HubDocTest.java b/akka-docs/src/test/java/jdocs/stream/HubDocTest.java
index 63746772d213..251462e66659 100644
--- a/akka-docs/src/test/java/jdocs/stream/HubDocTest.java
+++ b/akka-docs/src/test/java/jdocs/stream/HubDocTest.java
@@ -13,6 +13,7 @@
+import akka.stream.DelayOverflowStrategy;
 import akka.stream.Materializer;
 import akka.stream.ThrottleMode;
 import akka.stream.UniqueKillSwitch;
 import akka.stream.javadsl.*;
 import akka.stream.javadsl.PartitionHub.ConsumerInfo;
@@ -24,6 +25,7 @@
 import scala.concurrent.duration.Duration;
 import scala.concurrent.duration.FiniteDuration;
+import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.CompletionStage;
 import java.util.concurrent.TimeUnit;
@@ -104,6 +106,42 @@ public void dynamicBroadcast() {
     materializer.shutdown();
   }
 
+  @Test
+  public void dynamicBroadcastRetry() throws InterruptedException {
+    // Used to be able to clean up the running stream
+    ActorMaterializer materializer = ActorMaterializer.create(system);
+
+    //#broadcast-hub-retry
+    // A simple producer that publishes a new unique message every second, ten in total
+    Source<String, NotUsed> producer =
+      Source.from(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
+        .map(i -> "tick " + i)
+        .delay(FiniteDuration.apply(1, TimeUnit.SECONDS), DelayOverflowStrategy.backpressure());
+
+    // Attach a BroadcastHub Sink to the producer. This will materialize to a
+    // corresponding Source.
+    // (We need to use toMat and Keep.right since by default the materialized
+    // value to the left is used)
+    RunnableGraph<Source<String, NotUsed>> runnableGraph =
+      producer.toMat(BroadcastHub.of(String.class, 256, 4), Keep.right());
+    Source<String, NotUsed> fromProducer = runnableGraph.run(materializer);
+
+    // Attach the first consumer
+    fromProducer.runForeach(msg -> System.out.println("consumer1: " + msg), materializer);
+
+    // Wait some time and attach another consumer. Thanks to the retry buffer of
+    // size 4 it also receives the most recently consumed messages.
+    Thread.sleep(2000);
+    fromProducer.runForeach(msg -> System.out.println("consumer2: " + msg), materializer);
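+
+    // With retryBufferSize = 4, consumer2 first receives up to the last four
+    // messages consumer1 has already seen, then both receive every new message.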
+    //#broadcast-hub-retry
+
+    // Cleanup
+    materializer.shutdown();
+  }
+
   @Test
   public void mergeBroadcastCombination() {
     //#pub-sub-1
diff --git a/akka-docs/src/test/scala/docs/stream/HubsDocSpec.scala b/akka-docs/src/test/scala/docs/stream/HubsDocSpec.scala
index 78cca87565a9..f4a885cd2955 100644
--- a/akka-docs/src/test/scala/docs/stream/HubsDocSpec.scala
+++ b/akka-docs/src/test/scala/docs/stream/HubsDocSpec.scala
@@ -4,7 +4,7 @@ package docs.stream
 import akka.NotUsed
-import akka.stream.{ ActorMaterializer, KillSwitches, UniqueKillSwitch }
+import akka.stream.{ ActorMaterializer, DelayOverflowStrategy, KillSwitches, UniqueKillSwitch }
 import akka.stream.scaladsl._
 import akka.testkit.AkkaSpec
 import docs.CompileOnlySpec
@@ -65,6 +65,30 @@
       //#broadcast-hub
     }
 
+    "demonstrate creating a dynamic broadcast with a retry policy" in compileOnlySpec {
+      //#broadcast-hub-retry
+      // A simple producer that publishes a new unique message every second, ten in total
+      val producer = Source(1 to 10).map(i ⇒ s"tick $i").delay(1.second, strategy = DelayOverflowStrategy.backpressure)
+
+      // Attach a BroadcastHub Sink to the producer. This will materialize to a
+      // corresponding Source.
+      // (We need to use toMat and Keep.right since by default the materialized
+      // value to the left is used)
+      val runnableGraph: RunnableGraph[Source[String, NotUsed]] =
+        producer.toMat(BroadcastHub.sink(bufferSize = 256, retryBufferSize = 4))(Keep.right)
+      val fromProducer: Source[String, NotUsed] = runnableGraph.run()
+
+      // Attach the first consumer
+      fromProducer.runForeach(msg ⇒ println("consumer1: " + msg))
+
+      // Wait some time and attach another consumer. Thanks to the retry buffer of
+      // size 4 it also receives the most recently consumed messages.
+      Thread.sleep(2000)
+      fromProducer.runForeach(msg ⇒ println("consumer2: " + msg))
+      //#broadcast-hub-retry
+    }
+
     "demonstrate combination" in {
       def println(s: String) = testActor ! s
 
diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/HubSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/HubSpec.scala
index 9897e3a2a453..44647e4a1ee8 100644
--- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/HubSpec.scala
+++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/HubSpec.scala
@@ -276,6 +276,30 @@ class HubSpec extends StreamSpec {
       f2.futureValue should ===(1 to 10)
     }
 
+    "replay the last 4 elements to the next consumer" in assertAllStagesStopped {
+      val ((firstElem, lastElem), source) =
+        Source
+          .maybe[Int]
+          .concat(Source(2 to 10))
+          .concatMat(Source.maybe[Int])(Keep.both)
+          .toMat(BroadcastHub.sink(bufferSize = 8, retryBufferSize = 4))(Keep.both).run()
+
+      // Ensure subscription of Sinks. This is racy but there is no event we can hook into here.
+      val (f1Kill, f1) = source.viaMat(KillSwitches.single)(Keep.right).toMat(Sink.seq)(Keep.both).run()
+      Thread.sleep(100)
+      firstElem.success(Some(1))
+      Thread.sleep(100)
+      f1Kill.shutdown()
+      f1.futureValue should ===(1 to 10)
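+
+      // A new consumer attaching now is first replayed the retained last 4 elements
+      // (7 to 10); completing the upstream via lastElem then completes it as well.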
+ * + * If parameter retryBufferSize is not equal zero, [[Source]] will duplicate last records for every new consumer. + * + * If the original [[Sink]] is failed, then the failure is immediately propagated to all of its materialized + * [[Source]]s (possibly jumping over already buffered elements). If the original [[Sink]] is completed, then + * all corresponding [[Source]]s are completed. Both failure and normal completion is "remembered" and later + * materializations of the [[Source]] will see the same (failure or completion) state. [[Source]]s that are + * cancelled are simply removed from the dynamic set of consumers. + * + * @param bufferSize Buffer size used by the producer. Gives an upper bound on how "far" from each other two + * concurrent consumers can be in terms of element. If this buffer is full, the producer + * is backpressured. Must be a power of two and less than 4096. + * @param retryBufferSize Count of remaining records for new consumers. Gives an upper bound on how "old" records + * from the latest can be produced for a new consumer. Must be non negative and less or equal + * than the buffer size. This size could be zero. In this case any consumer consumes + * all records and new consumers will not see that old records. + */ + def of[T](clazz: Class[T], bufferSize: Int, retryBufferSize: Int): Sink[T, Source[T, NotUsed]] = { + akka.stream.scaladsl.BroadcastHub.sink[T](bufferSize, retryBufferSize) + .mapMaterializedValue(_.asJava) + .asJava + } + /** * Creates a [[Sink]] that receives elements from its upstream producer and broadcasts them to a dynamic set * of consumers. After the [[Sink]] returned by this method is materialized, it returns a [[Source]] as materialized diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Hub.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Hub.scala index 0d0f13a480b6..f9271568457a 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Hub.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Hub.scala @@ -298,6 +298,34 @@ private[akka] class MergeHub[T](perProducerBufferSize: Int) extends GraphStageWi */ object BroadcastHub { + /** + * Creates a [[Sink]] that receives elements from its upstream producer and broadcasts them to a dynamic set + * of consumers. After the [[Sink]] returned by this method is materialized, it returns a [[Source]] as materialized + * value. This [[Source]] can be materialized an arbitrary number of times and each materialization will receive the + * broadcast elements from the original [[Sink]]. + * + * Every new materialization of the [[Sink]] results in a new, independent hub, which materializes to its own + * [[Source]] for consuming the [[Sink]] of that materialization. + * + * If parameter retryBufferSize is not equal zero, [[Source]] will duplicate last records for every new consumer. + * + * If the original [[Sink]] is failed, then the failure is immediately propagated to all of its materialized + * [[Source]]s (possibly jumping over already buffered elements). If the original [[Sink]] is completed, then + * all corresponding [[Source]]s are completed. Both failure and normal completion is "remembered" and later + * materializations of the [[Source]] will see the same (failure or completion) state. [[Source]]s that are + * cancelled are simply removed from the dynamic set of consumers. + * + * @param bufferSize Buffer size used by the producer. Gives an upper bound on how "far" from each other two + * concurrent consumers can be in terms of element. 
+   *
+   * If the original [[Sink]] is failed, then the failure is immediately propagated to all of its materialized
+   * [[Source]]s (possibly jumping over already buffered elements). If the original [[Sink]] is completed, then
+   * all corresponding [[Source]]s are completed. Both failure and normal completion are "remembered" and later
+   * materializations of the [[Source]] will see the same (failure or completion) state. [[Source]]s that are
+   * cancelled are simply removed from the dynamic set of consumers.
+   *
+   * @param bufferSize Buffer size used by the producer. Gives an upper bound on how "far" from each other two
+   *                   concurrent consumers can be in terms of element. If this buffer is full, the producer
+   *                   is backpressured. Must be a power of two and less than 4096.
+   * @param retryBufferSize Number of the most recently consumed elements retained for new consumers. Gives an
+   *                        upper bound on how many already consumed elements are replayed to a newly attached
+   *                        consumer. Must be non-negative and less than or equal to the buffer size. If zero,
+   *                        consumed elements are dropped as usual and a new consumer only sees elements
+   *                        emitted after it attaches.
+   */
+  def sink[T](bufferSize: Int, retryBufferSize: Int): Sink[T, Source[T, NotUsed]] =
+    Sink.fromGraph(new BroadcastHub[T](bufferSize, retryBufferSize))
+
   /**
    * Creates a [[Sink]] that receives elements from its upstream producer and broadcasts them to a dynamic set
    * of consumers. After the [[Sink]] returned by this method is materialized, it returns a [[Source]] as materialized
@@ -317,7 +349,7 @@ object BroadcastHub {
    *                   concurrent consumers can be in terms of element. If this buffer is full, the producer
    *                   is backpressured. Must be a power of two and less than 4096.
    */
-  def sink[T](bufferSize: Int): Sink[T, Source[T, NotUsed]] = Sink.fromGraph(new BroadcastHub[T](bufferSize))
+  def sink[T](bufferSize: Int): Sink[T, Source[T, NotUsed]] = sink(bufferSize = bufferSize, retryBufferSize = 0)
 
   /**
    * Creates a [[Sink]] that receives elements from its upstream producer and broadcasts them to a dynamic set
@@ -342,10 +374,12 @@
 /**
  * INTERNAL API
  */
-private[akka] class BroadcastHub[T](bufferSize: Int) extends GraphStageWithMaterializedValue[SinkShape[T], Source[T, NotUsed]] {
+private[akka] class BroadcastHub[T](bufferSize: Int, retryBufferSize: Int) extends GraphStageWithMaterializedValue[SinkShape[T], Source[T, NotUsed]] {
   require(bufferSize > 0, "Buffer size must be positive")
   require(bufferSize < 4096, "Buffer size larger then 4095 is not allowed")
   require((bufferSize & bufferSize - 1) == 0, "Buffer size must be a power of two")
+  require(retryBufferSize >= 0, "Retry buffer size must be non-negative")
+  require(retryBufferSize <= bufferSize, "Retry buffer size cannot be larger than the buffer size")
 
   private val Mask = bufferSize - 1
   private val WheelMask = (bufferSize * 2) - 1
@@ -435,11 +469,22 @@
       else if (head != finalOffset) {
         // If our final consumer goes away, we roll forward the buffer so a subsequent consumer does not
         // see the already consumed elements. This feature is quite handy.
-        while (head != finalOffset) {
+        // With a non-zero retryBufferSize we keep up to that many of the already consumed
+        // elements so that they can be replayed to a subsequent consumer, i.e.
+        // limitClear == max(head, min(finalOffset, tail - retryBufferSize)).
+        val limitClear =
+          if (tail - finalOffset >= retryBufferSize) finalOffset
+          else if (tail - head < retryBufferSize) head
+          else tail - retryBufferSize
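+
+        // E.g. with retryBufferSize = 4, head = 0, finalOffset = 9 and tail = 10 the buffer
+        // is only cleared up to offset 6, keeping the elements at offsets 6 to 9 for replay.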
+        while (head != limitClear) {
           queue(head & Mask) = null
           head += 1
         }
-        head = finalOffset
+
+        head = limitClear
         if (!hasBeenPulled(in)) pull(in)
       }
     } else checkUnblock(previousOffset)
@@ -518,9 +563,16 @@
     private def unblockIfPossible(offsetOfConsumerRemoved: Int): Boolean = {
       var unblocked = false
       if (offsetOfConsumerRemoved == head) {
+        // Advance head at most to tail - retryBufferSize so that the last retryBufferSize
+        // elements stay available for replay to new consumers; if fewer elements are
+        // buffered, keep them all.
+        val offset =
+          if (tail - head > retryBufferSize) tail - retryBufferSize
+          else head
+
         // Try to advance along the wheel. We can skip any wheel slots which have no waiting Consumers, until
         // we either find a nonempty one, or we reached the end of the buffer.
-        while (consumerWheel(head & WheelMask).isEmpty && head != tail) {
+        while (consumerWheel(head & WheelMask).isEmpty && head != offset) {
           queue(head & Mask) = null
           head += 1
           unblocked = true
@@ -576,6 +628,17 @@
       val idx = tail & Mask
       val wheelSlot = tail & WheelMask
       queue(idx) = elem.asInstanceOf[AnyRef]
+
+      // If more than retryBufferSize elements are buffered and no consumer is waiting at
+      // the head slot, forget the oldest element: only the last retryBufferSize published
+      // elements are kept for replay, which also keeps the producer flowing while no
+      // consumer is attached.
+      if (retryBufferSize > 0 && tail - head > retryBufferSize) {
+        if (consumerWheel(head & WheelMask).isEmpty) {
+          queue(head & Mask) = null
+          head += 1
+        }
+      }
+
       // Publish the new tail before calling the wakeup
       tail = tail + 1
       wakeupIdx(wheelSlot)