Skip to content

Commit

Permalink
Issue #201 timeUnit replaced by currentTimestamp (#2007)
Browse files Browse the repository at this point in the history
  • Loading branch information
hexmind committed Aug 14, 2023
1 parent 71744be commit 2f1f437
Show file tree
Hide file tree
Showing 12 changed files with 93 additions and 95 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@
* additional operations will be permitted to execute until space is available.
* <p>
* Once the operation is complete, regardless of the result (Success or Failure), client needs to
* call {@link AdaptiveBulkhead#onSuccess(long, TimeUnit)} or {@link AdaptiveBulkhead#onError(long,
* TimeUnit, Throwable)} in order to maintain integrity of internal bulkhead state which is handled
* call {@link AdaptiveBulkhead#onSuccess} or {@link AdaptiveBulkhead#onError}
* in order to maintain integrity of internal bulkhead state which is handled
* by invoking the configured adaptive limit policy.
* <p>
*/
Expand All @@ -72,27 +72,27 @@ public interface AdaptiveBulkhead {
/**
* Releases a permission and increases the number of available permits by one.
* <p>
* Should only be used when a permission was acquired but not used. Otherwise use {@link
* AdaptiveBulkhead#onSuccess(long, TimeUnit)} to signal a completed call and release a
* Should only be used when a permission was acquired but not used. Otherwise, use {@link
* AdaptiveBulkhead#onSuccess} to signal a completed call and release a
* permission.
*/
void releasePermission();

/**
* Records a call and releases a permission.
*/
void onResult(long startTime, TimeUnit timeUnit, @Nullable Object result);
void onResult(long startTime, @Nullable Object result);


/**
* Records a successful call and releases a permission.
*/
void onSuccess(long startTime, TimeUnit timeUnit);
void onSuccess(long startTime);

/**
* Records a failed call and releases a permission.
*/
void onError(long startTime, TimeUnit timeUnit, Throwable throwable);
void onError(long startTime, Throwable throwable);

/**
* Returns the name of this bulkhead.
Expand Down Expand Up @@ -198,10 +198,10 @@ static <T> CheckedSupplier<T> decorateCheckedSupplier(AdaptiveBulkhead bulkhead,
try {
start = bulkhead.getCurrentTimestamp();
T result = supplier.get();
bulkhead.onResult(start, bulkhead.getTimestampUnit(), result);
bulkhead.onResult(start, result);
return result;
} catch (Exception e) {
bulkhead.onError(start, bulkhead.getTimestampUnit(), e);
bulkhead.onError(start, e);
throw e;
}
};
Expand Down Expand Up @@ -231,16 +231,16 @@ static <T> Supplier<CompletionStage<T>> decorateCompletionStage(AdaptiveBulkhead
.whenComplete(
(result, throwable) -> {
if (throwable != null) {
bulkhead.onError(start, bulkhead.getTimestampUnit(), throwable);
bulkhead.onError(start, throwable);
promise.completeExceptionally(throwable);
} else {
bulkhead.onResult(start, bulkhead.getTimestampUnit(), result);
bulkhead.onResult(start, result);
promise.complete(result);
}
}
);
} catch (Exception e) {
bulkhead.onError(start, bulkhead.getTimestampUnit(), e);
bulkhead.onError(start, e);
promise.completeExceptionally(e);
}
}
Expand Down Expand Up @@ -269,7 +269,7 @@ static <T> Supplier<Future<T>> decorateFuture(AdaptiveBulkhead bulkhead, Supplie
try {
return new BulkheadFuture<>(bulkhead, supplier.get(), start);
} catch (Throwable e) {
bulkhead.onError(start, bulkhead.getTimestampUnit(), e);
bulkhead.onError(start, e);
throw e;
}
};
Expand All @@ -293,11 +293,11 @@ static CheckedRunnable decorateCheckedRunnable(AdaptiveBulkhead bulkhead,
runnable.run();
} catch (Exception e) {
isFailed = true;
bulkhead.onError(start, bulkhead.getTimestampUnit(), e);
bulkhead.onError(start, e);
throw e;
} finally {
if (start != 0 && !isFailed) {
bulkhead.onSuccess(start, bulkhead.getTimestampUnit());
bulkhead.onSuccess(start);
}
}
};
Expand All @@ -318,10 +318,10 @@ static <T> Callable<T> decorateCallable(AdaptiveBulkhead bulkhead, Callable<T> c
try {
start = bulkhead.getCurrentTimestamp();
T result = callable.call();
bulkhead.onResult(start, bulkhead.getTimestampUnit(), result);
bulkhead.onResult(start, result);
return result;
} catch (Exception e) {
bulkhead.onError(start, bulkhead.getTimestampUnit(), e);
bulkhead.onError(start, e);
throw e;
}
};
Expand All @@ -342,10 +342,10 @@ static <T> Supplier<T> decorateSupplier(AdaptiveBulkhead bulkhead, Supplier<T> s
try {
start = bulkhead.getCurrentTimestamp();
T result = supplier.get();
bulkhead.onResult(start, bulkhead.getTimestampUnit(), result);
bulkhead.onResult(start, result);
return result;
} catch (Exception e) {
bulkhead.onError(start, bulkhead.getTimestampUnit(), e);
bulkhead.onError(start, e);
throw e;
}
};
Expand All @@ -369,11 +369,11 @@ static <T> Consumer<T> decorateConsumer(AdaptiveBulkhead bulkhead, Consumer<T> c
consumer.accept(t);
} catch (Exception e) {
failed = true;
bulkhead.onError(start, bulkhead.getTimestampUnit(), e);
bulkhead.onError(start, e);
throw e;
} finally {
if (start != 0 && !failed) {
bulkhead.onSuccess(start, bulkhead.getTimestampUnit());
bulkhead.onSuccess(start);
}
}
};
Expand All @@ -398,11 +398,11 @@ static <T> CheckedConsumer<T> decorateCheckedConsumer(AdaptiveBulkhead bulkhead,
consumer.accept(t);
} catch (Exception e) {
failed = true;
bulkhead.onError(start, bulkhead.getTimestampUnit(), e);
bulkhead.onError(start, e);
throw e;
} finally {
if (start != 0 && !failed) {
bulkhead.onSuccess(start, bulkhead.getTimestampUnit());
bulkhead.onSuccess(start);
}
}
};
Expand All @@ -425,11 +425,11 @@ static Runnable decorateRunnable(AdaptiveBulkhead bulkhead, Runnable runnable) {
runnable.run();
} catch (Exception e) {
failed = true;
bulkhead.onError(start, bulkhead.getTimestampUnit(), e);
bulkhead.onError(start, e);
throw e;
} finally {
if (start != 0 && !failed) {
bulkhead.onSuccess(start, bulkhead.getTimestampUnit());
bulkhead.onSuccess(start);
}
}
};
Expand All @@ -452,10 +452,10 @@ static <T, R> Function<T, R> decorateFunction(AdaptiveBulkhead bulkhead,
try {
start = bulkhead.getCurrentTimestamp();
R result = function.apply(t);
bulkhead.onResult(start, bulkhead.getTimestampUnit(), result);
bulkhead.onResult(start, result);
return result;
} catch (Exception e) {
bulkhead.onError(start, bulkhead.getTimestampUnit(), e);
bulkhead.onError(start, e);
throw e;
}
};
Expand All @@ -478,10 +478,10 @@ static <T, R> CheckedFunction<T, R> decorateCheckedFunction(AdaptiveBulkhead bul
try {
start = bulkhead.getCurrentTimestamp();
R result = function.apply(t);
bulkhead.onResult(start, bulkhead.getTimestampUnit(), result);
bulkhead.onResult(start, result);
return result;
} catch (Exception e) {
bulkhead.onError(start, bulkhead.getTimestampUnit(), e);
bulkhead.onError(start, e);
throw e;
}
};
Expand Down Expand Up @@ -556,14 +556,14 @@ enum State {

/**
* Order is a FIXED integer, it should be preserved regardless of the ordinal number of the
* enumeration. While a State.ordinal() does mostly the same, it is prone to changing the
* enumeration. While a State.ordinal() does mostly the same, it is prone to change the
* order based on how the programmer sets the enum. If more states are added the "order"
* should be preserved. For example, if there is a state inserted between CLOSED and
* HALF_OPEN (say FIXED_OPEN) then the order of HALF_OPEN remains at 2 and the new state
* takes 3 regardless of its order in the enum.
*
* @param order
* @param allowPublish
* @param order int
* @param allowPublish boolean
*/
State(int order, boolean allowPublish) {
this.order = order;
Expand Down Expand Up @@ -629,7 +629,7 @@ interface Metrics extends Bulkhead.Metrics {
/**
* Returns the current total number of buffered calls in the sliding window.
*
* @return he current total number of buffered calls in the sliding window
* @return the current total number of buffered calls in the sliding window
*/
int getNumberOfBufferedCalls();

Expand Down Expand Up @@ -709,11 +709,11 @@ public T get() throws InterruptedException, ExecutionException {
try {
T result = future.get();
onceToBulkhead.applyOnce(b ->
b.onResult(start, b.getTimestampUnit(), result));
b.onResult(start, result));
return result;
} catch (Throwable throwable) {
onceToBulkhead.applyOnce(b ->
b.onError(start, b.getTimestampUnit(), throwable));
b.onError(start, throwable));
throw throwable;
}
}
Expand All @@ -723,11 +723,11 @@ public T get(long timeout, TimeUnit unit) throws InterruptedException, Execution
try {
T result = future.get(timeout, unit);
onceToBulkhead.applyOnce(b ->
b.onResult(start, b.getTimestampUnit(), result));
b.onResult(start, result));
return result;
} catch (Throwable throwable) {
onceToBulkhead.applyOnce(b ->
b.onError(start, b.getTimestampUnit(), throwable));
b.onError(start, throwable));
throw throwable;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ public class AdaptiveBulkheadConfig implements Serializable {
private static final Predicate<Object> DEFAULT_RECORD_RESULT_PREDICATE = (Object object) -> false;
private static final boolean DEFAULT_RESET_METRICS_ON_TRANSITION = false;


@SuppressWarnings("unchecked")
private Class<? extends Throwable>[] recordExceptions = new Class[0];
@SuppressWarnings("unchecked")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,13 @@
package io.github.resilience4j.bulkhead.adaptive;


import io.github.resilience4j.bulkhead.adaptive.internal.InMemoryAdaptiveBulkheadRegistry;
import io.github.resilience4j.core.Registry;

import java.util.Map;
import java.util.Set;
import java.util.function.Supplier;

import io.github.resilience4j.bulkhead.adaptive.internal.InMemoryAdaptiveBulkheadRegistry;
import io.github.resilience4j.core.Registry;

/**
* The {@link AdaptiveBulkheadRegistry} is a factory to create AdaptiveBulkhead instances which stores all bulkhead instances in a registry.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,54 +116,53 @@ public void releasePermission() {
* and the result predicate should decide if the call was successful or not.
*
* @param startTime The start time of the call
* @param timeUnit The time unit
* @param result The result of the protected function
*/
public void onResult(long startTime, TimeUnit timeUnit, @Nullable Object result) {
public void onResult(long startTime, @Nullable Object result) {
if (result != null && config.getRecordResultPredicate().test(result)) {
ResultRecordedAsFailureException failure = new ResultRecordedAsFailureException(name, result);
LOG.debug("'{}' recorded a result type '{}' as a failure", name, result.getClass());
onError(startTime, timeUnit, failure);
onError(startTime, failure);
} else {
onSuccess(startTime, timeUnit);
onSuccess(startTime);
}
}

/**
* @param startTime The start time of the call
* @param timeUnit The time unit
*/
@Override
public void onSuccess(long startTime, TimeUnit timeUnit) {
releasePermission(); // ?
recordSuccess(startTime, timeUnit);
public void onSuccess(long startTime) {
releasePermission(); // TODO?
recordSuccess(startTime);
tryPublishEvent(
new BulkheadOnSuccessEvent(name, now()));
}

/**
* @param startTime The start time of the call
* @param timeUnit The time unit
* @param throwable An error
*/
@Override
public void onError(long startTime, TimeUnit timeUnit, Throwable throwable) {
public void onError(long startTime, Throwable throwable) {
if (config.getIgnoreExceptionPredicate().test(throwable)) {
releasePermission();
tryPublishEvent(
new BulkheadOnIgnoreEvent(name, now(), throwable));
} else if (config.getRecordExceptionPredicate().test(throwable)) {
releasePermission(); // ?
recordError(startTime, timeUnit, throwable);
recordError(startTime, throwable);
tryPublishEvent(
new BulkheadOnErrorEvent(name, now(), throwable));
} else {
onSuccess(startTime, timeUnit);
onSuccess(startTime);
}
}

private long nanosUntilNow(long startTime, TimeUnit timeUnit) {
return Math.max(0, getTimestampUnit().toNanos(getCurrentTimestamp()) - timeUnit.toNanos(startTime));
private long nanosUntilNow(long startTime) {
long now = getTimestampUnit().toNanos(getCurrentTimestamp());
long start = timestampUnit.toNanos(startTime);
return Math.max(0, now - start);
}

@Override
Expand Down Expand Up @@ -252,15 +251,15 @@ private void changeConfig(int oldValue, int newValue) {
}
}

private void recordError(long startTime, TimeUnit timeUnit, Throwable throwable) {
private void recordError(long startTime, Throwable throwable) {
LOG.debug("'{}' recorded an error:", name, throwable);
long nanoseconds = nanosUntilNow(startTime, timeUnit);
long nanoseconds = nanosUntilNow(startTime);
onThresholdExcess(metrics.onError(nanoseconds));
}

private void recordSuccess(long startTime, TimeUnit timeUnit) {
private void recordSuccess(long startTime) {
LOG.debug("'{}' recorded a success", name);
long nanoseconds = nanosUntilNow(startTime, timeUnit);
long nanoseconds = nanosUntilNow(startTime);
onThresholdExcess(metrics.onSuccess(nanoseconds));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,18 @@
*/
package io.github.resilience4j.bulkhead.adaptive.internal;

import java.util.Map;
import java.util.Set;
import java.util.HashSet;
import java.util.Objects;
import java.util.function.Supplier;

import io.github.resilience4j.bulkhead.adaptive.AdaptiveBulkhead;
import io.github.resilience4j.bulkhead.adaptive.AdaptiveBulkheadConfig;
import io.github.resilience4j.bulkhead.adaptive.AdaptiveBulkheadRegistry;
import io.github.resilience4j.core.ConfigurationNotFoundException;
import io.github.resilience4j.core.registry.AbstractRegistry;

import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.function.Supplier;

/**
* Bulkhead instance manager;
* Constructs/returns AdaptiveBulkhead instances.
Expand Down

0 comments on commit 2f1f437

Please sign in to comment.