Skip to content

Commit

Permalink
Merge 396d4d8 into 2b984b1
Browse files Browse the repository at this point in the history
  • Loading branch information
RobWin committed Apr 3, 2019
2 parents 2b984b1 + 396d4d8 commit 0f91b98
Show file tree
Hide file tree
Showing 8 changed files with 351 additions and 108 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,10 @@
import static io.github.resilience4j.circuitbreaker.CircuitBreaker.State.OPEN;

import java.time.Duration;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;

import org.slf4j.Logger;
Expand Down Expand Up @@ -105,7 +107,19 @@ public boolean isCallPermitted() {

@Override
public void onError(long durationInNanos, Throwable throwable) {
if (circuitBreakerConfig.getRecordFailurePredicate().test(throwable)) {
// Handle the case if the completable future throw CompletionException wrapping the original exception
// where original exception is the the one to retry not the CompletionException.
Predicate<Throwable> recordFailurePredicate = circuitBreakerConfig.getRecordFailurePredicate();
if (throwable instanceof CompletionException) {
Throwable cause = throwable.getCause();
handleThrowable(durationInNanos, recordFailurePredicate, cause);
}else{
handleThrowable(durationInNanos, recordFailurePredicate, throwable);
}
}

private void handleThrowable(long durationInNanos, Predicate<Throwable> recordFailurePredicate, Throwable throwable) {
if (recordFailurePredicate.test(throwable)) {
LOG.debug("CircuitBreaker '{}' recorded a failure:", name, throwable);
publishCircuitErrorEvent(name, durationInNanos, throwable);
stateReference.get().onError(throwable);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,7 @@
import java.io.IOException;
import java.net.SocketTimeoutException;
import java.time.Duration;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.*;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
Expand Down Expand Up @@ -798,6 +795,35 @@ public void shouldDecorateCompletionStageAndReturnWithExceptionAtAsyncStage() {
assertThat(metrics.getNumberOfFailedCalls()).isEqualTo(1);
}

@Test
public void shouldDecorateCompletionStageAndIgnoreWebServiceException() {
// Given
CircuitBreakerConfig config = CircuitBreakerConfig.custom()
.ignoreExceptions(WebServiceException.class)
.build();

CircuitBreaker circuitBreaker = CircuitBreaker.of("backendName", config);
assertThat(circuitBreaker.getState()).isEqualTo(CircuitBreaker.State.CLOSED);
// Given the HelloWorldService throws an exception
BDDMockito.given(helloWorldService.returnHelloWorld()).willThrow(new WebServiceException("BAM! At async stage"));

// When
Supplier<CompletionStage<String>> completionStageSupplier =
() -> CompletableFuture.supplyAsync(helloWorldService::returnHelloWorld);

CompletionStage<String> stringCompletionStage = circuitBreaker.executeCompletionStage(completionStageSupplier);

// Then the helloWorldService should be invoked 1 time
assertThatThrownBy(stringCompletionStage.toCompletableFuture()::get)
.isInstanceOf(ExecutionException.class).hasCause(new WebServiceException("BAM! At async stage"));
BDDMockito.then(helloWorldService).should(Mockito.times(1)).returnHelloWorld();

CircuitBreaker.Metrics metrics = circuitBreaker.getMetrics();
// WebServiceException should be ignored
assertThat(metrics.getNumberOfBufferedCalls()).isEqualTo(0);
assertThat(metrics.getNumberOfFailedCalls()).isEqualTo(0);
}

@Test
public void shouldChainDecoratedFunctions() {
// tag::shouldChainDecoratedFunctions[]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,14 +218,7 @@ class AsyncRetryBlock<T> implements Runnable {

@Override
public void run() {
final CompletionStage<T> stage;

try {
stage = supplier.get();
} catch (Throwable t) {
onError(t);
return;
}
final CompletionStage<T> stage = supplier.get();

stage.whenComplete((result, t) -> {
if (result != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -485,14 +485,7 @@ class AsyncRetryBlock<T> implements Runnable {

@Override
public void run() {
final CompletionStage<T> stage;

try {
stage = supplier.get();
} catch (Exception t) {
onError(t);
return;
}
final CompletionStage<T> stage = supplier.get();

stage.whenComplete((result, t) -> {
if (result != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,25 +222,25 @@ public void onSuccess() {

@Override
public long onError(Throwable throwable) {
// handle the case if the completable future throw CompletionException wrapping the original exception
// where original exception is the the one to retry not the CompletionException
// for more information about exception handling in completable future check for example :
//https://stackoverflow.com/questions/44409962/throwing-exception-from-completablefuture
if (throwable instanceof CompletionException && !exceptionPredicate.test(throwable)) {
if (!exceptionPredicate.test(throwable.getCause())) {
failedWithoutRetryCounter.increment();
publishRetryEvent(() -> new RetryOnIgnoredErrorEvent(getName(), throwable));
return -1;
}
return handleOnError(throwable.getCause());
// Handle the case if the completable future throw CompletionException wrapping the original exception
// where original exception is the the one to retry not the CompletionException.
if (throwable instanceof CompletionException) {
Throwable cause = throwable.getCause();
return handleThrowable(cause);
}
else {
return handleThrowable(throwable);
}

}

private long handleThrowable(Throwable throwable) {
if (!exceptionPredicate.test(throwable)) {
failedWithoutRetryCounter.increment();
publishRetryEvent(() -> new RetryOnIgnoredErrorEvent(getName(), throwable));
return -1;
}
return handleOnError(throwable);

}

private long handleOnError(Throwable throwable) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,20 +47,20 @@ public class AsyncRetryEventPublisherTest {

private AsyncHelloWorldService helloWorldService;
private Logger logger;
private AsyncRetry retry;
private Retry retry;
private ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

@Before
public void setUp(){
helloWorldService = mock(AsyncHelloWorldService.class);
logger = mock(Logger.class);
retry = AsyncRetry.ofDefaults("testName");
retry = Retry.ofDefaults("testName");
}

@Test
public void shouldReturnTheSameConsumer() {
AsyncRetry.EventPublisher eventPublisher = retry.getEventPublisher();
AsyncRetry.EventPublisher eventPublisher2 = retry.getEventPublisher();
Retry.EventPublisher eventPublisher = retry.getEventPublisher();
Retry.EventPublisher eventPublisher2 = retry.getEventPublisher();

assertThat(eventPublisher).isEqualTo(eventPublisher2);
}
Expand Down Expand Up @@ -132,27 +132,4 @@ public void shouldConsumeOnErrorEvent() {
then(helloWorldService).should(times(3)).returnHelloWorld();
}

@Test
public void shouldConsumeIgnoredErrorEvent() {
given(helloWorldService.returnHelloWorld())
.willThrow(new WebServiceException("BAM!"));

RetryConfig retryConfig = RetryConfig.custom()
.retryOnException(throwable -> Match(throwable).of(
Case($(instanceOf(WebServiceException.class)), false),
Case($(), true)))
.build();
retry = AsyncRetry.of("testName", retryConfig);

retry.getEventPublisher()
.onIgnoredError(event ->
logger.info(event.getEventType().toString()));

Try.of(() -> awaitResult(retry.executeCompletionStage(scheduler,
() -> helloWorldService.returnHelloWorld())));

then(logger).should(times(1)).info("IGNORED_ERROR");
then(helloWorldService).should(times(1)).returnHelloWorld();
}

}
Original file line number Diff line number Diff line change
@@ -1,29 +1,43 @@
/*
*
* Copyright 2016 Robert Winkler
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*
*/
package io.github.resilience4j.retry.internal;

import static io.github.resilience4j.retry.utils.AsyncUtils.awaitResult;
import static java.util.concurrent.CompletableFuture.completedFuture;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

import javax.xml.ws.WebServiceException;

import io.github.resilience4j.retry.AsyncRetry;
import io.github.resilience4j.retry.RetryConfig;
import io.github.resilience4j.test.AsyncHelloWorldService;
import io.vavr.control.Try;
import org.assertj.core.api.Assertions;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.BDDMockito;
import org.mockito.Mockito;

import io.github.resilience4j.retry.AsyncRetry;
import io.github.resilience4j.retry.RetryConfig;
import io.github.resilience4j.test.AsyncHelloWorldService;
import io.vavr.control.Try;
import javax.xml.ws.WebServiceException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Supplier;

import static io.github.resilience4j.retry.utils.AsyncUtils.awaitResult;
import static java.util.concurrent.CompletableFuture.completedFuture;

public class AsyncRetryTest {

Expand All @@ -36,7 +50,7 @@ public void setUp() {
}

@Test
public void shouldNotRetry() throws InterruptedException, ExecutionException, TimeoutException {
public void shouldNotRetry() {
// Given the HelloWorldService returns Hello world
BDDMockito.given(helloWorldService.returnHelloWorld())
.willReturn(completedFuture("Hello world"));
Expand All @@ -56,7 +70,7 @@ public void shouldNotRetry() throws InterruptedException, ExecutionException, Ti
}

@Test
public void shouldNotRetryWithThatResult() throws InterruptedException, ExecutionException, TimeoutException {
public void shouldNotRetryWithThatResult(){
// Given the HelloWorldService returns Hello world
BDDMockito.given(helloWorldService.returnHelloWorld())
.willReturn(completedFuture("Hello world"));
Expand Down Expand Up @@ -90,27 +104,18 @@ public void shouldRetryTowAttemptsInCaseOResultRetryMatchAtSyncStage() {
shouldCompleteFutureAfterAttemptsInCaseOfRetyOnResultAtAsyncStage(2, "Hello world");
}

@Test
public void shouldRetryInCaseOfExceptionAtSyncStage() {
@Test(expected = IllegalArgumentException.class)
public void shouldRethrowExceptionInCaseOfExceptionAtSyncStage() {
// Given the HelloWorldService throws an exception
BDDMockito.given(helloWorldService.returnHelloWorld())
.willThrow(new WebServiceException("BAM!"))
.willReturn(completedFuture("Hello world"));
.willThrow(new IllegalArgumentException("BAM!"));

// Create a Retry with default configuration
AsyncRetry retryContext = AsyncRetry.ofDefaults("id");
AsyncRetry retry = AsyncRetry.ofDefaults("id");
// Decorate the invocation of the HelloWorldService
Supplier<CompletionStage<String>> supplier = AsyncRetry.decorateCompletionStage(
retryContext,
retry.executeCompletionStage(
scheduler,
() -> helloWorldService.returnHelloWorld());

// When
String result = awaitResult(supplier.get());

// Then the helloWorldService should be invoked 2 times
BDDMockito.then(helloWorldService).should(Mockito.times(2)).returnHelloWorld();
Assertions.assertThat(result).isEqualTo("Hello world");
}

@Test
Expand Down Expand Up @@ -139,21 +144,6 @@ public void shouldRetryInCaseOfAnExceptionAtAsyncStage() {
Assertions.assertThat(result).isEqualTo("Hello world");
}

@Test
public void shouldCompleteFutureAfterOneAttemptInCaseOfExceptionAtSyncStage() {
shouldCompleteFutureAfterAttemptsInCaseOfExceptionAtSyncStage(1);
}

@Test
public void shouldCompleteFutureAfterTwoAttemptsInCaseOfExceptionAtSyncStage() {
shouldCompleteFutureAfterAttemptsInCaseOfExceptionAtSyncStage(2);
}

@Test
public void shouldCompleteFutureAfterThreeAttemptsInCaseOfExceptionAtSyncStage() {
shouldCompleteFutureAfterAttemptsInCaseOfExceptionAtSyncStage(3);
}

private void shouldCompleteFutureAfterAttemptsInCaseOfExceptionAtSyncStage(int noOfAttempts) {
// Given the HelloWorldService throws an exception
BDDMockito.given(helloWorldService.returnHelloWorld())
Expand Down

0 comments on commit 0f91b98

Please sign in to comment.