Skip to content

Commit

Permalink
Merge pull request #17 from spotify/krka/completable_future_extra
Browse files Browse the repository at this point in the history
Add more utilities for completable futures
  • Loading branch information
spkrka committed Nov 11, 2015
2 parents 0939ee9 + cafcd41 commit ac0e9da
Show file tree
Hide file tree
Showing 2 changed files with 315 additions and 0 deletions.
137 changes: 137 additions & 0 deletions src/main/java/com/spotify/futures/CompletableFuturesExtra.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,14 @@
*/
package com.spotify.futures;

import com.google.common.base.Throwables;
import com.google.common.util.concurrent.ListenableFuture;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.function.BiFunction;
import java.util.function.Function;

public class CompletableFuturesExtra {

Expand Down Expand Up @@ -68,4 +72,137 @@ public static <V> ListenableFuture<V> toListenableFuture(
}
return new CompletableToListenableFutureWrapper<V>(future);
}

/**
* Returns a new CompletableFuture that is already exceptionally completed with
* the given exception.
*
* @param throwable the exception
* @return the exceptionally completed CompletableFuture
*/
public static <T> CompletableFuture<T> exceptionallyCompletedFuture(Throwable throwable) {
final CompletableFuture<T> future = new CompletableFuture<T>();
future.completeExceptionally(throwable);
return future;
}

/**
* Returns a new stage that, when this stage completes
* either normally or exceptionally, is executed with this stage's
* result and exception as arguments to the supplied function.
*
* <p>When this stage is complete, the given function is invoked
* with the result (or {@code null} if none) and the exception (or
* {@code null} if none) of this stage as arguments, and the
* function's result is used to complete the returned stage.
*
* This differs from
* {@link java.util.concurrent.CompletionStage#handle(java.util.function.BiFunction)}
* in that the function should return a {@link java.util.concurrent.CompletionStage} rather than
* the value directly.
*
* @param stage the {@link CompletionStage} to compose
* @param fn the function to use to compute the value of the
* returned {@link CompletionStage}
* @param <U> the function's return type
* @return the new {@link CompletionStage}
*/
public static <T, U> CompletionStage<U> handleCompose(
CompletionStage<T> stage,
BiFunction<? super T, Throwable, ? extends CompletionStage<U>> fn) {
return dereference(stage.handle(fn));
}

/**
* Returns a new stage that, when this stage completes
* exceptionally, is executed with this stage's exception as the
* argument to the supplied function. Otherwise, if this stage
* completes normally, then the returned stage also completes
* normally with the same value.
*
* This differs from
* {@link java.util.concurrent.CompletionStage#exceptionally(java.util.function.Function)}
* in that the function should return a {@link java.util.concurrent.CompletionStage} rather than
* the value directly.
*
* @param stage the {@link CompletionStage} to compose
* @param fn the function to use to compute the value of the
* returned {@link CompletionStage} if this stage completed
* exceptionally
* @return the new {@link CompletionStage}
*/
public static <T> CompletionStage<T> exceptionallyCompose(
CompletionStage<T> stage,
Function<Throwable, ? extends CompletionStage<T>> fn) {
return dereference(wrap(stage).exceptionally(fn));
}

/**
* check that a stage is completed.
* @param stage a {@link CompletionStage}.
* @throws IllegalStateException if the stage is not completed.
*/
public static void checkCompleted(CompletionStage<?> stage) {
if (!stage.toCompletableFuture().isDone()) {
throw new IllegalStateException("future was not completed");
}
}

/**
* Get the value of a completed stage.
*
* @param stage a completed {@link CompletionStage}.
* @return the value of the stage if it has one.
* @throws IllegalStateException if the stage is not completed.
* @throws com.google.common.util.concurrent.UncheckedExecutionException
* if the future has failed with a non-runtime exception, otherwise
* the actual exception
*/
public static <T> T getCompleted(CompletionStage<T> stage) {
CompletableFuture<T> future = stage.toCompletableFuture();
checkCompleted(future);
try {
return future.join();
} catch (CompletionException e) {
throw Throwables.propagate(e.getCause());
}
}

/**
* This takes a stage of a stage of a value and
* returns a plain stage of a value.
*
* @param stage a {@link CompletionStage} of a {@link CompletionStage} of a value
* @return the {@link CompletionStage} of the value
*/
public static <T> CompletionStage<T> dereference(
CompletionStage<? extends CompletionStage<T>> stage) {
//noinspection unchecked
return stage.thenCompose(Identity.INSTANCE);
}

private static <T> CompletionStage<CompletionStage<T>> wrap(CompletionStage<T> future) {
//noinspection unchecked
return future.thenApply((Function<T, CompletionStage<T>>) WrapFunction.INSTANCE);
}

private enum Identity implements Function {
INSTANCE;

@Override
public Object apply(Object o) {
return o;
}
}


private enum WrapFunction implements Function {
INSTANCE;

@Override
public Object apply(Object o) {
return CompletableFuture.completedFuture(o);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;

import com.spotify.futures.CompletableFuturesExtra;

import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
Expand All @@ -17,12 +19,16 @@
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Function;

import static com.spotify.futures.CompletableFuturesExtra.toCompletableFuture;
import static com.spotify.futures.CompletableFuturesExtra.toListenableFuture;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.sameInstance;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;
import static org.junit.Assume.assumeThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
Expand Down Expand Up @@ -127,4 +133,176 @@ private static boolean hasCompletableFuture() {
return false;
}
}

@Test(expected = IllegalArgumentException.class)
public void testImmediateFailed() throws Exception {
final CompletionStage<Object> future = CompletableFuturesExtra.exceptionallyCompletedFuture(new IllegalArgumentException());
CompletableFuturesExtra.getCompleted(future);
fail();
}

@Test
public void testGetCompleted() throws Exception {
final CompletionStage<String> future = CompletableFuture.completedFuture("hello");
assertEquals("hello", CompletableFuturesExtra.getCompleted(future));
}

@Test(expected = IllegalStateException.class)
public void testGetCompletedFails() throws Exception {
final CompletionStage<String> future = new CompletableFuture<String>();
CompletableFuturesExtra.getCompleted(future);
fail();
}

@Test(expected = IllegalArgumentException.class)
public void testDereferenceFailure() throws Exception {
final CompletionStage<Object> future = CompletableFuturesExtra.exceptionallyCompletedFuture(new IllegalArgumentException());
final CompletionStage<CompletionStage<Object>> future2 = CompletableFuture.completedFuture(future);
final CompletionStage<Object> dereferenced = CompletableFuturesExtra.dereference(future2);
CompletableFuturesExtra.getCompleted(dereferenced.toCompletableFuture());
fail();
}

@Test(expected = NullPointerException.class)
public void testDereferenceNull() throws Exception {
final CompletionStage<CompletableFuture<Object>> future2 = CompletableFuture.completedFuture(null);
final CompletionStage<Object> dereferenced = CompletableFuturesExtra.dereference(future2);
CompletableFuturesExtra.getCompleted(dereferenced);
fail();
}

@Test
public void testDereferenceSuccess() throws Exception {
final CompletionStage<String> future = CompletableFuture.completedFuture("hello");
final CompletionStage<CompletionStage<String>> future2 = CompletableFuture.completedFuture(future);
final CompletionStage<String> dereferenced = CompletableFuturesExtra.dereference(future2);
assertEquals("hello", CompletableFuturesExtra.getCompleted(dereferenced));
}

@Test
public void testExceptionallyCompose() throws Exception {
final CompletionStage<String> future = CompletableFuturesExtra.exceptionallyCompletedFuture(new IllegalArgumentException());

final CompletionStage<String> composed = CompletableFuturesExtra.exceptionallyCompose(future, new Function<Throwable, CompletionStage<String>>() {
@Override
public CompletionStage<String> apply(Throwable throwable) {
return CompletableFuture.completedFuture("hello");
}
});

assertEquals("hello", CompletableFuturesExtra.getCompleted(composed));

}

@Test(expected = IllegalStateException.class)
public void testExceptionallyComposeFailure() throws Exception {
final CompletionStage<String> future = CompletableFuturesExtra.exceptionallyCompletedFuture(new IllegalArgumentException());

final CompletionStage<String> composed = CompletableFuturesExtra.exceptionallyCompose(future, new Function<Throwable, CompletionStage<String>>() {
@Override
public CompletionStage<String> apply(Throwable throwable) {
return CompletableFuturesExtra.exceptionallyCompletedFuture(new IllegalStateException());
}
});
CompletableFuturesExtra.getCompleted(composed);
fail();
}

@Test
public void testExceptionallyComposeUnused() throws Exception {
final CompletionStage<String> future = CompletableFuture.completedFuture("hello");

final CompletionStage<String> composed = CompletableFuturesExtra.exceptionallyCompose(future, new Function<Throwable, CompletionStage<String>>() {
@Override
public CompletionStage<String> apply(Throwable throwable) {
return CompletableFuturesExtra.exceptionallyCompletedFuture(new IllegalStateException());
}
});
assertEquals("hello", CompletableFuturesExtra.getCompleted(composed));
}

@Test(expected = IllegalStateException.class)
public void testExceptionallyComposeThrows() throws Exception {
final CompletionStage<String> future = CompletableFuturesExtra.exceptionallyCompletedFuture(new IllegalArgumentException());

final CompletionStage<String> composed = CompletableFuturesExtra.exceptionallyCompose(future, new Function<Throwable, CompletionStage<String>>() {
@Override
public CompletionStage<String> apply(Throwable throwable) {
throw new IllegalStateException();
}
});
CompletableFuturesExtra.getCompleted(composed);
fail();
}

@Test(expected = NullPointerException.class)
public void testExceptionallyComposeReturnsNull() throws Exception {
final CompletionStage<String> future = CompletableFuturesExtra.exceptionallyCompletedFuture(new IllegalArgumentException());

final CompletionStage<String> composed = CompletableFuturesExtra.exceptionallyCompose(future, new Function<Throwable, CompletionStage<String>>() {
@Override
public CompletionStage<String> apply(Throwable throwable) {
return null;
}
});
CompletableFuturesExtra.getCompleted(composed);
fail();
}

@Test
public void testHandleCompose() throws Exception {
final CompletionStage<String> future = CompletableFuturesExtra.exceptionallyCompletedFuture(new IllegalArgumentException());

final CompletionStage<String> composed = CompletableFuturesExtra.handleCompose(future, new BiFunction<String, Throwable, CompletionStage<String>>() {
@Override
public CompletionStage<String> apply(String s, Throwable throwable) {
return CompletableFuture.completedFuture("hello");
}
});

assertEquals("hello", CompletableFuturesExtra.getCompleted(composed));

}

@Test(expected = IllegalStateException.class)
public void testHandleComposeFailure() throws Exception {
final CompletionStage<String> future = CompletableFuturesExtra.exceptionallyCompletedFuture(new IllegalArgumentException());

final CompletionStage<String> composed = CompletableFuturesExtra.handleCompose(future, new BiFunction<String, Throwable, CompletionStage<String>>() {
@Override
public CompletionStage<String> apply(String s, Throwable throwable) {
return CompletableFuturesExtra.exceptionallyCompletedFuture(new IllegalStateException());
}
});
CompletableFuturesExtra.getCompleted(composed);
fail();
}

@Test(expected = IllegalStateException.class)
public void testHandleComposeThrows() throws Exception {
final CompletionStage<String> future = CompletableFuturesExtra.exceptionallyCompletedFuture(new IllegalArgumentException());

final CompletionStage<String> composed = CompletableFuturesExtra.handleCompose(future, new BiFunction<String, Throwable, CompletionStage<String>>() {
@Override
public CompletionStage<String> apply(String s, Throwable throwable) {
throw new IllegalStateException();
}
});
CompletableFuturesExtra.getCompleted(composed);
fail();
}

@Test(expected = NullPointerException.class)
public void testHandleComposeReturnsNull() throws Exception {
final CompletionStage<String> future = CompletableFuturesExtra.exceptionallyCompletedFuture(new IllegalArgumentException());

final CompletionStage<String> composed = CompletableFuturesExtra.handleCompose(future, new BiFunction<String, Throwable, CompletionStage<String>>() {
@Override
public CompletionStage<String> apply(String s, Throwable throwable) {
return null;
}
});
CompletableFuturesExtra.getCompleted(composed);
fail();
}
}

0 comments on commit ac0e9da

Please sign in to comment.