Add testing for decoupled model use case #7246

Closed
wants to merge 7 commits
Changes from 2 commits
16 changes: 16 additions & 0 deletions qa/L0_backend_python/decoupled/test.sh
@@ -63,6 +63,10 @@ mkdir -p models/dlpack_add_sub/1/
cp ../../python_models/dlpack_add_sub/model.py models/dlpack_add_sub/1/
cp ../../python_models/dlpack_add_sub/config.pbtxt models/dlpack_add_sub/

mkdir -p models/repeat_int32/1/
cp ../../python_models/repeat_thread/model.py models/repeat_int32/1/
cp python_backend/examples/decoupled/repeat_config.pbtxt models/repeat_int32/config.pbtxt

function verify_log_counts () {
if [ `grep -c "Specific Msg!" $SERVER_LOG` -lt 1 ]; then
echo -e "\n***\n*** Test Failed: Specific Msg Count Incorrect\n***"
@@ -113,6 +117,18 @@ if [ $? -ne 0 ]; then
fi
set -e

# Run repeat_client.py to test the repeat_thread model. When the server is
# shut down, the repeat_thread model should not hang.
set +e
python3 python_backend/examples/decoupled/repeat_client.py >> $CLIENT_LOG 2>&1
grep "PASS: repeat_int32" $CLIENT_LOG
if [ $? -ne 0 ]; then
    echo -e "\n***\n*** Failed to verify repeat_thread model. \n***"
    cat $CLIENT_LOG
    RET=1
fi
set -e

kill_server

verify_log_counts
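For context, not part of the diff: repeat_client.py drives the decoupled repeat_int32 model over a gRPC stream and collects the streamed responses through a callback. A minimal sketch of such a client follows, assuming the standard tritonclient.grpc streaming API, a server listening on localhost:8001, and the IN/DELAY/WAIT inputs and OUT/IDX outputs defined by the repeat example config; the shapes, dtypes, and values here are illustrative only.

import queue
from functools import partial

import numpy as np
import tritonclient.grpc as grpcclient


def callback(user_data, result, error):
    # The decoupled model streams responses; each result (or error) lands here.
    user_data.put(error if error is not None else result)


responses = queue.Queue()
client = grpcclient.InferenceServerClient("localhost:8001")
client.start_stream(callback=partial(callback, responses))

# One streamed response is expected per element of IN; DELAY and WAIT are in ms.
inputs = [
    grpcclient.InferInput("IN", [4], "INT32"),
    grpcclient.InferInput("DELAY", [4], "UINT32"),
    grpcclient.InferInput("WAIT", [1], "UINT32"),
]
inputs[0].set_data_from_numpy(np.array([4, 2, 0, 1], dtype=np.int32))
inputs[1].set_data_from_numpy(np.array([1, 2, 3, 4], dtype=np.uint32))
inputs[2].set_data_from_numpy(np.array([5], dtype=np.uint32))

client.async_stream_infer(model_name="repeat_int32", inputs=inputs)
client.stop_stream()  # blocks until all in-flight responses are received

while not responses.empty():
    item = responses.get()
    if isinstance(item, Exception):
        raise item
    print(item.as_numpy("IDX"), item.as_numpy("OUT"))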
24 changes: 12 additions & 12 deletions qa/L0_backend_python/examples/test.sh
@@ -56,7 +56,7 @@ git clone ${TRITON_REPO_ORGANIZATION}/python_backend -b $PYTHON_BACKEND_REPO_TAG
cd python_backend

# Example 1
CLIENT_LOG="./examples_add_sub_client.log"
CLIENT_LOG="../examples_add_sub_client.log"
mkdir -p models/add_sub/1/
cp examples/add_sub/model.py models/add_sub/1/model.py
cp examples/add_sub/config.pbtxt models/add_sub/config.pbtxt
@@ -85,7 +85,7 @@ set -e
kill_server

# Example 2
CLIENT_LOG="./examples_pytorch_client.log"
CLIENT_LOG="../examples_pytorch_client.log"
mkdir -p models/pytorch/1/
cp examples/pytorch/model.py models/pytorch/1/model.py
cp examples/pytorch/config.pbtxt models/pytorch/config.pbtxt
@@ -118,7 +118,7 @@ kill_server
# JAX AddSub
# JAX is not supported on Jetson
if [ "$TEST_JETSON" == "0" ]; then
CLIENT_LOG="./examples_jax_client.log"
CLIENT_LOG="../examples_jax_client.log"
mkdir -p models/jax/1/
cp examples/jax/model.py models/jax/1/model.py
cp examples/jax/config.pbtxt models/jax/config.pbtxt
@@ -150,7 +150,7 @@ fi
# Example 4

# BLS Sync
CLIENT_LOG="./examples_sync_client.log"
CLIENT_LOG="../examples_sync_client.log"
mkdir -p models/bls_sync/1
cp examples/bls/sync_model.py models/bls_sync/1/model.py
cp examples/bls/sync_config.pbtxt models/bls_sync/config.pbtxt
@@ -181,7 +181,7 @@ kill_server
# Example 5

# Decoupled Repeat
CLIENT_LOG="./examples_repeat_client.log"
CLIENT_LOG="../examples_repeat_client.log"
mkdir -p models/repeat_int32/1/
cp examples/decoupled/repeat_model.py models/repeat_int32/1/model.py
cp examples/decoupled/repeat_config.pbtxt models/repeat_int32/config.pbtxt
@@ -212,7 +212,7 @@ kill_server
# Example 6

# Decoupled Square
CLIENT_LOG="./examples_square_client.log"
CLIENT_LOG="../examples_square_client.log"
mkdir -p models/square_int32/1/
cp examples/decoupled/square_model.py models/square_int32/1/model.py
cp examples/decoupled/square_config.pbtxt models/square_int32/config.pbtxt
@@ -247,7 +247,7 @@ kill_server
# Having multiple python versions leads to build issues.
# Anaconda is not officially supported on Jetson.
if [ "$TEST_JETSON" == "0" ]; then
CLIENT_LOG="./examples_async_client.log"
CLIENT_LOG="../examples_async_client.log"
mkdir -p models/bls_async/1
cp examples/bls/async_model.py models/bls_async/1/model.py
cp examples/bls/async_config.pbtxt models/bls_async/config.pbtxt
@@ -278,7 +278,7 @@ if [ "$TEST_JETSON" == "0" ]; then
fi

# Auto Complete Model Configuration Example
CLIENT_LOG="./examples_auto_complete_client.log"
CLIENT_LOG="../examples_auto_complete_client.log"
mkdir -p models/nobatch_auto_complete/1/
mkdir -p models/batch_auto_complete/1/
cp examples/auto_complete/nobatch_model.py models/nobatch_auto_complete/1/model.py
@@ -311,7 +311,7 @@ set -e
kill_server

# BLS Decoupled Sync
CLIENT_LOG="./examples_bls_decoupled_sync_client.log"
CLIENT_LOG="../examples_bls_decoupled_sync_client.log"
mkdir -p models/bls_decoupled_sync/1
cp examples/bls_decoupled/sync_model.py models/bls_decoupled_sync/1/model.py
cp examples/bls_decoupled/sync_config.pbtxt models/bls_decoupled_sync/config.pbtxt
@@ -341,7 +341,7 @@ kill_server

# BLS Decoupled Async
if [ "$TEST_JETSON" == "0" ]; then
CLIENT_LOG="./examples_bls_decoupled_async_client.log"
CLIENT_LOG="../examples_bls_decoupled_async_client.log"
mkdir -p models/bls_decoupled_async/1
cp examples/bls_decoupled/async_model.py models/bls_decoupled_async/1/model.py
cp examples/bls_decoupled/async_config.pbtxt models/bls_decoupled_async/config.pbtxt
@@ -374,7 +374,7 @@ fi
# Example 7

# Model Instance Kind
CLIENT_LOG="./examples_model_instance_kind.log"
CLIENT_LOG="../examples_model_instance_kind.log"
mkdir -p models/resnet50/1
cp examples/instance_kind/model.py models/resnet50/1/
cp examples/instance_kind/config.pbtxt models/resnet50/
@@ -403,7 +403,7 @@ set -e
kill_server

# Custom Metrics
CLIENT_LOG="./examples_custom_metrics_client.log"
CLIENT_LOG="../examples_custom_metrics_client.log"
mkdir -p models/custom_metrics/1
cp examples/custom_metrics/model.py models/custom_metrics/1/model.py
cp examples/custom_metrics/config.pbtxt models/custom_metrics/config.pbtxt
2 changes: 2 additions & 0 deletions qa/L0_backend_python/setup_python_enviroment.sh
@@ -104,6 +104,8 @@ if [ ${PYTHON_ENV_VERSION} = "11" ]; then
# tensorflow needs to be installed before numpy so pip does not mess up conda
# environment
pip install tensorflow==2.12.0
# six is needed for tensorflow
pip install six
conda install -c conda-forge libstdcxx-ng=12 -y
conda install numpy=1.23.5 -y
EXPECTED_VERSION_STRING="Python version is 3.11, NumPy version is 1.23.5, and Tensorflow version is 2.12.0"
159 changes: 159 additions & 0 deletions qa/python_models/repeat_thread/model.py
@@ -0,0 +1,159 @@
# Copyright 2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
# * Neither the name of NVIDIA CORPORATION nor the names of its
# contributors may be used to endorse or promote products derived
# from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS ``AS IS'' AND ANY
# EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
# PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
# OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

import json
import queue
import time
from threading import Thread

import numpy
import triton_python_backend_utils as pb_utils


class WorkItem:
    def __init__(self, response_sender, in_input, delay_input):
        self.response_sender = response_sender
        self.in_input = in_input
        self.delay_input = delay_input


class TritonPythonModel:
"""This model launches a separate thread to handle the request from a queue. The thread is launched from
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please keep the number of characters in each line below 80.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dyastremsky do you know why CodeQL didn't catch this?

the `initialize` function and is terminated in the `finalize` function. This is different from the repeat
example in the Python Backend repository where a thread is launched per request and terminated after the response
is sent.
"""

    def initialize(self, args):
        self.work_queue = queue.Queue()
        self.running = True

        self.model_config = model_config = json.loads(args["model_config"])

        using_decoupled = pb_utils.using_decoupled_model_transaction_policy(
            model_config
        )
        if not using_decoupled:
            raise pb_utils.TritonModelException(
                """the model `{}` can generate any number of responses per request,
                enable decoupled transaction policy in model configuration to
                serve this model""".format(
                    args["model_name"]
                )
            )

        # Get OUT configuration
        out_config = pb_utils.get_output_config_by_name(model_config, "OUT")

        # Get IDX configuration
        idx_config = pb_utils.get_output_config_by_name(model_config, "IDX")

        # Convert Triton types to numpy types
        self.out_dtype = pb_utils.triton_string_to_numpy(out_config["data_type"])
        self.idx_dtype = pb_utils.triton_string_to_numpy(idx_config["data_type"])

        self.sender_thread = Thread(target=self.sender_loop)
        self.sender_thread.daemon = True
        self.sender_thread.start()

    def sender_loop(self):
        while self.running:
            # Grab work from queue
            work_item = self.work_queue.get()
            if work_item.response_sender is None:
                pb_utils.log(
                    pb_utils.LogLevel.INFO,
                    "Sender thread received dummy work item. Exiting...",
                )
                self.work_queue.task_done()
                break

            response_sender = work_item.response_sender
            in_input = work_item.in_input
            delay_input = work_item.delay_input

            idx_dtype = self.idx_dtype
            out_dtype = self.out_dtype

            for idx in range(in_input.size):
                in_value = in_input[idx]
                delay_value = delay_input[idx]

                time.sleep(delay_value / 1000)

                idx_output = pb_utils.Tensor("IDX", numpy.array([idx], idx_dtype))
                out_output = pb_utils.Tensor("OUT", numpy.array([in_value], out_dtype))
                response = pb_utils.InferenceResponse(
                    output_tensors=[idx_output, out_output]
                )
                response_sender.send(response)

            response_sender.send(flags=pb_utils.TRITONSERVER_RESPONSE_COMPLETE_FINAL)
            self.work_queue.task_done()

    def execute(self, requests):
        # This model does not support batching, so 'request_count' should
        # always be 1.
        if len(requests) != 1:
            raise pb_utils.TritonModelException(
                "unsupported batch size " + str(len(requests))
            )

        in_input = pb_utils.get_input_tensor_by_name(requests[0], "IN").as_numpy()
        delay_input = pb_utils.get_input_tensor_by_name(requests[0], "DELAY").as_numpy()
        if in_input.shape != delay_input.shape:
            raise pb_utils.TritonModelException(
                f"expected IN and DELAY shape to match, got {list(in_input.shape)} and {list(delay_input.shape)}."
            )

        # Put the work item in the queue to be processed by the sender thread
        self.work_queue.put(
            WorkItem(requests[0].get_response_sender(), in_input, delay_input)
        )

        wait_input = pb_utils.get_input_tensor_by_name(requests[0], "WAIT").as_numpy()
        time.sleep(wait_input[0] / 1000)

        return None
Review comment (Member): nit: remove this

Review comment (Contributor Author): I was wondering if you mean remove `return None`? I thought we'd need to return None from `execute` if it's in decoupled mode.

Review comment (Member): Not returning anything is similar to returning None.
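To illustrate the point in the exchange above (an illustration, not part of the PR): a Python function that ends without a return statement returns None implicitly, so for a decoupled-mode execute the two forms below behave the same.

def execute_with_explicit_return(self, requests):
    ...  # responses are delivered via the response sender, not returned
    return None

def execute_without_return(self, requests):
    ...  # falling off the end of the function implicitly returns None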


    def finalize(self):
        """`finalize` is called only once when the model is being unloaded.
        Implementing the `finalize` function is OPTIONAL. This function allows
        the model to perform any necessary clean ups before exit.
        Here we wait for the sender thread to finish sending responses.
        """
        pb_utils.log(pb_utils.LogLevel.INFO, "Finalizing model...")

        # Pass a dummy work item to the queue to terminate the sender_thread
        self.work_queue.put(
            WorkItem(
                None, numpy.array([0], numpy.int32), numpy.array([0], numpy.uint32)
            )
        )

        self.running = False
        self.sender_thread.join()