Skip to content

Commit

Permalink
Misc refactorings
Browse files Browse the repository at this point in the history
- Renamed PolicyResult to ExecutionResult
- Moved ExecutionResult to be a top level class
- Simplified internal event handlers
- Improved handling of no result executions
- Simplified fallback methods in FailsafeConfig
- Documented RejectedExecutionException on AsyncFailsafe
- Add explicit handling for scheduling errors
  • Loading branch information
jhalterman committed Dec 26, 2018
1 parent 8e312e8 commit db2bf79
Show file tree
Hide file tree
Showing 18 changed files with 268 additions and 265 deletions.
37 changes: 16 additions & 21 deletions src/main/java/net/jodah/failsafe/AbstractExecution.java
Expand Up @@ -15,7 +15,6 @@
*/
package net.jodah.failsafe;

import net.jodah.failsafe.PolicyExecutor.PolicyResult;
import net.jodah.failsafe.event.EventHandler;
import net.jodah.failsafe.internal.util.Assert;
import net.jodah.failsafe.util.Duration;
Expand All @@ -24,21 +23,19 @@
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;

@SuppressWarnings("WeakerAccess")
public abstract class AbstractExecution extends ExecutionContext {
final EventHandler<Object> eventHandler;
final EventHandler eventHandler;
Callable<Object> callable;

// Internally mutable state
volatile Object lastResult;
volatile Throwable lastFailure;
PolicyExecutor head;
volatile PolicyExecutor lastExecuted;

// Externally mutable state
/** The wait time in nanoseconds. */
private volatile long waitNanos;
volatile boolean completed;
volatile boolean success;

/**
* Creates a new AbstractExecution for the {@code callable} and {@code config}.
Expand Down Expand Up @@ -84,11 +81,11 @@ private PolicyExecutor buildPolicyExecutor(FailsafePolicy policy, PolicyExecutor
*
* @throws IllegalStateException if the execution is already complete
*/
void record(PolicyResult pr) {
void record(ExecutionResult result) {
Assert.state(!completed, "Execution has already been completed");
executions++;
lastResult = pr.noResult ? null : pr.result;
lastFailure = pr.failure;
lastResult = result.result;
lastFailure = result.failure;
}

void preExecute() {
Expand All @@ -99,16 +96,15 @@ void preExecute() {
*
* @throws IllegalStateException if the execution is already complete
*/
synchronized boolean postExecute(PolicyResult pr) {
record(pr);
pr = postExecute(pr, head);
waitNanos = pr.waitNanos;
completed = pr.completed;
success = pr.success;
synchronized boolean postExecute(ExecutionResult result) {
record(result);
result = postExecute(result, head);
waitNanos = result.waitNanos;
completed = result.completed;
return completed;
}

private PolicyResult postExecute(PolicyResult result, PolicyExecutor policyExecutor) {
private ExecutionResult postExecute(ExecutionResult result, PolicyExecutor policyExecutor) {
// Traverse to the last executor
if (policyExecutor.next != null)
postExecute(result, policyExecutor.next);
Expand All @@ -119,12 +115,11 @@ private PolicyResult postExecute(PolicyResult result, PolicyExecutor policyExecu
/**
* Performs a synchronous execution.
*/
PolicyResult executeSync() {
PolicyResult pr = head.executeSync(null);
completed = pr.completed;
success = pr.success;
eventHandler.handleComplete(pr.result, pr.failure, this, success);
return pr;
ExecutionResult executeSync() {
ExecutionResult result = head.executeSync(null);
completed = result.completed;
eventHandler.handleComplete(result, this);
return result;
}

/**
Expand Down
44 changes: 22 additions & 22 deletions src/main/java/net/jodah/failsafe/AsyncExecution.java
Expand Up @@ -15,7 +15,6 @@
*/
package net.jodah.failsafe;

import net.jodah.failsafe.PolicyExecutor.PolicyResult;
import net.jodah.failsafe.internal.util.Assert;
import net.jodah.failsafe.util.concurrent.Scheduler;

Expand All @@ -26,6 +25,7 @@
*
* @author Jonathan Halterman
*/
@SuppressWarnings("WeakerAccess")
public final class AsyncExecution extends AbstractExecution {
private final FailsafeFuture<Object> future;
private final Scheduler scheduler;
Expand All @@ -50,7 +50,7 @@ <T> AsyncExecution(Callable<T> callable, Scheduler scheduler, FailsafeFuture<T>
* @throws IllegalStateException if the execution is already complete
*/
public void complete() {
complete(null, null, true);
postExecute(ExecutionResult.noResult());
}

/**
Expand All @@ -60,7 +60,7 @@ public void complete() {
* @throws IllegalStateException if the execution is already complete
*/
public boolean complete(Object result) {
return complete(result, null, false);
return postExecute(new ExecutionResult(result, null));
}

/**
Expand All @@ -74,7 +74,7 @@ public boolean complete(Object result) {
* @throws IllegalStateException if the execution is already complete
*/
public boolean complete(Object result, Throwable failure) {
return complete(result, failure, false);
return postExecute(new ExecutionResult(result, failure));
}

/**
Expand Down Expand Up @@ -106,7 +106,7 @@ public boolean retryFor(Object result) {
public boolean retryFor(Object result, Throwable failure) {
Assert.state(!retryCalled, "Retry has already been called");
retryCalled = true;
return completeOrRetry(result, failure);
return completeOrHandle(result, failure);
}

/**
Expand Down Expand Up @@ -134,12 +134,13 @@ void preExecute() {
*
* @throws IllegalStateException if the execution is already complete
*/
boolean complete(Object result, Throwable failure, boolean noResult) {
@Override
boolean postExecute(ExecutionResult result) {
synchronized (future) {
if (!completeCalled) {
if (super.postExecute(new PolicyResult(result, failure, noResult))) {
future.complete(result, failure);
eventHandler.handleComplete(result, failure, this, success);
if (super.postExecute(result)) {
future.complete(result.result, result.failure);
eventHandler.handleComplete(result, this);
}
completeCalled = true;
}
Expand All @@ -149,39 +150,38 @@ boolean complete(Object result, Throwable failure, boolean noResult) {
}

/**
* Attempts to complete the execution else schedule a retry, returning whether a retry has been scheduled or not.
* Attempts to complete the execution else handle according to the configured policies.
*
* @throws IllegalStateException if the execution is already complete
*/
boolean completeOrRetry(Object result, Throwable failure) {
boolean completeOrHandle(Object result, Throwable failure) {
synchronized (future) {
PolicyResult pr = new PolicyResult(result, failure);
ExecutionResult er = new ExecutionResult(result, failure);

if (!completeCalled) {
completeCalled = true;
record(pr);
record(er);
}

return executeAsync(pr, scheduler, future) == null;
return executeAsync(er, scheduler, future) == null;
}
}

/**
* Begins or continues an asynchronous execution from the last PolicyExecutor given the {@code pr}.
* Begins or continues an asynchronous execution from the last PolicyExecutor given the {@code result}.
*
* @return null if an execution has been scheduled
*/
PolicyResult executeAsync(PolicyResult pr, Scheduler scheduler, FailsafeFuture<Object> future) {
ExecutionResult executeAsync(ExecutionResult result, Scheduler scheduler, FailsafeFuture<Object> future) {
boolean shouldExecute = lastExecuted == null;
pr = head.executeAsync(pr, scheduler, future, shouldExecute);
result = head.executeAsync(result, scheduler, future, shouldExecute);

if (pr != null) {
if (result != null) {
completed = true;
success = !pr.noResult && pr.success;
eventHandler.handleComplete(pr.result, pr.failure, this, success);
future.complete(pr.result, pr.failure);
eventHandler.handleComplete(result, this);
future.complete(result.result, result.failure);
}

return pr;
return result;
}
}
24 changes: 16 additions & 8 deletions src/main/java/net/jodah/failsafe/AsyncFailsafe.java
Expand Up @@ -19,10 +19,7 @@
import net.jodah.failsafe.internal.util.CancellableFuture;
import net.jodah.failsafe.util.concurrent.Scheduler;

import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Future;
import java.util.concurrent.*;
import java.util.function.Function;

/**
Expand All @@ -45,6 +42,7 @@ public class AsyncFailsafe<R> extends AsyncFailsafeConfig<R, AsyncFailsafe<R>> {
* {@link CircuitBreakerOpenException}.
*
* @throws NullPointerException if the {@code callable} is null
* @throws RejectedExecutionException if the {@code callable} cannot be scheduled for execution
*/
public <T> CompletableFuture<T> future(Callable<? extends CompletionStage<T>> callable) {
return call(execution -> Functions.asyncOfFuture(callable, execution));
Expand All @@ -58,6 +56,7 @@ public <T> CompletableFuture<T> future(Callable<? extends CompletionStage<T>> ca
* {@link CircuitBreakerOpenException}.
*
* @throws NullPointerException if the {@code callable} is null
* @throws RejectedExecutionException if the {@code callable} cannot be scheduled for execution
*/
public <T> CompletableFuture<T> future(ContextualCallable<? extends CompletionStage<T>> callable) {
return call(execution -> Functions.asyncOfFuture(callable, execution));
Expand All @@ -72,6 +71,7 @@ public <T> CompletableFuture<T> future(ContextualCallable<? extends CompletionSt
* {@link CircuitBreakerOpenException}.
*
* @throws NullPointerException if the {@code callable} is null
* @throws RejectedExecutionException if the {@code callable} cannot be scheduled for execution
*/
public <T> CompletableFuture<T> futureAsync(AsyncCallable<? extends CompletionStage<T>> callable) {
return call(execution -> Functions.asyncOfFuture(callable, execution));
Expand All @@ -85,6 +85,7 @@ public <T> CompletableFuture<T> futureAsync(AsyncCallable<? extends CompletionSt
* {@link CircuitBreakerOpenException}.
*
* @throws NullPointerException if the {@code callable} is null
* @throws RejectedExecutionException if the {@code callable} cannot be scheduled for execution
*/
public <T> Future<T> get(Callable<T> callable) {
return call(execution -> Functions.asyncOf(callable, execution), null);
Expand All @@ -98,6 +99,7 @@ public <T> Future<T> get(Callable<T> callable) {
* {@link CircuitBreakerOpenException}.
*
* @throws NullPointerException if the {@code callable} is null
* @throws RejectedExecutionException if the {@code callable} cannot be scheduled for execution
*/
public <T> Future<T> get(ContextualCallable<T> callable) {
return call(execution -> Functions.asyncOf(callable, execution), null);
Expand All @@ -112,6 +114,7 @@ public <T> Future<T> get(ContextualCallable<T> callable) {
* {@link CircuitBreakerOpenException}.
*
* @throws NullPointerException if the {@code callable} is null
* @throws RejectedExecutionException if the {@code callable} cannot be scheduled for execution
*/
public <T> Future<T> getAsync(AsyncCallable<T> callable) {
return call(execution -> Functions.asyncOf(callable, execution), null);
Expand All @@ -125,6 +128,7 @@ public <T> Future<T> getAsync(AsyncCallable<T> callable) {
* {@link CircuitBreakerOpenException}.
*
* @throws NullPointerException if the {@code runnable} is null
* @throws RejectedExecutionException if the {@code runnable} cannot be scheduled for execution
*/
public Future<Void> run(CheckedRunnable runnable) {
return call(execution -> Functions.asyncOf(runnable, execution), null);
Expand All @@ -138,6 +142,7 @@ public Future<Void> run(CheckedRunnable runnable) {
* {@link CircuitBreakerOpenException}.
*
* @throws NullPointerException if the {@code runnable} is null
* @throws RejectedExecutionException if the {@code runnable} cannot be scheduled for execution
*/
public Future<Void> run(ContextualRunnable runnable) {
return call(execution -> Functions.asyncOf(runnable, execution), null);
Expand All @@ -152,6 +157,7 @@ public Future<Void> run(ContextualRunnable runnable) {
* {@link CircuitBreakerOpenException}.
*
* @throws NullPointerException if the {@code runnable} is null
* @throws RejectedExecutionException if the {@code runnable} cannot be scheduled for execution
*/
public Future<Void> runAsync(AsyncRunnable runnable) {
return call(execution -> Functions.asyncOf(runnable, execution), null);
Expand All @@ -164,11 +170,12 @@ public Future<Void> runAsync(AsyncRunnable runnable) {
* If a configured circuit breaker is open, the resulting future is completed with
* {@link CircuitBreakerOpenException}.
*
* @throws NullPointerException if any argument is null
* @throws NullPointerException if the {@code callableFn} is null
* @throws RejectedExecutionException if the {@code callableFn} cannot be scheduled for execution
*/
@SuppressWarnings("unchecked")
private <T> CompletableFuture<T> call(Function<AsyncExecution, Callable<T>> callableFn) {
FailsafeFuture<T> future = new FailsafeFuture(this);
FailsafeFuture<T> future = new FailsafeFuture(eventHandler);
CompletableFuture<T> response = CancellableFuture.of(future);
future.inject(response);
call(callableFn, future);
Expand All @@ -182,12 +189,13 @@ private <T> CompletableFuture<T> call(Function<AsyncExecution, Callable<T>> call
* If a configured circuit breaker is open, the resulting future is completed with
* {@link CircuitBreakerOpenException}.
*
* @throws NullPointerException if any argument is null
* @throws NullPointerException if the {@code callableFn} is null
* @throws RejectedExecutionException if the {@code callableFn} cannot be scheduled for execution
*/
@SuppressWarnings("unchecked")
private <T> FailsafeFuture<T> call(Function<AsyncExecution, Callable<T>> callableFn, FailsafeFuture<T> future) {
if (future == null)
future = new FailsafeFuture(this);
future = new FailsafeFuture(eventHandler);

AsyncExecution execution = new AsyncExecution(scheduler, future, this);
Callable<T> callable = callableFn.apply(execution);
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/net/jodah/failsafe/CircuitBreaker.java
Expand Up @@ -39,7 +39,7 @@
@SuppressWarnings("WeakerAccess")
public class CircuitBreaker implements FailsafePolicy {
/** Writes guarded by "this" */
private final AtomicReference<CircuitState> state = new AtomicReference<CircuitState>();
private final AtomicReference<CircuitState> state = new AtomicReference<>();
private final AtomicInteger currentExecutions = new AtomicInteger();
private final CircuitBreakerStats stats = currentExecutions::get;
private Duration delay = Duration.NONE;
Expand Down
18 changes: 6 additions & 12 deletions src/main/java/net/jodah/failsafe/Execution.java
Expand Up @@ -15,7 +15,6 @@
*/
package net.jodah.failsafe;

import net.jodah.failsafe.PolicyExecutor.PolicyResult;
import net.jodah.failsafe.internal.util.Assert;

import java.util.Arrays;
Expand Down Expand Up @@ -50,7 +49,7 @@ public Execution(FailsafePolicy... policies) {
* @throws IllegalStateException if the execution is already complete
*/
public boolean canRetryFor(Object result) {
return !complete(result, null, false);
return !postExecute(new ExecutionResult(result, null));
}

/**
Expand All @@ -60,7 +59,7 @@ public boolean canRetryFor(Object result) {
* @throws IllegalStateException if the execution is already complete
*/
public boolean canRetryFor(Object result, Throwable failure) {
return !complete(result, failure, false);
return !postExecute(new ExecutionResult(result, failure));
}

/**
Expand All @@ -72,16 +71,16 @@ public boolean canRetryFor(Object result, Throwable failure) {
*/
public boolean canRetryOn(Throwable failure) {
Assert.notNull(failure, "failure");
return !complete(null, failure, false);
return !postExecute(new ExecutionResult(null, failure));
}

/**
* Records and completes the execution.
* Records and completes the execution successfully.
*
* @throws IllegalStateException if the execution is already complete
*/
public void complete() {
complete(null, null, true);
postExecute(ExecutionResult.noResult());
}

/**
Expand All @@ -91,17 +90,12 @@ public void complete() {
* @throws IllegalStateException if the execution is already complete
*/
public boolean complete(Object result) {
return complete(result, null, false);
}

private boolean complete(Object result, Throwable failure, boolean noResult) {
return postExecute(new PolicyResult(result, failure, noResult));
return postExecute(new ExecutionResult(result, null));
}

/**
* Records a failed execution and returns true if a retry can be performed for the {@code failure}, else returns false
* and completes the execution.
*
* <p>
* Alias of {@link #canRetryOn(Throwable)}
*
Expand Down

0 comments on commit db2bf79

Please sign in to comment.