Skip to content

Commit

Permalink
Issue ReactiveX#687: Introduces a new onComplete method on Retry.Cont…
Browse files Browse the repository at this point in the history
…ext (ReactiveX#698)
  • Loading branch information
Romeh authored and RobWin committed Oct 31, 2019
1 parent 5eb7a10 commit c016641
Show file tree
Hide file tree
Showing 10 changed files with 153 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ private CompletionStage<?> executeCompletionStage(MethodInvocation invocation, C
completeFailedFuture(t2, recoveryFunction, promise);
}
} else {
context.onSuccess();
context.onComplete();
promise.complete(v);
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public Upstream<T> apply(Upstream<? extends T> upstream) {

@Override
public void success(T value) {
context.onSuccess();
context.onComplete();
down.success(value);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,13 @@

import io.github.resilience4j.reactor.IllegalPublisherException;
import io.github.resilience4j.retry.Retry;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.util.function.Consumer;
import java.util.function.UnaryOperator;

import org.reactivestreams.Publisher;

/**
* A Reactor Retry operator which wraps a reactive type in a Retry.
* @param <T> the value type of the upstream and downstream
Expand Down Expand Up @@ -76,7 +75,7 @@ private static class Context<T> {
}

void onComplete() {
this.context.onSuccess();
this.context.onComplete();
}

void throwExceptionToForceRetryOnResult(T value) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ public void retryOnResultUsingFlux() {

Retry.Metrics metrics = retry.getMetrics();
assertThat(metrics.getNumberOfFailedCallsWithoutRetryAttempt()).isEqualTo(0);
assertThat(metrics.getNumberOfSuccessfulCallsWithRetryAttempt()).isEqualTo(1);
assertThat(metrics.getNumberOfFailedCallsWithRetryAttempt()).isEqualTo(1);
}

@Test
Expand All @@ -252,7 +252,7 @@ public void retryOnResultFailAfterMaxAttemptsUsingFlux() {

Retry.Metrics metrics = retry.getMetrics();
assertThat(metrics.getNumberOfFailedCallsWithoutRetryAttempt()).isEqualTo(0);
assertThat(metrics.getNumberOfSuccessfulCallsWithRetryAttempt()).isEqualTo(1);
assertThat(metrics.getNumberOfFailedCallsWithRetryAttempt()).isEqualTo(1);
}

private RetryConfig retryConfig() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
*
* Copyright 2019 Mahmoud Romeh
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*
*/
package io.github.resilience4j.retry;

/**
* Max Retries reached out exception , to be thrown on result predicate check exceed the max configured retries
*/
public class MaxRetriesExceeded extends RuntimeException {
public MaxRetriesExceeded(String errorMsg) {
super(errorMsg);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ static <T> CheckedFunction0<T> decorateCheckedSupplier(Retry retry, CheckedFunct
T result = supplier.apply();
final boolean validationOfResult = context.onResult(result);
if (!validationOfResult) {
context.onSuccess();
context.onComplete();
return result;
}
} catch (Exception exception) {
Expand All @@ -155,7 +155,7 @@ static CheckedRunnable decorateCheckedRunnable(Retry retry, CheckedRunnable runn
Retry.Context context = retry.context();
do try {
runnable.run();
context.onSuccess();
context.onComplete();
break;
} catch (Exception exception) {
context.onError(exception);
Expand All @@ -179,7 +179,7 @@ static <T, R> CheckedFunction1<T, R> decorateCheckedFunction(Retry retry, Checke
R result = function.apply(t);
final boolean validationOfResult = context.onResult(result);
if (!validationOfResult) {
context.onSuccess();
context.onComplete();
return result;
}
} catch (Exception exception) {
Expand All @@ -203,7 +203,7 @@ static <T> Supplier<T> decorateSupplier(Retry retry, Supplier<T> supplier) {
T result = supplier.get();
final boolean validationOfResult = context.onResult(result);
if (!validationOfResult) {
context.onSuccess();
context.onComplete();
return result;
}
} catch (RuntimeException runtimeException) {
Expand All @@ -228,7 +228,7 @@ static <E extends Exception, T> Supplier<Either<E, T>> decorateEitherSupplier(Re
if(result.isRight()){
final boolean validationOfResult = context.onResult(result.get());
if (!validationOfResult) {
context.onSuccess();
context.onComplete();
return result;
}
}else{
Expand Down Expand Up @@ -259,7 +259,7 @@ static <T> Supplier<Try<T>> decorateTrySupplier(Retry retry, Supplier<Try<T>> su
if(result.isSuccess()){
final boolean validationOfResult = context.onResult(result.get());
if (!validationOfResult) {
context.onSuccess();
context.onComplete();
return result;
}
}else{
Expand Down Expand Up @@ -294,7 +294,7 @@ static <T> Callable<T> decorateCallable(Retry retry, Callable<T> supplier) {
T result = supplier.call();
final boolean validationOfResult = context.onResult(result);
if (!validationOfResult) {
context.onSuccess();
context.onComplete();
return result;
}
} catch (Exception exception) {
Expand All @@ -315,7 +315,7 @@ static Runnable decorateRunnable(Retry retry, Runnable runnable) {
Retry.Context context = retry.context();
do try {
runnable.run();
context.onSuccess();
context.onComplete();
break;
} catch (RuntimeException runtimeException) {
context.onRuntimeError(runtimeException);
Expand All @@ -339,7 +339,7 @@ static <T, R> Function<T, R> decorateFunction(Retry retry, Function<T, R> functi
R result = function.apply(t);
final boolean validationOfResult = context.onResult(result);
if (!validationOfResult) {
context.onSuccess();
context.onComplete();
return result;
}
} catch (RuntimeException runtimeException) {
Expand Down Expand Up @@ -510,9 +510,18 @@ interface AsyncContext<T> {

/**
* Records a successful call.
* @deprecated since 1.2.0
*/
@Deprecated
void onSuccess();

/**
* Records a successful call or retryable call with the needed generated retry events.
* When there is a successful retry before reaching the max retries limit , it will generate {@link RetryOnSuccessEvent}
* When the retry reach the max retries limit , it will generate {@link RetryOnErrorEvent} with last exception or {@link MaxRetriesExceeded} if no other exception thrown
*/
void onComplete();

/**
* Records an failed call.
*
Expand All @@ -539,9 +548,19 @@ interface Context<T> {

/**
* Records a successful call.
* @deprecated since 1.2.0
*/
@Deprecated
void onSuccess();


/**
* Records a successful call or retryable call with the needed generated retry events.
* When there is a successful retry before reaching the max retries limit , it will generate {@link RetryOnSuccessEvent}
* When the retry reach the max retries limit , it will generate {@link RetryOnErrorEvent} with last exception or {@link MaxRetriesExceeded} if no other exception thrown
*/
void onComplete();

/**
* @param result the returned result from the called logic
* @return true if we need to retry again or false if no retry anymore
Expand Down Expand Up @@ -633,7 +652,7 @@ private void onResult(T result) {

if (delay < 1) {
promise.complete(result);
retryContext.onSuccess();
retryContext.onComplete();
} else {
scheduler.schedule(this, delay, TimeUnit.MILLISECONDS);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@

import io.github.resilience4j.core.EventConsumer;
import io.github.resilience4j.core.EventProcessor;
import io.github.resilience4j.core.IntervalFunction;
import io.github.resilience4j.core.lang.Nullable;
import io.github.resilience4j.retry.MaxRetriesExceeded;
import io.github.resilience4j.retry.Retry;
import io.github.resilience4j.retry.RetryConfig;
import io.github.resilience4j.retry.event.*;
Expand Down Expand Up @@ -136,19 +136,34 @@ public final class ContextImpl implements Retry.Context<T> {
private ContextImpl() {
}

@Override
/**
* @deprecated since 1.2.0
*/
@Override
@Deprecated
public void onSuccess() {
onComplete();
}

@Override
public void onComplete() {
int currentNumOfAttempts = numOfAttempts.get();
if (currentNumOfAttempts > 0) {
if (currentNumOfAttempts > 0 && currentNumOfAttempts < maxAttempts) {
succeededAfterRetryCounter.increment();
Throwable throwable = Option.of(lastException.get()).getOrElse(lastRuntimeException.get());
publishRetryEvent(() -> new RetryOnSuccessEvent(getName(), currentNumOfAttempts, throwable));
} else {
succeededWithoutRetryCounter.increment();
if (currentNumOfAttempts >= maxAttempts) {
failedAfterRetryCounter.increment();
Throwable throwable = Option.of(lastException.get()).getOrElse(lastRuntimeException.get());
publishRetryEvent(() -> new RetryOnErrorEvent(name, currentNumOfAttempts, throwable != null ? throwable : new MaxRetriesExceeded("max retries is reached out for the result predicate check")));
} else {
succeededWithoutRetryCounter.increment();
}
}
}

@Override
@Override
public boolean onResult(T result) {
if (null != resultPredicate && resultPredicate.test(result)) {
int currentNumOfAttempts = numOfAttempts.incrementAndGet();
Expand Down Expand Up @@ -225,14 +240,29 @@ public final class AsyncContextImpl implements Retry.AsyncContext<T> {
private final AtomicInteger numOfAttempts = new AtomicInteger(0);
private final AtomicReference<Throwable> lastException = new AtomicReference<>();

/**
* @deprecated since 1.2.0
*/
@Override
@Deprecated
public void onSuccess() {
onComplete();
}

@Override
public void onComplete() {
int currentNumOfAttempts = numOfAttempts.get();
if (currentNumOfAttempts > 0) {
if (currentNumOfAttempts > 0 && currentNumOfAttempts < maxAttempts) {
succeededAfterRetryCounter.increment();
publishRetryEvent(() -> new RetryOnSuccessEvent(name, currentNumOfAttempts, lastException.get()));
} else {
succeededWithoutRetryCounter.increment();
if (currentNumOfAttempts >= maxAttempts) {
failedAfterRetryCounter.increment();
publishRetryEvent(() -> new RetryOnErrorEvent(name, currentNumOfAttempts, lastException.get() != null ? lastException.get() : new MaxRetriesExceeded("max retries is reached out for the result predicate check")));
} else {
succeededWithoutRetryCounter.increment();

}
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package io.github.resilience4j.retry;

import org.junit.Test;

import static org.junit.Assert.assertEquals;

public class MaxRetriesExceededTest {


@Test
public void errorMessageShouldReportedRight() {
MaxRetriesExceeded maxRetriesExceeded = new MaxRetriesExceeded("test max retries");
assertEquals(maxRetriesExceeded.getMessage(), "test max retries");

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,39 @@ public void shouldNotRetryWithResult() {
assertThat(sleptTime).isEqualTo(0);
}

@Test
public void shouldDeprecatedOnSuccessCallOnFinish() {
given(helloWorldService.returnHelloWorld()).willReturn("Hello world");
final RetryConfig tryAgain = RetryConfig.<String>custom()
.retryOnResult(s -> s.contains("tryAgain"))
.maxAttempts(2).build();
Retry retry = Retry.of("id", tryAgain);

Supplier<String> supplier = decorateSupplierWithOnSuccess(retry, helloWorldService::returnHelloWorld);
String result = supplier.get();
then(helloWorldService).should().returnHelloWorld();
assertThat(result).isEqualTo("Hello world");
assertThat(sleptTime).isEqualTo(0);
}


private <T> Supplier<T> decorateSupplierWithOnSuccess(Retry retry, Supplier<T> supplier) {
return () -> {
Retry.Context<T> context = retry.context();
do try {
T result = supplier.get();
final boolean validationOfResult = context.onResult(result);
if (!validationOfResult) {
context.onSuccess();
return result;
}
} catch (RuntimeException runtimeException) {
context.onRuntimeError(runtimeException);
} while (true);
};
}


@Test
public void shouldRetryWithResult() {
given(helloWorldService.returnHelloWorld()).willReturn("Hello world");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,23 +16,9 @@

package io.github.resilience4j.retry.transformer;

import org.reactivestreams.Publisher;

import io.github.resilience4j.retry.Retry;
import io.reactivex.Completable;
import io.reactivex.CompletableSource;
import io.reactivex.CompletableTransformer;
import io.reactivex.Flowable;
import io.reactivex.FlowableTransformer;
import io.reactivex.Maybe;
import io.reactivex.MaybeSource;
import io.reactivex.MaybeTransformer;
import io.reactivex.Observable;
import io.reactivex.ObservableSource;
import io.reactivex.ObservableTransformer;
import io.reactivex.Single;
import io.reactivex.SingleSource;
import io.reactivex.SingleTransformer;
import io.reactivex.*;
import org.reactivestreams.Publisher;

public class RetryTransformer<T> implements FlowableTransformer<T, T>, ObservableTransformer<T, T>,
SingleTransformer<T, T>, CompletableTransformer, MaybeTransformer<T, T> {
Expand Down Expand Up @@ -101,7 +87,7 @@ private static class Context<T> {
}

void onComplete() {
this.context.onSuccess();
this.context.onComplete();
}

void throwExceptionToForceRetryOnResult(T value) {
Expand Down

0 comments on commit c016641

Please sign in to comment.