import datetime as dt
import decimal
import json
import logging
import os
from copy import deepcopy
from typing import Any, Dict, List, Optional, Union
import numpy as np
import pandas as pd
from mlflow.exceptions import INVALID_PARAMETER_VALUE, MlflowException
from mlflow.models import Model
from mlflow.store.artifact.utils.models import get_model_name_and_version
from mlflow.types import DataType, ParamSchema, ParamSpec, Schema, TensorSpec
from mlflow.types.schema import Array, Object, Property
from mlflow.types.utils import (
TensorsNotSupportedException,
_infer_param_schema,
clean_tensor_type,
)
from mlflow.utils.annotations import experimental
from mlflow.utils.proto_json_utils import (
NumpyEncoder,
dataframe_from_parsed_json,
parse_tf_serving_input,
)
from mlflow.utils.uri import get_databricks_profile_uri_from_artifact_uri
try:
from scipy.sparse import csc_matrix, csr_matrix
HAS_SCIPY = True
except ImportError:
HAS_SCIPY = False
try:
from pyspark.sql import DataFrame as SparkDataFrame
from pyspark.sql import Row
from pyspark.sql.types import (
ArrayType,
BinaryType,
DateType,
FloatType,
IntegerType,
ShortType,
StructType,
TimestampType,
)
HAS_PYSPARK = True
except ImportError:
SparkDataFrame = None
HAS_PYSPARK = False
INPUT_EXAMPLE_PATH = "artifact_path"
EXAMPLE_DATA_KEY = "inputs"
EXAMPLE_PARAMS_KEY = "params"
EXAMPLE_FILENAME = "input_example.json"
ModelInputExample = Union[
pd.DataFrame, np.ndarray, dict, list, "csr_matrix", "csc_matrix", str, bytes, tuple
]
PyFuncInput = Union[
pd.DataFrame,
pd.Series,
np.ndarray,
"csc_matrix",
"csr_matrix",
List[Any],
Dict[str, Any],
dt.datetime,
bool,
bytes,
float,
int,
str,
]
PyFuncOutput = Union[pd.DataFrame, pd.Series, np.ndarray, list, str]
if HAS_PYSPARK:
PyFuncInput = Union[PyFuncInput, SparkDataFrame]
PyFuncOutput = Union[PyFuncOutput, SparkDataFrame]
_logger = logging.getLogger(__name__)
_FEATURE_STORE_FLAVOR = "databricks.feature_store.mlflow_model"
class _Example:
"""
    Represents an input example for an MLflow model.

    Contains jsonable data that can be saved with the model and metadata about the exported
    format that can be saved with :py:class:`Model <mlflow.models.Model>`.

    The _Example is created from example data provided by the user. The example(s) can be
    provided as a pandas.DataFrame, numpy.ndarray, Python dictionary or Python list. The
    assumption is that the example contains jsonable elements (see storage format section below).

    NOTE: If the example is one-dimensional (e.g. a dictionary of str -> scalar, or a list of
    scalars), the assumption is that it is a single column of data.

    Metadata:
        The _Example metadata contains the following information:
        - artifact_path: Relative path to the serialized example within the model directory.
        - type: Type of example data provided by the user, e.g. dataframe, ndarray.
        - One of the following metadata fields, based on the `type`:
            - pandas_orient: For dataframes, this attribute specifies how the dataframe is
              encoded in JSON. For example, the "split" value signals that the data is stored
              as an object with columns and data attributes.
            - format: For tensors, this attribute specifies the standard being used to store
              an input example. MLflow uses a JSON-formatted string representation of TF
              serving input.

    Storage Format:
        The examples are stored as JSON for portability and readability. Therefore, the contents
        of the example(s) must be jsonable. MLflow will make the following conversions
        automatically on behalf of the user:

        - binary values: :py:class:`bytes` or :py:class:`bytearray` are converted to base64
          encoded strings.
        - numpy types: numpy types are converted to the corresponding Python types or their
          closest equivalent.
        - csc/csr matrix: similar to a 2-dimensional numpy array, csc/csr matrices are converted
          to the corresponding Python types or their closest equivalent.
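
    Example (illustrative sketch of the resulting metadata and payload for a small
    DataFrame example)::

        example = _Example(pd.DataFrame({"x": [1, 2], "y": ["a", "b"]}))
        example.info
        # {'artifact_path': 'input_example.json', 'type': 'dataframe',
        #  'pandas_orient': 'split'}
        example.data
        # {'columns': ['x', 'y'], 'data': [[1, 'a'], [2, 'b']]}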
"""
def __init__(self, input_example: ModelInputExample):
def _is_scalar(x):
return np.isscalar(x) or x is None
def _validate_params(params):
try:
_infer_param_schema(params)
except MlflowException:
_logger.warning(f"Invalid params found in input example: {params}")
raise
def _is_ndarray(x):
return isinstance(x, np.ndarray) or (
isinstance(x, dict) and all(isinstance(ary, np.ndarray) for ary in x.values())
)
def _is_sparse_matrix(x):
if not HAS_SCIPY:
# we can safely assume that if no scipy is installed,
# the user won't log scipy sparse matrices
return False
return isinstance(x, (csc_matrix, csr_matrix))
def _handle_ndarray_nans(x: np.ndarray):
if np.issubdtype(x.dtype, np.number):
return np.where(np.isnan(x), None, x)
else:
return x
def _handle_ndarray_input(input_array: Union[np.ndarray, dict]):
if isinstance(input_array, dict):
result = {}
for name in input_array.keys():
result[name] = _handle_ndarray_nans(input_array[name]).tolist()
return {"inputs": result}
else:
return {"inputs": _handle_ndarray_nans(input_array).tolist()}
def _handle_sparse_matrix(x: Union["csr_matrix", "csc_matrix"]):
return {
"data": _handle_ndarray_nans(x.data).tolist(),
"indices": x.indices.tolist(),
"indptr": x.indptr.tolist(),
"shape": list(x.shape),
}
def _handle_dataframe_nans(df: pd.DataFrame):
return df.where(df.notnull(), None)
def _coerce_to_pandas_df(input_ex):
if isinstance(input_ex, dict):
# We need to be compatible with infer_schema's behavior, where
# it infers each value's type directly.
if all(
isinstance(x, str) or (isinstance(x, list) and all(_is_scalar(y) for y in x))
for x in input_ex.values()
):
# e.g.
# data = {"a": "a", "b": ["a", "b", "c"]}
# >>> pd.DataFrame([data])
# a b
# 0 a [a, b, c]
_logger.info(
"We convert input dictionaries to pandas DataFrames such that "
"each key represents a column, collectively constituting a "
"single row of data. If you would like to save data as "
"multiple rows, please convert your data to a pandas "
"DataFrame before passing to input_example."
)
input_ex = pd.DataFrame([input_ex])
elif np.isscalar(input_ex):
input_ex = pd.DataFrame([input_ex])
elif not isinstance(input_ex, pd.DataFrame):
try:
import pyspark.sql.dataframe
                    if isinstance(input_ex, pyspark.sql.dataframe.DataFrame):
                        raise MlflowException(
                            "Examples cannot be provided as a Spark DataFrame. "
                            "Please make sure your example is of a small size and "
                            "turn it into a pandas DataFrame."
                        )
except ImportError:
pass
input_ex = None
return input_ex
def _handle_dataframe_input(df):
result = _handle_dataframe_nans(df).to_dict(orient="split")
# Do not include row index
del result["index"]
if all(df.columns == range(len(df.columns))):
# No need to write default column index out
del result["columns"]
return result
self.info = {
INPUT_EXAMPLE_PATH: EXAMPLE_FILENAME,
}
# Avoid changing the variable passed in
input_example = deepcopy(input_example)
if _contains_params(input_example):
input_example, self._inference_params = input_example
_validate_params(self._inference_params)
self.info[EXAMPLE_PARAMS_KEY] = "true"
else:
self._inference_params = None
if _is_ndarray(input_example):
self._inference_data = input_example
self.data = _handle_ndarray_input(input_example)
self.info.update(
{
"type": "ndarray",
"format": "tf-serving",
}
)
elif _is_sparse_matrix(input_example):
self._inference_data = input_example
self.data = _handle_sparse_matrix(input_example)
if isinstance(input_example, csc_matrix):
example_type = "sparse_matrix_csc"
else:
example_type = "sparse_matrix_csr"
self.info.update(
{
"type": example_type,
}
)
elif isinstance(input_example, list):
for i, x in enumerate(input_example):
if isinstance(x, np.ndarray) and len(x.shape) > 1:
raise TensorsNotSupportedException(f"Row '{i}' has shape {x.shape}")
if all(_is_scalar(x) for x in input_example):
# We should not convert data for langchain flavors
# List[scalar] is a typical langchain model input type
_logger.info(
"Lists of scalar values are not converted to a pandas DataFrame. "
"If you expect to use pandas DataFrames for inference, please "
"construct a DataFrame and pass it to input_example instead."
)
self._inference_data = input_example
self.data = {"inputs": self._inference_data}
self.info.update(
{
"type": "ndarray",
"format": "tf-serving",
}
)
else:
self._inference_data = pd.DataFrame(input_example)
self.data = _handle_dataframe_input(self._inference_data)
self.info.update(
{
"type": "dataframe",
"pandas_orient": "split",
}
)
else:
self._inference_data = _coerce_to_pandas_df(input_example)
if self._inference_data is None:
raise TypeError(
"Expected one of the following types:\n"
"- pandas.DataFrame\n"
"- numpy.ndarray\n"
"- dictionary of (name -> numpy.ndarray)\n"
"- scipy.sparse.csr_matrix\n"
"- scipy.sparse.csc_matrix\n"
"- dict\n"
"- list\n"
"- scalars\n"
f"but got '{type(input_example)}'",
)
self.data = _handle_dataframe_input(self._inference_data)
orient = "split" if "columns" in self.data else "values"
self.info.update(
{
"type": "dataframe",
"pandas_orient": orient,
}
)
def save(self, parent_dir_path: str):
"""Save the example as json at ``parent_dir_path``/`self.info['artifact_path']`."""
if self._inference_params is not None:
data = {EXAMPLE_DATA_KEY: self.data, EXAMPLE_PARAMS_KEY: self._inference_params}
else:
data = self.data
with open(os.path.join(parent_dir_path, self.info[INPUT_EXAMPLE_PATH]), "w") as f:
json.dump(data, f, cls=NumpyEncoder)
@property
def inference_data(self):
"""
Returns the input example in a form that PyFunc wrapped models can score.
"""
return self._inference_data
@property
def inference_params(self):
"""
Returns the params dictionary that PyFunc wrapped models can use for scoring.
"""
return self._inference_params
def _contains_params(input_example):
    # For tuple input, we assume the first item is the input_example data
    # and the second item is a params dictionary.
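    # Illustrative: (pd.DataFrame({"x": [1]}), {"max_tokens": 20}) -> True, while a bare
    # DataFrame, or a 2-tuple whose second item is not a dict, -> False.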
return (
isinstance(input_example, tuple)
and len(input_example) == 2
and isinstance(input_example[1], dict)
)
def _save_example(
mlflow_model: Model, input_example: ModelInputExample, path: str, no_conversion=False
):
"""
    Saves the example to a file on the given path and updates the passed Model with the example
    metadata.

    The metadata is a dictionary with the following fields:

    - 'artifact_path': Example path relative to the model directory.
    - 'type': Type of example. The supported values are 'dataframe', 'ndarray',
      'sparse_matrix_csc', 'sparse_matrix_csr' and 'json_object'.
    - One of the following metadata fields, based on the `type`:
        - 'pandas_orient': Used to store dataframes. Determines the JSON encoding for dataframe
          examples in terms of the pandas orient convention. Defaults to 'split'.
        - 'format': Used to store tensors. Determines the standard used to store a tensor input
          example. MLflow uses a JSON-formatted string representation of TF serving input.

    Args:
        mlflow_model: Model metadata that will get updated with the example metadata.
        input_example: Example to be saved with the model.
        path: Where to store the example file. Should be the model directory.
        no_conversion: If True, the example is saved as-is as a JSON object without any
            conversion.
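
    Example (illustrative sketch; ``model_dir`` is a hypothetical existing directory)::

        mlflow_model = Model()
        _save_example(mlflow_model, pd.DataFrame({"x": [1, 2]}), model_dir)
        # Writes model_dir/input_example.json and sets
        # mlflow_model.saved_input_example_info to
        # {'artifact_path': 'input_example.json', 'type': 'dataframe',
        #  'pandas_orient': 'split'}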
"""
if no_conversion:
example_info = {
INPUT_EXAMPLE_PATH: EXAMPLE_FILENAME,
"type": "json_object",
}
try:
with open(os.path.join(path, example_info[INPUT_EXAMPLE_PATH]), "w") as f:
json.dump(input_example, f, cls=NumpyEncoder)
except Exception as e:
raise MlflowException.invalid_parameter_value(
"Failed to save input example. Please make sure the input example is jsonable "
f"when no_conversion is True. Got error: {e}"
) from e
else:
mlflow_model.saved_input_example_info = example_info
else:
example = _Example(input_example)
example.save(path)
mlflow_model.saved_input_example_info = example.info
def _get_mlflow_model_input_example_dict(mlflow_model: Model, path: str):
"""
Args:
mlflow_model: Model metadata.
path: Path to the model directory.
Returns:
Input example or None if the model has no example.
"""
if mlflow_model.saved_input_example_info is None:
return None
example_type = mlflow_model.saved_input_example_info["type"]
if example_type not in [
"dataframe",
"ndarray",
"sparse_matrix_csc",
"sparse_matrix_csr",
"json_object",
]:
raise MlflowException(f"This version of mlflow can not load example of type {example_type}")
path = os.path.join(path, mlflow_model.saved_input_example_info["artifact_path"])
with open(path) as handle:
return json.load(handle)
def _read_example(mlflow_model: Model, path: str):
"""
    Read an example from a model directory. Returns None if there is no example metadata (i.e.
    the model was saved without an example). Raises FileNotFoundError if there is example
    metadata but the example file is missing.
Args:
mlflow_model: Model metadata.
path: Path to the model directory.
Returns:
Input example data or None if the model has no example.
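
    Example (illustrative sketch; assumes ``model_dir`` holds a model saved with a
    DataFrame input example)::

        model = Model.load(os.path.join(model_dir, "MLmodel"))
        example = _read_example(model, model_dir)  # e.g. a pandas DataFrame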
"""
input_example = _get_mlflow_model_input_example_dict(mlflow_model, path)
if input_example is None:
return None
example_type = mlflow_model.saved_input_example_info["type"]
input_schema = mlflow_model.signature.inputs if mlflow_model.signature is not None else None
if mlflow_model.saved_input_example_info.get(EXAMPLE_PARAMS_KEY, None):
input_example = input_example[EXAMPLE_DATA_KEY]
if example_type == "json_object":
return input_example
if example_type == "ndarray":
return _read_tensor_input_from_json(input_example, schema=input_schema)
if example_type in ["sparse_matrix_csc", "sparse_matrix_csr"]:
return _read_sparse_matrix_from_json(input_example, example_type)
return dataframe_from_parsed_json(input_example, pandas_orient="split", schema=input_schema)
def _read_example_params(mlflow_model: Model, path: str):
"""
    Read the params of an input_example from a model directory. Returns None if there are no
    params in the input_example or the model was saved without an example.
"""
if (
mlflow_model.saved_input_example_info is None
or mlflow_model.saved_input_example_info.get(EXAMPLE_PARAMS_KEY, None) is None
):
return None
input_example_dict = _get_mlflow_model_input_example_dict(mlflow_model, path)
return input_example_dict[EXAMPLE_PARAMS_KEY]
def _read_tensor_input_from_json(path_or_data, schema=None):
if isinstance(path_or_data, str) and os.path.exists(path_or_data):
with open(path_or_data) as handle:
inp_dict = json.load(handle)
else:
inp_dict = path_or_data
return parse_tf_serving_input(inp_dict, schema)
def _read_sparse_matrix_from_json(path_or_data, example_type):
if isinstance(path_or_data, str) and os.path.exists(path_or_data):
with open(path_or_data) as handle:
matrix_data = json.load(handle)
else:
matrix_data = path_or_data
data = matrix_data["data"]
indices = matrix_data["indices"]
indptr = matrix_data["indptr"]
shape = tuple(matrix_data["shape"])
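    # Illustrative: {"data": [1, 2, 3], "indices": [0, 2, 2], "indptr": [0, 2, 3],
    # "shape": [3, 3]} reconstructs (as CSR) a 3x3 matrix with entries at (0, 0),
    # (0, 2) and (1, 2).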
if example_type == "sparse_matrix_csc":
return csc_matrix((data, indices, indptr), shape=shape)
else:
return csr_matrix((data, indices, indptr), shape=shape)
def plot_lines(data_series, xlabel, ylabel, legend_loc=None, line_kwargs=None, title=None):
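    """Plot one or more labeled (x, y) series on a single matplotlib figure.

    ``data_series`` is an iterable of ``(label, data_x, data_y)`` tuples; returns the
    matplotlib ``(fig, ax)`` pair. A minimal illustrative call::

        fig, ax = plot_lines([("loss", [0, 1, 2], [0.9, 0.5, 0.3])], "step", "loss")
    """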
import matplotlib.pyplot as plt
fig, ax = plt.subplots()
if line_kwargs is None:
line_kwargs = {}
for label, data_x, data_y in data_series:
ax.plot(data_x, data_y, label=label, **line_kwargs)
if legend_loc:
ax.legend(loc=legend_loc)
ax.set(xlabel=xlabel, ylabel=ylabel, title=title)
return fig, ax
def _enforce_tensor_spec(
values: Union[np.ndarray, "csc_matrix", "csr_matrix"],
tensor_spec: TensorSpec,
):
"""
    Enforce that the input tensor's shape and type match the provided tensor spec.
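
    Example (illustrative; ``TensorSpec`` is imported from ``mlflow.types`` above)::

        spec = TensorSpec(np.dtype("float64"), (-1, 2))
        _enforce_tensor_spec(np.zeros((3, 2)), spec)  # returns the array unchanged
        _enforce_tensor_spec(np.zeros((3, 3)), spec)  # raises MlflowException (bad shape)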
"""
expected_shape = tensor_spec.shape
expected_type = tensor_spec.type
actual_shape = values.shape
actual_type = values.dtype if isinstance(values, np.ndarray) else values.data.dtype
# This logic is for handling "ragged" arrays. The first check is for a standard numpy shape
# representation of a ragged array. The second is for handling a more manual specification
    # of shape while supporting an input which is a ragged array.
if len(expected_shape) == 1 and expected_shape[0] == -1 and expected_type == np.dtype("O"):
# Sample spec: Tensor('object', (-1,))
# Will pass on any provided input
return values
if (
len(expected_shape) > 1
and -1 in expected_shape[1:]
and len(actual_shape) == 1
and actual_type == np.dtype("O")
):
# Sample spec: Tensor('float64', (-1, -1, -1, 3))
# Will pass on inputs which are ragged arrays: shape==(x,), dtype=='object'
return values
if len(expected_shape) != len(actual_shape):
raise MlflowException(
f"Shape of input {actual_shape} does not match expected shape {expected_shape}."
)
for expected, actual in zip(expected_shape, actual_shape):
if expected == -1:
continue
if expected != actual:
raise MlflowException(
f"Shape of input {actual_shape} does not match expected shape {expected_shape}."
)
if clean_tensor_type(actual_type) != expected_type:
raise MlflowException(
f"dtype of input {actual_type} does not match expected dtype {expected_type}"
)
return values
def _enforce_mlflow_datatype(name, values: pd.Series, t: DataType):
"""
    Enforce that the input column type matches the type declared in the model input schema.

    The following type conversions are allowed:

    1. object -> string
    2. int -> long (upcast)
    3. float -> double (upcast)
    4. int -> double (safe conversion)
    5. np.datetime64[x] -> datetime (any precision)
    6. object -> datetime
    7. decimal -> double

    NB: pandas does not have a native decimal data type, so when a user trains and infers a
    model from a pyspark dataframe that contains a decimal type, the schema will be treated
    as float64.

    Any other type mismatch will raise an error.
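
    Example (illustrative)::

        s = pd.Series([1, 2], dtype="int32")
        _enforce_mlflow_datatype("x", s, DataType.long)   # upcast to int64
        _enforce_mlflow_datatype("x", s, DataType.float)  # raises MlflowException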
"""
if values.dtype == object and t not in (DataType.binary, DataType.string):
values = values.infer_objects()
if t == DataType.string and values.dtype == object:
# NB: the object can contain any type and we currently cannot cast to pandas Strings
# due to how None is cast
return values
# NB: Comparison of pandas and numpy data type fails when numpy data type is on the left hand
# side of the comparison operator. It works, however, if pandas type is on the left hand side.
# That is because pandas is aware of numpy.
if t.to_pandas() == values.dtype or t.to_numpy() == values.dtype:
# The types are already compatible => conversion is not necessary.
return values
if t == DataType.binary and values.dtype.kind == t.binary.to_numpy().kind:
# NB: bytes in numpy have variable itemsize depending on the length of the longest
# element in the array (column). Since MLflow binary type is length agnostic, we ignore
# itemsize when matching binary columns.
return values
if t == DataType.datetime and values.dtype.kind == t.to_numpy().kind:
# NB: datetime values have variable precision denoted by brackets, e.g. datetime64[ns]
# denotes nanosecond precision. Since MLflow datetime type is precision agnostic, we
# ignore precision when matching datetime columns.
return values.astype(np.dtype("datetime64[ns]"))
if t == DataType.datetime and (values.dtype == object or values.dtype == t.to_python()):
# NB: Pyspark date columns get converted to object when converted to a pandas
# DataFrame. To respect the original typing, we convert the column to datetime.
try:
return values.astype(np.dtype("datetime64[ns]"), errors="raise")
except ValueError as e:
raise MlflowException(
f"Failed to convert column {name} from type {values.dtype} to {t}."
) from e
if t == DataType.boolean and values.dtype == object:
        # Do not convert the type; otherwise None values would be converted to boolean False
return values
if t == DataType.double and values.dtype == decimal.Decimal:
        # NB: Pyspark Decimal columns get converted to decimal.Decimal when converted to a pandas
        # DataFrame. In order to support decimal data training from a Spark DataFrame, we add
        # this conversion even though we might lose precision.
try:
return pd.to_numeric(values, errors="raise")
except ValueError:
raise MlflowException(
f"Failed to convert column {name} from type {values.dtype} to {t}."
)
numpy_type = t.to_numpy()
if values.dtype.kind == numpy_type.kind:
is_upcast = values.dtype.itemsize <= numpy_type.itemsize
elif values.dtype.kind == "u" and numpy_type.kind == "i":
is_upcast = values.dtype.itemsize < numpy_type.itemsize
elif values.dtype.kind in ("i", "u") and numpy_type == np.float64:
# allow (u)int => double conversion
is_upcast = values.dtype.itemsize <= 6
else:
is_upcast = False
if is_upcast:
return values.astype(numpy_type, errors="raise")
else:
        # NB: conversions between incompatible types (e.g. floats -> ints or
        # double -> float) are not allowed. While supported by pandas and numpy,
        # these conversions alter the values significantly.
def all_ints(xs):
return all(pd.isnull(x) or int(x) == x for x in xs)
hint = ""
if (
values.dtype == np.float64
and numpy_type.kind in ("i", "u")
and values.hasnans
and all_ints(values)
):
hint = (
" Hint: the type mismatch is likely caused by missing values. "
"Integer columns in python can not represent missing values and are therefore "
"encoded as floats. The best way to avoid this problem is to infer the model "
"schema based on a realistic data sample (training dataset) that includes missing "
"values. Alternatively, you can declare integer columns as doubles (float64) "
"whenever these columns may have missing values. See `Handling Integers With "
"Missing Values <https://www.mlflow.org/docs/latest/models.html#"
"handling-integers-with-missing-values>`_ for more details."
)
raise MlflowException(
f"Incompatible input types for column {name}. "
f"Can not safely convert {values.dtype} to {numpy_type}.{hint}"
)
def _enforce_unnamed_col_schema(pf_input: pd.DataFrame, input_schema: Schema):
"""Enforce the input columns conform to the model's column-based signature."""
input_names = pf_input.columns[: len(input_schema.inputs)]
input_types = input_schema.input_types()
new_pf_input = {}
for i, x in enumerate(input_names):
if isinstance(input_types[i], DataType):
new_pf_input[x] = _enforce_mlflow_datatype(x, pf_input[x], input_types[i])
# If the input_type is objects/arrays, we assume pf_input must be a pandas DataFrame.
# Otherwise, the schema is not valid.
elif isinstance(input_types[i], Object):
new_pf_input[x] = pd.Series(
[_enforce_object(obj, input_types[i]) for obj in pf_input[x]], name=x
)
elif isinstance(input_types[i], Array):
new_pf_input[x] = pd.Series(
[_enforce_array(arr, input_types[i]) for arr in pf_input[x]], name=x
)
return pd.DataFrame(new_pf_input)
def _enforce_named_col_schema(pf_input: pd.DataFrame, input_schema: Schema):
"""Enforce the input columns conform to the model's column-based signature."""
input_names = input_schema.input_names()
input_dict = input_schema.input_dict()
new_pf_input = {}
for name in input_names:
input_type = input_dict[name].type
required = input_dict[name].required
if name not in pf_input:
if required:
raise MlflowException(
f"The input column '{name}' is required by the model "
"signature but missing from the input data."
)
else:
continue
if isinstance(input_type, DataType):
new_pf_input[name] = _enforce_mlflow_datatype(name, pf_input[name], input_type)
# If the input_type is objects/arrays, we assume pf_input must be a pandas DataFrame.
# Otherwise, the schema is not valid.
elif isinstance(input_type, Object):
new_pf_input[name] = pd.Series(
[_enforce_object(obj, input_type, required) for obj in pf_input[name]], name=name
)
elif isinstance(input_type, Array):
new_pf_input[name] = pd.Series(
[_enforce_array(arr, input_type, required) for arr in pf_input[name]], name=name
)
return pd.DataFrame(new_pf_input)
def _reshape_and_cast_pandas_column_values(name, pd_series, tensor_spec):
if tensor_spec.shape[0] != -1 or -1 in tensor_spec.shape[1:]:
raise MlflowException(
"For pandas dataframe input, the first dimension of shape must be a variable "
"dimension and other dimensions must be fixed, but in model signature the shape "
f"of {'input ' + name if name else 'the unnamed input'} is {tensor_spec.shape}."
)
if np.isscalar(pd_series[0]):
for shape in [(-1,), (-1, 1)]:
if tensor_spec.shape == shape:
return _enforce_tensor_spec(
np.array(pd_series, dtype=tensor_spec.type).reshape(shape), tensor_spec
)
raise MlflowException(
f"The input pandas dataframe column '{name}' contains scalar "
"values, which requires the shape to be (-1,) or (-1, 1), but got tensor spec "
f"shape of {tensor_spec.shape}.",
error_code=INVALID_PARAMETER_VALUE,
)
elif isinstance(pd_series[0], list) and np.isscalar(pd_series[0][0]):
        # If the pandas column contains list type values, the shape and type information
        # is lost, so do not enforce the shape and type; instead, reshape the list of
        # array values to the required shape and cast the values to the required type.
reshape_err_msg = (
f"The value in the Input DataFrame column '{name}' could not be converted to the "
f"expected shape of: '{tensor_spec.shape}'. Ensure that each of the input list "
"elements are of uniform length and that the data can be coerced to the tensor "
f"type '{tensor_spec.type}'"
)
try:
flattened_numpy_arr = np.vstack(pd_series.tolist())
reshaped_numpy_arr = flattened_numpy_arr.reshape(tensor_spec.shape).astype(
tensor_spec.type
)
except ValueError:
raise MlflowException(reshape_err_msg, error_code=INVALID_PARAMETER_VALUE)
if len(reshaped_numpy_arr) != len(pd_series):
raise MlflowException(reshape_err_msg, error_code=INVALID_PARAMETER_VALUE)
return reshaped_numpy_arr
elif isinstance(pd_series[0], np.ndarray):
reshape_err_msg = (
f"The value in the Input DataFrame column '{name}' could not be converted to the "
f"expected shape of: '{tensor_spec.shape}'. Ensure that each of the input numpy "
"array elements are of uniform length and can be reshaped to above expected shape."
)
try:
            # Because numpy arrays include precise type information, we don't convert the type
            # here; the subsequent schema validation can then perform a strict type check on
            # the numpy array column.
reshaped_numpy_arr = np.vstack(pd_series.tolist()).reshape(tensor_spec.shape)
except ValueError:
raise MlflowException(reshape_err_msg, error_code=INVALID_PARAMETER_VALUE)
if len(reshaped_numpy_arr) != len(pd_series):
raise MlflowException(reshape_err_msg, error_code=INVALID_PARAMETER_VALUE)
return reshaped_numpy_arr
else:
raise MlflowException(
"Because the model signature requires tensor spec input, the input "
"pandas dataframe values should be either scalar value, python list "
"containing scalar values or numpy array containing scalar values, "
"other types are not supported.",
error_code=INVALID_PARAMETER_VALUE,
)
def _enforce_tensor_schema(pf_input: PyFuncInput, input_schema: Schema):
"""Enforce the input tensor(s) conforms to the model's tensor-based signature."""
def _is_sparse_matrix(x):
if not HAS_SCIPY:
# we can safely assume that it's not a sparse matrix if scipy is not installed
return False
return isinstance(x, (csr_matrix, csc_matrix))
if input_schema.has_input_names():
if isinstance(pf_input, dict):
new_pf_input = {}
for col_name, tensor_spec in zip(input_schema.input_names(), input_schema.inputs):
if not isinstance(pf_input[col_name], np.ndarray):
raise MlflowException(
"This model contains a tensor-based model signature with input names,"
" which suggests a dictionary input mapping input name to a numpy"
f" array, but a dict with value type {type(pf_input[col_name])} was found.",
error_code=INVALID_PARAMETER_VALUE,
)
new_pf_input[col_name] = _enforce_tensor_spec(pf_input[col_name], tensor_spec)
elif isinstance(pf_input, pd.DataFrame):
new_pf_input = {}
for col_name, tensor_spec in zip(input_schema.input_names(), input_schema.inputs):
pd_series = pf_input[col_name]
new_pf_input[col_name] = _reshape_and_cast_pandas_column_values(
col_name, pd_series, tensor_spec
)
else:
raise MlflowException(
"This model contains a tensor-based model signature with input names, which"
" suggests a dictionary input mapping input name to tensor, or a pandas"
" DataFrame input containing columns mapping input name to flattened list value"
f" from tensor, but an input of type {type(pf_input)} was found.",
error_code=INVALID_PARAMETER_VALUE,
)
else:
tensor_spec = input_schema.inputs[0]
if isinstance(pf_input, pd.DataFrame):
num_input_columns = len(pf_input.columns)
if pf_input.empty:
raise MlflowException("Input DataFrame is empty.")
elif num_input_columns == 1:
new_pf_input = _reshape_and_cast_pandas_column_values(
None, pf_input[pf_input.columns[0]], tensor_spec
)
else:
if tensor_spec.shape != (-1, num_input_columns):
raise MlflowException(
"This model contains a model signature with an unnamed input. Since the "
"input data is a pandas DataFrame containing multiple columns, "
"the input shape must be of the structure "
"(-1, number_of_dataframe_columns). "
f"Instead, the input DataFrame passed had {num_input_columns} columns and "
f"an input shape of {tensor_spec.shape} with all values within the "
"DataFrame of scalar type. Please adjust the passed in DataFrame to "
"match the expected structure",
error_code=INVALID_PARAMETER_VALUE,
)
new_pf_input = _enforce_tensor_spec(pf_input.to_numpy(), tensor_spec)
elif isinstance(pf_input, np.ndarray) or _is_sparse_matrix(pf_input):
new_pf_input = _enforce_tensor_spec(pf_input, tensor_spec)
else:
raise MlflowException(
"This model contains a tensor-based model signature with no input names,"
" which suggests a numpy array input or a pandas dataframe input with"
f" proper column values, but an input of type {type(pf_input)} was found.",
error_code=INVALID_PARAMETER_VALUE,
)
return new_pf_input
def _enforce_schema(pf_input: PyFuncInput, input_schema: Schema, flavor: Optional[str] = None):
"""
    Enforces that the provided input matches the model's input schema.

    For signatures with input names, we check that there are no missing inputs and reorder the
    inputs to match the ordering declared in the schema if necessary. Any extra columns are
    ignored.

    For column-based signatures, we make sure the types of the input match the types specified
    in the schema, or that they can be safely converted to match the input schema.

    For PySpark DataFrame inputs, MLflow casts a sample of the PySpark DataFrame into a pandas
    DataFrame. MLflow will only enforce the schema on a subset of the data rows.

    For tensor-based signatures, we make sure the shape and type of the input match the shape
    and type specified in the model's input schema.
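
    Example (illustrative sketch; ``ColSpec`` would come from ``mlflow.types``)::

        schema = Schema([ColSpec(DataType.double, "x")])
        _enforce_schema(pd.DataFrame({"x": [1.0, 2.0]}), schema)  # enforced successfully
        _enforce_schema(pd.DataFrame({"y": [1.0]}), schema)       # raises: missing input 'x'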
"""
def _is_scalar(x):
return np.isscalar(x) or x is None
original_pf_input = pf_input
if isinstance(pf_input, pd.Series):
pf_input = pd.DataFrame(pf_input)
if not input_schema.is_tensor_spec():
# convert single DataType to pandas DataFrame
if np.isscalar(pf_input):
pf_input = pd.DataFrame([pf_input])
elif isinstance(pf_input, dict):
# keys are column names
if any(
isinstance(col_spec.type, (Array, Object)) for col_spec in input_schema.inputs
) or all(
_is_scalar(value)
or (isinstance(value, list) and all(isinstance(item, str) for item in value))
for value in pf_input.values()
):
pf_input = pd.DataFrame([pf_input])
else:
try:
# This check is specifically to handle the serving structural cast for
# certain inputs for the transformers implementation. Due to the fact that
# specific Pipeline types in transformers support passing input data
# of the form Dict[str, str] in which the value is a scalar string, model
# serving will cast this entry as a numpy array with shape () and size 1.
# This is seen as a scalar input when attempting to create a Pandas
# DataFrame from such a numpy structure and requires the array to be
# encapsulated in a list in order to prevent a ValueError exception for
# requiring an index if passing in all scalar values thrown by Pandas.
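                    # Illustrative: {"prompt": np.array("hi")} has value.shape == () and
                    # value.size == 1, so it is wrapped in a single-row DataFrame below.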
if all(
isinstance(value, np.ndarray)
and value.dtype.type == np.str_
and value.size == 1
and value.shape == ()
for value in pf_input.values()
):
pf_input = pd.DataFrame([pf_input])
elif any(
isinstance(value, np.ndarray) and value.ndim > 1
for value in pf_input.values()
):
# Pandas DataFrames can't be constructed with embedded multi-dimensional
# numpy arrays. Accordingly, we convert any multi-dimensional numpy
# arrays to lists before constructing a DataFrame. This is safe because
# ColSpec model signatures do not support array columns, so subsequent
# validation logic will result in a clear "incompatible input types"
# exception. This is preferable to a pandas DataFrame construction error
pf_input = pd.DataFrame(
{
key: (
value.tolist()
if (isinstance(value, np.ndarray) and value.ndim > 1)
else value
)
for key, value in pf_input.items()
}
)
else:
pf_input = pd.DataFrame(pf_input)
except Exception as e:
raise MlflowException(
"This model contains a column-based signature, which suggests a DataFrame"
" input. There was an error casting the input data to a DataFrame:"
f" {e}"
)
elif isinstance(pf_input, (list, np.ndarray, pd.Series)):
pf_input = pd.DataFrame(pf_input)
elif HAS_PYSPARK and isinstance(pf_input, SparkDataFrame):
pf_input = pf_input.limit(10).toPandas()
for field in original_pf_input.schema.fields:
if isinstance(field.dataType, (StructType, ArrayType)):
pf_input[field.name] = pf_input[field.name].apply(
lambda row: convert_complex_types_pyspark_to_pandas(row, field.dataType)
)
if not isinstance(pf_input, pd.DataFrame):
raise MlflowException(
f"Expected input to be DataFrame. Found: {type(pf_input).__name__}"
)
if input_schema.has_input_names():
# make sure there are no missing columns
input_names = input_schema.required_input_names()
optional_names = input_schema.optional_input_names()
expected_required_cols = set(input_names)
actual_cols = set()
optional_cols = set(optional_names)
if len(expected_required_cols) == 1 and isinstance(pf_input, np.ndarray):
# for schemas with a single column, match input with column
pf_input = {input_names[0]: pf_input}
actual_cols = expected_required_cols
elif isinstance(pf_input, pd.DataFrame):
actual_cols = set(pf_input.columns)
elif isinstance(pf_input, dict):
actual_cols = set(pf_input.keys())
missing_cols = expected_required_cols - actual_cols
extra_cols = actual_cols - expected_required_cols - optional_cols
        # Preserve order from the original columns, since missing/extra columns are likely to
        # be in the same order.
missing_cols = [c for c in input_names if c in missing_cols]
extra_cols = [c for c in actual_cols if c in extra_cols]
if missing_cols:
message = f"Model is missing inputs {missing_cols}."
if extra_cols:
message += f" Note that there were extra inputs: {extra_cols}"
raise MlflowException(message)
elif not input_schema.is_tensor_spec():
# The model signature does not specify column names => we can only verify column count.
num_actual_columns = len(pf_input.columns)
if num_actual_columns < len(input_schema.inputs):
raise MlflowException(
"Model inference is missing inputs. The model signature declares "
"{} inputs but the provided value only has "
"{} inputs. Note: the inputs were not named in the signature so we can "
"only verify their count.".format(len(input_schema.inputs), num_actual_columns)
)
if input_schema.is_tensor_spec():
return _enforce_tensor_schema(pf_input, input_schema)
elif HAS_PYSPARK and isinstance(original_pf_input, SparkDataFrame):
return _enforce_pyspark_dataframe_schema(
original_pf_input, pf_input, input_schema, flavor=flavor
)
else: