Skip to content

Commit

Permalink
change CB limit exception code to 429 and skip CB check for remote mo…
Browse files Browse the repository at this point in the history
…dels

Signed-off-by: Xun Zhang <xunzh@amazon.com>
  • Loading branch information
Zhangxunmt committed May 23, 2024
1 parent 70ea17f commit d0791a8
Show file tree
Hide file tree
Showing 10 changed files with 56 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,12 @@
import org.opensearch.common.xcontent.XContentFactory;
import org.opensearch.commons.authuser.User;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.breaker.CircuitBreakingException;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.core.xcontent.ToXContent;
import org.opensearch.ml.common.FunctionName;
import org.opensearch.ml.common.MLModel;
import org.opensearch.ml.common.exception.MLLimitExceededException;
import org.opensearch.ml.common.exception.MLResourceNotFoundException;
import org.opensearch.ml.common.input.MLInput;
import org.opensearch.ml.common.transport.MLTaskResponse;
Expand Down Expand Up @@ -178,8 +178,8 @@ public void onResponse(MLModel mlModel) {
);
} else if (e instanceof MLResourceNotFoundException) {
wrappedListener.onFailure(new OpenSearchStatusException(e.getMessage(), RestStatus.NOT_FOUND));
} else if (e instanceof MLLimitExceededException) {
wrappedListener.onFailure(new OpenSearchStatusException(e.getMessage(), RestStatus.SERVICE_UNAVAILABLE));
} else if (e instanceof CircuitBreakingException) {
wrappedListener.onFailure(e);
} else {
wrappedListener
.onFailure(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedDeque;
Expand Down Expand Up @@ -838,7 +839,9 @@ private <T> ThreadedActionListener<T> threadedActionListener(String threadPoolNa
* @param runningTaskLimit limit
*/
public void checkAndAddRunningTask(MLTask mlTask, Integer runningTaskLimit) {
checkOpenCircuitBreaker(mlCircuitBreakerService, mlStats);
if (Objects.nonNull(mlTask) && mlTask.getFunctionName() != FunctionName.REMOTE) {
checkOpenCircuitBreaker(mlCircuitBreakerService, mlStats);
}
mlTaskManager.checkLimitAndAddRunningTask(mlTask, runningTaskLimit);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,13 +143,7 @@ public void dispatchTask(
if (clusterService.localNode().getId().equals(node.getId())) {
log.debug("Execute ML predict request {} locally on node {}", request.getRequestID(), node.getId());
request.setDispatchTask(false);
run(
// This is by design to NOT use mlPredictionTaskRequest.getMlInput().getAlgorithm() here
functionName,
request,
transportService,
listener
);
checkCBAndExecute(functionName, request, listener);
} else {
log.debug("Execute ML predict request {} remotely on node {}", request.getRequestID(), node.getId());
request.setDispatchTask(false);
Expand Down
10 changes: 8 additions & 2 deletions plugin/src/main/java/org/opensearch/ml/task/MLTaskRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,7 @@ protected void handleAsyncMLTaskComplete(MLTask mlTask) {
public void run(FunctionName functionName, Request request, TransportService transportService, ActionListener<Response> listener) {
if (!request.isDispatchTask()) {
log.debug("Run ML request {} locally", request.getRequestID());
checkOpenCircuitBreaker(mlCircuitBreakerService, mlStats);
executeTask(request, listener);
checkCBAndExecute(functionName, request, listener);
return;
}
dispatchTask(functionName, request, transportService, listener);
Expand Down Expand Up @@ -129,4 +128,11 @@ public void dispatchTask(
protected abstract TransportResponseHandler<Response> getResponseHandler(ActionListener<Response> listener);

protected abstract void executeTask(Request request, ActionListener<Response> listener);

protected void checkCBAndExecute(FunctionName functionName, Request request, ActionListener<Response> listener) {
if (functionName != FunctionName.REMOTE) {
checkOpenCircuitBreaker(mlCircuitBreakerService, mlStats);
}
executeTask(request, listener);
}
}
8 changes: 6 additions & 2 deletions plugin/src/main/java/org/opensearch/ml/utils/MLNodeUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,13 @@
import org.opensearch.common.xcontent.LoggingDeprecationHandler;
import org.opensearch.common.xcontent.XContentHelper;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.core.common.breaker.CircuitBreaker;
import org.opensearch.core.common.breaker.CircuitBreakingException;
import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.ml.breaker.MLCircuitBreakerService;
import org.opensearch.ml.breaker.ThresholdCircuitBreaker;
import org.opensearch.ml.common.exception.MLLimitExceededException;
import org.opensearch.ml.stats.MLNodeLevelStat;
import org.opensearch.ml.stats.MLStats;

Expand Down Expand Up @@ -92,7 +93,10 @@ public static void checkOpenCircuitBreaker(MLCircuitBreakerService mlCircuitBrea
ThresholdCircuitBreaker openCircuitBreaker = mlCircuitBreakerService.checkOpenCB();
if (openCircuitBreaker != null) {
mlStats.getStat(MLNodeLevelStat.ML_CIRCUIT_BREAKER_TRIGGER_COUNT).increment();
throw new MLLimitExceededException(openCircuitBreaker.getName() + " is open, please check your resources!");
throw new CircuitBreakingException(
openCircuitBreaker.getName() + " is open, please check your resources!",
CircuitBreaker.Durability.TRANSIENT
);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.commons.authuser.User;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.breaker.CircuitBreaker;
import org.opensearch.core.common.breaker.CircuitBreakingException;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.ml.common.FunctionName;
Expand All @@ -43,7 +45,6 @@
import org.opensearch.ml.common.dataframe.DataFrameBuilder;
import org.opensearch.ml.common.dataset.DataFrameInputDataset;
import org.opensearch.ml.common.dataset.remote.RemoteInferenceInputDataSet;
import org.opensearch.ml.common.exception.MLLimitExceededException;
import org.opensearch.ml.common.exception.MLResourceNotFoundException;
import org.opensearch.ml.common.input.MLInput;
import org.opensearch.ml.common.input.parameter.clustering.KMeansParams;
Expand Down Expand Up @@ -242,7 +243,7 @@ public void testPrediction_MLLimitExceededException() {

doAnswer(invocation -> {
ActionListener<Boolean> listener = invocation.getArgument(3);
listener.onFailure(new MLLimitExceededException("Memory Circuit Breaker is open, please check your resources!"));
listener.onFailure(new CircuitBreakingException("Memory Circuit Breaker is open, please check your resources!", CircuitBreaker.Durability.TRANSIENT));
return null;
}).when(modelAccessControlHelper).validateModelGroupAccess(any(), any(), any(), any());

Expand All @@ -253,7 +254,7 @@ public void testPrediction_MLLimitExceededException() {

transportPredictionTaskAction.doExecute(null, mlPredictionTaskRequest, actionListener);

ArgumentCaptor<Exception> argumentCaptor = ArgumentCaptor.forClass(OpenSearchStatusException.class);
ArgumentCaptor<Exception> argumentCaptor = ArgumentCaptor.forClass(CircuitBreakingException.class);
verify(actionListener).onFailure(argumentCaptor.capture());
assertEquals("Memory Circuit Breaker is open, please check your resources!", argumentCaptor.getValue().getMessage());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,10 @@
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.breaker.CircuitBreaker;
import org.opensearch.core.common.breaker.CircuitBreakingException;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.ml.breaker.MLCircuitBreakerService;
import org.opensearch.ml.breaker.MemoryCircuitBreaker;
import org.opensearch.ml.breaker.ThresholdCircuitBreaker;
import org.opensearch.ml.cluster.DiscoveryNodeHelper;
import org.opensearch.ml.common.FunctionName;
Expand Down Expand Up @@ -115,7 +116,6 @@
import org.opensearch.ml.stats.MLStats;
import org.opensearch.ml.stats.suppliers.CounterSupplier;
import org.opensearch.ml.task.MLTaskManager;
import org.opensearch.monitor.jvm.JvmService;
import org.opensearch.script.ScriptService;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -324,7 +324,7 @@ public void testRegisterMLModel_CircuitBreakerOpen() {
when(mlCircuitBreakerService.checkOpenCB()).thenReturn(thresholdCircuitBreaker);
when(thresholdCircuitBreaker.getName()).thenReturn("Disk Circuit Breaker");
when(thresholdCircuitBreaker.getThreshold()).thenReturn(87);
expectedEx.expect(MLException.class);
expectedEx.expect(CircuitBreakingException.class);
expectedEx.expectMessage("Disk Circuit Breaker is open, please check your resources!");
modelManager.registerMLModel(registerModelInput, mlTask);
verify(mlTaskManager).updateMLTask(anyString(), anyMap(), anyLong(), anyBoolean());
Expand Down Expand Up @@ -453,21 +453,30 @@ public void testRegisterMLRemoteModel() throws PrivilegedActionException {
verify(mlTaskManager).updateMLTask(anyString(), anyMap(), anyLong(), anyBoolean());
}

public void testRegisterMLRemoteModel_WhenMemoryCBOpen_ThenFail() {
public void testRegisterMLRemoteModel_SkipMemoryCBOpen() {
ActionListener<MLRegisterModelResponse> listener = mock(ActionListener.class);
MemoryCircuitBreaker memCB = new MemoryCircuitBreaker(mock(JvmService.class));
String memCBIsOpenMessage = memCB.getName() + " is open, please check your resources!";
when(mlCircuitBreakerService.checkOpenCB()).thenThrow(new MLLimitExceededException(memCBIsOpenMessage));

doNothing().when(mlTaskManager).checkLimitAndAddRunningTask(any(), any());
when(mlCircuitBreakerService.checkOpenCB())
.thenThrow(
new CircuitBreakingException(
"Memory Circuit Breaker is open, please check your resources!",
CircuitBreaker.Durability.TRANSIENT
)
);
when(threadPool.executor(REGISTER_THREAD_POOL)).thenReturn(taskExecutorService);
when(modelHelper.isModelAllowed(any(), any())).thenReturn(true);
MLRegisterModelInput pretrainedInput = mockRemoteModelInput(true);
MLTask pretrainedTask = MLTask.builder().taskId("pretrained").modelId("pretrained").functionName(FunctionName.REMOTE).build();
mock_MLIndicesHandler_initModelIndex(mlIndicesHandler, true);
doAnswer(invocation -> {
ActionListener<IndexResponse> indexResponseActionListener = (ActionListener<IndexResponse>) invocation.getArguments()[1];
indexResponseActionListener.onResponse(indexResponse);
return null;
}).when(client).index(any(), any());
when(indexResponse.getId()).thenReturn("mockIndexId");
modelManager.registerMLRemoteModel(pretrainedInput, pretrainedTask, listener);

ArgumentCaptor<Exception> argCaptor = ArgumentCaptor.forClass(Exception.class);
verify(listener, times(1)).onFailure(argCaptor.capture());
Exception e = argCaptor.getValue();
assertTrue(e instanceof MLLimitExceededException);
assertEquals(memCBIsOpenMessage, e.getMessage());
assertEquals(pretrainedTask.getFunctionName(), FunctionName.REMOTE);
verify(mlTaskManager).updateMLTask(anyString(), anyMap(), anyLong(), anyBoolean());
}

public void testIndexRemoteModel() throws PrivilegedActionException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public void testRunWithMemoryCircuitBreaker() throws IOException {
exception.getMessage(),
allOf(
containsString("Memory Circuit Breaker is open, please check your resources!"),
containsString("m_l_limit_exceeded_exception")
containsString("circuit_breaking_exception")
)
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ public void testPredictWithAutoDeployAndTTL_RemoteModel() throws IOException, In
Response response = createConnector(completionModelConnectorEntity);
Map responseMap = parseResponseToMap(response);
String connectorId = (String) responseMap.get("connector_id");
response = registerRemoteModelWithTTL("openAI-GPT-3.5 completions", connectorId, 1);
response = registerRemoteModelWithTTLAndSkipHeapMemCheck("openAI-GPT-3.5 completions", connectorId, 1);
responseMap = parseResponseToMap(response);
String modelId = (String) responseMap.get("model_id");
String predictInput = "{\n" + " \"parameters\": {\n" + " \"prompt\": \"Say this is a test\"\n" + " }\n" + "}";
Expand Down Expand Up @@ -814,11 +814,13 @@ public static Response registerRemoteModel(String name, String connectorId) thro
.makeRequest(client(), "POST", "/_plugins/_ml/models/_register", null, TestHelper.toHttpEntity(registerModelEntity), null);
}

public static Response registerRemoteModelWithTTL(String name, String connectorId, int ttl) throws IOException {
public static Response registerRemoteModelWithTTLAndSkipHeapMemCheck(String name, String connectorId, int ttl) throws IOException {
String registerModelGroupEntity = "{\n"
+ " \"name\": \"remote_model_group\",\n"
+ " \"description\": \"This is an example description\"\n"
+ "}";
String updateJVMHeapThreshold = "{\"persistent\":{\"plugins.ml_commons.jvm_heap_memory_threshold\":0}}";
TestHelper.makeRequest(client(), "PUT", "/_cluster/settings", null, TestHelper.toHttpEntity(updateJVMHeapThreshold), null);
Response response = TestHelper
.makeRequest(
client(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import org.opensearch.ml.common.MLTask;
import org.opensearch.ml.common.MLTaskState;
import org.opensearch.ml.common.MLTaskType;
import org.opensearch.ml.common.exception.MLLimitExceededException;
import org.opensearch.ml.common.transport.MLTaskRequest;
import org.opensearch.ml.stats.MLNodeLevelStat;
import org.opensearch.ml.stats.MLStat;
Expand Down Expand Up @@ -139,8 +138,8 @@ public void testRun_CircuitBreakerOpen() {
TransportService transportService = mock(TransportService.class);
ActionListener listener = mock(ActionListener.class);
MLTaskRequest request = new MLTaskRequest(false);
expectThrows(MLLimitExceededException.class, () -> mlTaskRunner.run(FunctionName.REMOTE, request, transportService, listener));
mlTaskRunner.run(FunctionName.REMOTE, request, transportService, listener);
Long value = (Long) mlStats.getStat(MLNodeLevelStat.ML_CIRCUIT_BREAKER_TRIGGER_COUNT).getValue();
assertEquals(1L, value.longValue());
assertEquals(0L, value.longValue());
}
}

0 comments on commit d0791a8

Please sign in to comment.