Skip to content

Commit

Permalink
Add support for composable policies
Browse files Browse the repository at this point in the history
- Adds new FailsafePolicy and PolicyExecutor SPIs
- Adds separate FailsafePolicy and executor implemenations for RetryPolicy, CircuitBreaker, and Fallback

# Conflicts:
#	CHANGES.md
#	README.md
#	src/main/java/net/jodah/failsafe/AbstractExecution.java
#	src/main/java/net/jodah/failsafe/CircuitBreaker.java
  • Loading branch information
jhalterman committed Dec 25, 2018
1 parent 41edf0b commit 5d919d5
Show file tree
Hide file tree
Showing 37 changed files with 1,403 additions and 766 deletions.
11 changes: 8 additions & 3 deletions CHANGES.md
@@ -1,8 +1,13 @@
# 1.1.1
# Next

### Bug Fixes
### Improvements

* Issue #21 - Support failure strategy precedence
- This change respects the order that a `RetryPolicy`, `CircuitBreaker`, or fallback are configured in, and will apply them to any failures in order. The first policy strategy that is configured will handle a failure first, and so on.

### API Changes

* Issue #131 - Fix interaction between CircuitBreaker + Predicate + failure.
* Retry related listener methods such as `onRetry`, `onAbort`, and `onRetriesExceeded`, along with their async variants, will throw `IllegalStateException` if they're used when a `RetryPolicy` is not configured.

# 1.1.0

Expand Down
5 changes: 2 additions & 3 deletions README.md
Expand Up @@ -255,7 +255,6 @@ breaker.close();

if (breaker.allowsExecution()) {
try {
breaker.preExecute();
doSomething();
breaker.recordSuccess();
} catch (Exception e) {
Expand Down Expand Up @@ -450,7 +449,7 @@ Execution execution = new Execution(retryPolicy);

// On failure
if (execution.canRetryOn(someFailure))
service.scheduleRetry(execution.getWaitTime(), TimeUnit.MILLISECONDS);
service.scheduleRetry(execution.getWaitTime().toNanos(), TimeUnit.MILLISECONDS);
```

See the [RxJava example][RxJava] for a more detailed implementation.
Expand All @@ -474,7 +473,7 @@ Failsafe is a volunteer effort. If you use it and you like it, [let us know][who

## License

Copyright 2015-2016 Jonathan Halterman and friends. Released under the [Apache 2.0 license](http://www.apache.org/licenses/LICENSE-2.0.html).
Copyright 2015-2018 Jonathan Halterman and friends. Released under the [Apache 2.0 license](http://www.apache.org/licenses/LICENSE-2.0.html).

[backoff]: http://jodah.net/failsafe/javadoc/net/jodah/failsafe/RetryPolicy.html#withBackoff-long-long-java.util.concurrent.TimeUnit-
[computed-delay]: http://jodah.net/failsafe/javadoc/net/jodah/failsafe/RetryPolicy.html#withDelay-net.jodah.failsafe.RetryPolicy.DelayFunction-
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Expand Up @@ -57,7 +57,7 @@
<dependency>
<groupId>net.jodah</groupId>
<artifactId>concurrentunit</artifactId>
<version>0.4.2</version>
<version>0.4.4</version>
<scope>test</scope>
</dependency>

Expand Down
232 changes: 105 additions & 127 deletions src/main/java/net/jodah/failsafe/AbstractExecution.java
Expand Up @@ -15,39 +15,125 @@
*/
package net.jodah.failsafe;

import net.jodah.failsafe.RetryPolicy.DelayFunction;
import net.jodah.failsafe.PolicyExecutor.PolicyResult;
import net.jodah.failsafe.event.EventHandler;
import net.jodah.failsafe.internal.util.Assert;
import net.jodah.failsafe.util.Duration;

import java.util.ListIterator;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;

abstract class AbstractExecution extends ExecutionContext {
public abstract class AbstractExecution extends ExecutionContext {
final FailsafeConfig<Object, ?> config;
final RetryPolicy retryPolicy;
final CircuitBreaker circuitBreaker;
final EventHandler<Object> eventHandler;
final Callable<Object> callable;

// Mutable state
long attemptStartTime;
// Internally mutable state
volatile Object lastResult;
volatile Throwable lastFailure;
PolicyExecutor head;
volatile PolicyExecutor lastExecuted;

// Externally mutable state
/** The wait time in nanoseconds. */
private volatile long waitNanos;
volatile boolean completed;
volatile boolean retriesExceeded;
volatile boolean success;
/** The fixed, backoff, random or computed delay time in nanoseconds. */
private volatile long delayNanos = -1;
/** The wait time, which is the delay time adjusted for jitter and max duration, in nanoseconds. */
volatile long waitNanos;

/**
* Creates a new Execution for the {@code retryPolicy} and {@code circuitBreaker}.
*
* @throws NullPointerException if {@code retryPolicy} is null
* Creates a new standalone AbstractExecution for the {@code config}.
*/
AbstractExecution(FailsafeConfig<Object, ?> config) {
this(null, config);
}

/**
* Creates a new AbstractExecution for the {@code callable} and {@code config}.
*/
AbstractExecution(Callable<Object> callable, FailsafeConfig<Object, ?> config) {
super(new Duration(System.nanoTime(), TimeUnit.NANOSECONDS));
this.config = config;
retryPolicy = config.retryPolicy;
this.circuitBreaker = config.circuitBreaker;
this.callable = callable;
eventHandler = config.eventHandler;

PolicyExecutor next = null;
if (config.policies == null || config.policies.isEmpty()) {
// Add policies in logical order
if (config.circuitBreaker != null)
next = buildPolicyExecutor(config.circuitBreaker, next);
if (config.retryPolicy != RetryPolicy.NEVER)
next = buildPolicyExecutor(config.retryPolicy, next);
if (config.fallback != null)
next = buildPolicyExecutor(config.fallback, next);
} else {
// Add policies in user-defined order
ListIterator<FailsafePolicy> policyIterator = config.policies.listIterator(config.policies.size());
while (policyIterator.hasPrevious())
next = buildPolicyExecutor(policyIterator.previous(), next);
}

head = next;
}

void addPolicy(FailsafePolicy policy) {
head = buildPolicyExecutor(policy, head);
}

private PolicyExecutor buildPolicyExecutor(FailsafePolicy policy, PolicyExecutor next) {
PolicyExecutor policyExecutor = policy.toExecutor();
policyExecutor.execution = this;
policyExecutor.eventHandler = eventHandler;
policyExecutor.next = next;
return policyExecutor;
}

/**
* Records an execution attempt.
*
* @throws IllegalStateException if the execution is already complete
*/
void record(PolicyResult pr) {
Assert.state(!completed, "Execution has already been completed");
executions++;
lastResult = pr.noResult ? null : pr.result;
lastFailure = pr.failure;
}

void preExecute() {
}

/**
* Performs post-execution handling of the {@code result}, returning true if complete else false.
*
* @throws IllegalStateException if the execution is already complete
*/
synchronized boolean postExecute(PolicyResult pr) {
record(pr);
pr = postExecute(pr, head);
waitNanos = pr.waitNanos;
completed = pr.completed;
success = pr.success;
return completed;
}

private PolicyResult postExecute(PolicyResult result, PolicyExecutor policyExecutor) {
// Traverse to the last executor
if (policyExecutor.next != null)
postExecute(result, policyExecutor.next);

return policyExecutor.postExecute(result);
}

/**
* Performs a synchronous execution.
*/
PolicyResult executeSync() {
PolicyResult pr = head.executeSync(null);
completed = pr.completed;
success = pr.success;
eventHandler.handleComplete(pr.result, pr.failure, this, success);
return pr;
}

/**
Expand All @@ -67,125 +153,17 @@ public <T> T getLastResult() {
}

/**
* Returns the time to wait before the next execution attempt. Returns {@code 0} if an execution has not yet occurred.
* Returns the time to wait before the next execution attempt. Returns {@code 0} if an execution has not yet
* occurred.
*/
public Duration getWaitTime() {
return new Duration(waitNanos, TimeUnit.NANOSECONDS);
}

/**
* Returns whether the execution is complete.
* Returns whether the execution is complete or if it can be retried.
*/
public boolean isComplete() {
return completed;
}

void before() {
if (circuitBreaker != null)
circuitBreaker.preExecute();
attemptStartTime = System.nanoTime();
}

/**
* Records and attempts to complete the execution, returning true if complete else false.
*
* @throws IllegalStateException if the execution is already complete
*/
@SuppressWarnings("unchecked")
boolean complete(Object result, Throwable failure, boolean checkArgs) {
Assert.state(!completed, "Execution has already been completed");
executions++;
lastResult = result;
lastFailure = failure;
long elapsedNanos = getElapsedTime().toNanos();

// Record the execution with the circuit breaker
if (circuitBreaker != null) {
Duration timeout = circuitBreaker.getTimeout();
boolean timeoutExceeded = timeout != null && elapsedNanos >= timeout.toNanos();
if (circuitBreaker.isFailure(result, failure) || timeoutExceeded)
circuitBreaker.recordFailure();
else
circuitBreaker.recordSuccess();
}

// Determine the computed delay
long computedDelayNanos = -1;
DelayFunction<Object, Throwable> delayFunction = (DelayFunction<Object, Throwable>) retryPolicy.getDelayFn();
if (delayFunction != null && retryPolicy.canApplyDelayFn(result, failure)) {
Duration computedDelay = delayFunction.computeDelay(result, failure, this);
if (computedDelay != null && computedDelay.toNanos() >= 0)
computedDelayNanos = computedDelay.toNanos();
}

// Determine the non-computed delay
if (computedDelayNanos == -1) {
Duration delay = retryPolicy.getDelay();
Duration delayMin = retryPolicy.getDelayMin();
Duration delayMax = retryPolicy.getDelayMax();

if (delayNanos == -1 && delay != null && !delay.equals(Duration.NONE))
delayNanos = delay.toNanos();
else if (delayMin != null && delayMax != null)
delayNanos = randomDelayInRange(delayMin.toNanos(), delayMax.toNanos(), Math.random());

// Adjust for backoff
if (executions != 1 && retryPolicy.getMaxDelay() != null)
delayNanos = (long) Math.min(delayNanos * retryPolicy.getDelayFactor(), retryPolicy.getMaxDelay().toNanos());
}

waitNanos = computedDelayNanos != -1 ? computedDelayNanos : delayNanos;

// Adjust the wait time for jitter
if (retryPolicy.getJitter() != null)
waitNanos = randomDelay(waitNanos, retryPolicy.getJitter().toNanos(), Math.random());
else if (retryPolicy.getJitterFactor() > 0.0)
waitNanos = randomDelay(waitNanos, retryPolicy.getJitterFactor(), Math.random());

// Adjust the wait time for max duration
if (retryPolicy.getMaxDuration() != null) {
long maxRemainingWaitTime = retryPolicy.getMaxDuration().toNanos() - elapsedNanos;
waitNanos = Math.min(waitNanos, maxRemainingWaitTime < 0 ? 0 : maxRemainingWaitTime);
if (waitNanos < 0)
waitNanos = 0;
}

boolean maxRetriesExceeded = retryPolicy.getMaxRetries() != -1 && executions > retryPolicy.getMaxRetries();
boolean maxDurationExceeded = retryPolicy.getMaxDuration() != null
&& elapsedNanos > retryPolicy.getMaxDuration().toNanos();
retriesExceeded = maxRetriesExceeded || maxDurationExceeded;
boolean isAbortable = retryPolicy.canAbortFor(result, failure);
boolean isRetryable = retryPolicy.canRetryFor(result, failure);
boolean shouldRetry = !retriesExceeded && checkArgs && !isAbortable && retryPolicy.allowsRetries() && isRetryable;
completed = isAbortable || !shouldRetry;
success = completed && !isAbortable && !isRetryable && failure == null;

// Call listeners
if (!success)
config.handleFailedAttempt(result, failure, this);
if (isAbortable)
config.handleAbort(result, failure, this);
else {
if (!success && retriesExceeded)
config.handleRetriesExceeded(result, failure, this);
if (completed)
config.handleComplete(result, failure, this, success);
}

return completed;
}

static long randomDelayInRange(long delayMin, long delayMax, double random) {
return (long) (random * (delayMax - delayMin)) + delayMin;
}

static long randomDelay(long delay, long jitter, double random) {
double randomAddend = (1 - random * 2) * jitter;
return (long) (delay + randomAddend);
}

static long randomDelay(long delay, double jitterFactor, double random) {
double randomFactor = 1 + (1 - random * 2) * jitterFactor;
return (long) (delay * randomFactor);
}
}

0 comments on commit 5d919d5

Please sign in to comment.