Skip to content

Commit

Permalink
Bumping versions
Browse files Browse the repository at this point in the history
  • Loading branch information
spring-builds committed Dec 1, 2023
1 parent 10f2896 commit 82309b7
Show file tree
Hide file tree
Showing 7 changed files with 99 additions and 104 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -64,17 +64,16 @@ public class ReactiveResilience4JCircuitBreaker implements ReactiveCircuitBreake

@Deprecated
public ReactiveResilience4JCircuitBreaker(String id, String groupName,
Resilience4JConfigBuilder.Resilience4JCircuitBreakerConfiguration config,
CircuitBreakerRegistry circuitBreakerRegistry, TimeLimiterRegistry timeLimiterRegistry,
Optional<Customizer<CircuitBreaker>> circuitBreakerCustomizer) {
Resilience4JConfigBuilder.Resilience4JCircuitBreakerConfiguration config,
CircuitBreakerRegistry circuitBreakerRegistry, TimeLimiterRegistry timeLimiterRegistry,
Optional<Customizer<CircuitBreaker>> circuitBreakerCustomizer) {
this(id, groupName, config, circuitBreakerRegistry, timeLimiterRegistry, circuitBreakerCustomizer, false);
}

public ReactiveResilience4JCircuitBreaker(String id, String groupName,
Resilience4JConfigBuilder.Resilience4JCircuitBreakerConfiguration config,
CircuitBreakerRegistry circuitBreakerRegistry, TimeLimiterRegistry timeLimiterRegistry,
Optional<Customizer<CircuitBreaker>> circuitBreakerCustomizer,
boolean disableTimeLimiter) {
Optional<Customizer<CircuitBreaker>> circuitBreakerCustomizer, boolean disableTimeLimiter) {
this.id = id;
this.groupName = groupName;
this.circuitBreakerConfig = config.getCircuitBreakerConfig();
Expand All @@ -91,14 +90,12 @@ public <T> Mono<T> run(Mono<T> toRun, Function<Throwable, Mono<T>> fallback) {
Mono<T> toReturn = toRun.transform(CircuitBreakerOperator.of(tuple.getT1()));
if (tuple.getT2().isPresent()) {
final Duration timeoutDuration = tuple.getT2().get().getTimeLimiterConfig().getTimeoutDuration();
toReturn = toReturn
.timeout(timeoutDuration)
// Since we are using the Mono timeout we need to tell the circuit breaker
toReturn = toReturn.timeout(timeoutDuration)
// Since we are using the Mono timeout we need to tell the circuit
// breaker
// about the error
.doOnError(TimeoutException.class,
t -> tuple.getT1()
.onError(timeoutDuration.toMillis(),
TimeUnit.MILLISECONDS, t));
t -> tuple.getT1().onError(timeoutDuration.toMillis(), TimeUnit.MILLISECONDS, t));
}
if (fallback != null) {
toReturn = toReturn.onErrorResume(fallback);
Expand All @@ -113,12 +110,11 @@ public <T> Flux<T> run(Flux<T> toRun, Function<Throwable, Flux<T>> fallback) {
if (tuple.getT2().isPresent()) {
final Duration timeoutDuration = tuple.getT2().get().getTimeLimiterConfig().getTimeoutDuration();
toReturn = toReturn.timeout(timeoutDuration)
// Since we are using the Flux timeout we need to tell the circuit breaker
// Since we are using the Flux timeout we need to tell the circuit
// breaker
// about the error
.doOnError(TimeoutException.class,
t -> tuple.getT1()
.onError(timeoutDuration.toMillis(),
TimeUnit.MILLISECONDS, t));
t -> tuple.getT1().onError(timeoutDuration.toMillis(), TimeUnit.MILLISECONDS, t));
}
if (fallback != null) {
toReturn = toReturn.onErrorResume(fallback);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public class ReactiveResilience4JCircuitBreakerFactory extends

@Deprecated
public ReactiveResilience4JCircuitBreakerFactory(CircuitBreakerRegistry circuitBreakerRegistry,
TimeLimiterRegistry timeLimiterRegistry) {
TimeLimiterRegistry timeLimiterRegistry) {
this(circuitBreakerRegistry, timeLimiterRegistry, null);
}

Expand Down Expand Up @@ -101,7 +101,7 @@ public ReactiveCircuitBreaker create(String id, String groupName) {
Resilience4JConfigBuilder.Resilience4JCircuitBreakerConfiguration config = new Resilience4JConfigBuilder(id)
.circuitBreakerConfig(circuitBreakerConfig).timeLimiterConfig(timeLimiterConfig).build();
return new ReactiveResilience4JCircuitBreaker(id, groupName, config, circuitBreakerRegistry,
timeLimiterRegistry, Optional.ofNullable(circuitBreakerCustomizers.get(id)), isDisableTimeLimiter());
timeLimiterRegistry, Optional.ofNullable(circuitBreakerCustomizers.get(id)), isDisableTimeLimiter());
}

private boolean isDisableTimeLimiter() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ public class Resilience4JCircuitBreaker implements CircuitBreaker {
private final String id;

private final String groupName;

private final Map<String, String> tags;

private Resilience4jBulkheadProvider bulkheadProvider;
Expand All @@ -65,22 +66,21 @@ public class Resilience4JCircuitBreaker implements CircuitBreaker {

@Deprecated
public Resilience4JCircuitBreaker(String id, String groupName,
io.github.resilience4j.circuitbreaker.CircuitBreakerConfig circuitBreakerConfig,
TimeLimiterConfig timeLimiterConfig, CircuitBreakerRegistry circuitBreakerRegistry,
TimeLimiterRegistry timeLimiterRegistry, ExecutorService executorService,
Optional<Customizer<io.github.resilience4j.circuitbreaker.CircuitBreaker>> circuitBreakerCustomizer,
Resilience4jBulkheadProvider bulkheadProvider) {
io.github.resilience4j.circuitbreaker.CircuitBreakerConfig circuitBreakerConfig,
TimeLimiterConfig timeLimiterConfig, CircuitBreakerRegistry circuitBreakerRegistry,
TimeLimiterRegistry timeLimiterRegistry, ExecutorService executorService,
Optional<Customizer<io.github.resilience4j.circuitbreaker.CircuitBreaker>> circuitBreakerCustomizer,
Resilience4jBulkheadProvider bulkheadProvider) {
this(id, groupName, circuitBreakerConfig, timeLimiterConfig, circuitBreakerRegistry, timeLimiterRegistry,
executorService, circuitBreakerCustomizer, bulkheadProvider, false);
executorService, circuitBreakerCustomizer, bulkheadProvider, false);
}

public Resilience4JCircuitBreaker(String id, String groupName,
io.github.resilience4j.circuitbreaker.CircuitBreakerConfig circuitBreakerConfig,
TimeLimiterConfig timeLimiterConfig, CircuitBreakerRegistry circuitBreakerRegistry,
TimeLimiterRegistry timeLimiterRegistry, ExecutorService executorService,
Optional<Customizer<io.github.resilience4j.circuitbreaker.CircuitBreaker>> circuitBreakerCustomizer,
Resilience4jBulkheadProvider bulkheadProvider,
boolean disableTimeLimiter) {
Resilience4jBulkheadProvider bulkheadProvider, boolean disableTimeLimiter) {
this.id = id;
this.groupName = groupName;
this.circuitBreakerConfig = circuitBreakerConfig;
Expand Down Expand Up @@ -109,40 +109,42 @@ public <T> T run(Supplier<T> toRun, Function<Throwable, T> fallback) {
final Map<String, String> tags = Map.of(CIRCUIT_BREAKER_GROUP_TAG, this.groupName);
Optional<TimeLimiter> timeLimiter = loadTimeLimiter();
io.github.resilience4j.circuitbreaker.CircuitBreaker defaultCircuitBreaker = registry.circuitBreaker(this.id,
this.circuitBreakerConfig, tags);
this.circuitBreakerConfig, tags);
circuitBreakerCustomizer.ifPresent(customizer -> customizer.customize(defaultCircuitBreaker));
if (bulkheadProvider != null) {

if (executorService != null) {
Supplier<Future<T>> futureSupplier = () -> executorService.submit(toRun::get);
/* conditionally wrap in time-limiter */
Callable<T> timeLimitedCall = timeLimiter.map(tl -> TimeLimiter.decorateFutureSupplier(tl, futureSupplier))
.orElse(() -> futureSupplier.get().get());
Callable<T> timeLimitedCall = timeLimiter
.map(tl -> TimeLimiter.decorateFutureSupplier(tl, futureSupplier))
.orElse(() -> futureSupplier.get().get());
Callable<T> bulkheadCall = bulkheadProvider.decorateCallable(this.groupName, tags, timeLimitedCall);
Callable<T> circuitBreakerCall = io.github.resilience4j.circuitbreaker.CircuitBreaker
.decorateCallable(defaultCircuitBreaker, bulkheadCall);
.decorateCallable(defaultCircuitBreaker, bulkheadCall);
return getAndApplyFallback(circuitBreakerCall, fallback);
}
else {
Callable<T> bulkheadCall = bulkheadProvider.decorateCallable(this.groupName, tags, toRun::get);
Callable<T> circuitBreakerCall = io.github.resilience4j.circuitbreaker.CircuitBreaker
.decorateCallable(defaultCircuitBreaker, bulkheadCall);
.decorateCallable(defaultCircuitBreaker, bulkheadCall);
return getAndApplyFallback(circuitBreakerCall, fallback);
}
}
else {
if (executorService != null) {
Supplier<Future<T>> futureSupplier = () -> executorService.submit(toRun::get);
/* conditionally wrap in time-limiter */
Callable<T> restrictedCall = timeLimiter.map(tl -> TimeLimiter.decorateFutureSupplier(tl, futureSupplier))
.orElse(() -> futureSupplier.get().get());
Callable<T> restrictedCall = timeLimiter
.map(tl -> TimeLimiter.decorateFutureSupplier(tl, futureSupplier))
.orElse(() -> futureSupplier.get().get());
Callable<T> callable = io.github.resilience4j.circuitbreaker.CircuitBreaker
.decorateCallable(defaultCircuitBreaker, restrictedCall);
.decorateCallable(defaultCircuitBreaker, restrictedCall);
return getAndApplyFallback(callable, fallback);
}
else {
Supplier<T> decorator = io.github.resilience4j.circuitbreaker.CircuitBreaker
.decorateSupplier(defaultCircuitBreaker, toRun);
.decorateSupplier(defaultCircuitBreaker, toRun);
return getAndApplyFallback(decorator, fallback);
}
}
Expand Down Expand Up @@ -171,7 +173,8 @@ private Optional<TimeLimiter> loadTimeLimiter() {
return Optional.empty();
}
return Optional.of(this.timeLimiterRegistry.find(this.id)
.orElseGet(() -> this.timeLimiterRegistry.find(this.groupName)
.orElseGet(() -> this.timeLimiterRegistry.timeLimiter(this.id, this.timeLimiterConfig, this.tags))));
.orElseGet(() -> this.timeLimiterRegistry.find(this.groupName).orElseGet(
() -> this.timeLimiterRegistry.timeLimiter(this.id, this.timeLimiterConfig, this.tags))));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -66,12 +66,12 @@ public void setDisableThreadPool(boolean disableThreadPool) {
this.disableThreadPool = disableThreadPool;
}


boolean isDisableTimeLimiter() {
return disableTimeLimiter;
}

void setDisableTimeLimiter(boolean disableTimeLimiter) {
this.disableTimeLimiter = disableTimeLimiter;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,8 @@ private boolean useSemaphoreBulkhead(String id) {
|| (bulkheadRegistry.find(id).isPresent() && threadPoolBulkheadRegistry.find(id).isEmpty());
}

private static <T> Callable<T> decorateTimeLimiter(final Supplier<? extends CompletionStage<T>> supplier, TimeLimiter timeLimiter) {
private static <T> Callable<T> decorateTimeLimiter(final Supplier<? extends CompletionStage<T>> supplier,
TimeLimiter timeLimiter) {
final Supplier<Future<T>> futureSupplier = () -> supplier.get().toCompletableFuture();
if (timeLimiter == null) {
/* execute without time-limiter */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,89 +71,81 @@ public void runFluxWithFallback() {
}

/**
* Run circuit breaker with default time limiter and expects everything to run without errors.
* Run circuit breaker with default time limiter and expects everything to run without
* errors.
*/
@Test
public void runWithDefaultTimeLimiter() {
final TimeLimiterRegistry timeLimiterRegistry = TimeLimiterRegistry.ofDefaults();
ReactiveCircuitBreaker cb = new ReactiveResilience4JCircuitBreakerFactory(CircuitBreakerRegistry.ofDefaults(),
timeLimiterRegistry, new Resilience4JConfigurationProperties()).create("foo");
timeLimiterRegistry, new Resilience4JConfigurationProperties()).create("foo");

assertThat(Mono.fromCallable(() -> {
try {
/* sleep less than time limit allows us to */
TimeUnit.MILLISECONDS.sleep(Math.min(timeLimiterRegistry.getDefaultConfig().getTimeoutDuration()
.toMillis() / 2L, 0L));
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("thread got interrupted", e);
}
return "foobar";
})
.subscribeOn(Schedulers.single())
.transform(cb::run)
.block()
).isEqualTo("foobar");
try {
/* sleep less than time limit allows us to */
TimeUnit.MILLISECONDS.sleep(
Math.min(timeLimiterRegistry.getDefaultConfig().getTimeoutDuration().toMillis() / 2L, 0L));
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("thread got interrupted", e);
}
return "foobar";
}).subscribeOn(Schedulers.single()).transform(cb::run).block()).isEqualTo("foobar");
}

/**
* Run circuit breaker with default time limiter and expects the time limit to get exceeded.
* Run circuit breaker with default time limiter and expects the time limit to get
* exceeded.
*/
@Test(expected = NoFallbackAvailableException.class)
public void runWithDefaultTimeLimiterTooSlow() {
final TimeLimiterRegistry timeLimiterRegistry = TimeLimiterRegistry.ofDefaults();
ReactiveCircuitBreaker cb = new ReactiveResilience4JCircuitBreakerFactory(CircuitBreakerRegistry.ofDefaults(),
timeLimiterRegistry, new Resilience4JConfigurationProperties()).create("foo");
timeLimiterRegistry, new Resilience4JConfigurationProperties()).create("foo");

Mono.fromCallable(() -> {
try {
/* sleep longer than time limit allows us to */
TimeUnit.MILLISECONDS.sleep(Math.max(timeLimiterRegistry.getDefaultConfig().getTimeoutDuration().toMillis(), 100L) * 2);
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("thread got interrupted", e);
}
return "foobar";
})
.subscribeOn(Schedulers.single())
.transform(cb::run)
.doOnSuccess(s -> {
throw new AssertionError("timeout did not occur");
})
.block();
try {
/* sleep longer than time limit allows us to */
TimeUnit.MILLISECONDS.sleep(
Math.max(timeLimiterRegistry.getDefaultConfig().getTimeoutDuration().toMillis(), 100L) * 2);
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("thread got interrupted", e);
}
return "foobar";
}).subscribeOn(Schedulers.single()).transform(cb::run).doOnSuccess(s -> {
throw new AssertionError("timeout did not occur");
}).block();

Assert.fail("execution did not cause exception");
}

/**
* Run circuit breaker with default time limiter and exceed time limit. Due to the disabled time limiter execution,
* everything should finish without errors.
* Run circuit breaker with default time limiter and exceed time limit. Due to the
* disabled time limiter execution, everything should finish without errors.
*/
@Test
public void runWithDisabledTimeLimiter() {
final TimeLimiterRegistry timeLimiterRegistry = TimeLimiterRegistry.ofDefaults();
final Resilience4JConfigurationProperties resilience4JConfigurationProperties = new Resilience4JConfigurationProperties();
resilience4JConfigurationProperties.setDisableTimeLimiter(true);
ReactiveCircuitBreaker cb = new ReactiveResilience4JCircuitBreakerFactory(CircuitBreakerRegistry.ofDefaults(),
timeLimiterRegistry, resilience4JConfigurationProperties).create("foo");
timeLimiterRegistry, resilience4JConfigurationProperties).create("foo");

assertThat(Mono.fromCallable(() -> {
try {
/* sleep longer than timit limit allows us to */
TimeUnit.MILLISECONDS.sleep(Math.max(timeLimiterRegistry.getDefaultConfig().getTimeoutDuration()
.toMillis(), 100L) * 2);
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("thread got interrupted", e);
}
return "foobar";
})
.subscribeOn(Schedulers.single())
.transform(cb::run)
.block()
).isEqualTo("foobar");
try {
/* sleep longer than timit limit allows us to */
TimeUnit.MILLISECONDS.sleep(
Math.max(timeLimiterRegistry.getDefaultConfig().getTimeoutDuration().toMillis(), 100L) * 2);
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("thread got interrupted", e);
}
return "foobar";
}).subscribeOn(Schedulers.single()).transform(cb::run).block()).isEqualTo("foobar");
}

}

0 comments on commit 82309b7

Please sign in to comment.