Skip to content

Commit

Permalink
ReactiveX#625 Fix for some issues raised by sonar (ReactiveX#621)
Browse files Browse the repository at this point in the history
* Make this config explicitly immutable

* Explicit doc for config field

* Temporary change, to trigger warning from sonar.

* Make this config explicitly immutable

* Fix few other sonar complaints

* Disable non-relevant check in sonar

* Remove unused import

* Propagate right info about thread interruption to callers via exceptions and Thread.interrupted flag

* Fixes after merge with master

* Additional tests and code cleanup

* Tests fix

* Update documentation about thread interruption contract.
Update release notes.

* Make common exception type for RateLimiter and Bulkhead

* Code review comments fixed
  • Loading branch information
storozhukBM authored and RobWin committed Sep 24, 2019
1 parent 2e8c0da commit 188966d
Show file tree
Hide file tree
Showing 20 changed files with 815 additions and 595 deletions.
6 changes: 5 additions & 1 deletion RELEASENOTES.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -254,4 +254,8 @@ NOTE: Breaking changes:
* Issue #568: Allow to configure exceptions which should be treated as a success in the CircuitBreaker.
* Issue #381: Allow to configure a slow response time threshold. If too many slow calls are recorded the CircuitBreaker opens.
* Issue #488: Micrometer support for ThreadPoolBulkhead
* Issue #540: Fixed a bug where IDE did not recognize the auto config properties
* Issue #540: Fixed a bug where IDE did not recognize the auto config properties

== Version 1.1.0
* Issue #625: Fix reliability and security issues raised by sonar
* Issue #626: Handle thread interruption consistently in Bulkhead and RateLimiter blocking methods
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import io.github.resilience4j.bulkhead.event.BulkheadOnCallRejectedEvent;
import io.github.resilience4j.bulkhead.internal.SemaphoreBulkhead;
import io.github.resilience4j.core.EventConsumer;
import io.github.resilience4j.core.exception.AcquirePermissionCancelledException;
import io.vavr.CheckedConsumer;
import io.vavr.CheckedFunction0;
import io.vavr.CheckedFunction1;
Expand Down Expand Up @@ -64,15 +65,22 @@ public interface Bulkhead {

/**
* Acquires a permission to execute a call, only if one is available at the time of invocation.
* If the current thread is {@linkplain Thread#interrupt interrupted}
* while waiting for a permit then it won't throw {@linkplain InterruptedException},
* but its interrupt status will be set.
*
* @return {@code true} if a permission was acquired and {@code false} otherwise
*/
boolean tryAcquirePermission();

/**
* Acquires a permission to execute a call, only if one is available at the time of invocation
* If the current thread is {@linkplain Thread#interrupt interrupted}
* while waiting for a permit then it won't throw {@linkplain InterruptedException},
* but its interrupt status will be set.
*
* @throws BulkheadFullException when the Bulkhead is full and no further calls are permitted.
* @throws AcquirePermissionCancelledException if thread was interrupted during permission wait
*/
void acquirePermission();

Expand Down Expand Up @@ -336,7 +344,7 @@ static <T> Supplier<Try<T>> decorateTrySupplier(Bulkhead bulkhead, Supplier<Try<
finally {
bulkhead.onComplete();
}
}else{
} else {
return Try.failure(BulkheadFullException.createBulkheadFullException(bulkhead));
}
};
Expand All @@ -361,7 +369,7 @@ static <T> Supplier<Either<Exception, T>> decorateEitherSupplier(Bulkhead bulkhe
finally {
bulkhead.onComplete();
}
}else{
} else {
return Either.left(BulkheadFullException.createBulkheadFullException(bulkhead));
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,23 +18,28 @@
*/
package io.github.resilience4j.bulkhead;

import javax.annotation.concurrent.Immutable;
import java.time.Duration;

/**
* A {@link BulkheadConfig} configures a {@link Bulkhead}
*/
@Immutable
public class BulkheadConfig {

public static final int DEFAULT_MAX_CONCURRENT_CALLS = 25;
public static final Duration DEFAULT_MAX_WAIT_DURATION = Duration.ofSeconds(0);
public static final boolean DEFAULT_WRITABLE_STACK_TRACE_ENABLED = true;

private int maxConcurrentCalls = DEFAULT_MAX_CONCURRENT_CALLS;
private Duration maxWaitDuration= DEFAULT_MAX_WAIT_DURATION;
private boolean writableStackTraceEnabled = DEFAULT_WRITABLE_STACK_TRACE_ENABLED;
private final int maxConcurrentCalls;
private final Duration maxWaitDuration;
private final boolean writableStackTraceEnabled;

private BulkheadConfig() {
}
private BulkheadConfig(int maxConcurrentCalls, Duration maxWaitDuration, boolean writableStackTraceEnabled) {
this.maxConcurrentCalls = maxConcurrentCalls;
this.maxWaitDuration = maxWaitDuration;
this.writableStackTraceEnabled = writableStackTraceEnabled;
}

/**
* Returns a builder to create a custom BulkheadConfig.
Expand Down Expand Up @@ -76,14 +81,20 @@ public boolean isWritableStackTraceEnabled() {
}

public static class Builder {

private BulkheadConfig config = new BulkheadConfig();
private int maxConcurrentCalls;
private Duration maxWaitDuration;
private boolean writableStackTraceEnabled;

public Builder() {
this.maxConcurrentCalls = DEFAULT_MAX_CONCURRENT_CALLS;
this.maxWaitDuration = DEFAULT_MAX_WAIT_DURATION;
this.writableStackTraceEnabled = DEFAULT_WRITABLE_STACK_TRACE_ENABLED;
}

public Builder(BulkheadConfig bulkheadConfig) {
this.config = bulkheadConfig;
this.maxConcurrentCalls = bulkheadConfig.getMaxConcurrentCalls();
this.maxWaitDuration = bulkheadConfig.getMaxWaitDuration();
this.writableStackTraceEnabled = bulkheadConfig.isWritableStackTraceEnabled();
}

/**
Expand All @@ -96,7 +107,7 @@ public Builder maxConcurrentCalls(int maxConcurrentCalls) {
if (maxConcurrentCalls < 0) {
throw new IllegalArgumentException("maxConcurrentCalls must be an integer value >= 0");
}
config.maxConcurrentCalls = maxConcurrentCalls;
this.maxConcurrentCalls = maxConcurrentCalls;
return this;
}

Expand All @@ -114,7 +125,7 @@ public Builder maxWaitDuration(Duration maxWaitDuration) {
if (maxWaitDuration.toMillis() < 0) {
throw new IllegalArgumentException("maxWaitDuration must be a positive integer value >= 0");
}
config.maxWaitDuration = maxWaitDuration;
this.maxWaitDuration = maxWaitDuration;
return this;
}

Expand All @@ -127,7 +138,7 @@ public Builder maxWaitDuration(Duration maxWaitDuration) {
* @return the BulkheadConfig.Builder
*/
public Builder writableStackTraceEnabled(boolean writableStackTraceEnabled) {
config.writableStackTraceEnabled = writableStackTraceEnabled;
this.writableStackTraceEnabled = writableStackTraceEnabled;
return this;
}

Expand All @@ -137,7 +148,7 @@ public Builder writableStackTraceEnabled(boolean writableStackTraceEnabled) {
* @return the BulkheadConfig
*/
public BulkheadConfig build() {
return config;
return new BulkheadConfig(maxConcurrentCalls, maxWaitDuration, writableStackTraceEnabled);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,13 @@ public class BulkheadFullException extends RuntimeException {
public static BulkheadFullException createBulkheadFullException(Bulkhead bulkhead) {
boolean writableStackTraceEnabled = bulkhead.getBulkheadConfig().isWritableStackTraceEnabled();

String message = String.format("Bulkhead '%s' is full and does not permit further calls", bulkhead.getName());
String message;
if (Thread.currentThread().isInterrupted()) {
message = String.format("Bulkhead '%s' is full and thread was interrupted during permission wait",
bulkhead.getName());
} else {
message = String.format("Bulkhead '%s' is full and does not permit further calls", bulkhead.getName());
}

return new BulkheadFullException(message, writableStackTraceEnabled);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,13 @@
import io.github.resilience4j.bulkhead.event.BulkheadOnCallRejectedEvent;
import io.github.resilience4j.core.EventConsumer;
import io.github.resilience4j.core.EventProcessor;
import io.github.resilience4j.core.exception.AcquirePermissionCancelledException;
import io.github.resilience4j.core.lang.Nullable;

import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

import static io.github.resilience4j.bulkhead.BulkheadFullException.createBulkheadFullException;
import static java.util.Objects.requireNonNull;

/**
Expand All @@ -46,11 +46,13 @@ public class SemaphoreBulkhead implements Bulkhead {

private final String name;
private final Semaphore semaphore;
private final Object configChangesLock = new Object();
private volatile BulkheadConfig config;
private final BulkheadMetrics metrics;
private final BulkheadEventProcessor eventProcessor;

private final Object configChangesLock = new Object();
@SuppressWarnings("squid:S3077") // this object is immutable and we replace ref entirely during config change.
private volatile BulkheadConfig config;

/**
* Creates a bulkhead using a configuration supplied
*
Expand Down Expand Up @@ -92,7 +94,7 @@ public SemaphoreBulkhead(String name, Supplier<BulkheadConfig> configSupplier) {
@Override
public void changeConfig(final BulkheadConfig newConfig) {
synchronized (configChangesLock) {
int delta = newConfig.getMaxConcurrentCalls() - config.getMaxConcurrentCalls();
int delta = newConfig.getMaxConcurrentCalls() - config.getMaxConcurrentCalls();
if (delta < 0) {
semaphore.acquireUninterruptibly(-delta);
} else if (delta > 0) {
Expand All @@ -102,6 +104,9 @@ public void changeConfig(final BulkheadConfig newConfig) {
}
}

/**
* {@inheritDoc}
*/
@Override
public boolean tryAcquirePermission() {
boolean callPermitted = tryEnterBulkhead();
Expand All @@ -114,13 +119,24 @@ public boolean tryAcquirePermission() {
return callPermitted;
}

/**
* {@inheritDoc}
*/
@Override
public void acquirePermission() {
if(!tryAcquirePermission()) {
throw BulkheadFullException.createBulkheadFullException(this);
boolean permitted = tryAcquirePermission();
if (permitted) {
return;
}
if (Thread.currentThread().isInterrupted()) {
throw new AcquirePermissionCancelledException();
}
throw BulkheadFullException.createBulkheadFullException(this);
}

/**
* {@inheritDoc}
*/
@Override
public void releasePermission() {
semaphore.release();
Expand Down Expand Up @@ -198,6 +214,9 @@ public String toString() {
return String.format("Bulkhead '%s'", this.name);
}

/**
* @return true if caller was able to wait for permission without {@link Thread#interrupt}
*/
boolean tryEnterBulkhead() {

boolean callPermitted;
Expand All @@ -209,6 +228,7 @@ boolean tryEnterBulkhead() {
try {
callPermitted = semaphore.tryAcquire(timeout, TimeUnit.MILLISECONDS);
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
callPermitted = false;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,24 +26,20 @@
import io.github.resilience4j.core.registry.RegistryEventConsumer;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;

import java.time.Duration;
import java.util.*;

import static org.assertj.core.api.BDDAssertions.assertThat;
import static org.mockito.Mockito.mock;


public class BulkheadRegistryTest {

private BulkheadConfig config;
private BulkheadRegistry registry;
private Logger LOGGER;

@Before
public void setUp() {
LOGGER = mock(Logger.class);
// registry with default config
registry = BulkheadRegistry.ofDefaults();
// registry with custom config
Expand Down
Loading

0 comments on commit 188966d

Please sign in to comment.