Skip to content

Commit

Permalink
Simplified getAsyncExecution threading and implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
jhalterman committed Jan 21, 2019
1 parent 0245c3a commit bfa6c14
Show file tree
Hide file tree
Showing 5 changed files with 50 additions and 76 deletions.
19 changes: 11 additions & 8 deletions src/main/java/net/jodah/failsafe/AsyncExecution.java
Expand Up @@ -18,7 +18,9 @@
import net.jodah.failsafe.internal.util.Assert;
import net.jodah.failsafe.util.concurrent.Scheduler;

import java.util.concurrent.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

/**
Expand All @@ -29,9 +31,6 @@
*/
@SuppressWarnings("WeakerAccess")
public final class AsyncExecution extends AbstractExecution {
/** Used to complete a promise for a scheduled execution */
static final Exception SCHEDULED = new Exception("The execution has been scheduled");

private Supplier<CompletableFuture<ExecutionResult>> executionSupplier;
final FailsafeFuture<Object> future;
final Scheduler scheduler;
Expand Down Expand Up @@ -159,9 +158,13 @@ void executeAsync(Supplier<CompletableFuture<ExecutionResult>> supplier) {
supplier.get().whenComplete(this::complete);
}

<T> void executeAsyncExecution(Supplier<T> supplier) {
executionSupplier = Functions.makeAsyncExecution(supplier, scheduler, future);
executionSupplier.get();
/**
* Performs an asynchronous execution where the execution must be manually completed via the provided AsyncExecution.
*/
@SuppressWarnings("unchecked")
void executeAsyncExecution(Supplier<CompletableFuture<ExecutionResult>> supplier) {
executionSupplier = supplier;
future.inject((Future) scheduler.schedule(supplier::get, 0, TimeUnit.NANOSECONDS));
}

/**
Expand Down Expand Up @@ -189,7 +192,7 @@ boolean completeOrHandle(Object result, Throwable failure) {
}

private void complete(ExecutionResult result, Throwable error) {
if (AsyncExecution.SCHEDULED.equals(error))
if (result == null && error == null)
return;

completed = true;
Expand Down
44 changes: 14 additions & 30 deletions src/main/java/net/jodah/failsafe/FailsafeExecutor.java
Expand Up @@ -87,7 +87,7 @@ public <T extends R> T get(ContextualSupplier<T> supplier) {
* @throws RejectedExecutionException if the {@code supplier} cannot be scheduled for execution
*/
public <T extends R> CompletableFuture<T> getAsync(CheckedSupplier<T> supplier) {
return callAsync(execution -> Functions.promiseOf(supplier, execution));
return callAsync(execution -> Functions.promiseOf(supplier, execution), false);
}

/**
Expand All @@ -101,7 +101,7 @@ public <T extends R> CompletableFuture<T> getAsync(CheckedSupplier<T> supplier)
* @throws RejectedExecutionException if the {@code supplier} cannot be scheduled for execution
*/
public <T extends R> CompletableFuture<T> getAsync(ContextualSupplier<T> supplier) {
return callAsync(execution -> Functions.promiseOf(supplier, execution));
return callAsync(execution -> Functions.promiseOf(supplier, execution), false);
}

/**
Expand All @@ -116,7 +116,7 @@ public <T extends R> CompletableFuture<T> getAsync(ContextualSupplier<T> supplie
* @throws RejectedExecutionException if the {@code supplier} cannot be scheduled for execution
*/
public <T extends R> CompletableFuture<T> getAsyncExecution(AsyncSupplier<T> supplier) {
return callAsyncExecution(execution -> Functions.asyncOfExecution(supplier, execution));
return callAsync(execution -> Functions.asyncOfExecution(supplier, execution), true);
}

void handleComplete(ExecutionResult result, ExecutionContext context) {
Expand Down Expand Up @@ -167,7 +167,7 @@ public FailsafeExecutor<R> onSuccess(CheckedConsumer<? extends ExecutionComplete
* @throws RejectedExecutionException if the {@code supplier} cannot be scheduled for execution
*/
public <T extends R> CompletableFuture<T> getStageAsync(CheckedSupplier<? extends CompletionStage<T>> supplier) {
return callAsync(execution -> Functions.promiseOfStage(supplier, execution));
return callAsync(execution -> Functions.promiseOfStage(supplier, execution), false);
}

/**
Expand All @@ -181,7 +181,7 @@ public <T extends R> CompletableFuture<T> getStageAsync(CheckedSupplier<? extend
* @throws RejectedExecutionException if the {@code supplier} cannot be scheduled for execution
*/
public <T extends R> CompletableFuture<T> getStageAsync(ContextualSupplier<? extends CompletionStage<T>> supplier) {
return callAsync(execution -> Functions.promiseOfStage(supplier, execution));
return callAsync(execution -> Functions.promiseOfStage(supplier, execution), false);
}

/**
Expand All @@ -197,7 +197,7 @@ public <T extends R> CompletableFuture<T> getStageAsync(ContextualSupplier<? ext
*/
public <T extends R> CompletableFuture<T> getStageAsyncExecution(
AsyncSupplier<? extends CompletionStage<T>> supplier) {
return callAsyncExecution(execution -> Functions.asyncOfFutureExecution(supplier, execution));
return callAsync(execution -> Functions.asyncOfFutureExecution(supplier, execution), true);
}

/**
Expand Down Expand Up @@ -234,7 +234,7 @@ public void run(ContextualRunnable runnable) {
* @throws RejectedExecutionException if the {@code runnable} cannot be scheduled for execution
*/
public CompletableFuture<Void> runAsync(CheckedRunnable runnable) {
return callAsync(execution -> Functions.promiseOf(runnable, execution));
return callAsync(execution -> Functions.promiseOf(runnable, execution), false);
}

/**
Expand All @@ -247,7 +247,7 @@ public CompletableFuture<Void> runAsync(CheckedRunnable runnable) {
* @throws RejectedExecutionException if the {@code runnable} cannot be scheduled for execution
*/
public CompletableFuture<Void> runAsync(ContextualRunnable runnable) {
return callAsync(execution -> Functions.promiseOf(runnable, execution));
return callAsync(execution -> Functions.promiseOf(runnable, execution), false);
}

/**
Expand All @@ -262,7 +262,7 @@ public CompletableFuture<Void> runAsync(ContextualRunnable runnable) {
* @throws RejectedExecutionException if the {@code runnable} cannot be scheduled for execution
*/
public CompletableFuture<Void> runAsyncExecution(AsyncRunnable runnable) {
return callAsyncExecution(execution -> Functions.asyncOfExecution(runnable, execution));
return callAsync(execution -> Functions.asyncOfExecution(runnable, execution), true);
}

/**
Expand Down Expand Up @@ -317,30 +317,14 @@ private <T> T call(Function<Execution, CheckedSupplier<?>> supplierFn) {
*/
@SuppressWarnings("unchecked")
private <T> CompletableFuture<T> callAsync(
Function<AsyncExecution, Supplier<CompletableFuture<ExecutionResult>>> supplierFn) {
Function<AsyncExecution, Supplier<CompletableFuture<ExecutionResult>>> supplierFn, boolean asyncExecution) {
FailsafeFuture<T> future = new FailsafeFuture(this);
AsyncExecution execution = new AsyncExecution(scheduler, future, this);
future.inject(execution);
execution.executeAsync(supplierFn.apply(execution));
return future;
}

/**
* Calls the asynchronous {@code supplier} via the configured Scheduler, handling results according to the configured
* policies, until any configured policy is exceeded or the AsyncExecution is completed.
* <p>
* If a configured circuit breaker is open, the resulting future is completed with {@link
* CircuitBreakerOpenException}.
*
* @throws NullPointerException if the {@code supplierFn} is null
* @throws RejectedExecutionException if the {@code supplierFn} cannot be scheduled for execution
*/
@SuppressWarnings("unchecked")
private <T> CompletableFuture<T> callAsyncExecution(Function<AsyncExecution, Supplier<?>> supplierFn) {
FailsafeFuture<T> future = new FailsafeFuture(this);
AsyncExecution execution = new AsyncExecution(scheduler, future, this);
future.inject(execution);
execution.executeAsyncExecution(supplierFn.apply(execution));
if (!asyncExecution)
execution.executeAsync(supplierFn.apply(execution));
else
execution.executeAsyncExecution(supplierFn.apply(execution));
return future;
}
}
49 changes: 17 additions & 32 deletions src/main/java/net/jodah/failsafe/Functions.java
Expand Up @@ -28,6 +28,8 @@
* @author Jonathan Halterman
*/
final class Functions {
private static final CompletableFuture<ExecutionResult> NULL_FUTURE = CompletableFuture.completedFuture(null);

/** Returns a supplier that supplies the {@code result} once then uses the {@code supplier} for subsequent calls. */
static <T> Supplier<CompletableFuture<T>> supplyOnce(CompletableFuture<T> result,
Supplier<CompletableFuture<T>> supplier) {
Expand Down Expand Up @@ -72,25 +74,6 @@ static Supplier<CompletableFuture<ExecutionResult>> makeAsync(Supplier<Completab
};
}

/**
* Returns a Supplier that supplies a promose that is completed exceptionally with AsyncExecution.SCHEDULED by calling
* the {@code supplier} on the {code scheduler}.
*/
@SuppressWarnings("unchecked")
static <T> Supplier<CompletableFuture<ExecutionResult>> makeAsyncExecution(Supplier<T> supplier, Scheduler scheduler,
FailsafeFuture<Object> future) {
return () -> {
CompletableFuture<ExecutionResult> promise = new CompletableFuture<>();
try {
future.inject((Future) scheduler.schedule(supplier::get, 0, TimeUnit.NANOSECONDS));
promise.completeExceptionally(AsyncExecution.SCHEDULED);
} catch (Exception e) {
promise.completeExceptionally(e);
}
return promise;
};
}

static <T> Supplier<CompletableFuture<ExecutionResult>> promiseOf(CheckedSupplier<T> supplier,
AbstractExecution execution) {
Assert.notNull(supplier, "supplier");
Expand Down Expand Up @@ -156,34 +139,36 @@ static Supplier<CompletableFuture<ExecutionResult>> promiseOf(ContextualRunnable
};
}

static <T> Supplier<T> asyncOfExecution(AsyncSupplier<T> supplier, AsyncExecution execution) {
static <T> Supplier<CompletableFuture<ExecutionResult>> asyncOfExecution(AsyncSupplier<T> supplier,
AsyncExecution execution) {
Assert.notNull(supplier, "supplier");
return new Supplier<T>() {
return new Supplier<CompletableFuture<ExecutionResult>>() {
@Override
public synchronized T get() {
public synchronized CompletableFuture<ExecutionResult> get() {
try {
execution.preExecute();
return supplier.get(execution);
supplier.get(execution);
} catch (Throwable e) {
execution.completeOrHandle(null, e);
return null;
}
return NULL_FUTURE;
}
};
}

static <T> Supplier<T> asyncOfExecution(AsyncRunnable runnable, AsyncExecution execution) {
static <T> Supplier<CompletableFuture<ExecutionResult>> asyncOfExecution(AsyncRunnable runnable,
AsyncExecution execution) {
Assert.notNull(runnable, "runnable");
return new Supplier<T>() {
return new Supplier<CompletableFuture<ExecutionResult>>() {
@Override
public synchronized T get() {
public synchronized CompletableFuture<ExecutionResult> get() {
try {
execution.preExecute();
runnable.run(execution);
} catch (Throwable e) {
execution.completeOrHandle(null, e);
}
return null;
return NULL_FUTURE;
}
};
}
Expand Down Expand Up @@ -236,14 +221,14 @@ static <T> Supplier<CompletableFuture<ExecutionResult>> promiseOfStage(
};
}

static <T> Supplier<CompletableFuture<T>> asyncOfFutureExecution(
static <T> Supplier<CompletableFuture<ExecutionResult>> asyncOfFutureExecution(
AsyncSupplier<? extends CompletionStage<? extends T>> supplier, AsyncExecution execution) {
Assert.notNull(supplier, "supplier");
return new Supplier<CompletableFuture<T>>() {
return new Supplier<CompletableFuture<ExecutionResult>>() {
Semaphore asyncFutureLock = new Semaphore(1);

@Override
public CompletableFuture<T> get() {
public CompletableFuture<ExecutionResult> get() {
try {
execution.preExecute();
asyncFutureLock.acquire();
Expand All @@ -264,7 +249,7 @@ public CompletableFuture<T> get() {
}
}

return null;
return NULL_FUTURE;
}
};
}
Expand Down
Expand Up @@ -122,8 +122,10 @@ else if (!future.isDone() && !future.isCancelled()) {
postExecuteAsync(result, scheduler, future).thenAccept(postExecutionHandler);
else
postExecutionHandler.accept(result);
} else
} else if (error != null)
promise.completeExceptionally(error);
else
promise.complete(null);

return result;
});
Expand Down
10 changes: 5 additions & 5 deletions src/test/java/net/jodah/failsafe/AsyncFailsafeTest.java
Expand Up @@ -101,7 +101,7 @@ public void shouldRunAsyncContextual() throws Throwable {
});
}

public void shouldRunAsyncExecutor() throws Throwable {
public void shouldRunAsyncExecution() throws Throwable {
assertRunAsync((AsyncRunnable) exec -> {
try {
service.connect();
Expand Down Expand Up @@ -163,7 +163,7 @@ public void shouldGetAsyncContextual() throws Throwable {
});
}

public void shouldGetAsyncExecutor() throws Throwable {
public void shouldGetAsyncExecution() throws Throwable {
assertGetAsync((AsyncSupplier<?>) exec -> {
try {
boolean result = service.connect();
Expand Down Expand Up @@ -271,7 +271,7 @@ public void shouldCancelOnGetAsync() throws Throwable {
}));
}

public void shouldCancelOnGetAsyncExecutor() throws Throwable {
public void shouldCancelOnGetAsyncExecution() throws Throwable {
assertCancel(executor -> getAsync(executor, (AsyncSupplier<?>) (e) -> {
Thread.sleep(1000);
e.complete();
Expand All @@ -285,7 +285,7 @@ public void shouldCancelOnRunAsync() throws Throwable {
}));
}

public void shouldCancelOnRunAsyncExecutor() throws Throwable {
public void shouldCancelOnRunAsyncExecution() throws Throwable {
assertCancel(executor -> runAsync(executor, (AsyncRunnable) (e) -> {
Thread.sleep(1000);
e.complete();
Expand All @@ -299,7 +299,7 @@ public void shouldCancelOnGetStageAsync() throws Throwable {
}));
}

public void shouldCancelOnGetStageAsyncExecutor() throws Throwable {
public void shouldCancelOnGetStageAsyncExecution() throws Throwable {
assertCancel(executor -> getStageAsync(executor, (AsyncSupplier<?>) (e) -> {
Thread.sleep(1000);
CompletableFuture<?> result = CompletableFuture.completedFuture("test");
Expand Down

0 comments on commit bfa6c14

Please sign in to comment.