Skip to content

Commit

Permalink
Add coordinator execution support for grouped execution
Browse files Browse the repository at this point in the history
  • Loading branch information
haozhun committed Dec 9, 2017
1 parent 4e55aad commit fa3e1b7
Show file tree
Hide file tree
Showing 12 changed files with 616 additions and 126 deletions.
Expand Up @@ -27,6 +27,7 @@
import javax.inject.Inject;

import java.util.List;
import java.util.OptionalInt;

import static com.facebook.presto.spi.session.PropertyMetadata.booleanSessionProperty;
import static com.facebook.presto.spi.session.PropertyMetadata.integerSessionProperty;
Expand Down Expand Up @@ -61,6 +62,7 @@ public final class SystemSessionProperties
public static final String DICTIONARY_AGGREGATION = "dictionary_aggregation";
public static final String PLAN_WITH_TABLE_NODE_PARTITIONING = "plan_with_table_node_partitioning";
public static final String COLOCATED_JOIN = "colocated_join";
public static final String CONCURRENT_LIFESPANS_PER_NODE = "concurrent_lifespans_per_task";
public static final String REORDER_JOINS = "reorder_joins";
public static final String INITIAL_SPLITS_PER_NODE = "initial_splits_per_node";
public static final String SPLIT_CONCURRENCY_ADJUSTMENT_INTERVAL = "split_concurrency_adjustment_interval";
Expand Down Expand Up @@ -285,6 +287,11 @@ public SystemSessionProperties(
"Experimental: Use a colocated join when possible",
featuresConfig.isColocatedJoinsEnabled(),
false),
integerSessionProperty(
CONCURRENT_LIFESPANS_PER_NODE,
"Experimental: Run a fixed number of groups concurrently for eligible JOINs",
-1,
false),
new PropertyMetadata<>(
SPILL_ENABLED,
"Experimental: Enable spilling",
Expand Down Expand Up @@ -506,6 +513,18 @@ public static boolean isColocatedJoinEnabled(Session session)
return session.getSystemProperty(COLOCATED_JOIN, Boolean.class);
}

public static OptionalInt getConcurrentLifespansPerNode(Session session)
{
Integer result = session.getSystemProperty(CONCURRENT_LIFESPANS_PER_NODE, Integer.class);
if (result == -1) {
return OptionalInt.empty();
}
else {
checkArgument(result > 0, "Concurrent lifespans per node must be positive if set");
return OptionalInt.of(result);
}
}

public static int getInitialSplitsPerNode(Session session)
{
return session.getSystemProperty(INITIAL_SPLITS_PER_NODE, Integer.class);
Expand Down
Expand Up @@ -36,6 +36,8 @@ public interface RemoteTask

void noMoreSplits(PlanNodeId sourceId);

void noMoreSplits(PlanNodeId sourceId, Lifespan lifespan);

void setOutputBuffers(OutputBuffers outputBuffers);

void addStateChangeListener(StateChangeListener<TaskStatus> stateChangeListener);
Expand Down
Expand Up @@ -34,22 +34,27 @@
import com.google.common.collect.ImmutableMultimap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
import io.airlift.units.Duration;

import javax.annotation.concurrent.ThreadSafe;

import java.net.URI;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

import static com.facebook.presto.failureDetector.FailureDetector.State.GONE;
import static com.facebook.presto.operator.ExchangeOperator.REMOTE_CONNECTOR_ID;
Expand All @@ -68,6 +73,7 @@ public final class SqlStageExecution
private final RemoteTaskFactory remoteTaskFactory;
private final NodeTaskMap nodeTaskMap;
private final boolean summarizeTaskInfo;
private final Executor executor;
private final FailureDetector failureDetector;

private final Map<PlanFragmentId, RemoteSourceNode> exchangeSources;
Expand All @@ -84,6 +90,8 @@ public final class SqlStageExecution

private final AtomicReference<OutputBuffers> outputBuffers = new AtomicReference<>();

private final ListenerManager<Set<Lifespan>> completedLifespansChangeListeners = new ListenerManager<>();

public SqlStageExecution(
StageId stageId,
URI location,
Expand All @@ -106,15 +114,17 @@ public SqlStageExecution(
remoteTaskFactory,
nodeTaskMap,
summarizeTaskInfo,
executor,
failureDetector);
}

public SqlStageExecution(StageStateMachine stateMachine, RemoteTaskFactory remoteTaskFactory, NodeTaskMap nodeTaskMap, boolean summarizeTaskInfo, FailureDetector failureDetector)
public SqlStageExecution(StageStateMachine stateMachine, RemoteTaskFactory remoteTaskFactory, NodeTaskMap nodeTaskMap, boolean summarizeTaskInfo, Executor executor, FailureDetector failureDetector)
{
this.stateMachine = stateMachine;
this.remoteTaskFactory = requireNonNull(remoteTaskFactory, "remoteTaskFactory is null");
this.nodeTaskMap = requireNonNull(nodeTaskMap, "nodeTaskMap is null");
this.summarizeTaskInfo = summarizeTaskInfo;
this.executor = requireNonNull(executor, "executor is null");
this.failureDetector = requireNonNull(failureDetector, "failureDetector is null");

ImmutableMap.Builder<PlanFragmentId, RemoteSourceNode> fragmentToExchangeSource = ImmutableMap.builder();
Expand All @@ -141,6 +151,11 @@ public void addStateChangeListener(StateChangeListener<StageState> stateChangeLi
stateMachine.addStateChangeListener(stateChangeListener);
}

public void addCompletedDriverGroupsChangedListener(Consumer<Set<Lifespan>> newlyCompletedDriverGroupConsumer)
{
completedLifespansChangeListeners.addListener(newlyCompletedDriverGroupConsumer);
}

public PlanFragment getFragment()
{
return stateMachine.getFragment();
Expand Down Expand Up @@ -293,7 +308,7 @@ public synchronized RemoteTask scheduleTask(Node node, int partition)
return scheduleTask(node, new TaskId(stateMachine.getStageId(), partition), ImmutableMultimap.of());
}

public synchronized Set<RemoteTask> scheduleSplits(Node node, Multimap<PlanNodeId, Split> splits)
public synchronized Set<RemoteTask> scheduleSplits(Node node, Multimap<PlanNodeId, Split> splits, Multimap<PlanNodeId, Lifespan> noMoreSplitsNotification)
{
requireNonNull(node, "node is null");
requireNonNull(splits, "splits is null");
Expand All @@ -304,16 +319,27 @@ public synchronized Set<RemoteTask> scheduleSplits(Node node, Multimap<PlanNodeI

ImmutableSet.Builder<RemoteTask> newTasks = ImmutableSet.builder();
Collection<RemoteTask> tasks = this.tasks.get(node);
RemoteTask task;
if (tasks == null) {
// The output buffer depends on the task id starting from 0 and being sequential, since each
// task is assigned a private buffer based on task id.
TaskId taskId = new TaskId(stateMachine.getStageId(), nextTaskId.getAndIncrement());
newTasks.add(scheduleTask(node, taskId, splits));
task = scheduleTask(node, taskId, splits);
newTasks.add(task);
}
else {
RemoteTask task = tasks.iterator().next();
task = tasks.iterator().next();
task.addSplits(splits);
}
if (noMoreSplitsNotification.size() > 1) {
// The assumption that `noMoreSplitsNotification.size() <= 1` currently holds.
// If this assumption no longer holds, we should consider calling task.noMoreSplits with multiple entries in one shot.
// These kind of methods can be expensive since they are grabbing locks and/or sending HTTP requests on change.
throw new UnsupportedOperationException("This assumption no longer holds: noMoreSplitsNotification.size() < 1");
}
for (Entry<PlanNodeId, Lifespan> entry : noMoreSplitsNotification.entries()) {
task.noMoreSplits(entry.getKey(), entry.getValue());
}
return newTasks.build();
}

Expand Down Expand Up @@ -386,11 +412,13 @@ private class StageTaskListener
implements StateChangeListener<TaskStatus>
{
private long previousMemory;
private final Set<Lifespan> completedDriverGroups = new HashSet<>();

@Override
public void stateChanged(TaskStatus taskStatus)
{
updateMemoryUsage(taskStatus);
updateCompletedDriverGroups(taskStatus);

StageState stageState = getState();
if (stageState.isDone()) {
Expand Down Expand Up @@ -432,6 +460,23 @@ private synchronized void updateMemoryUsage(TaskStatus taskStatus)
stateMachine.updateMemoryUsage(deltaMemoryInBytes);
}

private synchronized void updateCompletedDriverGroups(TaskStatus taskStatus)
{
// Sets.difference returns a view.
// Once we add the difference into `completedDriverGroups`, the view will be empty.
// `completedLifespansChangeListeners.invoke` happens asynchronously.
// As a result, calling the listeners before updating `completedDriverGroups` doesn't make a difference.
// That's why a copy must be made here.
Set<Lifespan> newlyCompletedDriverGroups = ImmutableSet.copyOf(Sets.difference(taskStatus.getCompletedDriverGroups(), this.completedDriverGroups));
if (newlyCompletedDriverGroups.isEmpty()) {
return;
}
completedLifespansChangeListeners.invoke(newlyCompletedDriverGroups, executor);
// newlyCompletedDriverGroups is a view.
// Making changes to completedDriverGroups will change newlyCompletedDriverGroups.
completedDriverGroups.addAll(newlyCompletedDriverGroups);
}

private ExecutionFailureInfo rewriteTransportFailure(ExecutionFailureInfo executionFailureInfo)
{
if (executionFailureInfo.getRemoteHost() != null &&
Expand All @@ -451,4 +496,24 @@ private ExecutionFailureInfo rewriteTransportFailure(ExecutionFailureInfo execut
}
}
}

private static class ListenerManager<T>
{
private final List<Consumer<T>> listeners = new ArrayList<>();
private boolean freezed;

public synchronized void addListener(Consumer<T> listener)
{
checkState(!freezed, "Listeners have been invoked");
listeners.add(listener);
}

public synchronized void invoke(T payload, Executor executor)
{
freezed = true;
for (Consumer<T> listener : listeners) {
executor.execute(() -> listener.accept(payload));
}
}
}
}

0 comments on commit fa3e1b7

Please sign in to comment.