Skip to content

Commit

Permalink
Add support for scheduler
Browse files Browse the repository at this point in the history
Update Vert.x example to use scheduler
  • Loading branch information
jhalterman committed Jul 27, 2015
1 parent 19fc5cc commit 7d6936e
Show file tree
Hide file tree
Showing 7 changed files with 216 additions and 72 deletions.
13 changes: 7 additions & 6 deletions src/main/java/net/jodah/recurrent/AsyncCallable.java
@@ -1,7 +1,7 @@
package net.jodah.recurrent;

import java.util.concurrent.Callable;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;

Expand All @@ -14,7 +14,7 @@
abstract class AsyncCallable<T> implements Callable<T> {
protected Invocation invocation;
protected RecurrentFuture<T> future;
protected ScheduledExecutorService executor;
protected Scheduler scheduler;

static <T> AsyncCallable<T> of(final Callable<T> callable) {
return new AsyncCallable<T>() {
Expand Down Expand Up @@ -100,10 +100,10 @@ public void accept(T innerResult, Throwable failure) {
};
}

void initialize(Invocation invocation, RecurrentFuture<T> future, ScheduledExecutorService executor) {
void initialize(Invocation invocation, RecurrentFuture<T> future, Scheduler scheduler) {
this.invocation = invocation;
this.future = future;
this.executor = executor;
this.scheduler = scheduler;
}

/**
Expand All @@ -122,17 +122,18 @@ void recordResult(T result, Throwable failure) {
if (invocation.canRetryWhen(result, failure))
scheduleRetry();
else
future.complete(result, failure);
future.complete(result, failure);
}
}

/**
* Schedules a retry if the future is not done or cancelled.
*/
@SuppressWarnings("unchecked")
void scheduleRetry() {
synchronized (future) {
if (!future.isDone() && !future.isCancelled())
future.setFuture(executor.schedule(this, invocation.waitTime, TimeUnit.NANOSECONDS));
future.setFuture((Future<T>) scheduler.schedule(this, invocation.waitTime, TimeUnit.NANOSECONDS));
}
}
}
70 changes: 59 additions & 11 deletions src/main/java/net/jodah/recurrent/Recurrent.java
@@ -1,6 +1,7 @@
package net.jodah.recurrent;

import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

Expand All @@ -23,7 +24,15 @@ private Recurrent() {
public static <T> java.util.concurrent.CompletableFuture<T> future(
Callable<java.util.concurrent.CompletableFuture<T>> callable, RetryPolicy retryPolicy,
ScheduledExecutorService executor) {
return future(contextual(callable), retryPolicy, executor);
return future(contextual(callable), retryPolicy, Schedulers.of(executor));
}

/**
* Invokes the {@code callable}, scheduling retries with the {@code scheduler} according to the {@code retryPolicy}.
*/
public static <T> java.util.concurrent.CompletableFuture<T> future(
Callable<java.util.concurrent.CompletableFuture<T>> callable, RetryPolicy retryPolicy, Scheduler scheduler) {
return future(contextual(callable), retryPolicy, scheduler);
}

/**
Expand All @@ -32,8 +41,17 @@ public static <T> java.util.concurrent.CompletableFuture<T> future(
public static <T> java.util.concurrent.CompletableFuture<T> future(
ContextualCallable<java.util.concurrent.CompletableFuture<T>> callable, RetryPolicy retryPolicy,
ScheduledExecutorService executor) {
return future(callable, retryPolicy, Schedulers.of(executor));
}

/**
* Invokes the {@code callable}, scheduling retries with the {@code scheduler} according to the {@code retryPolicy}.
*/
public static <T> java.util.concurrent.CompletableFuture<T> future(
ContextualCallable<java.util.concurrent.CompletableFuture<T>> callable, RetryPolicy retryPolicy,
Scheduler scheduler) {
final java.util.concurrent.CompletableFuture<T> response = new java.util.concurrent.CompletableFuture<T>();
RecurrentFuture<T> future = new RecurrentFuture<T>(executor).whenComplete(new CompletionListener<T>() {
RecurrentFuture<T> future = new RecurrentFuture<T>(scheduler).whenComplete(new CompletionListener<T>() {
@Override
public void onCompletion(T result, Throwable failure) {
if (failure == null)
Expand All @@ -43,7 +61,7 @@ public void onCompletion(T result, Throwable failure) {
}
});

call(AsyncCallable.ofFuture(callable), retryPolicy, executor, future);
call(AsyncCallable.ofFuture(callable), retryPolicy, scheduler, future);
return response;
}

Expand All @@ -63,23 +81,45 @@ public static <T> T get(Callable<T> callable, RetryPolicy retryPolicy) {
*/
public static <T> RecurrentFuture<T> get(Callable<T> callable, RetryPolicy retryPolicy,
ScheduledExecutorService executor) {
return call(AsyncCallable.of(callable), retryPolicy, executor, null);
return call(AsyncCallable.of(callable), retryPolicy, Schedulers.of(executor), null);
}

/**
* Invokes the {@code callable}, scheduling retries with the {@code scheduler} according to the {@code retryPolicy}.
*/
public static <T> RecurrentFuture<T> get(Callable<T> callable, RetryPolicy retryPolicy, Scheduler scheduler) {
return call(AsyncCallable.of(callable), retryPolicy, scheduler, null);
}

/**
* Invokes the {@code callable}, scheduling retries with the {@code executor} according to the {@code retryPolicy}.
*/
public static <T> RecurrentFuture<T> get(ContextualCallable<T> callable, RetryPolicy retryPolicy,
ScheduledExecutorService executor) {
return call(AsyncCallable.of(callable), retryPolicy, executor, null);
return call(AsyncCallable.of(callable), retryPolicy, Schedulers.of(executor), null);
}

/**
* Invokes the {@code callable}, scheduling retries with the {@code scheduler} according to the {@code retryPolicy}.
*/
public static <T> RecurrentFuture<T> get(ContextualCallable<T> callable, RetryPolicy retryPolicy,
Scheduler scheduler) {
return call(AsyncCallable.of(callable), retryPolicy, scheduler, null);
}

/**
* Invokes the {@code runnable}, scheduling retries with the {@code executor} according to the {@code retryPolicy}.
*/
public static RecurrentFuture<?> run(ContextualRunnable runnable, RetryPolicy retryPolicy,
ScheduledExecutorService executor) {
return call(AsyncCallable.of(runnable), retryPolicy, executor, null);
return call(AsyncCallable.of(runnable), retryPolicy, Schedulers.of(executor), null);
}

/**
* Invokes the {@code runnable}, scheduling retries with the {@code scheduler} according to the {@code retryPolicy}.
*/
public static RecurrentFuture<?> run(ContextualRunnable runnable, RetryPolicy retryPolicy, Scheduler scheduler) {
return call(AsyncCallable.of(runnable), retryPolicy, scheduler, null);
}

/**
Expand All @@ -97,18 +137,26 @@ public static void run(Runnable runnable, RetryPolicy retryPolicy) {
* Invokes the {@code runnable}, scheduling retries with the {@code executor} according to the {@code retryPolicy}.
*/
public static RecurrentFuture<?> run(Runnable runnable, RetryPolicy retryPolicy, ScheduledExecutorService executor) {
return call(AsyncCallable.of(runnable), retryPolicy, executor, null);
return call(AsyncCallable.of(runnable), retryPolicy, Schedulers.of(executor), null);
}

/**
* Invokes the {@code runnable}, scheduling retries with the {@code scheduler} according to the {@code retryPolicy}.
*/
public static RecurrentFuture<?> run(Runnable runnable, RetryPolicy retryPolicy, Scheduler scheduler) {
return call(AsyncCallable.of(runnable), retryPolicy, scheduler, null);
}

/**
* Calls the {@code callable} via the {@code executor}, performing retries according to the {@code retryPolicy}.
*/
@SuppressWarnings("unchecked")
private static <T> RecurrentFuture<T> call(final AsyncCallable<T> callable, final RetryPolicy retryPolicy,
final ScheduledExecutorService executor, RecurrentFuture<T> future) {
Scheduler scheduler, RecurrentFuture<T> future) {
if (future == null)
future = new RecurrentFuture<T>(executor);
callable.initialize(new Invocation(retryPolicy), future, executor);
future.setFuture(executor.submit(callable));
future = new RecurrentFuture<T>(scheduler);
callable.initialize(new Invocation(retryPolicy), future, scheduler);
future.setFuture((Future<T>) scheduler.schedule(callable, 0, TimeUnit.MILLISECONDS));
return future;
}

Expand Down
51 changes: 32 additions & 19 deletions src/main/java/net/jodah/recurrent/RecurrentFuture.java
@@ -1,5 +1,6 @@
package net.jodah.recurrent;

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
Expand All @@ -11,8 +12,14 @@
import net.jodah.recurrent.event.SuccessListener;
import net.jodah.recurrent.internal.util.concurrent.ReentrantCircuit;

/**
* A future result of an asynchronous operation.
*
* @author Jonathan Halterman
* @param <T> result type
*/
public class RecurrentFuture<T> implements Future<T> {
private final ExecutorService executor;
private final Scheduler scheduler;
private volatile Future<T> delegate;
private volatile boolean done;
private volatile boolean cancelled;
Expand All @@ -31,8 +38,8 @@ public class RecurrentFuture<T> implements Future<T> {
private volatile FailureListener asyncFailureListener;
private volatile ExecutorService failureExecutor;

RecurrentFuture(ExecutorService executor) {
this.executor = executor;
RecurrentFuture(Scheduler scheduler) {
this.scheduler = scheduler;
circuit.open();
}

Expand Down Expand Up @@ -72,16 +79,16 @@ public boolean isDone() {
}

public RecurrentFuture<T> whenComplete(CompletionListener<T> completionListener) {
if (!done)
this.completionListener = completionListener;
else
if (done)
completionListener.onCompletion(result, failure);
else
this.completionListener = completionListener;
return this;
}

public RecurrentFuture<T> whenCompleteAsync(CompletionListener<T> completionListener) {
if (done)
executor.submit(Callables.of(completionListener, result, failure));
scheduler.schedule(Callables.of(completionListener, result, failure), 0, TimeUnit.MILLISECONDS);
else
this.completionListener = completionListener;
return this;
Expand All @@ -98,16 +105,16 @@ public RecurrentFuture<T> whenCompleteAsync(CompletionListener<T> completionList
}

public RecurrentFuture<T> whenFailure(FailureListener failureListener) {
if (!done)
this.failureListener = failureListener;
else
if (done)
failureListener.onFailure(failure);
else
this.failureListener = failureListener;
return this;
}

public RecurrentFuture<T> whenFailureAsync(FailureListener failureListener) {
if (done)
executor.submit(Callables.of(failureListener, failure));
scheduler.schedule(Callables.of(failureListener, failure), 0, TimeUnit.MILLISECONDS);
else
this.failureListener = failureListener;
return this;
Expand All @@ -124,16 +131,16 @@ public RecurrentFuture<T> whenFailureAsync(FailureListener failureListener, Exec
}

public RecurrentFuture<T> whenSuccess(SuccessListener<T> successListener) {
if (!done)
this.successListener = successListener;
else
if (done)
successListener.onSuccess(result);
else
this.successListener = successListener;
return this;
}

public RecurrentFuture<T> whenSuccessAsync(SuccessListener<T> successListener) {
if (done)
executor.submit(Callables.of(successListener, result));
scheduler.schedule(Callables.of(successListener, result), 0, TimeUnit.MILLISECONDS);
else
this.successListener = successListener;
return this;
Expand All @@ -157,13 +164,12 @@ synchronized void complete(T result, Throwable failure) {

// Async callbacks
if (asyncCompletionListener != null)
(completionExecutor == null ? executor : completionExecutor)
.submit(Callables.of(asyncCompletionListener, result, failure));
performAsyncCallback(Callables.of(asyncCompletionListener, result, failure), completionExecutor);
if (failure == null) {
if (asyncSuccessListener != null)
(successExecutor == null ? executor : successExecutor).submit(Callables.of(asyncSuccessListener, result));
performAsyncCallback(Callables.of(asyncSuccessListener, result), successExecutor);
} else if (asyncFailureListener != null)
(failureExecutor == null ? executor : failureExecutor).submit(Callables.of(asyncFailureListener, failure));
performAsyncCallback(Callables.of(asyncFailureListener, failure), failureExecutor);

// Sync callbacks
if (completionListener != null)
Expand All @@ -178,4 +184,11 @@ synchronized void complete(T result, Throwable failure) {
void setFuture(Future<T> delegate) {
this.delegate = delegate;
}

private void performAsyncCallback(Callable<T> callable, ExecutorService executor) {
if (executor != null)
executor.submit(callable);
else
scheduler.schedule(callable, 0, TimeUnit.MILLISECONDS);
}
}
17 changes: 17 additions & 0 deletions src/main/java/net/jodah/recurrent/Scheduler.java
@@ -0,0 +1,17 @@
package net.jodah.recurrent;

import java.util.concurrent.Callable;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/**
* Schedules invocations.
*
* @author Jonathan Halterman
*/
public interface Scheduler {
/**
* Schedules the {@code callable} to be called after the {@code delay} for the {@code unit}.
*/
ScheduledFuture<?> schedule(Callable<?> callable, long delay, TimeUnit unit);
}
25 changes: 25 additions & 0 deletions src/main/java/net/jodah/recurrent/Schedulers.java
@@ -0,0 +1,25 @@
package net.jodah.recurrent;

import java.util.concurrent.Callable;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/**
* {@link Scheduler} utilities.
*
* @author Jonathan Halterman
*/
public final class Schedulers {
private Schedulers() {
}

public static Scheduler of(final ScheduledExecutorService executor) {
return new Scheduler() {
@Override
public ScheduledFuture<?> schedule(Callable<?> callable, long delay, TimeUnit unit) {
return executor.schedule(callable, delay, unit);
}
};
}
}

0 comments on commit 7d6936e

Please sign in to comment.