From e94fe2c61eb4b6d2f6aaab66675010cf55223bad Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Simon=20Basl=C3=A9?=
Date: Wed, 6 Jul 2022 14:33:30 +0200
Subject: [PATCH] Rework guide on Retry with Transient Errors (#3093)

This commit reworks the reference guide section on Retry with Transient
Errors, improving the explanation and adding more details in a separate
section.

The code snippet has also been changed to reflect a more practical use
case, with the original artificial `Flux` split out in a secondary
snippet.

Supersedes #3079.
---
 docs/asciidoc/coreFeatures.adoc               | 80 ++++++++++++-------
 .../test/java/reactor/guide/GuideTests.java   | 44 +++++-----
 2 files changed, 77 insertions(+), 47 deletions(-)

diff --git a/docs/asciidoc/coreFeatures.adoc b/docs/asciidoc/coreFeatures.adoc
index d047a006e7..5d626e73e4 100644
--- a/docs/asciidoc/coreFeatures.adoc
+++ b/docs/asciidoc/coreFeatures.adoc
@@ -945,45 +945,69 @@ The core-provided `Retry` helpers, `RetrySpec` and `RetryBackoffSpec`, both allo
 By default, `Exceptions.retryExhausted(...)` is used, which can be distinguished with `Exceptions.isRetryExhausted(Throwable)`
 - activating the handling of _transient errors_ (see below)
 
-Transient error handling in the `Retry` specs makes use of `RetrySignal#totalRetriesInARow()`: to check whether to retry or not and to compute the retry delays, the index used is an alternative one that is reset to 0 each time an `onNext` is emitted.
-This has the consequence that if a re-subscribed source generates some data before failing again, previous failures don't count toward the maximum number of retry attempts.
-In the case of exponential backoff strategy, this also means that the next attempt will be back to using the minimum `Duration` backoff instead of a longer one.
-This can be especially useful for long-lived sources that see sporadic bursts of errors (or _transient_ errors), where each burst should be retried with its own backoff.
+===== Retrying with transient errors
+Some long-lived sources may see sporadic bursts of errors followed by longer periods of time during which all is running smoothly.
+This documentation refers to this pattern of errors as _transient errors_.
+
+In such cases, it would be desirable to deal with each burst in isolation, so that the next burst doesn't inherit the retry state from the previous one.
+For instance, with an exponential backoff strategy each subsequent burst should delay retry attempts starting from the minimum backoff `Duration` instead of an ever-growing one.
+
+The `RetrySignal` interface, which represents `retryWhen` state, has a `totalRetriesInARow()` value which can be used for this.
+Instead of the usual monotonically-increasing `totalRetries()` index, this secondary index is reset to 0 each time an error
+is recovered from by the retry (ie. when a retry attempt results in an incoming `onNext` instead of an `onError` again).
+
+When setting the `transientErrors(boolean)` configuration parameter to `true` in the `RetrySpec` or `RetryBackoffSpec`, the resulting strategy makes use of that `totalRetriesInARow()` index, effectively dealing with _transient errors_.
+These specs compute the retry pattern from the index, so in effect all other configuration parameters of the spec apply to each burst of error independently.
 ====
 [source,java]
 ----
 AtomicInteger errorCount = new AtomicInteger(); // <1>
-AtomicInteger transientHelper = new AtomicInteger();
-Flux<Integer> transientFlux = Flux.<Integer>generate(sink -> {
-    int i = transientHelper.getAndIncrement();
-    if (i == 10) { // <2>
-        sink.next(i);
-        sink.complete();
-    }
-    else if (i % 3 == 0) { // <3>
-        sink.next(i);
-    }
-    else {
-        sink.error(new IllegalStateException("Transient error at " + i)); // <4>
-    }
-})
-        .doOnError(e -> errorCount.incrementAndGet());
-
-transientFlux.retryWhen(Retry.max(2).transientErrors(true)) // <5>
+Flux<Integer> transientFlux = httpRequest.get() // <2>
+        .doOnError(e -> errorCount.incrementAndGet());
+
+transientFlux.retryWhen(Retry.max(2).transientErrors(true)) // <3>
         .blockLast();
-assertThat(errorCount).hasValue(6); // <6>
+assertThat(errorCount).hasValue(6); // <4>
 ----
-<1> We will count the number of errors in the retried sequence.
-<2> We `generate` a source that has bursts of errors. It will successfully complete when the counter reaches 10.
+<1> We will count the number of errors in the retried sequence for illustration.
+<2> We assume a http request source, eg. a streaming endpoint that will sometimes fail two times in a row, then recover.
+<3> We use `retryWhen` on that source, configured for at most 2 retry attempts, but in `transientErrors` mode.
+<4> At the end, a valid response is achieved and the `transientFlux` successfully completes after `6` attempts have been registered in `errorCount`.
+====
+
+Without the `transientErrors(true)`, the configured maximum attempt of `2` would be exceeded by the second burst and the whole sequence would have ultimately failed.
+
+[NOTE]
+====
+If you want to locally try this without an actual http remote endpoint, you can implement a pseudo `httpRequest` method as a `Supplier`, as follows:
+
+=====
+[source,java]
+----
+final AtomicInteger transientHelper = new AtomicInteger();
+Supplier<Flux<Integer>> httpRequest = () ->
+    Flux.generate(sink -> { // <1>
+        int i = transientHelper.getAndIncrement();
+        if (i == 10) { // <2>
+            sink.next(i);
+            sink.complete();
+        }
+        else if (i % 3 == 0) { // <3>
+            sink.next(i);
+        }
+        else {
+            sink.error(new IllegalStateException("Transient error at " + i)); // <4>
+        }
+    });
+----
+<1> We `generate` a source that has bursts of errors.
+<2> It will successfully complete when the counter reaches 10.
 <3> If the `transientHelper` atomic is at a multiple of `3`, we emit `onNext` and thus end the current burst.
 <4> In other cases we emit an `onError`. That's 2 out of 3 times, so bursts of 2 `onError` interrupted by 1 `onNext`.
-<5> We use `retryWhen` on that source, configured for at most 2 retry attempts, but in `transientErrors` mode.
-<6> At the end, the sequence reaches `onNext(10)` and completes, after `6` errors have been registered in `errorCount`.
+=====
 ====
 
-Without the `transientErrors(true)`, the configured maximum attempt of `2` would be reached by the second burst and the sequence would fail after having emitted `onNext(3)`.
-
 === Handling Exceptions in Operators or Functions
 
 In general, all operators can themselves contain code that potentially trigger an
diff --git a/reactor-core/src/test/java/reactor/guide/GuideTests.java b/reactor-core/src/test/java/reactor/guide/GuideTests.java
index eb9fc6c134..e1943cecf1 100644
--- a/reactor-core/src/test/java/reactor/guide/GuideTests.java
+++ b/reactor-core/src/test/java/reactor/guide/GuideTests.java
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved.
+ * Copyright (c) 2016-2022 VMware Inc. or its affiliates, All Rights Reserved.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -23,6 +23,7 @@
 import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
+import java.util.concurrent.Callable;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
@@ -33,6 +34,7 @@
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.LongAdder;
 import java.util.function.Function;
+import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
@@ -734,26 +736,30 @@ public void errorHandlingRetryWhenExponential() {
 
     @Test
     public void errorHandlingRetryWhenTransient() {
+        final AtomicInteger transientHelper = new AtomicInteger();
+        Supplier<Flux<Integer>> httpRequest = () ->
+                Flux.generate(sink -> { // <1>
+                    int i = transientHelper.getAndIncrement();
+                    if (i == 10) { // <2>
+                        sink.next(i);
+                        sink.complete();
+                    }
+                    else if (i % 3 == 0) { // <3>
+                        sink.next(i);
+                    }
+                    else {
+                        sink.error(new IllegalStateException("Transient error at " + i)); // <4>
+                    }
+                });
+        // NB: in the guide, the executable transientFlux above is shown second, separately from the simplified snippet below
+
         AtomicInteger errorCount = new AtomicInteger(); // <1>
-        AtomicInteger transientHelper = new AtomicInteger();
-        Flux<Integer> transientFlux = Flux.<Integer>generate(sink -> {
-            int i = transientHelper.getAndIncrement();
-            if (i == 10) { // <2>
-                sink.next(i);
-                sink.complete();
-            }
-            else if (i % 3 == 0) { // <3>
-                sink.next(i);
-            }
-            else {
-                sink.error(new IllegalStateException("Transient error at " + i)); // <4>
-            }
-        })
-                .doOnError(e -> errorCount.incrementAndGet());
+        Flux<Integer> transientFlux = httpRequest.get() // <2>
+                .doOnError(e -> errorCount.incrementAndGet());
 
-transientFlux.retryWhen(Retry.max(2).transientErrors(true)) // <5>
-        .blockLast();
-assertThat(errorCount).hasValue(6); // <6>
+        transientFlux.retryWhen(Retry.max(2).transientErrors(true)) // <3>
+                .blockLast();
+        assertThat(errorCount).hasValue(6); // <4>
 
         transientHelper.set(0);
         transientFlux.retryWhen(Retry.max(2).transientErrors(true))