Skip to content

Commit

Permalink
Issue ReactiveX#331: Fixed Retry.decorateCallable. It handled only Ru…
Browse files Browse the repository at this point in the history
…ntimeExce… (ReactiveX#389)

* Issue ReactiveX#331: Fixed Retry.decorateCallable. It handled only RuntimeExceptions, but should handle Exceptions instead.

* Issue ReactiveX#331: CircuitBreaker not handle java.lang.Error, but only java.lang.Exception.
  • Loading branch information
RobWin committed Apr 2, 2019
1 parent 462960f commit 0ef7c41
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -404,10 +404,11 @@ static <T> CheckedFunction0<T> decorateCheckedSupplier(CircuitBreaker circuitBre
long durationInNanos = System.nanoTime() - start;
circuitBreaker.onSuccess(durationInNanos);
return returnValue;
} catch (Throwable throwable) {
} catch (Exception exception) {
// Do not handle java.lang.Error
long durationInNanos = System.nanoTime() - start;
circuitBreaker.onError(durationInNanos, throwable);
throw throwable;
circuitBreaker.onError(durationInNanos, exception);
throw exception;
}
};
}
Expand Down Expand Up @@ -439,12 +440,15 @@ static <T> Supplier<CompletionStage<T>> decorateCompletionStage(
try {
supplier.get().whenComplete((result, throwable) -> {
long durationInNanos = System.nanoTime() - start;
if (throwable != null) {
circuitBreaker.onError(durationInNanos, throwable);
promise.completeExceptionally(throwable);
} else {
if (result != null) {
circuitBreaker.onSuccess(durationInNanos);
promise.complete(result);
} else if (throwable instanceof Exception) {
circuitBreaker.onError(durationInNanos, throwable);
promise.completeExceptionally(throwable);
} else{
// Do not handle java.lang.Error
promise.completeExceptionally(throwable);
}
});
} catch (Throwable throwable) {
Expand Down Expand Up @@ -474,10 +478,11 @@ static CheckedRunnable decorateCheckedRunnable(CircuitBreaker circuitBreaker, Ch
runnable.run();
long durationInNanos = System.nanoTime() - start;
circuitBreaker.onSuccess(durationInNanos);
} catch (Throwable throwable){
} catch (Exception exception){
// Do not handle java.lang.Error
long durationInNanos = System.nanoTime() - start;
circuitBreaker.onError(durationInNanos, throwable);
throw throwable;
circuitBreaker.onError(durationInNanos, exception);
throw exception;
}
};
}
Expand All @@ -500,10 +505,11 @@ static <T> Callable<T> decorateCallable(CircuitBreaker circuitBreaker, Callable<
long durationInNanos = System.nanoTime() - start;
circuitBreaker.onSuccess(durationInNanos);
return returnValue;
} catch (Throwable throwable) {
} catch (Exception exception) {
// Do not handle java.lang.Error
long durationInNanos = System.nanoTime() - start;
circuitBreaker.onError(durationInNanos, throwable);
throw throwable;
circuitBreaker.onError(durationInNanos, exception);
throw exception;
}
};
}
Expand All @@ -526,10 +532,11 @@ static <T> Supplier<T> decorateSupplier(CircuitBreaker circuitBreaker, Supplier<
long durationInNanos = System.nanoTime() - start;
circuitBreaker.onSuccess(durationInNanos);
return returnValue;
} catch (Throwable throwable) {
} catch (Exception exception) {
// Do not handle java.lang.Error
long durationInNanos = System.nanoTime() - start;
circuitBreaker.onError(durationInNanos, throwable);
throw throwable;
circuitBreaker.onError(durationInNanos, exception);
throw exception;
}
};
}
Expand All @@ -551,10 +558,11 @@ static <T> Consumer<T> decorateConsumer(CircuitBreaker circuitBreaker, Consumer<
consumer.accept(t);
long durationInNanos = System.nanoTime() - start;
circuitBreaker.onSuccess(durationInNanos);
} catch (Throwable throwable) {
} catch (Exception exception) {
// Do not handle java.lang.Error
long durationInNanos = System.nanoTime() - start;
circuitBreaker.onError(durationInNanos, throwable);
throw throwable;
circuitBreaker.onError(durationInNanos, exception);
throw exception;
}
};
}
Expand All @@ -576,10 +584,11 @@ static <T> CheckedConsumer<T> decorateCheckedConsumer(CircuitBreaker circuitBrea
consumer.accept(t);
long durationInNanos = System.nanoTime() - start;
circuitBreaker.onSuccess(durationInNanos);
} catch (Throwable throwable) {
} catch (Exception exception) {
// Do not handle java.lang.Error
long durationInNanos = System.nanoTime() - start;
circuitBreaker.onError(durationInNanos, throwable);
throw throwable;
circuitBreaker.onError(durationInNanos, exception);
throw exception;
}
};
}
Expand All @@ -600,10 +609,11 @@ static Runnable decorateRunnable(CircuitBreaker circuitBreaker, Runnable runnabl
runnable.run();
long durationInNanos = System.nanoTime() - start;
circuitBreaker.onSuccess(durationInNanos);
} catch (Throwable throwable){
} catch (Exception exception){
// Do not handle java.lang.Error
long durationInNanos = System.nanoTime() - start;
circuitBreaker.onError(durationInNanos, throwable);
throw throwable;
circuitBreaker.onError(durationInNanos, exception);
throw exception;
}
};
}
Expand All @@ -626,10 +636,11 @@ static <T, R> Function<T, R> decorateFunction(CircuitBreaker circuitBreaker, Fun
long durationInNanos = System.nanoTime() - start;
circuitBreaker.onSuccess(durationInNanos);
return returnValue;
} catch (Throwable throwable){
} catch (Exception exception){
// Do not handle java.lang.Error
long durationInNanos = System.nanoTime() - start;
circuitBreaker.onError(durationInNanos, throwable);
throw throwable;
circuitBreaker.onError(durationInNanos, exception);
throw exception;
}
};
}
Expand All @@ -652,10 +663,11 @@ static <T, R> CheckedFunction1<T, R> decorateCheckedFunction(CircuitBreaker circ
long durationInNanos = System.nanoTime() - start;
circuitBreaker.onSuccess(durationInNanos);
return returnValue;
} catch (Throwable throwable){
} catch (Exception exception){
// Do not handle java.lang.Error
long durationInNanos = System.nanoTime() - start;
circuitBreaker.onError(durationInNanos, throwable);
throw throwable;
circuitBreaker.onError(durationInNanos, exception);
throw exception;
}
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ static <T> Supplier<CompletionStage<T>> decorateCompletionStage(
return () -> {

final CompletableFuture<T> promise = new CompletableFuture<>();
@SuppressWarnings("unchecked") final Runnable block = new AsyncRetryBlock<>(scheduler, retry.asyncContext(), supplier, promise);
final Runnable block = new AsyncRetryBlock<>(scheduler, retry.asyncContext(), supplier, promise);
block.run();

return promise;
Expand All @@ -109,7 +109,6 @@ static <T> Supplier<CompletionStage<T>> decorateCompletionStage(
*/
static <T> CheckedFunction0<T> decorateCheckedSupplier(Retry retry, CheckedFunction0<T> supplier) {
return () -> {
@SuppressWarnings("unchecked")
Retry.Context<T> context = retry.context();
do try {
T result = supplier.apply();
Expand Down Expand Up @@ -155,7 +154,6 @@ static CheckedRunnable decorateCheckedRunnable(Retry retry, CheckedRunnable runn
*/
static <T, R> CheckedFunction1<T, R> decorateCheckedFunction(Retry retry, CheckedFunction1<T, R> function) {
return (T t) -> {
@SuppressWarnings("unchecked")
Retry.Context<R> context = retry.context();
do try {
R result = function.apply(t);
Expand All @@ -180,7 +178,6 @@ static <T, R> CheckedFunction1<T, R> decorateCheckedFunction(Retry retry, Checke
*/
static <T> Supplier<T> decorateSupplier(Retry retry, Supplier<T> supplier) {
return () -> {
@SuppressWarnings("unchecked")
Retry.Context<T> context = retry.context();
do try {
T result = supplier.get();
Expand All @@ -205,7 +202,6 @@ static <T> Supplier<T> decorateSupplier(Retry retry, Supplier<T> supplier) {
*/
static <T> Callable<T> decorateCallable(Retry retry, Callable<T> supplier) {
return () -> {
@SuppressWarnings("unchecked")
Retry.Context<T> context = retry.context();
do try {
T result = supplier.call();
Expand All @@ -214,8 +210,8 @@ static <T> Callable<T> decorateCallable(Retry retry, Callable<T> supplier) {
context.onSuccess();
return result;
}
} catch (RuntimeException runtimeException) {
context.onRuntimeError(runtimeException);
} catch (Exception exception) {
context.onError(exception);
} while (true);
};
}
Expand Down Expand Up @@ -251,7 +247,6 @@ static Runnable decorateRunnable(Retry retry, Runnable runnable) {
*/
static <T, R> Function<T, R> decorateFunction(Retry retry, Function<T, R> function) {
return (T t) -> {
@SuppressWarnings("unchecked")
Retry.Context<R> context = retry.context();
do try {
R result = function.apply(t);
Expand All @@ -278,14 +273,14 @@ static <T, R> Function<T, R> decorateFunction(Retry retry, Function<T, R> functi
*
* @return the retry Context
*/
Retry.Context context();
<T> Retry.Context<T> context();

/**
* Creates a async retry Context.
*
* @return the async retry Context
*/
Retry.AsyncContext asyncContext();
<T> Retry.AsyncContext<T> asyncContext();

/**
* Returns the RetryConfig of this Retry.
Expand All @@ -301,6 +296,18 @@ static <T, R> Function<T, R> decorateFunction(Retry retry, Function<T, R> functi
*/
EventPublisher getEventPublisher();

/**
* Decorates and executes the decorated Supplier.
*
* @param checkedSupplier the original Supplier
* @param <T> the type of results supplied by this supplier
* @return the result of the decorated Supplier.
* @throws Throwable if something goes wrong applying this function to the given arguments
*/
default <T> T executeCheckedSupplier(CheckedFunction0<T> checkedSupplier) throws Throwable {
return decorateCheckedSupplier(this, checkedSupplier).apply();
}

/**
* Decorates and executes the decorated Supplier.
*
Expand Down Expand Up @@ -429,14 +436,15 @@ interface Context<T> {
* Handles a checked exception
*
* @param exception the exception to handle
* @throws Throwable the exception
* @throws Exception when retry count has exceeded
*/
void onError(Exception exception) throws Throwable;
void onError(Exception exception) throws Exception;

/**
* Handles a runtime exception
*
* @param runtimeException the exception to handle
* @throws RuntimeException when retry count has exceeded
*/
void onRuntimeError(RuntimeException runtimeException);
}
Expand Down Expand Up @@ -481,21 +489,24 @@ public void run() {

try {
stage = supplier.get();
} catch (Throwable t) {
} catch (Exception t) {
onError(t);
return;
}

stage.whenComplete((result, t) -> {
if (result != null) {
onResult(result);
} else if (t != null) {
onError(t);
} else if (t instanceof Exception) {
onError((Exception) t);
} else{
// Do not handle java.lang.Error
promise.completeExceptionally(t);
}
});
}

private void onError(Throwable t) {
private void onError(Exception t) {
final long delay = retryContext.onError(t);

if (delay < 1) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ public Context context() {
}

@Override
@SuppressWarnings("unchecked")
public AsyncContext asyncContext() {
return new AsyncContextImpl();
}
Expand Down Expand Up @@ -147,7 +148,7 @@ public boolean onResult(T result) {
return false;
}

public void onError(Exception exception) throws Throwable {
public void onError(Exception exception) throws Exception {
if (exceptionPredicate.test(exception)) {
lastException.set(exception);
throwOrSleepAfterException();
Expand Down

0 comments on commit 0ef7c41

Please sign in to comment.