Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,6 @@
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Convenience methods to be used by unit tests and during development.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public <R> CompletableFuture<R> retry(
options.getMaximumInterval(),
options.getBackoffCoefficient());
CompletableFuture<R> resultCF = new CompletableFuture<>();
retry(options, function, 1, startTime, throttler, resultCF);
retry(options, function, 1, startTime, null, throttler, resultCF);
return resultCF;
}

Expand All @@ -56,6 +56,7 @@ private <R> void retry(
Supplier<CompletableFuture<R>> function,
int attempt,
long startTime,
StatusRuntimeException previousException,
AsyncBackoffThrottler throttler,
CompletableFuture<R> resultCF) {
options.validate();
Expand All @@ -74,7 +75,16 @@ private <R> void retry(
// CompletableFuture even if it's a failed one.
// But if this happens - process the same way as it would be an exception from
// completable future
failOrRetry(options, function, attempt, startTime, throttler, e, resultCF);
// Do not retry if it's not StatusRuntimeException
failOrRetry(
options,
function,
attempt,
startTime,
throttler,
previousException,
e,
resultCF);
return;
}
if (result == null) {
Expand All @@ -89,7 +99,15 @@ private <R> void retry(
resultCF.complete(r);
} else {
throttler.failure();
failOrRetry(options, function, attempt, startTime, throttler, e, resultCF);
failOrRetry(
options,
function,
attempt,
startTime,
throttler,
previousException,
e,
resultCF);
}
});
});
Expand All @@ -101,37 +119,39 @@ private <R> void failOrRetry(
int attempt,
long startTime,
AsyncBackoffThrottler throttler,
Throwable lastException,
StatusRuntimeException previousException,
Throwable currentException,
CompletableFuture<R> resultCF) {

// If exception is thrown from CompletionStage/CompletableFuture methods like compose or handle
// - it gets wrapped into CompletionException, so here we need to unwrap it. We can get not
// wrapped raw exception here too if CompletableFuture was explicitly filled with this exception
// using CompletableFuture.completeExceptionally
lastException = unwrapCompletionException(lastException);
currentException = unwrapCompletionException(currentException);

// Do not retry if it's not StatusRuntimeException
if (!(lastException instanceof StatusRuntimeException)) {
resultCF.completeExceptionally(lastException);
if (!(currentException instanceof StatusRuntimeException)) {
resultCF.completeExceptionally(currentException);
return;
}

StatusRuntimeException exception = (StatusRuntimeException) lastException;
StatusRuntimeException statusRuntimeException = (StatusRuntimeException) currentException;

RuntimeException finalException =
GrpcRetryerUtils.createFinalExceptionIfNotRetryable(exception, options);
GrpcRetryerUtils.createFinalExceptionIfNotRetryable(
statusRuntimeException, previousException, options);
if (finalException != null) {
resultCF.completeExceptionally(finalException);
return;
}

if (GrpcRetryerUtils.ranOutOfRetries(options, startTime, clock.millis(), attempt)) {
resultCF.completeExceptionally(exception);
resultCF.completeExceptionally(statusRuntimeException);
return;
}

log.debug("Retrying after failure", lastException);
retry(options, function, attempt + 1, startTime, throttler, resultCF);
log.debug("Retrying after failure", currentException);
retry(options, function, attempt + 1, startTime, statusRuntimeException, throttler, resultCF);
}

private static Throwable unwrapCompletionException(Throwable e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,21 +25,27 @@
import io.temporal.serviceclient.StatusUtils;
import java.time.Duration;
import java.util.concurrent.CancellationException;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

class GrpcRetryerUtils {
/**
* This method encapsulates the logic if {@code StatusRuntimeException exception} is retryable or
* not.
*
Comment on lines 34 to 35
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably worth mentioning here that in the face of DEADLINE_EXCEEDED errors, we will attempt to return original error

* @param exception exception to analyze
* @param currentException exception to analyze
* @param previousException previous exception happened before this one, {@code null} if {@code
* currentException} is the first exception in the chain
* @param options retry options
* @return null if the {@code exception} can be retried, a final exception to throw in the
* external code otherwise
* external code otherwise. In case {@code previousException} is {@link
* io.grpc.Status#DEADLINE_EXCEEDED}, we will prefer the previous exception if it's present.
*/
static @Nullable RuntimeException createFinalExceptionIfNotRetryable(
StatusRuntimeException exception, RpcRetryOptions options) {
Status.Code code = exception.getStatus().getCode();
@Nonnull StatusRuntimeException currentException,
@Nullable StatusRuntimeException previousException,
@Nonnull RpcRetryOptions options) {
Status.Code code = currentException.getStatus().getCode();

switch (code) {
// CANCELLED and DEADLINE_EXCEEDED usually considered non-retryable in GRPC world, for
Expand All @@ -48,13 +54,13 @@ class GrpcRetryerUtils {
case CANCELLED:
return new CancellationException();
case DEADLINE_EXCEEDED:
return exception;
return previousException != null ? previousException : currentException;
default:
for (RpcRetryOptions.DoNotRetryItem pair : options.getDoNotRetry()) {
if (pair.getCode() == code
&& (pair.getDetailsClass() == null
|| StatusUtils.hasFailure(exception, pair.getDetailsClass()))) {
return exception;
|| StatusUtils.hasFailure(currentException, pair.getDetailsClass()))) {
return currentException;
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public <R, T extends Throwable> R retry(
options.getMaximumInterval(),
options.getBackoffCoefficient());

Exception lastException = null;
StatusRuntimeException lastException = null;
while (!GrpcRetryerUtils.ranOutOfRetries(options, startTime, clock.millis(), attempt)) {
attempt++;

Expand All @@ -63,12 +63,12 @@ public <R, T extends Throwable> R retry(
Thread.currentThread().interrupt();
throw new CancellationException();
} catch (StatusRuntimeException e) {
lastException = e;
RuntimeException finalException =
GrpcRetryerUtils.createFinalExceptionIfNotRetryable(e, options);
GrpcRetryerUtils.createFinalExceptionIfNotRetryable(e, lastException, options);
if (finalException != null) {
throw finalException;
}
lastException = e;
}
// No catch block for any other exceptions because we don't retry them, we pass them through.
// It's designed this way because it's GrpcRetryer, not general purpose retryer.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -215,4 +215,37 @@ public void testDeadlineExceededException() throws InterruptedException {

assertEquals("If the exception is DEADLINE_EXCEEDED, we shouldn't retry", 1, attempts.get());
}

@Test
public void testDeadlineExceededAfterAnotherException() throws InterruptedException {
RpcRetryOptions options =
RpcRetryOptions.newBuilder()
.setInitialInterval(Duration.ofMillis(10))
.setMaximumInterval(Duration.ofMillis(100))
.validateBuildWithDefaults();
long start = System.currentTimeMillis();
final AtomicInteger attempts = new AtomicInteger();
try {
DEFAULT_ASYNC_RETRYER
.retry(
options,
() -> {
CompletableFuture<?> future = new CompletableFuture<>();
future.completeExceptionally(
new StatusRuntimeException(
attempts.incrementAndGet() > 1
? Status.fromCode(Status.Code.DEADLINE_EXCEEDED)
: Status.fromCode(Status.Code.DATA_LOSS)));
return future;
})
.get();
fail("unreachable");
} catch (ExecutionException e) {
assertTrue(e.getCause() instanceof StatusRuntimeException);
assertEquals(
"We should get a previous exception in case of DEADLINE_EXCEEDED",
Status.Code.DATA_LOSS,
((StatusRuntimeException) e.getCause()).getStatus().getCode());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -162,4 +162,32 @@ public void testDeadlineExceededException() {

assertEquals("If the exception is DEADLINE_EXCEEDED - we shouldn't retry", 1, attempts.get());
}

@Test
public void testDeadlineExceededAfterAnotherException() {
RpcRetryOptions options =
RpcRetryOptions.newBuilder()
.setInitialInterval(Duration.ofMillis(10))
.setMaximumInterval(Duration.ofMillis(100))
.validateBuildWithDefaults();
final AtomicInteger attempts = new AtomicInteger();
try {
DEFAULT_SYNC_RETRYER.retry(
options,
() -> {
if (attempts.incrementAndGet() > 1) {
throw new StatusRuntimeException(Status.fromCode(Status.Code.DEADLINE_EXCEEDED));
} else {
throw new StatusRuntimeException(Status.fromCode(Status.Code.DATA_LOSS));
}
});
fail("unreachable");
} catch (Exception e) {
assertTrue(e instanceof StatusRuntimeException);
assertEquals(
"We should get a previous exception in case of DEADLINE_EXCEEDED",
Status.Code.DATA_LOSS,
((StatusRuntimeException) e).getStatus().getCode());
}
}
}