Skip to content

Commit

Permalink
add tests for pyfunc predict and serving
Browse files Browse the repository at this point in the history
Signed-off-by: Serena Ruan <serena.rxy@gmail.com>
  • Loading branch information
serena-ruan committed Oct 27, 2023
1 parent cf17437 commit 2bb6e63
Show file tree
Hide file tree
Showing 3 changed files with 184 additions and 5 deletions.
8 changes: 4 additions & 4 deletions mlflow/models/utils.py
Expand Up @@ -758,10 +758,10 @@ def _is_scalar(x):
):
pf_input = pd.DataFrame([pf_input])
elif isinstance(pf_input, dict) and all(
isinstance(value, np.ndarray)
and value.dtype.type == np.str_
and value.size == 1
and value.shape == ()
isinstance(value, np.ndarray) and value.dtype.type == np.str_
# size & shape constraint makes some data batch inference result not
# consistent with serving result.
and value.size == 1 and value.shape == ()
for value in pf_input.values()
):
# This check is specifically to handle the serving structural cast for
Expand Down
8 changes: 7 additions & 1 deletion mlflow/utils/proto_json_utils.py
Expand Up @@ -3,6 +3,7 @@
import json
import os
from collections import defaultdict
from copy import deepcopy
from functools import partial
from json import JSONEncoder
from typing import Any, Dict, Optional
Expand Down Expand Up @@ -365,10 +366,15 @@ def parse_tf_serving_input(inp_dict, schema=None):
import numpy as np

def cast_schema_type(input_data):
input_data = deepcopy(input_data)
if schema is not None:
if schema.has_input_names():
input_names = schema.input_names()
if len(input_names) == 1 and isinstance(input_data, list):
if (
len(input_names) == 1
and isinstance(input_data, list)
and not any(isinstance(x, dict) for x in input_data)
):
# for schemas with a single column, match input with column
input_data = {input_names[0]: input_data}
if not isinstance(input_data, dict):
Expand Down
173 changes: 173 additions & 0 deletions tests/pyfunc/test_pyfunc_schema_enforcement.py
Expand Up @@ -1920,3 +1920,176 @@ def predict(self, context, model_input, params=None):
assert response.status_code == 200, response.content
result = json.loads(response.content.decode("utf-8"))["predictions"]
assert result == ["input1"]


@pytest.mark.parametrize(
    ("data", "schema"),
    [
        ({"a": np.array([1, 2, 3])}, Schema([ColSpec(DataType.long, name="a")])),
        ({"query": "sentence"}, Schema([ColSpec(DataType.string, name="query")])),
        (
            {"query": ["sentence_1", "sentence_2"]},
            Schema([ColSpec(DataType.string, name="query")]),
        ),
        (
            {"query": ["sentence_1", "sentence_2"], "table": "some_table"},
            Schema(
                [
                    ColSpec(DataType.string, name="query"),
                    ColSpec(DataType.string, name="table"),
                ]
            ),
        ),
        (
            [{"query": "sentence"}, {"query": "sentence"}],
            Schema([ColSpec(DataType.string, name="query")]),
        ),
        (
            [
                {"query": ["sentence_1", "sentence_2"], "table": "some_table"},
                {"query": ["sentence_1", "sentence_2"], "table": "some_table"},
            ],
            Schema(
                [
                    ColSpec(DataType.string, name="query"),
                    ColSpec(DataType.string, name="table"),
                ]
            ),
        ),
    ],
)
def test_pyfunc_model_schema_enforcement_with_dicts_and_lists(data, schema):
    """Batch inference on dict / list-of-dict inputs should match the pandas form."""

    class MyModel(mlflow.pyfunc.PythonModel):
        def predict(self, context, model_input, params=None):
            # Identity model: echo back the schema-enforced input unchanged.
            return model_input

    signature = ModelSignature(schema)
    with mlflow.start_run():
        model_info = mlflow.pyfunc.log_model(
            python_model=MyModel(),
            artifact_path="test_model",
            signature=signature,
        )
    loaded_model = mlflow.pyfunc.load_model(model_info.model_uri)
    prediction = loaded_model.predict(data)

    def _str_or_str_list(value):
        # True for a plain string, or a list consisting entirely of strings.
        if isinstance(value, str):
            return True
        return isinstance(value, list) and all(isinstance(item, str) for item in value)

    if isinstance(data, dict) and all(_str_or_str_list(v) for v in data.values()):
        # A dict of strings / string lists represents a single row.
        expected = pd.DataFrame([data])
    else:
        expected = pd.DataFrame(data)
    pd.testing.assert_frame_equal(prediction, expected)

    # Feeding the equivalent pandas DataFrame must yield the same result.
    prediction = loaded_model.predict(expected)
    pd.testing.assert_frame_equal(prediction, expected)


@pytest.mark.parametrize(
    ("data", "schema"),
    [
        ({"query": "sentence"}, Schema([ColSpec(DataType.string, name="query")])),
        (
            {"query": ["sentence_1", "sentence_2"]},
            Schema([ColSpec(DataType.string, name="query")]),
        ),
        (
            {"query": ["sentence_1", "sentence_2"], "table": "some_table"},
            Schema(
                [
                    ColSpec(DataType.string, name="query"),
                    ColSpec(DataType.string, name="table"),
                ]
            ),
        ),
    ],
)
# `instances` is an invalid key for schema with Mlflow < 2.9.0
@pytest.mark.parametrize("format_key", ["inputs", "dataframe_split", "dataframe_records"])
def test_pyfunc_model_serving_with_dicts(data, schema, format_key):
    """Serving dict inputs in every accepted payload format should round-trip."""

    class MyModel(mlflow.pyfunc.PythonModel):
        def predict(self, context, model_input, params=None):
            # Identity model: return the schema-enforced input as-is.
            return model_input

    signature = ModelSignature(schema)
    with mlflow.start_run():
        model_info = mlflow.pyfunc.log_model(
            python_model=MyModel(),
            artifact_path="test_model",
            signature=signature,
        )

    # A dict whose values are all scalar strings encodes a single row.
    if all(isinstance(value, str) for value in data.values()):
        df = pd.DataFrame([data])
    else:
        df = pd.DataFrame(data)

    if format_key == "inputs":
        payload = {format_key: data}
    elif format_key in ("dataframe_split", "dataframe_records"):
        # Strip the "dataframe_" prefix to get the pandas orient ("split"/"records").
        payload = {format_key: df.to_dict(orient=format_key[len("dataframe_") :])}

    response = pyfunc_serve_and_score_model(
        model_info.model_uri,
        data=json.dumps(payload),
        content_type=pyfunc_scoring_server.CONTENT_TYPE_JSON,
        extra_args=["--env-manager", "local"],
    )
    assert response.status_code == 200, response.content
    result = json.loads(response.content.decode("utf-8"))["predictions"]
    # This is not consistent with batch inference df
    pd.testing.assert_frame_equal(pd.DataFrame(result), df)


@pytest.mark.parametrize(
    ("data", "schema"),
    [
        (
            [{"query": "sentence"}, {"query": "sentence"}],
            Schema([ColSpec(DataType.string, name="query")]),
        ),
        (
            [
                {"query": ["sentence_1", "sentence_2"], "table": "some_table"},
                {"query": ["sentence_1", "sentence_2"], "table": "some_table"},
            ],
            Schema(
                [
                    ColSpec(DataType.string, name="query"),
                    ColSpec(DataType.string, name="table"),
                ]
            ),
        ),
    ],
)
# `inputs` is an invalid key for schema with Mlflow < 2.9.0
@pytest.mark.parametrize("format_key", ["instances", "dataframe_split", "dataframe_records"])
def test_pyfunc_model_serving_with_lists_of_dicts(data, schema, format_key):
    """Serving list-of-dict inputs in every accepted payload format should round-trip."""

    class MyModel(mlflow.pyfunc.PythonModel):
        def predict(self, context, model_input, params=None):
            # Identity model: return the schema-enforced input as-is.
            return model_input

    signature = ModelSignature(schema)
    with mlflow.start_run():
        model_info = mlflow.pyfunc.log_model(
            python_model=MyModel(),
            artifact_path="test_model",
            signature=signature,
        )

    df = pd.DataFrame(data)
    if format_key == "instances":
        payload = {format_key: data}
    elif format_key in ("dataframe_split", "dataframe_records"):
        # Strip the "dataframe_" prefix to get the pandas orient ("split"/"records").
        payload = {format_key: df.to_dict(orient=format_key[len("dataframe_") :])}

    response = pyfunc_serve_and_score_model(
        model_info.model_uri,
        data=json.dumps(payload),
        content_type=pyfunc_scoring_server.CONTENT_TYPE_JSON,
        extra_args=["--env-manager", "local"],
    )
    assert response.status_code == 200, response.content
    result = json.loads(response.content.decode("utf-8"))["predictions"]
    pd.testing.assert_frame_equal(pd.DataFrame(result), df)

0 comments on commit 2bb6e63

Please sign in to comment.