Skip to content

Commit

Permalink
[ML] allow autoscaling to work when vertical scaling is possible (ela…
Browse files Browse the repository at this point in the history
…stic#84242)

When an NLP model is deployed, or a DFA/Anomaly job is assigned, we have historically relied only on the xpack.ml.max_lazy_ml_nodes to determine if scaling is possible. But, in certain scenarios, it may be that scaling is available when xpack.ml.max_lazy_ml_nodes is fully satisfied.

xpack.ml.max_ml_node_size is now checked to see if the current ML nodes exceed this size. If not, we assume vertical scaling is possible and allow the tasks to be created.

closes elastic#84198
  • Loading branch information
benwtrent authored and tlrx committed Mar 3, 2022
1 parent 0630055 commit cd636c4
Show file tree
Hide file tree
Showing 5 changed files with 116 additions and 12 deletions.
6 changes: 6 additions & 0 deletions docs/changelog/84242.yaml
@@ -0,0 +1,6 @@
pr: 84242
summary: Allow autoscaling to work when vertical scaling is possible
area: Machine Learning
type: bug
issues:
- 84198
Expand Up @@ -27,6 +27,7 @@
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.query.QueryBuilders;
Expand Down Expand Up @@ -62,6 +63,7 @@
import org.elasticsearch.xpack.ml.inference.allocation.TrainedModelAllocationService;
import org.elasticsearch.xpack.ml.inference.persistence.ChunkedTrainedModelRestorer;
import org.elasticsearch.xpack.ml.inference.persistence.TrainedModelDefinitionDoc;
import org.elasticsearch.xpack.ml.job.NodeLoadDetector;
import org.elasticsearch.xpack.ml.process.MlMemoryTracker;

import java.util.Collections;
Expand All @@ -70,6 +72,7 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.OptionalLong;
import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Collectors;
Expand All @@ -89,6 +92,7 @@ public class TransportStartTrainedModelDeploymentAction extends TransportMasterN
private final NamedXContentRegistry xContentRegistry;
private final MlMemoryTracker memoryTracker;
protected volatile int maxLazyMLNodes;
protected volatile long maxMLNodeSize;

@Inject
public TransportStartTrainedModelDeploymentAction(
Expand Down Expand Up @@ -121,13 +125,19 @@ public TransportStartTrainedModelDeploymentAction(
this.memoryTracker = Objects.requireNonNull(memoryTracker);
this.trainedModelAllocationService = Objects.requireNonNull(trainedModelAllocationService);
this.maxLazyMLNodes = MachineLearning.MAX_LAZY_ML_NODES.get(settings);
this.maxMLNodeSize = MachineLearning.MAX_ML_NODE_SIZE.get(settings).getBytes();
clusterService.getClusterSettings().addSettingsUpdateConsumer(MachineLearning.MAX_LAZY_ML_NODES, this::setMaxLazyMLNodes);
clusterService.getClusterSettings().addSettingsUpdateConsumer(MachineLearning.MAX_ML_NODE_SIZE, this::setMaxMLNodeSize);
}

private void setMaxLazyMLNodes(int value) {
this.maxLazyMLNodes = value;
}

private void setMaxMLNodeSize(ByteSizeValue value) {
this.maxMLNodeSize = value.getBytes();
}

@Override
protected void masterOperation(
Task task,
Expand Down Expand Up @@ -241,7 +251,7 @@ private void waitForDeploymentState(
AllocationStatus.State state,
ActionListener<CreateTrainedModelAllocationAction.Response> listener
) {
DeploymentStartedPredicate predicate = new DeploymentStartedPredicate(modelId, state, maxLazyMLNodes);
DeploymentStartedPredicate predicate = new DeploymentStartedPredicate(modelId, state, maxLazyMLNodes, maxMLNodeSize);
trainedModelAllocationService.waitForAllocationCondition(
modelId,
predicate,
Expand Down Expand Up @@ -402,11 +412,13 @@ private static class DeploymentStartedPredicate implements Predicate<ClusterStat
private final String modelId;
private final AllocationStatus.State waitForState;
private final int maxLazyMLNodes;
private final long maxMLNodeSize;

DeploymentStartedPredicate(String modelId, AllocationStatus.State waitForState, int maxLazyMLNodes) {
DeploymentStartedPredicate(String modelId, AllocationStatus.State waitForState, int maxLazyMLNodes, long maxMLNodeSize) {
this.modelId = ExceptionsHelper.requireNonNull(modelId, "model_id");
this.waitForState = waitForState;
this.maxLazyMLNodes = maxLazyMLNodes;
this.maxMLNodeSize = maxMLNodeSize;
}

@Override
Expand Down Expand Up @@ -445,9 +457,14 @@ public boolean test(ClusterState clusterState) {
.filter(d -> nodesShuttingDown.contains(d.getId()) == false)
.filter(TaskParams::mayAllocateToNode)
.collect(Collectors.toList());
OptionalLong smallestMLNode = nodes.stream().map(NodeLoadDetector::getNodeSize).flatMapToLong(OptionalLong::stream).min();

// No nodes allocated at all!
if (nodesAndState.isEmpty() && maxLazyMLNodes <= nodes.size()) {
if (nodesAndState.isEmpty()
// We cannot scale horizontally
&& maxLazyMLNodes <= nodes.size()
// We cannot scale vertically
&& (smallestMLNode.isEmpty() || smallestMLNode.getAsLong() >= maxMLNodeSize)) {
String msg = "Could not start deployment because no suitable nodes were found, allocation explanation ["
+ trainedModelAllocation.getReason()
+ "]";
Expand Down
Expand Up @@ -27,6 +27,7 @@
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.OptionalLong;
import java.util.TreeMap;
import java.util.function.Function;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -272,40 +273,45 @@ public PersistentTasksCustomMetadata.Assignment selectNode(
reasons.values(),
maxNodeSize > 0L
? NativeMemoryCalculator.allowedBytesForMl(maxNodeSize, maxMachineMemoryPercent, useAutoMemoryPercentage)
: Long.MAX_VALUE
: Long.MAX_VALUE,
maxNodeSize
);
}

PersistentTasksCustomMetadata.Assignment createAssignment(
long estimatedMemoryUsage,
DiscoveryNode minLoadedNode,
Collection<String> reasons,
long biggestPossibleJob
long mostAvailableMemoryForML,
long maxNodeSize
) {
if (minLoadedNode == null) {
String explanation = String.join("|", reasons);
PersistentTasksCustomMetadata.Assignment currentAssignment = new PersistentTasksCustomMetadata.Assignment(null, explanation);
logger.debug("no node selected for job [{}], reasons [{}]", jobId, explanation);
if ((MachineLearning.NATIVE_EXECUTABLE_CODE_OVERHEAD.getBytes() + estimatedMemoryUsage) > biggestPossibleJob) {
if ((MachineLearning.NATIVE_EXECUTABLE_CODE_OVERHEAD.getBytes() + estimatedMemoryUsage) > mostAvailableMemoryForML) {
ParameterizedMessage message = new ParameterizedMessage(
"[{}] not waiting for node assignment as estimated job size [{}] is greater than largest possible job size [{}]",
jobId,
MachineLearning.NATIVE_EXECUTABLE_CODE_OVERHEAD.getBytes() + estimatedMemoryUsage,
biggestPossibleJob
mostAvailableMemoryForML
);
logger.info(message);
List<String> newReasons = new ArrayList<>(reasons);
newReasons.add(message.getFormattedMessage());
explanation = String.join("|", newReasons);
return new PersistentTasksCustomMetadata.Assignment(null, explanation);
}
return considerLazyAssignment(currentAssignment);
return considerLazyAssignment(currentAssignment, maxNodeSize);
}
logger.debug("selected node [{}] for job [{}]", minLoadedNode, jobId);
return new PersistentTasksCustomMetadata.Assignment(minLoadedNode.getId(), "");
}

PersistentTasksCustomMetadata.Assignment considerLazyAssignment(PersistentTasksCustomMetadata.Assignment currentAssignment) {
PersistentTasksCustomMetadata.Assignment considerLazyAssignment(
PersistentTasksCustomMetadata.Assignment currentAssignment,
long maxNodeSize
) {

assert currentAssignment.getExecutorNode() == null;

Expand All @@ -316,10 +322,21 @@ PersistentTasksCustomMetadata.Assignment considerLazyAssignment(PersistentTasksC
}
}

// Can we scale horizontally?
if (numMlNodes < maxLazyNodes) { // Means we have lazy nodes left to allocate
return AWAITING_LAZY_ASSIGNMENT;
}

// Can we scale vertically and is scaling possible?
if (maxNodeSize > 0L && maxLazyNodes > 0) {
OptionalLong smallestMLNode = candidateNodes.stream()
.filter(MachineLearning::isMlNode)
.map(NodeLoadDetector::getNodeSize)
.flatMapToLong(OptionalLong::stream)
.min();
if (smallestMLNode.isPresent() && smallestMLNode.getAsLong() < maxNodeSize) {
return AWAITING_LAZY_ASSIGNMENT;
}
}
return currentAssignment;
}

Expand Down
Expand Up @@ -29,10 +29,27 @@
import java.util.OptionalLong;
import java.util.stream.Collectors;

import static org.elasticsearch.xpack.ml.MachineLearning.MACHINE_MEMORY_NODE_ATTR;

public class NodeLoadDetector {

private final MlMemoryTracker mlMemoryTracker;

/**
* Returns the node's total memory size.
* @param node The node whose size to grab
* @return maybe the answer, will be empty if size cannot be determined
*/
public static OptionalLong getNodeSize(DiscoveryNode node) {
String memoryString = node.getAttributes().get(MACHINE_MEMORY_NODE_ATTR);
try {
return OptionalLong.of(Long.parseLong(memoryString));
} catch (NumberFormatException e) {
assert e == null : "ml.machine_memory should parse because we set it internally: invalid value was " + memoryString;
return OptionalLong.empty();
}
}

public NodeLoadDetector(MlMemoryTracker memoryTracker) {
this.mlMemoryTracker = memoryTracker;
}
Expand Down
Expand Up @@ -1009,7 +1009,8 @@ public void testConsiderLazyAssignmentWithNoLazyNodes() {
node -> nodeFilter(node, job)
);
PersistentTasksCustomMetadata.Assignment result = jobNodeSelector.considerLazyAssignment(
new PersistentTasksCustomMetadata.Assignment(null, "foo")
new PersistentTasksCustomMetadata.Assignment(null, "foo"),
ByteSizeValue.ofGb(1).getBytes()
);
assertEquals("foo", result.getExplanation());
assertNull(result.getExecutorNode());
Expand Down Expand Up @@ -1053,7 +1054,53 @@ public void testConsiderLazyAssignmentWithLazyNodes() {
node -> nodeFilter(node, job)
);
PersistentTasksCustomMetadata.Assignment result = jobNodeSelector.considerLazyAssignment(
new PersistentTasksCustomMetadata.Assignment(null, "foo")
new PersistentTasksCustomMetadata.Assignment(null, "foo"),
ByteSizeValue.ofGb(1).getBytes()
);
assertEquals(JobNodeSelector.AWAITING_LAZY_ASSIGNMENT.getExplanation(), result.getExplanation());
assertNull(result.getExecutorNode());
}

public void testConsiderLazyAssignmentWithFilledLazyNodesAndVerticalScale() {
DiscoveryNodes nodes = DiscoveryNodes.builder()
.add(
new DiscoveryNode(
"_node_name1",
"_node_id1",
new TransportAddress(InetAddress.getLoopbackAddress(), 9300),
Map.of(MachineLearning.MACHINE_MEMORY_NODE_ATTR, Long.toString(ByteSizeValue.ofGb(1).getBytes())),
ROLES_WITH_ML,
Version.CURRENT
)
)
.add(
new DiscoveryNode(
"_node_name2",
"_node_id2",
new TransportAddress(InetAddress.getLoopbackAddress(), 9301),
Map.of(MachineLearning.MACHINE_MEMORY_NODE_ATTR, Long.toString(ByteSizeValue.ofGb(1).getBytes())),
ROLES_WITH_ML,
Version.CURRENT
)
)
.build();

ClusterState.Builder cs = ClusterState.builder(new ClusterName("_name"));
cs.nodes(nodes);

Job job = BaseMlIntegTestCase.createFareQuoteJob("job_id1000", JOB_MEMORY_REQUIREMENT).build(new Date());
JobNodeSelector jobNodeSelector = new JobNodeSelector(
cs.build(),
shuffled(cs.nodes()),
job.getId(),
MlTasks.JOB_TASK_NAME,
memoryTracker,
randomIntBetween(1, 3),
node -> nodeFilter(node, job)
);
PersistentTasksCustomMetadata.Assignment result = jobNodeSelector.considerLazyAssignment(
new PersistentTasksCustomMetadata.Assignment(null, "foo"),
ByteSizeValue.ofGb(64).getBytes()
);
assertEquals(JobNodeSelector.AWAITING_LAZY_ASSIGNMENT.getExplanation(), result.getExplanation());
assertNull(result.getExecutorNode());
Expand Down

0 comments on commit cd636c4

Please sign in to comment.