Skip to content

Commit

Permalink
FailsafeFuture.cancel calls completion handlers. Fixes #53.
Browse files Browse the repository at this point in the history
  • Loading branch information
jhalterman committed Sep 8, 2016
1 parent cfaae84 commit 62ed9f6
Show file tree
Hide file tree
Showing 7 changed files with 83 additions and 16 deletions.
2 changes: 1 addition & 1 deletion src/main/java/net/jodah/failsafe/AsyncExecution.java
Expand Up @@ -154,7 +154,7 @@ synchronized boolean complete(Object result, Throwable failure, boolean checkArg
synchronized boolean completeOrRetry(Object result, Throwable failure) {
if (!complete(result, failure, true) && !future.isDone() && !future.isCancelled()) {
try {
future.setFuture((Future) scheduler.schedule(callable, delayNanos, TimeUnit.NANOSECONDS));
future.inject((Future) scheduler.schedule(callable, delayNanos, TimeUnit.NANOSECONDS));
return true;
} catch (Throwable t) {
failure = t;
Expand Down
10 changes: 6 additions & 4 deletions src/main/java/net/jodah/failsafe/AsyncFailsafe.java
Expand Up @@ -144,10 +144,11 @@ public FailsafeFuture<Void> runAsync(AsyncRunnable runnable) {
* @throws NullPointerException if any argument is null
* @throws CircuitBreakerOpenException if a configured circuit breaker is open
*/
@SuppressWarnings("unchecked")
private <T> java.util.concurrent.CompletableFuture<T> call(AsyncCallableWrapper<T> callable) {
FailsafeFuture<T> future = new FailsafeFuture<T>();
FailsafeFuture<T> future = new FailsafeFuture<T>((FailsafeConfig<T, ?>) this);
java.util.concurrent.CompletableFuture<T> response = Functions.cancellableFutureOf(future);
future.setCompletableFuture(response);
future.inject(response);
call(callable, future);
return response;
}
Expand All @@ -162,7 +163,7 @@ private <T> java.util.concurrent.CompletableFuture<T> call(AsyncCallableWrapper<
@SuppressWarnings("unchecked")
private <T> FailsafeFuture<T> call(AsyncCallableWrapper<T> callable, FailsafeFuture<T> future) {
if (future == null)
future = new FailsafeFuture<T>();
future = new FailsafeFuture<T>((FailsafeConfig<T, ?>) this);

if (circuitBreaker != null && !circuitBreaker.allowsExecution()) {
CircuitBreakerOpenException e = new CircuitBreakerOpenException();
Expand All @@ -174,9 +175,10 @@ private <T> FailsafeFuture<T> call(AsyncCallableWrapper<T> callable, FailsafeFut

AsyncExecution execution = new AsyncExecution(callable, scheduler, future, (FailsafeConfig<Object, ?>) this);
callable.inject(execution);
future.inject(execution);

try {
future.setFuture((Future<T>) scheduler.schedule(callable, 0, TimeUnit.MILLISECONDS));
future.inject((Future<T>) scheduler.schedule(callable, 0, TimeUnit.MILLISECONDS));
} catch (Throwable t) {
handleComplete(null, t, execution, false);
future.complete(null, t, (CheckedBiFunction<T, Throwable, T>) fallback, false);
Expand Down
Expand Up @@ -5,6 +5,6 @@
*
* @author Jonathan Halterman
*/
public class CircuitBreakerOpenException extends RuntimeException {
public class CircuitBreakerOpenException extends FailsafeException {
private static final long serialVersionUID = 1L;
}
3 changes: 3 additions & 0 deletions src/main/java/net/jodah/failsafe/FailsafeException.java
Expand Up @@ -9,6 +9,9 @@
public class FailsafeException extends RuntimeException {
private static final long serialVersionUID = 1L;

FailsafeException() {
}

FailsafeException(Throwable t) {
super(t);
}
Expand Down
34 changes: 26 additions & 8 deletions src/main/java/net/jodah/failsafe/FailsafeFuture.java
Expand Up @@ -18,6 +18,8 @@
*/
public class FailsafeFuture<T> implements Future<T> {
private final ReentrantCircuit circuit = new ReentrantCircuit();
private final FailsafeConfig<T, ?> config;
private ExecutionContext execution;
private java.util.concurrent.CompletableFuture<T> completableFuture;

// Mutable state
Expand All @@ -27,7 +29,8 @@ public class FailsafeFuture<T> implements Future<T> {
private volatile T result;
private volatile Throwable failure;

FailsafeFuture() {
FailsafeFuture(FailsafeConfig<T, ?> config) {
this.config = config;
circuit.open();
}

Expand All @@ -49,10 +52,15 @@ public class FailsafeFuture<T> implements Future<T> {
*/
@Override
public synchronized boolean cancel(boolean mayInterruptIfRunning) {
boolean result = delegate.cancel(mayInterruptIfRunning);
if (done)
return false;

boolean cancelResult = delegate.cancel(mayInterruptIfRunning);
failure = new CancellationException();
cancelled = true;
circuit.close();
return result;
config.handleComplete(null, failure, execution, false);
complete(null, failure, config.fallback, false);
return cancelResult;
}

/**
Expand All @@ -66,8 +74,11 @@ public synchronized boolean cancel(boolean mayInterruptIfRunning) {
@Override
public T get() throws InterruptedException, ExecutionException {
circuit.await();
if (failure != null)
if (failure != null) {
if (failure instanceof CancellationException)
throw (CancellationException) failure;
throw new ExecutionException(failure);
}
return result;
}

Expand Down Expand Up @@ -120,6 +131,9 @@ public boolean isDone() {

synchronized void complete(T result, Throwable failure, CheckedBiFunction<T, Throwable, T> fallback,
boolean success) {
if (done)
return;

if (fallback == null) {
this.result = result;
this.failure = failure;
Expand All @@ -137,14 +151,18 @@ synchronized void complete(T result, Throwable failure, CheckedBiFunction<T, Thr
circuit.close();
}

void setCompletableFuture(java.util.concurrent.CompletableFuture<T> future) {
completableFuture = future;
void inject(java.util.concurrent.CompletableFuture<T> completableFuture) {
this.completableFuture = completableFuture;
}

void setFuture(Future<T> delegate) {
void inject(Future<T> delegate) {
this.delegate = delegate;
}

void inject(ExecutionContext execution) {
this.execution = execution;
}

private void completeFuture() {
if (failure == null)
completableFuture.complete(result);
Expand Down
3 changes: 1 addition & 2 deletions src/main/java/net/jodah/failsafe/SyncFailsafe.java
Expand Up @@ -156,8 +156,7 @@ private <T> T fallbackFor(R result, Throwable failure) {
try {
return (T) fallback.apply(result, failure);
} catch (Exception e) {
throw e instanceof CircuitBreakerOpenException ? (CircuitBreakerOpenException) e
: e instanceof FailsafeException ? (FailsafeException) e : new FailsafeException(e);
throw e instanceof FailsafeException ? (FailsafeException) e : new FailsafeException(e);
}
}
}
45 changes: 45 additions & 0 deletions src/test/java/net/jodah/failsafe/FailsafeFutureTest.java
@@ -1,12 +1,19 @@
package net.jodah.failsafe;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;

import java.util.concurrent.CancellationException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.testng.annotations.Test;

import net.jodah.concurrentunit.Waiter;

@Test
public class FailsafeFutureTest {
ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);
Expand All @@ -19,4 +26,42 @@ public void shouldGetWithTimeout() throws Throwable {

Thread.sleep(1000);
}

public void shouldCompleteFutureOnCancel() throws Throwable {
Waiter waiter = new Waiter();
FailsafeFuture<String> future = Failsafe.with(new RetryPolicy()).with(executor).onComplete((r, f) -> {
waiter.assertNull(r);
waiter.assertTrue(f instanceof CancellationException);
waiter.resume();
}).get(() -> {
Thread.sleep(5000);
return "test";
});

Testing.sleep(300);
future.cancel(true);
waiter.await(1000);

assertTrue(future.isCancelled());
assertTrue(future.isDone());
Asserts.assertThrows(() -> future.get(), CancellationException.class);
}

/**
* Asserts that completion handlers are not called again if a completed execution is cancelled.
*/
public void shouldNotCancelCompletedExecution() throws Throwable {
Waiter waiter = new Waiter();
FailsafeFuture<String> future = Failsafe.with(new RetryPolicy()).with(executor).onComplete((r, f) -> {
waiter.assertEquals("test", r);
waiter.assertNull(f);
waiter.resume();
}).get(() -> "test");

waiter.await(500);
assertFalse(future.cancel(true));
assertFalse(future.isCancelled());
assertTrue(future.isDone());
assertEquals(future.get(), "test");
}
}

0 comments on commit 62ed9f6

Please sign in to comment.