/
types.py
263 lines (219 loc) · 10 KB
/
types.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
# Lint as: python3
# Copyright 2018 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Types."""
from __future__ import absolute_import
from __future__ import division
# Standard __future__ imports
from __future__ import print_function
import datetime
from typing import Any, Callable, Dict, List, Optional, Text, Tuple, Union, NamedTuple
import numpy as np
import six
import tensorflow as tf
from tfx_bsl.beam import shared
# pylint: disable=invalid-name
# A single tensor, in either dense or sparse form.
TensorType = Union[tf.Tensor, tf.SparseTensor]
# A tensor or a graph operation (e.g. an update op).
TensorOrOperationType = Union[TensorType, tf.Operation]
# Tensors keyed by name.
DictOfTensorType = Dict[Text, TensorType]
# Either a single tensor or a dict of tensors keyed by name.
TensorTypeMaybeDict = Union[TensorType, DictOfTensorType]
# A materialized (fetched) tensor value: dense values arrive as np.ndarray,
# sparse values as tf.compat.v1.SparseTensorValue.
TensorValue = Union[tf.compat.v1.SparseTensorValue, np.ndarray]
# Fetched tensor values keyed by name.
DictOfTensorValue = Dict[Text, TensorValue]
# Either a single fetched value or a dict of fetched values keyed by name.
TensorValueMaybeDict = Union[TensorValue, DictOfTensorValue]
# List of metric variable values; element types are deliberately
# unconstrained (metric-specific).
MetricVariablesType = List[Any]
class ValueWithTDistribution(
    NamedTuple('ValueWithTDistribution', [
        ('sample_mean', float),
        ('sample_standard_deviation', float),
        ('sample_degrees_of_freedom', int),
        ('unsampled_value', float),
    ])):
  r"""A metric value described by a Student's t-distribution.

  Holds the sample mean, sample standard deviation, and sample degrees of
  freedom obtained from bootstrap resampling. The value computed over the
  full (unsampled) data is kept alongside in unsampled_value.

  The sample standard deviation is the usual unbiased estimate:
  \sqrt{ \frac{1}{N-1} \sum_{i=1}^{N}{(x_i - \bar{x})^2} }
  """

  def __new__(cls,
              sample_mean: float,
              sample_standard_deviation: Optional[float] = None,
              sample_degrees_of_freedom: Optional[int] = None,
              unsampled_value: Optional[float] = None):
    # Only sample_mean is required; the remaining fields default to None so
    # callers that do not bootstrap can still build a value.
    return super(ValueWithTDistribution, cls).__new__(
        cls,
        sample_mean=sample_mean,
        sample_standard_deviation=sample_standard_deviation,
        sample_degrees_of_freedom=sample_degrees_of_freedom,
        unsampled_value=unsampled_value)
# AddMetricsCallback should have the following prototype:
#   def add_metrics_callback(features_dict, predictions_dict, labels_dict):
#
# It should create and return a metric_ops dictionary, such that
# metric_ops['metric_name'] = (value_op, update_op), just as in the Trainer.
#
# Note that features_dict, predictions_dict and labels_dict are not
# necessarily dictionaries - they might also be Tensors, depending on what the
# model's eval_input_receiver_fn returns.
# pyformat: disable
AddMetricsCallbackType = Any
# pyformat: enable
# Type of keys we support for prediction, label and features dictionaries.
# Either a plain string key or a tuple of string keys (for nested lookups).
FPLKeyType = Union[Text, Tuple[Text, ...]]
# Dictionary of Tensor values fetched. The dictionary maps original dictionary
# keys => ('node' => value). This type exists for backward compatibility with
# FeaturesPredictionsLabels, new code should use DictOfTensorValue instead.
DictOfFetchedTensorValues = Dict[FPLKeyType, Dict[Text, TensorValue]]
# Bundle of fetched features, predictions and labels for a single input
# (identified by input_ref).
FeaturesPredictionsLabels = NamedTuple(
    'FeaturesPredictionsLabels', [('input_ref', int),
                                  ('features', DictOfFetchedTensorValues),
                                  ('predictions', DictOfFetchedTensorValues),
                                  ('labels', DictOfFetchedTensorValues)])
# Used in building the model diagnostics table, a MaterializedColumn is a value
# inside of Extracts that will be emitted to file. Note that for strings, the
# values are raw byte strings rather than unicode strings. This is by design, as
# features can have arbitrary bytes values.
MaterializedColumn = NamedTuple(
    'MaterializedColumn',
    [('name', Text),
     ('value', Union[List[bytes], List[int], List[float], bytes, int, float])])
# Extracts represent data extracted during pipeline processing. In order to
# provide a flexible API, these types are just dicts where the keys are defined
# (reserved for use) by different extractor implementations. For example, the
# PredictExtractor stores the data for the features, labels, and predictions
# under the keys "features", "labels", and "predictions".
Extracts = Dict[Text, Any]
# pylint: enable=invalid-name
def is_tensor(obj) -> bool:
  """Returns True if obj is a tf.Tensor or tf.SparseTensor."""
  # Single isinstance call with a type tuple is the idiomatic form of the
  # original `isinstance(...) or isinstance(...)` chain.
  return isinstance(obj, (tf.Tensor, tf.SparseTensor))
class ModelLoader(object):
  """Loads (and shares) the model used during extraction/evaluation.

  Attributes:
    construct_fn: A callable which creates the model instance. The callable
      should take no args as input (typically a closure is used to capture
      necessary parameters).
    tags: Optional model tags (e.g. 'serve' for serving or 'eval' for
      EvalSavedModel).
  """

  __slots__ = ['construct_fn', 'tags', '_shared_handle']

  def __init__(self,
               construct_fn: Callable[[], Any],
               tags: Optional[List[Text]] = None):
    self.construct_fn = construct_fn
    self.tags = tags
    # Shared handle so the loaded model is constructed at most once and
    # reused across callers.
    self._shared_handle = shared.Shared()

  def load(
      self,
      model_load_time_callback: Optional[Callable[[int], None]] = None) -> Any:
    """Returns the loaded model, constructing it on first use.

    Args:
      model_load_time_callback: Optional callback to track load time.
    """
    loader = (
        self._construct_fn_with_load_time(model_load_time_callback)
        if model_load_time_callback else self.construct_fn)
    return self._shared_handle.acquire(loader)

  def _construct_fn_with_load_time(
      self, model_load_time_callback: Callable[[int],
                                               None]) -> Callable[[], Any]:
    """Wraps construct_fn so the load duration is reported to the callback."""

    def timed_construct():
      begin = datetime.datetime.now()
      loaded_model = self.construct_fn()
      elapsed = datetime.datetime.now() - begin
      # Load time is reported in whole seconds.
      model_load_time_callback(int(elapsed.total_seconds()))
      return loaded_model

    return timed_construct
class EvalSharedModel(
    NamedTuple(
        'EvalSharedModel',
        [
            ('model_path', Text),
            ('add_metrics_callbacks',
             List[Callable]),  # List[AnyMetricsCallbackType]
            ('include_default_metrics', bool),
            ('example_weight_key', Union[Text, Dict[Text, Text]]),
            ('additional_fetches', List[Text]),
            ('model_loader', ModelLoader),
            ('model_name', Text),
            ('model_type', Text),
        ])):
  # pyformat: disable
  """Shared model used during extraction and evaluation.

  Attributes:
    model_path: Path to EvalSavedModel (containing the saved_model.pb file).
    add_metrics_callbacks: Optional list of callbacks for adding additional
      metrics to the graph. The names of the metrics added by the callbacks
      should not conflict with existing metrics. See below for more details
      about what each callback should do. The callbacks are only used during
      evaluation.
    include_default_metrics: True to include the default metrics that are part
      of the saved model graph during evaluation.
    example_weight_key: Example weight key (single-output model) or dict of
      example weight keys (multi-output model) keyed by output_name.
    additional_fetches: Prefixes of additional tensors stored in
      signature_def.inputs that should be fetched at prediction time. The
      "features" and "labels" tensors are handled automatically and should not
      be included in this list.
    model_loader: Model loader.
    model_name: Model name (should align with ModelSpecs.name).
    model_type: Model type (tfma.TF_KERAS, tfma.TF_LITE, tfma.TF_ESTIMATOR, ..).

  More details on add_metrics_callbacks:

    Each add_metrics_callback should have the following prototype:
      def add_metrics_callback(features_dict, predictions_dict, labels_dict):

    Note that features_dict, predictions_dict and labels_dict are not
    necessarily dictionaries - they might also be Tensors, depending on what the
    model's eval_input_receiver_fn returns.

    It should create and return a metric_ops dictionary, such that
    metric_ops['metric_name'] = (value_op, update_op), just as in the Trainer.

    Short example:
      def add_metrics_callback(features_dict, predictions_dict, labels):
        metric_ops = {}
        metric_ops['mean_label'] = tf.metrics.mean(labels)
        metric_ops['mean_probability'] = tf.metrics.mean(tf.slice(
          predictions_dict['probabilities'], [0, 1], [2, 1]))
        return metric_ops
  """
  # pyformat: enable

  def __new__(
      cls,
      model_path: Optional[Text] = None,
      add_metrics_callbacks: Optional[List[AddMetricsCallbackType]] = None,
      include_default_metrics: Optional[bool] = True,
      example_weight_key: Optional[Union[Text, Dict[Text, Text]]] = None,
      additional_fetches: Optional[List[Text]] = None,
      model_loader: Optional[ModelLoader] = None,
      model_name: Text = '',
      model_type: Text = '',
      construct_fn: Optional[Callable[[], Any]] = None):
    # Normalize a missing callback list to an empty one.
    add_metrics_callbacks = add_metrics_callbacks or []
    # construct_fn is a convenience alias for passing a bare loader function;
    # it is mutually exclusive with model_loader.
    if construct_fn:
      if model_loader:
        raise ValueError(
            'only one of model_loader or construct_fn should be used')
      model_loader = ModelLoader(tags=None, construct_fn=construct_fn)
    if model_path is not None:
      model_path = six.ensure_str(model_path)
    return super(EvalSharedModel, cls).__new__(
        cls, model_path, add_metrics_callbacks, include_default_metrics,
        example_weight_key, additional_fetches, model_loader, model_name,
        model_type)
# MaybeMultipleEvalSharedModels represents a parameter that can take on a single
# model or a list of models.
#
# The dict form is kept for backward compatibility (presumably keyed by model
# name — verify against callers).
# TODO(b/150416505): Deprecate support for dict.
MaybeMultipleEvalSharedModels = Union[EvalSharedModel, List[EvalSharedModel],
                                      Dict[Text, EvalSharedModel]]