diff --git a/pom.xml b/pom.xml index 0943766..5b97cf1 100644 --- a/pom.xml +++ b/pom.xml @@ -69,6 +69,11 @@ guava 26.0-jre + + com.google.api + gax + 1.35.0 + diff --git a/src/main/java/com/spotify/futures/ApiFutureToCompletableFutureWrapper.java b/src/main/java/com/spotify/futures/ApiFutureToCompletableFutureWrapper.java new file mode 100644 index 0000000..4ee0d2e --- /dev/null +++ b/src/main/java/com/spotify/futures/ApiFutureToCompletableFutureWrapper.java @@ -0,0 +1,56 @@ +/* + * Copyright (c) 2013-2018 Spotify AB + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package com.spotify.futures; + +import static com.google.common.base.Preconditions.checkNotNull; + +import com.google.api.core.ApiFuture; +import com.google.api.core.ApiFutureCallback; +import com.google.api.core.ApiFutures; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; + +class ApiFutureToCompletableFutureWrapper + extends CompletableFuture + implements ApiFutureCallback { + + private final ApiFuture future; + + ApiFutureToCompletableFutureWrapper(final ApiFuture future, Executor executor) { + this.future = checkNotNull(future, "future"); + ApiFutures.addCallback(future, this, executor); + } + + @Override + public boolean cancel(final boolean mayInterruptIfRunning) { + future.cancel(mayInterruptIfRunning); + return super.cancel(mayInterruptIfRunning); + } + + ApiFuture unwrap() { + return future; + } + + @Override + public void onSuccess(final V result) { + complete(result); + } + + @Override + public void onFailure(final Throwable t) { + completeExceptionally(t); + } +} diff --git a/src/main/java/com/spotify/futures/CompletableFuturesExtra.java b/src/main/java/com/spotify/futures/CompletableFuturesExtra.java index 18b434f..87fd0cc 100644 --- a/src/main/java/com/spotify/futures/CompletableFuturesExtra.java +++ b/src/main/java/com/spotify/futures/CompletableFuturesExtra.java @@ -15,10 +15,12 @@ */ package com.spotify.futures; +import com.google.api.core.ApiFuture; import com.google.common.util.concurrent.ListenableFuture; - +import com.google.common.util.concurrent.MoreExecutors; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; +import java.util.concurrent.Executor; import java.util.function.BiFunction; import java.util.function.Function; @@ -71,6 +73,45 @@ public static ListenableFuture toListenableFuture( return new CompletableToListenableFutureWrapper<>(future); } + /** + * Converts an {@link ApiFuture} to a {@link CompletableFuture}. + * + * @param future the {@link ApiFuture} to wrap. + * @return a {@link CompletableFuture} that completes when the original future completes. + */ + public static CompletableFuture toCompletableFuture(ApiFuture future) { + return toCompletableFuture(future, MoreExecutors.directExecutor()); + } + + /** + * Converts an {@link ApiFuture} to a {@link CompletableFuture}. + * + * @param future the {@link ApiFuture} to wrap. + * @param executor the executor where the listener is running. + * @return a {@link CompletableFuture} that completes when the original future completes. + */ + public static CompletableFuture toCompletableFuture(ApiFuture future, + Executor executor) { + if (future instanceof CompletableToApiFutureWrapper) { + return ((CompletableToApiFutureWrapper) future).unwrap(); + } + return new ApiFutureToCompletableFutureWrapper<>(future, executor); + } + + /** + * Wrap a {@link CompletionStage} in a {@link ApiFuture}. The returned future will + * complete with the same result or failure as the original future. + * + * @param future The {@link CompletionStage} to wrap in a {@link ApiFuture}. + * @return A {@link ApiFuture} that completes when the original future completes. + */ + public static ApiFuture toApiFuture(CompletionStage future) { + if (future instanceof ApiFutureToCompletableFutureWrapper) { + return ((ApiFutureToCompletableFutureWrapper) future).unwrap(); + } + return new CompletableToApiFutureWrapper<>(future); + } + /** * Returns a new CompletableFuture that is already exceptionally completed with * the given exception. @@ -187,7 +228,6 @@ public Object apply(Object o) { } } - private enum WrapFunction implements Function { INSTANCE; @@ -196,5 +236,4 @@ public Object apply(Object o) { return CompletableFuture.completedFuture(o); } } - } diff --git a/src/main/java/com/spotify/futures/CompletableToApiFutureWrapper.java b/src/main/java/com/spotify/futures/CompletableToApiFutureWrapper.java new file mode 100644 index 0000000..f719c0f --- /dev/null +++ b/src/main/java/com/spotify/futures/CompletableToApiFutureWrapper.java @@ -0,0 +1,58 @@ +/* + * Copyright (c) 2013-2018 Spotify AB + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package com.spotify.futures; + +import com.google.api.core.ApiFuture; +import com.google.common.util.concurrent.AbstractFuture; +import java.util.concurrent.CancellationException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; +import java.util.function.BiConsumer; + +class CompletableToApiFutureWrapper + extends AbstractFuture + implements ApiFuture, BiConsumer { + + private final CompletionStage future; + + CompletableToApiFutureWrapper(final CompletionStage future) { + this.future = future; + this.future.whenComplete(this); + } + + public CompletableFuture unwrap() { + return future.toCompletableFuture(); + } + + @Override + public boolean cancel(final boolean mayInterruptIfRunning) { + future.toCompletableFuture().cancel(mayInterruptIfRunning); + return super.cancel(mayInterruptIfRunning); + } + + @Override + public void accept(V v, Throwable throwable) { + if (throwable != null) { + if (throwable instanceof CancellationException) { + cancel(false); + } else { + setException(CompletableToListenableFutureWrapper.unwrap(throwable)); + } + } else { + set(v); + } + } +} diff --git a/src/main/java/com/spotify/futures/CompletableToListenableFutureWrapper.java b/src/main/java/com/spotify/futures/CompletableToListenableFutureWrapper.java index 8dd25ce..b7f8e2b 100644 --- a/src/main/java/com/spotify/futures/CompletableToListenableFutureWrapper.java +++ b/src/main/java/com/spotify/futures/CompletableToListenableFutureWrapper.java @@ -17,7 +17,6 @@ import com.google.common.util.concurrent.AbstractFuture; import com.google.common.util.concurrent.ListenableFuture; - import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; @@ -58,7 +57,7 @@ public void accept(final V v, final Throwable throwable) { } } - private Throwable unwrap(Throwable throwable) { + static Throwable unwrap(Throwable throwable) { // Don't go too deep in case there is recursive exceptions for (int i = 0; i < 100; i++) { if (throwable instanceof CompletionException) { diff --git a/src/test/java/com/spotify/futures/CompletableFuturesExtraTest.java b/src/test/java/com/spotify/futures/CompletableFuturesExtraTest.java index 570c672..1f07c8f 100644 --- a/src/test/java/com/spotify/futures/CompletableFuturesExtraTest.java +++ b/src/test/java/com/spotify/futures/CompletableFuturesExtraTest.java @@ -1,12 +1,32 @@ package com.spotify.futures; +import static com.spotify.futures.CompletableFuturesExtra.toApiFuture; +import static com.spotify.futures.CompletableFuturesExtra.toCompletableFuture; +import static com.spotify.futures.CompletableFuturesExtra.toListenableFuture; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.sameInstance; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.junit.Assume.assumeThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; + +import com.google.api.core.ApiFuture; +import com.google.api.core.ApiFutureCallback; +import com.google.api.core.ApiFutures; +import com.google.api.core.SettableApiFuture; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.SettableFuture; - +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.ExecutionException; +import java.util.function.BiConsumer; import org.junit.Before; import org.junit.Rule; import org.junit.Test; @@ -15,23 +35,6 @@ import org.mockito.Mock; import org.mockito.runners.MockitoJUnitRunner; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionStage; -import java.util.concurrent.ExecutionException; -import java.util.function.BiConsumer; - -import static com.spotify.futures.CompletableFuturesExtra.toCompletableFuture; -import static com.spotify.futures.CompletableFuturesExtra.toListenableFuture; -import static org.hamcrest.Matchers.is; -import static org.hamcrest.Matchers.sameInstance; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertThat; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; -import static org.junit.Assume.assumeThat; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.verify; - @RunWith(MockitoJUnitRunner.class) public class CompletableFuturesExtraTest { @@ -39,6 +42,7 @@ public class CompletableFuturesExtraTest { public ExpectedException exception = ExpectedException.none(); @Mock FutureCallback callback; + @Mock ApiFutureCallback apiFutureCallback; private final SettableFuture settable = SettableFuture.create(); private final ListenableFuture listenable = settable; @@ -97,7 +101,23 @@ public void testToListenableFutureUnwrap() { } @Test - public void testException() throws Exception { + public void testToApiFutureUnwrap() { + final CompletableFuture completable = new CompletableFuture<>(); + final ApiFuture wrapped = toApiFuture(completable); + final CompletableFuture unwrapped = toCompletableFuture(wrapped); + assertThat(unwrapped, is(sameInstance(completable))); + } + + @Test + public void testToCompletableFutureFromApiFutureUnwrap() { + final ApiFuture apiFuture = SettableApiFuture.create(); + final CompletableFuture wrapped = toCompletableFuture(apiFuture); + final ApiFuture unwrapped = toApiFuture(wrapped); + assertThat(unwrapped, is(sameInstance(apiFuture))); + } + + @Test + public void testExceptionListenableFuture() throws Exception { final CompletableFuture future = new CompletableFuture<>(); future.completeExceptionally(new IllegalStateException()); final ListenableFuture converted = toListenableFuture(future); @@ -121,6 +141,28 @@ public void testToListenableFutureSuccess() throws ExecutionException, Interrupt assertThat(listenable.get(), is("done")); } + @Test + public void testApiFutureSuccess() throws ExecutionException, InterruptedException { + final SettableApiFuture apiFuture = SettableApiFuture.create(); + final CompletableFuture completable = toCompletableFuture(apiFuture); + assertThat(completable.isDone(), is(false)); + apiFuture.set("done"); + assertThat(completable.isDone(), is(true)); + assertThat(completable.get(), is("done")); + } + + @Test + public void testApiFutureFailure() throws ExecutionException, InterruptedException { + final SettableApiFuture apiFuture = SettableApiFuture.create(); + final CompletableFuture completable = toCompletableFuture(apiFuture); + assertThat(completable.isDone(), is(false)); + final Exception failure = new Exception("failure"); + apiFuture.setException(failure); + assertThat(completable.isDone(), is(true)); + exception.expect(ExecutionException.class); + completable.get(); + } + @Test public void testToListenableFutureFailure() throws ExecutionException, InterruptedException { final CompletableFuture completable = new CompletableFuture<>(); @@ -135,6 +177,33 @@ public void testToListenableFutureFailure() throws ExecutionException, Interrupt wrapped.get(); } + @Test + public void testToApiFutureSuccess() throws ExecutionException, InterruptedException { + final CompletableFuture completable = new CompletableFuture<>(); + final ApiFuture wrapped = toApiFuture(completable); + ApiFutures.addCallback(wrapped, apiFutureCallback, MoreExecutors.directExecutor()); + assertThat(wrapped.isDone(), is(false)); + final String value = "value"; + completable.complete(value); + assertThat(wrapped.isDone(), is(true)); + wrapped.get(); + assertThat(wrapped.get(), is(value)); + } + + @Test + public void testToApiFutureFailure() throws ExecutionException, InterruptedException { + final CompletableFuture completable = new CompletableFuture<>(); + final ApiFuture wrapped = toApiFuture(completable); + ApiFutures.addCallback(wrapped, apiFutureCallback, MoreExecutors.directExecutor()); + assertThat(wrapped.isDone(), is(false)); + final Exception failure = new Exception("failure"); + completable.completeExceptionally(failure); + assertThat(wrapped.isDone(), is(true)); + verify(apiFutureCallback).onFailure(failure); + exception.expect(ExecutionException.class); + wrapped.get(); + } + private static boolean hasCompletableFuture() { try { Class.forName("java.util.concurrent.CompletableFuture");