Nested ParallelFor: can't resolve parameter from outer loop operation #2829

Closed
kierenj opened this issue Jan 12, 2020 · 18 comments · Fixed by #3029
Labels: area/sdk/dsl/compiler, area/sdk, lifecycle/stale, status/triaged

Comments

kierenj commented Jan 12, 2020

What happened:
I'm creating a pipeline that runs many training jobs in parallel. Parameters are passed as arrays, and a job is run for every combination of parameter values.

First, new data entering the system is validated (first step). This happens once per pipeline run.
If one particular parameter changes, a whole new data set needs to be generated (second step).
For every combination of two other parameters, the data set needs to be transformed and converted into a different format (third step).
Then, every combination of the remaining parameters can reuse one of the generated exports.
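
As a rough illustration of the fan-out these steps imply, the sketch below counts how many times each step would run under the nesting described above. The function name task_counts and the example values are hypothetical; only the parameter names come from the pipeline in this post.

```python
def task_counts(a, b, c, layers, learning_rate, decay):
    """Count how often each step runs when the loops are nested as described."""
    datasets = len(a)                      # one dataset per value of a
    exports = datasets * len(b) * len(c)   # one export per (a, b, c)
    models = exports * len(layers) * len(learning_rate) * len(decay)
    return {
        "validate": 1,                     # once per pipeline run
        "create_dataset": datasets,
        "export": exports,
        "make_model": models,
    }


# Hypothetical example values, chosen only to make the fan-out visible.
counts = task_counts(
    a=[5.0, 10.0],
    b=[30.0, 60.0],
    c=[True, False],
    layers=[[{"lstm_nodes": 128, "dropout": 0.2}]],
    learning_rate=[1e-3, 1e-4],
    decay=[1e-5],
)
print(counts)
```

Every make_model task depends on exactly one export and one dataset, which is why the inner loops need to read outputs produced in the outer loops.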

I think the problem is that the output of an operation in an outer ParallelFor loop (here, create_dataset_op) is being accessed from operations in another ParallelFor loop nested within it.

Any advice greatly appreciated, thank you.

Error message:

invalid spec: templates.for-loop-for-loop-99fab5de-1.tasks.for-loop-for-loop-fea34ed3-2 failed to resolve {{inputs.parameters.create-dataset-dataset_id}}

Trimmed (pseudo-) code, which builds/uploads fine, but does not create any run successfully:

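import kfp
from kfp import compiler, dsl

# Omitted for brevity: the component factories (validate_op, create_dataset_op,
# export_op, bb_make_model_op), the resource values (cpu_request, memory_request,
# cpu_limit, memory_limit), and client = kfp.Client().

@dsl.pipeline(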
    name='Hyperparameter Tuning',
    description='Hyperparameter Tuning'
)
def ht_op(
    someparam,
    dataset_label,
    a=[5.0],
    b=[30.0],
    epochs=20,
    c=[True],
    layers=[[{"lstm_nodes":128,"dropout":0.2},{"lstm_nodes":128,"dropout":0.2}]],
    learning_rate = [1e-3],
    decay = [1e-5]
    ):

    validate = validate_op()
    
    with dsl.ParallelFor(a) as _a:
        
        # one dataset per value of a
        new_dataset = create_dataset_op(
            label=("%s %ss") % (dataset_label, _a),
            a=_a,
            someparam=someparam
        )
        new_dataset.after(validate)
        
        # we'll need a new export for each (b,c) pair
        with dsl.ParallelFor(b) as _b:
            with dsl.ParallelFor(c) as _c:
                export_dataset = export_op(
                    dataset_id=new_dataset.output, # here's the problem?
                    dataset_label=dataset_label,
                    b=_b,
                    c=_c)

                with dsl.ParallelFor(layers) as _layers:
                    with dsl.ParallelFor(learning_rate) as _learning_rate:
                        with dsl.ParallelFor(decay) as _decay:
                            make_model = bb_make_model_op(
                                label=("%s %ss hptuning") % (dataset_label, _a),
                                dataset_id=new_dataset.output, # used again here
                                training_data_url=export_dataset.outputs['training_data_uri'],
                                validation_data_url=export_dataset.outputs['validation_data_uri'],
                                test_data_url=export_dataset.outputs['test_data_uri'],
                                layers=_layers,
                                learning_rate=_learning_rate,
                                decay=_decay,
                                version=export_dataset.outputs['version']
                            )

                            make_model.container.add_resource_request('cpu', cpu_request)
                            make_model.container.add_resource_request('memory', memory_request)
                            make_model.container.add_resource_limit('cpu', cpu_limit)
                            make_model.container.add_resource_limit('memory', memory_limit)

    
# compile pipeline now
pipeline_func = ht_op
pipeline_filename = pipeline_func.__name__ + '.pipeline.zip'
compiler.Compiler().compile(pipeline_func, pipeline_filename)
client.upload_pipeline(pipeline_package_path=pipeline_filename,pipeline_name='Hyperparameter Tuning')

What did you expect to happen:
Run to be created, graph to be displayed.

What steps did you take:
Ran the above code, created a run.

Other
This nested sample:

https://github.com/kubeflow/pipelines/blob/master/sdk/python/tests/compiler/testdata/withitem_nested.py#L42

...appears to work, but it doesn't use any output from the outer loop. Should op11 there be able to take an output of op1 as an input?

@numerology

/assign @numerology
/assign @Ark-kun

kierenj commented Jan 18, 2020

@numerology @Ark-kun I wonder, might there be some workaround I could use here? If I don't nest the loops, can I store a reference to the operations from one loop, and access them from within another, top-level loop?

rmgogogo added the status/triaged and area/sdk labels Jan 22, 2020

Ark-kun commented Jan 30, 2020

We'll investigate this issue soon.

@giladwa1

I'm still having the same problem. Is this fixed?

@narner90

I'm also running into this issue

Bobgy commented Sep 23, 2020

/reopen
/assign @Ark-kun

@k8s-ci-robot

@Bobgy: Reopened this issue.

In response to this:

/reopen
/assign @Ark-kun

Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes/test-infra repository.

k8s-ci-robot reopened this Sep 23, 2020

stale bot commented Dec 24, 2020

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.

stale bot added the lifecycle/stale label Dec 24, 2020
@narner90

I'm still having this issue. Here is a perhaps simpler reproduction:

from typing import List

import kfp
import kfp.components as comp
from kfp import dsl


@dsl.pipeline()
def pipeline(
        outer_iterations: int = 3,
        inner_iterations: int = 2
):
    outer_range = int_range(outer_iterations)
    inner_range = int_range(inner_iterations)
    with dsl.ParallelFor(outer_range.output) as outer_index:
        with dsl.ParallelFor(inner_range.output) as inner_index:
            print_op(outer_index, inner_index)


@comp.func_to_container_op
def int_range(x: int) -> List[int]:
    return list(range(x))


@comp.create_component_from_func
def print_op(a: int, b: int) -> None:
    print(f'A: {a} B: {b}')


if __name__ == '__main__':
    kfp.Client().create_run_from_pipeline_func(pipeline, arguments={})

Error:

  File "omitted/venv/lib/python3.8/site-packages/kfp/compiler/_data_passing_rewriter.py", line 155, in fix_big_data_passing
    upstream_template_name = task_name_to_template_name[upstream_task_name]
KeyError: 'int-range-2'

Moving the inner_range call inside the outer loop produces the desired output, but the range is then recomputed on every outer iteration:

@dsl.pipeline()
def pipeline(
        outer_iterations: int = 3,
        inner_iterations: int = 2
):
    outer_range = int_range(outer_iterations)
    with dsl.ParallelFor(outer_range.output) as outer_index:
        inner_range = int_range(inner_iterations)
        with dsl.ParallelFor(inner_range.output) as inner_index:
            print_op(outer_index, inner_index)
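
A different workaround that avoids nesting altogether might be to build the full list of (outer, inner) index pairs in one step and iterate over it with a single ParallelFor. The sketch below shows only the plain-Python part. The function name index_pairs and the dictionary keys outer and inner are made up for illustration, and whether wrapping it as a component and looping over its output sidesteps the compiler error has not been verified in this thread.

```python
from itertools import product
from typing import Dict, List


def index_pairs(outer_iterations: int, inner_iterations: int) -> List[Dict[str, int]]:
    """Return one dict per (outer, inner) combination, in row-major order."""
    return [
        {"outer": outer, "inner": inner}
        for outer, inner in product(range(outer_iterations), range(inner_iterations))
    ]


# Same sizes as the reproduction above: 3 outer x 2 inner iterations.
pairs = index_pairs(3, 2)
for pair in pairs:
    # Mirrors what print_op does for each combination.
    print(f"A: {pair['outer']} B: {pair['inner']}")
```

Each pair carries both indices, so the loop body no longer needs to reference anything defined in an enclosing loop.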

stale bot removed the lifecycle/stale label Feb 24, 2021
Bobgy added this to Needs triage in KFP SDK Triage via automation Feb 26, 2021

Bobgy commented Feb 26, 2021

/assign @chensun

stale bot commented Jun 2, 2021

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.

stale bot added the lifecycle/stale label Jun 2, 2021

narner90 commented Jun 2, 2021

This is still an issue for me on the latest release

stale bot removed the lifecycle/stale label Jun 2, 2021

stale bot commented Sep 3, 2021

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.

stale bot added the lifecycle/stale label Sep 3, 2021

stale bot commented Mar 3, 2022

This issue has been automatically closed because it has not had recent activity. Please comment "/reopen" to reopen it.

stale bot closed this as completed Mar 3, 2022
KFP SDK Triage automation moved this from Needs triage to Closed Mar 3, 2022

dari-o commented Sep 7, 2022

This is still an issue. Can someone look into it? 🤔 People are still commenting that it has not been resolved, yet it was automatically closed.

dari-o commented Sep 7, 2022

/reopen

@google-oss-prow

@dari-o: You can't reopen an issue/PR unless you authored it or you are a collaborator.

In response to this:

/reopen

Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes/test-infra repository.

@Jyothipyxis

I'm still facing this issue. Has it been solved, or is there a workaround?
