Skip to content

Commit

Permalink
Adds RestartFlow to restart only on failures akka#24421
Browse files Browse the repository at this point in the history
  • Loading branch information
nachinius committed Jan 29, 2018
1 parent b90a116 commit e734347
Show file tree
Hide file tree
Showing 3 changed files with 147 additions and 16 deletions.
Expand Up @@ -5,13 +5,12 @@ package akka.stream.scaladsl

import java.util.concurrent.atomic.AtomicInteger

import akka.Done
import akka.stream.{ ActorMaterializer, OverflowStrategy }
import akka.stream.testkit.StreamSpec
import akka.stream.testkit.Utils.{ TE, assertAllStagesStopped }
import akka.stream.testkit.scaladsl.{ TestSink, TestSource }
import akka.testkit.DefaultTimeout
import akka.testkit.TestDuration
import akka.stream.{ ActorMaterializer, OverflowStrategy }
import akka.testkit.{ DefaultTimeout, TestDuration }
import akka.{ Done, NotUsed }

import scala.concurrent.Promise
import scala.concurrent.duration._
Expand All @@ -20,6 +19,7 @@ import scala.util.{ Failure, Success }
class RestartSpec extends StreamSpec(Map("akka.test.single-expect-default" -> "10s")) with DefaultTimeout {

implicit val mat = ActorMaterializer()

import system.dispatcher

private val shortMinBackoff = 10.millis
Expand Down Expand Up @@ -453,7 +453,15 @@ class RestartSpec extends StreamSpec(Map("akka.test.single-expect-default" -> "1

"A restart with backoff flow" should {

def setupFlow(minBackoff: FiniteDuration, maxBackoff: FiniteDuration, maxRestarts: Int = -1) = {
// helps reuse all the setupFlow code for both methods: withBackoff, and onlyOnFailuresWithBackoff
def RestartFlowFactory[In, Out](onlyOnFailures: Boolean): (FiniteDuration, FiniteDuration, Double, Int) (() Flow[In, Out, _]) Flow[In, Out, NotUsed] = if (onlyOnFailures) {
RestartFlow.onFailuresWithBackoff
} else {
// choose the correct backoff method
(minBackoff, maxBackoff, randomFactor, maxRestarts) RestartFlow.withBackoff(minBackoff, maxBackoff, randomFactor, maxRestarts)
}

def setupFlow(minBackoff: FiniteDuration, maxBackoff: FiniteDuration, maxRestarts: Int = -1, onlyOnFailures: Boolean = false) = {
val created = new AtomicInteger()
val (flowInSource, flowInProbe) = TestSource.probe[String]
.buffer(4, OverflowStrategy.backpressure)
Expand All @@ -462,7 +470,7 @@ class RestartSpec extends StreamSpec(Map("akka.test.single-expect-default" -> "1

// We can't just use ordinary probes here because we're expecting them to get started/restarted. Instead, we
// simply use the probes as a message bus for feeding and capturing events.
val (source, sink) = TestSource.probe[String].viaMat(RestartFlow.withBackoff(minBackoff, maxBackoff, 0, maxRestarts) { ()
val (source, sink) = TestSource.probe[String].viaMat(RestartFlowFactory(onlyOnFailures)(minBackoff, maxBackoff, 0, maxRestarts) { ()
created.incrementAndGet()
Flow.fromSinkAndSource(
Flow[String]
Expand Down Expand Up @@ -667,6 +675,65 @@ class RestartSpec extends StreamSpec(Map("akka.test.single-expect-default" -> "1

created.get() should ===(2)
}
}

// onlyOnFailures -->
"stop on cancellation when using onlyOnFailuresWithBackoff" in {
val onlyOnFailures = true
val (created, source, flowInProbe, flowOutProbe, sink) = setupFlow(shortMinBackoff, shortMaxBackoff, -1, onlyOnFailures)

source.sendNext("a")
flowInProbe.requestNext("a")
flowOutProbe.sendNext("b")
sink.requestNext("b")

source.sendNext("cancel")
// This will complete the flow in probe and cancel the flow out probe
flowInProbe.request(2)
flowInProbe.expectNext("in complete")

source.expectCancellation()

created.get() should ===(1)
}

"stop on completion when using onlyOnFailuresWithBackoff" in {
val onlyOnFailures = true
val (created, source, flowInProbe, flowOutProbe, sink) = setupFlow(shortMinBackoff, shortMaxBackoff, -1, onlyOnFailures)

source.sendNext("a")
flowInProbe.requestNext("a")
flowOutProbe.sendNext("b")
sink.requestNext("b")

flowOutProbe.sendNext("complete")
sink.request(1)
sink.expectComplete()

created.get() should ===(1)
}

"restart on failure when using onlyOnFailuresWithBackoff" in {
val (created, source, flowInProbe, flowOutProbe, sink) = setupFlow(shortMinBackoff, shortMaxBackoff, -1, true)

source.sendNext("a")
flowInProbe.requestNext("a")
flowOutProbe.sendNext("b")
sink.requestNext("b")

sink.request(1)
flowOutProbe.sendNext("error")

// This should complete the in probe
flowInProbe.requestNext("in complete")

// and it should restart
source.sendNext("c")
flowInProbe.requestNext("c")
flowOutProbe.sendNext("d")
sink.requestNext("d")

created.get() should ===(2)
}

}
}
32 changes: 32 additions & 0 deletions akka-stream/src/main/scala/akka/stream/javadsl/Restart.scala
Expand Up @@ -276,4 +276,36 @@ object RestartFlow {
flowFactory.create().asScala
}.asJava
}

/**
* Wrap the given [[Flow]] with a [[Flow]] that will restart only when it fails that restarts
* using an exponential backoff.
*
* This new [[Flow]] will not emit failures. Any failure by the original [[Flow]] (the wrapped one) before that
* time will be handled by restarting it as long as maxRestarts is not reached.
* However, any termination signals, completion or cancellation sent to this [[Flow]] will terminate
* the wrapped [[Flow]], if it's running, and then the [[Flow]] will be allowed to terminate without being restarted.
*
* The restart process is inherently lossy, since there is no coordination between cancelling and the sending of
* messages. A termination signal from either end of the wrapped [[Flow]] will cause the other end to be terminated,
* and any in transit messages will be lost. During backoff, this [[Flow]] will backpressure.
*
* This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]].
*
* @param minBackoff minimum (initial) duration until the child actor will
* started again, if it is terminated
* @param maxBackoff the exponential back-off is capped to this duration
* @param randomFactor after calculation of the exponential back-off an additional
* random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay.
* In order to skip this additional delay pass in `0`.
* @param maxRestarts the amount of restarts is capped to this amount within a time frame of minBackoff.
* Passing `0` will cause no restarts and a negative number will not cap the amount of restarts.
* @param flowFactory A factory for producing the [[Flow]] to wrap.
*/
def onFailuresWithBackoff[In, Out](minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double,
maxRestarts: Int, flowFactory: Creator[Flow[In, Out, _]]): Flow[In, Out, NotUsed] = {
akka.stream.scaladsl.RestartFlow.onFailuresWithBackoff(minBackoff, maxBackoff, randomFactor, maxRestarts) { ()
flowFactory.create().asScala
}.asJava
}
}
50 changes: 41 additions & 9 deletions akka-stream/src/main/scala/akka/stream/scaladsl/Restart.scala
Expand Up @@ -286,7 +286,7 @@ object RestartFlow {
* @param flowFactory A factory for producing the [[Flow]] to wrap.
*/
def withBackoff[In, Out](minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double)(flowFactory: () Flow[In, Out, _]): Flow[In, Out, NotUsed] = {
Flow.fromGraph(new RestartWithBackoffFlow(flowFactory, minBackoff, maxBackoff, randomFactor, Int.MaxValue))
Flow.fromGraph(new RestartWithBackoffFlow(flowFactory, minBackoff, maxBackoff, randomFactor, onlyOnFailures = false, Int.MaxValue))
}

/**
Expand Down Expand Up @@ -315,23 +315,55 @@ object RestartFlow {
* @param flowFactory A factory for producing the [[Flow]] to wrap.
*/
def withBackoff[In, Out](minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double, maxRestarts: Int)(flowFactory: () Flow[In, Out, _]): Flow[In, Out, NotUsed] = {
Flow.fromGraph(new RestartWithBackoffFlow(flowFactory, minBackoff, maxBackoff, randomFactor, maxRestarts))
Flow.fromGraph(new RestartWithBackoffFlow(flowFactory, minBackoff, maxBackoff, randomFactor, onlyOnFailures = false, maxRestarts))
}

/**
* Wrap the given [[Flow]] with a [[Flow]] that will restart it when it fails using an exponential
* backoff. Notice that this [[Flow]] will not restart on completion of the wrapped flow.
*
* This [[Flow]] will not emit any failure
* The failures by the wrapped [[Flow]] will be handled by
* restarting the wrapping [[Flow]] as long as maxRestarts is not reached.
* Any termination signals sent to this [[Flow]] however will terminate the wrapped [[Flow]], if it's
* running, and then the [[Flow]] will be allowed to terminate without being restarted.
*
* The restart process is inherently lossy, since there is no coordination between cancelling and the sending of
* messages. A termination signal from either end of the wrapped [[Flow]] will cause the other end to be terminated,
* and any in transit messages will be lost. During backoff, this [[Flow]] will backpressure.
*
* This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]].
*
* @param minBackoff minimum (initial) duration until the child actor will
* started again, if it is terminated
* @param maxBackoff the exponential back-off is capped to this duration
* @param randomFactor after calculation of the exponential back-off an additional
* random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay.
* In order to skip this additional delay pass in `0`.
* @param maxRestarts the amount of restarts is capped to this amount within a time frame of minBackoff.
* Passing `0` will cause no restarts and a negative number will not cap the amount of restarts.
* @param flowFactory A factory for producing the [[Flow]] to wrap.
*/
def onFailuresWithBackoff[In, Out](minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double, maxRestarts: Int)(flowFactory: () Flow[In, Out, _]): Flow[In, Out, NotUsed] = {
Flow.fromGraph(new RestartWithBackoffFlow(flowFactory, minBackoff, maxBackoff, randomFactor, onlyOnFailures = true, maxRestarts))
}

}

private final class RestartWithBackoffFlow[In, Out](
flowFactory: () Flow[In, Out, _],
minBackoff: FiniteDuration,
maxBackoff: FiniteDuration,
randomFactor: Double,
maxRestarts: Int) extends GraphStage[FlowShape[In, Out]] { self
flowFactory: () Flow[In, Out, _],
minBackoff: FiniteDuration,
maxBackoff: FiniteDuration,
randomFactor: Double,
onlyOnFailures: Boolean,
maxRestarts: Int) extends GraphStage[FlowShape[In, Out]] { self

val in = Inlet[In]("RestartWithBackoffFlow.in")
val out = Outlet[Out]("RestartWithBackoffFlow.out")

override def shape = FlowShape(in, out)
override def createLogic(inheritedAttributes: Attributes) = new RestartWithBackoffLogic(
"Flow", shape, minBackoff, maxBackoff, randomFactor, onlyOnFailures = false, maxRestarts) {
"Flow", shape, minBackoff, maxBackoff, randomFactor, onlyOnFailures, maxRestarts) {

var activeOutIn: Option[(SubSourceOutlet[In], SubSinkInlet[Out])] = None

Expand Down Expand Up @@ -439,7 +471,7 @@ private abstract class RestartWithBackoffLogic[S <: Shape](
}
}
override def onDownstreamFinish() = {
if (finishing || maxRestartsReached()) {
if (finishing || maxRestartsReached() || onlyOnFailures) {
cancel(in)
} else {
log.debug("Graph in finished")
Expand Down

0 comments on commit e734347

Please sign in to comment.