Skip to content

Commit

Permalink
Rework guide on Retry with Transient Errors (#3093)
Browse files Browse the repository at this point in the history
This commit reworks the reference guide section on Retry with Transient
Errors, improving the explanation and adding more details in a separate
section.

The code snippet has also been changed to reflect a more practical use
case, with the original artificial `Flux<Integer>` split out in a
secondary snippet.

Supersedes #3079.
  • Loading branch information
simonbasle committed Jul 6, 2022
1 parent a776230 commit e94fe2c
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 47 deletions.
80 changes: 52 additions & 28 deletions docs/asciidoc/coreFeatures.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -945,45 +945,69 @@ The core-provided `Retry` helpers, `RetrySpec` and `RetryBackoffSpec`, both allo
By default, `Exceptions.retryExhausted(...)` is used, which can be distinguished with `Exceptions.isRetryExhausted(Throwable)`
- activating the handling of _transient errors_ (see below)

Transient error handling in the `Retry` specs makes use of `RetrySignal#totalRetriesInARow()`: to check whether to retry or not and to compute the retry delays, the index used is an alternative one that is reset to 0 each time an `onNext` is emitted.
This has the consequence that if a re-subscribed source generates some data before failing again, previous failures don't count toward the maximum number of retry attempts.
In the case of exponential backoff strategy, this also means that the next attempt will be back to using the minimum `Duration` backoff instead of a longer one.
This can be especially useful for long-lived sources that see sporadic bursts of errors (or _transient_ errors), where each burst should be retried with its own backoff.
===== Retrying with transient errors
Some long-lived sources may see sporadic bursts of errors followed by longer periods of time during which all is running smoothly.
This documentation refers to this pattern of errors as _transient errors_.

In such cases, it would be desirable to deal with each burst in isolation, so that the next burst doesn't inherit the retry state from the previous one.
For instance, with an exponential backoff strategy each subsequent burst should delay retry attempts starting from the minimum backoff `Duration` instead of an ever-growing one.

The `RetrySignal` interface, which represents `retryWhen` state, has a `totalRetriesInARow()` value which can be used for this.
Instead of the usual monotonically-increasing `totalRetries()` index, this secondary index is reset to 0 each time an error
is recovered from by the retry (ie. when a retry attempt results in an incoming `onNext` instead of an `onError` again).

When setting the `transientErrors(boolean)` configuration parameter to `true` in the `RetrySpec` or `RetryBackoffSpec`, the resulting strategy makes use of that `totalRetriesInARow()` index, effectively dealing with _transient errors_.
These specs compute the retry pattern from the index, so in effect all other configuration parameters of the spec apply to each burst of error independently.

====
[source,java]
----
AtomicInteger errorCount = new AtomicInteger(); // <1>
AtomicInteger transientHelper = new AtomicInteger();
Flux<Integer> transientFlux = Flux.<Integer>generate(sink -> {
int i = transientHelper.getAndIncrement();
if (i == 10) { // <2>
sink.next(i);
sink.complete();
}
else if (i % 3 == 0) { // <3>
sink.next(i);
}
else {
sink.error(new IllegalStateException("Transient error at " + i)); // <4>
}
})
.doOnError(e -> errorCount.incrementAndGet());
transientFlux.retryWhen(Retry.max(2).transientErrors(true)) // <5>
Flux<Integer> transientFlux = httpRequest.get() // <2>
.doOnError(e -> errorCount.incrementAndGet());
transientFlux.retryWhen(Retry.max(2).transientErrors(true)) // <3>
.blockLast();
assertThat(errorCount).hasValue(6); // <6>
assertThat(errorCount).hasValue(6); // <4>
----
<1> We will count the number of errors in the retried sequence.
<2> We `generate` a source that has bursts of errors. It will successfully complete when the counter reaches 10.
<1> We will count the number of errors in the retried sequence for illustration.
<2> We assume a http request source, eg. a streaming endpoint that will sometimes fail two times in a row, then recover.
<3> We use `retryWhen` on that source, configured for at most 2 retry attempts, but in `transientErrors` mode.
<4> At the end, a valid response is achieved and the `transientFlux` successfully completes after `6` attempts have been registered in `errorCount`.
====

Without the `transientErrors(true)`, the configured maximum attempt of `2` would be exceeded by the second burst and the whole sequence would have ultimately failed.

[NOTE]
====
If you want to locally try this without an actual http remote endpoint, you can implement a pseudo `httpRequest` method as a `Supplier`, as follows:
=====
[source,java]
----
final AtomicInteger transientHelper = new AtomicInteger();
Supplier<Flux<Integer>> httpRequest = () ->
Flux.generate(sink -> { // <1>
int i = transientHelper.getAndIncrement();
if (i == 10) { // <2>
sink.next(i);
sink.complete();
}
else if (i % 3 == 0) { // <3>
sink.next(i);
}
else {
sink.error(new IllegalStateException("Transient error at " + i)); // <4>
}
});
----
<1> We `generate` a source that has bursts of errors.
<2> It will successfully complete when the counter reaches 10.
<3> If the `transientHelper` atomic is at a multiple of `3`, we emit `onNext` and thus end the current burst.
<4> In other cases we emit an `onError`. That's 2 out of 3 times, so bursts of 2 `onError` interrupted by 1 `onNext`.
<5> We use `retryWhen` on that source, configured for at most 2 retry attempts, but in `transientErrors` mode.
<6> At the end, the sequence reaches `onNext(10)` and completes, after `6` errors have been registered in `errorCount`.
=====
====

Without the `transientErrors(true)`, the configured maximum attempt of `2` would be reached by the second burst and the sequence would fail after having emitted `onNext(3)`.

=== Handling Exceptions in Operators or Functions

In general, all operators can themselves contain code that potentially trigger an
Expand Down
44 changes: 25 additions & 19 deletions reactor-core/src/test/java/reactor/guide/GuideTests.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2016-2022 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -23,6 +23,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
Expand All @@ -33,6 +34,7 @@
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;

Expand Down Expand Up @@ -734,26 +736,30 @@ public void errorHandlingRetryWhenExponential() {

@Test
public void errorHandlingRetryWhenTransient() {
final AtomicInteger transientHelper = new AtomicInteger();
Supplier<Flux<Integer>> httpRequest = () ->
Flux.generate(sink -> { // <1>
int i = transientHelper.getAndIncrement();
if (i == 10) { // <2>
sink.next(i);
sink.complete();
}
else if (i % 3 == 0) { // <3>
sink.next(i);
}
else {
sink.error(new IllegalStateException("Transient error at " + i)); // <4>
}
});
// NB: in the guide, the executable transientFlux above is shown second, separately from the simplified snippet below

AtomicInteger errorCount = new AtomicInteger(); // <1>
AtomicInteger transientHelper = new AtomicInteger();
Flux<Integer> transientFlux = Flux.<Integer>generate(sink -> {
int i = transientHelper.getAndIncrement();
if (i == 10) { // <2>
sink.next(i);
sink.complete();
}
else if (i % 3 == 0) { // <3>
sink.next(i);
}
else {
sink.error(new IllegalStateException("Transient error at " + i)); // <4>
}
})
.doOnError(e -> errorCount.incrementAndGet());
Flux<Integer> transientFlux = httpRequest.get() // <2>
.doOnError(e -> errorCount.incrementAndGet());

transientFlux.retryWhen(Retry.max(2).transientErrors(true)) // <5>
.blockLast();
assertThat(errorCount).hasValue(6); // <6>
transientFlux.retryWhen(Retry.max(2).transientErrors(true)) // <3>
.blockLast();
assertThat(errorCount).hasValue(6); // <4>

transientHelper.set(0);
transientFlux.retryWhen(Retry.max(2).transientErrors(true))
Expand Down

0 comments on commit e94fe2c

Please sign in to comment.