
StopIteration while running Interactive Context on Transform #2756

Closed
RaphRzb opened this issue Nov 4, 2020 · 4 comments


RaphRzb commented Nov 4, 2020

Hi, first thanks for all the amazing work,

I'm currently writing a pipeline for SSD but I'm starting with image classification to get familiar with writing pipelines for image processing.

I'm loading images as byte data (opened with 'rb') in a custom executor for FileBasedExampleGen, then pretty much following the taxi template, changing only the training and preprocessing files. I also tried writing TFRecords first and then loading them.

I just finished the training block and haven't gotten further yet.

With tfx==0.21.1 this works like a charm, but with 0.24.1 I get an error when I run:

transform = Transform(
    examples=example_gen.outputs['examples'],
    schema=schema_gen.outputs['schema'],
    module_file=os.path.abspath('preprocess.py')
)

context.run(transform)

I get this StopIteration:

---------------------------------------------------------------------------
StopIteration                             Traceback (most recent call last)
<timed eval> in <module>

~/.local/lib/python3.8/site-packages/tfx/orchestration/experimental/interactive/interactive_context.py in run_if_ipython(*args, **kwargs)
     65       # __IPYTHON__ variable is set by IPython, see
     66       # https://ipython.org/ipython-doc/rel-0.10.2/html/interactive/reference.html#embedding-ipython.
---> 67       return fn(*args, **kwargs)
     68     else:
     69       absl.logging.warning(

~/.local/lib/python3.8/site-packages/tfx/orchestration/experimental/interactive/interactive_context.py in run(self, component, enable_cache, beam_pipeline_args)
    180         telemetry_utils.LABEL_TFX_RUNNER: runner_label,
    181     }):
--> 182       execution_id = launcher.launch().execution_id
    183 
    184     return execution_result.ExecutionResult(

~/.local/lib/python3.8/site-packages/tfx/orchestration/launcher/base_component_launcher.py in launch(self)
    200       absl.logging.info('Running executor for %s',
    201                         self._component_info.component_id)
--> 202       self._run_executor(execution_decision.execution_id,
    203                          execution_decision.input_dict,
    204                          execution_decision.output_dict,

~/.local/lib/python3.8/site-packages/tfx/orchestration/launcher/in_process_component_launcher.py in _run_executor(self, execution_id, input_dict, output_dict, exec_properties)
     65         executor_context)  # type: ignore
     66 
---> 67     executor.Do(input_dict, output_dict, exec_properties)

~/.local/lib/python3.8/site-packages/tfx/components/transform/executor.py in Do(self, input_dict, output_dict, exec_properties)
    415       label_outputs[labels.CACHE_OUTPUT_PATH_LABEL] = cache_output
    416     status_file = 'status_file'  # Unused
--> 417     self.Transform(label_inputs, label_outputs, status_file)
    418     absl.logging.debug('Cleaning up temp path %s on executor success',
    419                        temp_path)

~/.local/lib/python3.8/site-packages/tfx/components/transform/executor.py in Transform(***failed resolving arguments***)
    933     materialization_format = (
    934         transform_paths_file_formats[-1] if materialize_output_paths else None)
--> 935     self._RunBeamImpl(analyze_data_list, transform_data_list,
    936                       preprocessing_fn, input_dataset_metadata,
    937                       transform_output_path, raw_examples_data_format,

~/.local/lib/python3.8/site-packages/tfx/components/transform/executor.py in _RunBeamImpl(self, analyze_data_list, transform_data_list, preprocessing_fn, input_dataset_metadata, transform_output_path, raw_examples_data_format, temp_path, input_cache_dir, output_cache_dir, compute_statistics, per_set_stats_output_paths, materialization_format, analyze_paths_count)
   1090                     os.path.join(output_cache_dir, span_cache_dir.key))
   1091 
-> 1092           (cache_output
   1093            | 'WriteCache' >> analyzer_cache.WriteAnalysisCacheToFS(
   1094                pipeline=pipeline,

~/.local/lib/python3.8/site-packages/apache_beam/transforms/ptransform.py in __ror__(self, pvalueish, _unused)
   1026 
   1027   def __ror__(self, pvalueish, _unused=None):
-> 1028     return self.transform.__ror__(pvalueish, self.label)
   1029 
   1030   def expand(self, pvalue):

~/.local/lib/python3.8/site-packages/apache_beam/transforms/ptransform.py in __ror__(self, left, label)
    563     pvalueish = _SetInputPValues().visit(pvalueish, replacements)
    564     self.pipeline = p
--> 565     result = p.apply(self, pvalueish, label)
    566     if deferred:
    567       return result

~/.local/lib/python3.8/site-packages/apache_beam/pipeline.py in apply(self, transform, pvalueish, label)
    632       try:
    633         old_label, transform.label = transform.label, label
--> 634         return self.apply(transform, pvalueish)
    635       finally:
    636         transform.label = old_label

~/.local/lib/python3.8/site-packages/apache_beam/pipeline.py in apply(self, transform, pvalueish, label)
    640     # every child transform in a composite.
    641     if self._current_transform() is self._root_transform():
--> 642       alter_label_if_ipython(transform, pvalueish)
    643 
    644     full_label = '/'.join(

~/.local/lib/python3.8/site-packages/apache_beam/utils/interactive_utils.py in alter_label_if_ipython(transform, pvalueish)
     82     from IPython import get_ipython
     83     prompt = get_ipython().execution_count
---> 84     pipeline = _extract_pipeline_of_pvalueish(pvalueish)
     85     if (pipeline
     86         # We only alter for transforms to be applied to user-defined pipelines

~/.local/lib/python3.8/site-packages/apache_beam/utils/interactive_utils.py in _extract_pipeline_of_pvalueish(pvalueish)
     95     pvalue = pvalueish[0]
     96   elif isinstance(pvalueish, dict):
---> 97     pvalue = next(iter(pvalueish.values()))
     98   else:
     99     pvalue = pvalueish

StopIteration: 

Any idea why this happens?
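
For reference, the failing line in `_extract_pipeline_of_pvalueish` (`next(iter(pvalueish.values()))`) raises `StopIteration` whenever it receives an empty dict. The pattern is easy to reproduce in isolation (a minimal sketch, not Beam's actual code):

```python
def extract_first_value(pvalueish):
    # Mirrors the failing pattern: next() on an empty dict's
    # values iterator raises StopIteration.
    if isinstance(pvalueish, dict):
        return next(iter(pvalueish.values()))
    return pvalueish

try:
    extract_first_value({})
except StopIteration:
    print("StopIteration: the dict was empty")
```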

@rmothukuru (Contributor)
@RaphRzb,
To help expedite the troubleshooting process, please provide a code snippet that reproduces the issue reported here. Thanks!


RaphRzb commented Nov 5, 2020

Of course. For simplicity, here's a cats-vs.-dogs version that reproduces the same error (dataset: https://www.kaggle.com/c/dogs-vs-cats/data):

import tensorflow as tf
import tfx

from tfx.orchestration.experimental.interactive.interactive_context import InteractiveContext

from tfx.components.example_gen.base_example_gen_executor import BaseExampleGenExecutor
from tfx.components.example_gen import utils
from tfx.components import FileBasedExampleGen
from tfx.utils.dsl_utils import external_input
from tfx.proto import example_gen_pb2

from tfx.components import StatisticsGen
from tfx.components import SchemaGen
from tfx.components import ExampleValidator
from tfx.components import Transform
from tfx.components import Trainer
from tfx.components import ResolverNode
from tfx.components import Evaluator
from tfx.components import Pusher
from tfx.components.base import executor_spec
from tfx.components.trainer.executor import GenericExecutor
from tfx.proto import trainer_pb2
from tfx.proto import pusher_pb2
from tfx.dsl.experimental import latest_blessed_model_resolver
from tfx.types import Channel
from tfx.types.standard_artifacts import Model
from tfx.types.standard_artifacts import ModelBlessing

import tensorflow_model_analysis as tfma


from typing import Any, Dict, Optional, Text, List
import absl

import apache_beam as beam
from apache_beam.io import fileio 

import glob
import numpy as np
import os

####################################################
_DATASET_DIRECTORY = 'train/'
_LABELS = ['cat', 'dog']
_preprocessing_file = "preprocess.py"
_HASH_BUCKETS_TRAIN = 3
_HASH_BUCKETS_EVAL = 1
_labels_to_int = {label:i for i,label in enumerate(_LABELS)}

################ MISC FUNCTIONS ######################
def _bytes_feature(value):
    return tf.train.Feature(bytes_list=tf.train.BytesList(value=[value]))
def _float_feature(value):
    return tf.train.Feature(float_list=tf.train.FloatList(value=[value]))
def _int64_feature(value):
    return tf.train.Feature(int64_list=tf.train.Int64List(value=[value]))
def _float_list_feature(value):
    return tf.train.Feature(float_list=tf.train.FloatList(value=value))
def _bytes_list_feature(value):
    return tf.train.Feature(bytes_list=tf.train.BytesList(value=value))
def _int64_list_feature(value):
    return tf.train.Feature(int64_list=tf.train.Int64List(value=value))


context = InteractiveContext()

################ DATA INGESTION #######################
def convert_image_to_TFExample(image_path):
    # The label is the filename prefix, e.g. "train/cat.123.jpg" -> "cat".
    label = image_path.split('/')[-1].split('.')[0]

    # Read the raw image bytes:
    with open(image_path, 'rb') as f:
        raw_file = f.read()

    # Create the TensorFlow Example data structure:
    example = tf.train.Example(features=tf.train.Features(feature={
        'image': _bytes_feature(raw_file),
        'label': _int64_feature(_labels_to_int[label]),
    }))

    return example
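
(The label parsing above splits on '/', which ties it to POSIX paths. A slightly more robust, OS-independent sketch of the same logic, using `os.path`, could look like this; `label_from_path` is a hypothetical helper, not part of the pipeline:)

```python
import os

def label_from_path(image_path):
    # "train/cat.123.jpg" -> "cat.123.jpg" -> "cat"
    filename = os.path.basename(image_path)
    return filename.split('.')[0]

print(label_from_path('train/cat.123.jpg'))  # -> cat
print(label_from_path('train/dog.1.jpg'))    # -> dog
```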

@beam.ptransform_fn
@beam.typehints.with_input_types(beam.Pipeline)
@beam.typehints.with_output_types(tf.train.Example)
def image_to_example(
    pipeline: beam.Pipeline,
    #input_dict: Dict[Text, List[tfx.types.Artifact]], #remove in tfx>=0.23
    exec_properties: Dict[Text, Any],
    split_pattern: Text) -> beam.pvalue.PCollection:

    input_base_uri = exec_properties[utils.INPUT_BASE_KEY] # in tfx>=0.23
    #input_base_uri = tfx.types.artifact_utils.get_single_uri(input_dict['input']) # in tfx<=0.22
    image_pattern = os.path.join(input_base_uri, split_pattern)
    absl.logging.info(
        'Processing input image data {}' 
        ' to tf.Example.'.format(image_pattern))

    image_files = tf.io.gfile.glob(image_pattern)
    if not image_files:
        raise RuntimeError(
            "Split pattern {} did not match any valid path." 
            "".format(image_pattern))

    p_collection = (
        pipeline
        | 'CreateFiles' >> beam.Create(image_files)
        | 'ConvertImagesToTFExample' >> beam.Map(convert_image_to_TFExample)
    )
    return p_collection
    

class ImageExampleGenExecutor(BaseExampleGenExecutor):
    
    def GetInputSourceToExamplePTransform(self) -> beam.PTransform:
        return image_to_example
    
    
output = example_gen_pb2.Output(
             split_config=example_gen_pb2.SplitConfig(splits=[
                 example_gen_pb2.SplitConfig.Split(name='train', hash_buckets=_HASH_BUCKETS_TRAIN),
                 example_gen_pb2.SplitConfig.Split(name='eval', hash_buckets=_HASH_BUCKETS_EVAL)
             ]))

input_config = example_gen_pb2.Input(splits=[
    example_gen_pb2.Input.Split(name='images', pattern='*.jpg'),
])
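
(The hash_buckets above split examples roughly 3:1, i.e. about 75% train / 25% eval. The idea can be sketched in plain Python; this is only an illustration of hash-bucket partitioning, not TFX's exact implementation, which hashes the serialized example:)

```python
import hashlib

_BUCKETS = [('train', 3), ('eval', 1)]
_TOTAL = sum(n for _, n in _BUCKETS)  # 4 buckets in total

def assign_split(key: str) -> str:
    # Deterministically map a key into one of _TOTAL buckets,
    # then give buckets 0-2 to 'train' and bucket 3 to 'eval'.
    bucket = int(hashlib.md5(key.encode()).hexdigest(), 16) % _TOTAL
    upper = 0
    for name, n in _BUCKETS:
        upper += n
        if bucket < upper:
            return name
    raise AssertionError('unreachable')

counts = {'train': 0, 'eval': 0}
for i in range(1000):
    counts[assign_split(f'image_{i}.jpg')] += 1
print(counts)  # roughly 750 train / 250 eval
```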

example_gen = FileBasedExampleGen(
    input_base = _DATASET_DIRECTORY, # for tfx>=0.23 
    #input=external_input(_DATASET_DIRECTORY), # for tfx<=0.22
    input_config=input_config,
    output_config=output,
    custom_executor_spec=tfx.components.base.executor_spec.ExecutorClassSpec(
        ImageExampleGenExecutor)
)

context.run(example_gen)

################# DATA VALIDATION ####################
statistics_gen = StatisticsGen(
    examples=example_gen.outputs['examples']
)

context.run(statistics_gen)

schema_gen = SchemaGen(
    statistics=statistics_gen.outputs['statistics'],
    infer_feature_shape=True)

context.run(schema_gen)


example_validator = ExampleValidator(
    statistics=statistics_gen.outputs['statistics'],
    schema=schema_gen.outputs['schema'])

context.run(example_validator)

################ PREPROCESSING ########################
transform = Transform(
    examples=example_gen.outputs['examples'],
    schema=schema_gen.outputs['schema'],
    module_file=os.path.abspath('preprocess.py')
)

context.run(transform)

With preprocess.py as:

import tensorflow as tf
import numpy as np
import tensorflow_transform as tft

def process_image(raw_image):
    raw_image = tf.reshape(raw_image, [-1])
    img = tf.map_fn(lambda img_x:tf.io.decode_jpeg(img_x,channels=3), raw_image, dtype=tf.uint8)
    img = tf.image.convert_image_dtype(img, tf.float32)
    img = tf.image.resize(img,(300, 300))
    return tf.reshape(img,[-1, 300, 300, 3])

def preprocessing_fn(inputs):
    outputs = {}
    
    image_raw = inputs['image']
    
    outputs['image_xf'] = process_image(image_raw)
    outputs['label_xf'] = tf.reshape(tf.one_hot(inputs['label'],2),[-1,2])
    
    return outputs

(Credit to @hanneshapke and @drcat101 for a solid 90% of this code. Thanks, guys, your book is great!)


hanneshapke commented Nov 5, 2020

Hi @RaphRzb,

I think the StopIteration is related to an issue with the Transform component. @zoyahav fixed the issue and pushed the fix to the tfx nightly build.

As an intermediate fix, you can set the disable_analyzer_cache attribute, or switch to the tfx-nightly build as Zohar mentioned.

I hope that works for you.
Hannes
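
(For anyone landing here, the intermediate fix mentioned above would look roughly like this; a sketch of the Transform call from the original report, with the analyzer cache turned off:)

```python
transform = Transform(
    examples=example_gen.outputs['examples'],
    schema=schema_gen.outputs['schema'],
    module_file=os.path.abspath('preprocess.py'),
    # Skips the WriteCache path that triggers the StopIteration:
    disable_analyzer_cache=True,
)

context.run(transform)
```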


RaphRzb commented Nov 5, 2020

Thanks @hanneshapke, I used the intermediate fix and it works perfectly.

Raphael

@RaphRzb RaphRzb closed this as completed Nov 5, 2020