Add a method to evaluate an AnalyzerDef node to a tensor without saving to the graph #129
Comments
To remove nodes from the graph, please refer to this article and this GitHub link, and confirm whether it helps. Thanks.
I couldn't get that tool to run against my graph. I get this error:
Regardless, I don't think that solves my problem: I don't want to remove the node completely, I just don't want the result to be saved. Having to run that tool between runs isn't great either.
I don't understand what you are trying to do. Can you provide some sample code?
Sure.
and I want to find all unique pairs of c_key and w_key, and the value for each "zone" label. The result looks like this
The actual data are invoice lines, with various kinds of charges, that need to be grouped into single invoices. So c_key and w_key are basically primary keys for the invoice, and zone would be different categories of charges.

def preprocessing_inputs_fn(inputs):
    c_key = inputs["CKey"]
    w_key = inputs["WKey"]
    values = inputs["Values"]
    zones = inputs["Zones"]
    one_hot_zones = tdt.one_hot_encode(zones, 4)
    multiplied_zones = tf.math.multiply(
        one_hot_zones,
        tf.linalg.matmul(tf.expand_dims(values, axis=1), tf.ones(shape=[1, 4])))
    segment_sum, unique_values = tdt.group_by_sum([c_key, w_key], multiplied_zones, 4)
    return {
        'label': unique_values,
        **tdt.create_dict_from_tensor(segment_sum, "sum")
    }

which calls these methods:

def one_hot_encode(values, top_k):
    x = tft.compute_and_apply_vocabulary(values, top_k=top_k)
    one_hot_x = tf.one_hot(x, depth=top_k)
    return one_hot_x

def create_dict_from_tensor(x, name, seperator="_"):
    list_x = tf.unstack(x, axis=1)
    return {name + seperator + str(i): list_x[i] for i in range(0, len(list_x))}

def group_by_sum(keys, values, width):
    joined_keys = tf.strings.join(keys)
    unique_values = tf.reshape(custom_tensorflow_analyzers.unique(joined_keys), [-1])
    unique_values_len = tf.shape(unique_values)[0]
    key_value_initializer = tf.lookup.KeyValueTensorInitializer(unique_values, tf.range(unique_values_len))
    hash_table = tf.lookup.StaticHashTable(key_value_initializer, -1)
    index = hash_table.lookup(joined_keys)
    segment_sum = custom_tensorflow_analyzers.group_by_sum(values, index, width)
    return segment_sum, unique_values

The group_by_sum method calls two custom analyzers, because I needed a way to get unique values without computing a vocabulary, and I needed a group-by sum:

# there's a quirk, not sure if this is TensorFlow Transform's fault or mine,
# but I was unable to get this to work by only returning one at a time
# I had to create a stateful method and return the whole list, and then
# wrap it in a list. Super weird, but it works
class UniqueMap(beam.DoFn):
    def __init__(self):
        self.test = []

    def process(self, element, **kwargs):
        self.test.append(element[1])
        return [self.test]

def unique(data, name=None):
    """
    Takes an input tensor data and returns a tensor containing all unique values in data
    """
    with tf.name_scope(name, 'unique'):
        vocab_ordering_type = tft.tf_utils.VocabOrderingType.FREQUENCY
        (unique_inputs, none_weights, none_sum,
         none_counts) = tft.tf_utils.reduce_batch_vocabulary(data, vocab_ordering_type)
        assert none_sum is None
        assert none_weights is None
        assert none_counts is None
        analyzer_inputs = [unique_inputs]
        input_values_node = tft.analyzer_nodes.get_input_tensors_value_nodes(analyzer_inputs)
        accumulate_output_value_node = tft.nodes.apply_operation(
            tft.analyzer_nodes.VocabularyAccumulate, input_values_node,
            vocab_ordering_type=vocab_ordering_type)
        merge_output_value_node = tft.nodes.apply_operation(
            tft.analyzer_nodes.VocabularyMerge, accumulate_output_value_node,
            use_adjusted_mutual_info=False,
            min_diff_from_avg=0.0,
            vocab_ordering_type=vocab_ordering_type)
        filtered_value_node = tft.nodes.apply_operation(
            tft.analyzer_nodes.VocabularyOrderAndFilter,
            merge_output_value_node,
            coverage_top_k=None,
            coverage_frequency_threshold=None,
            key_fn=None,
            top_k=None,
            frequency_threshold=None)
        outputs_value_nodes = tft.nodes.apply_operation(
            custom_tensorflow_analyzer_nodes.ToAnalyzerDef,
            filtered_value_node,
            output_shape=[None, ],
            do_fn=UniqueMap()
        )
        output = tft.analyzer_nodes.wrap_as_tensor(outputs_value_nodes)
        return output

def group_by_sum(data, segment_ids, width, name=None):
    """
    Analyzer that implements tf.math.unsorted_segment_sum in a tensorflow_transform safe way.
    Takes a data tensor, and sums each row according to the key defined in segment_ids. width
    should be tf.shape(data)[1] as an int.
    """
    with tf.name_scope(name, "group_by_sum"):
        segment_ids = tf.reshape(tf.cast(segment_ids, tf.dtypes.float32), [-1, 1])
        inputs = tf.concat([segment_ids, data], 1)
        input_values_node = tft.analyzer_nodes.get_input_tensors_value_nodes([inputs])
        outputs_value_node = tft.nodes.apply_operation(
            custom_tensorflow_analyzer_nodes.GroupBySum, input_values_node,
            width=width)
        output = tft.analyzer_nodes.wrap_as_tensor(outputs_value_node)
        return output

That calls some custom analyzer_nodes, which are defined like this:

class ToAnalyzerDef(collections.namedtuple('ToAnalyzerDef', ['output_shape', 'do_fn', 'label']),
                    tft.analyzer_nodes.AnalyzerDef):
    def __new__(cls, output_shape, do_fn, label=None):
        cls._output_shape = output_shape
        if label is None:
            scope = tf.get_default_graph().get_name_scope()
            label = '{}[{}]'.format(cls.__name__, scope)
        return super(ToAnalyzerDef, cls).__new__(
            cls,
            output_shape=output_shape,
            do_fn=do_fn,
            label=label)

    @property
    def output_tensor_infos(self):
        return [tft.analyzer_nodes.TensorInfo(tf.string, self._output_shape, False)]

class GroupBySum(collections.namedtuple('GroupBySum', ['width', 'label']),
                 tft.analyzer_nodes.AnalyzerDef):
    def __new__(cls, width, label=None):
        cls._width = width
        if label is None:
            scope = tf.get_default_graph().get_name_scope()
            label = '{}[{}]'.format(cls.__name__, scope)
        return super(GroupBySum, cls).__new__(
            cls,
            width=cls._width,
            label=label)

    @property
    def output_tensor_infos(self):
        return [tft.analyzer_nodes.TensorInfo(tf.dtypes.float32, (None, self.width), False)]

and implemented like this:

@common.register_ptransform(custom_tensorflow_analyzer_nodes.ToAnalyzerDef)
class ToAnalyzerDefImpl(beam.PTransform):
    def __init__(self, operation, extra_args):
        self._do_fn = operation.do_fn
        return

    def expand(self, inputs):
        pcoll, = inputs
        return pcoll | beam.ParDo(self._do_fn)

@common.register_ptransform(custom_tensorflow_analyzer_nodes.GroupBySum)
class GroupBySumImpl(beam.PTransform):
    def __init__(self, operation, extra_args):
        self._width = operation.width

    def expand(self, inputs):
        pcoll, = inputs
        return pcoll | beam.ParDo(GroupBySumMap(self._width))

class GroupBySumMap(beam.DoFn):
    def __init__(self, width):
        self.width = width
        self.output_dict = {}

    def compute_output(self):
        output = []
        for key in range(max(self.output_dict) + 1):
            if key in self.output_dict:
                output.append(self.output_dict[key])
            else:
                output.append(np.zeros([self.width]))
        return [np.array(output, np.float32)]

    def process(self, element, **kwargs):
        array = element[0]
        for row in array:
            if int(row[0]) not in self.output_dict:
                self.output_dict[int(row[0])] = row[1::]
            else:
                self.output_dict[int(row[0])] += row[1::]
        return self.compute_output()

This all works, except when I save the graph and call it again like this:

data_schema = tld.TransformMetadata("test_data/GroupBy.meta.json")
csv_loader = tld.CsvLoader("test_data/GroupByTest.csv", data_schema, batch_size=12,
                           skip_headers=True, working_dir="tmp",
                           transform_data_file="test_data",
                           transform_inputs_fn=preprocessing_inputs_fn)
test_fn = csv_loader.load_and_transform_csv()
csv_loader_test = tld.CsvLoader("test_data/GroupByTest-2.csv", data_schema, batch_size=12,
                                skip_headers=True, working_dir="tmp",
                                transform_data_file="test_data")
test_fn2 = csv_loader_test.load_and_transform_csv_with_saved_fn()
data2 = test_fn2()
data = test_fn()

then both results are exactly the same, presumably because the results of the calls to my custom unique node and group-by node are being saved to the graph when I call tft.analyzer_nodes.wrap_as_tensor(). I don't want to save the results of those nodes. Let me know if you need any further clarification, or if I'm missing something. Thanks.
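As a side note on the "quirk" mentioned in the code comments above: a stateful DoFn emits a growing partial result for every element it sees. One possible alternative is a combiner, which accumulates per worker and merges at the end. The sketch below is plain Python, not the code from this thread. The class name GroupBySumCombiner is hypothetical, and its four methods only mirror the method names of Beam's beam.CombineFn interface. Whether such a combiner plugs cleanly into the custom analyzer nodes shown above is an assumption that has not been tested here.

```python
class GroupBySumCombiner:
    """Hypothetical combiner; method names mirror beam.CombineFn."""

    def __init__(self, width):
        self.width = width

    def create_accumulator(self):
        # Maps segment id -> running sum of that segment's rows.
        return {}

    def add_input(self, accumulator, batch):
        # Each row is [segment_id, v_0, ..., v_{width-1}], the same layout
        # as the concatenated tensor built in group_by_sum above.
        for row in batch:
            key = int(row[0])
            sums = accumulator.setdefault(key, [0.0] * self.width)
            for i, value in enumerate(row[1:]):
                sums[i] += value
        return accumulator

    def merge_accumulators(self, accumulators):
        # Combine partial sums computed independently on different batches.
        merged = {}
        for accumulator in accumulators:
            for key, sums in accumulator.items():
                target = merged.setdefault(key, [0.0] * self.width)
                for i, value in enumerate(sums):
                    target[i] += value
        return merged

    def extract_output(self, accumulator):
        # Emit one row per segment id, zero-filled for ids never seen.
        if not accumulator:
            return []
        return [accumulator.get(key, [0.0] * self.width)
                for key in range(max(accumulator) + 1)]


# Made-up rows, split over two batches to exercise the merge step.
combiner = GroupBySumCombiner(width=2)
acc_a = combiner.add_input(combiner.create_accumulator(),
                           [[0, 1.0, 0.0], [2, 0.0, 3.0]])
acc_b = combiner.add_input(combiner.create_accumulator(), [[0, 4.0, 0.0]])
result = combiner.extract_output(combiner.merge_accumulators([acc_a, acc_b]))
print(result)
```

Because the final value is produced only once, in extract_output, nothing has to be returned "one at a time" per element. This does not address the main request of the issue, which is about the result being saved to the graph.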
Could you please confirm whether this issue still persists? Thanks.
Currently the only way to get a tensor from any analyzer node is to call tft.analyzer_nodes.wrap_as_tensor(), which will evaluate the tensor but will also save it to the graph; during future runs TensorFlow Transform will use the saved value instead of re-evaluating it. That's great for situations where the value is something like a mean or max and you want to use the same values when transforming live data, but there are use cases for analyzer nodes without that feature.
For example, in my project I have to take in values, group by a key, and compute a sum over those keys. In normal TensorFlow code I would do this with unique and unsorted_segment_sum, but in TensorFlow Transform both of those methods break, because they depend on seeing the entire dataset. Fine, then: I wrote my own TensorFlow analyzers for unique (vocabulary won't work here, as keys is just a primary key; I don't want to save a vocabulary file) and unsorted_segment_sum, but now the values are saved to the graph and I have to delete the saved function afterwards, or any future runs will just use the previously computed values.
I'm working around this for now by doing multiple passes through my dataset, one with this function and others for everything else, but it would be really nice to be able to signal to TensorFlow Transform that I don't want the value saved to the graph, and that it should be recomputed during future runs.
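For readers unfamiliar with the group-by pattern described above, here is a minimal plain-Python sketch of its semantics. It is meant to mirror what tf.unique and tf.math.unsorted_segment_sum compute on a single in-memory batch. It is not TensorFlow code, and the helper names and sample rows are made up for illustration.

```python
def unique_with_index(keys):
    """Return the unique keys in first-seen order, plus each key's index."""
    positions = {}
    unique_keys = []
    index = []
    for key in keys:
        if key not in positions:
            positions[key] = len(unique_keys)
            unique_keys.append(key)
        index.append(positions[key])
    return unique_keys, index


def unsorted_segment_sum(rows, segment_ids, num_segments):
    """Sum the rows that share a segment id; ids need not be sorted."""
    width = len(rows[0]) if rows else 0
    sums = [[0.0] * width for _ in range(num_segments)]
    for row, segment in zip(rows, segment_ids):
        for i, value in enumerate(row):
            sums[segment][i] += value
    return sums


# Illustrative data only: joined invoice keys and per-zone charge rows.
keys = ["c1w1", "c1w2", "c1w1"]
rows = [[10.0, 0.0], [0.0, 5.0], [2.5, 0.0]]

unique_keys, index = unique_with_index(keys)
totals = unsorted_segment_sum(rows, index, len(unique_keys))
print(unique_keys, index, totals)
```

Both steps need every row before the result is final: a key that first appears in a later batch changes the set of unique keys and the number of segments. That is why computing them batch by batch breaks, and why an analyzer that sees the full dataset is needed.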