
Provide way to process partition datasets one partition at a time #1413

Closed
datajoely opened this issue Apr 5, 2022 · 10 comments
Labels
Issue: Feature Request New feature or improvement to existing feature

Comments

@datajoely
Contributor

Description


PartitionedDataSet today provides a lazy method for loading each partition in a memory-efficient way; however, in order to save, all partitions have to be loaded into memory. This can often cause out-of-memory issues if the sum of all partitions is very large. Currently there isn't an easy way to perform an 'intermediate' save within the node.
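
For illustration, the eager pattern described here looks roughly like this (a sketch; on load, PartitionedDataSet hands the node a dictionary of load callables):

from typing import Any, Callable, Dict

def process_all_eagerly(partitions: Dict[str, Callable[[], Any]]) -> Dict[str, Any]:
    # Each load() call materializes a partition, and the returned dict holds
    # every partition in memory at once before any of it is saved.
    return {name: load() for name, load in partitions.items()}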

In the last few weeks, several users trying to process large amounts of image data have independently raised this limitation.

Context


The partitioned dataset is a wonderful tool, but in many cases users want to process one partition at a time - not in bulk. The current save mechanism is memory constrained.

Possible Implementation


This needs more technical design, but it feels quite tangible. Currently, things look like this:

[Image: diagram of the current flow, where all partitions are loaded into a single node before saving]

Going forward, we could introduce a special node type built around the core assumption that there are the same number of input partitions as output partitions.

[Image: diagram of the proposed flow, where a special node processes one partition at a time]

Here, as with the rest of Kedro, the node has no knowledge of how the data gets loaded or saved, but we allow users to define a node that simply processes one partition at a time.

To introduce this I think we'd need to make a few changes to the library:

  • We would need to update PartitionedDataSet to be able to easily load and save a single partition. I've got this working on a branch called feature/partitioned-node as an experiment, and I think this change is useful as is because it would make hooks more powerful.
  • We would need to introduce some way of identifying this special type of node, possibly a new subclass; PartitionedNode or NodeIterator could work as names.
  • We would finally need to make changes to _call_node_run to essentially run this in a sort of batch mode (a rough sketch follows below).
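
Purely as illustration of that batch mode, and independent of any actual Kedro internals, the behaviour could look something like this (every name here is hypothetical):

from typing import Any, Callable, List

def run_partitioned_node(
    func: Callable[[Any], Any],
    load_partition: Callable[[str], Any],        # hypothetical single-partition load
    save_partition: Callable[[str, Any], None],  # hypothetical single-partition save
    partition_ids: List[str],
) -> None:
    # One input partition maps to one output partition, per the core assumption.
    for pid in partition_ids:
        data = load_partition(pid)       # materialize exactly one partition
        save_partition(pid, func(data))  # persist it before loading the next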
datajoely added the Issue: Feature Request label on Apr 5, 2022
@deepyaman
Member

deepyaman commented Apr 7, 2022

however, in order to save, all partitions have to be loaded into memory

PartitionedDataSet has supported lazily materializing data on save since 0.17.4. Return Callables as values in the dictionary returned by the node (to be saved by PartitionedDataSet) in order to take advantage of this functionality.
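
A minimal sketch of that pattern (transform is a placeholder for any per-partition logic):

from typing import Any, Callable, Dict

def transform(data: Any) -> Any:
    # Placeholder per-partition transform; substitute your own logic.
    return data

def process_partitions(
    partitions: Dict[str, Callable[[], Any]]
) -> Dict[str, Callable[[], Any]]:
    # Returning callables defers each partition's load and transform until
    # PartitionedDataSet saves that partition, so partitions are materialized
    # one at a time. The helper binds `load` per iteration, which also avoids
    # Python's late-binding closure pitfall.
    def deferred(load: Callable[[], Any]) -> Callable[[], Any]:
        return lambda: transform(load())

    return {name: deferred(load) for name, load in partitions.items()}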

@datajoely
Contributor Author

@deepyaman could you provide a snippet? This would be much simpler than my proposal and I'd love to include it in the docs

@deepyaman
Member

@deepyaman could you provide a snippet? This would be much simpler than my proposal and I'd love to include it in the docs

I'll create/share an example later today.

@deepyaman
Member

@datajoely Upon looking, there's already an example at the bottom of https://kedro.readthedocs.io/en/stable/data/kedro_io.html#partitioned-dataset-save. I was going to create a "more realistic/relevant" example (image transpositions as a preprocessing step), but let me know if that's still necessary now that we've found there's already an example.
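
For reference, such an image-transposition example might look roughly like this (a sketch using the same lazy-Callable save pattern; names are placeholders):

from typing import Callable, Dict
from PIL import Image

def transpose_images(
    partitions: Dict[str, Callable[[], Image.Image]]
) -> Dict[str, Callable[[], Image.Image]]:
    def deferred(load: Callable[[], Image.Image]) -> Callable[[], Image.Image]:
        # Each image is loaded and transposed only when its partition is saved.
        return lambda: load().transpose(Image.TRANSPOSE)

    return {name: deferred(load) for name, load in partitions.items()}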

@datajoely
Contributor Author

@deepyaman I'd love to see one; it's actually coming up multiple times a week on Discord :)

@kadeshoe5

Here is an example I made that worked for a node that takes a partitioned input and returns a partitioned output:

def preprocess_all_data(
    partitioned_input: Dict[str, Callable[[], Any]]
) -> Dict[str, Callable[[], Any]]:
    return {
        key: (lambda: _preprocess_partition(load_func()))
        for key, load_func in partitioned_input.items()
    }

Here _preprocess_partition is a function that takes a single data frame as input, and the comprehension applies it lazily to each partition.

@datajoely
Contributor Author

I'm actually closing this as I didn't realise there was a neat way of doing it!

@auggie246

auggie246 commented Feb 28, 2024

Here is an example I made that worked for a node that takes a partitioned input and returns a partitioned output:

def preprocess_all_data(
    partitioned_input: Dict[str, Callable[[], Any]]
) -> Dict[str, Callable[[], Any]]:
    return {
        key: (lambda: _preprocess_partition(load_func()))
        for key, load_func in partitioned_input.items()
    }

Here _preprocess_partition is a function that takes a single data frame as input, and the comprehension applies it lazily to each partition.

A late follow-up, but I have been trying to implement this and ran into a weird error: using your example, load_func() always ends up loading the last file in my folder.
I did manage to fix it by modifying your example.

def preprocess_all_data(
    partitioned_input: Dict[str, Callable[[], Any]]
) -> Dict[str, Callable[[], Any]]:
    return {
        key: (lambda load_func=load_func: _preprocess_partition(load_func()))
        for key, load_func in partitioned_input.items()
    }

The issue is related to Python's late-binding closures: a lambda captures the variable load_func itself, not its value, so every lambda uses load_func as it exists at the end of the loop rather than its value at each iteration. This is why all the lambda functions ended up referring to the same load_func and hence returned the same DataFrame every time I called them.
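
A minimal, Kedro-free demonstration of the pitfall and the default-argument fix:

# Every lambda closes over the loop variable k itself, so after the loop
# they all see its final value:
late = {k: (lambda: k) for k in ("a", "b", "c")}
print([f() for f in late.values()])   # ['c', 'c', 'c']

# Binding k as a default argument captures its value at each iteration:
bound = {k: (lambda k=k: k) for k in ("a", "b", "c")}
print([f() for f in bound.values()])  # ['a', 'b', 'c']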

Hope this helps anyone else who faces this issue. I spent a day debugging it.

@datajoely
Contributor Author

Amazing work @auggie246, we need to get this in the docs ASAP!

@datajoely
Contributor Author

For anyone trying to spot the diff above:

def preprocess_all_data(
    partitioned_input: Dict[str, Callable[[], Any]],
) -> Dict[str, Callable[[], Any]]:
    return {
-        key: (lambda: _preprocess_partition(load_func()))
+        key: (lambda load_func=load_func: _preprocess_partition(load_func()))
        for key, load_func in partitioned_input.items()
    }
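
Putting it together, a self-contained version of the fixed function (imports added; _preprocess_partition stands in for any per-partition transform):

from typing import Any, Callable, Dict

def _preprocess_partition(df: Any) -> Any:
    # Placeholder for whatever per-partition transformation you need.
    return df

def preprocess_all_data(
    partitioned_input: Dict[str, Callable[[], Any]],
) -> Dict[str, Callable[[], Any]]:
    # The default argument binds each load_func at definition time, so each
    # returned callable lazily loads and transforms its own partition when
    # PartitionedDataSet saves it.
    return {
        key: (lambda load_func=load_func: _preprocess_partition(load_func()))
        for key, load_func in partitioned_input.items()
    }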
