Fix regression in schema enforcement (#8326)
* Fix schema enforcement regression

Signed-off-by: Ben Wilson <benjamin.wilson@databricks.com>

* WIP - tests will fail

Signed-off-by: Ben Wilson <benjamin.wilson@databricks.com>

* add in specific logic for transformers schema enforcement

Signed-off-by: Ben Wilson <benjamin.wilson@databricks.com>

---------

Signed-off-by: Ben Wilson <benjamin.wilson@databricks.com>
BenWilson2 committed Apr 26, 2023
1 parent ef7b6ed commit cef03da
Showing 7 changed files with 1,084 additions and 653 deletions.
31 changes: 26 additions & 5 deletions mlflow/models/utils.py
@@ -612,17 +612,38 @@ def _enforce_schema(pf_input: PyFuncInput, input_schema: Schema):
     For tensor-based signatures, we make sure the shape and type of the input matches the shape
     and type specified in model's input schema.
     """
+
+    def _is_scalar(x):
+        return np.isscalar(x) or x is None
+
     if isinstance(pf_input, pd.Series):
         pf_input = pd.DataFrame(pf_input)
     if not input_schema.is_tensor_spec():
         if isinstance(pf_input, (list, np.ndarray, dict, pd.Series, str)):
             try:
-                if isinstance(pf_input, dict) and all(
-                    not isinstance(value, (dict, list)) for value in pf_input.values()
+                if isinstance(pf_input, str):
+                    pf_input = pd.DataFrame([pf_input])
+                elif isinstance(pf_input, dict) and all(
+                    _is_scalar(value) for value in pf_input.values()
                 ):
-                    pf_input = pd.DataFrame(pf_input, index=[0])
-                elif isinstance(pf_input, str):
-                    pf_input = pd.DataFrame({"inputs": pf_input}, index=[0])
+                    pf_input = pd.DataFrame([pf_input])
+                elif isinstance(pf_input, dict) and all(
+                    isinstance(value, np.ndarray)
+                    and value.dtype.type == np.str_
+                    and value.size == 1
+                    and value.shape == ()
+                    for value in pf_input.values()
+                ):
+                    # This check is specifically to handle the serving structural cast for
+                    # certain inputs for the transformers implementation. Due to the fact that
+                    # specific Pipeline types in transformers support passing input data
+                    # of the form Dict[str, str] in which the value is a scalar string, model
+                    # serving will cast this entry as a numpy array with shape () and size 1.
+                    # This is seen as a scalar input when attempting to create a Pandas DataFrame
+                    # from such a numpy structure and requires the array to be encapsulated in a
+                    # list in order to prevent a ValueError exception for requiring an index
+                    # if passing in all scalar values thrown by Pandas.
+                    pf_input = pd.DataFrame([pf_input])
                 else:
                     pf_input = pd.DataFrame(pf_input)
             except Exception as e:
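
For context on the ndarray branch added above: per the in-code comment, model serving can hand _enforce_schema a dict whose string values arrive as zero-dimensional numpy arrays. A minimal sketch of the pandas behavior being worked around (illustrative, not part of the commit; the exact error text may vary by pandas version):

    import numpy as np
    import pandas as pd

    # Serving may cast {"query": "some text"} into a 0-d string array:
    payload = {"query": np.array("some text")}  # shape (), size 1

    # pandas treats the 0-d array as a scalar value, so building a frame
    # directly raises a ValueError about scalar values requiring an index
    # (per the commit's comment above):
    try:
        pd.DataFrame(payload)
    except ValueError as e:
        print(e)

    # Wrapping the dict in a list supplies an implicit one-row index,
    # which is exactly what the new branch does:
    df = pd.DataFrame([payload])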
76 changes: 75 additions & 1 deletion mlflow/transformers.py
@@ -1,7 +1,9 @@
+import ast
 import json
 import logging
 import pathlib
 import pandas as pd
+import numpy as np
 import re
 from typing import Union, List, Optional, Dict, Any, NamedTuple
 
@@ -1455,6 +1457,8 @@ def _predict(self, data):
         # Optional stripping out of `\n` for specific generator pipelines.
         collapse_whitespace = self.inference_config.pop("collapse_whitespace", False)
 
+        data = self._convert_cast_lists_from_np_back_to_list(data)
+
         # Generate inference data with the pipeline object
         if isinstance(self.pipeline, transformers.ConversationalPipeline):
             conversation_output = self.pipeline(self._conversation)
@@ -1604,7 +1608,19 @@ def _coerce_exploded_dict_to_single_dict(self, data):
                 if value_type == str:
                     parsed[key] = [parsed[key], value]
                 elif value_type == list:
-                    parsed[key] = parsed[key].append(value)
+                    if all(len(entry) == 1 for entry in value):
+                        # This conversion is required solely for model serving.
+                        # In the parsing logic that occurs internally, strings that
+                        # contain single quotes `'` result in casting to a List[char]
+                        # instead of a str type. Attempting to append a List[char]
+                        # to a List[str] as would happen in the `else` block here
+                        # results in the entire List being overwritten as `None` without
+                        # an Exception being raised. By checking for single value entries
+                        # and subsequently converting to list and extracting the first
+                        # element, the original input string is reconstructed.
+                        parsed[key].append([str(value)][0])
+                    else:
+                        parsed[key] = parsed[key].append(value)
                 else:
                     parsed[key] = value
         return parsed
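
The comment above hinges on a standard Python pitfall: list.append mutates in place and returns None, so the assignment form in the `else` branch silently discards the accumulated list. A minimal repro (illustrative, not part of the commit):

    parsed = {"answers": ["yes"]}

    # Assigning the return value of append stores None, losing the data
    # without raising, as the comment describes:
    parsed["answers"] = parsed["answers"].append(["no"])
    print(parsed["answers"])  # None

    # Appending without reassignment preserves the list:
    parsed = {"answers": ["yes"]}
    parsed["answers"].append("no")
    print(parsed["answers"])  # ['yes', 'no']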
@@ -1903,6 +1919,42 @@ def _parse_json_encoded_dict_payload_to_dict(data, key_to_unpack):
                 }
                 for entry in data
             ]
+        elif isinstance(data, dict):
+            # This is to handle serving use cases as the DataFrame encapsulation converts
+            # collections within rows to np.array type. In order to process this data through
+            # the transformers.Pipeline API, we need to cast these arrays back to lists
+            # and replace the single quotes with double quotes after extracting the
+            # json-encoded `table` (a pandas DF) in order to convert it to a dict that
+            # the TableQuestionAnsweringPipeline can accept and cast to a Pandas DataFrame.
+            #
+            # An example casting that occurs for this case when input to model serving is the
+            # conversion of a user input of:
+            #   '{"inputs": {"query": "What is the longest distance?",
+            #                "table": {"Distance": ["1000", "10", "1"]}}}'
+            # is converted to:
+            #   [{'query': array('What is the longest distance?', dtype='<U29'),
+            #     'table': array('{\'Distance\': [\'1000\', \'10\', \'1\']}', dtype='<U204')}]
+            # which is an invalid input to the pipeline.
+            # This method converts the input to:
+            #   {'query': 'What is the longest distance?',
+            #    'table': {'Distance': ['1000', '10', '1']}}
+            # which is a valid input to the TableQuestionAnsweringPipeline.
+            output = {}
+            for key, value in data.items():
+                if key == key_to_unpack:
+                    if isinstance(value, np.ndarray):
+                        output[key] = ast.literal_eval(value.item())
+                    else:
+                        output[key] = ast.literal_eval(value)
+                else:
+                    if isinstance(value, np.ndarray):
+                        # This cast to np.ndarray occurs when more than one question is asked.
+                        output[key] = value.item()
+                    else:
+                        # Otherwise, the entry does not need casting from a np.ndarray type to
+                        # list as it is already a scalar string.
+                        output[key] = value
+            return output
         else:
             return {
                 key: (
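
A sketch of the conversion documented in the comment above, reusing the commit's own example values (illustrative only):

    import ast
    import numpy as np

    data = {
        "query": np.array("What is the longest distance?"),
        "table": np.array("{'Distance': ['1000', '10', '1']}"),
    }

    # .item() unwraps the 0-d array into a Python str, and ast.literal_eval
    # parses the single-quoted dict literal that json.loads would reject:
    table = ast.literal_eval(data["table"].item())
    print(table)  # {'Distance': ['1000', '10', '1']}
    print(data["query"].item())  # What is the longest distance?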
@@ -1924,3 +1976,25 @@ def _validate_str_or_list_str(data):
                 "If supplying a list, all values must be of string type.",
                 error_code=INVALID_PARAMETER_VALUE,
             )
+
+    @staticmethod
+    def _convert_cast_lists_from_np_back_to_list(data):
+        """
+        This handles the casting of dicts within lists from Pandas DF conversion within model
+        serving back into the required Dict[str, List[str]] if this type matching occurs.
+        Otherwise, it's a noop.
+        """
+        if not isinstance(data, list):
+            # NB: applying a short-circuit return here to not incur runtime overhead with
+            # type validation if the input is not a list
+            return data
+        elif not all(isinstance(value, dict) for value in data):
+            return data
+        else:
+            parsed_data = []
+            for entry in data:
+                if all(isinstance(value, np.ndarray) for value in entry.values()):
+                    parsed_data.append({key: value.tolist() for key, value in entry.items()})
+                else:
+                    parsed_data.append(entry)
+            return parsed_data
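
A standalone sketch of the round trip this helper performs, as invoked from _predict above; the body is copied from the staticmethod so the example runs on its own (sample values are illustrative):

    import numpy as np

    def convert_cast_lists_from_np_back_to_list(data):
        # Copy of the staticmethod above, reproduced for illustration only.
        if not isinstance(data, list):
            return data
        if not all(isinstance(value, dict) for value in data):
            return data
        parsed_data = []
        for entry in data:
            if all(isinstance(value, np.ndarray) for value in entry.values()):
                parsed_data.append({key: value.tolist() for key, value in entry.items()})
            else:
                parsed_data.append(entry)
        return parsed_data

    serving_data = [{"queries": np.array(["q1", "q2"]), "answers": np.array(["a1", "a2"])}]
    print(convert_cast_lists_from_np_back_to_list(serving_data))
    # [{'queries': ['q1', 'q2'], 'answers': ['a1', 'a2']}]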
8 changes: 7 additions & 1 deletion mlflow/types/utils.py
@@ -325,15 +325,21 @@ def _is_spark_df(x) -> bool:
 def _validate_input_dictionary_contains_only_strings_and_lists_of_strings(data) -> None:
     invalid_keys = []
     invalid_values = []
+    value_type = None
     for key, value in data.items():
+        if not value_type:
+            value_type = type(value)
         if isinstance(key, bool):
             invalid_keys.append(key)
         elif not isinstance(key, (str, int)):
             invalid_keys.append(key)
         if isinstance(value, list) and not all(isinstance(item, str) for item in value):
             invalid_values.append(key)
-        elif not isinstance(value, (list, str)):
+        elif not isinstance(value, (np.ndarray, list, str)):
             invalid_values.append(key)
+        elif isinstance(value, np.ndarray) or value_type == np.ndarray:
+            if not isinstance(value, value_type):
+                invalid_values.append(key)
     if invalid_values:
         raise MlflowException(
             "Invalid values in dictionary. If passing a dictionary containing strings, all "
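
The new value_type tracking rejects dictionaries that mix np.ndarray values with other value types while still allowing homogeneous ndarray values. A sketch of the expected behavior (illustrative; assumes the private helper is importable from mlflow.types.utils as defined above):

    import numpy as np
    from mlflow.types.utils import (
        _validate_input_dictionary_contains_only_strings_and_lists_of_strings as _validate,
    )

    # Homogeneous ndarray values are expected to pass this check:
    _validate({"a": np.array(["x"]), "b": np.array(["y"])})

    # Mixing str and np.ndarray values flags the ndarray-valued key,
    # since isinstance(np.array(["y"]), str) is False:
    try:
        _validate({"a": "x", "b": np.array(["y"])})
    except Exception as e:
        print(e)  # Invalid values in dictionary. ...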