Changes from review feedback:
    1. Modified the `server live`, `server ready`, and `model ready` check methods (probe sketches below).
    2. Added gRPC tests for `server live`, `server ready`, and `model ready`.
    3. Added HTTP tests for `server live` and `server ready`.

Signed-off-by: Andrews Arokiam <andrews.arokiam@ideas2it.com>
andyi2it committed Jan 10, 2024
1 parent 212131d commit 572ec36
Showing 4 changed files with 87 additions and 26 deletions.
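
In short, the server `live` and `ready` endpoints now report `true` unconditionally once the frontend is up, while model readiness is derived from the state of the model's workers. A minimal sketch of the HTTP checks (host, port, and any port-forwarding are illustrative assumptions; the cluster tests below hit the same paths through the ingress):

# Sketch only: assumes TorchServe's OIP HTTP port is reachable at localhost:8080.
curl "http://localhost:8080/v2/health/live"    # expected: {"live":true}
curl "http://localhost:8080/v2/health/ready"   # expected: {"ready":true}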
@@ -34,6 +34,8 @@
import org.pytorch.serve.util.messages.WorkerCommands;
import org.pytorch.serve.wlm.Model;
import org.pytorch.serve.wlm.ModelManager;
import org.pytorch.serve.wlm.WorkerState;
import org.pytorch.serve.wlm.WorkerThread;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -61,7 +63,7 @@ public void serverLive(ServerLiveRequest request, StreamObserver<ServerLiveRespo
});

ServerLiveResponse readyResponse = ServerLiveResponse.newBuilder()
.setLive(ApiUtils.getTsWorkerStatus())
.setLive(true)
.build();
responseObserver.onNext(readyResponse);
responseObserver.onCompleted();
Expand All @@ -80,7 +82,7 @@ public void serverReady(ServerReadyRequest request, StreamObserver<ServerReadyRe
});

ServerReadyResponse readyResponse = ServerReadyResponse.newBuilder()
.setReady(ApiUtils.getTsWorkerStatus())
.setReady(true)
.build();
responseObserver.onNext(readyResponse);
responseObserver.onCompleted();
@@ -110,6 +112,7 @@ public void modelReady(ModelReadyRequest request, StreamObserver<ModelReadyRespo
String modelName = request.getName();
String modelVersion = request.getVersion();
ModelManager modelManager = ModelManager.getInstance();
boolean isModelReady = false;
if (modelName == null || "".equals(modelName)) {
BadRequestException e = new BadRequestException("Parameter name is required.");
sendErrorResponse(responseObserver, Status.INTERNAL, e, "BadRequestException.()");
@@ -124,8 +127,19 @@ public void modelReady(ModelReadyRequest request, StreamObserver<ModelReadyRespo
if (model == null) {
throw new ModelNotFoundException("Model not found: " + modelName);
}

// int numScaled = model.getMinWorkers();
// int numHealthy = modelManager.getNumHealthyWorkers(model.getModelVersionName());
// isModelReady = numHealthy >= numScaled;

List<WorkerThread> workers = modelManager.getWorkers(model.getModelVersionName());
for (WorkerThread worker : workers) {
isModelReady = worker.isRunning() && worker.getState() == WorkerState.WORKER_MODEL_LOADED;

}

ModelReadyResponse modelReadyResponse = ModelReadyResponse.newBuilder()
.setReady(true)
.setReady(isModelReady)
.build();
responseObserver.onNext(modelReadyResponse);
responseObserver.onCompleted();
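
The worker loop above is what the ModelReady RPC now reports: the response reflects whether the model's workers are running and in WORKER_MODEL_LOADED state. A minimal sketch of probing it with grpcurl, assuming the gRPC endpoint is reachable at localhost:8080 and using the proto file referenced by the test script below:

# Sketch only: host/port and any port-forwarding are assumptions for illustration.
grpcurl -plaintext \
  -d '{"name": "mnist"}' \
  -proto ./frontend/server/src/main/resources/proto/open_inference_grpc.proto \
  localhost:8080 \
  org.pytorch.serve.grpc.openinference.GRPCInferenceService.ModelReady
# Expected once the mnist workers are loaded: {"ready":true}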
@@ -82,12 +82,12 @@ public void handleRequest(
if (concatenatedSegments.equals(SERVER_READY_API)) {
// for serve ready check
JsonObject response = new JsonObject();
response.addProperty("ready", ApiUtils.getTsWorkerStatus());
response.addProperty("ready", true);
NettyUtils.sendJsonResponse(ctx, response);
} else if (concatenatedSegments.equals(SERVER_LIVE_API)) {
// for serve live check
JsonObject response = new JsonObject();
response.addProperty("live", ApiUtils.getTsWorkerStatus());
response.addProperty("live", true);
NettyUtils.sendJsonResponse(ctx, response);
} else if (concatenatedSegments.equals(SERVER_METADATA_API)) {
// For fetch server metadata
15 changes: 0 additions & 15 deletions frontend/server/src/main/java/org/pytorch/serve/util/ApiUtils.java
@@ -460,19 +460,4 @@ public static String getDescribeErrorResponseMessage(String modelName) {
return responseMessage;
}

public static boolean getTsWorkerStatus() {
boolean isTsWorkerStarted = false;
ModelManager modelManager = ModelManager.getInstance();
Map<Integer, WorkerThread> workersMap = modelManager.getWorkers();

List<WorkerThread> workers = new ArrayList<>(workersMap.values());

for (WorkerThread worker : workers) {
if (worker.getState() == WorkerState.WORKER_MODEL_LOADED) {
isTsWorkerStarted = true;
}
}
return isTsWorkerStarted;
}

}
74 changes: 68 additions & 6 deletions kubernetes/kserve/tests/scripts/test_mnist.sh
@@ -35,12 +35,16 @@ function make_cluster_accessible() {
SERVICE_HOSTNAME=$(kubectl get inferenceservice ${SERVICE_NAME} -o jsonpath='{.status.url}' | cut -d "/" -f 3)
wait_for_port_forwarding 5
echo "Make inference request"
PREDICTION=$(curl -H "Content-Type: application/json" -H "Host: ${SERVICE_HOSTNAME}" ${URL} -d @"$3")
if [ -z "$3" ]; then # for empty input data (http get method call)
PREDICTION=$(curl -H "Content-Type: application/json" -H "Host: ${SERVICE_HOSTNAME}" ${URL})
else
PREDICTION=$(curl -H "Content-Type: application/json" -H "Host: ${SERVICE_HOSTNAME}" ${URL} -d @"$3")
fi

PREDICTION=$(echo -n "$PREDICTION" | tr -d '\n[:space:]')
EXPECTED="$4"
if [ "${PREDICTION}" = "${EXPECTED}" ]; then
echo "✓ SUCCESS"
kubectl delete inferenceservice ${SERVICE_NAME}
else
echo "✘ Test failed: Prediction: ${PREDICTION}, expected ${EXPECTED}."
delete_minikube_cluster
@@ -49,20 +53,28 @@
}

function make_cluster_accessible_for_grpc() {
PATTERN='^{.*}$' # Regex pattern to match the input value like json ex: '{"name": "mnist"}'
PROTO_FILE_PATH="./frontend/server/src/main/resources/proto/open_inference_grpc.proto"
SERVICE_NAME="$1"
GRPC_METHOD="$2"
INPUT="$3"
wait_for_inference_service 300 5 "$1"
SERVICE_HOSTNAME=$(kubectl get inferenceservice ${SERVICE_NAME} -o jsonpath='{.status.url}' | cut -d "/" -f 3)
wait_for_port_forwarding 5
echo "Make inference request"

PREDICTION=$(grpcurl -plaintext -d @ -proto ${PROTO_FILE_PATH} -authority ${SERVICE_HOSTNAME} ${INGRESS_HOST}:${INGRESS_PORT} ${GRPC_METHOD} < "$3")
if [ -z "$INPUT" ]; then # for empty input data
PREDICTION=$(grpcurl -plaintext -proto ${PROTO_FILE_PATH} -authority ${SERVICE_HOSTNAME} ${INGRESS_HOST}:${INGRESS_PORT} ${GRPC_METHOD})
elif echo "$INPUT" | grep -qE "$PATTERN"; then # for pass input data with command Ex: '{"name": "mnist"}'
PREDICTION=$(grpcurl -plaintext -d "${INPUT}" -proto ${PROTO_FILE_PATH} -authority ${SERVICE_HOSTNAME} ${INGRESS_HOST}:${INGRESS_PORT} ${GRPC_METHOD})
else # for read input data from file
PREDICTION=$(grpcurl -plaintext -d @ -proto ${PROTO_FILE_PATH} -authority ${SERVICE_HOSTNAME} ${INGRESS_HOST}:${INGRESS_PORT} ${GRPC_METHOD} < "${INPUT}")
fi

PREDICTION=$(echo -n "$PREDICTION" | tr -d '\n[:space:]')
EXPECTED="$4"
if [ "${PREDICTION}" = "${EXPECTED}" ]; then
echo "✓ SUCCESS"
kubectl delete inferenceservice ${SERVICE_NAME}
else
echo "✘ Test failed: Prediction: ${PREDICTION}, expected ${EXPECTED}."
delete_minikube_cluster
@@ -156,22 +168,72 @@ echo "MNIST KServe V2 test begin"
deploy_cluster "kubernetes/kserve/tests/configs/mnist_v2_cpu.yaml" "torchserve-mnist-v2-predictor"
URL="http://${INGRESS_HOST}:${INGRESS_PORT}/v2/models/${MODEL_NAME}/infer"
make_cluster_accessible "torchserve-mnist-v2" ${URL} "./kubernetes/kserve/kf_request_json/v2/mnist/mnist_v2_tensor.json" '{"model_name":"mnist","model_version":null,"id":"d3b15cad-50a2-4eaf-80ce-8b0a428bd298","parameters":null,"outputs":[{"name":"input-0","shape":[1],"datatype":"INT64","parameters":null,"data":[1]}]}'
kubectl delete inferenceservice torchserve-mnist-v2

echo "MNIST KServe V1 test begin"
deploy_cluster "kubernetes/kserve/tests/configs/mnist_v1_cpu.yaml" "torchserve-predictor"
URL="http://${INGRESS_HOST}:${INGRESS_PORT}/v1/models/${MODEL_NAME}:predict"
make_cluster_accessible "torchserve" ${URL} "./kubernetes/kserve/kf_request_json/v1/mnist.json" '{"predictions":[2]}'
kubectl delete inferenceservice torchserve

# OIP HTTP method calls
echo "MNIST Torchserve Open Inference Protocol HTTP"
SERVICE_NAME="torchserve-mnist-v2-http"
deploy_cluster "kubernetes/kserve/tests/configs/mnist_oip_http.yaml" "torchserve-mnist-v2-http-predictor"

# ModelInfer
echo "HTTP ModelInfer method call"
URL="http://${INGRESS_HOST}:${INGRESS_PORT}/v2/models/${MODEL_NAME}/infer"
EXPECTED_OUTPUT='{"id":"d3b15cad-50a2-4eaf-80ce-8b0a428bd298","model_name":"mnist","model_version":"1.0","outputs":[{"name":"input-0","datatype":"INT64","data":[1],"shape":[1]}]}'
make_cluster_accessible "torchserve-mnist-v2-http" ${URL} "./kubernetes/kserve/kf_request_json/v2/mnist/mnist_v2_tensor.json" ${EXPECTED_OUTPUT}
make_cluster_accessible ${SERVICE_NAME} ${URL} "./kubernetes/kserve/kf_request_json/v2/mnist/mnist_v2_tensor.json" ${EXPECTED_OUTPUT}

# ServerReady
echo "HTTP ServerReady method call"
URL="http://${INGRESS_HOST}:${INGRESS_PORT}/v2/health/ready"
EXPECTED_OUTPUT='{"ready":true}'
make_cluster_accessible ${SERVICE_NAME} ${URL} "" ${EXPECTED_OUTPUT}

# ServerLive
echo "HTTP ServerLive method call"
URL="http://${INGRESS_HOST}:${INGRESS_PORT}/v2/health/live"
EXPECTED_OUTPUT='{"live":true}'
make_cluster_accessible ${SERVICE_NAME} ${URL} "" ${EXPECTED_OUTPUT}

# delete oip http isvc
kubectl delete inferenceservice ${SERVICE_NAME}

# OIP GRPC method calls
echo "MNIST Torchserve Open Inference Protocol GRPC"
SERVICE_NAME="torchserve-mnist-v2-grpc"
deploy_cluster "kubernetes/kserve/tests/configs/mnist_oip_grpc.yaml" "torchserve-mnist-v2-grpc-predictor"

# ModelInfer
echo "GRPC ModelInfer method call"
GRPC_METHOD="org.pytorch.serve.grpc.openinference.GRPCInferenceService.ModelInfer"
EXPECTED_OUTPUT='{"modelName":"mnist","modelVersion":"1.0","id":"d3b15cad-50a2-4eaf-80ce-8b0a428bd298","outputs":[{"name":"input-0","datatype":"INT64","shape":["1"],"contents":{"int64Contents":["1"]}}]}'
make_cluster_accessible_for_grpc "torchserve-mnist-v2-grpc" ${GRPC_METHOD} "./kubernetes/kserve/kf_request_json/v2/mnist/mnist_v2_tensor_grpc.json" ${EXPECTED_OUTPUT}
INPUT="./kubernetes/kserve/kf_request_json/v2/mnist/mnist_v2_tensor_grpc.json"
make_cluster_accessible_for_grpc "${SERVICE_NAME}" "${GRPC_METHOD}" "${INPUT}" "${EXPECTED_OUTPUT}"

# ServerReady
echo "GRPC ServerReady method call"
GRPC_METHOD="org.pytorch.serve.grpc.openinference.GRPCInferenceService.ServerReady"
EXPECTED_OUTPUT='{"ready":true}'
make_cluster_accessible_for_grpc "${SERVICE_NAME}" "${GRPC_METHOD}" "" "${EXPECTED_OUTPUT}"

# ServerLive
echo "GRPC ServerLive method call"
GRPC_METHOD="org.pytorch.serve.grpc.openinference.GRPCInferenceService.ServerLive"
EXPECTED_OUTPUT='{"live":true}'
make_cluster_accessible_for_grpc "${SERVICE_NAME}" "${GRPC_METHOD}" "" "${EXPECTED_OUTPUT}"

# ModelReady
echo "GRPC ModelReady method call"
GRPC_METHOD="org.pytorch.serve.grpc.openinference.GRPCInferenceService.ModelReady"
EXPECTED_OUTPUT='{"ready":true}'
INPUT='{"name": "mnist"}'
make_cluster_accessible_for_grpc "$SERVICE_NAME" "$GRPC_METHOD" "$INPUT" "$EXPECTED_OUTPUT"

# delete oip grpc isvc
kubectl delete inferenceservice ${SERVICE_NAME}

delete_minikube_cluster
