This repository has been archived by the owner on Jun 14, 2024. It is now read-only.

Support DataSourceV2 sources #321

Closed
wants to merge 6 commits into from

Conversation

Contributor

@andrei-ionescu andrei-ionescu commented Jan 11, 2021

What is the context for this pull request?

What changes were proposed in this pull request?

This PR adds support for DataSourceV2.

The following changes are in this PR and each of them are separate commits:

  • Use LogicalPlan instead of LogicalRelation. This gives us the possibility to add support for other kinds of relations, such as Spark Data Source V2.
  • Add support for DataSourceV2.

Does this PR introduce any user-facing change?

Yes. The source interface has changed: instead of taking a LogicalRelation parameter, it now takes a LogicalPlan. Support for DataSourceV2 sources has been added.

Detailed information can be found in the #318 proposal.
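To illustrate the shape of the interface change, here is a minimal self-contained sketch with hypothetical stand-in types (not Hyperspace's or Spark's actual classes):

```scala
// Stand-in types mimicking Spark's plan hierarchy (illustration only).
trait LogicalPlan
case class LogicalRelation(name: String) extends LogicalPlan
case class DataSourceV2Relation(name: String) extends LogicalPlan

// Before: source APIs accepted only LogicalRelation.
def describeBefore(relation: LogicalRelation): String = relation.name

// After: widening the parameter to LogicalPlan lets both kinds of
// relations flow through the same source interface.
def describeAfter(plan: LogicalPlan): String = plan match {
  case LogicalRelation(n)      => s"v1:$n"
  case DataSourceV2Relation(n) => s"v2:$n"
}
```

The widened signature is what lets a DataSourceV2Relation reach the source provider code paths at all.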

How was this patch tested?

  1. All existing tests are passing.
  2. Tested locally and on Databricks Runtime, based on the Hyperspace usage API in Apache Spark.

@andrei-ionescu andrei-ionescu changed the title Datasourcev2 Support DataSourceV2 sources Jan 11, 2021
Collaborator

@sezruby sezruby left a comment


BTW, as there's no test, could you open the next PR including IcebergIntegrationTest based on this PR, so that we can check the code works as expected?

Collaborator

@sezruby sezruby left a comment


Generally looks good to me!

Could you add a simple test case for "delete" hybrid scan in IcebergIntegrationTest?
Since Hybrid Scan test refactoring is on the way (#274), it's difficult to add Iceberg Hybrid Scan tests until the refactoring change is merged.

@imback82 Could you have a look at this? Thanks!

@andrei-ionescu
Contributor Author

@sezruby The integration test for Iceberg is in the #320. I cannot add a test for Iceberg in this PR because this one is for DataSourceV2 support only.

@sezruby
Collaborator

sezruby commented Jan 14, 2021

@andrei-ionescu Yep, I checked it; the request is adding a test case for Hybrid Scan with deleted files. You can test it by:

  • create index with lineage column & verify index application
    • use partitioned data to remove files easily & test partition spec
  • remove a few files in the source data
  • verify index application with hybrid scan disabled => shouldn't be applied
  • verify index application with hybrid scan enabled

Please use TestConfig.HybridScanEnabled to enable Hybrid Scan. The Hybrid Scan configs were updated today: https://github.com/microsoft/hyperspace/pull/300/files#diff-0f1f2bb40109f283e9c668b4755f3a68aa2ca68653d4a80c8f39de345f86cc6eR111
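The decision the steps above exercise could be sketched as a rough, self-contained simulation (all names and logic here are illustrative stand-ins, not Hyperspace's actual implementation):

```scala
// An index knows which source files it covered at creation time and whether
// it recorded a lineage column; hybrid scan decides whether the index is
// still usable after some of those files were deleted.
case class IndexState(coveredFiles: Set[String], hasLineage: Boolean)

def indexApplicable(
    index: IndexState,
    currentFiles: Set[String],
    hybridScanEnabled: Boolean): Boolean = {
  val deleted = index.coveredFiles.diff(currentFiles)
  if (deleted.isEmpty) true // source unchanged: index applies directly
  else hybridScanEnabled && index.hasLineage // deletes need hybrid scan + lineage
}
```

This mirrors the test expectations above: with hybrid scan disabled the index shouldn't be applied after deletes, and with it enabled (plus lineage) it should.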

@andrei-ionescu
Contributor Author

andrei-ionescu commented Jan 14, 2021

@sezruby I integrated this second round of feedback and rebased the PR on top of today's master.

In regards to the "Hybrid Scan with deleted files" I have some questions:

  1. Is there a test for it for non-DataSourceV2 sources that I can use as inspiration?
  2. What do you see as specific to Hybrid Scan + DataSourceV2 that is not covered by the already present tests?

There is added complexity here: DataSourceV2 needs a concrete implementation (like IcebergSource), and I don't know what that means in terms of implementation effort.

I'll add the test to IcebergIntegrationTest in #320.

@sezruby
Collaborator

sezruby commented Jan 14, 2021

@andrei-ionescu
1 => You can refer to
"Verify JoinIndexRule utilizes indexes correctly after quick refresh when some file gets deleted and some appended to source data." in DeltaLakeIntegrationTest, except for the refreshIndex part of the test. If the hybrid scan config is enabled, we don't need to refresh the index. In that test there are both deleted and appended files, but we could test with deleted files only (and with both).

2 => Yes, we need to check the exact plan transformation of Hybrid Scan + DataSource V2, but let's just check whether the plan is transformed or not for now. A few things I'd like to check:

  • deleted files after index creation => Hybrid Scan should be applied
  • partitioned source data & appended files after index creation => Hybrid Scan should handle the appended data properly

And could you share the result of query.queryExecution.optimizedPlan/sparkPlan after applying the index + Hybrid Scan on IcebergSource, for the following cases:

  • appended files only (the current test)
  • deleted files only
  • appended files + deleted files

Yep I'll check the IcebergIntegrationTest in #320.

Thanks for the work 👍 👍

@andrei-ionescu
Contributor Author

andrei-ionescu commented Jan 14, 2021

@sezruby DataSourceV2 and DataSourceV2Relation are just APIs. They don't necessarily have anything to do with files on disk. There can be implementations over files on disk, but at the same time they can be over other means (like a Kafka reader). This is very similar to LogicalRelation, where you can have relations over files or not.

Asking for a test for DataSourceV2 (DataSourceV2Relation) is like asking for tests over LogicalRelation.

If I try to add a test for it, I first need to implement a source based on the DataSourceV2 API with some file-backed logic to be used for tests. This would mean adding both the new source and the new *FilesBasedSource in index/sources.

IcebergSource is an implementation of the DataSourceV2 API that works with files. It's the perfect place for such a test.

@sezruby
Collaborator

sezruby commented Jan 14, 2021

Sorry for the confusion, I meant Hybrid Scan + Iceberg. Please add the tests in #320 :)

We need to make sure that this change works as expected before merging it.

@andrei-ionescu
Contributor Author

andrei-ionescu commented Jan 14, 2021

@sezruby I added the requested tests in https://github.com/microsoft/hyperspace/pull/320/files#diff-ce1f32f296e1683385beb0fe1954b154710c0ba0120f028167afbe5953347dd3 similar to the DeltaLakeIntegrationTests ones.

BTW, it did surface a place where I had missed adding the pattern matching on DataSourceV2Relation, so it was a good call to have the tests added in #320. Thanks @sezruby!

@andrei-ionescu andrei-ionescu force-pushed the datasourcev2 branch 4 times, most recently from e65c924 to c74b9f5 Compare January 15, 2021 20:59
@sezruby sezruby requested a review from imback82 January 18, 2021 09:31
sezruby previously approved these changes Jan 19, 2021

Contributor

@imback82 imback82 left a comment


I did a quick review. I will do a more thorough review this week. Since this is touching the core parts, I want to make sure this is reviewed thoroughly.

@apoorvedave1 / @pirz Can you please take a look?

@@ -211,6 +213,7 @@ private[actions] abstract class CreateActionBase(dataManager: IndexDataManager)
// Extract partition keys, if original data is partitioned.
val partitionSchemas = df.queryExecution.optimizedPlan.collect {
case LogicalRelation(HadoopFsRelation(_, pSchema, _, _, _, _), _, _, _) => pSchema
case DataSourceV2Relation(_, _, _, _, uSchema) => uSchema.getOrElse(StructType(Nil))
Contributor


Hmm, I don't think we should have DataSourceV2Relation specific code here. Can we move this to the source provider API?

Contributor Author


Both LogicalRelation and DataSourceV2Relation are on the same level: both directly extend LeafNode. If LogicalRelation is handled here, I would say that DataSourceV2Relation should also be here, as this PR opens up to the DataSourceV2 Spark API.

Contributor


Sorry, I didn't get your argument about "the same level". Why can't we introduce partitionSchema to the source provider? I think we missed moving this into the source provider since default/delta have the same implementation; we can have a different implementation (matching FileIndex) for them.

Contributor Author


  1. Delta is not built on top of the DataSourceV2 Spark API, thus it's not the same implementation.
  2. Both LogicalRelation and DataSourceV2Relation are the first "children" of LeafNode; both directly extend it:
             LeafNode
              //  \\
             //    \\
            //      \\
LogicalRelation    DataSourceV2Relation
  3. This PR addresses support for DataSourceV2, which is Spark, not Iceberg.

Contributor

@imback82 imback82 Feb 8, 2021


My point is that rules shouldn't directly work with LogicalRelation or DataSourceV2Relation. I think we can abstract that out. Source provider can choose which relation it supports.
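A rough sketch of that abstraction (hypothetical names, self-contained stand-ins rather than Spark's real classes): rules consult registered providers, and each provider decides which relation type it supports:

```scala
// Stand-in plan nodes (illustration only).
trait LogicalPlan
case class LogicalRelation(partitionCols: Seq[String]) extends LogicalPlan
case class DataSourceV2Relation(partitionCols: Seq[String]) extends LogicalPlan

// Each source provider knows how to read metadata from "its" relation type;
// rules never pattern-match on concrete relation classes themselves.
trait SourceProvider {
  def partitionSchema(plan: LogicalPlan): Option[Seq[String]]
}

object V1FileBasedProvider extends SourceProvider {
  def partitionSchema(plan: LogicalPlan): Option[Seq[String]] = plan match {
    case LogicalRelation(cols) => Some(cols)
    case _                     => None
  }
}

object V2Provider extends SourceProvider {
  def partitionSchema(plan: LogicalPlan): Option[Seq[String]] = plan match {
    case DataSourceV2Relation(cols) => Some(cols)
    case _                          => None
  }
}

// A rule stays relation-agnostic: it asks the registered providers in turn.
def resolvePartitionSchema(
    providers: Seq[SourceProvider],
    plan: LogicalPlan): Seq[String] =
  providers.flatMap(_.partitionSchema(plan)).headOption.getOrElse(Nil)
```

Adding a new relation type then means registering one more provider, with no changes to the rules.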

Contributor


Btw, I will create a PR to your branch this week.

case baseRelation @ LogicalRelation(
_ @HadoopFsRelation(location: FileIndex, _, _, _, _, _),
baseOutput,
_,
_) =>
val (filesDeleted, filesAppended) =
Contributor


@andrei-ionescu Could you also add some "guide" comments in this file? I see that many lines are being moved/refactored without the logic being changed. It would help reviewers if you add something like "this part has moved to line blah", "refactored to foo", etc. Thanks.

@andrei-ionescu andrei-ionescu force-pushed the datasourcev2 branch 3 times, most recently from 3616107 to 37100fa Compare January 29, 2021 20:55
@imback82
Contributor

imback82 commented Feb 2, 2021

@andrei-ionescu we are planning to do the next release at the end of February. I marked the milestone for this PR accordingly.

@imback82 imback82 added this to the February 2021 (v0.5.0) milestone Feb 2, 2021
@imback82 imback82 added the enhancement New feature or request label Feb 2, 2021
@andrei-ionescu
Contributor Author

@imback82 Thanks! Is there anything else that I have to do on my side?

@imback82
Contributor

imback82 commented Feb 2, 2021

Not at the moment. I will finish reviewing this PR this week. Thanks!

Contributor

@imback82 imback82 left a comment


@andrei-ionescu I went through this PR once more. I see lots of code that depends on LogicalRelation and DataSourceV2Relation in rules. I think we can solve this by introducing one level of abstraction (so that rules don't need to worry about handling specific relations). I will create a PR to your branch in the coming days.


val relation = makeHadoopFsRelation(index, v2Relation)
val updatedOutput =
output.filter(attr => relation.schema.fieldNames.contains(attr.name))
new LogicalRelation(relation, updatedOutput, None, false)
Contributor


Do we lose anything by going from DataSourceV2Relation to LogicalRelation?

Contributor Author

@andrei-ionescu andrei-ionescu Feb 8, 2021


The intention is to overwrite the plan with the relation that the index dataset uses. The index uses Parquet files which are not related to DataSourceV2 API in any way.

@imback82
Contributor

@andrei-ionescu Sorry for being late to create the PR. I just created a draft PR to your branch: andrei-ionescu#1.

I haven't finished it, but you can see that the duplicate logic handling LogicalRelation and DataSourceV2Relation can be removed with a SourceRelation abstraction, and some of the SourceProvider APIs like allFiles can be moved to SourceRelation, which makes more sense.

I tried to finish the PR on your branch, but I am introducing some changes to the existing code and need to revert some of your code in RuleUtils.scala, so I think it may be better to create a fresh PR to master repo.

Are you OK with it? I will make you a co-author since you inspired the refactoring. And once that PR is done, you can convert this PR to implement SourceRelation for DataSourceV2Relation, which should be simple. WDYT?

@andrei-ionescu
Contributor Author

@imback82 I'm OK with it, and thanks for your involvement. I'd ask you to do it ASAP because I will also need to update the implementation to support Iceberg.

@imback82
Contributor

Thanks, I'll have it by tomorrow (oof today)

case _ => false
}
case v2: DataSourceV2Relation =>
v2.options.exists(_.equals(IndexConstants.INDEX_RELATION_IDENTIFIER))
Contributor

@imback82 imback82 Feb 11, 2021


Question: where are you injecting this option to v2.options? Looks like this will always be false?
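To illustrate the reviewer's point with a hypothetical, self-contained sketch (not Hyperspace's actual code): the check in the snippet above can only ever succeed if the plan-rewriting side injects the marker into the relation's options first, e.g.:

```scala
// Hypothetical marker, standing in for IndexConstants.INDEX_RELATION_IDENTIFIER.
val IndexRelationIdentifier: (String, String) = "hyperspace.index" -> "true"

// Stand-in for DataSourceV2Relation's options map.
case class V2Relation(options: Map[String, String])

// The rewrite that swaps source data for index data would inject the tag...
def tagAsIndexRelation(r: V2Relation): V2Relation =
  r.copy(options = r.options + IndexRelationIdentifier)

// ...so that this check (mirroring the `options.exists` snippet above)
// can ever return true; without the injection it is always false.
def isIndexRelation(r: V2Relation): Boolean =
  r.options.exists(_ == IndexRelationIdentifier)
```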

@andrei-ionescu
Contributor Author

Closing this PR because of the new work from @imback82 in PR #355. I created a new PR covering only the Iceberg table format: #358.

@andrei-ionescu andrei-ionescu deleted the datasourcev2 branch February 22, 2021 20:42
@imback82
Contributor


@andrei-ionescu would it be OK for you if we do the release at the end of March? It seems more realistic to do bi-monthly releases to pack in more features. But if this is a blocker for you, we can do a minor release this week. Please let me know. Thanks!

Labels
  • advanced issue — This is the tag for advanced issues which involve major design changes or introduction
  • enhancement — New feature or request
Development

Successfully merging this pull request may close these issues.

  • [PROPOSAL]: Support Iceberg table format
  • [FEATURE REQUEST]: Add support for Iceberg table format
4 participants