
Kedro pipeline iterates over the iterator if a node returns an iterator #2859

Open · Sage0614 opened this issue Jul 28, 2023 · 4 comments
Labels: Community (Issue/PR opened by the open-source community)

@Sage0614 commented Jul 28, 2023

Description

When a node returns an iterator, the Kedro pipeline iterates over (and exhausts) the iterator.

Context

I am aware that Kedro supports generator functions in nodes, which is good. However, Kedro shouldn't assume what the user wants to do with a generator: it should pass it through as-is by default rather than implicitly guessing when the user wants it executed, and pipeline.node should provide an explicit argument for whether to keep the generator as-is or consume it.

In general, people expect pipeline nodes to behave like pure functions by default; any other behavior should be explicit.

Currently there is no way to specify that I want to keep the iterator as-is as an output, and the iterator is always exhausted when the node runs.
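For illustration only, the kind of explicit switch being requested might look like the following; the materialize argument is hypothetical and does not exist in Kedro:

# Hypothetical sketch of the requested API; `materialize` is not a real
# Kedro argument, it only illustrates the explicit opt-out being asked for.
node(
    func=node_iter,
    inputs=None,
    outputs="iter",
    materialize=False,  # proposed: pass the returned iterator through as-is
)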

Steps to Reproduce

In nodes.py:

class SampleIter:
    """Toy iterator: yields itself 10 times, incrementing `curr` on each step."""

    def __init__(self):
        self.curr = 0
        self.max = 10

    def __iter__(self):
        return self

    def __next__(self):
        if self.curr < self.max:
            self.curr += 1
            return self
        raise StopIteration


def node_iter():
    # Returns the iterator itself; the pipeline should pass it through untouched.
    return SampleIter()

def node_call(it):  # renamed from `iter` to avoid shadowing the builtin
    # Consume the iterator: 1 + 2 + ... + 10 == 55.
    res = 0
    for i in it:
        res += i.curr
    print(res)
    return res
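Called directly in plain Python (outside any Kedro run), these functions behave as pure functions and print the expected sum:

# Sanity check outside Kedro:
node_call(node_iter())  # prints 55 (1 + 2 + ... + 10)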

In pipeline.py:

from kedro.pipeline import Pipeline, node

from .nodes import node_iter, node_call

def create_pipeline(**kwargs) -> Pipeline:
    return Pipeline(
        [
            node(func=node_iter, inputs=None, outputs="iter"),
            node(func=node_call, inputs="iter", outputs="res"),
        ]
    )

Expected Result

res should be 55 (the sum 1 + 2 + ... + 10).

Actual Result

res is 0: the iterator has already been exhausted by the time node_call receives it.

  • Kedro version used (pip show kedro or kedro -V): 0.18.11
  • Python version used (python -V): 3.10.12
  • Operating system and version: Ubuntu 22.04.1 LTS on Windows (WSL2)
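A possible interim workaround (an untested sketch; IteratorHolder is a name introduced here, not a Kedro API) is to wrap the iterator in a plain object so the runner finds nothing to iterate, and unwrap it explicitly in the consuming node:

class IteratorHolder:
    # Plain wrapper: not itself iterable, so nothing consumes the iterator.
    def __init__(self, it):
        self.it = it

def node_iter():
    return IteratorHolder(SampleIter())

def node_call(holder):
    res = 0
    for i in holder.it:  # unwrap explicitly in the consumer
        res += i.curr
    return res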
@noklam (Contributor) commented Jul 28, 2023

Thank you for reporting this; we will have a look soon. This example looks artificial: did you encounter this problem while using a particular library?

@noklam (Contributor) commented Jul 28, 2023

Potentially a bug related to #2161.

@Sage0614 (Author) commented

> Thank you for reporting this; we will have a look soon. This example looks artificial: did you encounter this problem while using a particular library?

No. My use case is a parameterized sampler with fairly complicated behavior: each iteration depends on the state of the previous one, and I don't know in advance how many iterations it will generate, so it makes sense to implement it as an iterator. Several downstream tasks consume the sampler and generate analytics.
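For illustration, a minimal shape of such a sampler (all names hypothetical; the point is that each draw depends on the previous state and the total length is unknown up front):

import random

class AdaptiveSampler:
    def __init__(self, start=1.0, decay=0.9):
        self.value = start   # state carried between iterations
        self.decay = decay

    def __iter__(self):
        return self

    def __next__(self):
        # each sample depends on the previous one
        self.value *= self.decay * random.uniform(0.5, 1.5)
        if self.value < 0.01:  # stopping point depends on the trajectory
            raise StopIteration
        return self.value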

@astrojuanlu added the Community (Issue/PR opened by the open-source community) label on Sep 9, 2023
@deepyaman (Member) commented

I think generator functions only work for passing to datasets, but can't be passed through to other nodes (via other datasets)? This is something I expected to work but ran into an issue with, in https://github.com/deepyaman/partitioned-dataset-demo/tree/dd2d05f14fac0d2ff7fb4a949e8aac062dc70431:

(kedro) deepyaman@deepyaman-mac new-kedro-project % kedro run
[05/31/24 13:48:45] INFO     Kedro project new-kedro-project                                                                                                                                      session.py:324
[05/31/24 13:48:46] INFO     Using synchronous mode for loading and saving data. Use the --async flag for potential performance gains.                                                   sequential_runner.py:64
                             https://docs.kedro.org/en/stable/nodes_and_pipelines/run_a_pipeline.html#load-and-save-asynchronously                                                                              
                    INFO     Loading data from params:n (MemoryDataset)...                                                                                                                   data_catalog.py:483
                    INFO     Running node: generate_emails([params:n]) -> [emails]                                                                                                                   node.py:361
                    INFO     Saving data to emails (PartitionedDataset)...                                                                                                                   data_catalog.py:525
                    INFO     Completed 1 out of 4 tasks                                                                                                                                  sequential_runner.py:90
                    INFO     Loading data from emails (PartitionedDataset)...                                                                                                                data_catalog.py:483
                    INFO     Running node: capitalize_content([emails]) -> [capitalized_emails]                                                                                                      node.py:361
                    INFO     Saving data to capitalized_emails (PartitionedDataset)...                                                                                                       data_catalog.py:525
                    INFO     Completed 2 out of 4 tasks                                                                                                                                  sequential_runner.py:90
                    INFO     Loading data from capitalized_emails (PartitionedDataset)...                                                                                                    data_catalog.py:483
                    INFO     Running node: extract_content([capitalized_emails]) -> [contents]                                                                                                       node.py:361
                    INFO     Saving data to contents (MemoryDataset)...                                                                                                                      data_catalog.py:525
                    INFO     Saving data to contents (MemoryDataset)...                                                                                                                      data_catalog.py:525
                    INFO     Saving data to contents (MemoryDataset)...                                                                                                                      data_catalog.py:525
                    INFO     Completed 3 out of 4 tasks                                                                                                                                  sequential_runner.py:90
                    INFO     Loading data from contents (MemoryDataset)...                                                                                                                   data_catalog.py:483
                    INFO     Running node: tokenize([contents]) -> [tokens]                                                                                                                          node.py:361
                    INFO     Saving data to tokens (PartitionedDataset)...                                                                                                                   data_catalog.py:525
                    INFO     Completed 4 out of 4 tasks                                                                                                                                  sequential_runner.py:90
                    INFO     Pipeline execution completed successfully.                                                                                                                            runner.py:119

I would expect tokenize to process all three outputs from contents.
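For reference, the pattern the demo exercises looks roughly like this (a reconstruction, not the actual repo code): a PartitionedDataset input loads as a dict of partition id to load callable, and a generator node yields one chunk per save.

def extract_content(capitalized_emails):
    # PartitionedDataset loads as {partition_id: load_function}.
    for _partition_id, load in sorted(capitalized_emails.items()):
        # Each yield is saved as a separate chunk; "Saving data to contents..."
        # appears once per yield in the log above.
        yield load()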
