Skip to content

Commit

Permalink
Added onRetriesExceeded listener
Browse files Browse the repository at this point in the history
  • Loading branch information
jhalterman committed Jul 1, 2016
1 parent 98d9122 commit 982501a
Show file tree
Hide file tree
Showing 12 changed files with 443 additions and 600 deletions.
1 change: 1 addition & 0 deletions CHANGES.md
Expand Up @@ -3,6 +3,7 @@
### New Features

* Added support for `onRetriesExceeded` listeners.
* `RetryPolicy` can be extended (it's no longer marked as final)

### Bug Fixes

Expand Down
15 changes: 8 additions & 7 deletions README.md
Expand Up @@ -133,14 +133,14 @@ Connection connection = Failsafe.with(retryPolicy).get(this::connect);

#### Asynchronous Retries

Asynchronous executions can be performed and retried on a [ScheduledExecutorService] or custom [Scheduler] implementation, and return a [FailsafeFuture]. When the execution succeeds or the retry policy is exceeded, the future is completed and any listeners registered against it are called:
Asynchronous executions can be performed and retried on a [ScheduledExecutorService] or custom [Scheduler] implementation, and return a [FailsafeFuture]. When the execution succeeds or the retry policy is exceeded, the future is completed and any registered [listeners](#event-listeners) are called:

```java
Failsafe.with(retryPolicy)
.with(executor)
.onSuccess(connection -> log.info("Connected to {}", connection))
.onFailure(failure -> log.error("Connection attempts failed", failure))
.run(this::connect);
.get(this::connect);
```

#### Circuit Breakers
Expand Down Expand Up @@ -258,7 +258,7 @@ Failsafe.with(retryPolicy).run(ctx -> {

#### Event Listeners

Failsafe supports a variety of execution and retry event [listeners]:
Failsafe supports a variety of execution and retry event [listeners][ListenerConfig]:

```java
Failsafe.with(retryPolicy)
Expand All @@ -268,7 +268,7 @@ Failsafe.with(retryPolicy)
.get(this::connect);
```

[Asynchronous listeners][AsyncListeners] are also supported:
[Asynchronous listeners][AsyncListenerConfig] are also supported:

```java
Failsafe.with(retryPolicy)
Expand All @@ -277,7 +277,7 @@ Failsafe.with(retryPolicy)
.onSuccessAsync(cxn -> log.info("Connected to {}", cxn), anotherExecutor);
```

Java 6 and 7 users can extend the `Listeners` class and override individual event handlers:
Java 6 and 7 users can extend the [Listeners] class and override individual event handlers:

```java
Failsafe.with(retryPolicy)
Expand Down Expand Up @@ -403,8 +403,9 @@ Failsafe is a volunteer effort. If you use it and you like it, you can help by s

Copyright 2015-2016 Jonathan Halterman - Released under the [Apache 2.0 license](http://www.apache.org/licenses/LICENSE-2.0.html).

[listeners]: http://jodah.net/failsafe/javadoc/net/jodah/failsafe/ListenerBindings.html
[AsyncListeners]: http://jodah.net/failsafe/javadoc/net/jodah/failsafe/AsyncListenerBindings.html
[Listeners]: http://jodah.net/failsafe/javadoc/net/jodah/failsafe/Listeners.html
[ListenerConfig]: http://jodah.net/failsafe/javadoc/net/jodah/failsafe/ListenerConfig.html
[AsyncListenerConfig]: http://jodah.net/failsafe/javadoc/net/jodah/failsafe/AsyncListenerConfig.html
[RetryPolicy]: http://jodah.net/failsafe/javadoc/net/jodah/failsafe/RetryPolicy.html
[FailsafeFuture]: http://jodah.net/failsafe/javadoc/net/jodah/failsafe/FailsafeFuture.html
[CompletableFuture]: https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/CompletableFuture.html
Expand Down
15 changes: 10 additions & 5 deletions src/main/java/net/jodah/failsafe/AbstractExecution.java
Expand Up @@ -15,6 +15,7 @@ abstract class AbstractExecution extends ExecutionContext {
volatile Object lastResult;
volatile Throwable lastFailure;
volatile boolean completed;
volatile boolean retriesExceeded;
volatile boolean success;
volatile long waitNanos;

Expand Down Expand Up @@ -104,10 +105,10 @@ boolean complete(Object result, Throwable failure, boolean checkArgs) {
boolean maxRetriesExceeded = retryPolicy.getMaxRetries() != -1 && executions > retryPolicy.getMaxRetries();
boolean maxDurationExceeded = retryPolicy.getMaxDuration() != null
&& elapsedNanos > retryPolicy.getMaxDuration().toNanos();
retriesExceeded = maxRetriesExceeded || maxDurationExceeded;
boolean shouldAbort = retryPolicy.canAbortFor(result, failure);
boolean shouldRetry = !shouldAbort && checkArgs && retryPolicy.canRetryFor(result, failure);

completed = maxRetriesExceeded || maxDurationExceeded || !shouldRetry || shouldAbort;
boolean shouldRetry = !retriesExceeded && !shouldAbort && checkArgs && retryPolicy.canRetryFor(result, failure);
completed = shouldAbort || !shouldRetry;
success = completed && !shouldRetry && !shouldAbort && failure == null;

// Call listeners
Expand All @@ -116,8 +117,12 @@ boolean complete(Object result, Throwable failure, boolean checkArgs) {
listeners.handleFailedAttempt(result, failure, this);
if (shouldAbort)
listeners.handleAbort(result, failure, this);
else if (completed)
listeners.complete(result, failure, this, success);
else {
if (retriesExceeded)
listeners.handleRetriesExceeded(result, failure, this);
if (completed)
listeners.handleComplete(result, failure, this, success);
}
}

return completed;
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/net/jodah/failsafe/AsyncExecution.java
Expand Up @@ -116,7 +116,7 @@ void before() {
completed = true;
Exception failure = new CircuitBreakerOpenException();
if (listeners != null)
listeners.complete(null, failure, this, false);
listeners.handleComplete(null, failure, this, false);
future.complete(null, failure);
return;
}
Expand Down Expand Up @@ -159,7 +159,7 @@ synchronized boolean completeOrRetry(Object result, Throwable failure) {
} catch (Throwable t) {
failure = t;
if (listeners != null)
listeners.complete(null, t, this, false);
listeners.handleComplete(null, t, this, false);
future.complete(null, failure);
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/net/jodah/failsafe/AsyncFailsafe.java
Expand Up @@ -207,7 +207,7 @@ private <T> FailsafeFuture<T> call(AsyncCallableWrapper<T> callable, FailsafeFut
try {
future.setFuture((Future<T>) scheduler.schedule(callable, 0, TimeUnit.MILLISECONDS));
} catch (Throwable t) {
complete(null, t, execution, false);
handleComplete(null, t, execution, false);
future.complete(null, t);
}

Expand Down
8 changes: 4 additions & 4 deletions src/main/java/net/jodah/failsafe/AsyncListenerConfig.java
Expand Up @@ -125,17 +125,17 @@ public S onFailureAsync(ResultListener<? extends R, ? extends Throwable> listene
}

/**
* Registers the {@code listener} to be called asynchronously on Failsafe's configured executor or Scheduler when the
* retry policy is exceeded and the result is a failure.
* Registers the {@code listener} to be called asynchronously on Failsafe's configured executor or Scheduler when an
* execution fails and the retry policy is exceeded.
*/
public S onRetriesExceededAsync(FailureListener<? extends Throwable> listener) {
registry().retriesExceeded().add(Listeners.of(Listeners.of(listener), null, scheduler));
return (S) this;
}

/**
* Registers the {@code listener} to be called asynchronously on Failsafe's configured executor or Scheduler when the
* retry policy is exceeded and the result is a failure.
* Registers the {@code listener} to be called asynchronously on Failsafe's configured executor or Scheduler when an
* execution fails and the retry policy is exceeded.
*/
public S onRetriesExceededAsync(ResultListener<? extends R, ? extends Throwable> listener) {
registry().retriesExceeded().add(Listeners.of(Listeners.of(listener), null, scheduler));
Expand Down
32 changes: 17 additions & 15 deletions src/main/java/net/jodah/failsafe/CircuitBreaker.java
Expand Up @@ -40,8 +40,9 @@ public int getCurrentExecutions() {
private Ratio failureThresholdRatio;
private Integer successThreshold;
private Ratio successThresholdRatio;
private boolean failureConditionChecked;
private List<BiPredicate<Object, Throwable>> failurePredicates;
/** Indicates whether failures are checked by a configured failure condition */
private boolean failuresChecked;
private List<BiPredicate<Object, Throwable>> failureConditions;
CheckedRunnable onOpen;
CheckedRunnable onHalfOpen;
CheckedRunnable onClose;
Expand All @@ -50,7 +51,7 @@ public int getCurrentExecutions() {
* Creates a Circuit that opens after a single failure, closes after a single success, and has no delay.
*/
public CircuitBreaker() {
failurePredicates = new ArrayList<BiPredicate<Object, Throwable>>();
failureConditions = new ArrayList<BiPredicate<Object, Throwable>>();
}

/**
Expand Down Expand Up @@ -87,8 +88,8 @@ public void close() {
@SuppressWarnings("unchecked")
public <T> CircuitBreaker failIf(BiPredicate<T, ? extends Throwable> completionPredicate) {
Assert.notNull(completionPredicate, "completionPredicate");
failureConditionChecked = true;
failurePredicates.add((BiPredicate<Object, Throwable>) completionPredicate);
failuresChecked = true;
failureConditions.add((BiPredicate<Object, Throwable>) completionPredicate);
return this;
}

Expand All @@ -99,7 +100,7 @@ public <T> CircuitBreaker failIf(BiPredicate<T, ? extends Throwable> completionP
*/
public <T> CircuitBreaker failIf(Predicate<T> resultPredicate) {
Assert.notNull(resultPredicate, "resultPredicate");
failurePredicates.add(Predicates.resultPredicateFor(resultPredicate));
failureConditions.add(Predicates.resultPredicateFor(resultPredicate));
return this;
}

Expand All @@ -125,8 +126,8 @@ public CircuitBreaker failOn(Class<? extends Throwable>... failures) {
public CircuitBreaker failOn(List<Class<? extends Throwable>> failures) {
Assert.notNull(failures, "failures");
Assert.isTrue(!failures.isEmpty(), "failures cannot be empty");
failureConditionChecked = true;
failurePredicates.add(Predicates.failurePredicateFor(failures));
failuresChecked = true;
failureConditions.add(Predicates.failurePredicateFor(failures));
return this;
}

Expand All @@ -137,16 +138,16 @@ public CircuitBreaker failOn(List<Class<? extends Throwable>> failures) {
*/
public CircuitBreaker failOn(Predicate<? extends Throwable> failurePredicate) {
Assert.notNull(failurePredicate, "failurePredicate");
failureConditionChecked = true;
failurePredicates.add(Predicates.failurePredicateFor(failurePredicate));
failuresChecked = true;
failureConditions.add(Predicates.failurePredicateFor(failurePredicate));
return this;
}

/**
* Specifies that a failure should be recorded if the execution result matches the {@code result}.
*/
public CircuitBreaker failWhen(Object result) {
failurePredicates.add(Predicates.resultPredicateFor(result));
failureConditions.add(Predicates.resultPredicateFor(result));
return this;
}

Expand Down Expand Up @@ -230,8 +231,8 @@ public boolean isClosed() {
}

/**
* Returns whether the circuit breaker considers the {@code result} or {@code throwable} a failure based on its
* failure configuration.
* Returns whether the circuit breaker considers the {@code result} or {@code throwable} a failure based on the
* configured conditions, or if {@code failure} is not null it is not checked by any configured condition.
*
* @see #failIf(BiPredicate)
* @see #failIf(Predicate)
Expand All @@ -241,12 +242,13 @@ public boolean isClosed() {
* @see #failWhen(Object)
*/
public boolean isFailure(Object result, Throwable failure) {
for (BiPredicate<Object, Throwable> predicate : failurePredicates) {
for (BiPredicate<Object, Throwable> predicate : failureConditions) {
if (predicate.test(result, failure))
return true;
}

return failure != null && !failureConditionChecked;
// Return true if the failure is not checked by a configured condition
return failure != null && !failuresChecked;
}

/**
Expand Down

0 comments on commit 982501a

Please sign in to comment.