Skip to content

Commit

Permalink
Issue ReactiveX#43: Added CircuitBreaker and Retry decorations for Co…
Browse files Browse the repository at this point in the history
…mpletionStage
  • Loading branch information
Oleksandr Goldobin authored and RobWin committed Jan 24, 2017
1 parent 495e380 commit b9e61a9
Show file tree
Hide file tree
Showing 8 changed files with 559 additions and 19 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,4 @@
*.iml
.gradle
build
classes
43 changes: 35 additions & 8 deletions src/main/java/io/github/robwin/circuitbreaker/CircuitBreaker.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.time.Duration;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
Expand Down Expand Up @@ -286,17 +287,43 @@ static <T> Try.CheckedSupplier<T> decorateCheckedSupplier(CircuitBreaker circuit
*
* @return a supplier which is secured by a CircuitBreaker.
*/
static <T> Supplier<CompletableFuture<T>> decorateCompletableFuture(CircuitBreaker circuitBreaker, Supplier<CompletableFuture<T>> supplier){
static <T> Supplier<CompletionStage<T>> decorateCompletionStage(
CircuitBreaker circuitBreaker,
Supplier<CompletionStage<T>> supplier
) {
return () -> {
CircuitBreakerUtils.isCallPermitted(circuitBreaker);
StopWatch stopWatch = StopWatch.start(circuitBreaker.getName());
return supplier.get().whenComplete((returnValue, throwable) -> {
if (returnValue != null) {
circuitBreaker.onSuccess(stopWatch.stop().getProcessingDuration());
} else {

final CompletableFuture<T> promise = new CompletableFuture<>();

if (!circuitBreaker.isCallPermitted()) {
promise.completeExceptionally(
new CircuitBreakerOpenException(
String.format("CircuitBreaker '%s' is open", circuitBreaker.getName())));

} else {
final StopWatch stopWatch = StopWatch.start(circuitBreaker.getName());

try {
supplier.get().whenComplete((result, throwable) -> {

final Duration duration = stopWatch.stop().getProcessingDuration();

if (throwable != null) {
circuitBreaker.onError(duration, throwable);
promise.completeExceptionally(throwable);

} else {
circuitBreaker.onSuccess(duration);
promise.complete(result);
}
});
} catch (Throwable throwable) {
circuitBreaker.onError(stopWatch.stop().getProcessingDuration(), throwable);
throw throwable;
}
});
}

return promise;
};
}

Expand Down
31 changes: 31 additions & 0 deletions src/main/java/io/github/robwin/decorators/Decorators.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,12 @@
import io.github.robwin.cache.Cache;
import io.github.robwin.circuitbreaker.CircuitBreaker;
import io.github.robwin.ratelimiter.RateLimiter;
import io.github.robwin.retry.AsyncRetry;
import io.github.robwin.retry.Retry;
import javaslang.control.Try;

import java.util.concurrent.CompletionStage;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Function;
import java.util.function.Supplier;

Expand Down Expand Up @@ -38,6 +41,11 @@ static DecorateCheckedRunnable ofCheckedRunnable(Try.CheckedRunnable supplier){
return new DecorateCheckedRunnable(supplier);
}

static <T> DecorateCompletionStage<T> ofCompletionStage(Supplier<CompletionStage<T>> stageSupplier){
return new DecorateCompletionStage<>(stageSupplier);
}


class DecorateSupplier<T>{
private Supplier<T> supplier;

Expand Down Expand Up @@ -209,4 +217,27 @@ public Try.CheckedRunnable decorate() {
return runnable;
}
}

class DecorateCompletionStage<T> {

private Supplier<CompletionStage<T>> stageSupplier;

public DecorateCompletionStage(Supplier<CompletionStage<T>> stageSupplier) {
this.stageSupplier = stageSupplier;
}

public DecorateCompletionStage<T> withCircuitBreaker(CircuitBreaker circuitBreaker) {
stageSupplier = CircuitBreaker.decorateCompletionStage(circuitBreaker, stageSupplier);
return this;
}

public DecorateCompletionStage<T> withRetry(AsyncRetry retryContext, ScheduledExecutorService scheduler) {
stageSupplier = AsyncRetry.decorateCompletionStage(retryContext, scheduler, stageSupplier);
return this;
}

public Supplier<CompletionStage<T>> decorate() {
return stageSupplier;
}
}
}
147 changes: 147 additions & 0 deletions src/main/java/io/github/robwin/retry/AsyncRetry.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
package io.github.robwin.retry;

import io.github.robwin.retry.event.RetryEvent;
import io.github.robwin.retry.internal.AsyncRetryContext;
import io.reactivex.Flowable;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

public interface AsyncRetry {

/**
* Returns the ID of this Retry.
*
* @return the ID of this Retry
*/
String getId();

/**
* Records a successful call.
*/
void onSuccess();

/**
* Records an failed call.
* @return delay in milliseconds until the next try
*/
long onError(Throwable throwable);

/**
* Returns a reactive stream of RetryEvents.
*
* @return a reactive stream of RetryEvents
*/
Flowable<RetryEvent> getEventStream();

/**
* Creates a Retry with a custom Retry configuration.
*
* @param id the ID of the Retry
* @param retryConfig a custom Retry configuration
*
* @return a Retry with a custom Retry configuration.
*/
static AsyncRetry of(String id, RetryConfig retryConfig){
return new AsyncRetryContext(id, retryConfig);
}

/**
* Creates a Retry with a custom Retry configuration.
*
* @param id the ID of the Retry
* @param retryConfigSupplier a supplier of a custom Retry configuration
*
* @return a Retry with a custom Retry configuration.
*/
static AsyncRetry of(String id, Supplier<RetryConfig> retryConfigSupplier){
return of(id, retryConfigSupplier.get());
}

/**
* Creates a Retry with default configuration.
*
* @param id the ID of the Retry
* @return a Retry with default configuration
*/
static AsyncRetry ofDefaults(String id){
return of(id, RetryConfig.ofDefaults());
}

/**
* Decorates CompletionStageSupplier with Retry
*
* @param retryContext retry context
* @param scheduler execution service to use to schedule retries
* @param supplier completion stage supplier
* @param <T> type of completion stage result
* @return decorated supplier
*/
static <T> Supplier<CompletionStage<T>> decorateCompletionStage(
AsyncRetry retryContext,
ScheduledExecutorService scheduler,
Supplier<CompletionStage<T>> supplier
) {
return () -> {

final CompletableFuture<T> promise = new CompletableFuture<>();
final Runnable block = new AsyncRetryBlock<>(scheduler, retryContext, supplier, promise);
block.run();

return promise;
};
}
}

class AsyncRetryBlock<T> implements Runnable {
private final ScheduledExecutorService scheduler;
private final AsyncRetry retryContext;
private final Supplier<CompletionStage<T>> supplier;
private final CompletableFuture<T> promise;

AsyncRetryBlock(
ScheduledExecutorService scheduler,
AsyncRetry retryContext,
Supplier<CompletionStage<T>> supplier,
CompletableFuture<T> promise
) {
this.scheduler = scheduler;
this.retryContext = retryContext;
this.supplier = supplier;
this.promise = promise;
}

@Override
public void run() {
final CompletionStage<T> stage;

try {
stage = supplier.get();
} catch (Throwable t) {
onError(t);
return;
}

stage.whenComplete((result, t) -> {
if (t != null) {
onError(t);
} else {
promise.complete(result);
retryContext.onSuccess();
}
});
}

private void onError(Throwable t) {
final long delay = retryContext.onError(t);

if (delay < 1) {
promise.completeExceptionally(t);
} else {
scheduler.schedule(this, delay, TimeUnit.MILLISECONDS);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package io.github.robwin.retry.internal;

import io.github.robwin.retry.AsyncRetry;
import io.github.robwin.retry.RetryConfig;
import io.github.robwin.retry.event.RetryEvent;
import io.github.robwin.retry.event.RetryOnErrorEvent;
import io.github.robwin.retry.event.RetryOnSuccessEvent;
import io.reactivex.Flowable;
import io.reactivex.processors.FlowableProcessor;
import io.reactivex.processors.PublishProcessor;
import javaslang.collection.Stream;

import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.function.Supplier;

public class AsyncRetryContext implements AsyncRetry {

private final String id;
private final int maxAttempts;
private Duration waitDuration;
private final Function<Duration, Duration> backoffFunction;
private final FlowableProcessor<RetryEvent> eventPublisher;

private final AtomicInteger numOfAttempts = new AtomicInteger(0);

public AsyncRetryContext(String id, RetryConfig config) {
this.id = id;
this.maxAttempts = config.getMaxAttempts();
this.backoffFunction = config.getBackoffFunction();
this.waitDuration = config.getWaitDuration();

PublishProcessor<RetryEvent> publisher = PublishProcessor.create();
this.eventPublisher = publisher.toSerialized();
}

@Override
public String getId() {
return id;
}

@Override
public void onSuccess() {
int currentNumOfAttempts = numOfAttempts.get();
publishRetryEvent(() -> new RetryOnSuccessEvent(id, currentNumOfAttempts, null));
}

@Override
public long onError(Throwable throwable) {
int attempt = numOfAttempts.addAndGet(1);
publishRetryEvent(() -> new RetryOnErrorEvent(id, attempt, throwable));
return calculateInterval(attempt);
}

@Override
public Flowable<RetryEvent> getEventStream() {
return eventPublisher;
}


private long calculateInterval(int attempt) {

if (attempt > maxAttempts) {
return -1;
} else {
return Stream.iterate(waitDuration, backoffFunction)
.get(attempt - 1)
.toMillis();
}
}

private void publishRetryEvent(Supplier<RetryEvent> event) {
if(eventPublisher.hasSubscribers()) {
eventPublisher.onNext(event.get());
}
}
}
Loading

0 comments on commit b9e61a9

Please sign in to comment.