diff --git a/temporal-sdk/src/main/java/io/temporal/internal/common/GrpcRetryer.java b/temporal-sdk/src/main/java/io/temporal/internal/common/GrpcRetryer.java index 5bc4523f3e..fef1e9d26d 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/common/GrpcRetryer.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/common/GrpcRetryer.java @@ -27,6 +27,7 @@ import java.time.Duration; import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; import java.util.function.BiFunction; import java.util.function.Function; import java.util.function.Supplier; @@ -163,25 +164,28 @@ private static CompletableFuture retryWithResultAsync( .thenCompose( (ignore) -> { // try-catch is because get() call might throw. + CompletableFuture result; try { - CompletableFuture result = function.get(); - if (result == null) { - return CompletableFuture.completedFuture(null); - } - return result.handle( - (r, e) -> { - if (e == null) { - throttler.success(); - return r; - } else { - throttler.failure(); - throw CheckedExceptionWrapper.wrap(e); - } - }); + result = function.get(); } catch (Throwable e) { throttler.failure(); throw CheckedExceptionWrapper.wrap(e); } + + if (result == null) { + return CompletableFuture.completedFuture(null); + } + + return result.handle( + (r, e) -> { + if (e == null) { + throttler.success(); + return r; + } else { + throttler.failure(); + throw CheckedExceptionWrapper.wrap(e); + } + }); }) .handle((r, e) -> failOrRetry(options, function, attempt, startTime, throttler, r, e)) .thenCompose( @@ -205,9 +209,18 @@ private static ValueExceptionPair failOrRetry( if (e == null) { return new ValueExceptionPair<>(CompletableFuture.completedFuture(r), null); } + //If exception is thrown from CompletionStage/CompletableFuture methods like compose or handle - + //it gets wrapped into CompletionException, so here we need to unwrap it. + // We can get not wrapped raw exception here too if CompletableFuture was explicitly + //filled with this exception using CompletableFuture.completeExceptionally + if (e instanceof CompletionException) { + e = e.getCause(); + } + // Do not retry if it's not StatusRuntimeException if (!(e instanceof StatusRuntimeException)) { return new ValueExceptionPair<>(null, e); } + StatusRuntimeException exception = (StatusRuntimeException) e; long elapsed = System.currentTimeMillis() - startTime; for (RpcRetryOptions.DoNotRetryPair pair : options.getDoNotRetry()) { diff --git a/temporal-sdk/src/main/java/io/temporal/internal/common/Retryer.java b/temporal-sdk/src/main/java/io/temporal/internal/common/Retryer.java deleted file mode 100644 index 3381a4d161..0000000000 --- a/temporal-sdk/src/main/java/io/temporal/internal/common/Retryer.java +++ /dev/null @@ -1,269 +0,0 @@ -/* - * Copyright (C) 2020 Temporal Technologies, Inc. All Rights Reserved. - * - * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. - * - * Modifications copyright (C) 2017 Uber Technologies, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"). You may not - * use this file except in compliance with the License. A copy of the License is - * located at - * - * http://aws.amazon.com/apache2.0 - * - * or in the "license" file accompanying this file. This file is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing - * permissions and limitations under the License. - */ - -package io.temporal.internal.common; - -import static io.temporal.internal.common.CheckedExceptionWrapper.unwrap; - -import io.temporal.common.RetryOptions; -import io.temporal.failure.ApplicationFailure; -import java.time.Duration; -import java.util.Optional; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionException; -import java.util.function.BiFunction; -import java.util.function.Function; -import java.util.function.Supplier; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public final class Retryer { - - public interface RetryableProc { - - void apply() throws E; - } - - public interface RetryableFunc { - - R apply() throws E; - } - - /** - * Used to pass failure to a {@link java.util.concurrent.CompletionStage#thenCompose(Function)} - * which doesn't include exception parameter like {@link - * java.util.concurrent.CompletionStage#handle(BiFunction)} does. - */ - private static class ValueExceptionPair { - - private final CompletableFuture value; - private final Throwable exception; - - public ValueExceptionPair(CompletableFuture value, Throwable exception) { - this.value = value; - this.exception = exception; - } - - public CompletableFuture getValue() { - return value; - } - - public Throwable getException() { - return exception; - } - } - - private static final Logger log = LoggerFactory.getLogger(Retryer.class); - - public static void retry( - RetryOptions options, Optional expiration, RetryableProc r) throws T { - retryWithResult( - options, - expiration, - () -> { - r.apply(); - return null; - }); - } - - public static R retryWithResult( - RetryOptions options, Optional expiration, RetryableFunc r) throws T { - int attempt = 0; - long startTime = System.currentTimeMillis(); - BackoffThrottler throttler = - new BackoffThrottler( - options.getInitialInterval(), - options.getMaximumInterval(), - options.getBackoffCoefficient()); - do { - try { - attempt++; - throttler.throttle(); - R result = r.apply(); - throttler.success(); - return result; - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - return null; - } catch (Exception e) { - throttler.failure(); - if (options.getDoNotRetry() != null) { - String type; - if (e instanceof ApplicationFailure) { - type = ((ApplicationFailure) e).getType(); - } else { - type = e.getClass().getName(); - } - for (String exceptionToNotRetry : options.getDoNotRetry()) { - if (exceptionToNotRetry.equals(type)) { - rethrow(e); - } - } - } - long elapsed = System.currentTimeMillis() - startTime; - int maxAttempts = options.getMaximumAttempts(); - if ((maxAttempts > 0 && attempt >= maxAttempts) - || (expiration.isPresent() && elapsed >= expiration.get().toMillis())) { - rethrow(e); - } - log.warn("Retrying after failure", e); - } - } while (true); - } - - public static CompletableFuture retryWithResultAsync( - RetryOptions options, - Optional expiration, - Supplier> function) { - int attempt = 0; - long startTime = System.currentTimeMillis(); - AsyncBackoffThrottler throttler = - new AsyncBackoffThrottler( - options.getInitialInterval(), - options.getMaximumInterval(), - options.getBackoffCoefficient()); - // Need this to unwrap checked exception. - CompletableFuture unwrappedExceptionResult = new CompletableFuture<>(); - CompletableFuture result = - retryWithResultAsync(options, expiration, function, attempt + 1, startTime, throttler); - @SuppressWarnings({"FutureReturnValueIgnored", "unused"}) - CompletableFuture ignored = - result.handle( - (r, e) -> { - if (e == null) { - unwrappedExceptionResult.complete(r); - } else { - unwrappedExceptionResult.completeExceptionally(unwrap(e)); - } - return null; - }); - return unwrappedExceptionResult; - } - - private static CompletableFuture retryWithResultAsync( - RetryOptions options, - Optional expiration, - Supplier> function, - int attempt, - long startTime, - AsyncBackoffThrottler throttler) { - return throttler - .throttle() - .thenCompose( - (ignore) -> { - // try-catch is because get() call might throw. - try { - CompletableFuture result = function.get(); - if (result == null) { - return CompletableFuture.completedFuture(null); - } - return result.handle( - (r, e) -> { - if (e == null) { - throttler.success(); - return r; - } else { - throttler.failure(); - throw CheckedExceptionWrapper.wrap(e); - } - }); - } catch (Throwable e) { - throttler.failure(); - throw CheckedExceptionWrapper.wrap(e); - } - }) - .handle( - (r, e) -> - failOrRetry( - options.toBuilder().validateBuildWithDefaults(), - expiration, - function, - attempt, - startTime, - throttler, - r, - e)) - .thenCompose( - (pair) -> { - if (pair.getException() != null) { - throw CheckedExceptionWrapper.wrap(pair.getException()); - } - return pair.getValue(); - }); - } - - /** Using {@link ValueExceptionPair} as future#thenCompose doesn't include exception parameter. */ - private static ValueExceptionPair failOrRetry( - RetryOptions options, - Optional expiration, - Supplier> function, - int attempt, - long startTime, - AsyncBackoffThrottler throttler, - R r, - Throwable e) { - if (e == null) { - return new ValueExceptionPair<>(CompletableFuture.completedFuture(r), null); - } - if (e instanceof CompletionException) { - e = e.getCause(); - } - // Do not retry Error - if (e instanceof Error) { - return new ValueExceptionPair<>(null, e); - } - e = unwrap(e); - long elapsed = System.currentTimeMillis() - startTime; - if (options.getDoNotRetry() != null) { - String type; - if (e instanceof ApplicationFailure) { - type = ((ApplicationFailure) e).getType(); - } else { - type = e.getClass().getName(); - } - for (String exceptionToNotRetry : options.getDoNotRetry()) { - if (exceptionToNotRetry.equals(type)) { - return new ValueExceptionPair<>(null, e); - } - } - } - int maxAttempts = options.getMaximumAttempts(); - if ((maxAttempts > 0 && attempt >= maxAttempts) - || (expiration.isPresent() && elapsed >= expiration.get().toMillis())) { - return new ValueExceptionPair<>(null, e); - } - log.debug("Retrying after failure", e); - CompletableFuture next = - retryWithResultAsync(options, expiration, function, attempt + 1, startTime, throttler); - return new ValueExceptionPair<>(next, null); - } - - private static void rethrow(Exception e) throws T { - if (e instanceof RuntimeException) { - throw (RuntimeException) e; - } else { - @SuppressWarnings("unchecked") - T toRethrow = (T) e; - throw toRethrow; - } - } - - /** Prohibits instantiation. */ - private Retryer() {} -} diff --git a/temporal-sdk/src/test/java/io/temporal/internal/common/GrpcRetryerTest.java b/temporal-sdk/src/test/java/io/temporal/internal/common/GrpcRetryerTest.java new file mode 100644 index 0000000000..48110f051a --- /dev/null +++ b/temporal-sdk/src/test/java/io/temporal/internal/common/GrpcRetryerTest.java @@ -0,0 +1,279 @@ +/* + * Copyright (C) 2020 Temporal Technologies, Inc. All Rights Reserved. + * + * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not + * use this file except in compliance with the License. A copy of the License is + * located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package io.temporal.internal.common; + +import static org.junit.Assert.*; + +import io.grpc.Status; +import io.grpc.StatusRuntimeException; +import io.temporal.serviceclient.RpcRetryOptions; +import java.time.Duration; +import java.util.concurrent.CancellationException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import org.junit.Test; + +public class GrpcRetryerTest { + + @Test + public void testExpiration() { + final Status.Code STATUS_CODE = Status.Code.DATA_LOSS; + + RpcRetryOptions options = + RpcRetryOptions.newBuilder() + .setInitialInterval(Duration.ofMillis(10)) + .setMaximumInterval(Duration.ofMillis(100)) + .setExpiration(Duration.ofMillis(500)) + .validateBuildWithDefaults(); + long start = System.currentTimeMillis(); + try { + GrpcRetryer.retryWithResult( + options, + () -> { + throw new StatusRuntimeException(Status.fromCode(STATUS_CODE)); + }); + fail("unreachable"); + } catch (Exception e) { + assertTrue(e instanceof StatusRuntimeException); + assertEquals(STATUS_CODE, ((StatusRuntimeException) e).getStatus().getCode()); + } + + assertTrue(System.currentTimeMillis() - start > 500); + } + + @Test + public void testDoNotRetry() { + final Status.Code STATUS_CODE = Status.Code.DATA_LOSS; + + RpcRetryOptions options = + RpcRetryOptions.newBuilder() + .setInitialInterval(Duration.ofMillis(10)) + .setMaximumInterval(Duration.ofMillis(100)) + .addDoNotRetry(STATUS_CODE, null) + .validateBuildWithDefaults(); + long start = System.currentTimeMillis(); + try { + GrpcRetryer.retryWithResult( + options, + () -> { + throw new StatusRuntimeException(Status.fromCode(STATUS_CODE)); + }); + fail("unreachable"); + } catch (Exception e) { + assertTrue(e instanceof StatusRuntimeException); + assertEquals(STATUS_CODE, ((StatusRuntimeException) e).getStatus().getCode()); + } + assertTrue( + "We should fail fast on exception that we specified to don't retry", + System.currentTimeMillis() - start < 10_000); + } + + @Test + public void testInterruptedException() { + RpcRetryOptions options = + RpcRetryOptions.newBuilder() + .setInitialInterval(Duration.ofMillis(10)) + .setMaximumInterval(Duration.ofMillis(100)) + .validateBuildWithDefaults(); + long start = System.currentTimeMillis(); + try { + GrpcRetryer.retryWithResult( + options, + () -> { + Thread.currentThread().interrupt(); + throw new InterruptedException(); + }); + fail("unreachable"); + } catch (Exception e) { + // I'm not sure it's a good idea to replace interrupted exception with CancellationException, + // especially when async version of this method doesn't shadow the InterruptedException + // @see #testInterruptedExceptionAsync + assertTrue(e instanceof CancellationException); + } + assertTrue( + "We should fail fast on InterruptedException", System.currentTimeMillis() - start < 10_000); + } + + @Test + public void testNotStatusRuntimeException() { + RpcRetryOptions options = + RpcRetryOptions.newBuilder() + .setInitialInterval(Duration.ofMillis(10)) + .setMaximumInterval(Duration.ofMillis(100)) + .validateBuildWithDefaults(); + long start = System.currentTimeMillis(); + try { + GrpcRetryer.retryWithResult( + options, + () -> { + throw new IllegalArgumentException("simulated"); + }); + fail("unreachable"); + } catch (Exception e) { + assertTrue(e instanceof IllegalArgumentException); + assertEquals("simulated", e.getMessage()); + } + assertTrue( + "If the exception is not StatusRuntimeException - we shouldn't retry", + System.currentTimeMillis() - start < 10_000); + } + + @Test + public void testExpirationAsync() throws InterruptedException { + final Status.Code STATUS_CODE = Status.Code.DATA_LOSS; + + RpcRetryOptions options = + RpcRetryOptions.newBuilder() + .setInitialInterval(Duration.ofMillis(10)) + .setMaximumInterval(Duration.ofMillis(100)) + .setExpiration(Duration.ofMillis(500)) + .validateBuildWithDefaults(); + long start = System.currentTimeMillis(); + try { + GrpcRetryer.retryWithResultAsync( + options, + () -> { + throw new StatusRuntimeException(Status.fromCode(STATUS_CODE)); + }) + .get(); + fail("unreachable"); + } catch (ExecutionException e) { + assertTrue(e.getCause() instanceof StatusRuntimeException); + assertEquals(STATUS_CODE, ((StatusRuntimeException) e.getCause()).getStatus().getCode()); + } + + assertTrue(System.currentTimeMillis() - start > 500); + } + + @Test + public void testExpirationFutureAsync() throws InterruptedException { + final Status.Code STATUS_CODE = Status.Code.DATA_LOSS; + + RpcRetryOptions options = + RpcRetryOptions.newBuilder() + .setInitialInterval(Duration.ofMillis(10)) + .setMaximumInterval(Duration.ofMillis(100)) + .setExpiration(Duration.ofMillis(500)) + .validateBuildWithDefaults(); + long start = System.currentTimeMillis(); + try { + GrpcRetryer.retryWithResultAsync( + options, + () -> { + CompletableFuture result = new CompletableFuture<>(); + result.completeExceptionally( + new StatusRuntimeException(Status.fromCode(STATUS_CODE))); + return result; + }) + .get(); + fail("unreachable"); + } catch (ExecutionException e) { + assertTrue(e.getCause() instanceof StatusRuntimeException); + assertEquals(STATUS_CODE, ((StatusRuntimeException) e.getCause()).getStatus().getCode()); + } + assertTrue(System.currentTimeMillis() - start > 500); + } + + @Test + public void testDoNotRetryAsync() throws InterruptedException { + final Status.Code STATUS_CODE = Status.Code.DATA_LOSS; + + RpcRetryOptions options = + RpcRetryOptions.newBuilder() + .setInitialInterval(Duration.ofMillis(10)) + .setMaximumInterval(Duration.ofMillis(100)) + .addDoNotRetry(STATUS_CODE, null) + .validateBuildWithDefaults(); + long start = System.currentTimeMillis(); + try { + GrpcRetryer.retryWithResultAsync( + options, + () -> { + CompletableFuture result = new CompletableFuture<>(); + result.completeExceptionally( + new StatusRuntimeException(Status.fromCode(STATUS_CODE))); + return result; + }) + .get(); + fail("unreachable"); + } catch (ExecutionException e) { + assertTrue(e.getCause() instanceof StatusRuntimeException); + assertEquals(STATUS_CODE, ((StatusRuntimeException) e.getCause()).getStatus().getCode()); + } + assertTrue( + "We should fail fast on exception that we specified to don't retry", + System.currentTimeMillis() - start < 10_000); + } + + @Test + public void testInterruptedExceptionAsync() throws InterruptedException { + RpcRetryOptions options = + RpcRetryOptions.newBuilder() + .setInitialInterval(Duration.ofMillis(10)) + .setMaximumInterval(Duration.ofMillis(100)) + .validateBuildWithDefaults(); + long start = System.currentTimeMillis(); + try { + GrpcRetryer.retryWithResultAsync( + options, + () -> { + CompletableFuture result = new CompletableFuture<>(); + Thread.currentThread().interrupt(); + result.completeExceptionally(new InterruptedException("simulated")); + return result; + }) + .get(); + fail("unreachable"); + } catch (ExecutionException e) { + assertTrue(e.getCause() instanceof CheckedExceptionWrapper); + assertTrue(e.getCause().getCause() instanceof InterruptedException); + assertEquals("simulated", e.getCause().getCause().getMessage()); + } + assertTrue( + "We should fail fast on InterruptedException", System.currentTimeMillis() - start < 10_000); + } + + @Test + public void testNotStatusRuntimeExceptionAsync() throws InterruptedException { + RpcRetryOptions options = + RpcRetryOptions.newBuilder() + .setInitialInterval(Duration.ofMillis(10)) + .setMaximumInterval(Duration.ofMillis(100)) + .validateBuildWithDefaults(); + long start = System.currentTimeMillis(); + try { + GrpcRetryer.retryWithResultAsync( + options, + () -> { + CompletableFuture result = new CompletableFuture<>(); + result.completeExceptionally(new IllegalArgumentException("simulated")); + return result; + }) + .get(); + fail("unreachable"); + } catch (ExecutionException e) { + assertTrue(e.getCause() instanceof IllegalArgumentException); + assertEquals("simulated", e.getCause().getMessage()); + } + assertTrue( + "If the exception is not StatusRuntimeException - we shouldn't retry", + System.currentTimeMillis() - start < 10_000); + } +} diff --git a/temporal-sdk/src/test/java/io/temporal/internal/common/RetryerTest.java b/temporal-sdk/src/test/java/io/temporal/internal/common/RetryerTest.java deleted file mode 100644 index 43b8f6e995..0000000000 --- a/temporal-sdk/src/test/java/io/temporal/internal/common/RetryerTest.java +++ /dev/null @@ -1,112 +0,0 @@ -/* - * Copyright (C) 2020 Temporal Technologies, Inc. All Rights Reserved. - * - * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. - * - * Modifications copyright (C) 2017 Uber Technologies, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"). You may not - * use this file except in compliance with the License. A copy of the License is - * located at - * - * http://aws.amazon.com/apache2.0 - * - * or in the "license" file accompanying this file. This file is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing - * permissions and limitations under the License. - */ - -package io.temporal.internal.common; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - -import io.temporal.common.RetryOptions; -import java.time.Duration; -import java.util.Optional; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; -import org.junit.Test; - -public class RetryerTest { - - @Test - public void testExpiration() throws InterruptedException { - RetryOptions options = - RetryOptions.newBuilder() - .setInitialInterval(Duration.ofMillis(10)) - .setMaximumInterval(Duration.ofMillis(100)) - .validateBuildWithDefaults(); - long start = System.currentTimeMillis(); - try { - Retryer.retryWithResultAsync( - options, - Optional.of(Duration.ofMillis(500)), - () -> { - throw new IllegalArgumentException("simulated"); - }) - .get(); - fail("unreachable"); - } catch (ExecutionException e) { - assertTrue(e.getCause() instanceof IllegalArgumentException); - assertEquals("simulated", e.getCause().getMessage()); - } - assertTrue(System.currentTimeMillis() - start > 500); - } - - @Test - public void testExpirationFuture() throws InterruptedException { - RetryOptions options = - RetryOptions.newBuilder() - .setInitialInterval(Duration.ofMillis(10)) - .setMaximumInterval(Duration.ofMillis(100)) - .validateBuildWithDefaults(); - long start = System.currentTimeMillis(); - try { - Retryer.retryWithResultAsync( - options, - Optional.of(Duration.ofMillis(500)), - () -> { - CompletableFuture result = new CompletableFuture<>(); - result.completeExceptionally(new IllegalArgumentException("simulated")); - return result; - }) - .get(); - fail("unreachable"); - } catch (ExecutionException e) { - assertTrue(e.getCause() instanceof IllegalArgumentException); - assertEquals("simulated", e.getCause().getMessage()); - } - assertTrue(System.currentTimeMillis() - start > 500); - } - - @Test - public void testInterruptedException() throws InterruptedException { - RetryOptions options = - RetryOptions.newBuilder() - .setInitialInterval(Duration.ofMillis(10)) - .setMaximumInterval(Duration.ofMillis(100)) - .setDoNotRetry(InterruptedException.class.getName()) - .validateBuildWithDefaults(); - long start = System.currentTimeMillis(); - try { - Retryer.retryWithResultAsync( - options, - Optional.of(Duration.ofMillis(100)), - () -> { - CompletableFuture result = new CompletableFuture<>(); - result.completeExceptionally(new InterruptedException("simulated")); - return result; - }) - .get(); - fail("unreachable"); - } catch (ExecutionException e) { - assertTrue(e.getCause() instanceof CheckedExceptionWrapper); - assertTrue(e.getCause().getCause() instanceof InterruptedException); - assertEquals("simulated", e.getCause().getCause().getMessage()); - } - assertTrue(System.currentTimeMillis() - start < 100000); - } -}