
Evaluator fails with ValueError in Airflow when caching is turned off and retries are allowed #2805

Open
codesue opened this issue Nov 11, 2020 · 21 comments

@codesue
Contributor

codesue commented Nov 11, 2020

When running a pipeline in Airflow, if enable_cache is set to False but retries are allowed, a component can run more than once and register more than one output artifact. A downstream component that expects a single artifact then fails with a ValueError.

{{base_task_runner.py:115}} INFO - Job 750: Subtask Evaluator     (len(input_dict[constants.MODEL_KEY])))
{{base_task_runner.py:115}} INFO - Job 750: Subtask Evaluator ValueError: There can be only one candidate model, there are 2.
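For reference, the check that fails is essentially a single-artifact lookup on the input list; a simplified sketch (based on the error above and the traceback later in this thread, not the verbatim TFX source):

# Simplified sketch of the single-artifact check that raises (illustrative,
# not the verbatim TFX source).
def get_single_instance(artifact_list):
  if len(artifact_list) != 1:
    raise ValueError(
        'expected list length of one but got {}'.format(len(artifact_list)))
  return artifact_list[0]

# The Evaluator applies the same kind of check to its candidate model, so two
# Model artifacts registered for the same run produce
# "There can be only one candidate model, there are 2."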

As a workaround, I've been setting enable_cache to True to avoid unexpected behavior due to retries.

@casassg
Member

casassg commented Nov 11, 2020

@charlesccychen @zhitaoli

It seems MLMD is reusing the context and registering 2 output artifacts for the same component run instead of overwriting the first one on the 2nd run (at least that's my intuition).

We found the issue after we restarted execution on the Trainer without cache, and Evaluator was failing shortly after.

No rush or hotfix needed, as we can work around it by using the cache, but it would be nice to fix.

@1025KB
Collaborator

1025KB commented Nov 12, 2020

Is retry a functionality provided by Airflow? It seems like only part of the code is retried.

@codesue
Contributor Author

codesue commented Nov 12, 2020

Yes; if a component (operator) fails, Airflow can retry it: https://airflow.apache.org/docs/1.10.6/_api/airflow/operators/index.html?highlight=retry#package-contents
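For example, a minimal sketch of how retries are configured in Airflow 1.10 (illustrative values, not our actual DAG):

# Minimal Airflow 1.10-style sketch showing where retries come from
# (illustrative values, not the DAG from this issue).
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

default_args = {
    'owner': 'airflow',
    'start_date': datetime(2020, 11, 1),
    'retries': 3,                         # Airflow re-runs a failed task up to 3 times
    'retry_delay': timedelta(minutes=5),  # wait between attempts
}

with DAG('retry_example', default_args=default_args, schedule_interval=None) as dag:
  flaky_task = PythonOperator(
      task_id='flaky_task',
      python_callable=lambda: None,       # stand-in for a TFX component launcher
  )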

@1025KB
Collaborator

1025KB commented Nov 12, 2020

“Seems MLMD is reusing context and registering 2 output artifacts for the same component run instead of overwriting it on the 2nd run”: this is expected; MLMD won't overwrite artifacts.

Was the first attempt successful? Could you check whether the first artifact is PUBLISHED or PENDING?

@casassg
Member

casassg commented Nov 13, 2020

To be clear, we are using the existing TFX wrapper for Airflow; we don't have any custom code. This fails on the current release, 0.24.1.

The first attempt was successful, but probably didn't finish. What happened for us was:

  • The Trainer started a first time
  • We restarted the cluster (including the workers) for a scheduled or ad hoc release
  • The worker restart caused the task to fail (even though the code had not failed)
  • Airflow retried the Trainer, which registered another output artifact.

@codesue can you query MLMD? I can help as well :D
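For example, something along these lines should show the duplicate (a sketch only; the connection config and the 'Model' type name are assumptions):

# Sketch for inspecting what MLMD recorded for the retried component.
# The connection config and the 'Model' type name are assumptions for illustration.
from ml_metadata.metadata_store import metadata_store
from ml_metadata.proto import metadata_store_pb2

config = metadata_store_pb2.ConnectionConfig()
config.sqlite.filename_uri = '/path/to/metadata.db'  # assumption: SQLite-backed MLMD
config.sqlite.connection_mode = 3                    # READWRITE_OPENCREATE
store = metadata_store.MetadataStore(config)

# List artifacts of the Trainer's output type and check their 'state' property.
for artifact in store.get_artifacts_by_type('Model'):
  state = (artifact.custom_properties['state'].string_value
           if 'state' in artifact.custom_properties else '<no state>')
  print(artifact.id, artifact.uri, state)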

@1025KB
Collaborator

1025KB commented Nov 13, 2020

My current guess is:

TFX uses MLMD to check the status. If the previous execution failed before publishing results to MLMD (the previous artifact is still pending), a retry shouldn't cause any issue.

But if TFX already finished the execution of a certain component (everything is finalized in MLMD) and a retry then reruns this component, TFX doesn't know it's a retry, so the retry is treated as a new execution of that component and causes the issue.
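Schematically (illustrative pseudologic, not TFX's actual launcher code):

# Illustrative pseudologic for the guess above, not TFX's actual launcher code.
def retry_outcome(prior_artifacts):
  if not prior_artifacts or all(a.state == 'pending' for a in prior_artifacts):
    # The previous attempt never published its results, so the retry just
    # completes the work: no duplicate artifact.
    return 'retry is safe'
  # The previous attempt already published. TFX cannot tell this run is a
  # retry, so it registers a second output artifact under the same context,
  # and downstream components later see two artifacts where they expect one.
  return 'retry creates a duplicate artifact'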

@casassg
Member

casassg commented Nov 23, 2020

Here's a similar issue we ran into when an ExampleGen was retried by Airflow. Examples artifact pbtxt in MLMD:

id: 3495
type_id: 5
uri: "gs://some-bucket/BigQueryExampleGen/examples/3312"
custom_properties {
  key: "name"
  value {
    string_value: "examples"
  }
}
custom_properties {
  key: "producer_component"
  value {
    string_value: "BigQueryExampleGen"
  }
}
create_time_since_epoch: 1604792653508
last_update_time_since_epoch: 1604792653508

2nd artifact:

id: 3498
type_id: 5
uri: "gs:/some-bucket/BigQueryExampleGen/examples/3312"
properties {
  key: "split_names"
  value {
    string_value: "[\"train\", \"eval\"]"
  }
}
custom_properties {
  key: "name"
  value {
    string_value: "examples"
  }
}
custom_properties {
  key: "producer_component"
  value {
    string_value: "BigQueryExampleGen"
  }
}
custom_properties {
  key: "state"
  value {
    string_value: "published"
  }
}
create_time_since_epoch: 1604795761297
last_update_time_since_epoch: 1604795948590

Error in downstream component (StatsGen):

Traceback (most recent call last):
  File "/shared/airflow_package/airflow.pex/.deps/apache_airflow-1.10.4+twtr23-py2.py3-none-any.whl/airflow/models/taskinstance.py", line 922, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/shared/airflow_package/airflow.pex/.deps/apache_airflow-1.10.4+twtr23-py2.py3-none-any.whl/airflow/operators/python_operator.py", line 113, in execute
    return_value = self.execute_callable()
  File "/shared/airflow_package/airflow.pex/.deps/apache_airflow-1.10.4+twtr23-py2.py3-none-any.whl/airflow/operators/python_operator.py", line 118, in execute_callable
    return self.python_callable(*self.op_args, **self.op_kwargs)
  File "/shared/airflow_package/airflow.pex/.deps/tfx-0.24.1-py3-none-any.whl/tfx/orchestration/airflow/airflow_component.py", line 79, in _airflow_component_launcher
    launcher.launch()
  File "/shared/airflow_package/airflow.pex/.deps/tfx-0.24.1-py3-none-any.whl/tfx/orchestration/launcher/base_component_launcher.py", line 205, in launch
    execution_decision.exec_properties)
  File "/shared/airflow_package/airflow.pex/.deps/tfx-0.24.1-py3-none-any.whl/tfx/orchestration/launcher/in_process_component_launcher.py", line 67, in _run_executor
    executor.Do(input_dict, output_dict, exec_properties)
  File "/shared/airflow_package/airflow.pex/.deps/tfx-0.24.1-py3-none-any.whl/tfx/components/statistics_gen/executor.py", line 97, in Do
    examples = artifact_utils.get_single_instance(input_dict[EXAMPLES_KEY])
  File "/shared/airflow_package/airflow.pex/.deps/tfx-0.24.1-py3-none-any.whl/tfx/types/artifact_utils.py", line 67, in get_single_instance
    len(artifact_list)))
ValueError: expected list length of one but got 2

(There are no more artifacts with type_id = 5 under the same run context.)

@rcrowe-google added the Twitter (Issues from the Twitter team) label Dec 6, 2020
@rcrowe-google
Contributor

b/173098817

@casassg
Member

casassg commented Jan 19, 2021

Any updates on this?

My guess at this point is that output artifacts are not overwritten on retry. That is, if I re-run a component with the same run_id and the same pipeline_id, the output artifacts should be overwritten. Otherwise, we are going against the constraint of having a single output artifact per component_id/run_id/pipeline_id.

@casassg
Member

casassg commented Feb 4, 2021

Steps to repro:

  • Create an ad hoc component that fails randomly (e.g. assert random.randint(0, 1) == 1) and gets restarted by Airflow until it completes.
  • Add a downstream component that has a data dependency on the previous component.
import os
import random

from tfx.dsl.component.experimental.annotations import InputArtifact, OutputArtifact
from tfx.dsl.component.experimental.decorators import component
from tfx.types.standard_artifacts import Model


@component
def random_fail(model: OutputArtifact[Model]):
  # Fail roughly half the time so Airflow triggers a retry.
  assert random.randint(0, 1) == 1
  with open(os.path.join(model.uri, 'model.txt'), 'w') as f:
    f.write('test')


@component
def downstream(model: InputArtifact[Model]):
  pass


c1 = random_fail()
c2 = downstream(model=c1.outputs['model'])

pipeline = Pipeline(components=[c1, c2], ...)

DAG = AirflowDagRunner(AirflowDagConfig({'retries': 3, 'schedule': None})).run(pipeline)

This example will fail on c2 if c1 has been retried. There are probably less random ways to test this.

@molejnik-mergebit

Same issue here. Is there any ETA on this?

@nmelche

nmelche commented Apr 13, 2021

Same issue here with tfx.orchestration.kubeflow.kubeflow_dag_runner.KubeflowDagRunner

@casassg
Member

casassg commented Apr 13, 2021

Out of curiosity @nmelche how do you add retries on KubeflowDagRunner? Using https://github.com/kubeflow/pipelines/blob/master/samples/core/retry/retry.py?
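One way I could imagine doing it with the public APIs (untested sketch; the retry count is an arbitrary example):

# Untested sketch: attach KFP retries to every TFX component op via
# pipeline_operator_funcs when building the KubeflowDagRunner.
from tfx.orchestration.kubeflow import kubeflow_dag_runner

def _add_retries(container_op):
  # kfp.dsl.ContainerOp exposes set_retry(); 3 is an arbitrary example value.
  container_op.set_retry(3)

runner_config = kubeflow_dag_runner.KubeflowDagRunnerConfig(
    pipeline_operator_funcs=(
        kubeflow_dag_runner.get_default_pipeline_operator_funcs() + [_add_retries]))

runner = kubeflow_dag_runner.KubeflowDagRunner(config=runner_config)
# runner.run(pipeline)  # pipeline: a tfx.orchestration.pipeline.Pipeline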

@molejnik-mergebit

Could the priority of this issue be raised? It is very annoying on bigger pipelines (2k nodes) to have to re-run the entire pipeline if any of the nodes fails, even with the cache enabled. Is there a workaround to re-run just a single (failed) node? Like manually deleting the artifact or altering the MLMD?
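For example, something like this (untested sketch using the ml-metadata Python API; whether TFX would then ignore the stale artifact is exactly what I don't know):

# Untested sketch of "altering the MLMD": flip the stale duplicate's 'state'
# custom property so it no longer looks published. Whether TFX's input
# resolution then ignores it is the open question.
from ml_metadata.metadata_store import metadata_store
from ml_metadata.proto import metadata_store_pb2

config = metadata_store_pb2.ConnectionConfig()
config.sqlite.filename_uri = '/path/to/metadata.db'  # assumption: SQLite-backed MLMD
config.sqlite.connection_mode = 3                    # READWRITE_OPENCREATE
store = metadata_store.MetadataStore(config)

stale_id = 3495  # id of the stale duplicate (e.g. the first artifact in the dump above)
[stale] = store.get_artifacts_by_id([stale_id])
stale.custom_properties['state'].string_value = 'abandoned'
store.put_artifacts([stale])  # updates the existing artifact in place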

@YannisPap

YannisPap commented May 12, 2021

Our Evaluator also fails in GCP AI Platform Pipelines with caching = True.

I'm attaching a screenshot, the Evaluator logs, and requirements.txt (for the versions of the libraries used).

I would appreciate even a workaround, since it affects our production systems.

Let me know if you need any further details.
DeepinScreenshot_select-area_20210512091154
evaluator_logs.txt
requirements.txt

@muhammadtehseenSystemsltd

I'm facing the same issue in Kubeflow, in the CsvExampleGen component. Caching is on.

@manalelidrissi

Any updates on this issue?

@molejnik-mergebit

Can we have at least an estimation when this issue could be addressed?

@casassg
Member

casassg commented Jul 27, 2021

CC @rcrowe-google

@zhitaoli
Contributor

I tried to create a reproduction of the Airflow retry, with code similar to what @casassg pasted but with a bit more determinism:

https://gist.github.com/zhitaoli/b2d92f8ad04d98d99974513563149d33

I was able to reproduce the error once, but after upgrading to TFX 1.0.0 the issue was shadowed by the following error stack:

https://gist.github.com/zhitaoli/7dbaaa42abd8aa78cb54d52a266cd0ee

I'll dig a bit more with @hughmiao to see whether this is fixable.

The original error might be fixable with #4093, but without fixing the above I can't make any promises yet.

@molejnik-mergebit

What about this issue? Can we get an update? Is there a workaround for this?

@zhitaoli removed their assignment Oct 10, 2022