Change local scheduling to guarantee a time share to levels
Previously, lower levels had absolute priority over higher levels,
which caused longer-running queries to starve for periods of time.

The scheduler now allocates shares of time to each level, with the
fraction of time configured using the level priority multiplier config
property. For example, if the multiplier is set to 2, level 0 will get
2x the scheduled time of level 1, and 4x the time of level 2. This
honors the design principle of prioritizing shorter queries, while
guaranteeing that no level starves.
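To make the arithmetic concrete (an illustrative sketch, not code from this commit): with a multiplier of 2 and the five scheduling levels used by the executor, the relative shares form the geometric series 16:8:4:2:1, so level 0 receives roughly half of all scheduled time whenever every level has waiting splits.

// Illustrative only: computes each level's share of scheduled time for a
// given multiplier; the level count matches LEVEL_THRESHOLD_SECONDS below.
public class LevelShares
{
    public static void main(String[] args)
    {
        int levels = 5;
        double multiplier = 2;
        double[] weights = new double[levels];
        double total = 0;
        for (int i = 0; i < levels; i++) {
            weights[i] = Math.pow(multiplier, levels - 1 - i);
            total += weights[i];
        }
        for (int i = 0; i < levels; i++) {
            System.out.printf("level %d: %.1f%%%n", i, 100 * weights[i] / total);
        }
        // prints 51.6%, 25.8%, 12.9%, 6.5%, 3.2%
    }
}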
raghavsethi committed Jun 30, 2017
1 parent d743fb0 commit 2395e96
Showing 7 changed files with 354 additions and 36 deletions.
@@ -27,6 +27,7 @@
import javax.validation.constraints.Min;
import javax.validation.constraints.NotNull;

import java.math.BigDecimal;
import java.util.concurrent.TimeUnit;

@DefunctConfig({
@@ -64,6 +65,9 @@ public class TaskManagerConfig

private int taskNotificationThreads = 5;

private boolean levelAbsolutePriority = true;
private BigDecimal levelTimeMultiplier = new BigDecimal(2.0);

@MinDuration("1ms")
@MaxDuration("10s")
@NotNull
@@ -158,6 +162,35 @@ public TaskManagerConfig setShareIndexLoading(boolean shareIndexLoading)
return this;
}

@Deprecated
@NotNull
public boolean isLevelAbsolutePriority()
{
return levelAbsolutePriority;
}

@Deprecated
@Config("task.level-absolute-priority")
public TaskManagerConfig setLevelAbsolutePriority(boolean levelAbsolutePriority)
{
this.levelAbsolutePriority = levelAbsolutePriority;
return this;
}

public BigDecimal getLevelTimeMultiplier()
{
return levelTimeMultiplier;
}

@Config("task.level-time-multiplier")
@ConfigDescription("Factor that determines the target scheduled time for a level relative to the next")
@Min(0)
public TaskManagerConfig setLevelTimeMultiplier(BigDecimal levelTimeMultiplier)
{
this.levelTimeMultiplier = levelTimeMultiplier;
return this;
}
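As a usage sketch (assuming the usual convention of putting task properties in a worker's config.properties, which this commit does not show; the values are examples, not recommendations), a deployment could opt into the time-share scheduler with a gentler bias toward short queries:

task.level-absolute-priority=false
task.level-time-multiplier=1.5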

@Min(1)
public int getMaxWorkerThreads()
{
@@ -13,6 +13,7 @@
*/
package com.facebook.presto.execution.executor;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import io.airlift.stats.CounterStat;

@@ -28,23 +29,31 @@
import java.util.concurrent.locks.ReentrantLock;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;

@ThreadSafe
public class MultilevelSplitQueue
{
static final int[] LEVEL_THRESHOLD_SECONDS = {0, 1, 10, 60, 300};
static final long LEVEL_CONTRIBUTION_CAP = SECONDS.toNanos(30);

@GuardedBy("lock")
private final List<PriorityQueue<PrioritizedSplitRunner>> levelWaitingSplits;
@GuardedBy("lock")
private final long[] levelScheduledTime = new long[LEVEL_THRESHOLD_SECONDS.length];

private final AtomicLong[] levelMinPriority;
private final List<CounterStat> selectedLevelCounters;

private final ReentrantLock lock = new ReentrantLock();
private final Condition notEmpty = lock.newCondition();

public MultilevelSplitQueue()
private final boolean levelAbsolutePriority;
private final double levelTimeMultiplier;

public MultilevelSplitQueue(boolean levelAbsolutePriority, double levelTimeMultiplier)
{
this.levelMinPriority = new AtomicLong[LEVEL_THRESHOLD_SECONDS.length];
this.levelWaitingSplits = new ArrayList<>(LEVEL_THRESHOLD_SECONDS.length);
@@ -57,6 +66,20 @@ public MultilevelSplitQueue()
}

this.selectedLevelCounters = counters.build();

this.levelAbsolutePriority = levelAbsolutePriority;
this.levelTimeMultiplier = levelTimeMultiplier;
}

private void addLevelTime(int level, long nanos)
{
lock.lock();
try {
levelScheduledTime[level] += nanos;
}
finally {
lock.unlock();
}
}

public void offer(PrioritizedSplitRunner split)
@@ -81,7 +104,7 @@ public PrioritizedSplitRunner take()
lock.lockInterruptibly();
try {
PrioritizedSplitRunner result;
while ((result = pollFirstSplit()) == null) {
while ((result = pollSplit()) == null) {
notEmpty.await();
}

@@ -102,6 +125,86 @@ public PrioritizedSplitRunner take()
}
}

/**
* Presto attempts to give each level a target amount of scheduled time, which is configurable
* using levelTimeMultiplier.
*
* This function selects the level that has the lowest ratio of actual to the target time
* with the objective of minimizing deviation from the target scheduled time. From this level,
* we pick the split with the lowest priority.
*/
@GuardedBy("lock")
private PrioritizedSplitRunner pollSplit()
{
if (levelAbsolutePriority) {
return pollFirstSplit();
}

long targetScheduledTime = updateLevelTimes();
double worstRatio = 1;
int selectedLevel = -1;
for (int level = 0; level < LEVEL_THRESHOLD_SECONDS.length; level++) {
if (!levelWaitingSplits.get(level).isEmpty()) {
double ratio = levelScheduledTime[level] == 0 ? 0 : targetScheduledTime / (1.0 * levelScheduledTime[level]);
if (selectedLevel == -1 || ratio > worstRatio) {
worstRatio = ratio;
selectedLevel = level;
}
}

targetScheduledTime /= levelTimeMultiplier;
}

if (selectedLevel == -1) {
return null;
}

PrioritizedSplitRunner result = levelWaitingSplits.get(selectedLevel).poll();
checkState(result != null, "pollSplit cannot return null");

return result;
}
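A standalone walk-through of the selection rule (numbers invented for illustration; units are arbitrary). Level 2 lags its target the most, so it is chosen even though lower levels also have waiting splits:

// Illustrative only: mirrors the pollSplit selection loop above.
public class PollSplitExample
{
    public static void main(String[] args)
    {
        long[] levelScheduledTime = {40, 30, 5, 0, 0};      // invented actual times
        boolean[] hasWaitingSplits = {true, true, true, false, false};
        double levelTimeMultiplier = 2;
        long targetScheduledTime = 40;                      // level 0 target from updateLevelTimes

        double worstRatio = 1;
        int selectedLevel = -1;
        for (int level = 0; level < levelScheduledTime.length; level++) {
            if (hasWaitingSplits[level]) {
                double ratio = levelScheduledTime[level] == 0 ? 0 : targetScheduledTime / (1.0 * levelScheduledTime[level]);
                if (selectedLevel == -1 || ratio > worstRatio) {
                    worstRatio = ratio;
                    selectedLevel = level;
                }
            }
            targetScheduledTime /= levelTimeMultiplier;     // per-level targets: 40, 20, 10, ...
        }
        // ratios for levels 0..2 are 1.0, 0.67, and 2.0
        System.out.println("selected level: " + selectedLevel);  // prints 2
    }
}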

/**
* During periods of time when a level has no waiting splits, it will not accumulate
* scheduled time and will fall behind relative to other levels.
*
* This can cause temporary starvation for other levels when splits do reach the
* previously-empty level.
*
* To prevent this we set the scheduled time for levels which are empty to the expected
* scheduled time.
*
* @return target scheduled time for level 0
*/
@GuardedBy("lock")
private long updateLevelTimes()
{
long level0ExpectedTime = levelScheduledTime[0];
boolean updated;
do {
double currentMultiplier = levelTimeMultiplier;
updated = false;
for (int level = 0; level < LEVEL_THRESHOLD_SECONDS.length; level++) {
currentMultiplier /= levelTimeMultiplier;
long levelExpectedTime = (long) (level0ExpectedTime * currentMultiplier);

if (levelWaitingSplits.get(level).isEmpty()) {
levelScheduledTime[level] = levelExpectedTime;
continue;
}

if (levelScheduledTime[level] > levelExpectedTime) {
level0ExpectedTime = (long) (levelScheduledTime[level] / currentMultiplier);
updated = true;
break;
}
}
} while (updated && level0ExpectedTime != 0);

return level0ExpectedTime;
}
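A standalone trace of the catch-up rule (invented numbers, multiplier 2): level 1 is ahead of its 50-unit target, so the level 0 baseline is raised from 100 to 120, and the empty level 2 is then credited its full 30-unit target:

// Illustrative only: mirrors the updateLevelTimes loop above.
public class UpdateLevelTimesExample
{
    public static void main(String[] args)
    {
        long[] levelScheduledTime = {100, 60, 0};   // level 2 has no waiting splits
        boolean[] emptyLevel = {false, false, true};
        double levelTimeMultiplier = 2;

        long level0ExpectedTime = levelScheduledTime[0];
        boolean updated;
        do {
            double currentMultiplier = levelTimeMultiplier;
            updated = false;
            for (int level = 0; level < levelScheduledTime.length; level++) {
                currentMultiplier /= levelTimeMultiplier;
                long levelExpectedTime = (long) (level0ExpectedTime * currentMultiplier);
                if (emptyLevel[level]) {
                    levelScheduledTime[level] = levelExpectedTime;  // credit empty level its target
                    continue;
                }
                if (levelScheduledTime[level] > levelExpectedTime) {
                    // a level is ahead of target: raise the level 0 baseline
                    level0ExpectedTime = (long) (levelScheduledTime[level] / currentMultiplier);
                    updated = true;
                    break;
                }
            }
        } while (updated && level0ExpectedTime != 0);

        System.out.println(level0ExpectedTime);                              // prints 120
        System.out.println(java.util.Arrays.toString(levelScheduledTime));   // prints [100, 60, 30]
    }
}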

@GuardedBy("lock")
private PrioritizedSplitRunner pollFirstSplit()
{
@@ -115,6 +218,48 @@ private PrioritizedSplitRunner pollFirstSplit()
return null;
}

/**
* Presto 'charges' the quanta run time to the task <i>and</i> the level it belongs to in
* an effort to maintain the target thread utilization ratios between levels and to
* maintain fairness within a level.
*
* Consider an example split where a read hung for several minutes. This is either a bug
* or a failing dependency. In either case we do not want to charge the task too much,
* and we especially do not want to charge the level too much - i.e. cause other queries
* in this level to starve.
*
* @return the new priority for the task
*/
public Priority updatePriority(Priority oldPriority, long quantaNanos, long scheduledNanos)
{
int oldLevel = oldPriority.getLevel();
int newLevel = computeLevel(scheduledNanos);

long levelContribution = Math.min(quantaNanos, LEVEL_CONTRIBUTION_CAP);

if (oldLevel == newLevel) {
addLevelTime(oldLevel, levelContribution);
return new Priority(oldLevel, oldPriority.getLevelPriority() + quantaNanos);
}

long remainingLevelContribution = levelContribution;
long remainingTaskTime = quantaNanos;

// a task normally slowly accrues scheduled time in a level and then moves to the next, but
// if the split had a particularly long quanta, accrue time to each level as if it had run
// in that level up to the level limit.
for (int currentLevel = oldLevel; currentLevel < newLevel; currentLevel++) {
long timeAccruedToLevel = Math.min(SECONDS.toNanos(LEVEL_THRESHOLD_SECONDS[currentLevel + 1] - LEVEL_THRESHOLD_SECONDS[currentLevel]), remainingLevelContribution);
addLevelTime(currentLevel, timeAccruedToLevel);
remainingLevelContribution -= timeAccruedToLevel;
remainingTaskTime -= timeAccruedToLevel;
}

addLevelTime(newLevel, remainingLevelContribution);
long newLevelMinPriority = getLevelMinPriority(newLevel, scheduledNanos);
return new Priority(newLevel, newLevelMinPriority + remainingTaskTime);
}
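A worked example of the cap arithmetic (hypothetical numbers): a split in level 0 runs a single 1200-second quanta, say because of a hung read. The task jumps to level 4, but the levels are charged only up to the 30-second cap, spread across the thresholds the task crossed; the remaining 1170 seconds are charged to the task alone:

// Illustrative only: the per-level accrual from updatePriority above.
import static java.util.concurrent.TimeUnit.NANOSECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;

public class UpdatePriorityExample
{
    static final int[] LEVEL_THRESHOLD_SECONDS = {0, 1, 10, 60, 300};
    static final long LEVEL_CONTRIBUTION_CAP = SECONDS.toNanos(30);

    public static void main(String[] args)
    {
        long quantaNanos = SECONDS.toNanos(1200);
        long remaining = Math.min(quantaNanos, LEVEL_CONTRIBUTION_CAP);
        for (int level = 0; level < 4; level++) {
            long accrued = Math.min(SECONDS.toNanos(LEVEL_THRESHOLD_SECONDS[level + 1] - LEVEL_THRESHOLD_SECONDS[level]), remaining);
            remaining -= accrued;
            System.out.printf("level %d charged %ds%n", level, NANOSECONDS.toSeconds(accrued));
        }
        // prints: level 0 charged 1s, level 1 charged 9s,
        //         level 2 charged 20s, level 3 charged 0s
    }
}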

public void remove(PrioritizedSplitRunner split)
{
checkArgument(split != null, "split is null");
@@ -179,4 +324,16 @@ public static int computeLevel(long threadUsageNanos)

return LEVEL_THRESHOLD_SECONDS.length - 1;
}

@VisibleForTesting
long[] getLevelScheduledTime()
{
lock.lock();
try {
return levelScheduledTime;
}
finally {
lock.unlock();
}
}
}
@@ -89,6 +89,8 @@ class PrioritizedSplitRunner
this.globalScheduledTimeMicros = globalScheduledTimeMicros;
this.blockedQuantaWallTime = blockedQuantaWallTime;
this.unblockedQuantaWallTime = unblockedQuantaWallTime;

this.updateLevelPriority();
}

public TaskHandle getTaskHandle()
@@ -151,16 +151,21 @@ public class TaskExecutor
@Inject
public TaskExecutor(TaskManagerConfig config)
{
this(requireNonNull(config, "config is null").getMaxWorkerThreads(), config.getMinDrivers());
this(requireNonNull(config, "config is null").getMaxWorkerThreads(), config.getMinDrivers(), config.getLevelTimeMultiplier().doubleValue(), config.isLevelAbsolutePriority(), Ticker.systemTicker());
}

public TaskExecutor(int runnerThreads, int minDrivers)
{
this(runnerThreads, minDrivers, Ticker.systemTicker());
}

@VisibleForTesting
public TaskExecutor(int runnerThreads, int minDrivers, Ticker ticker)
{
this(runnerThreads, minDrivers, 2, false, ticker);
}

@VisibleForTesting
public TaskExecutor(int runnerThreads, int minDrivers, double levelTimeMultiplier, boolean levelAbsolutePriority, Ticker ticker)
{
checkArgument(runnerThreads > 0, "runnerThreads must be at least 1");

@@ -172,7 +177,7 @@ public TaskExecutor(int runnerThreads, int minDrivers, Ticker ticker)
this.ticker = requireNonNull(ticker, "ticker is null");

this.minimumNumberOfDrivers = minDrivers;
this.waitingSplits = new MultilevelSplitQueue();
this.waitingSplits = new MultilevelSplitQueue(levelAbsolutePriority, levelTimeMultiplier);
this.tasks = new LinkedList<>();
}

@@ -611,31 +616,31 @@ public long getCompletedSplitsLevel4()
@Managed
public long getRunningTasksLevel0()
{
return calculateRunningTasksForLevel(0);
return getRunningTasksForLevel(0);
}

@Managed
public long getRunningTasksLevel1()
{
return calculateRunningTasksForLevel(1);
return getRunningTasksForLevel(1);
}

@Managed
public long getRunningTasksLevel2()
{
return calculateRunningTasksForLevel(2);
return getRunningTasksForLevel(2);
}

@Managed
public long getRunningTasksLevel3()
{
return calculateRunningTasksForLevel(3);
return getRunningTasksForLevel(3);
}

@Managed
public long getRunningTasksLevel4()
{
return calculateRunningTasksForLevel(4);
return getRunningTasksForLevel(4);
}

@Managed
@@ -757,11 +762,11 @@ public CounterStat getGlobalCpuTimeMicros()
return globalCpuTimeMicros;
}

private synchronized int calculateRunningTasksForLevel(int level)
private synchronized int getRunningTasksForLevel(int level)
{
int count = 0;
for (TaskHandle task : tasks) {
if (computeLevel(task.getScheduledNanos()) == level) {
if (task.getPriority().getLevel() == level) {
count++;
}
}
