
Adds partition support to SparkHiveDataSet #745

Closed
wants to merge 7 commits into from

Conversation


@brendalf brendalf commented Apr 5, 2021

Description

SparkHiveDataSet doesn't support partitions like SparkDataSet does. As discussed in #725, @AntonyMilneQB proposed implementing this by adding a new argument, partition_by, to the SparkHiveDataSet constructor.
The partition_by argument was meant to specify the partition where the data should be inserted, e.g. partition_by="ref = 1", to be used later inside _save, like this: insert into {table} partition ({partition_by}) select * from tmp.
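
For concreteness, a minimal sketch of what _save would do under that proposal (the tmp view name and the _get_spark() helper are illustrative assumptions, not the dataset's actual internals):

def _save(self, data: DataFrame) -> None:
    # Stage the incoming DataFrame as a temporary view for the SQL insert
    data.createOrReplaceTempView("tmp")
    # _get_spark() is assumed here to return the active SparkSession
    self._get_spark().sql(
        f"insert into {self._full_table_address} "
        f"partition ({self._partition_by}) select * from tmp"
    )
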
I'm opening this PR as a draft for discussion because I ran into some problems while implementing this approach.

Resolves #725

Development notes

Here are the problems:
1. The table doesn't exist yet
In this case, SparkHiveDataSet is going to create it with _create_empty_hive_table(self, data), but partition_by can't be used there straight away, because _create_empty_hive_table needs a list of column names and types to declare the partition, something like PARTITIONED BY (ref integer).
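
For reference, this is the shape of DDL the create path would need to emit (the table and column names are illustrative):

# A partitioned Hive table declares its partition columns, with types,
# outside the regular column list
spark.sql(
    "create table my_db.my_table (id int, name string) "
    "partitioned by (ref int)"
)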

I tried to replace the partition_by argument with a partitioned_by argument, to be used inside the create table, and to add a new argument, partition, to _save to handle the partition where the data should be inserted. But to do that we would need to change the save method of AbstractDataSet, so I discarded the idea.

I also tried to extend @AntonyMilneQB's approach by implementing two arguments in the SparkHiveDataSet constructor: partition (where the data should be inserted) and partitioned_by (the columns the table should be partitioned by). The problem here is that _create_empty_hive_table(self, data) uses CREATE TABLE AS SELECT (CTAS), and we can't use PARTITIONED BY with it. We could solve this by replacing the CTAS with a regular CREATE TABLE, but I don't know how to get the list of column names and valid Spark SQL data types from the data variable that is passed to _create_empty_hive_table(self, data).
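
For reference, the names and types are recoverable from the DataFrame itself: each field of data.schema has a name and a dataType whose simpleString() is a valid Spark SQL type. A sketch of building a plain CREATE TABLE that way (partitioned_by and table are illustrative variables):

# Map every column to its Spark SQL type string, e.g. {"ref": "int"}
columns = {f.name: f.dataType.simpleString() for f in data.schema.fields}
# Partition columns go in the PARTITIONED BY clause, the rest in the body
partition_cols = ", ".join(f"{c} {columns[c]}" for c in partitioned_by)
regular_cols = ", ".join(
    f"{c} {t}" for c, t in columns.items() if c not in partitioned_by
)
ddl = f"create table {table} ({regular_cols}) partitioned by ({partition_cols})"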

What does everyone think? Do you know how to implement the create table with a partition? Can you see a different approach to solve problem 1?

For now, the implementation reflects @AntonyMilneQB's approach and only works if the partitioned table has already been created.
But that leaves us with the second problem.

2. How can we validate partitioned tables?
SparkHiveDataSet uses .load() to generate self._table_columns when the table already exists, but the load method reads the table with all columns, including the partition columns, and that makes self._validate_save fail because the data the user wants to insert doesn't have the partition columns.

I tried to get around this by collecting the partition column names and removing those columns from self._table_columns, but I don't know if this is the best approach.
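
For what it's worth, the Spark catalog can distinguish partition columns directly, so the filtering doesn't need string parsing (a sketch; the self._table and self._database attribute names are assumptions):

catalog_columns = spark.catalog.listColumns(self._table, dbName=self._database)
# listColumns flags partition columns, so they can be dropped from the
# expected schema before _validate_save compares column lists
self._table_columns = [c.name for c in catalog_columns if not c.isPartition]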

What does everyone think?

Checklist

  • Read the contributing guidelines
  • Opened this PR as a 'Draft Pull Request' if it is work-in-progress
  • Updated the documentation to reflect the code changes
  • Added a description of this change and added my name to the list of supporting contributions in the RELEASE.md file
  • Added tests to cover my changes

Notice

  • I acknowledge and agree that, by checking this box and clicking "Submit Pull Request":

  • I submit this contribution under the Apache 2.0 license and represent that I am entitled to do so on behalf of myself, my employer, or relevant third parties, as applicable.

  • I certify that (a) this contribution is my original creation and / or (b) to the extent it is not my original creation, I am authorised to submit this contribution on behalf of the original creator(s) or their licensees.

  • I certify that the use of this contribution as authorised by the Apache 2.0 license does not violate the intellectual property rights of anyone else.

@brendalf brendalf changed the base branch from master to develop April 5, 2021 13:03
@brendalf brendalf marked this pull request as ready for review April 6, 2021 10:51
@brendalf brendalf requested a review from idanov as a code owner April 6, 2021 10:51
@brendalf brendalf changed the title [WiP] Adds partition support to SparkHiveDataSet Adds partition support to SparkHiveDataSet Apr 6, 2021
@brendalf brendalf marked this pull request as draft April 6, 2021 10:52
Contributor

@antonymilne antonymilne left a comment

Thank you very much for your work here and for detailing where you've got to so well! This looks like a great start. I've left some minor comments, but I'm afraid I don't know enough about Spark to consider the problems you've raised. Let me try to get some people who are more knowledgeable about Spark to take a look 🙂

Comment on lines +161 to +167

self._partition = partition

# get the name of each partition
self._partitions: List[str] = []
if self._partition is not None:
    for partition_set in self._partition.split(","):
        self._partitions.append(partition_set.split("=")[0].strip())
Contributor

Would it make sense to just directly make the function argument partition: List[str] = None, as is done for table_pk? This would do away with the need for string manipulation here.

Author

@brendalf brendalf Apr 14, 2021

That makes sense, but with just the list of column names we can't identify a partition. Maybe we can make partition take a list of tuples, where each item holds a column name and its value. What do you think?
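
For example (illustrative values):

# Hypothetical form: one (column, value) tuple per static partition
partition = [("ref", 1), ("dt", "2021-04-05")]
clause = ", ".join(f"{col} = {val!r}" for col, val in partition)
# clause == "ref = 1, dt = '2021-04-05'"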

Contributor

@antonymilne antonymilne Apr 14, 2021

Ah I see. That could indeed work, but now I'm wondering exactly what the partition string in the insert could look like. E.g. we can have partition_1 = x, partition_2 = y, but we could also just have partition_1, partition_2, right? Do you know what the full specification of this string's syntax is?

Now I'm actually also wondering why SparkHiveDataSet is using dynamically built SQL queries in the first place rather than the Python API, which would presumably make this sort of thing much easier. Is that impossible when doing the hive thing? @jiriklein

Author

Hi @AntonyMilneQB, sorry for the late reply.
That's right: the partition string will be something like partition_1 = x, partition_2 = y if the partition type is static, or partition_1, partition_2 if the partition is dynamic.
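
Spelled out as the query strings _save would emit (standard Hive insert syntax; table and view names are illustrative):

# Static: every partition column is pinned to a value in the query
static_insert = (
    "insert into my_table partition (partition_1 = 'x', partition_2 = 'y') "
    "select * from tmp"
)
# Dynamic: values come from the trailing columns of the select; Hive needs
# hive.exec.dynamic.partition.mode=nonstrict for the fully dynamic form
dynamic_insert = (
    "insert into my_table partition (partition_1, partition_2) "
    "select * from tmp"
)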

@jiriklein
Contributor

Hi @brendalf - I'm looking at this and I think SparkHiveDataSet deserves a proper rewrite to:

  1. Align with the wider kedro.extras.datasets API, especially with SparkDataSet
  2. Avoid as many SQL query string injections as possible

I'll be looking at this in the next couple of days, so please let me know if you have any questions or concerns.

@jiriklein
Contributor

Hi @brendalf
After some research, my proposal is to replace the current _save() functionality with the following:

def _create_hive_table(self, data: DataFrame, mode: str = None):
    _mode: str = mode or self._write_mode
    data.write.saveAsTable(
        self._full_table_address,
        mode=_mode,
        format=self._format,
        partitionBy=self._partition_by,
        **self._save_args,
    )

And removing the StagedDataSet altogether. This then fully supports your use case for leveraging partitionBy. Does this make sense for you and your use case?
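
For illustration, usage could then look like this (the constructor argument names are assumptions about the eventual API, not the final signature):

# Hypothetical usage once the rewrite lands
dataset = SparkHiveDataSet(
    database="analytics",
    table="sales",
    write_mode="overwrite",
    partition_by=["ref"],  # forwarded to DataFrameWriter.partitionBy
)
dataset.save(spark_df)  # internally calls data.write.saveAsTable(...)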

@brendalf
Author

Hi @jiriklein, how are you?
Sorry for the late reply.

I agree with you that SparkHiveDataSet deserves a proper rewrite and your proposal LGTM.

@jiriklein
Contributor

Hi @brendalf, hope you're well!
SparkHiveDataSet has now been fully rewritten and partitionBy support has been added, including access to other save_args.
You can find the changes in the latest develop code. If you wish to wait for a proper release, you can expect these changes to materialise in version 0.18.0.
Hope this helps!

@jiriklein jiriklein closed this May 5, 2021
@jiriklein jiriklein self-assigned this May 5, 2021
@brendalf brendalf deleted the feature/partition-args-in-sparkhive-dataset branch May 5, 2021 15:24
@brendalf
Author

brendalf commented May 5, 2021

Perfect!

