Skip to content

Commit

Permalink
Removed schedulers
Browse files Browse the repository at this point in the history
Simplified recurrent API.
Updated Dox
  • Loading branch information
jhalterman committed Jun 30, 2015
1 parent 30f52c1 commit a83357d
Show file tree
Hide file tree
Showing 8 changed files with 122 additions and 140 deletions.
57 changes: 38 additions & 19 deletions README.md
@@ -1,52 +1,71 @@
# Recurrent

*Sophisticated retries made simple.*
*Simple, sophisticated retries.*

## Introduction

Every developer has rolled their own retries.

Recurrent is a simple yet sophisticated library for performing retries. It features:
Recurrent is a simple, zero-dependency library for performing retries. It features:

* Zero external dependencies
* Synchronous and asynchronous retries
* Transparent integration into existing APIs
* Java 8 friendly functional interfaces
* Java 6/7/8 supported with Java 8 friendly functional interfaces
* Simple integration with asynchronous libraries

## Usage

Create a RetryPolicy:
#### Retry Policies

Recurrent supports flexible retry policies that allow you to express the number of retries, delay between attempts, delay between attempts including backoff, and maximum duration:

```java
RetryPolicy retryPolicy = new RetryPolicy()
.withBackoff(1, 30, TimeUnit.SECONDS)
.withMaxRetries(100);
```

Make a call with retries according to your RetryPolicy:
#### Synchronous Retries

```
Recurrent.doWithRetries(() -> connect(), retryPolicy, scheduler)
.whenComplete((connection, failure) -> {
if (connect != null)
log.info("Connection established", connection);
else
log.error("Connection attempt failed", failure);
});
Synchronous invocations are performed and retried in the calling thread until the invocation succeeds or the retry policy is exceeded:

```java
Connection connection = Recurrent.withRetries(() -> connect(), retryPolicy);
```

## Integrations
#### Asynchronous Retries

Recurrent was designed to play nicely with asynchronous libraries. Here are some example integrations:
Asynchronous invocations are performed and retried on a scheduled executor. When the invocation succeeds or the retry policy is exceeded, the resulting ListenableFuture is completed and any CompletionListeners registered against it are called:

### Netty
```java
Recurrent.withRetries(() -> connect(), retryPolicy, executor)
.whenComplete((connection, failure) -> {
if (connection != null)
log.info("Connection established");
else
log.error("Connection attempts failed", failure);
});
```

#### Integrating with Asynchronous Code

Asynchronous code often reports failures via callbacks rather than throwing an exception. Recurrent provides nice integration with asynchronous code by allowing you to manually trigger retries as necessary:

```java
Recurrent.withRetries((invocation) -> {
someService.connect(host, port).addListener((connection, failure) -> {
if (connection != null)
log.info("Connection established");
else if (!invocation.retry())
log.error("Connection attempts failed", failure)
}
}, retryPolicy, eventLoopGroup));
```

### RxJava
## Notes

#### On API Integration

A great way to add support for retries to your project's public API is to subclass the RetryPolicy class. This can allow you to offer retry support and policies to your users while keeping Recurrent hidden.
## Docs
Expand Down
18 changes: 10 additions & 8 deletions src/main/java/net/jodah/recurrent/CompletableFuture.java
Expand Up @@ -2,6 +2,7 @@

import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

Expand All @@ -11,7 +12,7 @@
* A future that is completable.
*/
class CompletableFuture<T> implements ListenableFuture<T> {
private Scheduler scheduler;
private ScheduledExecutorService executor;
private volatile Future<T> delegate;
private volatile boolean done;
private volatile CompletionListener<T> completionListener;
Expand All @@ -20,8 +21,8 @@ class CompletableFuture<T> implements ListenableFuture<T> {
private volatile T result;
private volatile Throwable failure;

CompletableFuture(Scheduler scheduler) {
this.scheduler = scheduler;
CompletableFuture(ScheduledExecutorService executor) {
this.executor = executor;
}

void setFuture(Future<T> delegate) {
Expand All @@ -36,7 +37,7 @@ public void complete(T result, Throwable failure) {
waiter.interruptWaiters();

if (asyncCompletionListener != null)
scheduler.schedule(Callables.callable(asyncCompletionListener, result, failure), 0, TimeUnit.MILLISECONDS);
executor.schedule(Callables.callable(asyncCompletionListener, result, failure), 0, TimeUnit.MILLISECONDS);
if (completionListener != null)
completionListener.onCompletion(result, failure);
}
Expand Down Expand Up @@ -98,19 +99,20 @@ public ListenableFuture<T> whenComplete(CompletionListener<T> completionListener
@Override
public ListenableFuture<T> whenCompleteAsync(CompletionListener<T> completionListener) {
if (done)
scheduler.schedule(Callables.callable(completionListener, result, failure), 0, TimeUnit.MILLISECONDS);
executor.schedule(Callables.callable(completionListener, result, failure), 0, TimeUnit.MILLISECONDS);
else
this.completionListener = completionListener;
return this;
}

@Override
public ListenableFuture<T> whenCompleteAsync(CompletionListener<T> completionListener, Scheduler scheduler) {
public ListenableFuture<T> whenCompleteAsync(CompletionListener<T> completionListener,
ScheduledExecutorService executor) {
if (done)
scheduler.schedule(Callables.callable(completionListener, result, failure), 0, TimeUnit.MILLISECONDS);
executor.schedule(Callables.callable(completionListener, result, failure), 0, TimeUnit.MILLISECONDS);
else {
this.asyncCompletionListener = completionListener;
this.scheduler = scheduler;
this.executor = executor;
}
return this;
}
Expand Down
13 changes: 7 additions & 6 deletions src/main/java/net/jodah/recurrent/Invocation.java
@@ -1,6 +1,7 @@
package net.jodah.recurrent;

import java.util.concurrent.Callable;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import net.jodah.recurrent.util.Duration;
Expand All @@ -14,22 +15,22 @@
public class Invocation {
private RetryPolicy retryPolicy;
private Callable<?> callable;
private Scheduler scheduler;
private ScheduledExecutorService executor;
private long startTime;

/** count of failed attempts */
int attemptCount;
/** wait time in nanoseconds */
long waitTime;

Invocation(Callable<?> callable, RetryPolicy retryPolicy, Scheduler scheduler) {
initialize(callable, retryPolicy, scheduler);
Invocation(Callable<?> callable, RetryPolicy retryPolicy, ScheduledExecutorService executor) {
initialize(callable, retryPolicy, executor);
}

void initialize(Callable<?> callable, RetryPolicy retryPolicy, Scheduler scheduler) {
void initialize(Callable<?> callable, RetryPolicy retryPolicy, ScheduledExecutorService executor) {
this.callable = callable;
this.retryPolicy = retryPolicy;
this.scheduler = scheduler;
this.executor = executor;

waitTime = retryPolicy.getDelay().toNanos();
startTime = System.nanoTime();
Expand All @@ -41,7 +42,7 @@ void initialize(Callable<?> callable, RetryPolicy retryPolicy, Scheduler schedul
public boolean retry() {
recordFailedAttempt();
if (!isPolicyExceeded()) {
scheduler.schedule(callable, waitTime, TimeUnit.NANOSECONDS);
executor.schedule(callable, waitTime, TimeUnit.NANOSECONDS);
return true;
}
return false;
Expand Down
5 changes: 3 additions & 2 deletions src/main/java/net/jodah/recurrent/ListenableFuture.java
@@ -1,11 +1,12 @@
package net.jodah.recurrent;

import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;

public interface ListenableFuture<T> extends Future<T> {
ListenableFuture<T> whenComplete(CompletionListener<T> completionListener);

ListenableFuture<T> whenCompleteAsync(CompletionListener<T> completionListener);
ListenableFuture<T> whenCompleteAsync(CompletionListener<T> completionListener, Scheduler scheduler);

ListenableFuture<T> whenCompleteAsync(CompletionListener<T> completionListener, ScheduledExecutorService executor);
}
118 changes: 60 additions & 58 deletions src/main/java/net/jodah/recurrent/Recurrent.java
@@ -1,87 +1,89 @@
package net.jodah.recurrent;

import java.util.concurrent.Callable;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public final class Recurrent {
private Recurrent() {
}

public static ListenableFuture<?> doScheduledWithRetries(RetryableRunnable runnable, RetryPolicy retryPolicy,
Scheduler scheduler) {
return call(ContextualCallable.of(runnable), retryPolicy, scheduler, true);
}

public static ListenableFuture<?> doScheduledWithRetries(Runnable runnable, RetryPolicy retryPolicy,
Scheduler scheduler) {
return call(ContextualCallable.of(retryable(runnable)), retryPolicy, scheduler, true);
}

public static ListenableFuture<?> doWithRetries(RetryableRunnable runnable, RetryPolicy retryPolicy,
Scheduler scheduler) {
return call(ContextualCallable.of(runnable), retryPolicy, scheduler, false);
}

/**
* Invokes the {@code runnable}, sleeping between invocation attempts according to the {@code retryPolicy}.
* Returns a RetryableCallable for the {@code callable}.
*/
public static void doWithRetries(Runnable runnable, RetryPolicy retryPolicy) {
call(Callables.callable(runnable), retryPolicy);
public static <T> RetryableCallable<T> retryable(final Callable<T> callable) {
return new RetryableCallable<T>() {
@Override
public T call(Invocation invocation) throws Exception {
return callable.call();
}
};
}

/**
* Invokes the {@code runnable}, scheduling retries with the {@code scheduler} according to the {@code retryPolicy}.
* Returns a RetryableRunnable for the {@code runnable}.
*/
public static ListenableFuture<?> doWithRetries(Runnable runnable, RetryPolicy retryPolicy, Scheduler scheduler) {
return call(ContextualCallable.of(retryable(runnable)), retryPolicy, scheduler, false);
public static RetryableRunnable retryable(final Runnable runnable) {
return new RetryableRunnable() {
@Override
public void run(Invocation invocation) {
runnable.run();
}
};
}

public static <T> T withRetries(Callable<T> callable, RetryPolicy retryPolicy) {
return call(callable, retryPolicy);
}

public static <T> ListenableFuture<T> getScheduledGetWithRetries(RetryableCallable<T> callable,
RetryPolicy retryPolicy, Scheduler scheduler) {
return call(ContextualCallable.of(callable), retryPolicy, scheduler, true);
public static <T> ListenableFuture<T> withRetries(Callable<T> callable, RetryPolicy retryPolicy,
ScheduledExecutorService executor) {
return call(ContextualCallable.of(retryable(callable)), retryPolicy, executor, false);
}

public static <T> ListenableFuture<T> getScheduledWithRetries(Callable<T> callable, RetryPolicy retryPolicy,
Scheduler scheduler) {
return call(ContextualCallable.of(retryable(callable)), retryPolicy, scheduler, true);
public static <T> ListenableFuture<T> withRetries(Callable<T> callable, RetryPolicy retryPolicy,
ScheduledExecutorService executor, boolean initialCallSynchronous) {
return call(ContextualCallable.of(retryable(callable)), retryPolicy, executor, true);
}

public static <T> T getWithRetries(Callable<T> callable, RetryPolicy retryPolicy) {
return call(callable, retryPolicy);
public static <T> ListenableFuture<T> withRetries(RetryableCallable<T> callable, RetryPolicy retryPolicy,
ScheduledExecutorService executor) {
return call(ContextualCallable.of(callable), retryPolicy, executor, false);
}

public static <T> ListenableFuture<T> getWithRetries(Callable<T> callable, RetryPolicy retryPolicy,
Scheduler scheduler) {
return call(ContextualCallable.of(retryable(callable)), retryPolicy, scheduler, false);
public static <T> ListenableFuture<T> withRetries(RetryableCallable<T> callable, RetryPolicy retryPolicy,
ScheduledExecutorService executor, boolean initialCallSynchronous) {
return call(ContextualCallable.of(callable), retryPolicy, executor, true);
}

public static <T> ListenableFuture<T> getWithRetries(RetryableCallable<T> callable, RetryPolicy retryPolicy,
Scheduler scheduler) {
return call(ContextualCallable.of(callable), retryPolicy, scheduler, false);
public static ListenableFuture<?> withRetries(RetryableRunnable runnable, RetryPolicy retryPolicy,
ScheduledExecutorService executor) {
return call(ContextualCallable.of(runnable), retryPolicy, executor, false);
}

public static ListenableFuture<?> withRetries(RetryableRunnable runnable, RetryPolicy retryPolicy,
ScheduledExecutorService executor, boolean initialCallSynchronous) {
return call(ContextualCallable.of(runnable), retryPolicy, executor, true);
}

/**
* Returns a RetryableCallable for the {@code callable}.
* Invokes the {@code runnable}, sleeping between invocation attempts according to the {@code retryPolicy}.
*/
public static <T> RetryableCallable<T> retryable(final Callable<T> callable) {
return new RetryableCallable<T>() {
@Override
public T call(Invocation invocation) throws Exception {
return callable.call();
}
};
public static void withRetries(Runnable runnable, RetryPolicy retryPolicy) {
call(Callables.callable(runnable), retryPolicy);
}

/**
* Returns a RetryableRunnable for the {@code runnable}.
* Invokes the {@code runnable}, scheduling retries with the {@code executor} according to the {@code retryPolicy}.
*/
public static RetryableRunnable retryable(final Runnable runnable) {
return new RetryableRunnable() {
@Override
public void run(Invocation invocation) {
runnable.run();
}
};
public static ListenableFuture<?> withRetries(Runnable runnable, RetryPolicy retryPolicy,
ScheduledExecutorService executor) {
return call(ContextualCallable.of(retryable(runnable)), retryPolicy, executor, false);
}

public static ListenableFuture<?> withRetries(Runnable runnable, RetryPolicy retryPolicy,
ScheduledExecutorService executor, boolean initialCallSynchronous) {
return call(ContextualCallable.of(retryable(runnable)), retryPolicy, executor, true);
}

/**
Expand Down Expand Up @@ -111,12 +113,12 @@ private static <T> T call(Callable<T> callable, RetryPolicy retryPolicy) {
}

/**
* Calls the {@code callable} via the {@code scheduler}, performing retries according to the {@code retryPolicy}.
* Calls the {@code callable} via the {@code executor}, performing retries according to the {@code retryPolicy}.
*/
private static <T> ListenableFuture<T> call(final ContextualCallable<T> callable, final RetryPolicy retryPolicy,
final Scheduler scheduler, boolean scheduleInitialCall) {
final CompletableFuture<T> future = new CompletableFuture<T>(scheduler);
final Invocation invocation = new Invocation(callable, retryPolicy, scheduler);
final ScheduledExecutorService executor, boolean scheduleInitialCall) {
final CompletableFuture<T> future = new CompletableFuture<T>(executor);
final Invocation invocation = new Invocation(callable, retryPolicy, executor);

callable.initialize(invocation, new CompletionListener<T>() {
public void onCompletion(T result, Throwable failure) {
Expand All @@ -131,12 +133,12 @@ public void onCompletion(T result, Throwable failure) {
if (invocation.isPolicyExceeded())
future.complete(result, failure);
else
future.setFuture(scheduler.schedule(callable, invocation.waitTime, TimeUnit.NANOSECONDS));
future.setFuture(executor.schedule(callable, invocation.waitTime, TimeUnit.NANOSECONDS));
}
});

if (scheduleInitialCall) {
future.setFuture(scheduler.schedule(callable, invocation.waitTime, TimeUnit.NANOSECONDS));
future.setFuture(executor.schedule(callable, invocation.waitTime, TimeUnit.NANOSECONDS));
} else {
try {
callable.call();
Expand All @@ -146,7 +148,7 @@ public void onCompletion(T result, Throwable failure) {
future.complete(null, t);

// Asynchronous retry
future.setFuture(scheduler.schedule(callable, invocation.waitTime, TimeUnit.NANOSECONDS));
future.setFuture(executor.schedule(callable, invocation.waitTime, TimeUnit.NANOSECONDS));
}
}

Expand Down

0 comments on commit a83357d

Please sign in to comment.