
Caching is not Useful: {Kube,Air}flow Retries do not work and ExampleGen's cache isn't useful #4226

Closed
vaskozl opened this issue Sep 1, 2021 · 7 comments


vaskozl commented Sep 1, 2021

Over the past year and a half I've been fighting with the various forms of caching provided by Airflow/Kubeflow and TFX.

As it stands, caching is useful only in one very particular scenario, and I think it needs a big rethink. At present it doesn't make much sense to have caching enabled at all, and genuinely useful caching does not exist.

Issue 1: Retrying doesn't work

Airflow and Kubeflow both provide a facility for retrying and caching. Caching in Kubeflow seems to conflict with TFX caching (#3049) and seems to be better left disabled when using TFX.

As far as I can see, "retrying" with the orchestrator is pretty much impossible, even with both orchestrator and TFX caching disabled, because the step after the retried step will fail when it sees 2 output artifacts instead of 1 (#2805).

The only way to make use of caching, and the only use case as far as I can see, is when you create a new pipeline run with the same components and arguments. This is basically an un-intuitive "retry", and it relies on your ExampleGen split arguments remaining the same. This leads me to my second problem with caching.

Issue 2: ExampleGens are cached ineffectively

I use predominantly query-based ExampleGens, with queries as simple as SELECT * FROM my_data. If caching is enabled, no pipeline run ever pulls data, so scheduled runs never produce new models from new data.

This leaves you with two options if you want to use TFX to run your pipelines on a schedule:

  • Disable cache for the step (which obviously invalidates the cache of all further steps). This compounds with my first problem, as now retrying becomes impossible: if your pipeline errors, you have to wait for your massive query to write all the data to the tfrecords again on S3.
  • Bust the cache by including something unique in the query, such as the present date, e.g. WHERE $date > 1970-01-01, and leave the cache enabled. This basically lets you "retry" pipelines when you want to rerun later steps (either due to failure or changes) by making a new pipeline run and relying on the TFX cache, but it is annoying to implement because if a pipeline runs for longer than one day, the date changes and the cache is invalidated. In general this may help but isn't very useful either.
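The cache-busting workaround in the second bullet can be sketched as follows (the table name and date column are placeholders, not real TFX API):

    from datetime import date, timedelta

    def cache_busting_query(table: str, lookback_days: int = 3650) -> str:
        """Build a query whose text changes once per calendar day, so a new
        pipeline run on a later date misses the ExampleGen cache, while
        re-runs on the same day still hit it."""
        # Embedding today's date makes the query string (and therefore the
        # cache key) unique per day.
        cutoff = date.today() - timedelta(days=lookback_days)
        return f"SELECT * FROM {table} WHERE event_date > '{cutoff.isoformat()}'"

The cutoff is computed relative to today, so the SQL text differs between days but stays stable within one day.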

Issue 3: ExampleGens don't cache with new data, and do not lend themselves to fine-tuning

If you want new data, there is basically no way to use the cache as far as I can see. If you train with 100s of gigabytes:

  • Every run is ridiculously slow, incurring lots of traffic and writing massive artifacts every time, containing your whole dataset, even if only the latest 1% is different.
  • This makes fine-tuning and running pipelines on a schedule slow and expensive, and makes the models less readily fine-tunable with new data.
  • You get billed a lot for object storage, as you are storing many almost identical copies of your dataset unless you manually clean them up.

How caching should work:

  • You make queries with some sort of sortable key, such as a primary key or date.
  • Only new data is read from the database and stored into a new split.

This would make reruns actually quick, incurring only the transform/training time. I'm not sure to what extent Transform can be re-cached, but that should also be possible in cases where a full pass is not required.
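The proposal above amounts to keeping a high-water mark per dataset; a minimal sketch of how such incremental ingestion queries could be built (table and key column names are made up for illustration):

    def incremental_query(table: str, key_column: str, last_seen_key) -> str:
        """Build a query that reads only rows newer than the last ingested
        key, so a rerun pulls just the delta instead of the full table."""
        if last_seen_key is None:
            # First run: no previous split exists, so read everything.
            return f"SELECT * FROM {table}"
        # Subsequent runs: read only rows past the stored high-water mark.
        return f"SELECT * FROM {table} WHERE {key_column} > {last_seen_key!r}"

The high-water mark (the largest key seen so far) would have to be persisted somewhere, e.g. alongside the pipeline's metadata, and fed back in on the next run.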

TLDR:

  • At present caching is ineffective and causes more problems than it solves
  • Datasets aren't cached when new data is desired, which makes everything slow and breaks "retries"
1025KB (Collaborator) commented Sep 3, 2021

Thanks for the advice!!

  1. TFX itself currently doesn't support retries, and outside retries might mess up TFX's MLMD state (e.g., 2 output artifacts instead of 1: the downstream component finds two artifacts, generated by the retry, in its input channel, but it only requests one artifact), so I would suggest turning off external retries for now.

  2. Currently QueryBasedExampleGen only checks whether the query is the same or not; to support more advanced logic, you need a custom driver that implements the fingerprint logic based on your database.

  3. Currently, for FileBasedExampleGen, a fingerprint is generated for the full input dataset based on file size and creation time, so if there is any change in the dataset, TFX will treat it as a new dataset.
    If the data changes incrementally, I wonder if the span concept can be used, so you don't need to reprocess already-processed spans; here is an e2e example .
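For context, TFX's span mechanism works by putting a {SPAN} placeholder in the ExampleGen input pattern: each ingestion lands in its own span directory, and only new spans need processing. A rough, pure-Python illustration of resolving the latest span from directory names (this mimics the idea, it is not the actual TFX driver):

    import re

    def latest_span(dir_names):
        """Given directories like 'span-1', 'span-2', return the highest
        span number, mimicking how a {SPAN}-style input pattern picks up
        the newest ingested slice of data."""
        spans = []
        for name in dir_names:
            m = re.fullmatch(r"span-(\d+)", name)
            if m:
                spans.append(int(m.group(1)))
        return max(spans) if spans else None

With spans, a scheduled run that ingested only a new span would leave earlier spans (and the components cached against them) untouched.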

@vaskozl vaskozl changed the title Caching is not Useful: {Kube,Air}flow Retries do not work and ExampleGen's are cached usefully Caching is not Useful: {Kube,Air}flow Retries do not work and ExampleGen's cache isn't usefull Sep 3, 2021
vaskozl commented Sep 3, 2021 via email

EdwardCuiPeacock (Contributor) commented:

> TFX itself currently doesn't support retries, and outside retries might mess up TFX's MLMD state (e.g., 2 output artifacts instead of 1: the downstream component finds two artifacts, generated by the retry, in its input channel, but it only requests one artifact), so I would suggest turning off external retries for now.

Unless I am missing something, couldn't this be solved by looking at the newest artifact based on the timestamp?
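That "pick the newest artifact" idea could look roughly like this (the artifact shape here is a plain dict invented for illustration; MLMD artifacts do record a create_time_since_epoch, but the real resolution would go through the MLMD API):

    def newest_artifact(artifacts):
        """From the duplicate artifacts a retry leaves behind in an input
        channel, keep only the most recently created one."""
        if not artifacts:
            raise ValueError("input channel resolved no artifacts")
        # Assumes each artifact records its creation time in milliseconds,
        # as MLMD's create_time_since_epoch does.
        return max(artifacts, key=lambda a: a["create_time_since_epoch"])
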

We currently set up external retry with

    def pod_retry_on_error_failure():
        def _set_retry(container_op):
            # Argo retry: https://argoproj.github.io/argo-workflows/examples/#retrying-failed-or-errored-steps
            # default policy: OnFailure
            # May not work with tfx where `tfx.utils.get_only_uri_in_dir` function is used
            container_op.set_retry(
                num_retries=2,
            )

        return _set_retry

and append this function to the pipeline_operator_funcs argument of kubeflow_dag_runner.KubeflowDagRunnerConfig, which seems to work when a step fails. This at least serves as a safeguard for components that fail due to network timeouts.

@singhniraj08 singhniraj08 self-assigned this May 4, 2023
singhniraj08 (Contributor) commented:

@vaskozl,

You can refer to the "Reading data from BigQuery with TFX and Vertex Pipelines" tutorial for a QueryBasedExampleGen example. Thank you!

github-actions bot commented:

This issue has been marked stale because it has had no recent activity for 7 days. It will be closed if no further activity occurs. Thank you.

@github-actions github-actions bot added the stale label May 12, 2023
github-actions bot commented:

This issue was closed due to lack of activity after being marked stale for past 7 days.

