Skip to content

Commit

Permalink
Dynamically adjust running source splits per task
Browse files Browse the repository at this point in the history
When the output buffer for a task is full, it's not useful to schedule
more splits. In fact, it harms the cluster's throughput as those splits
become blocked preventing other queries from using those slots. The
splits also consume system memory, which could otherwise be used for
other purposes.
This change introduces a feedback loop where each task will aim to run
only enough splits to keep its output buffer full.
  • Loading branch information
cberner committed Feb 18, 2016
1 parent 42fd8bd commit 597dc9b
Show file tree
Hide file tree
Showing 11 changed files with 271 additions and 28 deletions.
Expand Up @@ -54,6 +54,8 @@ public final class SystemSessionProperties
public static final String COLUMNAR_PROCESSING_DICTIONARY = "columnar_processing_dictionary"; public static final String COLUMNAR_PROCESSING_DICTIONARY = "columnar_processing_dictionary";
public static final String DICTIONARY_AGGREGATION = "dictionary_aggregation"; public static final String DICTIONARY_AGGREGATION = "dictionary_aggregation";
public static final String PLAN_WITH_TABLE_NODE_PARTITIONING = "plan_with_table_node_partitioning"; public static final String PLAN_WITH_TABLE_NODE_PARTITIONING = "plan_with_table_node_partitioning";
public static final String INITIAL_SPLITS_PER_NODE = "initial_splits_per_node";
public static final String SPLIT_CONCURRENCY_ADJUSTMENT_INTERVAL = "split_concurrency_adjustment_interval";


private final List<PropertyMetadata<?>> sessionProperties; private final List<PropertyMetadata<?>> sessionProperties;


Expand Down Expand Up @@ -176,6 +178,19 @@ public SystemSessionProperties(
"Enable optimization for aggregations on dictionaries", "Enable optimization for aggregations on dictionaries",
featuresConfig.isDictionaryAggregation(), featuresConfig.isDictionaryAggregation(),
false), false),
integerSessionProperty(
INITIAL_SPLITS_PER_NODE,
"The number of splits each node will run per task, initially",
taskManagerConfig.getInitialSplitsPerNode(),
false),
new PropertyMetadata<>(
SPLIT_CONCURRENCY_ADJUSTMENT_INTERVAL,
"Experimental: Interval between changes to the number of concurrent splits per node",
VARCHAR,
Duration.class,
taskManagerConfig.getSplitConcurrencyAdjustmentInterval(),
false,
value -> Duration.valueOf((String) value)),
booleanSessionProperty( booleanSessionProperty(
PLAN_WITH_TABLE_NODE_PARTITIONING, PLAN_WITH_TABLE_NODE_PARTITIONING,
"Experimental: Adapt plan to pre-partitioned tables", "Experimental: Adapt plan to pre-partitioned tables",
Expand Down Expand Up @@ -292,4 +307,14 @@ public static boolean planWithTableNodePartitioning(Session session)
{ {
return session.getProperty(PLAN_WITH_TABLE_NODE_PARTITIONING, Boolean.class); return session.getProperty(PLAN_WITH_TABLE_NODE_PARTITIONING, Boolean.class);
} }

public static int getInitialSplitsPerNode(Session session)
{
return session.getProperty(INITIAL_SPLITS_PER_NODE, Integer.class);
}

public static Duration getSplitConcurrencyAdjustmentInterval(Session session)
{
return session.getProperty(SPLIT_CONCURRENCY_ADJUSTMENT_INTERVAL, Duration.class);
}
} }
Expand Up @@ -170,6 +170,11 @@ public void addStateChangeListener(StateChangeListener<BufferState> stateChangeL
state.addStateChangeListener(stateChangeListener); state.addStateChangeListener(stateChangeListener);
} }


public double getUtilization()
{
return memoryManager.getUtilization();
}

public boolean isFinished() public boolean isFinished()
{ {
return state.get() == FINISHED; return state.get() == FINISHED;
Expand Down
Expand Up @@ -38,6 +38,11 @@ public void updateMemoryUsage(long bytesAdded)
bufferedBytes.addAndGet(bytesAdded); bufferedBytes.addAndGet(bytesAdded);
} }


public double getUtilization()
{
return bufferedBytes.get() / (double) maxBufferedBytes;
}

public boolean isFull() public boolean isFull()
{ {
return bufferedBytes.get() >= maxBufferedBytes; return bufferedBytes.get() >= maxBufferedBytes;
Expand Down
@@ -0,0 +1,79 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.execution;

import io.airlift.units.Duration;

import javax.annotation.concurrent.NotThreadSafe;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static java.lang.Double.isFinite;
import static java.util.concurrent.TimeUnit.NANOSECONDS;

@NotThreadSafe
public class SplitConcurrencyController
{
private static final double TARGET_UTILIZATION = 0.5;

private final long adjustmentIntervalNanos;
private int targetConcurrency;
private long threadNanosSinceLastAdjustment;

public SplitConcurrencyController(int initialConcurrency, Duration adjustmentInterval)
{
checkArgument(initialConcurrency > 0, "initial concurrency must be positive");
this.targetConcurrency = initialConcurrency;
this.adjustmentIntervalNanos = adjustmentInterval.roundTo(NANOSECONDS);
}

public void update(long nanos, double utilization, int currentConcurrency)
{
checkArgument(nanos >= 0, "nanos is negative");
checkArgument(isFinite(utilization), "utilization must be finite");
checkArgument(utilization >= 0, "utilization is negative");
checkArgument(currentConcurrency >= 0, "currentConcurrency is negative");

threadNanosSinceLastAdjustment += nanos;
if (threadNanosSinceLastAdjustment >= adjustmentIntervalNanos && utilization < TARGET_UTILIZATION && currentConcurrency >= targetConcurrency) {
threadNanosSinceLastAdjustment = 0;
targetConcurrency++;
}
}

public int getTargetConcurrency()
{
checkState(targetConcurrency > 0, "Possible deadlock detected. Target concurrency is zero");
return targetConcurrency;
}

public void splitFinished(long splitThreadNanos, double utilization, int currentConcurrency)
{
checkArgument(splitThreadNanos >= 0, "nanos is negative");
checkArgument(isFinite(utilization), "utilization must be finite");
checkArgument(utilization >= 0, "utilization is negative");
checkArgument(currentConcurrency >= 0, "currentConcurrency is negative");

if (threadNanosSinceLastAdjustment >= adjustmentIntervalNanos || threadNanosSinceLastAdjustment >= splitThreadNanos) {
if (utilization > TARGET_UTILIZATION && targetConcurrency > 1) {
threadNanosSinceLastAdjustment = 0;
targetConcurrency--;
}
else if (utilization < TARGET_UTILIZATION && currentConcurrency >= targetConcurrency) {
threadNanosSinceLastAdjustment = 0;
targetConcurrency++;
}
}
}
}
Expand Up @@ -54,6 +54,8 @@
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;


import static com.facebook.presto.SystemSessionProperties.getInitialSplitsPerNode;
import static com.facebook.presto.SystemSessionProperties.getSplitConcurrencyAdjustmentInterval;
import static com.google.common.base.MoreObjects.toStringHelper; import static com.google.common.base.MoreObjects.toStringHelper;
import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState; import static com.google.common.base.Preconditions.checkState;
Expand Down Expand Up @@ -180,7 +182,7 @@ private SqlTaskExecution(


// don't register the task if it is already completed (most likely failed during planning above) // don't register the task if it is already completed (most likely failed during planning above)
if (!taskStateMachine.getState().isDone()) { if (!taskStateMachine.getState().isDone()) {
taskHandle = taskExecutor.addTask(taskId); taskHandle = taskExecutor.addTask(taskId, sharedBuffer::getUtilization, getInitialSplitsPerNode(taskContext.getSession()), getSplitConcurrencyAdjustmentInterval(taskContext.getSession()));
taskStateMachine.addStateChangeListener(new RemoveTaskHandleWhenDone(taskExecutor, taskHandle)); taskStateMachine.addStateChangeListener(new RemoveTaskHandleWhenDone(taskExecutor, taskHandle));
taskStateMachine.addStateChangeListener(state -> { taskStateMachine.addStateChangeListener(state -> {
if (state.isDone()) { if (state.isDone()) {
Expand Down
Expand Up @@ -31,7 +31,6 @@
import javax.annotation.PostConstruct; import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy; import javax.annotation.PreDestroy;
import javax.annotation.concurrent.GuardedBy; import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.NotThreadSafe;
import javax.annotation.concurrent.ThreadSafe; import javax.annotation.concurrent.ThreadSafe;
import javax.inject.Inject; import javax.inject.Inject;


Expand All @@ -55,6 +54,7 @@
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicLongArray; import java.util.concurrent.atomic.AtomicLongArray;
import java.util.function.DoubleSupplier;


import static com.google.common.base.MoreObjects.toStringHelper; import static com.google.common.base.MoreObjects.toStringHelper;
import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkArgument;
Expand Down Expand Up @@ -183,9 +183,11 @@ private synchronized void addRunnerThread()
} }
} }


public synchronized TaskHandle addTask(TaskId taskId) public synchronized TaskHandle addTask(TaskId taskId, DoubleSupplier utilizationSupplier, int initialSplitConcurrency, Duration splitConcurrencyAdjustFrequency)
{ {
TaskHandle taskHandle = new TaskHandle(requireNonNull(taskId, "taskId is null")); requireNonNull(taskId, "taskId is null");
requireNonNull(utilizationSupplier, "utilizationSupplier is null");
TaskHandle taskHandle = new TaskHandle(taskId, utilizationSupplier, initialSplitConcurrency, splitConcurrencyAdjustFrequency);
tasks.add(taskHandle); tasks.add(taskHandle);
return taskHandle; return taskHandle;
} }
Expand Down Expand Up @@ -327,43 +329,54 @@ private synchronized PrioritizedSplitRunner pollNextSplitWorker()
return null; return null;
} }


@NotThreadSafe @ThreadSafe
public static class TaskHandle public static class TaskHandle
{ {
private final TaskId taskId; private final TaskId taskId;
private final DoubleSupplier utilizationSupplier;
@GuardedBy("this")
private final Queue<PrioritizedSplitRunner> queuedSplits = new ArrayDeque<>(10); private final Queue<PrioritizedSplitRunner> queuedSplits = new ArrayDeque<>(10);
@GuardedBy("this")
private final List<PrioritizedSplitRunner> runningSplits = new ArrayList<>(10); private final List<PrioritizedSplitRunner> runningSplits = new ArrayList<>(10);
@GuardedBy("this")
private final List<PrioritizedSplitRunner> forcedRunningSplits = new ArrayList<>(10); private final List<PrioritizedSplitRunner> forcedRunningSplits = new ArrayList<>(10);
private final AtomicLong taskThreadUsageNanos = new AtomicLong(); @GuardedBy("this")

private long taskThreadUsageNanos;
private final AtomicBoolean destroyed = new AtomicBoolean(); @GuardedBy("this")
private boolean destroyed;
@GuardedBy("this")
private final SplitConcurrencyController concurrencyController;


private final AtomicInteger nextSplitId = new AtomicInteger(); private final AtomicInteger nextSplitId = new AtomicInteger();


private TaskHandle(TaskId taskId) private TaskHandle(TaskId taskId, DoubleSupplier utilizationSupplier, int initialSplitConcurrency, Duration splitConcurrencyAdjustFrequency)
{ {
this.taskId = taskId; this.taskId = taskId;
this.utilizationSupplier = utilizationSupplier;
this.concurrencyController = new SplitConcurrencyController(initialSplitConcurrency, splitConcurrencyAdjustFrequency);
} }


private long addThreadUsageNanos(long durationNanos) private synchronized long addThreadUsageNanos(long durationNanos)
{ {
return taskThreadUsageNanos.addAndGet(durationNanos); concurrencyController.update(durationNanos, utilizationSupplier.getAsDouble(), runningSplits.size());
taskThreadUsageNanos += durationNanos;
return taskThreadUsageNanos;
} }


private TaskId getTaskId() private TaskId getTaskId()
{ {
return taskId; return taskId;
} }


public boolean isDestroyed() public synchronized boolean isDestroyed()
{ {
return destroyed.get(); return destroyed;
} }


// Returns any remaining splits. The caller must destroy these. // Returns any remaining splits. The caller must destroy these.
private List<PrioritizedSplitRunner> destroy() private synchronized List<PrioritizedSplitRunner> destroy()
{ {
destroyed.set(true); destroyed = true;


ImmutableList.Builder<PrioritizedSplitRunner> builder = ImmutableList.builder(); ImmutableList.Builder<PrioritizedSplitRunner> builder = ImmutableList.builder();
builder.addAll(forcedRunningSplits); builder.addAll(forcedRunningSplits);
Expand All @@ -375,32 +388,36 @@ private List<PrioritizedSplitRunner> destroy()
return builder.build(); return builder.build();
} }


private void enqueueSplit(PrioritizedSplitRunner split) private synchronized void enqueueSplit(PrioritizedSplitRunner split)
{ {
checkState(!destroyed.get(), "Can not add split to destroyed task handle"); checkState(!destroyed, "Can not add split to destroyed task handle");
queuedSplits.add(split); queuedSplits.add(split);
} }


private void recordForcedRunningSplit(PrioritizedSplitRunner split) private synchronized void recordForcedRunningSplit(PrioritizedSplitRunner split)
{ {
checkState(!destroyed.get(), "Can not add split to destroyed task handle"); checkState(!destroyed, "Can not add split to destroyed task handle");
forcedRunningSplits.add(split); forcedRunningSplits.add(split);
} }


@VisibleForTesting @VisibleForTesting
int getRunningSplits() synchronized int getRunningSplits()
{ {
return runningSplits.size(); return runningSplits.size();
} }


private long getThreadUsageNanos() private synchronized long getThreadUsageNanos()
{ {
return taskThreadUsageNanos.get(); return taskThreadUsageNanos;
} }


private PrioritizedSplitRunner pollNextSplit() private synchronized PrioritizedSplitRunner pollNextSplit()
{ {
if (destroyed.get()) { if (destroyed) {
return null;
}

if (runningSplits.size() >= concurrencyController.getTargetConcurrency()) {
return null; return null;
} }


Expand All @@ -411,8 +428,9 @@ private PrioritizedSplitRunner pollNextSplit()
return split; return split;
} }


private void splitComplete(PrioritizedSplitRunner split) private synchronized void splitComplete(PrioritizedSplitRunner split)
{ {
concurrencyController.splitFinished(split.getSplitThreadUsageNanos(), utilizationSupplier.getAsDouble(), runningSplits.size());
forcedRunningSplits.remove(split); forcedRunningSplits.remove(split);
runningSplits.remove(split); runningSplits.remove(split);
} }
Expand Down Expand Up @@ -449,6 +467,7 @@ private static class PrioritizedSplitRunner


private final AtomicInteger priorityLevel = new AtomicInteger(); private final AtomicInteger priorityLevel = new AtomicInteger();
private final AtomicLong threadUsageNanos = new AtomicLong(); private final AtomicLong threadUsageNanos = new AtomicLong();
private final AtomicLong splitThreadUsageNanos = new AtomicLong();
private final AtomicLong lastRun = new AtomicLong(); private final AtomicLong lastRun = new AtomicLong();
private final AtomicLong start = new AtomicLong(); private final AtomicLong start = new AtomicLong();


Expand Down Expand Up @@ -494,6 +513,11 @@ public boolean isFinished()
return finished || destroyed.get() || taskHandle.isDestroyed(); return finished || destroyed.get() || taskHandle.isDestroyed();
} }


public long getSplitThreadUsageNanos()
{
return splitThreadUsageNanos.get();
}

public ListenableFuture<?> process() public ListenableFuture<?> process()
throws Exception throws Exception
{ {
Expand All @@ -508,6 +532,7 @@ public ListenableFuture<?> process()


// update priority level base on total thread usage of task // update priority level base on total thread usage of task
long durationNanos = elapsed.getWall().roundTo(NANOSECONDS); long durationNanos = elapsed.getWall().roundTo(NANOSECONDS);
this.splitThreadUsageNanos.addAndGet(durationNanos);
long threadUsageNanos = taskHandle.addThreadUsageNanos(durationNanos); long threadUsageNanos = taskHandle.addThreadUsageNanos(durationNanos);
this.threadUsageNanos.set(threadUsageNanos); this.threadUsageNanos.set(threadUsageNanos);
priorityLevel.set(calculatePriorityLevel(threadUsageNanos)); priorityLevel.set(calculatePriorityLevel(threadUsageNanos));
Expand Down
Expand Up @@ -39,6 +39,8 @@ public class TaskManagerConfig
private boolean shareIndexLoading; private boolean shareIndexLoading;
private int maxWorkerThreads = Runtime.getRuntime().availableProcessors() * 4; private int maxWorkerThreads = Runtime.getRuntime().availableProcessors() * 4;
private Integer minDrivers; private Integer minDrivers;
private Integer initialSplitsPerNode;
private Duration splitConcurrencyAdjustmentInterval = new Duration(100, TimeUnit.MILLISECONDS);


private DataSize sinkMaxBufferSize = new DataSize(32, Unit.MEGABYTE); private DataSize sinkMaxBufferSize = new DataSize(32, Unit.MEGABYTE);


Expand Down Expand Up @@ -156,6 +158,35 @@ public TaskManagerConfig setMaxWorkerThreads(int maxWorkerThreads)
return this; return this;
} }


@Min(1)
public int getInitialSplitsPerNode()
{
if (initialSplitsPerNode == null) {
return maxWorkerThreads;
}
return initialSplitsPerNode;
}

@Config("task.initial-splits-per-node")
public TaskManagerConfig setInitialSplitsPerNode(int initialSplitsPerNode)
{
this.initialSplitsPerNode = initialSplitsPerNode;
return this;
}

@MinDuration("1ms")
public Duration getSplitConcurrencyAdjustmentInterval()
{
return splitConcurrencyAdjustmentInterval;
}

@Config("task.split-concurrency-adjustment-interval")
public TaskManagerConfig setSplitConcurrencyAdjustmentInterval(Duration splitConcurrencyAdjustmentInterval)
{
this.splitConcurrencyAdjustmentInterval = splitConcurrencyAdjustmentInterval;
return this;
}

@Min(1) @Min(1)
public int getMinDrivers() public int getMinDrivers()
{ {
Expand Down

0 comments on commit 597dc9b

Please sign in to comment.