[sdk] Task.after() on Condition OpsGroup vs ContainerOp #5422
Comments
/kind question
@Tomcli Per my understanding:
Of course it should work with data-passing too, in the op-after-op case (the op-after-DAG case cannot use it, for obvious reasons). That is, unless the condition's status is exposed as a PipelineParam, which would introduce additional interesting possibilities: a component which runs after the condition could check whether the condition DAG was run or not.
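For context, a minimal sketch of the two dependency styles under discussion (the pipeline and component names are illustrative, not from this thread, and it assumes the v1 DSL accepts a Condition OpsGroup in .after() as described above):

from kfp import dsl, components

@components.create_component_from_func
def produce_str() -> str:
    return "Hello world"

@components.create_component_from_func
def consume_str(message: str):
    print(message)

def sketch_pipeline():
    first = produce_str()

    with dsl.Condition(first.output == "Hello world") as hello_condition:
        inner_task = produce_str()

    # Op-after-DAG: ordering only. The Condition OpsGroup has no outputs,
    # so nothing can be passed to this task.
    consume_str("ran after the condition group").after(hello_condition)

    # Op-after-op: ordering on the ContainerOp inside the condition; per the
    # comment above, data passing (e.g. consume_str(inner_task.output))
    # should also work in this case.
    consume_str("ran after the inner task").after(inner_task)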
/assign @chensun
/assign @neuromage
This is unexpected behavior. I did my own experiment and found several compiler bugs:

import kfp
from kfp import components

@components.create_component_from_func
def produce_str() -> str:
    return "Hello world"

@components.create_component_from_func
def consume_str(message: str):
    print(message)

def my_pipeline():
    hello_str = produce_str().output
    with kfp.dsl.Condition(hello_str == "Hello world"):
        produce_task = produce_str()
        consume_str(produce_task.output)

if __name__ == '__main__':
    kfp.compiler.Compiler().compile(my_pipeline, '2021-04-09 Check conditionals.yaml')

Compiling this fails with an error.

Now trying the same with artifacts:
from kfp.components import InputPath, OutputPath, create_component_from_func

@create_component_from_func
def produce_file(output_path: OutputPath()):
    from pathlib import Path
    Path(output_path).write_text(output_path)

@create_component_from_func
def consume_file(message_path: InputPath()):
    from pathlib import Path
    message = Path(message_path).read_text()
    print(message)

def my_file_pipeline():
    hello_str = produce_str().output
    with kfp.dsl.Condition(hello_str == "Hello world"):
        produce_file_task = produce_file()
        consume_file(produce_file_task.output)
    with kfp.dsl.Condition(hello_str == "Tails"):
        produce_file2_task = produce_file()
        consume_file(produce_file2_task.output)

if __name__ == '__main__':
    import kfp
    kfp.compiler.Compiler().compile(my_file_pipeline, '2021-04-09 Check conditionals.artifacts.yaml')

The first part works fine, but the second consume task fails.

To sum it up, I've found 3 issues:
P.S. I'm not sure using
Thanks @Ark-kun. Good to know this is a bug in the KFP Argo compiler. We were able to work around it in the KFP-Tekton compiler because we decided not to put conditional tasks into a sub-DAG, as an optimization for Tekton. We want to make sure the KFP Argo compiler also compiles dependencies on conditional tasks, so that the runtime behavior does not diverge for the same KFP DSL.
This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.
This issue has been automatically closed because it has not had recent activity. Please comment "/reopen" to reopen it.
We have a few questions about how the condition is constructed in Argo, e.g. using the condition example below:
https://github.com/kubeflow/pipelines/blob/master/samples/core/condition/condition.py
When we define a ContainerOp that runs after the dsl.Condition, or after a ContainerOp inside the condition block, it produces the same result when compiled to Argo. We are wondering whether it is the expected DSL behavior that the task dependencies point to the Argo condition task in both cases.
In the Argo DAG:
Thanks.
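For reference, one way to check what the Argo compiler actually emits is to compile a small conditional pipeline and print the dependencies recorded in each DAG template of the resulting workflow YAML. This is a hedged sketch, not from the original post; the component, pipeline, and file names are illustrative:

import yaml

import kfp
from kfp import dsl, components

@components.create_component_from_func
def flip_coin() -> str:
    import random
    return random.choice(["heads", "tails"])

@components.create_component_from_func
def echo(msg: str):
    print(msg)

def condition_pipeline():
    flip = flip_coin()
    with dsl.Condition(flip.output == "heads"):
        echo("got heads")

if __name__ == "__main__":
    kfp.compiler.Compiler().compile(condition_pipeline, "condition_check.yaml")

    # Walk every DAG template and print each task's dependencies to see
    # whether downstream tasks point at the condition template or at the
    # ContainerOp inside it.
    with open("condition_check.yaml") as f:
        workflow = yaml.safe_load(f)
    for template in workflow["spec"]["templates"]:
        for task in template.get("dag", {}).get("tasks", []):
            print(template["name"], "->", task["name"], task.get("dependencies", []))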
related docs: https://docs.google.com/document/d/1QPWKoeiPFDcI1JWH-nMe7x_xIJekL11bhu7Fp3fNT24/edit#