Skip to content

Commit

Permalink
Added support for synchronous and asynchronous listeners!
Browse files Browse the repository at this point in the history
Added contextual callbacks to RecurrentFuture
Added argument validation in all public APIs
Added better completion checking for synchronous retries
  • Loading branch information
jhalterman committed Aug 22, 2015
1 parent 7ec2a64 commit 1e62f09
Show file tree
Hide file tree
Showing 31 changed files with 1,532 additions and 288 deletions.
6 changes: 4 additions & 2 deletions README.md
Expand Up @@ -6,7 +6,7 @@

## Introduction

Recurrent is a simple, zero-dependency library for performing retries on Java 1.6+. It features:
Recurrent is a simple, zero-dependency library for performing retries. It features:

* [Flexible retry policies](#retry-policies)
* [Synchronous](synchronous-retries) and [asynchronous retries](#asynchronous-retries)
Expand All @@ -15,6 +15,8 @@ Recurrent is a simple, zero-dependency library for performing retries on Java 1.
* [Invocation Tracking](#invocation-tracking)
* [Public API integration](#public-api-integration)

Supports Java 6+ though the documentation uses lambdas for simplicity.

## Usage

#### Retry Policies
Expand Down Expand Up @@ -77,7 +79,7 @@ Asynchronous invocations can be performed and retried on a scheduled executor an
```java
Recurrent.get(() -> connect(), retryPolicy, executor)
.whenSuccess(connection -> log.info("Connected to {}", connection))
.whenFailure(failure -> log.error("Connection attempts failed", failure));
.whenFailure((result, failure) -> log.error("Connection attempts failed", failure));
```

#### Asynchronous API Integration
Expand Down
43 changes: 27 additions & 16 deletions src/main/java/net/jodah/recurrent/AsyncCallable.java
Expand Up @@ -3,6 +3,8 @@
import java.util.concurrent.Callable;
import java.util.function.BiConsumer;

import net.jodah.recurrent.internal.util.Assert;

/**
* An asynchronous callable with references to recurrent invocation information.
*
Expand All @@ -13,61 +15,67 @@ abstract class AsyncCallable<T> implements Callable<T> {
protected AsyncInvocation invocation;

static <T> AsyncCallable<T> of(final Callable<T> callable) {
Assert.notNull(callable, "callable");
return new AsyncCallable<T>() {
@Override
public T call() throws Exception {
try {
invocation.reset();
T result = callable.call();
invocation.retryOrComplete(result, null);
invocation.completeOrRetry(result, null);
return result;
} catch (Exception e) {
invocation.retryOrComplete(null, e);
invocation.completeOrRetry(null, e);
return null;
}
}
};
}

static <T> AsyncCallable<T> of(final ContextualCallable<T> callable) {
Assert.notNull(callable, "callable");
return new AsyncCallable<T>() {
@Override
public synchronized T call() throws Exception {
try {
invocation.reset();
return callable.call(invocation);
} catch (Exception e) {
invocation.retryOrComplete(null, e);
invocation.completeOrRetry(null, e);
return null;
}
}
};
}

static AsyncCallable<?> of(final ContextualRunnable runnable) {
return new AsyncCallable<Object>() {
static <T> AsyncCallable<T> of(final ContextualRunnable runnable) {
Assert.notNull(runnable, "runnable");
return new AsyncCallable<T>() {
@Override
public synchronized Void call() throws Exception {
public synchronized T call() throws Exception {
try {
invocation.reset();
runnable.run(invocation);
} catch (Exception e) {
invocation.retryOrComplete(null, e);
invocation.completeOrRetry(null, e);
}

return null;
}
};
}

static AsyncCallable<?> of(final Runnable runnable) {
return new AsyncCallable<Object>() {
static <T> AsyncCallable<T> of(final Runnable runnable) {
Assert.notNull(runnable, "runnable");
return new AsyncCallable<T>() {
@Override
public Void call() throws Exception {
public T call() throws Exception {
try {
invocation.reset();
runnable.run();
invocation.retryOrComplete(null, null);
invocation.completeOrRetry(null, null);
} catch (Exception e) {
invocation.retryOrComplete(null, e);
invocation.completeOrRetry(null, e);
}

return null;
Expand All @@ -76,18 +84,20 @@ public Void call() throws Exception {
}

static <T> AsyncCallable<T> ofFuture(final Callable<java.util.concurrent.CompletableFuture<T>> callable) {
Assert.notNull(callable, "callable");
return new AsyncCallable<T>() {
@Override
public T call() throws Exception {
try {
invocation.reset();
callable.call().whenComplete(new BiConsumer<T, Throwable>() {
@Override
public void accept(T innerResult, Throwable failure) {
invocation.retryOrComplete(innerResult, failure);
invocation.completeOrRetry(innerResult, failure);
}
});
} catch (Exception e) {
invocation.retryOrComplete(null, e);
invocation.completeOrRetry(null, e);
}

return null;
Expand All @@ -96,6 +106,7 @@ public void accept(T innerResult, Throwable failure) {
}

static <T> AsyncCallable<T> ofFuture(final ContextualCallable<java.util.concurrent.CompletableFuture<T>> callable) {
Assert.notNull(callable, "callable");
return new AsyncCallable<T>() {
@Override
public synchronized T call() throws Exception {
Expand All @@ -105,11 +116,11 @@ public synchronized T call() throws Exception {
@Override
public void accept(T innerResult, Throwable failure) {
if (failure != null)
invocation.retryOrComplete(innerResult, failure);
invocation.completeOrRetry(innerResult, failure);
}
});
} catch (Exception e) {
invocation.retryOrComplete(null, e);
invocation.completeOrRetry(null, e);
}

return null;
Expand Down
95 changes: 66 additions & 29 deletions src/main/java/net/jodah/recurrent/AsyncInvocation.java
Expand Up @@ -13,16 +13,19 @@
public class AsyncInvocation extends Invocation {
private final AsyncCallable<Object> callable;
private final RecurrentFuture<Object> future;
private final AsyncListeners<Object> listeners;
private final Scheduler scheduler;
volatile boolean retried;
volatile boolean completeCalled;
volatile boolean retryCalled;

@SuppressWarnings("unchecked")
<T> AsyncInvocation(AsyncCallable<T> callable, RetryPolicy retryPolicy, Scheduler scheduler,
RecurrentFuture<T> future) {
RecurrentFuture<T> future, AsyncListeners<T> listeners) {
super(retryPolicy);
this.callable = (AsyncCallable<Object>) callable;
this.scheduler = scheduler;
this.future = (RecurrentFuture<Object>) future;
this.listeners = (AsyncListeners<Object>) listeners;
}

/**
Expand All @@ -49,6 +52,9 @@ public boolean complete(Object result) {
* Attempts to complete the invocation and the associated {@code RecurrentFuture} with the {@code result} and
* {@code failure}. Returns true on success, else false if completion failed and should be retried via
* {@link #retry()}.
* <p>
* Note: the invocation may be completed even when the {@code failure} is not {@code null}, such as when the
* RetryPolicy does not allow retries for the {@code failure}.
*
* @throws IllegalStateException if the invocation is already complete
*/
Expand All @@ -63,7 +69,9 @@ public boolean complete(Object result, Throwable failure) {
* @throws IllegalStateException if a retry method has already been called or the invocation is already complete
*/
public boolean retry() {
return retryInternal(lastResult, lastFailure);
Assert.state(!retryCalled, "Retry has already been called");
retryCalled = true;
return completeOrRetry(lastResult, lastFailure);
}

/**
Expand All @@ -73,7 +81,7 @@ public boolean retry() {
* @throws IllegalStateException if a retry method has already been called or the invocation is already complete
*/
public boolean retryFor(Object result) {
return retryInternal(result, null);
return retryFor(result, null);
}

/**
Expand All @@ -83,7 +91,9 @@ public boolean retryFor(Object result) {
* @throws IllegalStateException if a retry method has already been called or the invocation is already complete
*/
public boolean retryFor(Object result, Throwable failure) {
return retryInternal(result, failure);
Assert.state(!retryCalled, "Retry has already been called");
retryCalled = true;
return completeOrRetry(result, failure);
}

/**
Expand All @@ -95,45 +105,72 @@ public boolean retryFor(Object result, Throwable failure) {
*/
public boolean retryOn(Throwable failure) {
Assert.notNull(failure, "failure");
return retryInternal(null, failure);
return retryFor(null, failure);
}

/**
* Resets the retry flag.
*/
void reset() {
retried = false;
completeCalled = false;
retryCalled = false;
}

/**
* Retries the invocation if necessary else completes it.
* Attempts to complete the parent invocation followed by the future.
*
* @throws IllegalStateException if the invocation is already complete
*/
void retryOrComplete(Object result, Throwable failure) {
if (!retry(result, failure))
future.complete(result, failure);
}

private boolean completeInternal(Object result, Throwable failure, boolean checkArgs) {
boolean complete = super.complete(result, failure, checkArgs);
if (complete)
future.complete(result, failure);
return complete;
boolean completed = super.complete(result, failure, checkArgs);
boolean success = completed && failure == null;

// Handle failure
if (!success && !completeCalled && listeners != null)
listeners.handleFailedAttempt(result, failure, this, scheduler);

// Handle completed
if (completed) {
if (listeners != null)
listeners.complete(result, failure, this, success);
future.complete(result, failure, success);
}

completeCalled = true;
return completed;
}

/**
* Attempts to complete the invocation else schedule a retry.
*
* @throws IllegalStateException if the invocation is already complete
*/
@SuppressWarnings({ "unchecked", "rawtypes" })
private boolean retry(Object result, Throwable failure) {
boolean canRetry = canRetryFor(result, failure);
if (canRetry && !future.isDone() && !future.isCancelled())
boolean completeOrRetry(Object result, Throwable failure) {
boolean completed = super.complete(result, failure, true);
boolean success = completed && failure == null;
boolean shouldRetry = completed ? false
: canRetryForInternal(result, failure) && !future.isDone() && !future.isCancelled();

// Handle failure
if (!success && !completeCalled && listeners != null)
listeners.handleFailedAttempt(result, failure, this, scheduler);

// Handle retry needed
if (shouldRetry) {
if (listeners != null)
listeners.handleRetry(result, failure, this, scheduler);
future.setFuture((Future) scheduler.schedule(callable, waitTime, TimeUnit.NANOSECONDS));
return canRetry;
}
}

// Handle completed
if (completed || !shouldRetry) {
if (listeners != null)
listeners.complete(result, failure, this, success);
future.complete(result, failure, success);
}

private boolean retryInternal(Object result, Throwable failure) {
Assert.state(!retried, "Retry has already been called");
retried = true;
boolean retrying = retry(result, failure);
if (!retrying)
future.complete(result, failure);
return retrying;
completeCalled = true;
return shouldRetry;
}
}

0 comments on commit 1e62f09

Please sign in to comment.