New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Error in stage [RetryBidi] - "cannot push port twice" #575
Comments
Please see the code at https://github.com/paypal/squbs/blob/master/squbs-ext/src/main/scala/org/squbs/streams/RetryBidi.scala#L377 setHandler(in2, new InHandler {
override def onPush(): Unit = {
val (elem, context) = grab(in2)
if (isFailure(elem)) {
if (emitOrQueueFailure(context))
push(out2, (elem, context))
else if (isAvailable(out1)) pushIfReady()
if (retryRegistry.nonEmpty && !hasBeenPulled(in2))
pull(in2)
} else {
retryRegistry.get(uniqueId(context)) foreach(entry => {
retryDelayQ.remove(entry._3)
retryRegistry.remove(uniqueId(entry._2))
})
push(out2, (elem, context))
}
} IMO, the problem happens when retries are exhausted. Because we push to downstream, but without getting a demand from downstream we immediately create demand on So, I think the demand creation should be inside if (emitOrQueueFailure(context)) push(out2, (elem, context))
else if (isAvailable(out1)) {
pushIfReady()
if (retryRegistry.nonEmpty && !hasBeenPulled(in2)) pull(in2)
} And also, I am not sure if we need |
It appear to me even in the case a retry is exhausted and element is emitted on out2.
We still need a |
Check if out2 is available befor push. Log if we can push to out2 on any succesful/exhausted element
Check if out2 is available befor push. Log if we cant push to out2 on any succesful/exhausted element
Check if out2 is available befor push. Log if we cant push to out2 on any succesful/exhausted element
Check if out2 is available befor push. Log if we cant push to out2 on any succesful/exhausted element
Check if out2 is available befor push. Log if we cant push to out2 on any succesful/exhausted element
Check if out2 is available befor push. Log if we cant push to out2 on any succesful/exhausted element
Check if out2 is available befor push. Log if we cant push to out2 on any succesful/exhausted element
Check if out2 is available befor push. Log if we cant push to out2 on any succesful/exhausted element
Check if out2 is available befor push. Log if we cant push to out2 on any succesful/exhausted element
Check if out2 is available befor push. Log if we cant push to out2 on any succesful/exhausted element (Cherry-picked from commit dbc2b38)
Check if out2 is available befor push. Log if we cant push to out2 on any succesful/exhausted element (Cherry-picked from commit dbc2b38)
Check if out2 is available befor push. Log if we cant push to out2 on any succesful/exhausted element (Cherry-picked from commit dbc2b38)
This failure occurs intermittently in RetryBid on 0.9.X
Error in stage [RetryBidi]: requirement failed: Cannot push port (RetryBidi.out2) twice
java.lang.IllegalArgumentException: requirement failed: Cannot push port (RetryBidi.out2) twice
at scala.Predef$.require(Predef.scala:224)
at akka.stream.stage.GraphStageLogic.push(GraphStage.scala:463)
at org.squbs.streams.RetryBidi$$anon$1$$anon$4.onPush(RetryBidi.scala:286)
at akka.stream.impl.fusing.GraphInterpreter.processPush(GraphInterpreter.scala:747)
at akka.stream.impl.fusing.GraphInterpreter.execute(GraphInterpreter.scala:649)
The text was updated successfully, but these errors were encountered: