
Error in graph analyzer when using universal sentence encoder from tf_hub as per tutorial #160

Closed
jusjosgra opened this issue Feb 18, 2020 · 16 comments

Comments

@jusjosgra

jusjosgra commented Feb 18, 2020

I am getting an error running the following code with the DirectRunner.
code:

import tensorflow as tf
import apache_beam as beam
import tensorflow_transform.beam as tft_beam
import tensorflow_transform.coders as tft_coders
from apache_beam.options.pipeline_options import PipelineOptions
import tempfile

model = None

def embed_text(text):
    import tensorflow_hub as hub
    global model
    if model is None:
        model = hub.load(
            'https://tfhub.dev/google/universal-sentence-encoder/4')
    embedding = model(text)
    return embedding


def get_metadata():
    from tensorflow_transform.tf_metadata import dataset_schema
    from tensorflow_transform.tf_metadata import dataset_metadata

    metadata = dataset_metadata.DatasetMetadata(dataset_schema.Schema({
        'id': dataset_schema.ColumnSchema(
            tf.string, [], dataset_schema.FixedColumnRepresentation()),
        'text': dataset_schema.ColumnSchema(
            tf.string, [], dataset_schema.FixedColumnRepresentation())
    }))
    return metadata


def preprocess_fn(input_features):

    text_integerized = embed_text(input_features['text'])
    output_features = {
        'id': input_features['id'],
        'embedding': text_integerized
    }
    return output_features


def run(pipeline_options, known_args):
    argv = None  # if None, uses sys.argv
    pipeline_options = PipelineOptions(argv)

    pipeline = beam.Pipeline(options=pipeline_options)
    with tft_beam.Context(temp_dir=tempfile.mkdtemp()):
        articles = (
                pipeline
                | beam.Create([
            {'id':'01','text':'To be, or not to be: that is the question: '},
            {'id':'02','text':"Whether 'tis nobler in the mind to suffer "},
            {'id':'03','text':'The slings and arrows of outrageous fortune, '},
            {'id':'04','text':'Or to take arms against a sea of troubles, '},
        ]))

        articles_dataset = (articles, get_metadata())

        transformed_dataset, transform_fn = (
                articles_dataset
                | 'Extract embeddings' >> tft_beam.AnalyzeAndTransformDataset(preprocess_fn)
        )

        transformed_data, transformed_metadata = transformed_dataset

        _ = (
            transformed_data | 'Write embeddings to TFRecords' >> beam.io.tfrecordio.WriteToTFRecord(
            file_path_prefix='{0}'.format(known_args.output_dir),
            file_name_suffix='.tfrecords',
            coder=tft_coders.example_proto_coder.ExampleProtoCoder(
                transformed_metadata.schema),
            num_shards=1
            )
            )
    result = pipeline.run()
    result.wait_until_finish()

called with:

import argparse
import logging
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions

from etl import pipeline_local_minimal as pipeline


def get_args(argv):

  parser = argparse.ArgumentParser()

  parser.add_argument('--output_dir',
                      help='A directory location of output embeddings')

  parser.add_argument('--enable_debug',
                      action='store_true',
                      help='Enable debug options.')

  parser.add_argument('--debug_output_prefix',
                      help='Specify prefix of debug output.')

  parser.add_argument('--transform_temp_dir',
                      default='tft_temp',
                      help='A temp directory used by tf.transform.')

  parser.add_argument('--transform_export_dir',
                      default='tft_out',
                      help='A directory where tft function is saved')

  known_args, pipeline_args = parser.parse_known_args(argv)
  return known_args, pipeline_args


def main(argv=None):
  known_args, pipeline_args = get_args(argv)
  pipeline_options = PipelineOptions(pipeline_args)
  setup_options = pipeline_options.view_as(SetupOptions)
  setup_options.save_main_session = True
  pipeline.run(pipeline_options, known_args)


if __name__ == '__main__':
  logging.getLogger().setLevel(logging.ERROR)
  main()

and

# Configurable parameters
ROOT_DIR="./data"

# Datastore parameters
KIND="wikipedia"

# Directory for output data files
OUTPUT_PREFIX="${ROOT_DIR}/${KIND}/embeddings/embed"

# Working directories for Dataflow
DF_JOB_DIR="${ROOT_DIR}/${KIND}/dataflow"
STAGING_LOCATION="${DF_JOB_DIR}/staging"
TEMP_LOCATION="${DF_JOB_DIR}/temp"

# Working directories for tf.transform
TRANSFORM_ROOT_DIR="${DF_JOB_DIR}/transform"
TRANSFORM_TEMP_DIR="${TRANSFORM_ROOT_DIR}/temp"
TRANSFORM_EXPORT_DIR="${TRANSFORM_ROOT_DIR}/export"

# Working directories for Debug log
DEBUG_OUTPUT_PREFIX="${DF_JOB_DIR}/debug/log"

# Running Config for Dataflow
RUNNER=DirectRunner

# Cleaning working and output directories before running the Dataflow job
#echo "Cleaning working and output directories..."
#rm -r "${DF_JOB_DIR}"
#rm -r "${OUTPUT_PREFIX}"

echo "Running the Dataflow job..."

# Command to run the Dataflow job
python run_local.py \
  --output_dir="${OUTPUT_PREFIX}" \
  --transform_temp_dir="${TRANSFORM_TEMP_DIR}" \
  --transform_export_dir="${TRANSFORM_EXPORT_DIR}" \
  --runner="${RUNNER}" \
  --kind="${KIND}" \
  --staging_location="${STAGING_LOCATION}" \
  --temp_location="${TEMP_LOCATION}" \
  --setup_file=$(pwd)/setup.py \
  --enable_debug \
  --debug_output_prefix="${DEBUG_OUTPUT_PREFIX}"

echo "Dataflow job submitted successfully!"

error:

...
Traceback (most recent call last):
  File "apache_beam/runners/common.py", line 780, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 587, in apache_beam.runners.common.PerWindowInvoker.invoke_process
  File "apache_beam/runners/common.py", line 655, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
  File "apache_beam/runners/common.py", line 880, in apache_beam.runners.common._OutputProcessor.process_outputs
  File "apache_beam/runners/common.py", line 895, in apache_beam.runners.common._OutputProcessor.process_outputs
  File "/Users/justingrace/.pyenv/versions/hlx36/lib/python3.6/site-packages/tensorflow_transform/beam/impl.py", line 338, in process
    lambda: self._make_graph_state(saved_model_dir))
  File "/Users/justingrace/.pyenv/versions/hlx36/lib/python3.6/site-packages/tfx_bsl/beam/shared.py", line 221, in acquire
    return _shared_map.acquire(self._key, constructor_fn)
  File "/Users/justingrace/.pyenv/versions/hlx36/lib/python3.6/site-packages/tfx_bsl/beam/shared.py", line 184, in acquire
    result = control_block.acquire(constructor_fn)
  File "/Users/justingrace/.pyenv/versions/hlx36/lib/python3.6/site-packages/tfx_bsl/beam/shared.py", line 87, in acquire
    result = constructor_fn()
  File "/Users/justingrace/.pyenv/versions/hlx36/lib/python3.6/site-packages/tensorflow_transform/beam/impl.py", line 338, in <lambda>
    lambda: self._make_graph_state(saved_model_dir))
  File "/Users/justingrace/.pyenv/versions/hlx36/lib/python3.6/site-packages/tensorflow_transform/beam/impl.py", line 314, in _make_graph_state
    self._exclude_outputs, self._tf_config)
  File "/Users/justingrace/.pyenv/versions/hlx36/lib/python3.6/site-packages/tensorflow_transform/beam/impl.py", line 232, in __init__
    tensor_inputs = graph_tools.get_dependent_inputs(graph, inputs, fetches)
  File "/Users/justingrace/.pyenv/versions/hlx36/lib/python3.6/site-packages/tensorflow_transform/graph_tools.py", line 690, in get_dependent_inputs
    sink_tensors_ready)
  File "/Users/justingrace/.pyenv/versions/hlx36/lib/python3.6/site-packages/tensorflow_transform/graph_tools.py", line 503, in __init__
    table_init_op, graph_analyzer_for_table_init, translate_path_fn)
  File "/Users/justingrace/.pyenv/versions/hlx36/lib/python3.6/site-packages/tensorflow_transform/graph_tools.py", line 564, in _get_table_init_op_source_info
    if table_init_op.type not in _TABLE_INIT_OP_TYPES:
AttributeError: 'Tensor' object has no attribute 'type'

It looks like the graph analyzer is expecting a list of ops with a type attribute but is being passed a tensor instead. It is unclear to me what is going wrong here. Any help would be greatly appreciated!
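
For context, the type attribute that the analyzer checks exists on tf.Operation objects but not on tf.Tensor; a tensor only exposes its producing op's type via .op.type, which matches the AttributeError above. A minimal illustration of the distinction (not a fix):

import tensorflow as tf

with tf.Graph().as_default():
    t = tf.constant(['hello'])
    print(t.op.type)  # 'Const' -- operations carry a type attribute
    # print(t.type)   # AttributeError: 'Tensor' object has no attribute 'type'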

@luischinchillagarcia

I'd like to add that any tf.keras layer also doesn't work within the preprocess_fn. For example, hub.KerasLayer(module_url) or even tf.keras.layers.Embedding does not work, since they throw errors about tf.Sessions.

I tried using a tf.function decorator, but that also did not work and threw errors about missing graphs.
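
For reference, this is roughly the shape of the hub.KerasLayer attempt being described (a sketch only; the module URL is a placeholder for whichever hub module is used):

import tensorflow_hub as hub

def preprocess_fn(inputs):
    # placeholder module URL; the same kind of failure is reported for any hub or Keras layer here
    module_url = 'https://tfhub.dev/google/universal-sentence-encoder/4'
    embed = hub.KerasLayer(module_url)
    return {
        'id': inputs['id'],
        'embedding': embed(inputs['text']),
    }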

@rmothukuru

rmothukuru commented Feb 19, 2020

@jusjosgra,
As per my understanding, we need to run the above code from the command line. Can you please provide the command used to run it, so that we can reproduce the error on our end? Thanks!

@jusjosgra
Author

@rmothukuru thanks for updating the status of this - could you explain what the status "awaiting tensorflower" means, please?

@rmothukuru

@jusjosgra,
The status "Awaiting Tensorflower" means that we are waiting for a response from one of the TensorFlow Transform experts.

@jusjosgra
Author

Ah, it's tensorflow-er. I thought it was tensor-flower, like a module for distributed growth or something. I saw there are a lot of issues awaiting tensorflowers; how many people are supporting the Transform project? We are hoping to include it in production workflows, but it seems a bit too unstable at the moment. I realise it's pre-1.0.

@Dirzys

Dirzys commented Mar 12, 2020

We're having the exact same issue, both locally and on Dataflow.

@daehwan7

We're having the exact same issue, both locally and on Dataflow, too.

@varshaan
Contributor

Could you let me know what version of Transform/TFX you are using? A recent commit should have fixed this. The commit is in the Transform 0.23 release and should be in the TFX 0.23 release.

Thanks!
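
One quick way to check the versions being asked about, assuming the packages are importable in the environment that runs the pipeline:

import tensorflow as tf
import tensorflow_transform as tft

print('tensorflow:', tf.__version__)
print('tensorflow-transform:', tft.__version__)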

@daehwan7

@varshaan, I tried again after installing TFX 0.23, but it raises a new error, shown below:

Tensor EncoderDNN/CNN_layers/LayerNorm/beta is not found in b'gs://bts_pan//transform_temp_dir/tftransform_tmp/906257a511934b83a52c74996b94ba03/variables/variables' checkpoint {'EncoderDNN/DNN/ResidualHidden_3/dense/kernel/part_9': [30, 512], 'EncoderDNN/DNN/ResidualHidden_3/dense/kernel/part_7': [30, 512],....

I am currently using the "https://tfhub.dev/google/universal-sentence-encoder-multilingual/3" model.
Please check this error as soon as possible; thanks in advance.

@varshaan
Contributor

Could you please post a snippet of your preprocessing_fn and the code where this error is being raised? Is it being raised in Transform or during Training?

@daehwan7

daehwan7 commented Aug 24, 2020

@varshaan!
My code is very similar to Justin's; it is shared below.

preprocessing_fn

encoder = None

def preprocess_fn(input_features):
    import tensorflow_transform as tft
    embedding = embed_text(input_features['data'])
    output_features = {
        'id': input_features['id'],
        'logkey': input_features['logkey'],
        'data': input_features['data'],
        'embedding': embedding
    }
    return output_features

def embed_text(text):
    import numpy as np
    import tensorflow_hub as hub
    import tensorflow_text
    global encoder
    use_url = "https://tfhub.dev/google/universal-sentence-encoder-multilingual/3"
    if encoder is None:
        encoder = hub.load(use_url)
    outputs = encoder(text)
    
    return outputs

pipeline

transform_temp_dir = DEST_DIR + '/transform_temp_dir'

with tft_beam.Context(transform_temp_dir):
    pipeline = beam.Pipeline(runner, options=opts)   
    
    raw_data =(
        pipeline
        | 'Read from BigQuery' >> beam.io.ReadFromBigQuery(project='project-m', query=sql, use_standard_sql=True,validate=True,
        flatten_results=False)
    )
    
    dataset  = (raw_data, get_metadata())
  
    result1 = (
        raw_data
        |  'Write raw data to gcs' >> beam.io.WriteToText(DEST_DIR + job_name + '/raw_output'+ '/output',file_name_suffix='.txt')
    )
    
    transformed_dataset, _ = (
        dataset
        | 'Embedding data' >> tft_beam.AnalyzeAndTransformDataset(preprocess_fn)  
    )

error
ValueError Traceback (most recent call last)
/opt/conda/lib/python3.7/site-packages/apache_beam/runners/common.cpython-37m-x86_64-linux-gnu.so in apache_beam.runners.common.DoFnRunner.process()

/opt/conda/lib/python3.7/site-packages/apache_beam/runners/common.cpython-37m-x86_64-linux-gnu.so in apache_beam.runners.common.SimpleInvoker.invoke_process()

/opt/conda/lib/python3.7/site-packages/apache_beam/transforms/core.py in <lambda>(x)
1510 else:
-> 1511 wrapper = lambda x: [fn(x)]
1512

/opt/conda/lib/python3.7/site-packages/tensorflow_transform/beam/impl.py in _infer_metadata_from_saved_model(saved_model_dir)
692 saved_transform_io.partially_apply_saved_transform_internal(
--> 693 saved_model_dir, {}))
694

/opt/conda/lib/python3.7/site-packages/tensorflow_transform/saved/saved_transform_io.py in partially_apply_saved_transform_internal(saved_model_dir, logical_input_map, tensor_replacement_map)
438 unbound_inputs, outputs = _partially_apply_saved_transform_impl(
--> 439 saved_model_dir, logical_input_map, tensor_replacement_map)
440 return unbound_inputs, outputs

/opt/conda/lib/python3.7/site-packages/tensorflow_transform/saved/saved_transform_io.py in _partially_apply_saved_transform_impl(saved_model_dir, logical_input_map, tensor_replacement_map)
336 if var_map:
--> 337 tf.compat.v1.train.init_from_checkpoint(checkpoint_path, var_map)
338

/opt/conda/lib/python3.7/site-packages/tensorflow/python/training/checkpoint_utils.py in init_from_checkpoint(ckpt_dir_or_file, assignment_map)
291 distribution_strategy_context.get_replica_context().merge_call(
--> 292 init_from_checkpoint_fn)
293

/opt/conda/lib/python3.7/site-packages/tensorflow/python/distribute/distribute_lib.py in merge_call(self, merge_fn, args, kwargs)
2714 merge_fn, autograph_ctx.control_status_ctx(), convert_by_default=False)
-> 2715 return self._merge_call(merge_fn, args, kwargs)
2716

/opt/conda/lib/python3.7/site-packages/tensorflow/python/distribute/distribute_lib.py in _merge_call(self, merge_fn, args, kwargs)
2721 try:
-> 2722 return merge_fn(self._strategy, *args, **kwargs)
2723 finally:

/opt/conda/lib/python3.7/site-packages/tensorflow/python/autograph/impl/api.py in wrapper(*args, **kwargs)
274 with ag_ctx.ControlStatusCtx(status=ag_ctx.Status.UNSPECIFIED):
--> 275 return func(*args, **kwargs)
276

/opt/conda/lib/python3.7/site-packages/tensorflow/python/training/checkpoint_utils.py in <lambda>(_)
286 init_from_checkpoint_fn = lambda _: _init_from_checkpoint(
--> 287 ckpt_dir_or_file, assignment_map)
288 if distribution_strategy_context.get_cross_replica_context():

/opt/conda/lib/python3.7/site-packages/tensorflow/python/training/checkpoint_utils.py in _init_from_checkpoint(ckpt_dir_or_file, assignment_map)
319 raise ValueError("Tensor %s is not found in %s checkpoint %s" % (
--> 320 tensor_name_in_ckpt, ckpt_dir_or_file, variable_map
321 ))

ValueError: Tensor EncoderDNN/CNN_layers/LayerNorm/beta is not found in b'gs://bts_pan//transform_temp_dir/tftransform_tmp/359590ecb90e4a188b379704e7852ad2/variables/variables' checkpoint {'EncoderDNN/DNN/ResidualHidden_3/dense/kernel/part_9': [30, 512], 'EncoderDNN/DNN/ResidualHidden_3/dense/kernel/part_7': [30, 512], 'EncoderDNN/DNN/ResidualHidden_3/dense/kernel/part_6': [30, 512], 'EncoderDNN/DNN/ResidualHidden_3/dense/kernel/part_4': [30, 512], 'EncoderDNN/DNN/ResidualHidden_3/dense/kernel/part_3': [30, 512],


Full sample code below:

from time import time

import tensorflow as tf
import apache_beam as beam
import tensorflow_transform.beam as tft_beam
import tensorflow_transform.coders as tft_coders
from apache_beam.options.pipeline_options import PipelineOptions
import tempfile

model = None

def embed_text(text):
    import tensorflow_hub as hub
    import tensorflow_text
    global model
    if model is None:
        model = hub.load(
            'https://tfhub.dev/google/universal-sentence-encoder-multilingual/3')
    embedding = model(text)
    return embedding


def get_metadata():
    from tensorflow_transform.tf_metadata import dataset_schema
    from tensorflow_transform.tf_metadata import dataset_metadata

    metadata = dataset_metadata.DatasetMetadata(dataset_schema.Schema({
        'id': dataset_schema.ColumnSchema(
            tf.string, [], dataset_schema.FixedColumnRepresentation()),
        'text': dataset_schema.ColumnSchema(
            tf.string, [], dataset_schema.FixedColumnRepresentation())
    }))
    return metadata


def preprocess_fn(input_features):

    text_integerized = embed_text(input_features['text'])
    output_features = {
        'id': input_features['id'],
        'embedding': text_integerized
    }
    return output_features


def run(runner):
    pipeline_options = beam.pipeline.PipelineOptions(None)
    DEST_DIR = "gs://daehwan/"
    job_name = 'dataflow-use-multilingual-{}'.format(str(time())[:10])
    options = {
        'runner': runner,
    #     'num_workers' : 10,
    #     'machine_type' : 'n1-highmem-16',
        'staging_location': DEST_DIR + 'staging',
        'temp_location': DEST_DIR + 'tmp',
        'job_name': job_name,
        'project': 'project',
        'region' : 'us-central1',
    #     'teardown_policy': 'TEARDOWN_ALWAYS',
    #     'no_save_main_session': True ,  
        'save_main_session': False,
        'service_account_email' : 'id@project.iam.gserviceaccount.com',
        'setup_file' : './setup.py'

    }
    opts = beam.pipeline.PipelineOptions(flags=[], **options)
    
    pipeline = beam.Pipeline(runner, options=opts)
    transform_temp_dir = DEST_DIR + '/transform_temp_dir'
    with tft_beam.Context(transform_temp_dir):
        articles = (
                pipeline
                | beam.Create([
            {'id':'01','text':'To be, or not to be: that is the question: '},
            {'id':'02','text':"Whether 'tis nobler in the mind to suffer "},
            {'id':'03','text':'The slings and arrows of outrageous fortune, '},
            {'id':'04','text':'Or to take arms against a sea of troubles, '},
        ]))

        articles_dataset = (articles, get_metadata())

        transformed_dataset, transform_fn = (
                articles_dataset
                | 'Extract embeddings' >> tft_beam.AnalyzeAndTransformDataset(preprocess_fn)
        )

        transformed_data, transformed_metadata = transformed_dataset

#         _ = (
#             transformed_data | 'Write embeddings to TFRecords' >> beam.io.tfrecordio.WriteToTFRecord(
#             file_path_prefix='{0}'.format(known_args.output_dir),
#             file_name_suffix='.tfrecords',
#             coder=tft_coders.example_proto_coder.ExampleProtoCoder(
#                 transformed_metadata.schema),
#             num_shards=1
#             )
#             )
    result = pipeline.run()
    result.wait_until_finish()


runner = 'DirectRunner'
# runner = 'DataflowRunner'
run(runner)

@varshaan
Contributor

Both hub modules mentioned in this issue are TF 2 hub modules. There is a new (still experimental) parameter in tft_beam.Context: force_tf_compat_v1. Setting this to False will trace the preprocessing fn using TF 2. I tested that these hub modules work post commit 25170b6 with force_tf_compat_v1 set to False. Please re-open if you see any issues with this path.
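
For anyone hitting this later, a minimal sketch of how that flag slots into the example from the first comment (the flag is the part described above; the rest mirrors the original pipeline):

with tft_beam.Context(temp_dir=tempfile.mkdtemp(), force_tf_compat_v1=False):
    transformed_dataset, transform_fn = (
        (articles, get_metadata())
        | 'Extract embeddings' >> tft_beam.AnalyzeAndTransformDataset(preprocess_fn)
    )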

@hyeonsuukang

Using very similar code here, setting force_tf_compat_v1=False did solve the "Tensor not found" error (I'm using USE v5, which is a TF2 hub module), but now there's an assertion error like the one below. Any insights on this?

AssertionError: Tried to export a function which references untracked object Tensor("7946:0", shape=(), dtype=resource).TensorFlow objects (e.g. tf.Variable) captured by functions must be tracked by assigning them to an attribute of a tracked object or assigned to an attribute of the main object directly. [while running 'Extract embeddings/AnalyzeDataset/CreateSavedModel/CreateSavedModel']
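
That AssertionError comes from the general TF 2 SavedModel export rule quoted in the message: any variable or resource a traced function captures must be reachable from the object being saved. A small standalone illustration of that rule (it does not touch the TFT internals that trigger it here):

import tensorflow as tf

untracked = tf.Variable(1.0)  # not reachable from the object being saved

class Model(tf.Module):
    def __init__(self):
        self.v = tf.Variable(2.0)  # tracked: assigned to an attribute of a tf.Module

    @tf.function(input_signature=[tf.TensorSpec([], tf.float32)])
    def __call__(self, x):
        return x + self.v  # only captures tracked state

tf.saved_model.save(Model(), '/tmp/tracking_demo')  # exports cleanly

# A tf.function that captured `untracked` instead would typically fail to export
# with the same "references untracked object" AssertionError shown above.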

@varshaan
Contributor

I only see a v4 here: https://tfhub.dev/google/universal-sentence-encoder/4. Could you give me a link to your hub module?

Also, could you please give me details as to what version of Transform you are using? The fix to this issue is not yet in a release.
Thanks!

@hyeonsuukang

Sure! The transformer-based USE v5 is available here: https://tfhub.dev/google/universal-sentence-encoder-large/5

I'm using tensorflow-transform version 0.24.1,
tensorflow version 2.3.1, and
tensorflow-hub version 0.10.0.

@varshaan
Contributor

I tested that hub module and it works at the GitHub master branch. As mentioned in my previous comment, the commit is not in a release yet. If possible, you could try using Transform from master. Alternatively, you can wait and try it with 0.25. If you still face an error, please open a new issue, as this one has been marked closed. Thanks!
