Implement CREATE OR REPLACE TABLE for delta lake connector #19991
b2a6b28
to
3ad2c65
Compare
{
    String tableName = "test_replace_table_with_schema_change_" + randomNameSuffix();
    onTrino().executeQuery("CREATE TABLE delta.default." + tableName + " (ts VARCHAR) " +
            "with (location = 's3://" + bucketName + "/databricks-compatibility-test-" + tableName + "', checkpoint_interval = 10)");
    try {
        ImmutableList.Builder<QueryAssert.Row> expected = ImmutableList.builder();
I'm not sure why this change is necessary. The code after this change looks almost the same as before.
We are extracting the INSERT operations to a static method so as to reuse them in the next commit.
plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java
        tableHandle.getProjectedColumns(),
        session);
protocolEntry = protocolEntryForTable(tableHandle.getProtocolEntry(), containsTimestampType, tableMetadata.getProperties());
statisticsAccess.deleteExtendedStatistics(session, schemaTableName, location);
Add a test covering the fact that the extended statistics of the table are missing after CREATE OR REPLACE TABLE.
Maybe use the extended_statistics_collect_on_write session property on CREATE OR REPLACE TABLE AS SELECT to showcase that the old statistics have been removed.
    statisticsAccess.deleteExtendedStatistics(session, schemaTableName, location);
}
else {
    setRollback(() -> deleteRecursivelyIfExists(fileSystem, deltaLogDirectory));
I'm assuming that this branch is for the situation where the transaction log exists, but we're not dealing with a CREATE OR REPLACE TABLE statement.
This is, however, already handled by the check:
if (!replaceExistingTable && transactionLogFileExists) {
throw new TrinoException(
NOT_SUPPORTED,
"Using CREATE TABLE with an existing table content is disallowed, instead use the system.register_table() procedure.");
}
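To make the reviewer's point concrete, the four combinations of the two flags can be enumerated in a small standalone sketch. The class name, the `Action` enum, and the `decide` method are hypothetical and are not part of `DeltaLakeMetadata`; the sketch also assumes that CREATE OR REPLACE TABLE on a location without a transaction log behaves like a plain create.

```java
// Hypothetical sketch: the names below are illustrative, not Trino's actual API.
final class CreateTableDecision
{
    enum Action { CREATE_NEW, REPLACE_EXISTING, REJECT }

    private CreateTableDecision() {}

    // Mirrors the guard quoted above: a plain CREATE TABLE over an existing
    // transaction log is rejected before any later branch is reached.
    static Action decide(boolean replaceExistingTable, boolean transactionLogFileExists)
    {
        if (!replaceExistingTable && transactionLogFileExists) {
            return Action.REJECT;
        }
        if (transactionLogFileExists) {
            // Only reachable when replaceExistingTable is true.
            return Action.REPLACE_EXISTING;
        }
        // Assumption: without an existing log, both statements create a new table.
        return Action.CREATE_NEW;
    }

    public static void main(String[] args)
    {
        for (boolean replace : new boolean[] {false, true}) {
            for (boolean logExists : new boolean[] {false, true}) {
                System.out.println("replace=" + replace + ", logExists=" + logExists + " -> " + decide(replace, logExists));
            }
        }
    }
}
```

Under these assumptions, any code that runs after the guard never sees the combination "log exists and not replacing", which is the case the reviewer suspected the else branch was written for.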
plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java
try {
    Location transactionLogDir = Location.of(getTransactionLogDir(location));
    fileSystemFactory.create(session).deleteDirectory(transactionLogDir);
    if (!writeCommitted) {
The writeCommitted logic could be added in a preparatory commit.
But the writeCommitted logic is very specific to the CORTAS operation, so I'm not sure that adding it in a preparatory commit would help us here. WDYT?
plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeConnectorTest.java
        "CREATE OR REPLACE TABLE " + table.getName() + " (a BIGINT) WITH (location = '%s_2')".formatted(location),
        "The provided location '%1$s_2' does not match the existing table location '%1$s'".formatted(location));

assertLatestTableOperation(table.getName(), CREATE_TABLE_OPERATION);
Maybe also verify that the table location has not changed.
Added additional assertion.
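The expected error message in the snippet above relies on positional format specifiers: `%1$s` refers to the first argument each time it appears, so a single `location` argument fills both placeholders. A minimal standalone sketch follows; the class and helper names are hypothetical, and only the message template is taken from the test.

```java
// Hypothetical helper; only the message template comes from the test above.
final class LocationMismatchMessage
{
    private LocationMismatchMessage() {}

    // %1$s is an explicit argument index, so the same argument is reused
    // for the provided location (with the _2 suffix) and the existing one.
    static String mismatchMessage(String location)
    {
        return "The provided location '%1$s_2' does not match the existing table location '%1$s'".formatted(location);
    }

    public static void main(String[] args)
    {
        System.out.println(mismatchMessage("s3://bucket/t"));
    }
}
```

Note that in Java a method call binds more tightly than `+`, so in an expression such as `"a" + name + "...%s...".formatted(x)` only the last string literal is formatted; that is why the placeholder has to live in that final literal.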
e11de06
to
110b7ab
Compare
7b2c1b6
to
65a9146
Compare
    assertThat(onTrino().executeQuery("SELECT to_iso8601(ts) FROM delta.default." + tableName)).containsOnly(expectedRows);
}
finally {
    dropDeltaTableWithRetry(tableName);
No need to use dropDeltaTableWithRetry here because the method is for the Glue environment. The same applies to the other tests, but please feel free to ignore this as it's pre-existing.
Suggested change:
- dropDeltaTableWithRetry(tableName);
+ onTrino().executeQuery("DROP TABLE delta.default." + tableName);
I think we can handle it as a follow-up.
String tableName = "test_replace_table_with_schema_change_" + randomNameSuffix();

onTrino().executeQuery("CREATE TABLE delta.default." + tableName + " (ts VARCHAR) " +
        "with (location = 's3://" + bucketName + "/databricks-compatibility-test-" + tableName + "', checkpoint_interval = 10)");
nit: Use uppercase WITH. Please feel free to ignore as it's pre-existing.
Same here
assertQueryFailure(() -> onDelta().executeQuery("" +
        "CREATE OR REPLACE TABLE default." + tableName +
        " (a INT,c INT) " +
        " USING delta " +
        " LOCATION 's3://" + bucketName + "/databricks-compatibility-test-" + tableName + "' " +
        " TBLPROPERTIES ('delta.appendOnly' = true)"))
        .hasMessageContaining("This table is configured to only allow appends");

assertQueryFailure(() -> onTrino().executeQuery("" +
        "CREATE OR REPLACE TABLE delta.default." + tableName + " (a INT,c INT)" +
        " WITH (location = 's3://" + bucketName + "/databricks-compatibility-test-" + tableName + "')"))
        .hasMessageContaining("Cannot replace a table when 'delta.appendOnly' is set to true");
Does Delta Lake allow replacing tables when delta.appendOnly is true in the existing table and delta.appendOnly is false in the new table definition?
I think Delta Lake doesn't allow replacing tables with a change in delta.appendOnly. Isn't that what is being tested at L130, or is it something different?
I wanted to know what happens if we set 'delta.appendOnly' = false at L135.
public void testCreateOrReplaceTableOnAppendOnlyTableFails()
{
    String tableName = "test_replace_on_append_only_table_fails_" + randomNameSuffix();
    onDelta().executeQuery("" +
nit: It would be nice to use try-catch for consistency in this class.
...rino-delta-lake/src/test/java/io/trino/plugin/deltalake/BaseDeltaLakeConnectorSmokeTest.java
    assertLatestTableOperation(tableName, CREATE_OR_REPLACE_TABLE_OPERATION);
}
finally {
    assertUpdate("DROP TABLE " + tableName);
This DROP TABLE may throw an exception if CREATE OR REPLACE TABLE failed for some reason. I would move CREATE OR REPLACE TABLE before the try block. The same applies to the other tests.
How about DROP TABLE IF EXISTS?
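The trade-off discussed in this thread can be illustrated with a small standalone Java sketch. The methods below are hypothetical stand-ins that simulate the SQL statements by throwing exceptions; they are not Trino test APIs, and the sketch assumes the table does not exist when the create statement fails. It shows that an exception thrown from `finally` replaces the original failure, and how either suggestion avoids that.

```java
// Hypothetical simulation of the test cleanup patterns; not Trino test code.
final class CleanupMasking
{
    private CleanupMasking() {}

    static void createOrReplace(boolean fail)
    {
        if (fail) {
            throw new IllegalStateException("CREATE OR REPLACE TABLE failed");
        }
    }

    static void dropTable(boolean tableExists)
    {
        if (!tableExists) {
            throw new IllegalStateException("DROP TABLE failed: table does not exist");
        }
    }

    static void dropTableIfExists(boolean tableExists)
    {
        // Simulates DROP TABLE IF EXISTS: a missing table is not an error.
    }

    // Setup inside try: the failure in finally hides the real cause.
    static String setupInsideTry()
    {
        try {
            try {
                createOrReplace(true);
            }
            finally {
                dropTable(false);
            }
        }
        catch (IllegalStateException e) {
            return e.getMessage();
        }
        return "no failure";
    }

    // Setup before try: if setup fails, the cleanup is never attempted.
    static String setupBeforeTry()
    {
        try {
            createOrReplace(true);
            try {
                // assertions would go here
            }
            finally {
                dropTable(false);
            }
        }
        catch (IllegalStateException e) {
            return e.getMessage();
        }
        return "no failure";
    }

    // Setup inside try, but cleanup tolerates a missing table.
    static String setupInsideTryWithIfExists()
    {
        try {
            try {
                createOrReplace(true);
            }
            finally {
                dropTableIfExists(false);
            }
        }
        catch (IllegalStateException e) {
            return e.getMessage();
        }
        return "no failure";
    }

    public static void main(String[] args)
    {
        System.out.println(setupInsideTry());
        System.out.println(setupBeforeTry());
        System.out.println(setupInsideTryWithIfExists());
    }
}
```

In this simulation only the first variant reports the DROP TABLE error instead of the original CREATE OR REPLACE TABLE failure; the other two surface the original cause.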
45d4af4
to
d9559b5
Compare
@ebyhr / @findinpath Thanks a lot for the review. AC
core/trino-main/src/main/java/io/trino/sql/rewrite/ShowStatsRewrite.java
LGTM % left-over
d9559b5
to
d9d6bc8
Compare
Please rebase to resolve the code conflicts.
d9d6bc8
to
af8977a
Compare
    assertTableOperationWithChangeInColumnMappingMode("name");
}

public void assertTableOperationWithChangeInColumnMappingMode(String columnMappingMode)
nit: testTableOperationWithChangeInColumnMappingMode sounds better. I expect assert* methods to do simple assertions. This method contains test data setup.
assertThat(query("SELECT DISTINCT regexp_replace(\"$path\", '/[^/]*$', '') FROM " + tableName))
        .skippingTypesCheck()
        .matches("VALUES '%s'".formatted(expectedLocation));
nit: We could use the computeScalar method to avoid the skippingTypesCheck method:
assertThat(computeScalar("SELECT DISTINCT regexp_replace(\"$path\", '/[^/]*$', '') FROM " + tableName))
.isEqualTo(expectedLocation);
By the way, this approach doesn't work for empty tables and is inefficient when the table contains many rows. That's why we use the SHOW CREATE TABLE result in other Delta Lake tests.
Lines 357 to 367 in ac71d15
protected String getTableLocation(String tableName)
{
    Pattern locationPattern = Pattern.compile(".*location = '(.*?)'.*", Pattern.DOTALL);
    Matcher m = locationPattern.matcher((String) computeActual("SHOW CREATE TABLE " + tableName).getOnlyValue());
    if (m.find()) {
        String location = m.group(1);
        verify(!m.find(), "Unexpected second match");
        return location;
    }
    throw new IllegalStateException("Location not found in SHOW CREATE TABLE result");
}
Actually, that method was not used, and we have replaced it with a SHOW CREATE TABLE style of assertion. Removed that method.
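The two ways of deriving a table location that are compared in this thread can be tried outside the test framework with plain strings. The sketch below is standalone: the class name, the sample file path, and the sample SHOW CREATE TABLE text are illustrative assumptions, while the two regular expressions are the ones quoted above.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical standalone sketch; inputs are made-up sample strings.
final class TableLocationSketch
{
    private TableLocationSketch() {}

    // Java equivalent of regexp_replace("$path", '/[^/]*$', ''):
    // drops the final path segment (the data file name).
    static String parentOf(String filePath)
    {
        return filePath.replaceAll("/[^/]*$", "");
    }

    // Same pattern as getTableLocation above, applied to a plain string
    // instead of a live SHOW CREATE TABLE result.
    static String locationFromShowCreate(String showCreateTable)
    {
        Pattern locationPattern = Pattern.compile(".*location = '(.*?)'.*", Pattern.DOTALL);
        Matcher m = locationPattern.matcher(showCreateTable);
        if (m.find()) {
            return m.group(1);
        }
        throw new IllegalStateException("Location not found in SHOW CREATE TABLE result");
    }

    public static void main(String[] args)
    {
        // Assumed sample values, not taken from a real cluster.
        String dataFile = "s3://bucket/t/part-0.parquet";
        String showCreate = "CREATE TABLE delta.default.t (\n   a bigint\n)\nWITH (\n   location = 's3://bucket/t'\n)";
        System.out.println(parentOf(dataFile));
        System.out.println(locationFromShowCreate(showCreate));
    }
}
```

The first approach needs at least one data file to read a `$path` from, which matches the reviewer's remark that it cannot work for empty tables; the second only needs the table definition text.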
af8977a
to
26b0e08
Compare
@ebyhr Thanks a lot for the review once again.
/test-with-secrets sha=26b0e08b91321bb78fafaee70b87c8ec50db4e7a
The CI workflow run with tests that require additional secrets finished as failure: https://github.com/trinodb/trino/actions/runs/8648760016
26b0e08
to
4f82268
Compare
/test-with-secrets sha=4f82268a69d8ac97eba47d30138194a9c59ac593
The CI workflow run with tests that require additional secrets finished as failure: https://github.com/trinodb/trino/actions/runs/8657990824
4f82268
to
fb0a6f3
Compare
/test-with-secrets sha=fb0a6f37155b190956efc664562f10a806f3765f
The CI workflow run with tests that require additional secrets has been started: https://github.com/trinodb/trino/actions/runs/8658052735
fb0a6f3
to
98e5548
Compare
/test-with-secrets sha=98e554834032de0a5417fcddf3374f08a4adbd29
The CI workflow run with tests that require additional secrets has been started: https://github.com/trinodb/trino/actions/runs/8665068329
Description
Introduces the CREATE OR REPLACE TABLE operation for the Delta Lake connector. It comes with some restrictions: for example, CORTAS cannot be run on a table with CDC enabled or on CDC-enabled roles.
Additional context and related issues
Release notes
( ) This is not user-visible or is docs only, and no release notes are required.
( ) Release notes are required. Please propose a release note for me.
(x) Release notes are required, with the following suggested text: