
add possibility to BroadcastHub to retry last N records for every new consumer akka#23816

* add a new field to BroadcastHub to support the retry behaviour
* add a unit test covering the new feature
* update the documentation to describe the feature
Sergey Grigorev committed Dec 14, 2017
1 parent 6541d3d commit 3a19758
Showing 6 changed files with 186 additions and 6 deletions.
17 changes: 17 additions & 0 deletions akka-docs/src/main/paradox/stream/stream-dynamic.md
@@ -120,6 +120,23 @@ backpressure the upstream producer until subscribers arrive. This behavior can b
are no other subscribers, this will ensure that the producer is kept drained (dropping all elements) and once a new
subscriber arrives it will adaptively slow down, ensuring no more messages are dropped.

There is another case where you may want to use an infinite stream with `BroadcastHub`: changing the policy for
already-consumed elements. Instead of letting a new sink skip everything that was consumed before it attached, you can
set an upper bound on how many of the most recent elements are retained and resent to each new sink.

Scala
: @@snip [HubsDocSpec.scala]($code$/scala/docs/stream/HubsDocSpec.scala) { #broadcast-hub-retry }

Java
: @@snip [HubDocTest.java]($code$/java/jdocs/stream/HubDocTest.java) { #broadcast-hub-retry }

@@@ note

If the upstream completes while no sink is attached to the source, the source will be completed as well.
Replaying the last N messages is therefore only useful for infinite streams.

@@@
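
For orientation while reading this page, here is a condensed sketch of what the referenced snippets demonstrate (a sketch only; the values `bufferSize = 256` and `retryBufferSize = 4` match the snippets but are otherwise arbitrary):

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.{ ActorMaterializer, ThrottleMode }
import akka.stream.scaladsl.{ BroadcastHub, Keep, Source }
import scala.concurrent.duration._

object BroadcastHubRetrySketch extends App {
  implicit val system = ActorSystem("broadcast-hub-retry-sketch")
  implicit val materializer = ActorMaterializer()

  // A slow, effectively endless producer
  val producer: Source[String, NotUsed] =
    Source.fromIterator(() => Iterator.from(1))
      .map(i => s"tick $i")
      .throttle(1, 1.second, 1, ThrottleMode.shaping)

  // retryBufferSize = 4: keep the last four elements for consumers that attach later
  val fromProducer: Source[String, NotUsed] =
    producer.toMat(BroadcastHub.sink(bufferSize = 256, retryBufferSize = 4))(Keep.right).run()

  fromProducer.runForeach(msg => println(s"consumer1: $msg"))
  Thread.sleep(2000)
  // consumer2 additionally re-receives up to the last four already-broadcast elements
  fromProducer.runForeach(msg => println(s"consumer2: $msg"))
}
```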

### Combining dynamic stages to build a simple Publish-Subscribe service

The features provided by the Hub implementations are limited by default. This is by design, as various combinations
35 changes: 35 additions & 0 deletions akka-docs/src/test/java/jdocs/stream/HubDocTest.java
@@ -13,6 +13,7 @@
import akka.stream.Materializer;
import akka.stream.ThrottleMode;
import akka.stream.UniqueKillSwitch;
import akka.stream.DelayOverflowStrategy;
import akka.stream.javadsl.*;
import akka.stream.javadsl.PartitionHub.ConsumerInfo;

@@ -24,6 +25,8 @@
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
@@ -104,6 +107,38 @@ public void dynamicBroadcast() {
materializer.shutdown();
}

@Test
public void dynamicBroadcastRetry() throws InterruptedException {
// Used to be able to clean up the running stream
ActorMaterializer materializer = ActorMaterializer.create(system);

//#broadcast-hub-retry
// A simple producer that publishes a series of unique "tick" messages, delayed so that a second consumer can attach while the stream is running
Source<String, NotUsed> producer =
Source.from(new ArrayList<>(Arrays.asList(1,2,3,4,5,6,7,8,9,10)))
.map(i -> "tick" + i)
.delay(FiniteDuration.apply(1, TimeUnit.SECONDS), DelayOverflowStrategy.backpressure());

// Attach a BroadcastHub Sink to the producer. This will materialize to a
// corresponding Source.
// (We need to use toMat and Keep.right since by default the materialized
// value to the left is used)
RunnableGraph<Source<String, NotUsed>> runnableGraph =
producer.toMat(BroadcastHub.of(String.class, 256, 4), Keep.right());
Source<String, NotUsed> fromProducer = runnableGraph.run(materializer);

// run first consumer
fromProducer.runForeach(msg -> System.out.println("consumer1: " + msg), materializer);

// Wait some time and attach another consumer. With retryBufferSize = 4 it also re-receives up to the last 4 already-broadcast elements
Thread.sleep(2000);
fromProducer.runForeach(msg -> System.out.println("consumer2: " + msg), materializer);
//#broadcast-hub-retry

// Cleanup
materializer.shutdown();
}

@Test
public void mergeBroadcastCombination() {
//#pub-sub-1
24 changes: 23 additions & 1 deletion akka-docs/src/test/scala/docs/stream/HubsDocSpec.scala
@@ -4,7 +4,7 @@
package docs.stream

import akka.NotUsed
import akka.stream.{ ActorMaterializer, KillSwitches, UniqueKillSwitch }
import akka.stream.{ ActorMaterializer, KillSwitches, UniqueKillSwitch, DelayOverflowStrategy }
import akka.stream.scaladsl._
import akka.testkit.AkkaSpec
import docs.CompileOnlySpec
@@ -65,6 +65,28 @@ class HubsDocSpec extends AkkaSpec with CompileOnlySpec {
//#broadcast-hub
}

"demonstrate creating a dynamic broadcast with retry policy" in compileOnlySpec {
//#broadcast-hub-retry
// A simple producer that publishes a series of unique "tick" messages, delayed so that a second consumer can attach while the stream is running
val producer = Source(1 to 10).map(i => s"tick $i").delay(1.second, strategy = DelayOverflowStrategy.backpressure)

// Attach a BroadcastHub Sink to the producer. This will materialize to a
// corresponding Source.
// (We need to use toMat and Keep.right since by default the materialized
// value to the left is used)
val runnableGraph: RunnableGraph[Source[String, NotUsed]] =
producer.toMat(BroadcastHub.sink(bufferSize = 256, retryBufferSize = 4))(Keep.right)
val fromProducer: Source[String, NotUsed] = runnableGraph.run()

// run first consumer
fromProducer.runForeach(msg println("consumer1: " + msg))

// Wait some time and attach another consumer. With retryBufferSize = 4 it also re-receives up to the last 4 already-broadcast elements
Thread.sleep(2000)
fromProducer.runForeach(msg println("consumer2: " + msg))
//#broadcast-hub-retry
}

"demonstrate combination" in {
def println(s: String) = testActor ! s

@@ -276,6 +276,28 @@ class HubSpec extends StreamSpec {
f2.futureValue should ===(1 to 10)
}

"retry last 4 elements to the next consumer" in assertAllStagesStopped {
val ((firstElem, lastElem), source) =
Source
.maybe[Int]
.concat(Source(2 to 10))
.concatMat(Source.maybe[Int])(Keep.both)
.toMat(BroadcastHub.sink(8, 4))(Keep.both).run()

// Ensure subscription of Sinks. This is racy but there is no event we can hook into here.
val (f1Kill, f1) = source.viaMat(KillSwitches.single)(Keep.right).toMat(Sink.seq)(Keep.both).run()
Thread.sleep(100)
firstElem.success(Some(1))
Thread.sleep(100)
f1Kill.shutdown()
f1.futureValue should ===(1 to 10)

val f2 = source.runWith(Sink.seq)
Thread.sleep(100)
lastElem.success(None)
f2.futureValue should ===(7 to 10)
}

"be able to implement a keep-dropping-if-unsubscribed policy with a simple Sink.ignore" in assertAllStagesStopped {
val killSwitch = KillSwitches.shared("test-switch")
val source = Source.fromIterator(() ⇒ Iterator.from(0)).via(killSwitch.flow).runWith(BroadcastHub.sink(8))
32 changes: 32 additions & 0 deletions akka-stream/src/main/scala/akka/stream/javadsl/Hub.scala
@@ -8,6 +8,7 @@ import java.util.function.{ BiFunction, Supplier, ToLongBiFunction }

import akka.annotation.DoNotInherit
import akka.annotation.ApiMayChange
import akka.stream.scaladsl.{ Sink, Source }

/**
* A MergeHub is a special streaming hub that is able to collect streamed elements from a dynamic set of
@@ -66,6 +67,37 @@ object MergeHub {
*/
object BroadcastHub {

/**
* Creates a [[Sink]] that receives elements from its upstream producer and broadcasts them to a dynamic set
* of consumers. After the [[Sink]] returned by this method is materialized, it returns a [[Source]] as materialized
* value. This [[Source]] can be materialized an arbitrary number of times and each materialization will receive the
* broadcast elements from the original [[Sink]].
*
* Every new materialization of the [[Sink]] results in a new, independent hub, which materializes to its own
* [[Source]] for consuming the [[Sink]] of that materialization.
*
* If the parameter retryBufferSize is non-zero, the materialized [[Source]] will replay up to the last
* retryBufferSize elements to every new consumer.
*
* If the original [[Sink]] is failed, then the failure is immediately propagated to all of its materialized
* [[Source]]s (possibly jumping over already buffered elements). If the original [[Sink]] is completed, then
* all corresponding [[Source]]s are completed. Both failure and normal completion is "remembered" and later
* materializations of the [[Source]] will see the same (failure or completion) state. [[Source]]s that are
* cancelled are simply removed from the dynamic set of consumers.
*
* @param bufferSize Buffer size used by the producer. Gives an upper bound on how "far" from each other two
* concurrent consumers can be in terms of element. If this buffer is full, the producer
* is backpressured. Must be a power of two and less than 4096.
* @param retryBufferSize Number of the most recent elements retained for new consumers. Gives an upper bound on how
* "old" the elements replayed to a new consumer can be. Must be non-negative and less than or
* equal to the buffer size. If it is zero, the hub behaves as before and new consumers only
* see elements that are broadcast after they attach.
*/
def of[T](clazz: Class[T], bufferSize: Int, retryBufferSize: Int): Sink[T, Source[T, NotUsed]] = {
akka.stream.scaladsl.BroadcastHub.sink[T](bufferSize, retryBufferSize)
.mapMaterializedValue(_.asJava)
.asJava
}

/**
* Creates a [[Sink]] that receives elements from its upstream producer and broadcasts them to a dynamic set
* of consumers. After the [[Sink]] returned by this method is materialized, it returns a [[Source]] as materialized
62 changes: 57 additions & 5 deletions akka-stream/src/main/scala/akka/stream/scaladsl/Hub.scala
@@ -297,6 +297,34 @@ private[akka] class MergeHub[T](perProducerBufferSize: Int) extends GraphStageWi
*/
object BroadcastHub {

/**
* Creates a [[Sink]] that receives elements from its upstream producer and broadcasts them to a dynamic set
* of consumers. After the [[Sink]] returned by this method is materialized, it returns a [[Source]] as materialized
* value. This [[Source]] can be materialized an arbitrary number of times and each materialization will receive the
* broadcast elements from the original [[Sink]].
*
* Every new materialization of the [[Sink]] results in a new, independent hub, which materializes to its own
* [[Source]] for consuming the [[Sink]] of that materialization.
*
* If the parameter retryBufferSize is non-zero, the materialized [[Source]] will replay up to the last
* retryBufferSize elements to every new consumer.
*
* If the original [[Sink]] is failed, then the failure is immediately propagated to all of its materialized
* [[Source]]s (possibly jumping over already buffered elements). If the original [[Sink]] is completed, then
* all corresponding [[Source]]s are completed. Both failure and normal completion is "remembered" and later
* materializations of the [[Source]] will see the same (failure or completion) state. [[Source]]s that are
* cancelled are simply removed from the dynamic set of consumers.
*
* @param bufferSize Buffer size used by the producer. Gives an upper bound on how "far" from each other two
* concurrent consumers can be in terms of element. If this buffer is full, the producer
* is backpressured. Must be a power of two and less than 4096.
* @param retryBufferSize Number of the most recent elements retained for new consumers. Gives an upper bound on how
* "old" the elements replayed to a new consumer can be. Must be non-negative and less than or
* equal to the buffer size. If it is zero, the hub behaves as before and new consumers only
* see elements that are broadcast after they attach.
*/
def sink[T](bufferSize: Int, retryBufferSize: Int): Sink[T, Source[T, NotUsed]] =
Sink.fromGraph(new BroadcastHub[T](bufferSize, retryBufferSize))
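
The effect of the new parameter can be summarised with a minimal sketch (a sketch only; the element type and sizes are arbitrary and not taken from this commit):

```scala
import akka.NotUsed
import akka.stream.scaladsl.{ BroadcastHub, Sink, Source }

object RetryBufferSizeSketch {
  // retryBufferSize = 0: unchanged behaviour, a late consumer only sees elements
  // broadcast after it attaches
  val plain: Sink[Int, Source[Int, NotUsed]] =
    BroadcastHub.sink[Int](bufferSize = 8, retryBufferSize = 0)

  // retryBufferSize = 4: a late consumer additionally re-receives up to the last
  // four elements that are still buffered
  val replaying: Sink[Int, Source[Int, NotUsed]] =
    BroadcastHub.sink[Int](bufferSize = 8, retryBufferSize = 4)
}
```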

/**
* Creates a [[Sink]] that receives elements from its upstream producer and broadcasts them to a dynamic set
* of consumers. After the [[Sink]] returned by this method is materialized, it returns a [[Source]] as materialized
@@ -316,7 +344,7 @@ object BroadcastHub {
* concurrent consumers can be in terms of element. If this buffer is full, the producer
* is backpressured. Must be a power of two and less than 4096.
*/
def sink[T](bufferSize: Int): Sink[T, Source[T, NotUsed]] = Sink.fromGraph(new BroadcastHub[T](bufferSize))
def sink[T](bufferSize: Int): Sink[T, Source[T, NotUsed]] = sink(bufferSize = bufferSize, retryBufferSize = 0)

/**
* Creates a [[Sink]] that receives elements from its upstream producer and broadcasts them to a dynamic set
@@ -341,10 +369,12 @@ object BroadcastHub {
/**
* INTERNAL API
*/
private[akka] class BroadcastHub[T](bufferSize: Int) extends GraphStageWithMaterializedValue[SinkShape[T], Source[T, NotUsed]] {
private[akka] class BroadcastHub[T](bufferSize: Int, retryBufferSize: Int) extends GraphStageWithMaterializedValue[SinkShape[T], Source[T, NotUsed]] {
require(bufferSize > 0, "Buffer size must be positive")
require(bufferSize < 4096, "Buffer size larger then 4095 is not allowed")
require((bufferSize & bufferSize - 1) == 0, "Buffer size must be a power of two")
require(retryBufferSize >= 0, "Retry size must be non negative")
require(retryBufferSize <= bufferSize, "Retry size can't be bigger than the buffer size")

private val Mask = bufferSize - 1
private val WheelMask = (bufferSize * 2) - 1
@@ -441,11 +471,17 @@ private[akka] class BroadcastHub[T](bufferSize: Int) extends GraphStageWithMater
else if (head != finalOffset) {
// If our final consumer goes away, we roll forward the buffer so a subsequent consumer does not
// see the already consumed elements. This feature is quite handy.
while (head != finalOffset) {
val limitClear =
if (tail - finalOffset >= retryBufferSize) finalOffset
else if (tail - head < retryBufferSize) head
else tail - retryBufferSize

while (head != limitClear) {
queue(head & Mask) = null
head += 1
}
head = finalOffset

head = limitClear
if (!hasBeenPulled(in)) pull(in)
}
} else checkUnblock(previousOffset)
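
The three-way choice of `limitClear` above is easier to follow with concrete numbers. A small self-contained sketch, with all values invented for illustration:

```scala
// Standalone illustration of the `limitClear` decision in onConsumerLeft
object RetryWindowArithmetic extends App {
  val retryBufferSize = 4
  val head = 10        // oldest element still in the buffer
  val tail = 20        // next write position (elements 10..19 are buffered)
  val finalOffset = 18 // offset the departing (last) consumer had reached

  val limitClear =
    if (tail - finalOffset >= retryBufferSize) finalOffset // consumer was at least a full window behind: clear only what it consumed
    else if (tail - head < retryBufferSize) head            // fewer than retryBufferSize elements buffered: clear nothing
    else tail - retryBufferSize                              // otherwise keep exactly the last retryBufferSize elements

  println(limitClear) // prints 16: slots 10..15 are cleared, elements 16..19 remain for the next consumer
}
```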
@@ -525,9 +561,14 @@ private[akka] class BroadcastHub[T](bufferSize: Int) extends GraphStageWithMater
private def unblockIfPossible(offsetOfConsumerRemoved: Int): Boolean = {
var unblocked = false
if (offsetOfConsumerRemoved == head) {
// Keep up to retryBufferSize of the most recent elements so they can be replayed to new consumers
val offset =
if (tail - head > retryBufferSize) tail - retryBufferSize
else head

// Try to advance along the wheel. We can skip any wheel slots which have no waiting Consumers, until
// we either find a nonempty one, or we reached the end of the buffer.
while (consumerWheel(head & WheelMask).isEmpty && head != tail) {
while (consumerWheel(head & WheelMask).isEmpty && head != offset) {
queue(head & Mask) = null
head += 1
unblocked = true
@@ -583,6 +624,17 @@ private[akka] class BroadcastHub[T](bufferSize: Int) extends GraphStageWithMater
val idx = tail & Mask
val wheelSlot = tail & WheelMask
queue(idx) = elem.asInstanceOf[AnyRef]

val diff = tail - head
if (retryBufferSize > 0 && diff > retryBufferSize) {
/* if no consumer is registered at the head slot, the head can be advanced */
val consumersInSlot = consumerWheel(head & WheelMask)
if (consumersInSlot.isEmpty) {
/* drop the oldest element; it falls outside the retry window */
head = head + 1
}
}

// Publish the new tail before calling the wakeup
tail = tail + 1
wakeupIdx(wheelSlot)
