
Implement support for schema in saving dask parquet (#1736) #1746

Merged
merged 4 commits into kedro-org:main on Nov 29, 2022

Conversation

@avsolatorio (Contributor) commented Aug 1, 2022

Signed-off-by: Aivin V. Solatorio avsolatorio@gmail.com

Description

Resolves: #1736

Development notes

This change allows the use of the schema argument in the dask.to_parquet API from Kedro's dask.ParquetDataSet.

A custom parser for the schema is implemented. The parser supports all kinds of schema declarations accepted by the underlying dask.to_parquet API.

The documentation was updated to show an example of the grammar for defining the schema in the catalog.yml.

The change parses _save_args for the schema key and transforms the fields into a pyarrow.DataType or pyarrow.Schema accordingly.

Tests have also been written for this change.

Checklist

  • Read the contributing guidelines
  • Opened this PR as a 'Draft Pull Request' if it is work-in-progress
  • Updated the documentation to reflect the code changes
  • Added a description of this change in the RELEASE.md file
  • Added tests to cover my changes

@datajoely (Contributor)

Thanks for raising this @avsolatorio, it's looking really good!

@avsolatorio (Contributor Author)

@datajoely I have an idea, but this might take some time.

I'm looking at using ast to explicitly parse the input expressions. Then I will implement something that rebuilds the field definitions based on the parsed data. Does that sound acceptable? :)

I'll make this PR a draft until I finish that.

@avsolatorio avsolatorio marked this pull request as draft August 1, 2022 18:31
@deepyaman (Member)

I don't think PyArrow provides a native way to define complex schema as text (as mentioned in your issue), although simple stuff like pa.schema({'hi': 'u1'}) == pa.schema({'hi': pa.uint8()}) works. However, Fugue provides a way to do so. Maybe we should just use their logic, since it's non-trivial? The requirements for https://github.com/fugue-project/triad also look pretty lightweight, so I don't see an issue with depending on it.
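
A quick, hedged illustration of both points (it assumes pyarrow and triad are installed, triad being the schema utility underneath Fugue):

import pyarrow as pa
from triad import Schema

# pyarrow accepts simple type aliases given as strings...
assert pa.schema({"hi": "u1"}) == pa.schema({"hi": pa.uint8()})

# ...while triad parses a whole schema expression into a pyarrow schema.
print(Schema("a:int,b:str").pyarrow_schema)
# a: int32
# b: string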

@avsolatorio (Contributor Author)

> I don't think PyArrow provides a native way to define complex schema as text (as mentioned in your issue), although simple stuff like pa.schema({'hi': 'u1'}) == pa.schema({'hi': pa.uint8()}) works. However, Fugue provides a way to do so. Maybe we should just use their logic, since it's non-trivial? The requirements for https://github.com/fugue-project/triad also look pretty lightweight, so I don't see an issue with depending on it.

@deepyaman thanks for looking into this! I checked Fugue, and I don't think it supports complex schema structures such as nested list types. For example, I don't see any way to represent a field that has this representation list_(list_(int64)).

I have implemented a parser instead to supplant the use of eval, and I think it works relatively well.
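
For reference, the nested type mentioned above can be built directly in Python with pyarrow (a minimal sketch; the column name is illustrative):

import pyarrow as pa

# A list of lists of 64-bit integers, wrapped in a single-column schema.
nested_type = pa.list_(pa.list_(pa.int64()))
schema = pa.schema([("col2", nested_type)])
print(schema)
# col2: list<item: list<item: int64>>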

@avsolatorio avsolatorio marked this pull request as ready for review August 2, 2022 02:32
@deepyaman (Member)

> I don't think PyArrow provides a native way to define complex schema as text (as mentioned in your issue), although simple stuff like pa.schema({'hi': 'u1'}) == pa.schema({'hi': pa.uint8()}) works. However, Fugue provides a way to do so. Maybe we should just use their logic, since it's non-trivial? The requirements for https://github.com/fugue-project/triad also look pretty lightweight, so I don't see an issue with depending on it.

> @deepyaman thanks for looking into this! I checked Fugue, and I don't think it supports complex schema structures such as nested list types. For example, I don't see any way to represent a field that has this representation list_(list_(int64)).

Ugh, you're right. I wonder if they'd be open to a simple change to support this?

> I have implemented a parser instead to supplant the use of eval, and I think it works relatively well.

I think something to this effect would work, but I'm personally in favor of a solution that doesn't involve maintaining a schema parser, if we can. I assume the subset of people using complex schemas in dask.ParquetDataSet via Kedro is very limited, so, while I think we should solve this need, I'd love for it to be as straightforward as possible.

@merelcht (Member) left a comment

Hi @avsolatorio, thank you so much for taking this on.

I don't know much about dask or parquet, so I'd like to ask some questions:

  1. I see you've added pyarrow as a dependency to the dataset. If a user just wants to use the existing functionality they will now also need to install this dependency. Would it perhaps make sense to have a separate dataset that makes use of pyarrow, or do you think in general any user that uses dask/parquet would probably use pyarrow as well?
  2. You mention: "Alternatively, a full pyarrow.Schema type may also be provided if desired." How exactly would that work?
  3. In the schema a user provides, is it necessary to provide all columns with datatypes?
  4. How would more complex datatypes work? Perhaps a more advanced example would be useful as well.
    I guess I'm mainly trying to understand how many use cases this covers or if it's rather specific.

@deepyaman (Member)

> 1. I see you've added pyarrow as a dependency to the dataset. If a user just wants to use the existing functionality they will now also need to install this dependency. Would it perhaps make sense to have a separate dataset that makes use of pyarrow, or do you think in general any user that uses dask/parquet would probably use pyarrow as well?

That's a good catch. I don't think it's necessary to add pyarrow as a dependency for the Kedro dataset, because the underlying read_parquet uses pyarrow if installed, else fastparquet. schema wouldn't be used with fastparquet, anyway, so the choice to install pyarrow would be up to the user.
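
A hedged sketch of the underlying Dask call being discussed (paths and data are illustrative; as noted above, the schema mapping is only used by the pyarrow engine, not by fastparquet):

import dask.dataframe as dd
import pandas as pd
import pyarrow as pa

ddf = dd.from_pandas(
    pd.DataFrame({"col1": [1, 2], "col2": [[[1, 2]], [[3]]]}),
    npartitions=1,
)
# Explicitly select the pyarrow engine and declare the type of the nested column;
# any column not listed in the mapping is still inferred by Dask.
ddf.to_parquet(
    "out_parquet/",
    engine="pyarrow",
    schema={"col2": pa.list_(pa.list_(pa.int64()))},
)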

@avsolatorio (Contributor Author) commented Aug 2, 2022

> 1. I see you've added pyarrow as a dependency to the dataset. If a user just wants to use the existing functionality they will now also need to install this dependency. Would it perhaps make sense to have a separate dataset that makes use of pyarrow, or do you think in general any user that uses dask/parquet would probably use pyarrow as well?

@MerelTheisenQB you're very much correct. While I think pyarrow is widely used, it is an optional dependency of dask as well. And as @deepyaman pointed out, dask's to_parquet API falls back to fastparquet when the pyarrow engine is not installed. In that case, the schema is not used by dask.

> 2. You mention: "Alternatively, a full pyarrow.Schema type may also be provided if desired." How exactly would that work?

So dask's to_parquet API accepts the following values for its schema argument: (i) None, (ii) "infer", where the schema is inferred from a sample of the data, (iii) a dictionary defining the columns and the intended field types, and (iv) a pyarrow.Schema instance.

The parser I implemented allows all of these specifications in the catalog.yml. Specifically, if the user would like to specify the full schema, it is possible by doing something like:

save_args:
  schema: schema([("col1", int64()), ("col2", list_(list_(int64)))])

> 3. In the schema a user provides, is it necessary to provide all columns with datatypes?

Not necessarily. If the passed value is a dictionary of column-to-field-type mappings, then only the explicitly listed columns will have their types cast; the rest are inferred.

> 4. How would more complex datatypes work? Perhaps a more advanced example would be useful as well.
>    I guess I'm mainly trying to understand how many use cases this covers or if it's rather specific.

I discovered this issue when I was trying to create a dataset where I had to store in a column a list of tokenized segments of text, e.g., sentences in a paragraph.

I need to use this data in a transformer model that requires the ordering of a sequence of segmented text data. Therefore, the data structure that my pipeline outputs has the form list_(list_(int64())). All is good during processing, since these are all represented as objects, but it fails when I actually store the data.

Indeed, I'm not sure how many would have this use case. I just thought that it would be good to support the functionality of the underlying backends, since kedro exposes the save_args argument anyway.

I think @deepyaman's PR in Fugue is a very good alternative to decouple this dependency. Nonetheless, Fugue strictly depends on pyarrow. So it is inevitable that pyarrow will become an implicit dependency of kedro.

@merelcht (Member) commented Aug 3, 2022

Responding again to your answers to my questions:

  1. Okay perfect, let's remove the dependency in that case and rely on the fallback mechanism from Dask.
  2. That makes sense, thanks for explaining. I think it would be useful to add some additional documentation with examples of the schema signatures you can add.
  3. Great, thanks for clarifying 🙂

On question 4:

> 4. How would more complex datatypes work? Perhaps a more advanced example would be useful as well.
>    I guess I'm mainly trying to understand how many use cases this covers or if it's rather specific.

> I discovered this issue when I was trying to create a dataset where I had to store in a column a list of tokenized segments of text, e.g., sentences in a paragraph.
>
> I need to use this data in a transformer model that requires the ordering of a sequence of segmented text data. Therefore, the data structure that my pipeline outputs has the form list_(list_(int64())). All is good during processing, since these are all represented as objects, but it fails when I actually store the data.
>
> Indeed, I'm not sure how many would have this use case. I just thought that it would be good to support the functionality of the underlying backends, since kedro exposes the save_args argument anyway.

Thanks for explaining your need for this functionality in more detail, your use case definitely makes sense. And I appreciate your thoughts on adding the support in Kedro, it would be good to have this functionality for other users who run into this issue. I think my main concern with this implementation is explainability and maintainability. The first point is mainly about whether this solution makes sense to others and is intuitive enough that they would make use of it. I think that can be solved by adding detailed docs and examples. The second point about maintainability echoes @deepyaman's thoughts above on maintaining a schema parser, which wouldn't be ideal for the team.

> I think @deepyaman's PR in Fugue is a very good alternative to decouple this dependency. Nonetheless, Fugue strictly depends on pyarrow. So it is inevitable that pyarrow will become an implicit dependency of kedro.

The above sounds okay to me, because pyarrow will only be a dependency for users that need this dataset, but not for every kedro framework user.

@deepyaman (Member)

@avsolatorio We've discussed this among the maintainer team, and while we think it's important to support this functionality, we'd like to do it in a way that would result in the least maintenance on our end, especially given our current expectation that the use case of complex schema is pretty niche. To this end, our preferences are, in order:

  1. Use fugue.triad for parsing schema, including complex schema. I've reached out to one of the maintainers to verify if it will be feasible to add the nested schema parsing logic.
  2. Use fugue.triad for parsing schema, even if it doesn't support nested schema. Maybe we can use it with a little patch applied in the Kedro dataset code, if and until my PR there is merged. :D
  3. Implement a custom schema parser.

While we appreciate the work you've done, and think it's really cool, we probably won't go the third route. For now, this would also mean using a custom local dataset (this would be true regardless, until the next release). That being said, would you be open to updating your PR (or opening a new one) to leverage fugue.triad for this?

@deepyaman (Member)

> 1. Use fugue.triad for parsing schema, including complex schema. I've reached out to one of the maintainers to verify if it will be feasible to add the nested schema parsing logic.

Confirmed with the maintainers that this is just a pending bug, so will try and get it resolved this weekend!

@avsolatorio (Contributor Author)

> That being said, would you be open to updating your PR (or opening a new one) to leverage fugue.triad for this?

@deepyaman I'm very much open to updating the PR to use fugue.triad for this functionality. 👌🏽

@deepyaman (Member)

> That being said, would you be open to updating your PR (or opening a new one) to leverage fugue.triad for this?

> @deepyaman I'm very much open to updating the PR to use fugue.triad for this functionality. 👌🏽

@avsolatorio Great! Triad 0.6.7 was released a few hours ago, and it has the bug fix to support nested schema. :)
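
If the fix works as described, a nested declaration should now parse straight to the corresponding pyarrow type (a hedged sketch; the exact type-name spellings follow triad's expression grammar):

from triad import Schema

# "[[int64]]" is intended to denote list_(list_(int64())).
print(Schema("col2:[[int64]]").pyarrow_schema)
# expected: col2: list<item: list<item: int64>>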

@avsolatorio (Contributor Author)

> @avsolatorio Great! Triad 0.6.7 was released a few hours ago, and it has the bug fix to support nested schema. :)

Hello, @deepyaman. I have already updated the PR to use triad for this functionality. Kindly let me know if there are any specific implementation details that I should consider further. Thank you! 😃

@datajoely (Contributor)

This is excellent work @avsolatorio really appreciate the effort here 💪

@deepyaman (Member) left a comment

I think this looks great overall! Took a quick pass and left 2 comments; I'll try and go through it more properly later.


if isinstance(schema, dict):
    pa_schema = triad.Schema(
        ",".join([":".join(i) for i in schema.items()])
Member:

You can construct triad.Schema directly from a dict; see https://github.com/fugue-project/triad/blob/master/triad/collections/schema.py#L59.

An example with string values:

>>> Schema({"a": "int", "b": "float"}).pyarrow_schema
a: int32
b: float

I don't know that you can do more complex stuff, like nested lists, but I think it's a bit unnatural that you define the top level as a dict but the nested lists as strings (as in the test cases).

I think if you've got a power-user, they can just construct the schema string for more complex cases.

Contributor Author:

Oh, I chose to implement it this way because of how dask.to_parquet distinguishes between a dict-like vs. a pa.Schema-like value for its schema argument.

If we directly cast a dict input into a schema as your snippet shows, triad would return a schema containing only the key:value pairs explicitly declared in the dict, which would prevent dask from inferring the types of the other fields (if any). It could cause unexpected behavior, since a user may simply want to override the inferred field types for some of the columns and let dask do the inference for the rest.

In some sense, the dict input is a more general case since we can choose to specify the field types only for certain fields or for all the fields.

> it's a bit unnatural that you define the top level as a dict but the nested lists as strings (as in the test cases).

Ah, I was simulating how the parsed arguments from the catalog.yml would be passed into the dataset.

Since in the catalog.yml, the specification looks like:

save_args:
  schema:
    col1: int64
    col2: "[[int64]]"

then, I suppose that would be passed as a dict with this structure {"col1": "int64", "col2": "[[int64]]"}.

Member:

For my reference (and anybody else who may be looking at this and not familiar with the way dask.to_parquet works), the code that does the partial inference that @avsolatorio describes is here: https://github.com/dask/dask/blob/2022.6.0/dask/dataframe/io/parquet/arrow.py#L500-L529

In that case, I agree that it makes sense to not convert to a full PyArrow schema before passing to dask.to_parquet, and instead pass a field name-type mapping. Perhaps the exception is if somebody passes a pa.Schema object as schema, in which case we should replicate the Dask behavior of not performing inference? It's a bit confusing...

Taking all of this into account, maybe it's simplest and most explicit to:

  1. Accept a string, dict, pa.Schema, or whatever else Triad takes to construct a triad.Schema for the schema argument.
  2. Add an infer_schema argument (that defaults to True), based on which the schema arg is either used as is or as overrides for the inferred schema.

As far as converting the triad.Schema object to a field-type mapping goes, you can use the fields property on triad.Schema (no need to extract the pyarrow_schema and iterate over that yourself).

Let me know if that makes sense?
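
A minimal sketch of that fields-based conversion (it assumes all values are string declarations, as they would be coming from catalog.yml; the helper name is illustrative):

import triad

def to_field_mapping(declarations: dict) -> dict:
    # Parse the "name:type" declarations with triad and return a
    # {column: pyarrow type} mapping, leaving unlisted columns to Dask's inference.
    parsed = triad.Schema(",".join(f"{k}:{v}" for k, v in declarations.items()))
    return {field.name: field.type for field in parsed.fields}

to_field_mapping({"col1": "int64", "col2": "[[int64]]"})
# roughly: {"col1": int64, "col2": list<list<int64>>}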

"""
schema = self._save_args.get("schema")

if isinstance(schema, dict):
Member:

Do we need to also support the case in which somebody passes a pa.Schema? I assume so, since that's the native Dask expectation.

Contributor Author:

Yep, that's a good point!

Contributor Author:

Using pa.Schema is already supported, since whatever value is passed in _save_args["schema"] will also be passed to Dask's to_parquet.

@deepyaman (Member)

I'm so sorry about the delay on this. I told myself I'd take a look this weekend, but I got busy with other things. I will try and make sure to take a look at it later this week and push it through!

@deepyaman (Member) left a comment

Sorry for the very overdue response. I've finally had the chance to give this a much more thorough look, and made a suggestion. Let me know if you agree, or if I'm missing something again.



@merelcht merelcht added the Community Issue/PR opened by the open-source community label Sep 20, 2022
@merelcht merelcht mentioned this pull request Nov 7, 2022
@merelcht (Member) commented Nov 7, 2022

Hi @avsolatorio, do you still want to complete this PR, or should someone from the team take over? We'd like to get all PRs related to datasets merged soon, now that we're moving our datasets code to a different package (see our Medium blog post for more details).

@avsolatorio (Contributor Author)

> Hi @avsolatorio, do you still want to complete this PR, or should someone from the team take over? We'd like to get all PRs related to datasets merged soon, now that we're moving our datasets code to a different package (see our Medium blog post for more details).

Hello @merelcht, apologies for not being able to get back to this PR and wrap it up earlier. I'll finalize it and update it with @deepyaman's comments.

@merelcht (Member)

Hi @avsolatorio, do you think you'll have time to complete this in the next few days? We are going to do a release early next week and need any dataset changes to be merged by then. Otherwise, we'll close this PR and ask you to re-open it on the new datasets repo when it's ready.

Signed-off-by: Aivin V. Solatorio <avsolatorio@gmail.com>
@avsolatorio (Contributor Author)

> Hi @avsolatorio, do you think you'll have time to complete this in the next few days? We are going to do a release early next week and need any dataset changes to be merged by then. Otherwise, we'll close this PR and ask you to re-open it on the new datasets repo when it's ready.

Hello @merelcht PR updated!

@deepyaman, kindly check, because I decided not to implement the infer_schema argument. How it is implemented now is exactly how Dask expects to accept a schema. That is, a pa.Schema instance is passed directly to Dask, and it should specify a full schema for all columns. The implementation here basically forms an interface for the arbitrary per-field typing supported by passing a dictionary to Dask's to_parquet method.

The infer_schema argument seems redundant since if I don't want the schema to be inferred, I can just pass the full pa.Schema instance with all the necessary field type declarations. If I want inference, I can just choose not to specify a schema, or override inference for a few fields by specifying the fields' type declarations in a dictionary.
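
To summarize the behaviour described above, here is a hedged illustration of the three ways schema could be set in save_args (column names are taken from the earlier examples):

import pyarrow as pa

# 1. No schema (or schema: "infer"): Dask infers the type of every column.
save_args = {}

# 2. A dict of per-column declarations: the listed columns are cast to the
#    declared types and Dask still infers the rest.
save_args = {"schema": {"col1": "int64", "col2": "[[int64]]"}}

# 3. A full pyarrow.Schema: forwarded to dask.to_parquet as-is, so it must
#    declare every column; no inference happens.
save_args = {
    "schema": pa.schema(
        [("col1", pa.int64()), ("col2", pa.list_(pa.list_(pa.int64())))]
    )
}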

@merelcht (Member) left a comment

Thanks for this contribution and your patience with our reviews @avsolatorio 🙏 ⭐
I especially like the clear doc-strings you've added to explain the schema functionality.

@deepyaman (Member) left a comment

@avsolatorio Thank you for the updates! I think this makes sense, and the way you've documented when inference happens (i.e. if you specify a dict) is great.

Look forward to seeing some more people use this and see how they like the added power! :)

@jmholzer (Contributor) left a comment

Great work! Thanks for the contribution 😃

Two minor code optimisation suggestions from me, nothing blocking.

kedro/extras/datasets/dask/parquet_dataset.py
Comment on lines 186 to 202
# Create a schema from values that triad can handle directly
triad_schema = triad.Schema(
    {k: v for k, v in schema.items() if not isinstance(v, str)}
)

# Handle the schema keys that are represented as string and add them to the triad schema
triad_schema.update(
    triad.Schema(
        ",".join(
            [
                ":".join([k, v])
                for k, v in schema.items()
                if isinstance(v, str)
            ]
        )
    )
)
Contributor:

It would be cleaner to build the schema in one loop; would something like the following work?

Suggested change:

-# Create a schema from values that triad can handle directly
-triad_schema = triad.Schema(
-    {k: v for k, v in schema.items() if not isinstance(v, str)}
-)
-# Handle the schema keys that are represented as string and add them to the triad schema
-triad_schema.update(
-    triad.Schema(
-        ",".join(
-            [
-                ":".join([k, v])
-                for k, v in schema.items()
-                if isinstance(v, str)
-            ]
-        )
-    )
-)
+triad_schema_dict = {}
+for k, v in schema.items():
+    if isinstance(v, str):
+        ...
+    else:
+        ...
+triad_schema = triad.Schema(triad_schema_dict)

merelcht and others added 3 commits November 29, 2022 10:43
Co-authored-by: Deepyaman Datta <deepyaman.datta@utexas.edu>
Signed-off-by: Merel Theisen <merel.theisen@quantumblack.com>
@merelcht merelcht merged commit bb902b1 into kedro-org:main Nov 29, 2022
@avsolatorio (Contributor Author)

@merelcht @deepyaman @jmholzer, thanks for all the suggestions! Apologies that I was not able to work on the final set of comments. I was traveling, and I wasn't able to find time to implement the suggestions. Nonetheless, great to see this PR merged! 🥳

@avsolatorio (Contributor Author)

@merelcht btw, would this PR qualify for this: #2050 (comment)? Would be super to be added to this list #2064 (comment)! 🔋

@merelcht (Member)

> @merelcht btw, would this PR qualify for this: #2050 (comment)? Would be super to be added to this list #2064 (comment)! 🔋

Absolutely! Let me create a new PR to add you 🙂

@merelcht (Member)

Oh actually you're already there 😄 https://github.com/kedro-org/kedro/blob/main/RELEASE.md#contributions-from-the-kedroid-community

@avsolatorio (Contributor Author)

> Oh actually you're already there 😄 https://github.com/kedro-org/kedro/blob/main/RELEASE.md#contributions-from-the-kedroid-community

Yay! Thank you so much! 🥳

Labels: Community (Issue/PR opened by the open-source community)

Successfully merging this pull request may close these issues:
Support for defining a pyarrow schema for dask.ParquetDataSet in catalog.yml