
fix(sdk): allow upstream group to be resolved as dependency of task #11946


Open
wants to merge 2 commits into
base: master

stijntratsaertit
Contributor

Description of your changes:

This pull request fixes a KeyError raised during pipeline compilation when a task depends on a task nested inside two conditional groups. The double-if structure is needed so that downstream_op can always resume after the if check; with the usual single-if structure it does not resume when the condition resolves to False.

It seems that the current implementation, introduced in the diff between tags 2.0.2 and 2.0.3, does not consider a group to be an eligible dependency. My concern with the fix I implemented is that it clashes with this comment left by @connor-mccarthy:

    # dependent tasks is tasks on which .after was called and can only be the names of PipelineTasks, not TasksGroups

I have prepared a fix that makes the added test pass, but I'm open to discussion, as I'm not close enough to the project to fully understand the possible restrictions of the DAG dependency tracking.
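
To make the idea concrete, here is a minimal stdlib-only sketch of resolving an `.after()` name against tasks first and groups second. It is not the actual patch: `resolve_upstream` and the string stand-ins for task and group objects are hypothetical, and only the names `dummy-op`, `dummy-op-2`, `condition-1` and `condition-2` come from the trace.

```python
from typing import Dict, Iterable, Set


def resolve_upstream(
    after_names: Iterable[str],
    tasks: Dict[str, str],
    groups: Dict[str, str],
) -> Set[str]:
    """Resolve names recorded by `.after()` to task or group stand-ins.

    Hypothetical helper: tasks are consulted first, then groups, so a
    group name no longer raises KeyError.
    """
    resolved = set()
    for name in after_names:
        if name in tasks:
            resolved.add(tasks[name])
        elif name in groups:
            resolved.add(groups[name])
        else:
            # Still fail loudly for names that match nothing.
            raise KeyError(name)
    return resolved


# Stand-ins for pipeline.tasks and group_name_to_group (assumed shapes).
tasks = {"dummy-op": "task:dummy-op", "dummy-op-2": "task:dummy-op-2"}
groups = {
    "condition-1": "group:condition-1",
    "condition-2": "group:condition-2",
}

resolved = resolve_upstream(["condition-1", "dummy-op"], tasks, groups)
print(sorted(resolved))
```

A tasks-only lookup such as `tasks["condition-1"]` fails in the same way as the compiler does today, which is the behaviour the sketch avoids.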

Failing test result

➜ python -m pytest sdk/python/kfp/compiler/compiler_test.py::TestCompilePipeline::test_pipeline_task_depending_on_nested_conditional_task
============================================================================== test session starts ==============================================================================
platform linux -- Python 3.12.3, pytest-8.3.5, pluggy-1.6.0
rootdir: /home/sky/projects/kfp
configfile: pytest.ini
collected 1 item                                                                                                                                                                

sdk/python/kfp/compiler/compiler_test.py F                                                                                                                                [100%]

=================================================================================== FAILURES ====================================================================================
__________________________________________________ TestCompilePipeline.test_pipeline_task_depending_on_nested_conditional_task __________________________________________________

self = <kfp.compiler.compiler_test.TestCompilePipeline testMethod=test_pipeline_task_depending_on_nested_conditional_task>

    def test_pipeline_task_depending_on_nested_conditional_task(self):
    
        @dsl.component
        def dummy_op(message: str):
            print(message)
    
>       @dsl.pipeline(name="test-nested-condition-pipeline")

sdk/python/kfp/compiler/compiler_test.py:977: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
sdk/python/kfp/dsl/pipeline_context.py:71: in pipeline
    return component_factory.create_graph_component_from_func(
sdk/python/kfp/dsl/component_factory.py:708: in create_graph_component_from_func
    return graph_component.GraphComponent(
sdk/python/kfp/dsl/graph_component.py:71: in __init__
    pipeline_spec, platform_spec = builder.create_pipeline_spec(
sdk/python/kfp/compiler/pipeline_spec_builder.py:1929: in create_pipeline_spec
    dependencies = compiler_utils.get_dependencies(
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

pipeline = <kfp.dsl.pipeline_context.Pipeline object at 0x7f9bd6cfcda0>
task_name_to_parent_groups = {'dummy-op': ['6b98c00ae3484b8fb0c511d79f7c5247', 'condition-1', 'condition-2', 'dummy-op'], 'dummy-op-2': ['6b98c00ae3484b8fb0c511d79f7c5247', 'dummy-op-2']}
group_name_to_parent_groups = {'condition-1': ['6b98c00ae3484b8fb0c511d79f7c5247', 'condition-1'], 'condition-2': ['6b98c00ae3484b8fb0c511d79f7c5247', 'condition-1', 'condition-2']}
group_name_to_group = {'6b98c00ae3484b8fb0c511d79f7c5247': <kfp.dsl.tasks_group.TasksGroup object at 0x7f9bd6cfcf20>, 'condition-1': <kfp.dsl.tasks_group.If object at 0x7f9bd7d47140>, 'condition-2': <kfp.dsl.tasks_group.If object at 0x7f9bd6cfcfe0>}
condition_channels = defaultdict(<class 'set'>, {'dummy-op': {{{channel:task=;name=should_run_op;type=Boolean;}}}, 'dummy-op-2': set()})

    def get_dependencies(
        pipeline: pipeline_context.Pipeline,
        task_name_to_parent_groups: Mapping[str, List[str]],
        group_name_to_parent_groups: Mapping[str, List[str]],
        group_name_to_group: Mapping[str, tasks_group.TasksGroup],
        condition_channels: Dict[str, pipeline_channel.PipelineChannel],
    ) -> Mapping[str, List[GroupOrTaskType]]:
        """Gets dependent groups and tasks for all tasks and groups.
    
        Args:
            pipeline: The instantiated pipeline object.
            task_name_to_parent_groups: The dict of task name to list of parent
                groups.
            group_name_to_parent_groups: The dict of group name to list of
                parent groups.
            group_name_to_group: The dict of group name to group.
            condition_channels: The dict of task name to a set of pipeline
                channels referenced by its parent condition groups.
    
        Returns:
            A Mapping where key is group/task name, value is a list of dependent
            groups/tasks. The dependencies are calculated in the following way:
            if task2 depends on task1, and their ancestors are
            [root, G1, G2, task1] and [root, G1, G3, G4, task2], then G3 is
            dependent on G2. Basically dependency only exists in the first
            uncommon ancesters in their ancesters chain. Only sibling
            groups/tasks can have dependencies.
    
        Raises:
            RuntimeError: if a task depends on a task inside a condition or loop
                group.
        """
        dependencies = collections.defaultdict(set)
        for task in pipeline.tasks.values():
            upstream_task_names: Set[Union[pipeline_task.PipelineTask,
                                           tasks_group.TasksGroup]] = set()
            task_condition_inputs = list(condition_channels[task.name])
            all_channels = task.channel_inputs + task_condition_inputs
            upstream_task_names.update(
                {channel.task for channel in all_channels if channel.task})
            # dependent tasks is tasks on which .after was called and can only be the names of PipelineTasks, not TasksGroups
            upstream_task_names.update(
>               {pipeline.tasks[after_task] for after_task in task.dependent_tasks})
E           KeyError: 'condition-1'

sdk/python/kfp/compiler/compiler_utils.py:731: KeyError
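
The "first uncommon ancestor" rule from the docstring above can be sketched in isolation. This is an illustration under assumptions, not the code in `compiler_utils.py`: `first_uncommon_ancestors` is a hypothetical helper, and I am assuming `dummy-op-2` is the downstream task. The ancestor chains are copied from `task_name_to_parent_groups` in the trace.

```python
from typing import List, Tuple


def first_uncommon_ancestors(
    upstream_chain: List[str],
    downstream_chain: List[str],
) -> Tuple[str, str]:
    """Return the first pair of ancestors at which two chains diverge.

    Hypothetical helper: walks both chains from the root and stops at
    the first position where the names differ.
    """
    for up, down in zip(upstream_chain, downstream_chain):
        if up != down:
            return up, down
    raise ValueError("one chain is a prefix of the other")


root = "6b98c00ae3484b8fb0c511d79f7c5247"
upstream = [root, "condition-1", "condition-2", "dummy-op"]
downstream = [root, "dummy-op-2"]

# The dependency is recorded between the diverging siblings.
up, down = first_uncommon_ancestors(upstream, downstream)
print(f"{down} depends on {up}")
```

Under these assumptions the sibling that `dummy-op-2` ends up depending on is the group `condition-1`, not a task, which is why a tasks-only lookup cannot resolve it.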

Checklist:

@google-oss-prow google-oss-prow bot requested a review from DharmitD May 30, 2025 23:06

[APPROVALNOTIFIER] This PR is NOT APPROVED

This pull-request has been approved by:
Once this PR has been reviewed and has the lgtm label, please assign chensun for approval. For more information see the Kubernetes Code Review Process.

The full list of commands accepted by this bot can be found here.

Needs approval from an approver in each of these files:

Approvers can indicate their approval by writing /approve in a comment
Approvers can cancel approval by writing /approve cancel in a comment


Hi @stijntratsaertit. Thanks for your PR.

I'm waiting for a kubeflow member to verify that this patch is reasonable to test. If it is, they should reply with /ok-to-test on its own line. Until that is done, I will not automatically test new commits in this PR, but the usual testing commands by org members will still work. Regular contributors should join the org to skip this step.

Once the patch is verified, the new status will be reflected by the ok-to-test label.

I understand the commands that are listed here.

Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes/test-infra repository.


🚫 This command cannot be processed. Only organization members or owners can use the commands.

@stijntratsaertit stijntratsaertit force-pushed the allow-upstream-group-to-be-resolved-as-dependency-of-task branch from 80797c8 to 8d22b32 Compare May 30, 2025 23:09
Signed-off-by: Stijn Tratsaert <stijn.tratsaert.it@gmail.com>
@stijntratsaertit stijntratsaertit force-pushed the allow-upstream-group-to-be-resolved-as-dependency-of-task branch from 8d22b32 to 4fd6a01 Compare May 30, 2025 23:11
Signed-off-by: Stijn Tratsaert <stijn.tratsaert.it@gmail.com>
@stijntratsaertit stijntratsaertit force-pushed the allow-upstream-group-to-be-resolved-as-dependency-of-task branch from 4fd6a01 to 455f713 Compare May 30, 2025 23:14