Skip to content

Commit

Permalink
Conversions from ApiFuture to CompletableFuture (#32)
Browse files Browse the repository at this point in the history
* conversions from ApiFuture to CompletableFuture

* adding tests for ApiFuture to CompletableFuture

* make apifuture wrapper more inline with listenablefuture wrapper

* implemented tests for CompletableFuture to ApiFuture
  • Loading branch information
emmmile authored and spkrka committed Nov 16, 2018
1 parent 879a5e1 commit 005129e
Show file tree
Hide file tree
Showing 6 changed files with 250 additions and 24 deletions.
5 changes: 5 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,11 @@
<artifactId>guava</artifactId>
<version>26.0-jre</version>
</dependency>
<dependency>
<groupId>com.google.api</groupId>
<artifactId>gax</artifactId>
<version>1.35.0</version>
</dependency>

<!-- test dependencies -->
<dependency>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Copyright (c) 2013-2018 Spotify AB
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.spotify.futures;

import static com.google.common.base.Preconditions.checkNotNull;

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

class ApiFutureToCompletableFutureWrapper<V>
extends CompletableFuture<V>
implements ApiFutureCallback<V> {

private final ApiFuture<V> future;

ApiFutureToCompletableFutureWrapper(final ApiFuture<V> future, Executor executor) {
this.future = checkNotNull(future, "future");
ApiFutures.addCallback(future, this, executor);
}

@Override
public boolean cancel(final boolean mayInterruptIfRunning) {
future.cancel(mayInterruptIfRunning);
return super.cancel(mayInterruptIfRunning);
}

ApiFuture<V> unwrap() {
return future;
}

@Override
public void onSuccess(final V result) {
complete(result);
}

@Override
public void onFailure(final Throwable t) {
completeExceptionally(t);
}
}
45 changes: 42 additions & 3 deletions src/main/java/com/spotify/futures/CompletableFuturesExtra.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,12 @@
*/
package com.spotify.futures;

import com.google.api.core.ApiFuture;
import com.google.common.util.concurrent.ListenableFuture;

import com.google.common.util.concurrent.MoreExecutors;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.function.BiFunction;
import java.util.function.Function;

Expand Down Expand Up @@ -71,6 +73,45 @@ public static <V> ListenableFuture<V> toListenableFuture(
return new CompletableToListenableFutureWrapper<>(future);
}

/**
* Converts an {@link ApiFuture} to a {@link CompletableFuture}.
*
* @param future the {@link ApiFuture} to wrap.
* @return a {@link CompletableFuture} that completes when the original future completes.
*/
public static <V> CompletableFuture<V> toCompletableFuture(ApiFuture<V> future) {
return toCompletableFuture(future, MoreExecutors.directExecutor());
}

/**
* Converts an {@link ApiFuture} to a {@link CompletableFuture}.
*
* @param future the {@link ApiFuture} to wrap.
* @param executor the executor where the listener is running.
* @return a {@link CompletableFuture} that completes when the original future completes.
*/
public static <V> CompletableFuture<V> toCompletableFuture(ApiFuture<V> future,
Executor executor) {
if (future instanceof CompletableToApiFutureWrapper) {
return ((CompletableToApiFutureWrapper<V>) future).unwrap();
}
return new ApiFutureToCompletableFutureWrapper<>(future, executor);
}

/**
* Wrap a {@link CompletionStage} in a {@link ApiFuture}. The returned future will
* complete with the same result or failure as the original future.
*
* @param future The {@link CompletionStage} to wrap in a {@link ApiFuture}.
* @return A {@link ApiFuture} that completes when the original future completes.
*/
public static <V> ApiFuture<V> toApiFuture(CompletionStage<V> future) {
if (future instanceof ApiFutureToCompletableFutureWrapper) {
return ((ApiFutureToCompletableFutureWrapper<V>) future).unwrap();
}
return new CompletableToApiFutureWrapper<>(future);
}

/**
* Returns a new CompletableFuture that is already exceptionally completed with
* the given exception.
Expand Down Expand Up @@ -187,7 +228,6 @@ public Object apply(Object o) {
}
}


private enum WrapFunction implements Function {
INSTANCE;

Expand All @@ -196,5 +236,4 @@ public Object apply(Object o) {
return CompletableFuture.completedFuture(o);
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Copyright (c) 2013-2018 Spotify AB
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.spotify.futures;

import com.google.api.core.ApiFuture;
import com.google.common.util.concurrent.AbstractFuture;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.BiConsumer;

class CompletableToApiFutureWrapper<V>
extends AbstractFuture<V>
implements ApiFuture<V>, BiConsumer<V, Throwable> {

private final CompletionStage<V> future;

CompletableToApiFutureWrapper(final CompletionStage<V> future) {
this.future = future;
this.future.whenComplete(this);
}

public CompletableFuture<V> unwrap() {
return future.toCompletableFuture();
}

@Override
public boolean cancel(final boolean mayInterruptIfRunning) {
future.toCompletableFuture().cancel(mayInterruptIfRunning);
return super.cancel(mayInterruptIfRunning);
}

@Override
public void accept(V v, Throwable throwable) {
if (throwable != null) {
if (throwable instanceof CancellationException) {
cancel(false);
} else {
setException(CompletableToListenableFutureWrapper.unwrap(throwable));
}
} else {
set(v);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

import com.google.common.util.concurrent.AbstractFuture;
import com.google.common.util.concurrent.ListenableFuture;

import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
Expand Down Expand Up @@ -58,7 +57,7 @@ public void accept(final V v, final Throwable throwable) {
}
}

private Throwable unwrap(Throwable throwable) {
static Throwable unwrap(Throwable throwable) {
// Don't go too deep in case there is recursive exceptions
for (int i = 0; i < 100; i++) {
if (throwable instanceof CompletionException) {
Expand Down
107 changes: 88 additions & 19 deletions src/test/java/com/spotify/futures/CompletableFuturesExtraTest.java
Original file line number Diff line number Diff line change
@@ -1,12 +1,32 @@
package com.spotify.futures;

import static com.spotify.futures.CompletableFuturesExtra.toApiFuture;
import static com.spotify.futures.CompletableFuturesExtra.toCompletableFuture;
import static com.spotify.futures.CompletableFuturesExtra.toListenableFuture;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.sameInstance;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.junit.Assume.assumeThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.api.core.SettableApiFuture;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.function.BiConsumer;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
Expand All @@ -15,30 +35,14 @@
import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.function.BiConsumer;

import static com.spotify.futures.CompletableFuturesExtra.toCompletableFuture;
import static com.spotify.futures.CompletableFuturesExtra.toListenableFuture;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.sameInstance;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.junit.Assume.assumeThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;

@RunWith(MockitoJUnitRunner.class)
public class CompletableFuturesExtraTest {

@Rule
public ExpectedException exception = ExpectedException.none();

@Mock FutureCallback<String> callback;
@Mock ApiFutureCallback<String> apiFutureCallback;

private final SettableFuture<String> settable = SettableFuture.create();
private final ListenableFuture<String> listenable = settable;
Expand Down Expand Up @@ -97,7 +101,23 @@ public void testToListenableFutureUnwrap() {
}

@Test
public void testException() throws Exception {
public void testToApiFutureUnwrap() {
final CompletableFuture<String> completable = new CompletableFuture<>();
final ApiFuture<String> wrapped = toApiFuture(completable);
final CompletableFuture<String> unwrapped = toCompletableFuture(wrapped);
assertThat(unwrapped, is(sameInstance(completable)));
}

@Test
public void testToCompletableFutureFromApiFutureUnwrap() {
final ApiFuture<String> apiFuture = SettableApiFuture.create();
final CompletableFuture<String> wrapped = toCompletableFuture(apiFuture);
final ApiFuture<String> unwrapped = toApiFuture(wrapped);
assertThat(unwrapped, is(sameInstance(apiFuture)));
}

@Test
public void testExceptionListenableFuture() throws Exception {
final CompletableFuture<Object> future = new CompletableFuture<>();
future.completeExceptionally(new IllegalStateException());
final ListenableFuture<Object> converted = toListenableFuture(future);
Expand All @@ -121,6 +141,28 @@ public void testToListenableFutureSuccess() throws ExecutionException, Interrupt
assertThat(listenable.get(), is("done"));
}

@Test
public void testApiFutureSuccess() throws ExecutionException, InterruptedException {
final SettableApiFuture<String> apiFuture = SettableApiFuture.create();
final CompletableFuture<String> completable = toCompletableFuture(apiFuture);
assertThat(completable.isDone(), is(false));
apiFuture.set("done");
assertThat(completable.isDone(), is(true));
assertThat(completable.get(), is("done"));
}

@Test
public void testApiFutureFailure() throws ExecutionException, InterruptedException {
final SettableApiFuture<String> apiFuture = SettableApiFuture.create();
final CompletableFuture<String> completable = toCompletableFuture(apiFuture);
assertThat(completable.isDone(), is(false));
final Exception failure = new Exception("failure");
apiFuture.setException(failure);
assertThat(completable.isDone(), is(true));
exception.expect(ExecutionException.class);
completable.get();
}

@Test
public void testToListenableFutureFailure() throws ExecutionException, InterruptedException {
final CompletableFuture<String> completable = new CompletableFuture<>();
Expand All @@ -135,6 +177,33 @@ public void testToListenableFutureFailure() throws ExecutionException, Interrupt
wrapped.get();
}

@Test
public void testToApiFutureSuccess() throws ExecutionException, InterruptedException {
final CompletableFuture<String> completable = new CompletableFuture<>();
final ApiFuture<String> wrapped = toApiFuture(completable);
ApiFutures.addCallback(wrapped, apiFutureCallback, MoreExecutors.directExecutor());
assertThat(wrapped.isDone(), is(false));
final String value = "value";
completable.complete(value);
assertThat(wrapped.isDone(), is(true));
wrapped.get();
assertThat(wrapped.get(), is(value));
}

@Test
public void testToApiFutureFailure() throws ExecutionException, InterruptedException {
final CompletableFuture<String> completable = new CompletableFuture<>();
final ApiFuture<String> wrapped = toApiFuture(completable);
ApiFutures.addCallback(wrapped, apiFutureCallback, MoreExecutors.directExecutor());
assertThat(wrapped.isDone(), is(false));
final Exception failure = new Exception("failure");
completable.completeExceptionally(failure);
assertThat(wrapped.isDone(), is(true));
verify(apiFutureCallback).onFailure(failure);
exception.expect(ExecutionException.class);
wrapped.get();
}

private static boolean hasCompletableFuture() {
try {
Class.forName("java.util.concurrent.CompletableFuture");
Expand Down

0 comments on commit 005129e

Please sign in to comment.