
Support SQL MERGE in the Trino engine and five connectors #7933

Merged

Conversation


@djsstarburst djsstarburst commented May 16, 2021

This PR is a second take on implementing SQL MERGE. It consists of commits that add support for SQL MERGE in the Trino engine and in the Hive, Kudu, Raptor, Iceberg, and Delta Lake connectors. The implementation is structured so that most of the work happens in the Trino engine, which makes adding support in a connector fairly simple.

The SQL MERGE implementation allows updating all columns, including partition and bucket columns, and the Trino engine performs redistribution to ensure that updated rows end up on the appropriate nodes.

The Trino engine commit introduces an enum RowChangeParadigm, which characterizes how a connector modifies rows. Hive uses, and Iceberg will use, the DELETE_ROW_AND_INSERT_ROW paradigm, since they represent an updated row as a deleted row plus an inserted row. Kudu uses the CHANGE_ONLY_UPDATED_COLUMNS paradigm.

Each paradigm corresponds to an implementation of the RowChangeProcessor interface. After this PR is merged, the intent is to retrofit SQL UPDATE to use the same RowChangeParadigm/Processor mechanism.
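
For orientation, here is a minimal sketch of the shape of these abstractions. The enum values come from this description; the processor interface shown is an assumption for illustration, not the exact SPI.

// Characterizes how a connector represents a changed row (values per the PR text).
public enum RowChangeParadigm
{
    // an updated row is written as a deleted row plus an inserted row (Hive, Iceberg)
    DELETE_ROW_AND_INSERT_ROW,

    // only the changed column values are written (Kudu)
    CHANGE_ONLY_UPDATED_COLUMNS,
}

// Assumed shape: each paradigm maps to a processor that rewrites merge
// output pages into the layout the connector's writer expects.
public interface RowChangeProcessor
{
    Page transformPage(Page inputPage);
}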

Extensive documentation on the internal MERGE architecture can be found in the developer doc supporting-merge.rst.

Fixes #7708


@electrum electrum left a comment


The Kudu commit looks good

@djsstarburst djsstarburst force-pushed the david.stryker/support-sql-merge-final branch from d65286e to f88718f on May 26, 2021 00:07

djsstarburst commented May 26, 2021

Thanks for the great comments, @electrum. I did everything you suggested.

@djsstarburst djsstarburst force-pushed the david.stryker/support-sql-merge-final branch 2 times, most recently from 3108a8d to db83bfe on May 27, 2021 13:27

@kasiafi kasiafi left a comment


A lot of questions and some comments. I've gone through the docs, and partially through the analysis.

Five review threads on docs/src/main/sphinx/develop/supporting-merge.rst (outdated, resolved)

@kasiafi kasiafi left a comment


Some more comments regarding the analyzer. Initial comments on the planner part.

@djsstarburst djsstarburst force-pushed the david.stryker/support-sql-merge-final branch 2 times, most recently from 1b878ef to 238eb2d on June 16, 2021 17:31

djsstarburst commented Jun 16, 2021

Thanks for the great first batch of comments, @kasiafi! I believe I've addressed the comments from yesterday except those listed below. It would be great if you could resolve the comments you think have been handled to your satisfaction.

I haven't addressed the more profound comments made 4 hours ago yet, and some of them will require coaching from you or @martint.

Here are the comments from yesterday that I haven't addressed:

  • Does DuplicateRowFinder need to compare the writeRedistribution columns?
  • Will matched target table rowIds really come out in order such that DuplicateRowFinder is guaranteed to identify them?
  • Implementing multiple assignment.
  • Addressing your comment: "Instead of assigning a scope to an Identifier, the aliased table should parse as AliasedRelation."
  • Addressing your comment: "What if the table was a materialized view?"

@djsstarburst djsstarburst force-pushed the david.stryker/support-sql-merge-final branch 2 times, most recently from 6038c7f to b373e2b on June 16, 2021 19:03

findepi commented Jun 17, 2021

re #7933 (comment)

target table rowIds would be partitioned among nodes

@djsstarburst can you please point me to a document outlining how MERGE interacts with connectors?

I would like to learn about the following:

  • What are the assumptions on rowIds? Can rowIds carry un-updated columns?
  • How should a connector construct rowIds if it needs to create deletion delta files for the sake of updates (e.g. a separate deletion file for an input file, marking all the rows that got updated)? A hypothetical sketch follows below.
  • What is the table handle lifecycle for MERGE? For example, how does MERGE interact with partition, file, and file chunk pruning?
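
To make the second question concrete, here is one hypothetical encoding: a composite rowId carrying enough information for the merge sink to group deletions by input file. This is an illustrative assumption, not the connector's actual design; RowType and its factory methods are from the Trino SPI.

import io.trino.spi.type.RowType;
import java.util.List;
import static io.trino.spi.type.BigintType.BIGINT;
import static io.trino.spi.type.VarcharType.VARCHAR;

// Hypothetical composite rowId: identifies each updated row by its input
// file and its position within that file, so a per-file deletion delta
// can be written from the grouped rowIds.
RowType rowIdType = RowType.from(List.of(
        RowType.field("file_path", VARCHAR),      // input file the row came from
        RowType.field("row_position", BIGINT)));  // row's ordinal within that file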


@kasiafi kasiafi left a comment


Here are some comments regarding the previously reviewed part. Additionally, I answered some of your replies directly. I resolved all conversations except those that require a follow-up.

I plan to review the next portions of code and will put my comments in a new batch.

Comment on lines 225 to 228
if (underlyingBlock instanceof RowBlock) {
    // Filter each child block of the RowBlock individually, then reassemble
    List<Block> newRowIdChildrenBuilder = new ArrayList<>();
    rowIdBlock.getChildren().stream()
            .map(block -> block.getPositions(rowIdPositions, 0, totalPositions))
            .forEach(newRowIdChildrenBuilder::add);
    return RowBlock.fromFieldBlocks(
            totalPositions,
            Optional.empty(),
            newRowIdChildrenBuilder.toArray(new Block[] {}));
}
else {
    return rowIdBlock.getPositions(rowIdPositions, 0, totalPositions);
}
Member

Why is RowBlock special-cased here?
What if underlyingBlock is a DictionaryBlock over a RowBlock? Would it require special-casing as well?

Member Author

I had endless trouble with this, and it's one of the main things I hoped review would shed light on.

I had hoped that I could just call rowIdBlock.getPositions(...) and end up with a consistent view of the resulting block. However, when I tried that, way downstream in the Driver I would see out-of-range array references. My assumption is that I'm doing something wrong, but I wasn't successful debugging the problem.

Member

I had endless trouble with this, and it's one of the main things I hoped review would shed light on.

Sorry that I cannot help. Add a TODO comment here warning the reader that we don't know exactly why it's written the way it is.
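
For example, a TODO along these lines (suggested wording only, based on the explanation above):

// TODO: It is unclear why RowBlock must be special-cased here. Calling
// rowIdBlock.getPositions() directly on a RowBlock produced out-of-range
// array references downstream in the Driver; revisit once the root cause
// is understood.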

Comment on lines 244 to 230
Arrays.fill(nulls, true);
if (underlyingBlock instanceof RowBlock) {
    return RowBlock.fromFieldBlocks(positionCount, Optional.of(nulls), rowIdBlock.getChildren().toArray(new Block[] {}));
}
else {
    return ArrayBlock.fromElementBlock(positionCount, Optional.of(nulls), new int[positionCount], underlyingBlock);
}
Member

Shouldn't this actually depend on rowIdType?

Also, direct use of ArrayBlock is not correct. Typically you would use io.trino.spi.type.Type#createBlockBuilder(io.trino.spi.block.BlockBuilderStatus, int) to construct a block of values for a given type.

Here, however, you actually want to create a single-value NULL block (nativeValueToBlock may be helpful) and wrap it in a RunLengthEncodedBlock instead.
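
A minimal sketch of that suggestion, assuming the rowId type is in scope as rowIdType (nativeValueToBlock and RunLengthEncodedBlock are from the Trino SPI; exact constructor details may vary by version):

import io.trino.spi.block.Block;
import io.trino.spi.block.RunLengthEncodedBlock;
import static io.trino.spi.predicate.Utils.nativeValueToBlock;

// Build a single-position block holding NULL for the rowId type,
// then repeat it positionCount times without copying the value.
Block nullValue = nativeValueToBlock(rowIdType, null);
return new RunLengthEncodedBlock(nullValue, positionCount);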


@kasiafi kasiafi left a comment


Some comments and questions regarding the planner part. I still have a few classes to review.

@djsstarburst djsstarburst force-pushed the david.stryker/support-sql-merge-final branch 2 times, most recently from f4a18f7 to 083ab11 on June 17, 2021 15:10
Five review threads on docs/src/main/sphinx/sql/merge.rst (three outdated, resolved)
Four review threads on docs/src/main/sphinx/develop/supporting-merge.rst (outdated, resolved)
@djsstarburst djsstarburst force-pushed the david.stryker/support-sql-merge-final branch from fb91326 to 541f751 on August 2, 2022 23:14
@electrum electrum force-pushed the david.stryker/support-sql-merge-final branch 3 times, most recently from b6beecb to 16f4b38 on August 3, 2022 21:15
@electrum electrum force-pushed the david.stryker/support-sql-merge-final branch 5 times, most recently from 361e835 to 3a2089a on August 4, 2022 03:39
electrum and others added 10 commits August 4, 2022 14:47
This version works under emulation on M1 Macs.

This allows the engine to make the decision about how many nodes to
use as appropriate, based on the number of workers or hash partition
count session property. This is also required for MERGE so that the
insert and update layouts can use the same mapping.

This commit adds support for SQL MERGE in the Trino engine.
It introduces an enum RowChangeParadigm, which characterizes
how a connector modifies rows.  Hive and Iceberg will use the
DELETE_ROW_AND_INSERT_ROW paradigm, since they represent an
updated row as a deleted row and an inserted row.  Kudu will
use the CHANGE_ONLY_UPDATED_COLUMNS paradigm.

Each paradigm corresponds to an implementation of the
RowChangeProcessor interface.  The intent is to retrofit SQL
UPDATE to use the same RowChangeParadigm/Processor mechanism.

The SQL MERGE implementation allows update of all columns,
including partition or bucket columns, and the Trino engine
performs redistribution to ensure that the updated rows
end up on the appropriate nodes.

MERGE processing is extensively documented in the new
file in the developer documentation, supporting-merge.rst.

This commit adds SQL MERGE support in the Hive connector and
a raft of MERGE tests to verify that it works.
@electrum electrum force-pushed the david.stryker/support-sql-merge-final branch from 3a2089a to 1d2fabd on August 4, 2022 21:47
Development

Successfully merging this pull request may close these issues: MERGE statement

9 participants