-
Notifications
You must be signed in to change notification settings - Fork 211
/
analyzers.py
2701 lines (2267 loc) · 108 KB
/
analyzers.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
# Copyright 2017 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Functions that involve a full pass over the dataset.
This module contains functions that are used in the preprocessing function, to
define a full pass operation such as computing the sum, min, max or unique
values of a tensor over the entire dataset. This is implemented by a reduction
operation in the Beam implementation.
From the user's point of view, an analyzer appears as a regular TensorFlow
function, i.e. it accepts and returns tensors. However it is represented in
the graph as a `Analyzer` which is not a TensorFlow op, but a placeholder for
the computation that takes place outside of TensorFlow.
"""
import functools
import os
import pickle
import re
from typing import Any, Callable, Collection, List, Optional, Sequence, Tuple, Union
from absl import logging
import numpy as np
import pyarrow as pa
import tensorflow as tf
from tensorflow_transform import analyzer_nodes
from tensorflow_transform import annotators
from tensorflow_transform import common
from tensorflow_transform import common_types
from tensorflow_transform import gaussianization
from tensorflow_transform import nodes
from tensorflow_transform import schema_inference
from tensorflow_transform import tf_utils
from tfx_bsl import sketches
# TODO(b/243513856): Switch to `collections.namedtuple` or `typing.NamedTuple`
# once the Spark issue is resolved.
from tfx_bsl.types import tfx_namedtuple
from typing_extensions import Literal
from google.protobuf import descriptor_pb2
# Names exported as the public analyzer API of this module.
__all__ = [
    'count_per_key',
    'covariance',
    'histogram',
    'max',
    'mean',
    'min',
    'pca',
    'quantiles',
    'size',
    'sum',
    'tukey_location',
    'tukey_scale',
    'tukey_h_params',
    'var',
    'vocabulary',
]

# This module defines max and min functions that override the builtins.
# Keep aliases to the builtins so implementations below can still use them.
builtin_max = max
builtin_min = min

# Vocabulary files are written as plain text unless a caller selects one of
# the other ALLOWED_VOCABULARY_FILE_FORMATS.
DEFAULT_VOCABULARY_FILE_FORMAT: Literal['text'] = 'text'
ALLOWED_VOCABULARY_FILE_FORMATS = ('text', 'tfrecord_gzip')

# Filename prefixes used for vocabulary and vocabulary-frequency assets.
VOCAB_FILENAME_PREFIX = 'vocab_'
VOCAB_FREQUENCY_FILENAME_PREFIX = 'vocab_frequency_'

# Experimentally estimated value of top_k after which the exact `tft.vocabulary`
# implementation becomes more efficient than
# `tft.experimental.approximate_vocabulary`.
LARGE_VOCAB_TOP_K = 200_000

# Matches empty strings and strings with \n or \r (including strings with \n or
# \r that contain invalid UTF-8 characters). This has to follow the re2 syntax:
# https://github.com/google/re2/wiki/Syntax.
_EMPTY_STRING_OR_NEWLINE_CHARS_REGEX = r'^$|\C*[\n\r]\C*'

# For some input types, widen the output type of sum analyzer to avoid overflow.
_SUM_OUTPUT_DTYPE_MAP = {
    tf.float16: tf.float32,
    tf.float32: tf.float32,
    tf.float64: tf.float64,
    tf.int8: tf.int64,
    tf.int16: tf.int64,
    tf.int32: tf.int64,
    tf.int64: tf.int64,
    tf.uint8: tf.uint64,
    tf.uint16: tf.uint64,
    tf.uint32: tf.uint64,
    tf.uint64: tf.uint64,
}

# Output dtypes for analyzers that produce floating-point results: floats keep
# their width, every integer input maps to float32.
_FLOAT_OUTPUT_DTYPE_MAP = {
    tf.float16: tf.float16,
    tf.float32: tf.float32,
    tf.float64: tf.float64,
    tf.int8: tf.float32,
    tf.int16: tf.float32,
    tf.int32: tf.float32,
    tf.int64: tf.float32,
    tf.uint8: tf.float32,
    tf.uint16: tf.float32,
    tf.uint32: tf.float32,
    tf.uint64: tf.float32,
}
def apply_cacheable_combine_operation(
    combiner: analyzer_nodes.Combiner,
    *tensor_inputs: common_types.TensorType) -> Tuple[nodes.ValueNode, ...]:
  """Builds the accumulate -> merge -> extract node chain for `combiner`.

  The applied nodes are subject to analyzer cache optimization.

  Args:
    combiner: Combiner to be applied.
    *tensor_inputs: Tensors representing inputs to the combiner.

  Returns:
    A tuple of ValueNodes representing outputs of the combiner.
  """
  tensors_value_node = analyzer_nodes.get_input_tensors_value_nodes(
      tensor_inputs)
  # Per-batch accumulation; its outputs are cacheable.
  accumulated = nodes.apply_multi_output_operation(
      analyzer_nodes.CacheableCombineAccumulate,
      tensors_value_node,
      combiner=combiner)
  # Merge the (possibly cached) partial accumulators into one.
  merged = nodes.apply_multi_output_operation(
      analyzer_nodes.CacheableCombineMerge, *accumulated, combiner=combiner)
  # Unpack the merged accumulator into one value node per output tensor.
  return nodes.apply_multi_output_operation(
      analyzer_nodes.ExtractCombineMergeOutputs,
      *merged,
      output_tensor_info_list=combiner.output_tensor_infos())
def _apply_cacheable_combiner(
    combiner: analyzer_nodes.Combiner,
    *tensor_inputs: common_types.TensorType) -> Tuple[tf.Tensor, ...]:
  """Applies `combiner` over the whole dataset, possibly utilizing cache.

  Like `apply_cacheable_combine_operation`, but wraps every resulting value
  node as a tensor.

  Args:
    combiner: Combiner to be applied.
    *tensor_inputs: Tensors representing inputs to the combiner.

  Returns:
    A tuple of tensors representing outputs of the combiner.
  """
  value_nodes = apply_cacheable_combine_operation(combiner, *tensor_inputs)
  return tuple(
      analyzer_nodes.wrap_as_tensor(node) for node in value_nodes
  )  # pytype: disable=bad-return-type
def _apply_cacheable_combiner_per_key(
    combiner: analyzer_nodes.Combiner,
    *tensor_inputs: common_types.TensorType) -> Tuple[tf.Tensor, ...]:
  """Similar to _apply_cacheable_combiner but this is computed per key."""
  tensors_value_node = analyzer_nodes.get_input_tensors_value_nodes(
      tensor_inputs)
  # Per-batch, per-key accumulation; outputs are cacheable.
  accumulated = nodes.apply_multi_output_operation(
      analyzer_nodes.CacheableCombinePerKeyAccumulate,
      tensors_value_node,
      combiner=combiner)
  # Merge partial per-key accumulators into a single result.
  merged = nodes.apply_operation(
      analyzer_nodes.CacheableCombinePerKeyMerge, *accumulated,
      combiner=combiner)
  # Format the merged result into key/value output tensors.
  formatted = nodes.apply_multi_output_operation(
      analyzer_nodes.CacheableCombinePerKeyFormatKeys, merged,
      combiner=combiner)
  return tuple(analyzer_nodes.wrap_as_tensor(node) for node in formatted)
def _apply_cacheable_combiner_per_key_large(
    combiner: analyzer_nodes.Combiner, key_vocabulary_filename: str,
    *tensor_inputs: common_types.TensorType
) -> Union[tf.Tensor, tf.saved_model.Asset]:
  """Similar to above but saves the per-key combined result to a file.

  Args:
    combiner: Combiner to be applied.
    key_vocabulary_filename: Name of the file the key-to-value mapping is
      written to.
    *tensor_inputs: Tensors representing inputs to the combiner.

  Returns:
    A tensor (or asset) holding the filename of the written mapping.
  """
  input_values_node = analyzer_nodes.get_input_tensors_value_nodes(
      tensor_inputs)
  accumulate_outputs_value_node = nodes.apply_operation(
      analyzer_nodes.CacheableCombinePerKeyAccumulate,
      input_values_node,
      combiner=combiner)
  merge_output_value_node = nodes.apply_operation(
      analyzer_nodes.CacheableCombinePerKeyMerge,
      accumulate_outputs_value_node,
      combiner=combiner)
  # Format the merged per-key result as (key, value) pairs for writing.
  keys_and_values_node = nodes.apply_operation(
      analyzer_nodes.CacheableCombinePerKeyFormatLarge,
      merge_output_value_node)
  # `store_frequency` is True by default because we want to write some values
  # alongside the key "vocabulary". Without doing so it would be equivalent to
  # vanilla vocabulary analyzer. `fingerprint_shuffle` is not as important but
  # signifies that the values are not required to be ordered here.
  key_vocabulary_filename_node = nodes.apply_operation(
      analyzer_nodes.VocabularyOrderAndWrite,
      keys_and_values_node,
      vocab_filename=key_vocabulary_filename,
      store_frequency=True,
      fingerprint_shuffle=True,
      # TODO(b/62379925): Use tfrecord.
      file_format='text')
  return analyzer_nodes.wrap_as_tensor(key_vocabulary_filename_node)
class NumPyCombiner(analyzer_nodes.Combiner):
  """Combines the PCollection only on the 0th dimension using nparray.

  Attributes:
    fn: The numpy function representing the reduction to be done.
    default_accumulator_value: The default value each accumulator entry is
      initialized to.
    output_dtypes: The numpy dtype to cast each output to.
    output_shapes: List of tuples representing the shapes of the outputs or
      Nones if the shapes are not fully defined.
  """

  def __init__(self, fn, default_accumulator_value, output_dtypes,
               output_shapes):
    self._fn = fn
    self._default_accumulator_value = default_accumulator_value
    # 0-dim array used both as the fallback sub-accumulator for unknown
    # shapes and as the comparison target for default-detection.
    self._default_sub_accumulator = np.array(default_accumulator_value)
    self._output_dtypes = output_dtypes
    if not all(
        isinstance(shape, (tuple, type(None))) for shape in output_shapes):
      raise TypeError('Expected all tuples or Nones, but got %r' %
                      output_shapes)
    self._output_shapes = output_shapes
    # Bind the default-detection predicate per-instance, overriding the
    # class-level placeholder `_is_default_sub_accumulator` below.
    if np.isnan(default_accumulator_value):
      # This case is needed because np.nan != np.nan.
      self._is_default_sub_accumulator = self._equals_to_scalar_nan
    else:
      self._is_default_sub_accumulator = self._equals_to_default_sub_accumulator

  def _equals_to_scalar_nan(self, array):
    # True only for a 0-dim NaN array, i.e. an untouched default.
    return not array.shape and np.isnan(array)

  def _equals_to_default_sub_accumulator(self, array):
    # Note that `np.array_equal` below does at most per-element comparison of
    # 0-dim arrays since `_default_sub_accumulator` is a 0-dim array, and
    # `np.array_equal` exits early on a shape mismatch.
    return np.array_equal(array, self._default_sub_accumulator)

  def _is_default_sub_accumulator(self, array):
    # Class-level placeholder; replaced by an instance attribute in __init__.
    raise NotImplementedError('Implementation should be set in __init__.')

  def create_accumulator(self):
    # One sub-accumulator per output tensor.
    return [
        self._create_sub_accumulator(shape)
        for shape in self._output_shapes
    ]

  def _create_sub_accumulator(self, shape):
    # Returns a default subaccumulator of the given shape if it's fully defined
    # and a 0-dim default array otherwise.
    if shape is None:
      return self._default_sub_accumulator
    else:
      return np.full(shape, self._default_accumulator_value)

  def add_input(self, accumulator, batch_values):
    # TODO(b/112414577): Go back to accepting only a single input.
    # See comment in _numeric_combine.
    # If the first subaccumulator is default, then the accumulator is default
    # and can be discarded.
    if self._is_default_sub_accumulator(accumulator[0]):
      return batch_values
    else:
      return [
          self._fn((sub_accumulator, batch_value), axis=0)
          for sub_accumulator, batch_value in zip(accumulator, batch_values)
      ]

  def merge_accumulators(self, accumulators):
    # If the first subaccumulator is default, then the accumulator is default
    # and can be discarded.
    non_default_accumulators = [
        accumulator for accumulator in accumulators
        if not self._is_default_sub_accumulator(accumulator[0])
    ]
    if non_default_accumulators:
      return [
          # numpy's sum, min, max, etc functions operate on array-like objects,
          # but not arbitrary iterables. Convert the provided sub_accumulators
          # into a list.
          self._fn(list(sub_accumulators), axis=0)
          for sub_accumulators in zip(*non_default_accumulators)
      ]
    else:
      return self.create_accumulator()

  def extract_output(self, accumulator):
    # For each output, cast that output to the specified type. Note there
    # will be one output for each input tensor to the analyzer.
    return [
        sub_accumulator.astype(output_dtype) for sub_accumulator, output_dtype
        in zip(accumulator, self._output_dtypes)
    ]

  def output_tensor_infos(self):
    # Shapes may be None when not fully defined.
    return [
        analyzer_nodes.TensorInfo(tf.as_dtype(dtype), shape, None)
        for dtype, shape in zip(self._output_dtypes, self._output_shapes)
    ]
def _get_output_shape_from_input(x):
  """Returns the shape of `x` with the leading (batch) dimension removed."""
  if isinstance(x, tf.SparseTensor):
    return x.get_shape().as_list()[1:]
  # With unknown rank we cannot drop the batch dimension; report a single
  # unknown dimension instead.
  if x.shape.rank is None:
    return (None,)
  # When reducing over batch dimensions, with known shape, the result will be
  # the same shape as the input, but without the batch.
  return x.shape.as_list()[1:]
def _get_elementwise_per_key_output_shape(
    x: tf.Tensor, key: Optional[tf.Tensor]) -> Optional[Tuple[int, ...]]:
  """Returns the fully-defined elementwise output shape of `x`, else None."""
  # When a key is present, the leading dimension of `x` indexes keys and is
  # dropped from the elementwise output shape.
  shape = x.get_shape() if key is None else x.get_shape()[1:]
  # A partially-unknown shape cannot serve as an output shape; signal with
  # None.
  return tuple(shape) if shape.is_fully_defined() else None
# TODO(b/112414577): Go back to accepting only a single input.
# Currently we accept multiple inputs so that we can implement min and max
# with a single combiner. Once this is done, add a return pytype as well.
def _numeric_combine(inputs: List[tf.Tensor],
                     fn: Callable[[np.ndarray], np.ndarray],
                     default_accumulator_value: Union[float, int],
                     reduce_instance_dims: bool = True,
                     output_dtypes: Optional[List[tf.DType]] = None,
                     key: Optional[tf.Tensor] = None,
                     key_vocabulary_filename: Optional[str] = None):
  """Reduces multiple inputs over the dataset with a numpy reduction `fn`.

  Args:
    inputs: A list of tensors, which will be independently reduced.
    fn: A function to reduce tensors across instances/batches, to get a single
      output.
    default_accumulator_value: The default scalar value that each accumulator
      entry is initialized to. Must be properly processed by the reduction
      function.
    reduce_instance_dims: By default collapses the batch and instance dimensions
      to arrive at a single scalar output. If False, only collapses the batch
      dimension and outputs a vector of the same shape as the input.
    output_dtypes: (Optional) A list of dtypes of the output tensors. If None,
      each output tensor has the same type as its input.
    key: (Optional) Apply the same operation, but on a per-key basis.
    key_vocabulary_filename: (Optional) The file name for the key-output mapping
      file. If None and key are provided, this combiner assumes the keys fit in
      memory and will not store the result in a file. If empty string, a file
      name will be chosen based on the current scope. If not an empty string,
      should be unique within a given preprocessing function.

  Returns:
    Either:
    (A) A list of Tensors with the same length as `inputs`, representing the
        input Tensors that have been reduced by `fn` across instances and
        batches (if key_vocabulary_filename is None).
    (B) A Tensor with the filename where the key-value mapping is stored (if
        key_vocabulary_filename is not None).
  """
  for tensor in inputs:
    if not isinstance(tensor, tf.Tensor):
      raise TypeError('Expected a Tensor, but got %r' % tensor)
  if not np.isscalar(default_accumulator_value):
    raise TypeError('Expected a scalar, but got %r' % default_accumulator_value)

  if output_dtypes is None:
    output_dtypes = [tensor.dtype for tensor in inputs]

  if reduce_instance_dims:
    # Fully-reduced outputs are scalars.
    output_shapes = [() for _ in inputs]
  else:
    # Only the batch dimension is reduced; keep the remaining (per-key) shape
    # when it is fully defined.
    output_shapes = [
        _get_elementwise_per_key_output_shape(tensor, key) for tensor in inputs
    ]

  numpy_dtypes = [dtype.as_numpy_dtype for dtype in output_dtypes]
  combiner = NumPyCombiner(fn, default_accumulator_value, numpy_dtypes,
                           output_shapes)
  # Dispatch: global combine, in-memory per-key combine, or file-backed
  # per-key combine, depending on `key`/`key_vocabulary_filename`.
  if key is None:
    return _apply_cacheable_combiner(combiner, *inputs)
  if key_vocabulary_filename is None:
    return _apply_cacheable_combiner_per_key(combiner, key, *inputs)
  return _apply_cacheable_combiner_per_key_large(
      combiner, _maybe_get_per_key_vocab_filename(key_vocabulary_filename), key,
      *inputs)
@common.log_api_use(common.ANALYZER_COLLECTION)
def min(  # pylint: disable=redefined-builtin
    x: common_types.TensorType,
    reduce_instance_dims: bool = True,
    name: Optional[str] = None) -> tf.Tensor:
  """Computes the minimum of the values of `x` over the whole dataset.

  In the case of a `CompositeTensor` missing values will be used in return
  value: for float, NaN is used and for other dtypes the max is used.

  Args:
    x: A `Tensor`, `SparseTensor`, or `RaggedTensor`.
    reduce_instance_dims: By default collapses the batch and instance dimensions
      to arrive at a single scalar output. If False, only collapses the batch
      dimension and outputs a `Tensor` of the same shape as the input.
    name: (Optional) A name for this operation.

  Returns:
    A `Tensor` with the same type as `x`.

  Raises:
    TypeError: If the type of `x` is not supported.
  """
  with tf.compat.v1.name_scope(name, 'min'):
    # Both extremes are computed in one pass; keep only the minimum.
    minimum, _ = _min_and_max(x, reduce_instance_dims, name)
    return minimum
@common.log_api_use(common.ANALYZER_COLLECTION)
def max(  # pylint: disable=redefined-builtin
    x: common_types.TensorType,
    reduce_instance_dims: bool = True,
    name: Optional[str] = None) -> tf.Tensor:
  """Computes the maximum of the values of `x` over the whole dataset.

  In the case of a `CompositeTensor` missing values will be used in return
  value: for float, NaN is used and for other dtypes the min is used.

  Args:
    x: A `Tensor`, `SparseTensor`, or `RaggedTensor`.
    reduce_instance_dims: By default collapses the batch and instance dimensions
      to arrive at a single scalar output. If False, only collapses the batch
      dimension and outputs a vector of the same shape as the input.
    name: (Optional) A name for this operation.

  Returns:
    A `Tensor`. Has the same type as `x`.

  Raises:
    TypeError: If the type of `x` is not supported.
  """
  with tf.compat.v1.name_scope(name, 'max'):
    # Both extremes are computed in one pass; keep only the maximum.
    _, maximum = _min_and_max(x, reduce_instance_dims, name)
    return maximum
def _min_and_max(x: common_types.TensorType,
                 reduce_instance_dims: bool = True,
                 name: Optional[str] = None) -> Tuple[tf.Tensor, tf.Tensor]:
  """Computes the min and max of the values of `x`.

  In the case of a `CompositeTensor` missing values will be used in return
  value: for float, NaN is used and for other dtypes the min is used.

  Args:
    x: A `Tensor`, `SparseTensor`, or `RaggedTensor`.
    reduce_instance_dims: By default collapses the batch and instance dimensions
      to arrive at a single scalar output. If False, only collapses the batch
      dimension and outputs a vector of the same shape as the input.
    name: (Optional) A name for this operation.

  Returns:
    Two `Tensor`s. Both have the same type as `x`.

  Raises:
    TypeError: If the type of `x` is not supported.
  """
  with tf.compat.v1.name_scope(name, 'min_and_max'):
    output_dtype = x.dtype
    if (not reduce_instance_dims and isinstance(x, tf.SparseTensor) and
        x.dtype.is_floating):
      # Elementwise reduction of a float SparseTensor uses the NaN-ignoring
      # reduction, since missing positions are represented as NaN.
      combine_fn = np.nanmax
      # NOTE(review): `x.dtype.is_floating` is guaranteed True by the branch
      # condition above, so the `else` arm here is unreachable — confirm and
      # consider simplifying to `np.nan`.
      default_accumulator_value = (np.nan if x.dtype.is_floating else
                                   -output_dtype.max)
    elif not reduce_instance_dims and isinstance(x, tf.RaggedTensor):
      raise NotImplementedError(
          'Elementwise min_and_max does not support RaggedTensors.')
    else:
      combine_fn = np.max
      default_accumulator_value = (-np.inf if x.dtype.is_floating else
                                   -output_dtype.max)
    # The batch reduction emits (-min, max) so both extremes can be combined
    # with a single max-style combiner.
    x_batch_minus_min, x_batch_max = tf_utils.reduce_batch_minus_min_and_max(
        x, reduce_instance_dims)
    minus_x_min, x_max = _numeric_combine(  # pylint: disable=unbalanced-tuple-unpacking
        inputs=[x_batch_minus_min, x_batch_max],
        fn=combine_fn,
        default_accumulator_value=default_accumulator_value,
        reduce_instance_dims=reduce_instance_dims)
    # Negate back to recover the true minimum.
    return tf.cast(0 - minus_x_min, output_dtype), tf.cast(x_max, output_dtype)
def _min_and_max_per_key(
    x: common_types.TensorType,
    key: common_types.TensorType,
    reduce_instance_dims: bool = True,
    key_vocabulary_filename: Optional[str] = None,
    name: Optional[str] = None
) -> Union[Tuple[tf.Tensor, tf.Tensor, tf.Tensor], tf.Tensor]:
  """Computes the min and max of the values of `x`, per key.

  In the case of a `CompositeTensor` missing values will be used in return
  value: for float, NaN is used and for other dtypes the min is used.

  This function operates under the assumption that the size of the key set
  is small enough to fit in memory. Anything above a certain size larger is not
  guaranteed to be handled properly, but support for larger key sets may be
  available in a future version.

  Args:
    x: A `Tensor`, `SparseTensor`, or `RaggedTensor`.
    key: A `Tensor`, `SparseTensor`, or `RaggedTensor` of dtype tf.string. If
      `x` is a `CompositeTensor`, `key` must exactly match `x` in everything
      except values.
    reduce_instance_dims: By default collapses the batch and instance dimensions
      to arrive at a single scalar output. If False, only collapses the batch
      dimension and outputs a vector of the same shape as the input. The False
      case is not currently supported for _min_and_max_per_key.
    key_vocabulary_filename: (Optional) The file name for the key-output mapping
      file. If None and key are provided, this combiner assumes the keys fit in
      memory and will not store the result in a file. If empty string, a file
      name will be chosen based on the current scope. If not an empty string,
      should be unique within a given preprocessing function.
    name: (Optional) A name for this operation.

  Returns:
    Either:
    (A) Three `Tensor`s. The first is the key vocab of type tf.string, and the
        second two have same type as `x` (if key_vocabulary_filename is None).
    (B) The filename where the key-value mapping is stored (if
        key_vocabulary_filename is not None).

  Raises:
    TypeError: If the type of `x` is not supported.
    ValueError: If `key` is None.
    NotImplementedError: If `x` is a composite tensor and
      `reduce_instance_dims` is False.
  """
  if key is None:
    raise ValueError('A key is required for _min_and_max_per_key')
  if not reduce_instance_dims and isinstance(
      x, (tf.SparseTensor, tf.RaggedTensor)):
    raise NotImplementedError(
        'Per-key elementwise reduction of Composite Tensors not supported ')
  with tf.compat.v1.name_scope(name, 'min_and_max_per_key'):
    output_dtype = x.dtype
    # The composite-tensor elementwise case was rejected above, so (unlike
    # `_min_and_max`) the NaN-ignoring `np.nanmax` path is never needed here:
    # the branch that selected it repeated the exact condition that already
    # raised NotImplementedError and was therefore unreachable.
    combine_fn = np.max
    default_accumulator_value = (-np.inf if x.dtype.is_floating else
                                 -output_dtype.max)
    # The batch reduction emits (-min, max) per key so both extremes can be
    # combined with a single max-style combiner.
    key_vocab, x_batch_minus_min, x_batch_max = (
        tf_utils.reduce_batch_minus_min_and_max_per_key(x, key,
                                                        reduce_instance_dims))
    key_values = _numeric_combine(  # pylint: disable=unbalanced-tuple-unpacking
        inputs=[x_batch_minus_min, x_batch_max],
        fn=combine_fn,
        default_accumulator_value=default_accumulator_value,
        reduce_instance_dims=reduce_instance_dims,
        key=key_vocab,
        key_vocabulary_filename=key_vocabulary_filename)
    if key_vocabulary_filename is not None:
      # The per-key result was written to a file; return its filename tensor.
      return key_values  # pytype: disable=bad-return-type  # always-use-return-annotations
    key, minus_x_min, x_max = key_values
    # Negate back to recover the true per-key minimum.
    return (
        key,
        tf.cast(0 - minus_x_min, output_dtype),
        tf.cast(x_max, output_dtype))
def _sum_combine_fn_and_dtype(
    input_dtype: tf.DType
) -> Tuple[tf.DType, Callable[[np.ndarray], np.ndarray]]:
  """Returns the widened output dtype and a numpy sum fn for `input_dtype`."""
  output_dtype = _SUM_OUTPUT_DTYPE_MAP.get(input_dtype)
  if output_dtype is None:
    raise TypeError('Tensor type %r is not supported' % input_dtype)
  # Sum in the widened dtype to avoid overflow for narrow integer inputs.
  sum_fn = functools.partial(np.sum, dtype=output_dtype.as_numpy_dtype)
  return output_dtype, sum_fn
@common.log_api_use(common.ANALYZER_COLLECTION)
def sum(  # pylint: disable=redefined-builtin
    x: common_types.TensorType,
    reduce_instance_dims: bool = True,
    name: Optional[str] = None) -> tf.Tensor:
  """Computes the sum of the values of a `Tensor` over the whole dataset.

  Args:
    x: A `Tensor`, `SparseTensor`, or `RaggedTensor`. Its type must be floating
      point (float{16|32|64}),integral (int{8|16|32|64}), or unsigned
      integral (uint{8|16}).
    reduce_instance_dims: By default collapses the batch and instance dimensions
      to arrive at a single scalar output. If False, only collapses the batch
      dimension and outputs a vector of the same shape as the input.
    name: (Optional) A name for this operation.

  Returns:
    A `Tensor` containing the sum. If `x` is float32 or float64, the sum will
    have the same type as `x`. If `x` is float16, the output is cast to float32.
    If `x` is integral, the output is cast to [u]int64. If `x` is sparse and
    reduce_inst_dims is False will return 0 in place where column has no values
    across batches.

  Raises:
    TypeError: If the type of `x` is not supported.
  """
  with tf.compat.v1.name_scope(name, 'sum'):
    # First reduce within the batch, then combine the per-batch results.
    if reduce_instance_dims:
      x = tf.reduce_sum(input_tensor=tf_utils.get_values(x))
    elif isinstance(x, tf.SparseTensor):
      if x.dtype in (tf.uint8, tf.uint16):
        # Widen small unsigned types before the sparse reduction.
        x = tf.cast(x, tf.int64)
      elif x.dtype in (tf.uint32, tf.uint64):
        raise TypeError('Data type %r is not supported' % x.dtype)
      x = tf.sparse.reduce_sum(x, axis=0)
    elif isinstance(x, tf.RaggedTensor):
      raise NotImplementedError(
          'Elementwise sum does not support RaggedTensors.')
    else:
      x = tf.reduce_sum(input_tensor=x, axis=0)
    output_dtype, sum_fn = _sum_combine_fn_and_dtype(x.dtype)
    return _numeric_combine(
        inputs=[x],
        fn=sum_fn,
        default_accumulator_value=0,
        reduce_instance_dims=reduce_instance_dims,
        output_dtypes=[output_dtype])[0]
def remove_leftmost_boundary(boundaries: tf.Tensor) -> tf.Tensor:
  """Drops the first bucket boundary from a [1, None]-shaped `Tensor`."""
  return boundaries[:, 1:]
@common.log_api_use(common.ANALYZER_COLLECTION)
def histogram(x: common_types.TensorType,
              boundaries: Optional[Union[tf.Tensor, int]] = None,
              categorical: Optional[bool] = False,
              name: Optional[str] = None) -> Tuple[tf.Tensor, tf.Tensor]:
  """Computes a histogram over x, given the bin boundaries or bin count.

  Ex (1):
  counts, boundaries = histogram([0, 1, 0, 1, 0, 3, 0, 1], range(5))
  counts: [4, 3, 0, 1, 0]
  boundaries: [0, 1, 2, 3, 4]

  Ex (2):
  Can be used to compute class weights.
  counts, classes = histogram([0, 1, 0, 1, 0, 3, 0, 1], categorical=True)
  probabilities = counts / tf.reduce_sum(counts)
  class_weights = dict(map(lambda (a, b): (a.numpy(), 1.0 / b.numpy()),
                           zip(classes, probabilities)))

  Args:
    x: A `Tensor`, `SparseTensor`, or `RaggedTensor`.
    boundaries: (Optional) A `Tensor` or `int` used to build the histogram;
      ignored if `categorical` is True. If possible, provide boundaries as
      multiple sorted values. Default to 10 intervals over the 0-1 range, or
      find the min/max if an int is provided (not recommended because
      multi-phase analysis is inefficient).
    categorical: (Optional) A `bool` that treats `x` as discrete values if true.
    name: (Optional) A name for this operation.

  Returns:
    counts: The histogram, as counts per bin.
    boundaries: A `Tensor` used to build the histogram representing boundaries.
  """
  with tf.compat.v1.name_scope(name, 'histogram'):
    # Flatten the (possibly composite) input to a rank-1 tensor of values.
    x = tf.reshape(tf_utils.get_values(x), [-1])
    if categorical:
      # Treat each distinct value as its own bin by counting per key.
      x_dtype = x.dtype
      x = x if x_dtype == tf.string else tf.strings.as_string(x)
      elements, counts = count_per_key(x)
      if x_dtype != elements.dtype:
        # Non-string inputs were stringified above; convert the keys back.
        # NOTE(review): conversion is always to tf.int64 regardless of
        # x_dtype — confirm this is intended for narrower integer inputs.
        elements = tf.strings.to_number(elements, tf.int64)
      return counts, elements
    if boundaries is None:
      # Default: 10 even intervals over [0, 1].
      boundaries = tf.range(11, dtype=tf.float32) / 10.0
    elif isinstance(boundaries, int) or (isinstance(boundaries, tf.Tensor) and
                                         boundaries.get_shape().ndims == 0):
      # A scalar bin count: find the dataset min/max (an extra analysis
      # phase) and spread boundaries evenly across that range.
      min_value, max_value = _min_and_max(x, True)
      boundaries = tf.linspace(
          tf.cast(min_value, tf.float32), tf.cast(max_value, tf.float32),
          tf.cast(boundaries, tf.int64))
    # Shift the boundaries slightly to account for floating point errors,
    # and due to the fact that the rightmost boundary is essentially ignored.
    boundaries = tf.expand_dims(tf.cast(boundaries, tf.float32), 0) - 0.0001
    # Bucketize the values, count occurrences per bucket index, and restore
    # numeric bucket order (count_per_key orders keys as strings).
    bucket_indices = tf_utils.assign_buckets(
        tf.cast(x, tf.float32), remove_leftmost_boundary(boundaries))
    bucket_vocab, counts = count_per_key(tf.strings.as_string(bucket_indices))
    counts = tf_utils.reorder_histogram(bucket_vocab, counts,
                                        tf.size(boundaries) - 1)
    return counts, boundaries
@common.log_api_use(common.ANALYZER_COLLECTION)
def size(x: common_types.TensorType,
         reduce_instance_dims: bool = True,
         name: Optional[str] = None) -> tf.Tensor:
  """Computes the total size of instances in a `Tensor` over the whole dataset.

  Args:
    x: A `Tensor`, `SparseTensor`, or `RaggedTensor`.
    reduce_instance_dims: By default collapses the batch and instance dimensions
      to arrive at a single scalar output. If False, only collapses the batch
      dimension and outputs a vector of the same shape as the input.
    name: (Optional) A name for this operation.

  Returns:
    A `Tensor` of type int64.
  """
  with tf.compat.v1.name_scope(name, 'size'):
    # Count elements by summing a same-structured tensor of int64 ones.
    if isinstance(x, tf.SparseTensor):
      ones = tf.SparseTensor(
          indices=x.indices,
          values=tf.ones_like(x.values, tf.int64),
          dense_shape=x.dense_shape)
    else:
      ones = tf.ones_like(x, dtype=tf.int64)
    # Note: `sum` is the analyzer defined in this module, not the builtin.
    return sum(ones, reduce_instance_dims)
@common.log_api_use(common.ANALYZER_COLLECTION)
def count_per_key(key: common_types.TensorType,
                  key_vocabulary_filename: Optional[str] = None,
                  name: Optional[str] = None):
  """Computes the count of each element of a `Tensor`.

  Args:
    key: A `Tensor`, `SparseTensor`, or `RaggedTensor` of dtype tf.string or
      tf.int.
    key_vocabulary_filename: (Optional) The file name for the key-output mapping
      file. If None and key are provided, this combiner assumes the keys fit in
      memory and will not store the result in a file. If empty string, a file
      name will be chosen based on the current scope. If not an empty string,
      should be unique within a given preprocessing function.
    name: (Optional) A name for this operation.

  Returns:
    Either:
    (A) Two `Tensor`s: one the key vocab with dtype of input;
        the other the count for each key, dtype tf.int64. (if
        key_vocabulary_filename is None).
    (B) The filename where the key-value mapping is stored (if
        key_vocabulary_filename is not None).

  Raises:
    TypeError: If the type of `x` is not supported.
  """
  with tf.compat.v1.name_scope(name, 'count_per_key'):
    original_dtype = key.dtype
    # Count occurrences within each batch, then sum the counts per key
    # across batches.
    batch_keys, batch_counts = tf_utils.reduce_batch_count_per_key(key)
    output_dtype, sum_fn = _sum_combine_fn_and_dtype(tf.int64)
    combined = _numeric_combine(
        inputs=[batch_counts],
        fn=sum_fn,
        default_accumulator_value=0,
        reduce_instance_dims=True,
        output_dtypes=[output_dtype],
        key=batch_keys,
        key_vocabulary_filename=key_vocabulary_filename)
    if key_vocabulary_filename is not None:
      # The key-to-count mapping was written to a file; return its filename.
      return combined
    keys, counts = combined
    if original_dtype is not tf.string:
      # Keys are produced as strings; restore the caller's numeric dtype.
      keys = tf.strings.to_number(keys, original_dtype)
    return keys, counts
@common.log_api_use(common.ANALYZER_COLLECTION)
def mean(x: common_types.TensorType,
         reduce_instance_dims: bool = True,
         name: Optional[str] = None,
         output_dtype: Optional[tf.DType] = None) -> tf.Tensor:
  """Computes the mean of the values of a `Tensor` over the whole dataset.

  Args:
    x: A `Tensor`, `SparseTensor`, or `RaggedTensor`. Its type must be floating
      point (float{16|32|64}), or integral ([u]int{8|16|32|64}).
    reduce_instance_dims: By default collapses the batch and instance dimensions
      to arrive at a single scalar output. If False, only collapses the batch
      dimension and outputs a vector of the same shape as the input.
    name: (Optional) A name for this operation.
    output_dtype: (Optional) If not None, casts the output tensor to this type.

  Returns:
    A `Tensor` containing the mean. If `x` is floating point, the mean will have
    the same type as `x`. If `x` is integral, the output is cast to float32.
    NaNs and infinite input values are ignored.

  Raises:
    TypeError: If the type of `x` is not supported.
  """
  with tf.compat.v1.name_scope(name, 'mean'):
    # Mean and variance are computed together; keep only the mean.
    mean_result, _ = _mean_and_var(x, reduce_instance_dims, output_dtype)
    return mean_result
@common.log_api_use(common.ANALYZER_COLLECTION)
def var(x: common_types.TensorType,
        reduce_instance_dims: bool = True,
        name: Optional[str] = None,
        output_dtype: Optional[tf.DType] = None) -> tf.Tensor:
  """Computes the variance of the values of a `Tensor` over the whole dataset.

  Uses the biased variance (0 delta degrees of freedom), as given by
  (x - mean(x))**2 / length(x).

  Args:
    x: `Tensor`, `SparseTensor`, or `RaggedTensor`. Its type must be floating
      point (float{16|32|64}), or integral ([u]int{8|16|32|64}).
    reduce_instance_dims: By default collapses the batch and instance dimensions
      to arrive at a single scalar output. If False, only collapses the batch
      dimension and outputs a vector of the same shape as the input.
    name: (Optional) A name for this operation.
    output_dtype: (Optional) If not None, casts the output tensor to this type.

  Returns:
    A `Tensor` containing the variance. If `x` is floating point, the variance
    will have the same type as `x`. If `x` is integral, the output is cast to
    float32. NaNs and infinite input values are ignored.

  Raises:
    TypeError: If the type of `x` is not supported.
  """
  with tf.compat.v1.name_scope(name, 'var'):
    # Mean and variance are computed together; keep only the variance.
    _, var_result = _mean_and_var(x, reduce_instance_dims, output_dtype)
    return var_result
def _mean_and_var(x: common_types.TensorType,
                  reduce_instance_dims: bool = True,
                  output_dtype: Optional[tf.DType] = None):
  """More efficient combined `mean` and `var`. See `var`."""
  if output_dtype is None:
    # Derive a floating-point output dtype from the input's dtype.
    output_dtype = _FLOAT_OUTPUT_DTYPE_MAP.get(x.dtype)
    if output_dtype is None:
      raise TypeError('Tensor type %r is not supported' % x.dtype)
  if isinstance(x, tf.RaggedTensor) and not reduce_instance_dims:
    raise NotImplementedError(
        'Elementwise mean_and_var does not support RaggedTensors.')

  with tf.compat.v1.name_scope('mean_and_var'):
    x = tf.cast(x, output_dtype)
    count, batch_mean, batch_var = (
        tf_utils.reduce_batch_count_mean_and_var(x, reduce_instance_dims))
    accumulator = _WeightedMeanAndVarAccumulator(
        count=count,
        mean=batch_mean,
        variance=batch_var,
        weight=tf.zeros([], tf.float32))
    if reduce_instance_dims:
      output_shape = ()
    else:
      # Artificially prepend a batch dimension so the per-element output
      # shape can be derived from the batch-reduced count tensor.
      output_shape = _get_output_shape_from_input(
          tf.expand_dims(count, axis=0))
    combiner = WeightedMeanAndVarCombiner(output_dtype.as_numpy_dtype,
                                          output_shape)
    result_mean, result_var = _apply_cacheable_combiner(combiner, *accumulator)
    return result_mean, result_var
@common.log_api_use(common.ANALYZER_COLLECTION)
def tukey_location(x: common_types.TensorType,
                   reduce_instance_dims: Optional[bool] = True,
                   output_dtype: Optional[tf.DType] = None,
                   name: Optional[str] = None) -> tf.Tensor:
  """Computes the location of the values of a `Tensor` over the whole dataset.

  This computes the location of x, assuming a Tukey HH distribution, i.e.
  (x - tukey_location) / tukey_scale is a Tukey HH distribution with parameters
  tukey_h_params. See the following publication for the definition of the Tukey
  HH distribution:

  Todd C. Headrick, and Mohan D. Pant. "Characterizing Tukey h and
  hh-Distributions through L-Moments and the L-Correlation," ISRN Applied
  Mathematics, vol. 2012, 2012. doi:10.5402/2012/980153

  Args:
    x: A `Tensor`, `SparseTensor`, or `RaggedTensor`. Its type must be floating
      point (float{16|32|64}), or integral ([u]int{8|16|32|64}).
    reduce_instance_dims: By default collapses the batch and instance dimensions
      to arrive at a single scalar output. If False, only collapses the batch
      dimension and outputs a vector of the same shape as the input.
    output_dtype: (Optional) If not None, casts the output tensor to this type.
    name: (Optional) A name for this operation.

  Returns:
    A `Tensor` containing the location. If `x` is floating point, the location
    will have the same type as `x`. If `x` is integral, the output is cast to
    float32.

  Raises:
    TypeError: If the type of `x` is not supported.
  """
  with tf.compat.v1.name_scope(name, 'tukey_location'):
    # All Tukey parameters are computed in one pass; the location is first.
    location, _, _ = _tukey_parameters(x, reduce_instance_dims, output_dtype)
    return location
@common.log_api_use(common.ANALYZER_COLLECTION)
def tukey_scale(x: common_types.TensorType,
                reduce_instance_dims: Optional[bool] = True,
                output_dtype: Optional[tf.DType] = None,
                name: Optional[str] = None) -> tf.Tensor:
  """Computes the scale of the values of a `Tensor` over the whole dataset.

  This computes the scale of x, assuming a Tukey HH distribution, i.e.
  (x - tukey_location) / tukey_scale is a Tukey HH distribution with parameters
  tukey_h_params. See the following publication for the definition of the Tukey
  HH distribution:

  Todd C. Headrick, and Mohan D. Pant. "Characterizing Tukey h and
  hh-Distributions through L-Moments and the L-Correlation," ISRN Applied
  Mathematics, vol. 2012, 2012. doi:10.5402/2012/980153

  Args:
    x: A `Tensor`, `SparseTensor`, or `RaggedTensor`. Its type must be floating
      point (float{16|32|64}), or integral ([u]int{8|16|32|64}).
    reduce_instance_dims: By default collapses the batch and instance dimensions
      to arrive at a single scalar output. If False, only collapses the batch
      dimension and outputs a vector of the same shape as the input.
    output_dtype: (Optional) If not None, casts the output tensor to this type.
    name: (Optional) A name for this operation.

  Returns:
    A `Tensor` containing the scale. If `x` is floating point, the scale
    will have the same type as `x`. If `x` is integral, the output is cast to
    float32.

  Raises:
    TypeError: If the type of `x` is not supported.
  """
  with tf.compat.v1.name_scope(name, 'tukey_scale'):
    # All Tukey parameters are computed in one pass; the scale is second.
    _, scale, _ = _tukey_parameters(x, reduce_instance_dims, output_dtype)
    return scale
@common.log_api_use(common.ANALYZER_COLLECTION)
def tukey_h_params(x: common_types.TensorType,
reduce_instance_dims: bool = True,
output_dtype: Optional[tf.DType] = None,
name: Optional[str] = None) -> Tuple[tf.Tensor, tf.Tensor]:
"""Computes the h parameters of the values of a `Tensor` over the dataset.
This computes the parameters (hl, hr) of the samples, assuming a Tukey HH
distribution, i.e. (x - tukey_location) / tukey_scale is a Tukey HH