Skip to content

Commit

Permalink
Reimplemented futures with better primitives
Browse files Browse the repository at this point in the history
  • Loading branch information
spkrka committed Nov 6, 2015
1 parent 2febcd5 commit 20b8d21
Showing 1 changed file with 29 additions and 51 deletions.
80 changes: 29 additions & 51 deletions src/main/java/com/spotify/futures/CompletableFuturesExtra.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Function;

Expand Down Expand Up @@ -98,36 +97,7 @@ public static <T> CompletableFuture<T> exceptionallyCompletedFuture(Throwable th
public static <T, U> CompletionStage<U> handleCompose(
CompletionStage<T> future,
final BiFunction<? super T, Throwable, ? extends CompletionStage<U>> fn) {

final CompletableFuture<U> result = new CompletableFuture<U>();

future.whenComplete(new BiConsumer<T, Throwable>() {
@Override
public void accept(T value, Throwable throwable) {
final CompletionStage<U> newStage;
try {
newStage = fn.apply(value, throwable);
} catch (Throwable e) {
result.completeExceptionally(e);
return;
}
if (newStage == null) {
result.completeExceptionally(new NullPointerException("fn returned null"));
} else {
newStage.whenComplete(new BiConsumer<U, Throwable>() {
@Override
public void accept(U value, Throwable throwable) {
if (throwable != null) {
result.completeExceptionally(throwable);
} else {
result.complete(value);
}
}
});
}
}
});
return result;
return dereference(future.handle(fn));
}

/**
Expand All @@ -147,19 +117,10 @@ public void accept(U value, Throwable throwable) {
* exceptionally
* @return the new CompletionStage
*/
public static <U, T extends U> CompletionStage<U> exceptionallyCompose(
public static <T> CompletionStage<T> exceptionallyCompose(
CompletionStage<T> future,
final Function<Throwable, ? extends CompletionStage<U>> fn) {
return handleCompose(future, new BiFunction<T, Throwable, CompletionStage<U>>() {
@Override
public CompletionStage<U> apply(T value, Throwable throwable) {
if (throwable != null) {
return fn.apply(throwable);
} else {
return CompletableFuture.<U>completedFuture(value);
}
}
});
final Function<Throwable, ? extends CompletionStage<T>> fn) {
return dereference(wrap(future).exceptionally(fn));
}

/**
Expand All @@ -170,14 +131,8 @@ public CompletionStage<U> apply(T value, Throwable throwable) {
*/
public static <T> CompletionStage<T> dereference(
CompletionStage<? extends CompletionStage<T>> future) {
return handleCompose(future,
new BiFunction<CompletionStage<T>, Throwable, CompletionStage<T>>() {
@Override
public CompletionStage<T> apply(
CompletionStage<T> value, Throwable throwable) {
return value;
}
});
//noinspection unchecked
return future.thenCompose(Identity.INSTANCE);
}

/**
Expand Down Expand Up @@ -210,4 +165,27 @@ public static <T> T getCompleted(CompletionStage<T> stage) {
throw Throwables.propagate(e.getCause());
}
}

private enum Identity implements Function {
INSTANCE;

@Override
public Object apply(Object o) {
return o;
}
}

private enum WrapFunction implements Function {
INSTANCE;

@Override
public Object apply(Object o) {
return CompletableFuture.completedFuture(o);
}
}

private static <T> CompletionStage<CompletionStage<T>> wrap(CompletionStage<T> future) {
//noinspection unchecked
return future.thenApply((Function<T, CompletionStage<T>>) WrapFunction.INSTANCE);
}
}

0 comments on commit 20b8d21

Please sign in to comment.