Support SQL MERGE in the Trino engine and Hive and Kudu connectors #7386
Conversation
(Force-pushed from c8e9014 to 55edb9e.)
(not a review)
{
    throw new UnsupportedOperationException("This connector does not support row merge");
}
The connector may need to do a prolonged IO operation. If it does it synchronously, the query may be non-cancellable. Add CompletableFuture<?> isBlocked() and use it in MergeOperator.isBlocked.
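A minimal sketch of the suggested shape, with invented names (ExampleMergeSink, ExampleMergeOperator); the PR's actual interfaces and the engine's real operator signatures may differ:

import java.util.concurrent.CompletableFuture;

// Hypothetical sketch of the reviewer's suggestion; both type names are
// invented stand-ins for whatever the PR actually defines.
interface ExampleMergeSink
{
    // Completes when the sink can accept more merged rows, so a prolonged
    // connector IO operation stays asynchronous and the query stays cancellable.
    CompletableFuture<?> isBlocked();
}

class ExampleMergeOperator
{
    private final ExampleMergeSink sink;

    ExampleMergeOperator(ExampleMergeSink sink)
    {
        this.sink = sink;
    }

    // The operator surfaces the sink's future to the engine, which polls it
    // rather than parking a thread inside the connector.
    public CompletableFuture<?> isBlocked()
    {
        return sink.isBlocked();
    }
}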
As side conversations confirmed, this is correctly handled by the fact that AbstractRowChangeOperator and ScanFilterAndProjectOperator both talk to the pageSource.
Can you point me to where AbstractRowChangeOperator can return blocked? That is, it does not feel correct to push more updates into an updatable page source that declares "I am blocked", does it? (Also, if something required side conversations to confirm, it needs to be documented.)
(Six review threads on core/trino-spi/src/main/java/io/trino/spi/connector/ConnectorMetadata.java and two on core/trino-spi/src/main/java/io/trino/spi/connector/RowChangeParadigm.java were marked outdated and resolved.)
(Force-pushed from e8e58b8 to f637166.)
General question: how do you plan to document this in the connectors? Specifically, what do you document in the connectors where it is supported (Kudu and Hive), and what do we document in all the others? I suggest that we add a Limitations section to all other connectors and add … wdyt @electrum?

This is a repeated suggestion from @mosabua, and there is certainly a problem to be resolved: how can you tell whether a connector supports DELETE, UPDATE, or MERGE? There seem to be three choices:

I'm not sure which alternative is best, but the current situation, where you can only tell whether a connector supports an operation by trying it and seeing it fail, doesn't seem ideal.
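For illustration, one conceivable shape for an explicit capability check, loosely modeled on the capability-set pattern already present in the SPI; the enum, method, and constants below are invented for this sketch and are not the real API:

import java.util.Set;

// Invented names throughout: a connector advertises which row-change
// operations it supports, instead of failing only when the query runs.
enum ExampleRowChangeCapability
{
    DELETE,
    UPDATE,
    MERGE
}

interface ExampleConnector
{
    // Connectors that support nothing inherit the empty default, so tooling
    // and documentation generators can report support without trial and error.
    default Set<ExampleRowChangeCapability> getRowChangeCapabilities()
    {
        return Set.of();
    }
}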
(Force-pushed from f637166 to e1d32ef.)
I've only gone through a part of the execution, and took a look at the StatementAnalyzer. I had difficulties understanding the page transformations. One thing I'm concerned about is synthesizing AST in the StatementAnalyzer. That logic should probably be moved to the Planner.
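To make the concern concrete, here is a deliberately simplified sketch of the pattern being questioned; every name is invented, and the string stands in for a synthesized AST:

// All names invented. The analyzer fabricates a new query for the merge and
// re-analyzes it; the reviewer suggests the Planner should instead build the
// equivalent plan directly.
final class ExampleStatementAnalyzer
{
    record ExampleMergeStatement(String target, String source) {}

    record ExampleQuery(String text) {}

    ExampleQuery analyzeMerge(ExampleMergeStatement merge)
    {
        // Synthesized "AST" (a string stand-in): a join of target and source
        // fabricated inside analysis rather than during planning.
        return new ExampleQuery(
                "SELECT * FROM " + merge.target() + " JOIN " + merge.source() + " ON ...");
    }
}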
(Review threads on core/trino-main/src/main/java/io/trino/operator/ChangeOnlyUpdatedColumnsMergeProcessor.java, DuplicateRowFinder.java, and RowChangeProcessor.java were resolved; the latter two were marked outdated.)
    }
}

// The final version of MergeAnalysis, with the finalQuery
The finalQuery should not be created in the Analyzer. This belongs to the Planner. cc @martint
Let's get @martint's take on it.
Did you take a look at https://github.com/djsstarburst/trino/blob/david.stryker/support-sql-merge/docs/src/main/sphinx/develop/supporting-merge.rst? Can you suggest where additional documentation would make it clearer?
That's what I started out doing. What I found was that it seemed to require re-inventing all the analysis that went on in the StatementAnalysis phase. Moreover, the result seemed fragile. The implementation of … I'd like to get @martint's thoughts on this question.
(Force-pushed from e3ca571 to 1944681.)
}

@Test(groups = HIVE_TRANSACTIONAL, timeOut = TEST_TIMEOUT)
public void testMergeUnBucketedUnPartitionedFailure()
What failure is being tested here?
There is a log line at the end that implies the problem:
`log.info("Verifying MERGE on Hive fails - - and it shouldn't");`
But it needs a real comment. Here is the comment I added:
/**
* This test demonstrates a failure of Hive to verify the result of a MERGE operation,
* specifically, Hive fails to recognize the delete_delta file written by the MERGE. I
* captured the HDFS delta and delete_delta files and verified that they are correct.
* I used Wireshark to capture the traffic between Trino and the Hive metastore during
* the MERGE, and it was all as expected. I tried to vary the test to understand the
* issue, but almost any change I made to the test caused Hive to correctly verify the
* MERGE.
* TODO: Determine what is causing the Hive verification failure
*/
Thanks for pointing this out. This looks like a bug somewhere and requires an action item. Can you please make sure this gets properly addressed?
String sql = format("MERGE INTO %s t USING %s s ON (t.purchase = s.purchase)", targetTable, sourceTable) +
        " WHEN MATCHED AND s.purchase = 'limes' THEN DELETE" +
        " WHEN MATCHED THEN UPDATE SET customer = CONCAT(t.customer, '_', s.customer)" +
Do you have a test that would update the partition key (purchase here)?
Several of the tests update partition and bucket columns. For example, testMergeSimpleSelectPartitioned updates the address partition column at ccc584f#diff-ee4c5c10f12500119c780c8c1ba6b88df410f13b0282bd69e315fe32c5938ffaR1336. However, I would love to have suggestions for more tests to write. I'm always happy to write more tests, but lack imagination as to what would be good ones 😞
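As one concrete suggestion, a variant of the statement quoted above could rewrite the partition column itself; the aliases and column names follow the quoted test, but this exact case is a proposal, not code from the PR:

// Proposed variant (not from the PR): update the purchase partition column,
// forcing the engine to redistribute the changed rows to other partitions.
String sql = format("MERGE INTO %s t USING %s s ON (t.purchase = s.purchase)", targetTable, sourceTable) +
        " WHEN MATCHED AND s.purchase = 'limes' THEN UPDATE SET purchase = 'key limes'" +
        " WHEN MATCHED THEN UPDATE SET customer = CONCAT(t.customer, '_', s.customer)";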
(Force-pushed from 1944681 to 9212cdd.)
(A review thread on core/trino-main/src/main/java/io/trino/sql/analyzer/StatementAnalyzer.java was marked outdated and resolved.)
(Force-pushed from 3c58d20 to f78f9b8.)
(Force-pushed from a974359 to 777dfce.)
This commit adds support for SQL MERGE in the Trino engine. It introduces an enum RowChangeParadigm, which characterizes how a connector modifies rows. Hive and Iceberg will use the DELETE_ROW_AND_INSERT_ROW paradigm, since they represent an updated row as a deleted row and an inserted row. Kudu will use the CHANGE_ONLY_UPDATED_COLUMNS paradigm. Each paradigm corresponds to an implementation of the RowChangeProcessor interface. The intent is to retrofit SQL UPDATE to use the same RowChangeParadigm/Processor mechanism. The SQL MERGE implementation allows update of all columns, including partition or bucket columns, and the Trino engine performs redistribution to ensure that the updated rows end up on the appropriate nodes.
This commit adds SQL MERGE support in the Hive connector and a raft of MERGE tests to verify that it works.
(Force-pushed from 777dfce to ee06f4e.)
This PR is closed in favor of the improved implementation in #7933
This PR consists of three commits that add support for SQL MERGE in the Trino engine, the Hive connector and the Kudu connector. The implementation is structured so that most of the work happens in the Trino engine, so adding support in a connector is pretty simple.
The SQL MERGE implementation allows update of all columns, including partition or bucket columns, and the Trino engine performs redistribution to ensure that the updated rows end up on the appropriate nodes.
The Trino engine commit introduces an enum RowChangeParadigm, which characterizes how a connector modifies rows. Hive uses, and Iceberg will use, the DELETE_ROW_AND_INSERT_ROW paradigm, since they represent an updated row as a deleted row and an inserted row. Kudu uses the CHANGE_ONLY_UPDATED_COLUMNS paradigm. Each paradigm corresponds to an implementation of the RowChangeProcessor interface. After this PR is merged, the intent is to retrofit SQL UPDATE to use the same RowChangeParadigm/Processor mechanism. Extensive documentation on the internal MERGE architecture can be found in the developer doc supporting-merge.rst.
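A sketch of the shapes described above: the enum constants are the ones named in this PR, while the processor interface below is a guess at its general form, not the PR's exact definition:

import io.trino.spi.Page;

// Paradigm constants as named in the PR description.
enum RowChangeParadigm
{
    // Hive and Iceberg: an updated row is represented as a delete plus an insert.
    DELETE_ROW_AND_INSERT_ROW,
    // Kudu: an update carries only the columns that actually changed.
    CHANGE_ONLY_UPDATED_COLUMNS
}

// Hypothetical shape of the per-paradigm processor: it rewrites the page of
// merge results into the layout the connector's paradigm expects. The real
// RowChangeProcessor interface in the PR may differ.
interface ExampleRowChangeProcessor
{
    Page transformPage(Page inputPage);
}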
This is a big, complicated PR, and requires close review. @kasiafi, I'm hoping you can find time to do your usual excellent job of identifying what doesn't make sense and what needs to be improved.
For #7708