Skip to content

Commit

Permalink
Contextual callable and runnable invocations must be manually retried…
Browse files Browse the repository at this point in the history
… and manually completed on non-failures.
  • Loading branch information
jhalterman committed Aug 2, 2015
1 parent 7d6936e commit 6f30621
Show file tree
Hide file tree
Showing 11 changed files with 342 additions and 178 deletions.
16 changes: 7 additions & 9 deletions README.md
Expand Up @@ -83,14 +83,14 @@ Recurrent.get(() -> connect(), retryPolicy, executor)
Asynchronous code reports completion via indirect callbacks. Recurrent provides [ContextualRunnable] and [ContextualCallable] classes that can be used with a callback to manually perform retries or completion:

```java
Recurrent.get(invocation -> {
someService.connect(host, port).whenComplete((result, failure) -> {
if (failure == null)
invocation.complete(result);
Recurrent.get(invocation ->
service.connect().whenComplete((result, failure) -> {
if (invocation.complete(result, failure))
log.info("Connected");
else if (!invocation.retryOn(failure))
log.error("Connection attempts failed", failure);
}
}, retryPolicy, executor));
), retryPolicy, executor);
```

#### CompletableFuture Integration
Expand All @@ -115,15 +115,13 @@ Function<String, Connection> connect =
We can retry streams:

```java
Recurrent.run(() -> Stream.of("foo")
.map(value -> value + "bar"), retryPolicy);
Recurrent.run(() -> Stream.of("foo").map(value -> value + "bar"), retryPolicy);
```

Individual Stream operations:

```java
Stream.of("foo")
.map(value -> Recurrent.get(() -> value + "bar", retryPolicy));
Stream.of("foo").map(value -> Recurrent.get(() -> value + "bar", retryPolicy));
```

Or individual CompletableFuture stages:
Expand Down
88 changes: 36 additions & 52 deletions src/main/java/net/jodah/recurrent/AsyncCallable.java
@@ -1,8 +1,6 @@
package net.jodah.recurrent;

import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;

/**
Expand All @@ -13,19 +11,17 @@
*/
abstract class AsyncCallable<T> implements Callable<T> {
protected Invocation invocation;
protected RecurrentFuture<T> future;
protected Scheduler scheduler;

static <T> AsyncCallable<T> of(final Callable<T> callable) {
return new AsyncCallable<T>() {
@Override
public T call() throws Exception {
try {
T result = callable.call();
recordResult(result, null);
invocation.retryOrComplete(result, null);
return result;
} catch (Exception e) {
recordResult(null, e);
invocation.retryOrComplete(null, e);
return null;
}
}
Expand All @@ -35,13 +31,12 @@ public T call() throws Exception {
static <T> AsyncCallable<T> of(final ContextualCallable<T> callable) {
return new AsyncCallable<T>() {
@Override
public T call() throws Exception {
public synchronized T call() throws Exception {
try {
T result = callable.call(invocation);
recordResult(result, null);
return result;
invocation.reset();
return callable.call(invocation);
} catch (Exception e) {
recordResult(null, e);
invocation.retryOrComplete(null, e);
return null;
}
}
Expand All @@ -51,12 +46,12 @@ public T call() throws Exception {
static AsyncCallable<?> of(final ContextualRunnable runnable) {
return new AsyncCallable<Object>() {
@Override
public Void call() throws Exception {
public synchronized Void call() throws Exception {
try {
invocation.reset();
runnable.run(invocation);
recordResult(null, null);
} catch (Exception e) {
recordResult(null, e);
invocation.retryOrComplete(null, e);
}

return null;
Expand All @@ -70,9 +65,29 @@ static AsyncCallable<?> of(final Runnable runnable) {
public Void call() throws Exception {
try {
runnable.run();
recordResult(null, null);
invocation.retryOrComplete(null, null);
} catch (Exception e) {
invocation.retryOrComplete(null, e);
}

return null;
}
};
}

static <T> AsyncCallable<T> ofFuture(final Callable<java.util.concurrent.CompletableFuture<T>> callable) {
return new AsyncCallable<T>() {
@Override
public T call() throws Exception {
try {
callable.call().whenComplete(new BiConsumer<T, Throwable>() {
@Override
public void accept(T innerResult, Throwable failure) {
invocation.retryOrComplete(innerResult, failure);
}
});
} catch (Exception e) {
recordResult(null, e);
invocation.retryOrComplete(null, e);
}

return null;
Expand All @@ -85,55 +100,24 @@ static <T> AsyncCallable<T> ofFuture(final ContextualCallable<java.util.concurre
@Override
public synchronized T call() throws Exception {
try {
invocation.reset();
callable.call(invocation).whenComplete(new BiConsumer<T, Throwable>() {
@Override
public void accept(T innerResult, Throwable failure) {
recordResult(innerResult, failure);
if (failure != null)
invocation.retryOrComplete(innerResult, failure);
}
});
} catch (Exception e) {
recordResult(null, e);
invocation.retryOrComplete(null, e);
}

return null;
}
};
}

void initialize(Invocation invocation, RecurrentFuture<T> future, Scheduler scheduler) {
void initialize(Invocation invocation) {
this.invocation = invocation;
this.future = future;
this.scheduler = scheduler;
}

/**
* Records an invocation result if necessary, else schedules a retry if necessary.
*/
@SuppressWarnings("unchecked")
void recordResult(T result, Throwable failure) {
if (invocation.retryRequested) {
invocation.reset();
invocation.adjustForMaxDuration();
scheduleRetry();
} else if (invocation.completionRequested) {
future.complete((T) invocation.result, invocation.failure);
invocation.reset();
} else {
if (invocation.canRetryWhen(result, failure))
scheduleRetry();
else
future.complete(result, failure);
}
}

/**
* Schedules a retry if the future is not done or cancelled.
*/
@SuppressWarnings("unchecked")
void scheduleRetry() {
synchronized (future) {
if (!future.isDone() && !future.isCancelled())
future.setFuture((Future<T>) scheduler.schedule(this, invocation.waitTime, TimeUnit.NANOSECONDS));
}
}
}
6 changes: 6 additions & 0 deletions src/main/java/net/jodah/recurrent/ContextualCallable.java
@@ -1,5 +1,11 @@
package net.jodah.recurrent;

/**
* A callable that can manually trigger retries or completion for an invocation.
*
* @author Jonathan Halterman
* @param <T> result type
*/
public interface ContextualCallable<T> {
T call(Invocation invocation) throws Exception;
}
5 changes: 5 additions & 0 deletions src/main/java/net/jodah/recurrent/ContextualRunnable.java
@@ -1,5 +1,10 @@
package net.jodah.recurrent;

/**
* A runnable that can manually trigger retries or completion for an invocation.
*
* @author Jonathan Halterman
*/
public interface ContextualRunnable {
void run(Invocation invocation) throws Exception;
}

0 comments on commit 6f30621

Please sign in to comment.