Adjust scheduler assignment queue for node
Add split assignment adjustment based on node statistics. If a node is able to process all splits currently in its
queue, the maximum pending splits budget assigned to it is doubled in the next iteration of the computeAssignments
method. Scaling back down only happens after an interval has elapsed since the previous scale-up.
The adjustment is kept within the range defined by the newly introduced `min-pending-splits-per-task` setting and
`max-adjusted-pending-splits-per-task`, which replaces the former `max-pending-splits-per-task`.
Dith3r authored and sopel39 committed Dec 21, 2022
1 parent 6879708 commit ac18d5b
Showing 15 changed files with 574 additions and 61 deletions.
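
As a rough illustration of the policy in the commit message, here is a minimal sketch of the per-node pending-splits budget adjustment. The class, method, and trigger names below are hypothetical; only the doubling on a drained queue, the delay before scaling down, and the clamping between the min and max-adjusted limits are taken from the description above.

    // Hypothetical sketch of the per-node pending-splits budget adjustment; not the commit's code.
    class AdaptivePendingSplitsBudget
    {
        private final long minPendingSplitsWeight;          // lower bound: min-pending-splits-per-task
        private final long maxAdjustedPendingSplitsWeight;  // upper bound: max-adjusted-pending-splits-per-task
        private final long scaleDownIntervalMillis;         // assumed delay before scaling back down

        private long currentBudget;
        private long lastScaleUpMillis;

        AdaptivePendingSplitsBudget(long minWeight, long maxAdjustedWeight, long scaleDownIntervalMillis)
        {
            this.minPendingSplitsWeight = minWeight;
            this.maxAdjustedPendingSplitsWeight = maxAdjustedWeight;
            this.scaleDownIntervalMillis = scaleDownIntervalMillis;
            this.currentBudget = minWeight;
        }

        // called once per computeAssignments() iteration for a given node
        void update(boolean nodeProcessedWholeQueue, long nowMillis)
        {
            if (nodeProcessedWholeQueue) {
                // the node kept up with everything it was given: double its pending-splits budget
                currentBudget = Math.min(currentBudget * 2, maxAdjustedPendingSplitsWeight);
                lastScaleUpMillis = nowMillis;
            }
            else if (nowMillis - lastScaleUpMillis >= scaleDownIntervalMillis) {
                // scale down only after an interval has passed since the previous scale-up
                currentBudget = Math.max(currentBudget / 2, minPendingSplitsWeight);
            }
        }

        long currentBudget()
        {
            return currentBudget;
        }
    }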
@@ -66,7 +66,12 @@ public long getTotalSplitsWeight(InternalNode node)
 
     public long getQueuedSplitsWeightForStage(InternalNode node)
     {
-        PendingSplitInfo stageInfo = stageQueuedSplitInfo.get(node.getNodeIdentifier());
+        return getQueuedSplitsWeightForStage(node.getNodeIdentifier());
+    }
+
+    public long getQueuedSplitsWeightForStage(String nodeId)
+    {
+        PendingSplitInfo stageInfo = stageQueuedSplitInfo.get(nodeId);
         return stageInfo == null ? 0 : stageInfo.getQueuedSplitsWeight();
     }
 
@@ -157,7 +157,7 @@ public static SplitPlacementResult selectDistributionNodes(
             NodeMap nodeMap,
             NodeTaskMap nodeTaskMap,
             long maxSplitsWeightPerNode,
-            long maxPendingSplitsWeightPerTask,
+            long minPendingSplitsWeightPerTask,
             int maxUnacknowledgedSplitsPerTask,
             Set<Split> splits,
             List<RemoteTask> existingTasks,
@@ -173,7 +173,7 @@ public static SplitPlacementResult selectDistributionNodes(
             SplitWeight splitWeight = split.getSplitWeight();
 
             // if node is full, don't schedule now, which will push back on the scheduling of splits
-            if (canAssignSplitToDistributionNode(assignmentStats, node, maxSplitsWeightPerNode, maxPendingSplitsWeightPerTask, maxUnacknowledgedSplitsPerTask, splitWeight)) {
+            if (canAssignSplitToDistributionNode(assignmentStats, node, maxSplitsWeightPerNode, minPendingSplitsWeightPerTask, maxUnacknowledgedSplitsPerTask, splitWeight)) {
                 assignments.put(node, split);
                 assignmentStats.addAssignedSplit(node, splitWeight);
             }
@@ -182,15 +182,15 @@
             }
         }
 
-        ListenableFuture<Void> blocked = toWhenHasSplitQueueSpaceFuture(blockedNodes, existingTasks, calculateLowWatermark(maxPendingSplitsWeightPerTask));
+        ListenableFuture<Void> blocked = toWhenHasSplitQueueSpaceFuture(blockedNodes, existingTasks, calculateLowWatermark(minPendingSplitsWeightPerTask));
         return new SplitPlacementResult(blocked, ImmutableMultimap.copyOf(assignments));
     }
 
-    private static boolean canAssignSplitToDistributionNode(NodeAssignmentStats assignmentStats, InternalNode node, long maxSplitsWeightPerNode, long maxPendingSplitsWeightPerTask, int maxUnacknowledgedSplitsPerTask, SplitWeight splitWeight)
+    private static boolean canAssignSplitToDistributionNode(NodeAssignmentStats assignmentStats, InternalNode node, long maxSplitsWeightPerNode, long minPendingSplitsWeightPerTask, int maxUnacknowledgedSplitsPerTask, SplitWeight splitWeight)
     {
         return assignmentStats.getUnacknowledgedSplitCountForStage(node) < maxUnacknowledgedSplitsPerTask &&
                 (canAssignSplitBasedOnWeight(assignmentStats.getTotalSplitsWeight(node), maxSplitsWeightPerNode, splitWeight) ||
-                canAssignSplitBasedOnWeight(assignmentStats.getQueuedSplitsWeightForStage(node), maxPendingSplitsWeightPerTask, splitWeight));
+                canAssignSplitBasedOnWeight(assignmentStats.getQueuedSplitsWeightForStage(node), minPendingSplitsWeightPerTask, splitWeight));
     }
 
     public static boolean canAssignSplitBasedOnWeight(long currentWeight, long weightLimit, SplitWeight splitWeight)
@@ -200,9 +200,9 @@ public static boolean canAssignSplitBasedOnWeight(long currentWeight, long weightLimit, SplitWeight splitWeight)
         return addExact(currentWeight, splitWeight.getRawValue()) <= weightLimit || (currentWeight == 0 && weightLimit > 0);
     }
 
-    public static long calculateLowWatermark(long maxPendingSplitsWeightPerTask)
+    public static long calculateLowWatermark(long minPendingSplitsWeightPerTask)
     {
-        return (long) Math.ceil(maxPendingSplitsWeightPerTask * 0.5);
+        return (long) Math.ceil(minPendingSplitsWeightPerTask * 0.5);
     }
 
     public static ListenableFuture<Void> toWhenHasSplitQueueSpaceFuture(Set<InternalNode> blockedNodes, List<RemoteTask> existingTasks, long weightSpaceThreshold)
 
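calculateLowWatermark above means blocked stages regain queue space once a task's queued weight drops to half of the per-task minimum budget. A small example of the arithmetic, assuming these static helpers live on NodeScheduler and that SplitWeight sits in io.trino.spi, as in current Trino sources:

    import io.trino.execution.scheduler.NodeScheduler;
    import io.trino.spi.SplitWeight;

    public class LowWatermarkExample
    {
        public static void main(String[] args)
        {
            // budget equivalent to 10 standard splits (the min-pending-splits-per-task default)
            long minPendingSplitsWeightPerTask = SplitWeight.rawValueForStandardSplitCount(10);
            // blocked tasks are considered to have queue space again at half that budget
            long lowWatermark = NodeScheduler.calculateLowWatermark(minPendingSplitsWeightPerTask);
            System.out.println(lowWatermark == (long) Math.ceil(minPendingSplitsWeightPerTask * 0.5)); // true
        }
    }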
@@ -46,7 +46,8 @@ public enum SplitsBalancingPolicy
     private int minCandidates = 10;
     private boolean includeCoordinator = true;
     private int maxSplitsPerNode = 100;
-    private int maxPendingSplitsPerTask = 10;
+    private int minPendingSplitsPerTask = 10;
+    private int maxAdjustedPendingSplitsWeightPerTask = 500;
     private NodeSchedulerPolicy nodeSchedulerPolicy = NodeSchedulerPolicy.UNIFORM;
     private boolean optimizedLocalScheduling = true;
     private SplitsBalancingPolicy splitsBalancingPolicy = SplitsBalancingPolicy.STAGE;
@@ -108,17 +109,30 @@ public NodeSchedulerConfig setIncludeCoordinator(boolean includeCoordinator)
         return this;
     }
 
-    @Config("node-scheduler.max-pending-splits-per-task")
-    @LegacyConfig({"node-scheduler.max-pending-splits-per-node-per-task", "node-scheduler.max-pending-splits-per-node-per-stage"})
-    public NodeSchedulerConfig setMaxPendingSplitsPerTask(int maxPendingSplitsPerTask)
+    @Config("node-scheduler.min-pending-splits-per-task")
+    @LegacyConfig({"node-scheduler.max-pending-splits-per-task", "node-scheduler.max-pending-splits-per-node-per-task", "node-scheduler.max-pending-splits-per-node-per-stage"})
+    public NodeSchedulerConfig setMinPendingSplitsPerTask(int minPendingSplitsPerTask)
     {
-        this.maxPendingSplitsPerTask = maxPendingSplitsPerTask;
+        this.minPendingSplitsPerTask = minPendingSplitsPerTask;
         return this;
     }
 
-    public int getMaxPendingSplitsPerTask()
+    public int getMinPendingSplitsPerTask()
     {
-        return maxPendingSplitsPerTask;
+        return minPendingSplitsPerTask;
     }
 
+    @Config("node-scheduler.max-adjusted-pending-splits-per-task")
+    public NodeSchedulerConfig setMaxAdjustedPendingSplitsWeightPerTask(int maxAdjustedPendingSplitsWeightPerTask)
+    {
+        this.maxAdjustedPendingSplitsWeightPerTask = maxAdjustedPendingSplitsWeightPerTask;
+        return this;
+    }
+
+    @Min(0)
+    public int getMaxAdjustedPendingSplitsWeightPerTask()
+    {
+        return maxAdjustedPendingSplitsWeightPerTask;
+    }
+
     public int getMaxSplitsPerNode()
 
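The new defaults (10 and 500) and the getter names follow directly from the diff above. A quick check, assuming NodeSchedulerConfig is in the io.trino.execution.scheduler package as in current Trino sources:

    import io.trino.execution.scheduler.NodeSchedulerConfig;

    public class NodeSchedulerConfigDefaults
    {
        public static void main(String[] args)
        {
            NodeSchedulerConfig config = new NodeSchedulerConfig();
            // defaults taken from the field initializers in the diff above
            System.out.println(config.getMinPendingSplitsPerTask());               // 10
            System.out.println(config.getMaxAdjustedPendingSplitsWeightPerTask()); // 500
        }
    }

Operators reach the same knobs through node-scheduler.min-pending-splits-per-task and node-scheduler.max-adjusted-pending-splits-per-task; the old node-scheduler.max-pending-splits-per-task name remains usable as a legacy alias for the min setting, per the @LegacyConfig annotation above.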
@@ -172,7 +172,7 @@ else if (!splitWaitingForAnyNode) {
                     continue;
                 }
                 Set<InternalNode> nodes = nodeMap.getWorkersByNetworkPath().get(location);
-                chosenNode = bestNodeSplitCount(splitWeight, new ResettableRandomizedIterator<>(nodes), minCandidates, calculateMaxPendingSplitsWeightPerTask(i, depth), assignmentStats);
+                chosenNode = bestNodeSplitCount(splitWeight, new ResettableRandomizedIterator<>(nodes), minCandidates, calculateMinPendingSplitsWeightPerTask(i, depth), assignmentStats);
                 if (chosenNode != null) {
                     chosenDepth = i;
                     break;
@@ -196,12 +196,12 @@ else if (!splitWaitingForAnyNode) {
         }
 
         ListenableFuture<Void> blocked;
-        long maxPendingForWildcardNetworkAffinity = calculateMaxPendingSplitsWeightPerTask(0, topologicalSplitCounters.size() - 1);
+        long minPendingForWildcardNetworkAffinity = calculateMinPendingSplitsWeightPerTask(0, topologicalSplitCounters.size() - 1);
         if (splitWaitingForAnyNode) {
-            blocked = toWhenHasSplitQueueSpaceFuture(existingTasks, calculateLowWatermark(maxPendingForWildcardNetworkAffinity));
+            blocked = toWhenHasSplitQueueSpaceFuture(existingTasks, calculateLowWatermark(minPendingForWildcardNetworkAffinity));
         }
         else {
-            blocked = toWhenHasSplitQueueSpaceFuture(blockedExactNodes, existingTasks, calculateLowWatermark(maxPendingForWildcardNetworkAffinity));
+            blocked = toWhenHasSplitQueueSpaceFuture(blockedExactNodes, existingTasks, calculateLowWatermark(minPendingForWildcardNetworkAffinity));
         }
         return new SplitPlacementResult(blocked, assignment);
     }
@@ -211,7 +211,7 @@ else if (!splitWaitingForAnyNode) {
      * splitAffinity. A split with zero affinity can only fill half the queue, whereas one that matches
      * exactly can fill the entire queue.
      */
-    private long calculateMaxPendingSplitsWeightPerTask(int splitAffinity, int totalDepth)
+    private long calculateMinPendingSplitsWeightPerTask(int splitAffinity, int totalDepth)
     {
         if (totalDepth == 0) {
             return maxPendingSplitsWeightPerTask;
@@ -229,7 +229,7 @@ public SplitPlacementResult computeAssignments(Set<Split> splits, List<RemoteTask> existingTasks)
     }
 
     @Nullable
-    private InternalNode bestNodeSplitCount(SplitWeight splitWeight, Iterator<InternalNode> candidates, int minCandidatesWhenFull, long maxPendingSplitsWeightPerTask, NodeAssignmentStats assignmentStats)
+    private InternalNode bestNodeSplitCount(SplitWeight splitWeight, Iterator<InternalNode> candidates, int minCandidatesWhenFull, long minPendingSplitsWeightPerTask, NodeAssignmentStats assignmentStats)
     {
         InternalNode bestQueueNotFull = null;
         long minWeight = Long.MAX_VALUE;
@@ -246,7 +246,7 @@ private InternalNode bestNodeSplitCount(SplitWeight splitWeight, Iterator<InternalNode> candidates, int minCandidatesWhenFull, long maxPendingSplitsWeightPerTask, NodeAssignmentStats assignmentStats)
             }
             fullCandidatesConsidered++;
             long taskQueuedWeight = assignmentStats.getQueuedSplitsWeightForStage(node);
-            if (taskQueuedWeight < minWeight && canAssignSplitBasedOnWeight(taskQueuedWeight, maxPendingSplitsWeightPerTask, splitWeight)) {
+            if (taskQueuedWeight < minWeight && canAssignSplitBasedOnWeight(taskQueuedWeight, minPendingSplitsWeightPerTask, splitWeight)) {
                 minWeight = taskQueuedWeight;
                 bestQueueNotFull = node;
             }
 
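The javadoc above fixes two endpoints: a split with zero affinity may fill only half of the per-task budget, while one that matches the full network path may fill all of it. Since the method body is truncated in this view, the following is only a sketch of a linear interpolation consistent with those endpoints, with the budget passed in explicitly; it is not the commit's implementation:

    public class PendingSplitsBudgetByAffinity
    {
        // zero affinity -> half the budget; splitAffinity == totalDepth -> the full budget
        static long budgetForAffinity(long minPendingSplitsWeightPerTask, int splitAffinity, int totalDepth)
        {
            if (totalDepth == 0) {
                return minPendingSplitsWeightPerTask;
            }
            double fractionOfQueue = 0.5 * (1.0 + (double) splitAffinity / totalDepth);
            return (long) Math.ceil(minPendingSplitsWeightPerTask * fractionOfQueue);
        }

        public static void main(String[] args)
        {
            System.out.println(budgetForAffinity(1000, 0, 2)); // 500: zero affinity fills half the queue
            System.out.println(budgetForAffinity(1000, 2, 2)); // 1000: exact match fills the entire queue
        }
    }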
@@ -63,7 +63,7 @@ public class TopologyAwareNodeSelectorFactory
     private final int minCandidates;
     private final boolean includeCoordinator;
     private final long maxSplitsWeightPerNode;
-    private final long maxPendingSplitsWeightPerTask;
+    private final long minPendingSplitsWeightPerTask;
     private final NodeTaskMap nodeTaskMap;
 
     private final List<CounterStat> placementCounters;
@@ -87,10 +87,10 @@ public TopologyAwareNodeSelectorFactory(
         this.includeCoordinator = schedulerConfig.isIncludeCoordinator();
         this.nodeTaskMap = requireNonNull(nodeTaskMap, "nodeTaskMap is null");
         int maxSplitsPerNode = schedulerConfig.getMaxSplitsPerNode();
-        int maxPendingSplitsPerTask = schedulerConfig.getMaxPendingSplitsPerTask();
-        checkArgument(maxSplitsPerNode >= maxPendingSplitsPerTask, "maxSplitsPerNode must be > maxPendingSplitsPerTask");
+        int minPendingSplitsPerTask = schedulerConfig.getMinPendingSplitsPerTask();
+        checkArgument(maxSplitsPerNode >= minPendingSplitsPerTask, "maxSplitsPerNode must be > minPendingSplitsPerTask");
         this.maxSplitsWeightPerNode = SplitWeight.rawValueForStandardSplitCount(maxSplitsPerNode);
-        this.maxPendingSplitsWeightPerTask = SplitWeight.rawValueForStandardSplitCount(maxPendingSplitsPerTask);
+        this.minPendingSplitsWeightPerTask = SplitWeight.rawValueForStandardSplitCount(minPendingSplitsPerTask);
 
         Builder<CounterStat> placementCounters = ImmutableList.builder();
         ImmutableMap.Builder<String, CounterStat> placementCountersByName = ImmutableMap.builder();
@@ -133,7 +133,7 @@ public NodeSelector createNodeSelector(Session session, Optional<CatalogHandle>
                 nodeMap,
                 minCandidates,
                 maxSplitsWeightPerNode,
-                maxPendingSplitsWeightPerTask,
+                minPendingSplitsWeightPerTask,
                 getMaxUnacknowledgedSplitsPerTask(session),
                 placementCounters,
                 networkTopology);
