-
Notifications
You must be signed in to change notification settings - Fork 276
/
extractor.py
86 lines (74 loc) · 3.39 KB
/
extractor.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
# Copyright 2018 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Extractor type."""
from typing import Any, Dict, Iterable, NamedTuple, Optional, Union
import apache_beam as beam
from tensorflow_model_analysis import types
from tensorflow_model_analysis.utils import util
# Stage name used to tag the last extractor in a list of extractors.
LAST_EXTRACTOR_STAGE_NAME = '<last-extractor>'
# An Extractor pairs a stage name with a PTransform that consumes Extracts and
# produces Extracts. A typical example is a PredictExtractor, which receives an
# 'input' placeholder and adds 'features', 'labels', and 'predictions'
# extracts to it.
class Extractor(NamedTuple):
  # Name identifying this extractor's stage.
  stage_name: str
  # PTransform Extracts -> Extracts
  ptransform: beam.PTransform
@beam.ptransform_fn
@beam.typehints.with_input_types(types.Extracts)
@beam.typehints.with_output_types(types.Extracts)
def Filter(  # pylint: disable=invalid-name
    extracts: beam.pvalue.PCollection,
    include: Optional[Union[Iterable[str], Dict[str, Any]]] = None,
    exclude: Optional[Union[Iterable[str],
                            Dict[str, Any]]] = None) -> beam.pvalue.PCollection:
  """Keeps or drops the specified keys in every extract of a PCollection.

  At most one of `include` and `exclude` may be given. When neither is given
  the extracts pass through unchanged.

  Args:
    extracts: PCollection of extracts.
    include: Keys to keep in the output, given either as a list of keys or as
      a (possibly nested) map of keys. With a map, the keys and sub-keys
      present in the map are kept. An empty dict acts as a wildcard that
      matches all keys or the value itself; because matching on feature values
      is not currently supported, leaf nodes must be written as empty dicts.
      For example: {'key1': {'key1-subkey': {}}, 'key2': {}}.
    exclude: Keys to drop from the output, given either as a list of keys or
      as a (possibly nested) map of keys. With a map, the keys and sub-keys
      present in the map are dropped. An empty dict acts as a wildcard that
      matches all keys or the value itself; because matching on feature values
      is not currently supported, leaf nodes must be written as empty dicts.
      For example: {'key1': {'key1-subkey': {}}, 'key2': {}}.

  Returns:
    Filtered PCollection of Extracts.

  Raises:
    ValueError: If both include and exclude are used.
  """
  if include and exclude:
    raise ValueError('only one of include or exclude should be used.')

  def _as_key_map(keys):
    """Returns dicts as-is; turns a list of keys (or None) into a key map."""
    if isinstance(keys, dict):
      return keys
    # Each listed key becomes a leaf node, represented by an empty dict.
    return {key: {} for key in keys or []}

  include_map = _as_key_map(include)
  exclude_map = _as_key_map(exclude)

  def filter_extracts(element: types.Extracts) -> types.Extracts:  # pylint: disable=invalid-name
    """Applies the include or exclude key map to a single extract."""
    if include_map:
      return util.include_filter(include_map, element)
    if exclude_map:
      return util.exclude_filter(exclude_map, element)
    # Nothing to filter on: pass the extract through untouched.
    return element

  return extracts | beam.Map(filter_extracts)