Uni.onFailure().withBackoff().retry() occasionally stalls #1388

Closed
vladykin opened this issue Sep 29, 2023 · 8 comments · Fixed by #1448
@vladykin
vladykin commented Sep 29, 2023

Context

One of our unit tests became flaky after we upgraded our app from Quarkus 2.16 to Quarkus 3.2.
Long story short, we tracked this down to Mutiny. With Mutiny up to 2.0.0-milestone1 (inclusive) the test reliably passes. Starting with Mutiny 2.0.0-milestone2 the test sometimes fails.

The test uses Uni.onFailure().withBackoff().retry() to retry a gRPC call until the target service comes up.
Sometimes retry() doesn't trigger a new upstream subscription as expected, nor does it emit any further downstream events.
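
For readers unfamiliar with the pattern, here is a minimal plain-JDK sketch of retry with exponential backoff. It is an analogy only, not Mutiny's implementation: the class `BackoffRetrySketch`, its method names, the doubling formula, and the jitter handling are all assumptions made for illustration. Each failure schedules the next attempt after a delay; the stall described above would correspond to that next attempt never being started.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical illustration of retry-with-backoff; this is NOT Mutiny code.
final class BackoffRetrySketch {

    // Delay before the retry that follows attempt `attemptNo` (0-based):
    // initial * 2^attemptNo, capped at maxMillis (assumed formula).
    static long backoffMillis(long initialMillis, long maxMillis, int attemptNo) {
        long factor = 1L << Math.min(attemptNo, 30); // bound the shift
        if (initialMillis > maxMillis / factor) {
            return maxMillis; // the product would exceed the cap
        }
        return initialMillis * factor;
    }

    // Spreads the delay by up to +/- `jitter` (e.g. 0.5 = 50%).
    // `random01` is a value in [0, 1), passed in so the function is testable.
    static long withJitter(long delayMillis, double jitter, double random01) {
        double offset = (random01 * 2.0 - 1.0) * jitter;
        return Math.max(0L, Math.round(delayMillis * (1.0 + offset)));
    }

    // Calls `call` until it succeeds or `maxAttempts` attempts have failed.
    static <T> CompletableFuture<T> retry(Supplier<CompletableFuture<T>> call,
            ScheduledExecutorService scheduler,
            long initialMillis, long maxMillis, int maxAttempts) {
        CompletableFuture<T> result = new CompletableFuture<>();
        tryOnce(call, scheduler, initialMillis, maxMillis, maxAttempts, 0, result);
        return result;
    }

    private static <T> void tryOnce(Supplier<CompletableFuture<T>> call,
            ScheduledExecutorService scheduler, long initialMillis, long maxMillis,
            int maxAttempts, int attemptNo, CompletableFuture<T> result) {
        CompletableFuture<T> attempt;
        try {
            attempt = call.get(); // the analogue of a new upstream subscription
        } catch (RuntimeException e) {
            attempt = new CompletableFuture<>();
            attempt.completeExceptionally(e);
        }
        attempt.whenComplete((item, failure) -> {
            if (failure == null) {
                result.complete(item);
            } else if (attemptNo + 1 >= maxAttempts) {
                result.completeExceptionally(failure);
            } else {
                long delay = withJitter(
                        backoffMillis(initialMillis, maxMillis, attemptNo),
                        0.5, ThreadLocalRandom.current().nextDouble());
                // If this step were ever skipped, `result` would never
                // complete: the caller would see a stall, not an error.
                scheduler.schedule(() -> tryOnce(call, scheduler, initialMillis,
                        maxMillis, maxAttempts, attemptNo + 1, result),
                        delay, TimeUnit.MILLISECONDS);
            }
        });
    }

    public static void main(String[] args) throws Exception {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger calls = new AtomicInteger();
        // Simulated service: unavailable for the first three calls.
        Supplier<CompletableFuture<String>> flaky = () -> {
            CompletableFuture<String> f = new CompletableFuture<>();
            if (calls.incrementAndGet() <= 3) {
                f.completeExceptionally(new IllegalStateException("UNAVAILABLE"));
            } else {
                f.complete("Hello");
            }
            return f;
        };
        try {
            String greeting = retry(flaky, scheduler, 10, 100, 10).get(5, TimeUnit.SECONDS);
            System.out.println(greeting + " after " + calls.get() + " calls");
        } finally {
            scheduler.shutdownNow();
        }
    }
}
```

In this sketch the returned future plays the role of the downstream subscriber: it can only complete if every failed attempt reliably schedules its successor.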

The only non-trivial change in Mutiny 2.0.0-milestone2 is this one: #997.
It changes the behavior of MultiFlatten.concatenate(), which is used internally in ExponentialBackoff.randomExponentialBackoffFunction(). This looks like the root cause of our problem.

Reproducer

https://github.com/vladykin/mutiny-retry-issue-reproducer

mvn test -Dmutiny.version=... always passes with Mutiny versions up to and including 2.0.0-milestone1, but sometimes fails with versions from 2.0.0-milestone2 up to the latest, 2.5.1.

Use something like

for i in $(seq 1 300); do
  mvn test -Dmutiny.version=... || exit 1
done

to run it in a loop.

Additional details

Example output from a failed run:

[--> Request #14.0 | onSubscribe()
[--> Request #3.0 | onSubscribe()
[--> Request #26.0 | onSubscribe()
[--> Request #31.0 | onSubscribe()
[--> Request #16.0 | onSubscribe()
[--> Request #28.0 | onSubscribe()
[--> Request #30.0 | onSubscribe()
[--> Request #10.0 | onSubscribe()
[--> Request #21.0 | onSubscribe()
[--> Request #18.0 | onSubscribe()
[--> Request #4.0 | onSubscribe()
[--> Request #9.0 | onSubscribe()
[--> Request #8.0 | onSubscribe()
[--> Request #7.0 | onSubscribe()
[--> Request #17.0 | onSubscribe()
[--> Request #25.0 | onSubscribe()
[--> Request #24.0 | onSubscribe()
[--> Request #2.0 | onSubscribe()
[--> Request #20.0 | onSubscribe()
[--> Request #13.0 | onSubscribe()
[--> Request #15.0 | onSubscribe()
[--> Request #6.0 | onSubscribe()
[--> Request #11.0 | onSubscribe()
[--> Request #27.0 | onSubscribe()
[--> Request #5.0 | onSubscribe()
[--> Request #29.0 | onSubscribe()
[--> Request #19.0 | onSubscribe()
[--> Request #23.0 | onSubscribe()
[--> Request #12.0 | onSubscribe()
[--> Request #0.0 | onSubscribe()
[--> Request #22.0 | onSubscribe()
[--> Request #1.0 | onSubscribe()
[--> Request #26.0 | onFailure(io.grpc.StatusRuntimeException("UNAVAILABLE: io exception"))
[--> Request #15.0 | onFailure(io.grpc.StatusRuntimeException("UNAVAILABLE: io exception"))
[--> Request #5.0 | onFailure(io.grpc.StatusRuntimeException("UNAVAILABLE: io exception"))
[--> Request #25.0 | onFailure(io.grpc.StatusRuntimeException("UNAVAILABLE: io exception"))
[--> Request #6.0 | onFailure(io.grpc.StatusRuntimeException("UNAVAILABLE: io exception"))
[--> Request #23.0 | onFailure(io.grpc.StatusRuntimeException("UNAVAILABLE: io exception"))
[--> Request #28.0 | onFailure(io.grpc.StatusRuntimeException("UNAVAILABLE: io exception"))
[--> Request #30.0 | onFailure(io.grpc.StatusRuntimeException("UNAVAILABLE: io exception"))
[--> Request #18.0 | onFailure(io.grpc.StatusRuntimeException("UNAVAILABLE: io exception"))
[--> Request #10.0 | onFailure(io.grpc.StatusRuntimeException("UNAVAILABLE: io exception"))
[--> Request #13.0 | onFailure(io.grpc.StatusRuntimeException("UNAVAILABLE: io exception"))
[--> Request #22.0 | onFailure(io.grpc.StatusRuntimeException("UNAVAILABLE: io exception"))
[--> Request #17.0 | onFailure(io.grpc.StatusRuntimeException("UNAVAILABLE: io exception"))
[--> Request #20.0 | onFailure(io.grpc.StatusRuntimeException("UNAVAILABLE: io exception"))
[--> Request #27.0 | onFailure(io.grpc.StatusRuntimeException("UNAVAILABLE: io exception"))
[--> Request #21.0 | onFailure(io.grpc.StatusRuntimeException("UNAVAILABLE: io exception"))
[--> Request #31.0 | onFailure(io.grpc.StatusRuntimeException("UNAVAILABLE: io exception"))
[--> Request #2.0 | onFailure(io.grpc.StatusRuntimeException("UNAVAILABLE: io exception"))
[--> Request #12.0 | onFailure(io.grpc.StatusRuntimeException("UNAVAILABLE: io exception"))
[--> Request #11.0 | onFailure(io.grpc.StatusRuntimeException("UNAVAILABLE: io exception"))
[--> Request #29.0 | onFailure(io.grpc.StatusRuntimeException("UNAVAILABLE: io exception"))
[--> Request #16.0 | onFailure(io.grpc.StatusRuntimeException("UNAVAILABLE: io exception"))
[--> Request #1.0 | onFailure(io.grpc.StatusRuntimeException("UNAVAILABLE: io exception"))
[--> Request #14.0 | onFailure(io.grpc.StatusRuntimeException("UNAVAILABLE: io exception"))
[--> Request #19.0 | onFailure(io.grpc.StatusRuntimeException("UNAVAILABLE: io exception"))
[--> Request #8.0 | onFailure(io.grpc.StatusRuntimeException("UNAVAILABLE: io exception"))
[--> Request #4.0 | onFailure(io.grpc.StatusRuntimeException("UNAVAILABLE: io exception"))
[--> Request #3.0 | onFailure(io.grpc.StatusRuntimeException("UNAVAILABLE: io exception"))
[--> Request #9.0 | onFailure(io.grpc.StatusRuntimeException("UNAVAILABLE: io exception"))
[--> Request #7.0 | onFailure(io.grpc.StatusRuntimeException("UNAVAILABLE: io exception"))
[--> Request #24.0 | onFailure(io.grpc.StatusRuntimeException("UNAVAILABLE: io exception"))
[--> Request #0.0 | onFailure(io.grpc.StatusRuntimeException("UNAVAILABLE: io exception"))
[--> Request #10.1 | onSubscribe()
[--> Request #27.1 | onSubscribe()
[--> Request #27.1 | onFailure(io.grpc.StatusRuntimeException("UNAVAILABLE: io exception"))
[--> Request #10.1 | onFailure(io.grpc.StatusRuntimeException("UNAVAILABLE: io exception"))
[--> Request #28.1 | onSubscribe()
[--> Request #28.1 | onFailure(io.grpc.StatusRuntimeException("UNAVAILABLE: io exception"))
[--> Request #30.1 | onSubscribe()
[--> Request #30.1 | onFailure(io.grpc.StatusRuntimeException("UNAVAILABLE: io exception"))
[--> Request #5.1 | onSubscribe()
[--> Request #5.1 | onFailure(io.grpc.StatusRuntimeException("UNAVAILABLE: io exception"))
[--> Request #17.1 | onSubscribe()
[--> Request #17.1 | onFailure(io.grpc.StatusRuntimeException("UNAVAILABLE: io exception"))
[--> Request #25.1 | onSubscribe()
[--> Request #25.1 | onFailure(io.grpc.StatusRuntimeException("UNAVAILABLE: io exception"))
[--> Request #14.1 | onSubscribe()
[--> Request #14.1 | onFailure(io.grpc.StatusRuntimeException("UNAVAILABLE: io exception"))
[--> Request #29.1 | onSubscribe()
[--> Request #29.1 | onFailure(io.grpc.StatusRuntimeException("UNAVAILABLE: io exception"))
[--> Request #31.1 | onSubscribe()
[--> Request #31.1 | onFailure(io.grpc.StatusRuntimeException("UNAVAILABLE: io exception"))
[--> Request #9.1 | onSubscribe()
[--> Request #9.1 | onFailure(io.grpc.StatusRuntimeException("UNAVAILABLE: io exception"))
[--> Request #3.1 | onSubscribe()
[--> Request #3.1 | onFailure(io.grpc.StatusRuntimeException("UNAVAILABLE: io exception"))
[--> Request #21.1 | onSubscribe()
[--> Request #21.1 | onFailure(io.grpc.StatusRuntimeException("UNAVAILABLE: io exception"))
[--> Request #22.1 | onSubscribe()
[--> Request #22.1 | onFailure(io.grpc.StatusRuntimeException("UNAVAILABLE: io exception"))
[--> Request #0.1 | onSubscribe()
[--> Request #0.1 | onFailure(io.grpc.StatusRuntimeException("UNAVAILABLE: io exception"))
[--> Request #4.1 | onSubscribe()
[--> Request #4.1 | onFailure(io.grpc.StatusRuntimeException("UNAVAILABLE: io exception"))
[--> Request #26.1 | onSubscribe()
[--> Request #26.1 | onFailure(io.grpc.StatusRuntimeException("UNAVAILABLE: io exception"))
[--> Request #23.1 | onSubscribe()
[--> Request #23.1 | onFailure(io.grpc.StatusRuntimeException("UNAVAILABLE: io exception"))
[--> Request #1.1 | onSubscribe()
[--> Request #1.1 | onFailure(io.grpc.StatusRuntimeException("UNAVAILABLE: io exception"))
[--> Request #16.1 | onSubscribe()
[--> Request #16.1 | onFailure(io.grpc.StatusRuntimeException("UNAVAILABLE: io exception"))
[--> Request #11.1 | onSubscribe()
[--> Request #11.1 | onFailure(io.grpc.StatusRuntimeException("UNAVAILABLE: io exception"))
[--> Request #18.1 | onSubscribe()
[--> Request #18.1 | onFailure(io.grpc.StatusRuntimeException("UNAVAILABLE: io exception"))
[--> Request #2.1 | onSubscribe()
[--> Request #2.1 | onFailure(io.grpc.StatusRuntimeException("UNAVAILABLE: io exception"))
[--> Request #20.1 | onSubscribe()
[--> Request #20.1 | onFailure(io.grpc.StatusRuntimeException("UNAVAILABLE: io exception"))
[--> Request #6.1 | onSubscribe()
[--> Request #6.1 | onFailure(io.grpc.StatusRuntimeException("UNAVAILABLE: io exception"))
[--> Request #7.1 | onSubscribe()
[--> Request #7.1 | onFailure(io.grpc.StatusRuntimeException("UNAVAILABLE: io exception"))
[--> Request #15.1 | onSubscribe()
[--> Request #15.1 | onFailure(io.grpc.StatusRuntimeException("UNAVAILABLE: io exception"))
[--> Request #19.1 | onSubscribe()
[--> Request #19.1 | onFailure(io.grpc.StatusRuntimeException("UNAVAILABLE: io exception"))
[--> Request #8.1 | onSubscribe()
[--> Request #8.1 | onFailure(io.grpc.StatusRuntimeException("UNAVAILABLE: io exception"))
[--> Request #12.1 | onSubscribe()
[--> Request #12.1 | onFailure(io.grpc.StatusRuntimeException("UNAVAILABLE: io exception"))
[--> Request #24.1 | onSubscribe()
[--> Request #24.1 | onFailure(io.grpc.StatusRuntimeException("UNAVAILABLE: io exception"))
[--> Request #13.1 | onSubscribe()
[--> Request #13.1 | onFailure(io.grpc.StatusRuntimeException("UNAVAILABLE: io exception"))
[--> Request #25.2 | onSubscribe()
[--> Request #25.2 | onFailure(io.grpc.StatusRuntimeException("UNAVAILABLE: io exception"))
[--> Request #10.2 | onSubscribe()
[--> Request #10.2 | onFailure(io.grpc.StatusRuntimeException("UNAVAILABLE: io exception"))
[--> Request #0.2 | onSubscribe()
[--> Request #0.2 | onFailure(io.grpc.StatusRuntimeException("UNAVAILABLE: io exception"))
[--> Request #23.2 | onSubscribe()
[--> Request #23.2 | onFailure(io.grpc.StatusRuntimeException("UNAVAILABLE: io exception"))
[--> Request #27.2 | onSubscribe()
[--> Request #27.2 | onFailure(io.grpc.StatusRuntimeException("UNAVAILABLE: io exception"))
[--> Request #22.2 | onSubscribe()
[--> Request #22.2 | onFailure(io.grpc.StatusRuntimeException("UNAVAILABLE: io exception"))
[--> Request #5.2 | onSubscribe()
[--> Request #5.2 | onFailure(io.grpc.StatusRuntimeException("UNAVAILABLE: io exception"))
[--> Request #13.2 | onSubscribe()
[--> Request #13.2 | onFailure(io.grpc.StatusRuntimeException("UNAVAILABLE: io exception"))
[--> Request #17.2 | onSubscribe()
[--> Request #17.2 | onFailure(io.grpc.StatusRuntimeException("UNAVAILABLE: io exception"))
[--> Request #21.2 | onSubscribe()
[--> Request #21.2 | onFailure(io.grpc.StatusRuntimeException("UNAVAILABLE: io exception"))
[--> Request #26.2 | onSubscribe()
[--> Request #26.2 | onFailure(io.grpc.StatusRuntimeException("UNAVAILABLE: io exception"))
[--> Request #24.2 | onSubscribe()
[--> Request #24.2 | onFailure(io.grpc.StatusRuntimeException("UNAVAILABLE: io exception"))
[--> Request #3.2 | onSubscribe()
[--> Request #3.2 | onFailure(io.grpc.StatusRuntimeException("UNAVAILABLE: io exception"))
[--> Request #15.2 | onSubscribe()
[--> Request #15.2 | onFailure(io.grpc.StatusRuntimeException("UNAVAILABLE: io exception"))
Server started on port 50051
[--> Request #7.2 | onSubscribe()
[--> Request #29.2 | onSubscribe()
[--> Request #1.2 | onSubscribe()
[--> Request #9.2 | onSubscribe()
[--> Request #31.2 | onSubscribe()
[--> Request #9.2 | onItem(greeting: "Hello"
)
[--> Request #29.2 | onItem(greeting: "Hello"
)
[--> Request #31.2 | onItem(greeting: "Hello"
)
[--> Request #7.2 | onItem(greeting: "Hello"
)
[--> Request #1.2 | onItem(greeting: "Hello"
)
[--> Request #14.2 | onSubscribe()
[--> Request #14.2 | onItem(greeting: "Hello"
)
[--> Request #2.2 | onSubscribe()
[--> Request #30.2 | onSubscribe()
[--> Request #2.2 | onItem(greeting: "Hello"
)
[--> Request #30.2 | onItem(greeting: "Hello"
)
[--> Request #6.2 | onSubscribe()
[--> Request #6.2 | onItem(greeting: "Hello"
)
[--> Request #8.2 | onSubscribe()
[--> Request #8.2 | onItem(greeting: "Hello"
)
[--> Request #28.2 | onSubscribe()
[--> Request #28.2 | onItem(greeting: "Hello"
)
[--> Request #16.2 | onSubscribe()
[--> Request #16.2 | onItem(greeting: "Hello"
)
[--> Request #4.2 | onSubscribe()
[--> Request #4.2 | onItem(greeting: "Hello"
)
[--> Request #18.2 | onSubscribe()
[--> Request #18.2 | onItem(greeting: "Hello"
)
[--> Request #20.2 | onSubscribe()
[--> Request #20.2 | onItem(greeting: "Hello"
)
[--> Request #12.2 | onSubscribe()
[--> Request #12.2 | onItem(greeting: "Hello"
)
[--> Request #19.2 | onSubscribe()
[--> Request #19.2 | onItem(greeting: "Hello"
)
[--> Request #27.3 | onSubscribe()
[--> Request #27.3 | onItem(greeting: "Hello"
)
[--> Request #23.3 | onSubscribe()
[--> Request #23.3 | onItem(greeting: "Hello"
)
[--> Request #22.3 | onSubscribe()
[--> Request #22.3 | onItem(greeting: "Hello"
)
[--> Request #25.3 | onSubscribe()
[--> Request #25.3 | onItem(greeting: "Hello"
)
[--> Request #13.3 | onSubscribe()
[--> Request #13.3 | onItem(greeting: "Hello"
)
[--> Request #26.3 | onSubscribe()
[--> Request #26.3 | onItem(greeting: "Hello"
)
[--> Request #21.3 | onSubscribe()
[--> Request #21.3 | onItem(greeting: "Hello"
)
[--> Request #0.3 | onSubscribe()
[--> Request #0.3 | onItem(greeting: "Hello"
)
[--> Request #15.3 | onSubscribe()
[--> Request #15.3 | onItem(greeting: "Hello"
)
[--> Request #10.3 | onSubscribe()
[--> Request #10.3 | onItem(greeting: "Hello"
)
[--> Request #3.3 | onSubscribe()
[--> Request #3.3 | onItem(greeting: "Hello"
)
[--> Request #5.3 | onSubscribe()
[--> Request #5.3 | onItem(greeting: "Hello"
)
[--> Request #24.3 | onSubscribe()
[--> Request #24.3 | onItem(greeting: "Hello"
)
[--> Request #17.3 | onSubscribe()
[--> Request #17.3 | onItem(greeting: "Hello"
)


[ERROR] Tests run: 1, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 24.915 s <<< FAILURE! - in com.random.test.DummyClientTest
[ERROR] com.random.test.DummyClientTest.client_applies_retry_policy_when_server_is_unavailable  Time elapsed: 24.718 s  <<< FAILURE!
java.lang.AssertionError:
[Request #11]
Expecting
  <FutureTask[Incomplete]>
to be completed within 20L Seconds.

Note that Request #11 logs an onFailure for attempt #11.1 but no subsequent onSubscribe, so attempt #11.2 never starts.
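
With 32 interleaved requests, spotting the stalled one by eye is tedious. The following is a small sketch of a scanner that reports every request whose last logged event is not onItem. The class `StalledRequestFinder` and its regular expression are hypothetical helpers written for the log format shown above; they are not part of the reproducer.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper for the log format above; not part of the reproducer.
final class StalledRequestFinder {

    // Matches e.g. "[--> Request #11.1 | onFailure(...": request id, attempt, event name.
    private static final Pattern EVENT =
            Pattern.compile("Request #(\\d+)\\.(\\d+) \\| (\\w+)\\(");

    // Returns the ids of requests whose most recent event is not onItem,
    // in ascending order. Lines that do not match the pattern are ignored.
    static List<Integer> findStalled(List<String> logLines) {
        Map<Integer, String> lastEvent = new TreeMap<>();
        for (String line : logLines) {
            Matcher m = EVENT.matcher(line);
            if (m.find()) {
                lastEvent.put(Integer.parseInt(m.group(1)), m.group(3));
            }
        }
        List<Integer> stalled = new ArrayList<>();
        for (Map.Entry<Integer, String> e : lastEvent.entrySet()) {
            if (!"onItem".equals(e.getValue())) {
                stalled.add(e.getKey());
            }
        }
        return stalled;
    }

    public static void main(String[] args) {
        // A short excerpt in the same shape as the log above.
        List<String> excerpt = Arrays.asList(
                "[--> Request #11.1 | onSubscribe()",
                "[--> Request #11.1 | onFailure(io.grpc.StatusRuntimeException(\"UNAVAILABLE: io exception\"))",
                "[--> Request #12.2 | onSubscribe()",
                "[--> Request #12.2 | onItem(greeting: \"Hello\"",
                ")");
        System.out.println("Stalled requests: " + findStalled(excerpt));
    }
}
```

A request that ends on onSubscribe is also reported, since that indicates a call that was started but never answered within the captured log.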

@vladykin changed the title from "Uni.onFailure().withBackoff().retry() occasionally doesn't work" to "Uni.onFailure().withBackoff().retry() occasionally stalls" on Sep 29, 2023
@jponge
Member

jponge commented Oct 1, 2023

Thanks for your report, we'll have a look in the coming days.

@jponge added the bug (Something isn't working) label on Oct 1, 2023
@jponge added this to the 2.6.0 milestone on Oct 1, 2023
@jponge added this to Backlog in Mutiny development (via automation) on Oct 1, 2023
@jponge
Member

jponge commented Oct 2, 2023

/cc @ozangunalp

@ozangunalp
Collaborator

I had a brief look yesterday. I was able to reproduce it with 2.0.0 and 2.3.1, but not with 2.0.0.milestone1 (before the non-prefetch concatMap) or with 2.5.1 (latest). I'll continue looking today.

@jponge jponge moved this from Backlog to In progress in Mutiny development Oct 3, 2023
@ozangunalp
Collaborator

I can also reproduce it with the latest version. I see that the issue lies in the use of the non-prefetch concatMap for the exponential backoff. However, I can't reproduce any hang within Mutiny's own retry tests.

@jponge I'll need some help with this one.

@jponge
Member

jponge commented Oct 3, 2023

Sure, and thanks for the explorations. I'll schedule a call with you.

@vladykin
Author

@ozangunalp, @jponge, is there any update on this?

@jponge
Member

jponge commented Oct 19, 2023

We need to get back to this. @ozangunalp and I discussed it, but we couldn't pinpoint any obvious culprit in the concatMap operator. We also wonder whether the test itself might be the problem, but again, we need to find time to investigate.

@ozangunalp did a great job putting together a reproducer branch: https://github.com/ozangunalp/smallrye-mutiny/tree/reproducer_retry_exponential_backoff

jponge added a commit that referenced this issue Nov 30, 2023
This fixes race conditions in concatMap and stream concatenation operators.

Refs: #1388
@jponge jponge linked a pull request Nov 30, 2023 that will close this issue
@jponge
Member

jponge commented Nov 30, 2023

Note: test this issue against #1448

Mutiny development automation moved this from In progress to Done Nov 30, 2023