Skip to content

Commit

Permalink
[FLINK-6555] [futures] Generalize ConjunctFuture to return results
Browse files Browse the repository at this point in the history
The ConjunctFuture now returns the set of future values once it is completed.

Introduce WaitingConjunctFuture; Fix thread safety issue with ResultConjunctFuture

The WaitingConjunctFuture waits for the completion of its futures. The future values
are discarded making it more efficient than the ResultConjunctFuture which returns
the futures' values. The WaitingConjunctFuture is instantiated via
FutureUtils.waitForAll(Collection<Future>).

This closes apache#3873.
  • Loading branch information
tillrohrmann committed May 16, 2017
1 parent c4af1b5 commit 9fbacb0
Show file tree
Hide file tree
Showing 5 changed files with 184 additions and 44 deletions.
Expand Up @@ -19,8 +19,11 @@
package org.apache.flink.runtime.concurrent;

import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
import org.apache.flink.util.Preconditions;

import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
Expand Down Expand Up @@ -106,43 +109,60 @@ public RetryException(Throwable cause) {

/**
* Creates a future that is complete once multiple other futures completed.
* The ConjunctFuture fails (completes exceptionally) once one of the Futures in the
* conjunction fails.
* The future fails (completes exceptionally) once one of the futures in the
* conjunction fails. Upon successful completion, the future returns the
* collection of the futures' results.
*
* <p>The ConjunctFuture gives access to how many Futures in the conjunction have already
* completed successfully, via {@link ConjunctFuture#getNumFuturesCompleted()}.
*
* @param futures The futures that make up the conjunction. No null entries are allowed.
* @return The ConjunctFuture that completes once all given futures are complete (or one fails).
*/
public static ConjunctFuture combineAll(Collection<? extends Future<?>> futures) {
public static <T> ConjunctFuture<Collection<T>> combineAll(Collection<? extends Future<? extends T>> futures) {
checkNotNull(futures, "futures");

final ConjunctFutureImpl conjunct = new ConjunctFutureImpl(futures.size());
final ResultConjunctFuture<T> conjunct = new ResultConjunctFuture<>(futures.size());

if (futures.isEmpty()) {
conjunct.complete(null);
conjunct.complete(Collections.<T>emptyList());
}
else {
for (Future<?> future : futures) {
for (Future<? extends T> future : futures) {
future.handle(conjunct.completionHandler);
}
}

return conjunct;
}

/**
* Creates a future that is complete once all of the given futures have completed.
* The future fails (completes exceptionally) once one of the given futures
* fails.
*
* <p>The ConjunctFuture gives access to how many Futures have already
* completed successfully, via {@link ConjunctFuture#getNumFuturesCompleted()}.
*
* @param futures The futures to wait on. No null entries are allowed.
* @return The WaitingFuture that completes once all given futures are complete (or one fails).
*/
public static ConjunctFuture<Void> waitForAll(Collection<? extends Future<?>> futures) {
checkNotNull(futures, "futures");

return new WaitingConjunctFuture(futures);
}

/**
* A future that is complete once multiple other futures completed. The futures are not
* necessarily of the same type, which is why the type of this Future is {@code Void}.
* The ConjunctFuture fails (completes exceptionally) once one of the Futures in the
* conjunction fails.
* necessarily of the same type. The ConjunctFuture fails (completes exceptionally) once
* one of the Futures in the conjunction fails.
*
* <p>The advantage of using the ConjunctFuture over chaining all the futures (such as via
* {@link Future#thenCombine(Future, BiFunction)}) is that ConjunctFuture also tracks how
* many of the Futures are already complete.
*/
public interface ConjunctFuture extends CompletableFuture<Void> {
public interface ConjunctFuture<T> extends CompletableFuture<T> {

/**
* Gets the total number of Futures in the conjunction.
Expand All @@ -158,39 +178,102 @@ public interface ConjunctFuture extends CompletableFuture<Void> {
}

/**
* The implementation of the {@link ConjunctFuture}.
*
* <p>Implementation notice: The member fields all have package-private access, because they are
* either accessed by an inner subclass or by the enclosing class.
* The implementation of the {@link ConjunctFuture} which returns its Futures' result as a collection.
*/
private static class ConjunctFutureImpl extends FlinkCompletableFuture<Void> implements ConjunctFuture {
private static class ResultConjunctFuture<T> extends FlinkCompletableFuture<Collection<T>> implements ConjunctFuture<Collection<T>> {

/** The total number of futures in the conjunction */
final int numTotal;
private final int numTotal;

/** The next free index in the results arrays */
private final AtomicInteger nextIndex = new AtomicInteger(0);

/** The number of futures in the conjunction that are already complete */
final AtomicInteger numCompleted = new AtomicInteger();
private final AtomicInteger numCompleted = new AtomicInteger(0);

/** The set of collected results so far */
private volatile T[] results;

/** The function that is attached to all futures in the conjunction. Once a future
* is complete, this function tracks the completion or fails the conjunct.
* is complete, this function tracks the completion or fails the conjunct.
*/
final BiFunction<Object, Throwable, Void> completionHandler = new BiFunction<Object, Throwable, Void>() {
final BiFunction<T, Throwable, Void> completionHandler = new BiFunction<T, Throwable, Void>() {

@Override
public Void apply(Object o, Throwable throwable) {
public Void apply(T o, Throwable throwable) {
if (throwable != null) {
completeExceptionally(throwable);
}
else if (numTotal == numCompleted.incrementAndGet()) {
complete(null);
} else {
int index = nextIndex.getAndIncrement();

results[index] = o;

if (numCompleted.incrementAndGet() == numTotal) {
complete(Arrays.asList(results));
}
}

return null;
}
};

ConjunctFutureImpl(int numTotal) {
@SuppressWarnings("unchecked")
ResultConjunctFuture(int numTotal) {
this.numTotal = numTotal;
results = (T[])new Object[numTotal];
}

@Override
public int getNumFuturesTotal() {
return numTotal;
}

@Override
public int getNumFuturesCompleted() {
return numCompleted.get();
}
}

/**
* Implementation of the {@link ConjunctFuture} interface which waits only for the completion
* of its futures and does not return their values.
*/
private static final class WaitingConjunctFuture extends FlinkCompletableFuture<Void> implements ConjunctFuture<Void> {

/** Number of completed futures */
private final AtomicInteger numCompleted = new AtomicInteger(0);

/** Total number of futures to wait on */
private final int numTotal;

/** Handler which increments the atomic completion counter and completes or fails the WaitingFutureImpl */
private final BiFunction<Object, Throwable, Void> completionHandler = new BiFunction<Object, Throwable, Void>() {
@Override
public Void apply(Object o, Throwable throwable) {
if (throwable == null) {
if (numTotal == numCompleted.incrementAndGet()) {
complete(null);
}
} else {
completeExceptionally(throwable);
}

return null;
}
};

private WaitingConjunctFuture(Collection<? extends Future<?>> futures) {
Preconditions.checkNotNull(futures, "Futures must not be null.");

this.numTotal = futures.size();

if (futures.isEmpty()) {
complete(null);
} else {
for (Future<?> future : futures) {
future.handle(completionHandler);
}
}
}

@Override
Expand Down
Expand Up @@ -871,7 +871,7 @@ private void scheduleEager(SlotProvider slotProvider, final Time timeout) {

// this future is complete once all slot futures are complete.
// the future fails once one slot future fails.
final ConjunctFuture allAllocationsComplete = FutureUtils.combineAll(slotFutures);
final ConjunctFuture<Void> allAllocationsComplete = FutureUtils.waitForAll(slotFutures);

// make sure that we fail if the allocation timeout was exceeded
final ScheduledFuture<?> timeoutCancelHandle = futureExecutor.schedule(new Runnable() {
Expand All @@ -892,7 +892,7 @@ public void run() {
allAllocationsComplete.handleAsync(new BiFunction<Void, Throwable, Void>() {

@Override
public Void apply(Void ignored, Throwable throwable) {
public Void apply(Void slots, Throwable throwable) {
try {
// we do not need the cancellation timeout any more
timeoutCancelHandle.cancel(false);
Expand Down Expand Up @@ -973,7 +973,7 @@ public void cancel() {
}

// we build a future that is complete once all vertices have reached a terminal state
final ConjunctFuture allTerminal = FutureUtils.combineAll(futures);
final ConjunctFuture<Void> allTerminal = FutureUtils.waitForAll(futures);
allTerminal.thenAccept(new AcceptFunction<Void>() {
@Override
public void accept(Void value) {
Expand Down Expand Up @@ -1102,7 +1102,7 @@ else if (transitionState(current, JobStatus.FAILING, t)) {
futures.add(ejv.cancelWithFuture());
}

final ConjunctFuture allTerminal = FutureUtils.combineAll(futures);
final ConjunctFuture<Void> allTerminal = FutureUtils.waitForAll(futures);
allTerminal.thenAccept(new AcceptFunction<Void>() {
@Override
public void accept(Void value) {
Expand Down
Expand Up @@ -509,15 +509,15 @@ public void cancel() {
*/
public Future<Void> cancelWithFuture() {
// we collect all futures from the task cancellations
ArrayList<Future<?>> futures = new ArrayList<>(parallelism);
ArrayList<Future<ExecutionState>> futures = new ArrayList<>(parallelism);

// cancel each vertex
for (ExecutionVertex ev : getTaskVertices()) {
futures.add(ev.cancel());
}

// return a conjunct future, which is complete once all individual tasks are canceled
return FutureUtils.combineAll(futures);
return FutureUtils.waitForAll(futures);
}

public void fail(Throwable t) {
Expand Down
Expand Up @@ -150,7 +150,7 @@ private void cancel(final long globalModVersionOfFailover) {
futures.add(vertex.cancel());
}

final FutureUtils.ConjunctFuture allTerminal = FutureUtils.combineAll(futures);
final FutureUtils.ConjunctFuture<Void> allTerminal = FutureUtils.waitForAll(futures);
allTerminal.thenAcceptAsync(new AcceptFunction<Void>() {
@Override
public void accept(Void value) {
Expand Down

0 comments on commit 9fbacb0

Please sign in to comment.