Passing Extra Parameters to Custom Dataset #1723

Open
brendalf opened this issue Jul 25, 2022 · 17 comments
@brendalf

brendalf commented Jul 25, 2022

Description

Hello there.
I created a custom dataset to handle our Spark Delta Tables.
The problem is that the custom dataset needs a replace-where string defining what partition should be overwritten after the data is generated inside the node.
Catalog definition:

revenue:
  type: path.DeltaTableDataSet
  namespace: test
  table: revenue
  save_args:
    -

I can't use the parameters inside the save_args key for the custom dataset because the replace values are also calculated during execution depending on other pipeline parameters, like DATE_START and LOOKBACK.

I tried to create a class to act as the interface between the nodes and the custom dataset; this class holds the Spark DataFrame and the extra values, but Kedro fails when trying to pickle it:
Node return:

return SparkPlan(
    df=revenue,
    replace_where=[ 
        f"date >= '{from_date}' and date <= '{date_end}'"
    ]
)

Custom Dataset save method:

def _save(self, plan: SparkPlan) -> None:
    """Saves data to the specified filepath"""
    logger = logging.getLogger(self._table_name)
    logger.info(plan.replace_where)

Error received:

kedro.io.core.DataSetError: Failed while saving data to data set MemoryDataSet().
cannot pickle '_thread.RLock' object

Questions:

  1. Is there a way to provide runtime values to the dataset together with the data?
  2. Could I put these values in the context and retrieve them inside the custom dataset?
    • I saw a method to load the current kedro context, but that method was removed.

Edit 1 - 2022-07-25:

The error above was happening because I typed the wrong dataset in the node outputs, so Kedro tried to save as a MemoryDataset.
I solved the problem of sending extra parameters by using this SparkPlan wrapper around every save and load from my custom dataset.
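For reference, a minimal sketch of what such a wrapper might look like (the SparkPlan name and fields come from the snippets above; the actual implementation isn't shown in this thread):

from dataclasses import dataclass, field
from typing import List

from pyspark.sql import DataFrame


@dataclass
class SparkPlan:
    """Bundle a Spark DataFrame with the replace-where conditions computed in the node."""

    df: DataFrame
    replace_where: List[str] = field(default_factory=list)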

@deepyaman
Member

@brendalf On your specific error, can you try returning a dictionary from your function and constructing the SparkPlan object inside _save from that dictionary?

Questions:

  1. Is there a way to provide runtime values to the dataset together with the data?

  2. Could I put these values in the context and retrieve them inside the custom dataset?

    • I saw a method to load the current kedro context, but that method was removed.

Since Kedro tries to abstract data saving/loading away from the logic, I don't think this is directly supported. Off the top of my head, what you could do is return these runtime values from nodes, either explicitly or by using hooks to pass that extra output.
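To illustrate the explicit option, a rough sketch (the node, dataset, and parameter names here are made up for the example): the node returns the runtime value as a separate output that downstream nodes or datasets can consume.

from kedro.pipeline import node


def compute_revenue(raw_df, date_start: str, date_end: str):
    """Return both the data and the runtime replace-where condition as outputs."""
    replace_where = f"date >= '{date_start}' and date <= '{date_end}'"
    revenue_df = raw_df.filter(replace_where)
    return revenue_df, replace_where


revenue_node = node(
    compute_revenue,
    inputs=["raw_revenue", "params:date_start", "params:date_end"],
    outputs=["revenue", "revenue_replace_where"],
)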

@brendalf
Author

brendalf commented Jul 25, 2022

Hi @deepyaman.
It's working now, both when returning a SparkPlan and when returning a dict.
I realized that I had a typo in the catalog dataset name.
Thanks.

Since Kedro tries to abstract data saving/loading away from the logic, I don't think this is directly supported. Off the top of my head, what you could do is return these runtime values from nodes, either explicitly or by using hooks to pass that extra output.

Can you provide a short example?

@datajoely
Contributor

Are you running this with ParallelRunner? That's a common issue here.

@brendalf
Author

No, I'm not.

@brendalf
Author

brendalf commented Jul 25, 2022

Although I solved the issue by wrapping the data and the parameters inside a class, I think it would be good to have this feature handled by Kedro in the future.
Thanks for the support. Should I close this?

@datajoely
Contributor

datajoely commented Jul 25, 2022

Hi @brendalf, I've just realised this is possibly resolved by tweaking the copy_mode of the memory dataset when it is passed into the next node:

https://kedro.readthedocs.io/en/latest/_modules/kedro/io/memory_dataset.html
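For example, an explicit catalog entry along these lines (the dataset name is illustrative) makes the intermediate MemoryDataSet hand the object over by reference instead of deep-copying it:

revenue_plan:
  type: MemoryDataSet
  copy_mode: assign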

@noklam
Contributor

noklam commented Jul 25, 2022

kedro.io.core.DataSetError: Failed while saving data to data set MemoryDataSet().
cannot pickle '_thread.RLock' object

These errors almost always come from serialization. I think we had a similar issue with TensorFlow objects; the quick solution is the copy_mode that Joel mentioned above.

@datajoely
Contributor

@noklam do you think we could catch this pickling error and recommend the solution? It's a hard one to debug for users in this situation.

@brendalf
Author

Hi @datajoely
In my case I didn't want to save to a MemoryDataset; that was happening because I had a typo between the data catalog entry and the name I actually wrote as output for the node.
I think the problem happened because the memory dataset tried to serialize a Spark DataFrame object.

@datajoely
Contributor

Sorry - MemoryDataSet is what is used to pass data between nodes dynamically; if you look at the implementation, we automatically handle native Spark DataFrames:

[screenshot of the MemoryDataSet implementation handling Spark DataFrames]

So you can do this by explicitly declaring MemoryDataSets in the catalog.

I also think that if you were to subclass our spark.SparkDataSet or spark.DeltaTableDataSet, you would benefit from this too.

@brendalf
Author

Do you think it would be nice to have, in the future, a way to send runtime-calculated values as extra parameters to the dataset?
For now, I solved it by wrapping the values and the data inside a class that my custom dataset accepts to save.
If not, we can close this issue.

@noklam
Contributor

noklam commented Jul 27, 2022

@brendalf Could you provide an example of that?

@datajoely
Contributor

@brendalf or perhaps - why can't you just return the runtime data as an input to another node? Does it need to be in the DataSet implementation?

@brendalf
Author

My custom dataset needs to receive two things:

  1. The Spark data to be saved.
  2. A replace-where query using values calculated inside the node.

Example:
I need to replace the data inside the Delta Table for a specific set of dates.
I have date_start, date_end, and lookback parameters defined inside parameters.yml, and inside the node I actually load data from date_start - lookback to date_end, so I need to replace the same date range in the output table.
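Roughly, the calculation looks like this (parameter names follow the description above; the actual code isn't shown in the thread):

from datetime import date, timedelta

# Values that would normally come from parameters.yml
date_start = date(2022, 7, 1)
date_end = date(2022, 7, 25)
lookback = 7  # days

load_from = date_start - timedelta(days=lookback)
replace_where = f"date >= '{load_from}' and date <= '{date_end}'"
# -> "date >= '2022-06-24' and date <= '2022-07-25'"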

I thought about three solutions to solve this:

  1. Create a class that accepts the data and the replace-where query, so the node can send everything I need to the custom dataset, which accepts this class instead of just the Spark DataFrame.
  2. I could push the replace-where values to the parameters from inside the node and retrieve them in the custom dataset.
  3. I could instantiate my custom dataset inside the node and call the save method, without using the catalog.

I actually solved the problem with the first approach (see the sketch at the end of this comment), but it's problematic: now when I want to join nodes together, downstream nodes no longer receive the Spark DataFrame with lazy evaluation, but an instance of this class.

I couldn't find how to implement the second approach.
Maybe Kedro could automatically send the context to the dataset as kwargs?

The problem with the third one is that I want to keep using the data catalog.
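For reference, a minimal sketch of the dataset side of the first approach, assuming Delta Lake's replaceWhere write option (the _filepath attribute and names are illustrative, not the actual implementation):

def _save(self, plan: SparkPlan) -> None:
    """Overwrite only the partitions covered by the replace-where conditions."""
    (
        plan.df.write.format("delta")
        .mode("overwrite")
        .option("replaceWhere", " and ".join(plan.replace_where))
        .save(self._filepath)
    )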

@brendalf
Author

brendalf commented Aug 3, 2022

Hello folks, any news here?

@datajoely
Contributor

I think this option is most common amongst the community:

Create a class that accepts the data and the replace where query, so the node can send everything I need to the custom dataset that accepts this class instead of just the spark dataset.

In Kedro, nodes should be pure Python functions with no knowledge of IO, so you should never have the context available there.

@astrojuanlu
Member

I can't use the parameters inside the save_args key for the custom dataset because the replace values are also calculated during execution depending on other pipeline parameters, like DATE_START and LOOKBACK.

The question of dynamic datasets like these has come up recently in some user conversations. We haven't started thinking about how to do it yet.
