From 224e66e14335bda70bf6083a0cf5f80fd21f87cc Mon Sep 17 00:00:00 2001 From: Till Rohrmann Date: Thu, 11 May 2017 17:36:17 +0200 Subject: [PATCH] [FLINK-6555] [futures] Generalize ConjunctFuture to return results The ConjunctFuture now returns the set of future values once it is completed. Introduce WaitingConjunctFuture; Fix thread safety issue with ResultConjunctFuture The WaitingConjunctFuture waits for the completion of its futures. The future values are discarded making it more efficient than the ResultConjunctFuture which returns the futures' values. The WaitingConjunctFuture is instantiated via FutureUtils.waitForAll(Collection). This closes #3873. --- .../flink/runtime/concurrent/FutureUtils.java | 131 ++++++++++++++---- .../executiongraph/ExecutionGraph.java | 8 +- .../executiongraph/ExecutionJobVertex.java | 4 +- .../failover/FailoverRegion.java | 2 +- .../runtime/concurrent/FutureUtilsTest.java | 83 +++++++++-- 5 files changed, 184 insertions(+), 44 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java index 4948147b9a410e..a27af5666fd25d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java @@ -19,8 +19,11 @@ package org.apache.flink.runtime.concurrent; import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture; +import org.apache.flink.util.Preconditions; +import java.util.Arrays; import java.util.Collection; +import java.util.Collections; import java.util.concurrent.Callable; import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicInteger; @@ -106,8 +109,9 @@ public RetryException(Throwable cause) { /** * Creates a future that is complete once multiple other futures completed. - * The ConjunctFuture fails (completes exceptionally) once one of the Futures in the - * conjunction fails. + * The future fails (completes exceptionally) once one of the futures in the + * conjunction fails. Upon successful completion, the future returns the + * collection of the futures' results. * *

The ConjunctFuture gives access to how many Futures in the conjunction have already * completed successfully, via {@link ConjunctFuture#getNumFuturesCompleted()}. @@ -115,16 +119,16 @@ public RetryException(Throwable cause) { * @param futures The futures that make up the conjunction. No null entries are allowed. * @return The ConjunctFuture that completes once all given futures are complete (or one fails). */ - public static ConjunctFuture combineAll(Collection> futures) { + public static ConjunctFuture> combineAll(Collection> futures) { checkNotNull(futures, "futures"); - final ConjunctFutureImpl conjunct = new ConjunctFutureImpl(futures.size()); + final ResultConjunctFuture conjunct = new ResultConjunctFuture<>(futures.size()); if (futures.isEmpty()) { - conjunct.complete(null); + conjunct.complete(Collections.emptyList()); } else { - for (Future future : futures) { + for (Future future : futures) { future.handle(conjunct.completionHandler); } } @@ -132,17 +136,33 @@ public static ConjunctFuture combineAll(Collection> futures) return conjunct; } + /** + * Creates a future that is complete once all of the given futures have completed. + * The future fails (completes exceptionally) once one of the given futures + * fails. + * + *

The ConjunctFuture gives access to how many Futures have already + * completed successfully, via {@link ConjunctFuture#getNumFuturesCompleted()}. + * + * @param futures The futures to wait on. No null entries are allowed. + * @return The WaitingFuture that completes once all given futures are complete (or one fails). + */ + public static ConjunctFuture waitForAll(Collection> futures) { + checkNotNull(futures, "futures"); + + return new WaitingConjunctFuture(futures); + } + /** * A future that is complete once multiple other futures completed. The futures are not - * necessarily of the same type, which is why the type of this Future is {@code Void}. - * The ConjunctFuture fails (completes exceptionally) once one of the Futures in the - * conjunction fails. + * necessarily of the same type. The ConjunctFuture fails (completes exceptionally) once + * one of the Futures in the conjunction fails. * *

The advantage of using the ConjunctFuture over chaining all the futures (such as via * {@link Future#thenCombine(Future, BiFunction)}) is that ConjunctFuture also tracks how * many of the Futures are already complete. */ - public interface ConjunctFuture extends CompletableFuture { + public interface ConjunctFuture extends CompletableFuture { /** * Gets the total number of Futures in the conjunction. @@ -158,39 +178,102 @@ public interface ConjunctFuture extends CompletableFuture { } /** - * The implementation of the {@link ConjunctFuture}. - * - *

Implementation notice: The member fields all have package-private access, because they are - * either accessed by an inner subclass or by the enclosing class. + * The implementation of the {@link ConjunctFuture} which returns its Futures' result as a collection. */ - private static class ConjunctFutureImpl extends FlinkCompletableFuture implements ConjunctFuture { + private static class ResultConjunctFuture extends FlinkCompletableFuture> implements ConjunctFuture> { /** The total number of futures in the conjunction */ - final int numTotal; + private final int numTotal; + + /** The next free index in the results arrays */ + private final AtomicInteger nextIndex = new AtomicInteger(0); /** The number of futures in the conjunction that are already complete */ - final AtomicInteger numCompleted = new AtomicInteger(); + private final AtomicInteger numCompleted = new AtomicInteger(0); + + /** The set of collected results so far */ + private volatile T[] results; /** The function that is attached to all futures in the conjunction. Once a future - * is complete, this function tracks the completion or fails the conjunct. + * is complete, this function tracks the completion or fails the conjunct. */ - final BiFunction completionHandler = new BiFunction() { + final BiFunction completionHandler = new BiFunction() { @Override - public Void apply(Object o, Throwable throwable) { + public Void apply(T o, Throwable throwable) { if (throwable != null) { completeExceptionally(throwable); - } - else if (numTotal == numCompleted.incrementAndGet()) { - complete(null); + } else { + int index = nextIndex.getAndIncrement(); + + results[index] = o; + + if (numCompleted.incrementAndGet() == numTotal) { + complete(Arrays.asList(results)); + } } return null; } }; - ConjunctFutureImpl(int numTotal) { + @SuppressWarnings("unchecked") + ResultConjunctFuture(int numTotal) { this.numTotal = numTotal; + results = (T[])new Object[numTotal]; + } + + @Override + public int getNumFuturesTotal() { + return numTotal; + } + + @Override + public int getNumFuturesCompleted() { + return numCompleted.get(); + } + } + + /** + * Implementation of the {@link ConjunctFuture} interface which waits only for the completion + * of its futures and does not return their values. + */ + private static final class WaitingConjunctFuture extends FlinkCompletableFuture implements ConjunctFuture { + + /** Number of completed futures */ + private final AtomicInteger numCompleted = new AtomicInteger(0); + + /** Total number of futures to wait on */ + private final int numTotal; + + /** Handler which increments the atomic completion counter and completes or fails the WaitingFutureImpl */ + private final BiFunction completionHandler = new BiFunction() { + @Override + public Void apply(Object o, Throwable throwable) { + if (throwable == null) { + if (numTotal == numCompleted.incrementAndGet()) { + complete(null); + } + } else { + completeExceptionally(throwable); + } + + return null; + } + }; + + private WaitingConjunctFuture(Collection> futures) { + Preconditions.checkNotNull(futures, "Futures must not be null."); + + this.numTotal = futures.size(); + + if (futures.isEmpty()) { + complete(null); + } else { + for (Future future : futures) { + future.handle(completionHandler); + } + } } @Override diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java index 5eaa637a9314fe..7c13936f2cab4e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java @@ -871,7 +871,7 @@ private void scheduleEager(SlotProvider slotProvider, final Time timeout) { // this future is complete once all slot futures are complete. // the future fails once one slot future fails. - final ConjunctFuture allAllocationsComplete = FutureUtils.combineAll(slotFutures); + final ConjunctFuture allAllocationsComplete = FutureUtils.waitForAll(slotFutures); // make sure that we fail if the allocation timeout was exceeded final ScheduledFuture timeoutCancelHandle = futureExecutor.schedule(new Runnable() { @@ -892,7 +892,7 @@ public void run() { allAllocationsComplete.handleAsync(new BiFunction() { @Override - public Void apply(Void ignored, Throwable throwable) { + public Void apply(Void slots, Throwable throwable) { try { // we do not need the cancellation timeout any more timeoutCancelHandle.cancel(false); @@ -973,7 +973,7 @@ public void cancel() { } // we build a future that is complete once all vertices have reached a terminal state - final ConjunctFuture allTerminal = FutureUtils.combineAll(futures); + final ConjunctFuture allTerminal = FutureUtils.waitForAll(futures); allTerminal.thenAccept(new AcceptFunction() { @Override public void accept(Void value) { @@ -1102,7 +1102,7 @@ else if (transitionState(current, JobStatus.FAILING, t)) { futures.add(ejv.cancelWithFuture()); } - final ConjunctFuture allTerminal = FutureUtils.combineAll(futures); + final ConjunctFuture allTerminal = FutureUtils.waitForAll(futures); allTerminal.thenAccept(new AcceptFunction() { @Override public void accept(Void value) { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java index 3a98e0a5a1f597..f5a592a205eb4c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java @@ -509,7 +509,7 @@ public void cancel() { */ public Future cancelWithFuture() { // we collect all futures from the task cancellations - ArrayList> futures = new ArrayList<>(parallelism); + ArrayList> futures = new ArrayList<>(parallelism); // cancel each vertex for (ExecutionVertex ev : getTaskVertices()) { @@ -517,7 +517,7 @@ public Future cancelWithFuture() { } // return a conjunct future, which is complete once all individual tasks are canceled - return FutureUtils.combineAll(futures); + return FutureUtils.waitForAll(futures); } public void fail(Throwable t) { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverRegion.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverRegion.java index b36cfcf1a4f13f..6066c7701b07f4 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverRegion.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverRegion.java @@ -150,7 +150,7 @@ private void cancel(final long globalModVersionOfFailover) { futures.add(vertex.cancel()); } - final FutureUtils.ConjunctFuture allTerminal = FutureUtils.combineAll(futures); + final FutureUtils.ConjunctFuture allTerminal = FutureUtils.waitForAll(futures); allTerminal.thenAcceptAsync(new AcceptFunction() { @Override public void accept(Void value) { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java index 43710cb4b9a8c1..e262459ef0bf51 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java @@ -21,10 +21,15 @@ import org.apache.flink.runtime.concurrent.FutureUtils.ConjunctFuture; import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture; +import org.apache.flink.util.TestLogger; +import org.hamcrest.collection.IsIterableContainingInAnyOrder; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import java.io.IOException; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.concurrent.ExecutionException; @@ -33,17 +38,26 @@ /** * Tests for the utility methods in {@link FutureUtils} */ -public class FutureUtilsTest { +@RunWith(Parameterized.class) +public class FutureUtilsTest extends TestLogger{ + + @Parameterized.Parameters + public static Collection parameters (){ + return Arrays.asList(new ConjunctFutureFactory(), new WaitingFutureFactory()); + } + + @Parameterized.Parameter + public FutureFactory futureFactory; @Test public void testConjunctFutureFailsOnEmptyAndNull() throws Exception { try { - FutureUtils.combineAll(null); + futureFactory.createFuture(null); fail(); } catch (NullPointerException ignored) {} try { - FutureUtils.combineAll(Arrays.asList( + futureFactory.createFuture(Arrays.asList( new FlinkCompletableFuture(), null, new FlinkCompletableFuture())); @@ -63,11 +77,11 @@ public void testConjunctFutureCompletion() throws Exception { future2.complete(new Object()); // build the conjunct future - ConjunctFuture result = FutureUtils.combineAll(Arrays.asList(future1, future2, future3, future4)); + ConjunctFuture result = futureFactory.createFuture(Arrays.asList(future1, future2, future3, future4)); - Future resultMapped = result.thenAccept(new AcceptFunction() { + Future resultMapped = result.thenAccept(new AcceptFunction() { @Override - public void accept(Void value) {} + public void accept(Object value) {} }); assertEquals(4, result.getNumFuturesTotal()); @@ -108,11 +122,11 @@ public void testConjunctFutureFailureOnFirst() throws Exception { CompletableFuture future4 = new FlinkCompletableFuture<>(); // build the conjunct future - ConjunctFuture result = FutureUtils.combineAll(Arrays.asList(future1, future2, future3, future4)); + ConjunctFuture result = futureFactory.createFuture(Arrays.asList(future1, future2, future3, future4)); - Future resultMapped = result.thenAccept(new AcceptFunction() { + Future resultMapped = result.thenAccept(new AcceptFunction() { @Override - public void accept(Void value) {} + public void accept(Object value) {} }); assertEquals(4, result.getNumFuturesTotal()); @@ -150,12 +164,12 @@ public void testConjunctFutureFailureOnSuccessive() throws Exception { CompletableFuture future4 = new FlinkCompletableFuture<>(); // build the conjunct future - ConjunctFuture result = FutureUtils.combineAll(Arrays.asList(future1, future2, future3, future4)); + ConjunctFuture result = futureFactory.createFuture(Arrays.asList(future1, future2, future3, future4)); assertEquals(4, result.getNumFuturesTotal()); - Future resultMapped = result.thenAccept(new AcceptFunction() { + Future resultMapped = result.thenAccept(new AcceptFunction() { @Override - public void accept(Void value) {} + public void accept(Object value) {} }); future1.complete(new Object()); @@ -183,12 +197,55 @@ public void accept(Void value) {} } } + /** + * Tests that the conjunct future returns upon completion the collection of all future values + */ + @Test + public void testConjunctFutureValue() throws ExecutionException, InterruptedException { + CompletableFuture future1 = FlinkCompletableFuture.completed(1); + CompletableFuture future2 = FlinkCompletableFuture.completed(2L); + CompletableFuture future3 = new FlinkCompletableFuture<>(); + + ConjunctFuture> result = FutureUtils.combineAll(Arrays.asList(future1, future2, future3)); + + assertFalse(result.isDone()); + + future3.complete(.1); + + assertTrue(result.isDone()); + + assertThat(result.get(), IsIterableContainingInAnyOrder.containsInAnyOrder(1, 2L, .1)); + } + @Test public void testConjunctOfNone() throws Exception { - final ConjunctFuture result = FutureUtils.combineAll(Collections.>emptyList()); + final ConjunctFuture result = futureFactory.createFuture(Collections.>emptyList()); assertEquals(0, result.getNumFuturesTotal()); assertEquals(0, result.getNumFuturesCompleted()); assertTrue(result.isDone()); } + + /** + * Factory to create {@link ConjunctFuture} for testing. + */ + private interface FutureFactory { + ConjunctFuture createFuture(Collection> futures); + } + + private static class ConjunctFutureFactory implements FutureFactory { + + @Override + public ConjunctFuture createFuture(Collection> futures) { + return FutureUtils.combineAll(futures); + } + } + + private static class WaitingFutureFactory implements FutureFactory { + + @Override + public ConjunctFuture createFuture(Collection> futures) { + return FutureUtils.waitForAll(futures); + } + } }