Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable the usage of generator functions in nodes #2161

Merged
merged 12 commits into from
Jan 6, 2023

Conversation

idanov
Copy link
Member

@idanov idanov commented Dec 23, 2022

Description

Until now Kedro nodes were not very suitable for chunkwise processing of data. While nothing prevents a user to create a dataset loading the data as a generator, the only way one could process that data is by aggregating their result in memory and only then saving it.

This mode robs the chunkwise processing of data of its main use, namely minimising the memory footprint of the processing function. The changes in this PR make it possible to use generator functions in a Kedro pipeline and save the results by calling the dataset's save method separately for each chunk. Now the only thing a user needs to do in their function is:

for chunk in input:
    # process data chunk somehow
    yield processed_chunk

For taking advantage of this functionality, users need to create custom datasets which would operate in create-or-append-only mode, since the saving of the data. chunks happens over several invocations of the .save method. Chunk-wise data loading is already supported by existing Pandas datasets through the chunksize= load parameter, IIRC.

Note that the current implementation doesn't work with is_async data loading and saving.

Development notes

There are two main changes:

  1. _outputs_to_dictionary in node.py
  2. _run_node_sequential in runner.py

In 1., the main job is to convert a single generator to one or more generators, mapped to dataset names:

  • when the node has one dataset as input, it returns the generator straight away in a dictionary with one key
  • when the node has a list/tuple of outputs, it returns a dictionary with that list/tuple as keys, and the generator split into corresponding generators as values
  • when the node has a dictionary mapping, it returns a dictionary with the generator split into a number of generators as values, mapped to the given names as keys

In order to manipulate the generators without invoking them, a combination of itertools and more_itertools functions are used.

In 2., the outputs are tested for being iterators, i.e. coming from generators. If that's the case, the generators are interleaved in a round-robin fashion and paired with their corresponding names. Example:

generator_a = (x for x in range(10))
generator_b = (x for x in range(10, 20))
outputs = {"a":  generator_a, "b": generator_b}
# after the processing the items stream will look like:
items = [("a", 0), ("b", 10), ("a", 1), ("b", 11), ...]

The new iterator will take the first element from each generator and pair them with the dataset name, then it will take the second element from each generator and pair them with their dataset name, and so on, until the generators finish. All generators are ensured to finish at the same time, because they originate from the same function generator.

This means that once the user function yields the data, each part will be saved to it's corresponding dataset, and then the function will continue processing the next chunk, yield the data, save it, then again process, yield, save, and so on.

Checklist

  • Read the contributing guidelines
  • Opened this PR as a 'Draft Pull Request' if it is work-in-progress
  • Updated the documentation to reflect the code changes
  • Added a description of this change in the RELEASE.md file
  • Added tests to cover my changes

Signed-off-by: Ivan Danov <idanov@users.noreply.github.com>
Signed-off-by: Ivan Danov <idanov@users.noreply.github.com>
Signed-off-by: Ivan Danov <idanov@users.noreply.github.com>
Signed-off-by: Ivan Danov <idanov@users.noreply.github.com>
@idanov idanov self-assigned this Dec 23, 2022
Signed-off-by: Ivan Danov <idanov@users.noreply.github.com>
@idanov idanov marked this pull request as ready for review December 23, 2022 23:36
@idanov idanov added Component: Framework Issue/PR that addresses core framework functionality Component: IO Issue/PR addresses data loading/saving/versioning and validation, the DataCatalog and DataSets labels Dec 23, 2022
Copy link
Contributor

@jmholzer jmholzer left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These changes are really cool, I love the efficient use of functions from itertools and more_itertools.

I left a few comments, all on code style, I couldn't find any logical problems though I haven't tested these changes manually yet.

kedro/pipeline/node.py Show resolved Hide resolved
kedro/pipeline/node.py Outdated Show resolved Hide resolved
kedro/runner/runner.py Show resolved Hide resolved
tests/runner/test_run_node.py Show resolved Hide resolved
Copy link
Member

@merelcht merelcht left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was a really interesting PR to review! I learned a lot, but also still have a lot of questions 😁

I was also going to comment we should add docs for this and then saw you added an issue already 👍

Makefile Show resolved Hide resolved
kedro/pipeline/node.py Outdated Show resolved Hide resolved
kedro/runner/runner.py Outdated Show resolved Hide resolved
kedro/pipeline/node.py Outdated Show resolved Hide resolved
kedro/pipeline/node.py Outdated Show resolved Hide resolved
kedro/pipeline/node.py Show resolved Hide resolved
kedro/runner/runner.py Show resolved Hide resolved
kedro/runner/runner.py Outdated Show resolved Hide resolved
Signed-off-by: Ivan Danov <idanov@users.noreply.github.com>
Signed-off-by: Ivan Danov <idanov@users.noreply.github.com>
Generators cannot refer to themselves in their definition, or they will fail when used.

Signed-off-by: Ivan Danov <idanov@users.noreply.github.com>
Signed-off-by: Ivan Danov <idanov@users.noreply.github.com>
Copy link
Member

@merelcht merelcht left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The code looks really good now! Great addition 👍

Copy link
Contributor

@jmholzer jmholzer left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice! I really like this PR. The code is in a good state.

Would you like someone to manually test the changes before merging?

Copy link
Member Author

@idanov idanov left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jmholzer I think the manual test will be part of the documentation writing here: #2170

@idanov idanov merged commit fcf3ab4 into main Jan 6, 2023
@idanov idanov deleted the feature/enable-generator-nodes branch January 6, 2023 14:27
jmholzer pushed a commit that referenced this pull request Jan 9, 2023
* Modify the node and run_node to enable generator nodes

Signed-off-by: Ivan Danov <idanov@users.noreply.github.com>

* Add tests to cover all types of generator functions

Signed-off-by: Ivan Danov <idanov@users.noreply.github.com>

* Fail on running a generator node with async load/save

Signed-off-by: Ivan Danov <idanov@users.noreply.github.com>

* Lint my code changes

Signed-off-by: Ivan Danov <idanov@users.noreply.github.com>

* Add changelog to RELEASE.md

Signed-off-by: Ivan Danov <idanov@users.noreply.github.com>

* Simplify the usage of spy and clarify with a comment

Signed-off-by: Ivan Danov <idanov@users.noreply.github.com>

* Improve error messaging

* Improve readability slightly in certain places

* Correct the expected error message in node run tests

Signed-off-by: Ivan Danov <idanov@users.noreply.github.com>

* Revert the eager evaluation of the result in _from_dict

Generators cannot refer to themselves in their definition, or they will fail when used.

Signed-off-by: Ivan Danov <idanov@users.noreply.github.com>

* Add a comment for the eager evaluation

Signed-off-by: Ivan Danov <idanov@users.noreply.github.com>

Signed-off-by: Ivan Danov <idanov@users.noreply.github.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Component: Framework Issue/PR that addresses core framework functionality Component: IO Issue/PR addresses data loading/saving/versioning and validation, the DataCatalog and DataSets
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants