Skip to content

Commit

Permalink
Various API consolodations/simplifications
Browse files Browse the repository at this point in the history
  • Loading branch information
jhalterman committed Jul 1, 2015
1 parent 63acb91 commit af9a92b
Show file tree
Hide file tree
Showing 15 changed files with 436 additions and 165 deletions.
19 changes: 7 additions & 12 deletions README.md
Expand Up @@ -38,12 +38,8 @@ Asynchronous invocations are performed and retried on a scheduled executor. When

```java
Recurrent.withRetries(() -> connect(), retryPolicy, executor)
.whenComplete((connection, failure) -> {
if (connection != null)
log.info("Connection established");
else
log.error("Connection attempts failed", failure);
});
.whenSuccess((connection) -> log.info("Connection established to {}", connection)
.whenFailure((failure) -> log.error("Connection attempts failed", failure));
```

#### Integrating with Asynchronous Code
Expand All @@ -52,13 +48,12 @@ Asynchronous code often reports failures via callbacks rather than throwing an e

```java
Recurrent.withRetries((invocation) -> {
someService.connect(host, port).addListener((connection, failure) -> {
if (connection != null)
log.info("Connection established");
else if (!invocation.retry())
log.error("Connection attempts failed", failure)
someService.connect(host, port).addFailureListener((failure) -> {
// Manually retry invocation
if (!invocation.retry())
log.error("Connection attempts failed", failure)
}
}, retryPolicy, eventLoopGroup));
}, retryPolicy, executor));
```

## Notes
Expand Down
6 changes: 6 additions & 0 deletions pom.xml
Expand Up @@ -48,6 +48,12 @@
<version>0.3.4-SNAPSHOT</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.reactivex</groupId>
<artifactId>rxjava</artifactId>
<version>1.0.12</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
85 changes: 85 additions & 0 deletions src/main/java/net/jodah/recurrent/AsyncCallable.java
@@ -0,0 +1,85 @@
package net.jodah.recurrent;

import java.util.concurrent.Callable;

import net.jodah.recurrent.event.CompletionListener;

/**
* A callable that performs async callbacks.
*
* @author Jonathan Halterman
* @param <T> result type
*/
abstract class AsyncCallable<T> implements Callable<T> {
protected Invocation invocation;
protected CompletionListener<T> listener;

static <T> AsyncCallable<T> of(final Callable<T> callable) {
return new AsyncCallable<T>() {
@Override
public T call() throws Exception {
try {
T result = callable.call();
listener.onCompletion(result, null);
return result;
} catch (Exception e) {
listener.onCompletion(null, e);
return null;
}
}
};
}

static <T> AsyncCallable<T> of(final RetryableCallable<T> callable) {
return new AsyncCallable<T>() {
@Override
public T call() throws Exception {
try {
T result = callable.call(invocation);
listener.onCompletion(result, null);
return result;
} catch (Exception e) {
listener.onCompletion(null, e);
return null;
}
}
};
}

static AsyncCallable<?> of(final RetryableRunnable runnable) {
return new AsyncCallable<Object>() {
@Override
public Void call() throws Exception {
try {
runnable.run(invocation);
listener.onCompletion(null, null);
} catch (Exception e) {
listener.onCompletion(null, e);
}

return null;
}
};
}

static AsyncCallable<?> of(final Runnable runnable) {
return new AsyncCallable<Object>() {
@Override
public Void call() throws Exception {
try {
runnable.run();
listener.onCompletion(null, null);
} catch (Exception e) {
listener.onCompletion(null, e);
}

return null;
}
};
}

void initialize(Invocation invocation, CompletionListener<T> listener) {
this.invocation = invocation;
this.listener = listener;
}
}
28 changes: 26 additions & 2 deletions src/main/java/net/jodah/recurrent/Callables.java
Expand Up @@ -2,13 +2,17 @@

import java.util.concurrent.Callable;

import net.jodah.recurrent.event.CompletionListener;
import net.jodah.recurrent.event.FailureListener;
import net.jodah.recurrent.event.SuccessListener;

/**
* Utilities for creating callables.
*
* @author Jonathan Halterman
*/
final class Callables {
static <T> Callable<T> callable(CompletionListener<T> listener, final T result, final Throwable failure) {
static <T> Callable<T> of(CompletionListener<T> listener, final T result, final Throwable failure) {
return new Callable<T>() {
@Override
public T call() {
Expand All @@ -18,7 +22,17 @@ public T call() {
};
}

static Callable<?> callable(final Runnable runnable) {
static <T> Callable<T> of(FailureListener listener, final Throwable failure) {
return new Callable<T>() {
@Override
public T call() {
listener.onFailure(failure);
return null;
}
};
}

static Callable<?> of(final Runnable runnable) {
return new Callable<Void>() {
@Override
public Void call() {
Expand All @@ -27,4 +41,14 @@ public Void call() {
}
};
}

static <T> Callable<T> of(SuccessListener<T> listener, final T result) {
return new Callable<T>() {
@Override
public T call() {
listener.onSuccess(result);
return null;
}
};
}
}
84 changes: 81 additions & 3 deletions src/main/java/net/jodah/recurrent/CompletableFuture.java
Expand Up @@ -6,6 +6,9 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import net.jodah.recurrent.event.CompletionListener;
import net.jodah.recurrent.event.FailureListener;
import net.jodah.recurrent.event.SuccessListener;
import net.jodah.recurrent.internal.util.concurrent.InterruptableWaiter;

/**
Expand All @@ -17,6 +20,10 @@ class CompletableFuture<T> implements ListenableFuture<T> {
private volatile boolean done;
private volatile CompletionListener<T> completionListener;
private volatile CompletionListener<T> asyncCompletionListener;
private volatile SuccessListener<T> successListener;
private volatile SuccessListener<T> asyncSuccessListener;
private volatile FailureListener failureListener;
private volatile FailureListener asyncFailureListener;
private volatile InterruptableWaiter waiter;
private volatile T result;
private volatile Throwable failure;
Expand All @@ -36,10 +43,23 @@ void complete(T result, Throwable failure) {
if (waiter != null)
waiter.interruptWaiters();

// Async callbacks
if (asyncCompletionListener != null)
executor.schedule(Callables.callable(asyncCompletionListener, result, failure), 0, TimeUnit.MILLISECONDS);
executor.schedule(Callables.of(asyncCompletionListener, result, failure), 0, TimeUnit.MILLISECONDS);
if (failure == null) {
if (asyncSuccessListener != null)
executor.schedule(Callables.of(asyncSuccessListener, result), 0, TimeUnit.MILLISECONDS);
} else if (asyncFailureListener != null)
executor.schedule(Callables.of(asyncFailureListener, failure), 0, TimeUnit.MILLISECONDS);

// Sync callbacks
if (completionListener != null)
completionListener.onCompletion(result, failure);
if (failure == null) {
if (successListener != null)
successListener.onSuccess(result);
} else if (failureListener != null)
failureListener.onFailure(failure);
}

@Override
Expand Down Expand Up @@ -99,7 +119,7 @@ public ListenableFuture<T> whenComplete(CompletionListener<T> completionListener
@Override
public ListenableFuture<T> whenCompleteAsync(CompletionListener<T> completionListener) {
if (done)
executor.schedule(Callables.callable(completionListener, result, failure), 0, TimeUnit.MILLISECONDS);
executor.schedule(Callables.of(completionListener, result, failure), 0, TimeUnit.MILLISECONDS);
else
this.completionListener = completionListener;
return this;
Expand All @@ -109,11 +129,69 @@ public ListenableFuture<T> whenCompleteAsync(CompletionListener<T> completionLis
public ListenableFuture<T> whenCompleteAsync(CompletionListener<T> completionListener,
ScheduledExecutorService executor) {
if (done)
executor.schedule(Callables.callable(completionListener, result, failure), 0, TimeUnit.MILLISECONDS);
executor.schedule(Callables.of(completionListener, result, failure), 0, TimeUnit.MILLISECONDS);
else {
this.asyncCompletionListener = completionListener;
this.executor = executor;
}
return this;
}

@Override
public ListenableFuture<T> whenFailure(FailureListener failureListener) {
if (!done)
this.failureListener = failureListener;
else
failureListener.onFailure(failure);
return this;
}

@Override
public ListenableFuture<T> whenFailureAsync(FailureListener failureListener) {
if (done)
executor.schedule(Callables.of(failureListener, failure), 0, TimeUnit.MILLISECONDS);
else
this.failureListener = failureListener;
return this;
}

@Override
public ListenableFuture<T> whenFailureAsync(FailureListener failureListener, ScheduledExecutorService executor) {
if (done)
executor.schedule(Callables.of(failureListener, failure), 0, TimeUnit.MILLISECONDS);
else {
this.asyncFailureListener = failureListener;
this.executor = executor;
}
return this;
}

@Override
public ListenableFuture<T> whenSuccess(SuccessListener<T> successListener) {
if (!done)
this.successListener = successListener;
else
successListener.onSuccess(result);
return this;
}

@Override
public ListenableFuture<T> whenSuccessAsync(SuccessListener<T> successListener) {
if (done)
executor.schedule(Callables.of(successListener, result), 0, TimeUnit.MILLISECONDS);
else
this.successListener = successListener;
return this;
}

@Override
public ListenableFuture<T> whenSuccessAsync(SuccessListener<T> successListener, ScheduledExecutorService executor) {
if (done)
executor.schedule(Callables.of(successListener, result), 0, TimeUnit.MILLISECONDS);
else {
this.asyncSuccessListener = successListener;
this.executor = executor;
}
return this;
}
}
56 changes: 0 additions & 56 deletions src/main/java/net/jodah/recurrent/ContextualCallable.java

This file was deleted.

0 comments on commit af9a92b

Please sign in to comment.