Skip to content

Commit

Permalink
Issue ReactiveX#43: Added a new higher-order function to decorate a m…
Browse files Browse the repository at this point in the history
…ethod which returns a CompletableFuture.
  • Loading branch information
Robert Winkler committed Jan 23, 2017
1 parent 6ab8e3f commit 2e02065
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 11 deletions.
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ dependencies {

testCompile "io.dropwizard.metrics:metrics-core:3.1.2"
testCompile "junit:junit:4.11"
testCompile "org.assertj:assertj-core:3.0.0"
testCompile "org.assertj:assertj-core:3.6.2"
testCompile "ch.qos.logback:logback-classic:0.9.26"
testCompile "io.dropwizard.metrics:metrics-healthchecks:3.1.2"
testCompile "org.mockito:mockito-core:1.10.19"
Expand Down
43 changes: 33 additions & 10 deletions src/main/java/io/github/robwin/circuitbreaker/CircuitBreaker.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,15 @@
package io.github.robwin.circuitbreaker;

import io.github.robwin.circuitbreaker.event.CircuitBreakerEvent;
import io.github.robwin.circuitbreaker.internal.*;
import io.github.robwin.circuitbreaker.internal.CircuitBreakerStateMachine;
import io.github.robwin.circuitbreaker.utils.CircuitBreakerUtils;
import io.github.robwin.metrics.StopWatch;
import io.reactivex.Flowable;
import javaslang.control.Try;

import java.time.Duration;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
Expand Down Expand Up @@ -255,7 +256,7 @@ interface Metrics {
}

/**
* Creates a supplier which is secured by a CircuitBreaker.
* Returns a supplier which is secured by a CircuitBreaker.
*
* @param circuitBreaker the CircuitBreaker
* @param supplier the original supplier
Expand All @@ -278,7 +279,29 @@ static <T> Try.CheckedSupplier<T> decorateCheckedSupplier(CircuitBreaker circuit
}

/**
* Creates a runnable which is secured by a CircuitBreaker.
* Returns a supplier which is secured by a CircuitBreaker.
*
* @param circuitBreaker the CircuitBreaker
* @param supplier the original supplier
*
* @return a supplier which is secured by a CircuitBreaker.
*/
static <T> Supplier<CompletableFuture<T>> decorateCompletableFuture(CircuitBreaker circuitBreaker, Supplier<CompletableFuture<T>> supplier){
return () -> {
CircuitBreakerUtils.isCallPermitted(circuitBreaker);
StopWatch stopWatch = StopWatch.start(circuitBreaker.getName());
return supplier.get().whenComplete((returnValue, throwable) -> {
if (returnValue != null) {
circuitBreaker.onSuccess(stopWatch.stop().getProcessingDuration());
} else {
circuitBreaker.onError(stopWatch.stop().getProcessingDuration(), throwable);
}
});
};
}

/**
* Returns a runnable which is secured by a CircuitBreaker.
*
* @param circuitBreaker the CircuitBreaker
* @param runnable the original runnable
Expand All @@ -300,7 +323,7 @@ static Try.CheckedRunnable decorateCheckedRunnable(CircuitBreaker circuitBreaker
}

/**
* Creates a callable which is secured by a CircuitBreaker.
* Returns a callable which is secured by a CircuitBreaker.
*
* @param circuitBreaker the CircuitBreaker
* @param callable the original Callable
Expand All @@ -323,7 +346,7 @@ static <T> Callable<T> decorateCallable(CircuitBreaker circuitBreaker, Callable<
}

/**
* Creates a supplier which is secured by a CircuitBreaker.
* Returns a supplier which is secured by a CircuitBreaker.
*
* @param circuitBreaker the CircuitBreaker
* @param supplier the original supplier
Expand All @@ -346,7 +369,7 @@ static <T> Supplier<T> decorateSupplier(CircuitBreaker circuitBreaker, Supplier<
}

/**
* Creates a consumer which is secured by a CircuitBreaker.
* Returns a consumer which is secured by a CircuitBreaker.
* @param circuitBreaker the CircuitBreaker
* @param consumer the original consumer
Expand All @@ -368,7 +391,7 @@ static <T> Consumer<T> decorateConsumer(CircuitBreaker circuitBreaker, Consumer<
}

/**
* Creates a consumer which is secured by a CircuitBreaker.
* Returns a consumer which is secured by a CircuitBreaker.
* @param circuitBreaker the CircuitBreaker
* @param consumer the original consumer
Expand All @@ -390,7 +413,7 @@ static <T> Try.CheckedConsumer<T> decorateCheckedConsumer(CircuitBreaker circuit
}

/**
* Creates a runnable which is secured by a CircuitBreaker.
* Returns a runnable which is secured by a CircuitBreaker.
*
* @param circuitBreaker the CircuitBreaker
* @param runnable the original runnable
Expand All @@ -412,7 +435,7 @@ static Runnable decorateRunnable(CircuitBreaker circuitBreaker, Runnable runnabl
}

/**
* Creates a function which is secured by a CircuitBreaker.
* Returns a function which is secured by a CircuitBreaker.
* @param circuitBreaker the CircuitBreaker
* @param function the original function
Expand All @@ -435,7 +458,7 @@ static <T, R> Function<T, R> decorateFunction(CircuitBreaker circuitBreaker, Fun
}

/**
* Creates a function which is secured by a CircuitBreaker.
* Returns a function which is secured by a CircuitBreaker.
*
* @param circuitBreaker the CircuitBreaker
* @param function the original function
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import static javaslang.API.Case;
import static javaslang.Predicates.instanceOf;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.mockito.BDDMockito.given;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
Expand Down Expand Up @@ -636,6 +637,51 @@ public void shouldInvokeAsyncApply() throws ExecutionException, InterruptedExcep
assertThat(metrics.getNumberOfFailedCalls()).isEqualTo(0);
}

@Test
public void shouldDecorateCompletableFutureAndReturnWithSuccess() throws ExecutionException, InterruptedException {
// Given
CircuitBreaker circuitBreaker = CircuitBreaker.ofDefaults("backendName");
assertThat(circuitBreaker.getState()).isEqualTo(CircuitBreaker.State.CLOSED);
// Given the HelloWorldService returns Hello world
given(helloWorldService.returnHelloWorld()).willReturn("Hello");
// When

Supplier<CompletableFuture<String>> completableFutureSupplier = () -> CompletableFuture.supplyAsync(helloWorldService::returnHelloWorld);

Supplier<CompletableFuture<String>> decoratedCompletableFutureSupplier = CircuitBreaker.decorateCompletableFuture(circuitBreaker, completableFutureSupplier);
CompletableFuture<String> decoratedCompletableFuture = decoratedCompletableFutureSupplier.get()
.thenApply(value -> value + " world");

// Then the helloWorldService should be invoked 1 time
BDDMockito.then(helloWorldService).should(times(1)).returnHelloWorld();
assertThat(decoratedCompletableFuture.get()).isEqualTo("Hello world");

CircuitBreaker.Metrics metrics = circuitBreaker.getMetrics();
assertThat(metrics.getNumberOfBufferedCalls()).isEqualTo(1);
assertThat(metrics.getNumberOfFailedCalls()).isEqualTo(0);
}

@Test
public void shouldDecorateCompletableFutureAndReturnWithException() throws ExecutionException, InterruptedException {
// Given
CircuitBreaker circuitBreaker = CircuitBreaker.ofDefaults("backendName");
assertThat(circuitBreaker.getState()).isEqualTo(CircuitBreaker.State.CLOSED);
// Given the HelloWorldService throws an exception
given(helloWorldService.returnHelloWorld()).willThrow(new RuntimeException("BAM!"));

// When
Supplier<CompletableFuture<String>> completableFutureSupplier = () -> CompletableFuture.supplyAsync(helloWorldService::returnHelloWorld);
Supplier<CompletableFuture<String>> decoratedCompletableFutureSupplier = CircuitBreaker.decorateCompletableFuture(circuitBreaker, completableFutureSupplier);
CompletableFuture<String> decoratedCompletableFuture = decoratedCompletableFutureSupplier.get();

// Then the helloWorldService should be invoked 1 time
BDDMockito.then(helloWorldService).should(times(1)).returnHelloWorld();
assertThatThrownBy(decoratedCompletableFuture::get).isInstanceOf(ExecutionException.class).hasCause(new RuntimeException("BAM!"));

CircuitBreaker.Metrics metrics = circuitBreaker.getMetrics();
assertThat(metrics.getNumberOfBufferedCalls()).isEqualTo(1);
assertThat(metrics.getNumberOfFailedCalls()).isEqualTo(1);
}

@Test
public void shouldChainDecoratedFunctions() throws ExecutionException, InterruptedException {
Expand Down

0 comments on commit 2e02065

Please sign in to comment.