Skip to content

Commit

Permalink
paypal#575: Fix "cannot push port twice error"
Browse files Browse the repository at this point in the history
Check if out2 is available befor push.
Log if we cant push to out2 on any succesful/exhausted element
  • Loading branch information
Sherif Ebady committed Jan 16, 2018
1 parent 26c14f9 commit b36fad0
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 12 deletions.
12 changes: 8 additions & 4 deletions squbs-ext/src/main/scala/org/squbs/streams/RetryBidi.scala
Expand Up @@ -278,13 +278,17 @@ final class RetryBidi[In, Out, Context] private[streams](maxRetries: Long, uniqu
override def onPush(): Unit = {
val (elem, context) = grab(in2)
if (isFailure(elem)) {
if (incrementOrRemoveFailure(context)) push(out2, (elem, context))
else if (isAvailable(out1)) push(out1, readyToRetry.dequeue())
if (retryRegistry.nonEmpty) pull(in2)
if (incrementOrRemoveFailure(context)) {
if (isAvailable(out2)) push(out2, (elem, context))
else log.error("out2 is not available for push. Dropping exhausted element")
} else if (isAvailable(out1)) push(out1, readyToRetry.dequeue())
} else {
retryRegistry.remove(uniqueId(context))
push(out2, (elem, context))
if (isAvailable(out2)) push(out2, (elem, context))
else log.error("out2 is not available for push. Dropping successful element")
}
// continue propagating demand on in2 if we still have retries queue'd.
if (retryRegistry.nonEmpty && !hasBeenPulled(in2)) pull(in2)
}

override def onUpstreamFailure(ex: Throwable): Unit = if (readyToRetry.isEmpty) fail(out2, ex)
Expand Down
52 changes: 44 additions & 8 deletions squbs-ext/src/test/scala/org/squbs/streams/RetryBidiSpec.scala
Expand Up @@ -21,7 +21,7 @@ import java.util.concurrent.atomic.AtomicLong
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.Attributes.inputBuffer
import akka.stream.{ActorMaterializer, BufferOverflowException, OverflowStrategy}
import akka.stream.{ActorMaterializer, BufferOverflowException, OverflowStrategy, ThrottleMode}
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
import akka.stream.testkit.scaladsl.{TestSink, TestSource}
import akka.testkit.TestKit
Expand Down Expand Up @@ -55,7 +55,7 @@ class RetryBidiSpec extends TestKit(ActorSystem("RetryBidiSpec")) with AsyncFlat

val expected = (Success("a"), 1) :: (Success("b"), 2) :: (Success("c"), 3) :: Nil
result map {
_ should contain theSameElementsAs expected
_ should contain theSameElementsInOrderAs expected
}
}

Expand All @@ -70,9 +70,9 @@ class RetryBidiSpec extends TestKit(ActorSystem("RetryBidiSpec")) with AsyncFlat
.via(RetryBidi[String, String, Long](3).join(flow))
.runWith(Sink.seq)

val expected = (Success("b"), 2) :: (Success("c"), 3) :: (failure, 1) :: Nil
val expected = (failure, 1) :: (Success("b"), 2) :: (Success("c"), 3) :: Nil
result map {
_ should contain theSameElementsAs expected
_ should contain theSameElementsInOrderAs expected
}
}

Expand All @@ -92,7 +92,7 @@ class RetryBidiSpec extends TestKit(ActorSystem("RetryBidiSpec")) with AsyncFlat

val expected = (Success("d"), 1) :: (failure, 2) :: (Success("f"), 3) :: Nil
result map {
_ should contain theSameElementsAs expected
_ should contain theSameElementsInOrderAs expected
}
}

Expand All @@ -112,7 +112,7 @@ class RetryBidiSpec extends TestKit(ActorSystem("RetryBidiSpec")) with AsyncFlat

val expected = (Success("d"), 1) :: (Success("e"), 2) :: (failure, 3) :: Nil
result map {
_ should contain theSameElementsAs expected
_ should contain theSameElementsInOrderAs expected
}
}

Expand Down Expand Up @@ -386,7 +386,7 @@ class RetryBidiSpec extends TestKit(ActorSystem("RetryBidiSpec")) with AsyncFlat
.via(retry.join(bottom))
.runWith(TestSink.probe)

sink.request(3).expectError(new BufferOverflowException("Retry buffer overflow for retry stage (max capacity was: 1)!"))
sink.request(3).expectError(BufferOverflowException("Retry buffer overflow for retry stage (max capacity was: 1)!"))
succeed
}

Expand All @@ -406,4 +406,40 @@ class RetryBidiSpec extends TestKit(ActorSystem("RetryBidiSpec")) with AsyncFlat
_ should contain theSameElementsAs expected
}
}
}

it should "retry when the joined flow buffers" in {
val bottom = Flow[(Long, Long)].map {
case (elem, ctx) => if (ctx % 2 == 0) (failure, ctx) else (Success(elem), ctx) // fail every even element
} buffer(5, OverflowStrategy.backpressure)

val retry = RetryBidi[Long, Long, Long](2)
val result = Source(1L to 100)
.map(x => (x, x))
.via(retry.join(bottom))
.runWith(Sink.seq)

val expected = 1 to 100 map (x => if (x % 2 == 0) (failure, x) else (Success(x), x))
result map {
_ should contain theSameElementsInOrderAs expected
}
}

it should "retry when the downstream flow throttle" in {
val bottom = Flow[(Long, Long)].map {
case (elem, ctx) => if (ctx % 2 == 0) (failure, ctx) else (Success(elem), ctx) // fail every even element
}
val retry = RetryBidi[Long, Long, Long](2)
val result = Source(1L to 100)
.map(x => (x, x))
.throttle(10, 10.millis, 10, ThrottleMode.shaping)
.via(retry.join(bottom))
.throttle(1, 10.millis, 10, ThrottleMode.shaping)
.runWith(Sink.seq)

val expected = 1 to 100 map (x => if (x % 2 == 0) (failure, x) else (Success(x), x))
result map {
_ should contain theSameElementsInOrderAs expected
}
}

}

0 comments on commit b36fad0

Please sign in to comment.