Skip to content

Commit

Permalink
Implement node selection for caching affinity
Browse files Browse the repository at this point in the history
  • Loading branch information
beinan committed Apr 10, 2023
1 parent a3bd104 commit 13b044d
Show file tree
Hide file tree
Showing 9 changed files with 131 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ public enum SplitsBalancingPolicy
NODE, STAGE
}

/**
 * Controls whether split scheduling prefers nodes expected to hold cached data
 * for a split. NONE disables cache affinity; SOFT/HARD enable it (exact
 * fallback semantics are enforced by the scheduler — TODO confirm against
 * UniformNodeSelector usage, only != NONE is checked in the visible code).
 */
public enum CacheAffinityPolicy
{
    NONE, HARD, SOFT
}

private int minCandidates = 10;
private boolean includeCoordinator = true;
private int maxSplitsPerNode = 100;
Expand All @@ -54,6 +58,7 @@ public enum SplitsBalancingPolicy
private int maxUnacknowledgedSplitsPerTask = 2000;
private Duration allowedNoMatchingNodePeriod = new Duration(2, TimeUnit.MINUTES);
private NodeAllocatorType nodeAllocatorType = NodeAllocatorType.BIN_PACKING;
private CacheAffinityPolicy cacheAffinityPolicy = CacheAffinityPolicy.NONE;

@NotNull
public NodeSchedulerPolicy getNodeSchedulerPolicy()
Expand Down Expand Up @@ -229,4 +234,16 @@ private static NodeAllocatorType toNodeAllocatorType(String nodeAllocatorType)
}
throw new IllegalArgumentException("Unknown node allocator type: " + nodeAllocatorType);
}

/**
 * Returns the configured policy controlling whether splits are scheduled onto
 * nodes that are likely to hold cached data for them.
 */
public CacheAffinityPolicy getCacheAffinityPolicy()
{
    return this.cacheAffinityPolicy;
}

/**
 * Sets the cache-affinity scheduling policy, bound to the
 * {@code node-scheduler.cache-affinity-policy} configuration property.
 */
@Config("node-scheduler.cache-affinity-policy")
public NodeSchedulerConfig setCacheAffinityPolicy(CacheAffinityPolicy policy)
{
    this.cacheAffinityPolicy = policy;
    return this;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.execution.scheduler;

import io.trino.metadata.InternalNode;
import io.trino.spi.HostAddress;

import java.util.ArrayList;
import java.util.List;

import static java.util.Collections.unmodifiableList;

/**
 * Deterministically maps a split's cache identifier to a set of candidate
 * nodes, so that the same identifier consistently hashes to the same nodes
 * across scheduling decisions (the basis of cache affinity).
 */
public class SimpleNodeProvider
{
    // Must be in a stable (sorted) order so the hash-to-node mapping is
    // consistent between calls and across coordinators.
    private final List<InternalNode> sortedNodes;

    public SimpleNodeProvider(List<InternalNode> sortedNodes)
    {
        // Defensive copy: callers cannot mutate our view of the node list
        // after construction (also rejects a null list or null elements early)
        this.sortedNodes = List.copyOf(sortedNodes);
    }

    /**
     * Returns up to {@code count} host addresses for {@code identifier}: the
     * identifier's hash selects a starting position in the sorted node list and
     * consecutive nodes are taken, wrapping around. Returns an empty list when
     * no nodes are available or {@code count} is non-positive.
     */
    public List<HostAddress> get(String identifier, int count)
    {
        int size = sortedNodes.size();
        if (size == 0 || count <= 0) {
            // Guard: the original modulo would throw ArithmeticException on an
            // empty node list
            return List.of();
        }
        // floorMod keeps the start position non-negative for negative hash codes
        int position = Math.floorMod(identifier.hashCode(), size);
        int limit = Math.min(count, size);
        List<HostAddress> chosenCandidates = new ArrayList<>(limit);
        for (int i = 0; i < limit; i++) {
            chosenCandidates.add(sortedNodes.get((position + i) % size).getHostAndPort());
        }
        return unmodifiableList(chosenCandidates);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import static io.trino.execution.scheduler.NodeScheduler.selectExactNodes;
import static io.trino.execution.scheduler.NodeScheduler.selectNodes;
import static io.trino.execution.scheduler.NodeScheduler.toWhenHasSplitQueueSpaceFuture;
import static io.trino.execution.scheduler.NodeSchedulerConfig.CacheAffinityPolicy.NONE;
import static io.trino.spi.StandardErrorCode.NO_NODES_AVAILABLE;
import static java.util.Comparator.comparingLong;
import static java.util.Objects.requireNonNull;
Expand All @@ -80,6 +81,7 @@ public class UniformNodeSelector
private final SplitsBalancingPolicy splitsBalancingPolicy;
private final boolean optimizedLocalScheduling;
private final QueueSizeAdjuster queueSizeAdjuster;
private final NodeSchedulerConfig.CacheAffinityPolicy cacheAffinityPolicy;

public UniformNodeSelector(
InternalNodeManager nodeManager,
Expand All @@ -92,7 +94,8 @@ public UniformNodeSelector(
long maxAdjustedPendingSplitsWeightPerTask,
int maxUnacknowledgedSplitsPerTask,
SplitsBalancingPolicy splitsBalancingPolicy,
boolean optimizedLocalScheduling)
boolean optimizedLocalScheduling,
NodeSchedulerConfig.CacheAffinityPolicy cacheAffinityPolicy)
{
this(nodeManager,
nodeTaskMap,
Expand All @@ -104,6 +107,7 @@ public UniformNodeSelector(
maxUnacknowledgedSplitsPerTask,
splitsBalancingPolicy,
optimizedLocalScheduling,
cacheAffinityPolicy,
new QueueSizeAdjuster(minPendingSplitsWeightPerTask, maxAdjustedPendingSplitsWeightPerTask));
}

Expand All @@ -119,6 +123,7 @@ public UniformNodeSelector(
int maxUnacknowledgedSplitsPerTask,
SplitsBalancingPolicy splitsBalancingPolicy,
boolean optimizedLocalScheduling,
NodeSchedulerConfig.CacheAffinityPolicy cacheAffinityPolicy,
QueueSizeAdjuster queueSizeAdjuster)
{
this.nodeManager = requireNonNull(nodeManager, "nodeManager is null");
Expand All @@ -133,6 +138,7 @@ public UniformNodeSelector(
this.splitsBalancingPolicy = requireNonNull(splitsBalancingPolicy, "splitsBalancingPolicy is null");
this.optimizedLocalScheduling = optimizedLocalScheduling;
this.queueSizeAdjuster = queueSizeAdjuster;
this.cacheAffinityPolicy = cacheAffinityPolicy;
}

@Override
Expand Down Expand Up @@ -177,19 +183,27 @@ public SplitPlacementResult computeAssignments(Set<Split> splits, List<RemoteTas
ResettableRandomizedIterator<InternalNode> randomCandidates = new ResettableRandomizedIterator<>(filteredNodes);
Set<InternalNode> schedulableNodes = new HashSet<>(filteredNodes);

if (cacheAffinityPolicy != NONE) {
SimpleNodeProvider nodeProvider = new SimpleNodeProvider(filteredNodes);
for (Split split : splits) {
if (split.getConnectorSplit().getCacheIdentifier().isPresent()) {
List<InternalNode> candidateNodes = selectExactNodes(nodeMap,
nodeProvider.get(split.getConnectorSplit().getCacheIdentifier().get(), 2), includeCoordinator);
if (assign(assignment, assignmentStats, split, candidateNodes)) {
splitsToBeRedistributed = true;
continue;
}
}
remainingSplits.add(split);
}
}
// optimizedLocalScheduling enables prioritized assignment of splits to local nodes when splits contain locality information
if (optimizedLocalScheduling) {
for (Split split : splits) {
if (split.isRemotelyAccessible() && !split.getAddresses().isEmpty()) {
List<InternalNode> candidateNodes = selectExactNodes(nodeMap, split.getAddresses(), includeCoordinator);

Optional<InternalNode> chosenNode = candidateNodes.stream()
.filter(ownerNode -> assignmentStats.getTotalSplitsWeight(ownerNode) < maxSplitsWeightPerNode && assignmentStats.getUnacknowledgedSplitCountForStage(ownerNode) < maxUnacknowledgedSplitsPerTask)
.min(comparingLong(assignmentStats::getTotalSplitsWeight));

if (chosenNode.isPresent()) {
assignment.put(chosenNode.get(), split);
assignmentStats.addAssignedSplit(chosenNode.get(), split.getSplitWeight());
if (assign(assignment, assignmentStats, split, candidateNodes)) {
splitsToBeRedistributed = true;
continue;
}
Expand Down Expand Up @@ -268,6 +282,20 @@ else if (!splitWaitingForAnyNode) {
return new SplitPlacementResult(blocked, assignment);
}

/**
 * Assigns {@code split} to the least-loaded candidate node that still has
 * capacity (by total split weight and unacknowledged split count).
 * Returns true if an assignment was made, false if every candidate is full.
 */
private boolean assign(Multimap<InternalNode, Split> assignment, NodeAssignmentStats assignmentStats, Split split, List<InternalNode> candidateNodes)
{
    Optional<InternalNode> target = candidateNodes.stream()
            .filter(node -> assignmentStats.getTotalSplitsWeight(node) < maxSplitsWeightPerNode
                    && assignmentStats.getUnacknowledgedSplitCountForStage(node) < maxUnacknowledgedSplitsPerTask)
            .min(comparingLong(assignmentStats::getTotalSplitsWeight));

    if (target.isEmpty()) {
        return false;
    }
    InternalNode node = target.get();
    assignment.put(node, split);
    assignmentStats.addAssignedSplit(node, split.getSplitWeight());
    return true;
}

@Override
public SplitPlacementResult computeAssignments(Set<Split> splits, List<RemoteTask> existingTasks, BucketNodeMap bucketNodeMap)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ public class UniformNodeSelectorFactory
private final boolean optimizedLocalScheduling;
private final NodeTaskMap nodeTaskMap;
private final Duration nodeMapMemoizationDuration;
private final NodeSchedulerConfig.CacheAffinityPolicy cacheAffinityPolicy;

@Inject
public UniformNodeSelectorFactory(
Expand Down Expand Up @@ -99,6 +100,7 @@ public UniformNodeSelectorFactory(
this.minPendingSplitsWeightPerTask = SplitWeight.rawValueForStandardSplitCount(minPendingSplitsPerTask);
this.maxAdjustedPendingSplitsWeightPerTask = SplitWeight.rawValueForStandardSplitCount(maxAdjustedPendingSplitsWeightPerTask);
this.nodeMapMemoizationDuration = nodeMapMemoizationDuration;
this.cacheAffinityPolicy = config.getCacheAffinityPolicy();
}

@Override
Expand Down Expand Up @@ -129,7 +131,8 @@ public NodeSelector createNodeSelector(Session session, Optional<CatalogHandle>
maxAdjustedPendingSplitsWeightPerTask,
getMaxUnacknowledgedSplitsPerTask(session),
splitsBalancingPolicy,
optimizedLocalScheduling);
optimizedLocalScheduling,
cacheAffinityPolicy);
}

private NodeMap createNodeMap(Optional<CatalogHandle> catalogHandle)
Expand Down
6 changes: 6 additions & 0 deletions core/trino-main/src/main/java/io/trino/metadata/Split.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import io.trino.spi.connector.ConnectorSplit;

import java.util.List;
import java.util.Optional;

import static com.google.common.base.MoreObjects.toStringHelper;
import static io.airlift.slice.SizeOf.instanceSize;
Expand Down Expand Up @@ -89,4 +90,9 @@ public long getRetainedSizeInBytes()
+ catalogHandle.getRetainedSizeInBytes()
+ connectorSplit.getRetainedSizeInBytes();
}

/**
 * Identifier used for cache-affinity scheduling, delegated to the underlying
 * connector split; empty when the connector does not support caching.
 */
public Optional<String> getCacheIdentifier()
{
    return this.connectorSplit.getCacheIdentifier();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import static io.airlift.configuration.testing.ConfigAssertions.assertFullMapping;
import static io.airlift.configuration.testing.ConfigAssertions.assertRecordedDefaults;
import static io.airlift.configuration.testing.ConfigAssertions.recordDefaults;
import static io.trino.execution.scheduler.NodeSchedulerConfig.CacheAffinityPolicy.NONE;
import static io.trino.execution.scheduler.NodeSchedulerConfig.CacheAffinityPolicy.SOFT;
import static io.trino.execution.scheduler.NodeSchedulerConfig.NodeSchedulerPolicy.UNIFORM;
import static io.trino.execution.scheduler.NodeSchedulerConfig.SplitsBalancingPolicy.NODE;
import static java.util.concurrent.TimeUnit.MINUTES;
Expand All @@ -43,7 +45,8 @@ public void testDefaults()
.setSplitsBalancingPolicy(NodeSchedulerConfig.SplitsBalancingPolicy.STAGE)
.setOptimizedLocalScheduling(true)
.setAllowedNoMatchingNodePeriod(new Duration(2, MINUTES))
.setNodeAllocatorType("bin_packing"));
.setNodeAllocatorType("bin_packing")
.setCacheAffinityPolicy(NONE));
}

@Test
Expand All @@ -61,6 +64,7 @@ public void testExplicitPropertyMappings()
.put("node-scheduler.optimized-local-scheduling", "false")
.put("node-scheduler.allowed-no-matching-node-period", "1m")
.put("node-scheduler.allocator-type", "fixed_count")
.put("node-scheduler.cache-affinity-policy", "SOFT")
.buildOrThrow();

NodeSchedulerConfig expected = new NodeSchedulerConfig()
Expand All @@ -74,7 +78,8 @@ public void testExplicitPropertyMappings()
.setSplitsBalancingPolicy(NODE)
.setOptimizedLocalScheduling(false)
.setAllowedNoMatchingNodePeriod(new Duration(1, MINUTES))
.setNodeAllocatorType("fixed_count");
.setNodeAllocatorType("fixed_count")
.setCacheAffinityPolicy(SOFT);

assertFullMapping(properties, expected);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@

import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static io.airlift.concurrent.Threads.daemonThreadsNamed;
import static io.trino.execution.scheduler.NodeSchedulerConfig.CacheAffinityPolicy.NONE;
import static io.trino.testing.TestingHandles.TEST_CATALOG_HANDLE;
import static java.util.concurrent.Executors.newCachedThreadPool;
import static java.util.concurrent.Executors.newScheduledThreadPool;
Expand Down Expand Up @@ -134,6 +135,7 @@ public void testQueueSizeAdjustmentScaleDown()
500,
NodeSchedulerConfig.SplitsBalancingPolicy.STAGE,
false,
NONE,
queueSizeAdjuster);

Set<Split> splits = new LinkedHashSet<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import io.trino.spi.SplitWeight;

import java.util.List;
import java.util.Optional;

public interface ConnectorSplit
{
Expand All @@ -42,4 +43,9 @@ default long getRetainedSizeInBytes()
{
throw new UnsupportedOperationException("This connector does not provide memory accounting capabilities for ConnectorSplit");
}

/**
 * Identifier the scheduler uses to route this split to nodes likely to hold
 * cached data for it. Connectors that support caching override this; the
 * default reports no identifier, opting the split out of cache affinity.
 */
default Optional<String> getCacheIdentifier()
{
    return Optional.empty();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,12 @@ public Object getInfo()
.buildOrThrow();
}

@Override
public Optional<String> getCacheIdentifier()
{
    // The path identifies the data this split reads, so it serves as the
    // cache-affinity key (presumably a file path on a file-based connector
    // split — the declaring class is outside this view; confirm).
    return Optional.of(path);
}

@Override
public String toString()
{
Expand Down

0 comments on commit 13b044d

Please sign in to comment.