Implement CREATE OR REPLACE TABLE for delta lake connector
Praveen2112 committed Mar 20, 2024
1 parent ff55da4 commit e11de06
Showing 11 changed files with 756 additions and 44 deletions.

Large diffs are not rendered by default, so not every changed file appears below.
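For reference, the SQL this commit enables in the Delta Lake connector is sketched below. The catalog, schema, table, and column names are hypothetical placeholders; they only illustrate the syntax that the new tests exercise.

    -- replace an existing table with a new column list (names are illustrative)
    CREATE OR REPLACE TABLE delta.example_schema.example_table (id BIGINT, name VARCHAR);

    -- CTAS variant that also changes the partitioning, as in the new tests
    CREATE OR REPLACE TABLE delta.example_schema.example_table
    WITH (partitioned_by = ARRAY['id'])
    AS SELECT BIGINT '1' AS id, 'abc' AS name;

As the new tests assert, the replacement may change columns and partitioning in place, while attempts to replace a table that has change data feed enabled, or to supply a location different from the existing table location, fail.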

@@ -24,6 +24,7 @@
import java.util.List;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.OptionalLong;

import static com.google.common.collect.ImmutableList.toImmutableList;
import static io.trino.plugin.deltalake.DeltaLakeColumnType.PARTITION_KEY;
@@ -43,6 +44,8 @@ public class DeltaLakeOutputTableHandle
private final ColumnMappingMode columnMappingMode;
private final OptionalInt maxColumnId;
private final String schemaString;
private final boolean replace;
private final OptionalLong readVersion;
private final ProtocolEntry protocolEntry;

@JsonCreator
@@ -58,6 +61,8 @@ public DeltaLakeOutputTableHandle(
@JsonProperty("schemaString") String schemaString,
@JsonProperty("columnMappingMode") ColumnMappingMode columnMappingMode,
@JsonProperty("maxColumnId") OptionalInt maxColumnId,
@JsonProperty("replace") boolean replace,
@JsonProperty("readVersion") OptionalLong readVersion,
@JsonProperty("protocolEntry") ProtocolEntry protocolEntry)
{
this.schemaName = requireNonNull(schemaName, "schemaName is null");
@@ -71,6 +76,8 @@ public DeltaLakeOutputTableHandle(
this.schemaString = requireNonNull(schemaString, "schemaString is null");
this.columnMappingMode = requireNonNull(columnMappingMode, "columnMappingMode is null");
this.maxColumnId = requireNonNull(maxColumnId, "maxColumnId is null");
this.replace = replace;
this.readVersion = requireNonNull(readVersion, "readVersion is null");
this.protocolEntry = requireNonNull(protocolEntry, "protocolEntry is null");
}

@@ -149,6 +156,18 @@ public OptionalInt getMaxColumnId()
return maxColumnId;
}

@JsonProperty
public boolean isReplace()
{
return replace;
}

@JsonProperty
public OptionalLong getReadVersion()
{
return readVersion;
}

@JsonProperty
public ProtocolEntry getProtocolEntry()
{
@@ -39,6 +39,8 @@ public interface DeltaLakeMetastore

void createTable(Table table, PrincipalPrivileges principalPrivileges);

void replaceTable(Table table, PrincipalPrivileges principalPrivileges);

void dropTable(SchemaTableName schemaTableName, String tableLocation, boolean deleteData);

void renameTable(SchemaTableName from, SchemaTableName to);
@@ -112,6 +112,12 @@ public void createTable(Table table, PrincipalPrivileges principalPrivileges)
delegate.createTable(table, principalPrivileges);
}

@Override
public void replaceTable(Table table, PrincipalPrivileges principalPrivileges)
{
delegate.replaceTable(table.getDatabaseName(), table.getTableName(), table, principalPrivileges);
}

@Override
public void dropTable(SchemaTableName schemaTableName, String tableLocation, boolean deleteData)
{
@@ -59,8 +59,13 @@
import static io.trino.plugin.base.util.Closables.closeAllSuppress;
import static io.trino.plugin.deltalake.DeltaLakeCdfPageSink.CHANGE_DATA_FOLDER_NAME;
import static io.trino.plugin.deltalake.DeltaLakeMetadata.CHANGE_DATA_FEED_COLUMN_NAMES;
import static io.trino.plugin.deltalake.DeltaLakeMetadata.CREATE_OR_REPLACE_TABLE_AS_OPERATION;
import static io.trino.plugin.deltalake.DeltaLakeMetadata.CREATE_OR_REPLACE_TABLE_OPERATION;
import static io.trino.plugin.deltalake.DeltaLakeMetadata.CREATE_TABLE_OPERATION;
import static io.trino.plugin.deltalake.DeltaLakeQueryRunner.DELTA_CATALOG;
import static io.trino.plugin.deltalake.transactionlog.TransactionLogUtil.TRANSACTION_LOG_DIRECTORY;
import static io.trino.plugin.hive.TableType.EXTERNAL_TABLE;
import static io.trino.plugin.hive.TableType.MANAGED_TABLE;
import static io.trino.plugin.tpch.TpchMetadata.TINY_SCHEMA_NAME;
import static io.trino.spi.type.TimeZoneKey.getTimeZoneKey;
import static io.trino.spi.type.VarcharType.VARCHAR;
@@ -137,7 +142,8 @@ protected QueryRunner createQueryRunner()
protected boolean hasBehavior(TestingConnectorBehavior connectorBehavior)
{
return switch (connectorBehavior) {
case SUPPORTS_REPORTING_WRITTEN_BYTES -> true;
case SUPPORTS_CREATE_OR_REPLACE_TABLE,
SUPPORTS_REPORTING_WRITTEN_BYTES -> true;
case SUPPORTS_ADD_FIELD,
SUPPORTS_AGGREGATION_PUSHDOWN,
SUPPORTS_CREATE_MATERIALIZED_VIEW,
@@ -1740,6 +1746,290 @@ public void testSupportedNonPartitionedColumnMappingNoneWrites()
testSupportedNonPartitionedColumnMappingWrites("write_stats_as_json_column_mapping_none", false);
}

@Test
public void testCreateOrReplaceTableOnNonExistingTable()
{
String tableName = "create_or_replace_table" + randomNameSuffix();
try {
assertUpdate("CREATE OR REPLACE TABLE " + tableName + " (id BIGINT)");
assertLatestTableOperation(tableName, CREATE_OR_REPLACE_TABLE_OPERATION);
}
finally {
assertUpdate("DROP TABLE " + tableName);
}
}

@Test
public void testCreateOrReplaceTableAsSelectOnNonExistingTable()
{
String tableName = "create_or_replace_table_as_select_" + randomNameSuffix();
try {
assertUpdate("CREATE OR REPLACE TABLE " + tableName + " AS SELECT 1 as colA", 1);
assertLatestTableOperation(tableName, CREATE_OR_REPLACE_TABLE_AS_OPERATION);
}
finally {
assertUpdate("DROP TABLE " + tableName);
}
}

@Test
public void testCreateOrReplaceTableAsSelectWithSwappedColumns()
{
testCreateOrReplaceTableAsSelectWithSwappedColumns(ColumnMappingMode.ID);
testCreateOrReplaceTableAsSelectWithSwappedColumns(ColumnMappingMode.NAME);
testCreateOrReplaceTableAsSelectWithSwappedColumns(ColumnMappingMode.NONE);
}

private void testCreateOrReplaceTableAsSelectWithSwappedColumns(ColumnMappingMode columnMappingMode)
{
try (TestTable table = new TestTable(
getQueryRunner()::execute,
"test_create_or_replace_with_column",
" AS SELECT 'abc' colA, BIGINT '1' colB")) {
assertThat(query("SELECT colA, colB FROM " + table.getName()))
.matches("VALUES (CAST('abc' AS VARCHAR), BIGINT '1')");

assertUpdate("CREATE OR REPLACE TABLE " + table.getName() + " WITH (column_mapping_mode='" + columnMappingMode.name() + "') AS SELECT BIGINT '42' colA, 'def' colB", 1);

assertThat(query("SELECT colA, colB FROM " + table.getName()))
.matches("VALUES (BIGINT '42', CAST('def' AS VARCHAR))");

assertLatestTableOperation(table.getName(), CREATE_OR_REPLACE_TABLE_AS_OPERATION);
}
}

@Test
public void testCreateOrReplaceTableChangeUnpartitionedTableIntoPartitioned()
{
try (TestTable table = new TestTable(getQueryRunner()::execute, "test_create_or_replace_", " AS SELECT BIGINT '22' a, CAST('some data' AS VARCHAR) b")) {
assertUpdate("CREATE OR REPLACE TABLE " + table.getName() + " WITH (partitioned_by=ARRAY['a']) AS SELECT BIGINT '42' a, 'some data' b UNION ALL SELECT BIGINT '43' a, 'another data' b", 2);
assertThat(query("SELECT * FROM " + table.getName()))
.matches("VALUES (BIGINT '42', CAST('some data' AS VARCHAR)), (BIGINT '43', CAST('another data' AS VARCHAR))");

assertLatestTableOperation(table.getName(), CREATE_OR_REPLACE_TABLE_AS_OPERATION);

assertThat((String) computeScalar("SHOW CREATE TABLE " + table.getName()))
.matches("CREATE TABLE delta.test_schema.%s \\(\n".formatted(table.getName()) +
" a bigint,\n" +
" b varchar\n" +
"\\)\n" +
"WITH \\(\n" +
" location = '.*',\n" +
" partitioned_by = ARRAY\\['a']\n" +
"\\)");
}
}

@Test
public void testCreateOrReplaceTableChangePartitionedTableIntoUnpartitioned()
{
try (TestTable table = new TestTable(
getQueryRunner()::execute,
"test_create_or_replace_",
" WITH (partitioned_by=ARRAY['a']) AS SELECT BIGINT '42' a, 'some data' b UNION ALL SELECT BIGINT '43' a, 'another data' b")) {
assertUpdate("CREATE OR REPLACE TABLE " + table.getName() + " AS SELECT BIGINT '42' a, 'some data' b UNION ALL SELECT BIGINT '43' a, 'another data' b", 2);
assertThat(query("SELECT * FROM " + table.getName()))
.matches("VALUES (BIGINT '42', CAST('some data' AS VARCHAR)), (BIGINT '43', CAST('another data' AS VARCHAR))");

assertLatestTableOperation(table.getName(), CREATE_OR_REPLACE_TABLE_AS_OPERATION);

assertThat((String) computeScalar("SHOW CREATE TABLE " + table.getName()))
.matches("CREATE TABLE delta.test_schema.%s \\(\n".formatted(table.getName()) +
" a bigint,\n" +
" b varchar\n" +
"\\)\n" +
"WITH \\(\n" +
" location = '.*'\n" +
"\\)");
}
}

@Test
public void testCreateOrReplaceTableTableCommentIsRemoved()
{
try (TestTable table = new TestTable(getQueryRunner()::execute, "test_create_or_replace_", " (a BIGINT) COMMENT 'This is a table'")) {
assertUpdate("CREATE OR REPLACE TABLE " + table.getName() + " (a BIGINT COMMENT 'This is a column')");
assertQueryReturnsEmptyResult("SELECT * FROM " + table.getName());

assertThat(getColumnComment(table.getName(), "a"))
.isEqualTo("This is a column");
assertThat(getTableComment(getSession().getCatalog().orElseThrow(), getSession().getSchema().orElseThrow(), table.getName()))
.isNull();
assertLatestTableOperation(table.getName(), CREATE_OR_REPLACE_TABLE_OPERATION);
}
}

@Test
public void testCreateOrReplaceTableWithEnablingCdcProperty()
{
try (TestTable table = new TestTable(getQueryRunner()::execute, "test_create_or_replace_with_cdv", " (a BIGINT)")) {
assertQueryFails(
"CREATE OR REPLACE TABLE " + table.getName() + " (c BIGINT) WITH (change_data_feed_enabled = true)",
"CREATE OR REPLACE is not supported for tables with change data feed enabled");
}
}

@Test
public void testCreateOrReplaceTableAsWithEnablingCdcProperty()
{
try (TestTable table = new TestTable(getQueryRunner()::execute, "test_create_or_replace_with_cdv", " (a BIGINT)")) {
assertQueryFails(
"CREATE OR REPLACE TABLE " + table.getName() + " WITH (change_data_feed_enabled = true) AS SELECT 1 new_column",
"CREATE OR REPLACE is not supported for tables with change data feed enabled");
}
}

@Test
public void testCreateOrReplaceOnCdcEnabledTables()
{
try (TestTable table = new TestTable(getQueryRunner()::execute, "test_create_or_replace_with_cdv", " (a BIGINT) WITH (change_data_feed_enabled = true)")) {
assertQueryFails(
"CREATE OR REPLACE TABLE " + table.getName() + " (d BIGINT)",
"CREATE OR REPLACE is not supported for tables with change data feed enabled");
}
}

@Test
public void testCreateOrReplaceTableAsOnCdcEnabledTables()
{
try (TestTable table = new TestTable(getQueryRunner()::execute, "test_create_or_replace_with_cdv", " (a BIGINT) WITH (change_data_feed_enabled = true)")) {
assertQueryFails(
"CREATE OR REPLACE TABLE " + table.getName() + " AS SELECT 1 new_column",
"CREATE OR REPLACE is not supported for tables with change data feed enabled");
}
}

@Test
public void testCreateOrReplaceTableWithSameLocationForManagedTable()
{
try (TestTable table = new TestTable(
getQueryRunner()::execute,
"test_create_or_replace_with_same_location_",
" (a BIGINT)")) {
HiveMetastore metastore = TestingDeltaLakeUtils.getConnectorService(getDistributedQueryRunner(), HiveMetastoreFactory.class)
.createMetastore(Optional.empty());
String location = metastore.getTable("test_schema", table.getName()).orElseThrow().getStorage().getLocation();
assertTableType("test_schema", table.getName(), MANAGED_TABLE.name());
assertUpdate("CREATE OR REPLACE TABLE " + table.getName() + " WITH (location = '" + location + "') AS SELECT 'abc' as colA", 1);
assertTableType("test_schema", table.getName(), MANAGED_TABLE.name());
}
}

@Test
public void testCreateOrReplaceTableWithChangeInLocationForManagedTable()
{
try (TestTable table = new TestTable(getQueryRunner()::execute, "test_create_or_replace_change_location_", " (a BIGINT) ")) {
String location = "s3://%s/%s".formatted(bucketName, randomNameSuffix());
assertQueryFails(
"CREATE OR REPLACE TABLE " + table.getName() + " (a BIGINT) WITH (location = '%s')".formatted(location),
"The provided location '%s' does not match the existing table location '.*'".formatted(location));

assertLatestTableOperation(table.getName(), CREATE_TABLE_OPERATION);
}
}

@Test
public void testCreateOrReplaceTableWithChangeInLocationForExternalTable()
{
String location = "s3://%s/%s".formatted(bucketName, randomNameSuffix());
try (TestTable table = new TestTable(
getQueryRunner()::execute,
"test_create_or_replace_change_location_",
" (a BIGINT) WITH (location = '%s')".formatted(location))) {
assertQueryFails(
"CREATE OR REPLACE TABLE " + table.getName() + " (a BIGINT) WITH (location = '%s_2')".formatted(location),
"The provided location '%1$s_2' does not match the existing table location '%1$s'".formatted(location));

assertLatestTableOperation(table.getName(), CREATE_TABLE_OPERATION);
}
}

@Test
public void testCreateOrReplaceTableWithNoLocationSpecifiedForExternalTable()
{
String location = "s3://%s/%s".formatted(bucketName, randomNameSuffix());
try (TestTable table = new TestTable(
getQueryRunner()::execute,
"create_or_replace_with_no_location_",
" (a BIGINT) WITH (location = '%s')".formatted(location))) {
assertTableType("test_schema", table.getName(), EXTERNAL_TABLE.name());
assertUpdate("CREATE OR REPLACE TABLE " + table.getName() + " AS SELECT 'abc' as colA", 1);
assertTableType("test_schema", table.getName(), EXTERNAL_TABLE.name());
}
}

@Test
public void testCreateOrReplaceTableWithNoLocationSpecifiedForManagedTable()
{
try (TestTable table = new TestTable(
getQueryRunner()::execute,
"create_or_replace_with_no_location_",
" (a BIGINT)")) {
assertTableType("test_schema", table.getName(), MANAGED_TABLE.name());
assertUpdate("CREATE OR REPLACE TABLE " + table.getName() + " AS SELECT 'abc' as colA", 1);
assertTableType("test_schema", table.getName(), MANAGED_TABLE.name());
}
}

@Test
public void testCreateOrReplaceTableWithStatsUpdated()
{
try (TestTable table = new TestTable(
getQueryRunner()::execute,
"create_or_replace_for_stats_",
" AS SELECT 1 as colA")) {
assertQuery(
"SHOW STATS FOR " + table.getName(),
"VALUES" +
"('cola', null, 1.0, 0.0, null, '1', '1')," +
"(null, null, null, null, 1.0, null, null)");

assertUpdate("CREATE OR REPLACE TABLE " + table.getName() + " (colA BIGINT) ");
assertUpdate("INSERT INTO " + table.getName() + " VALUES null", 1);
assertQuery(
"SHOW STATS FOR " + table.getName(),
"VALUES" +
"('cola', 0.0, 0.0, 1.0, null, null, null)," +
"(null, null, null, null, 1.0, null, null)");
}
}

@Test
public void testCreateOrReplaceTableAsWithStatsUpdated()
{
try (TestTable table = new TestTable(
getQueryRunner()::execute,
"create_or_replace_for_stats_",
" AS SELECT 1 as colA")) {
assertQuery(
"SHOW STATS FOR " + table.getName(),
"VALUES" +
"('cola', null, 1.0, 0.0, null, '1', '1')," +
"(null, null, null, null, 1.0, null, null)");

assertUpdate("CREATE OR REPLACE TABLE " + table.getName() + " AS SELECT 25 colb ", 1);
assertUpdate("INSERT INTO " + table.getName() + " VALUES (null)", 1);
assertQuery(
"SHOW STATS FOR " + table.getName(),
"VALUES" +
"('colb', null, 1.0, 0.5, null, '25', '25')," +
"(null, null, null, null, 2.0, null, null)");
}
}

private void assertLatestTableOperation(String tableName, String operation)
{
assertQuery("SELECT operation FROM \"%s$history\" ORDER BY version DESC LIMIT 1".formatted(tableName),
"VALUES '%s'".formatted(operation));
}

private void assertTableType(String schemaName, String tableName, String tableType)
{
HiveMetastore metastore = TestingDeltaLakeUtils.getConnectorService(getDistributedQueryRunner(), HiveMetastoreFactory.class)
.createMetastore(Optional.empty());
assertThat(metastore.getTable(schemaName, tableName).orElseThrow().getTableType()).isEqualTo(tableType);
}

private void testSupportedNonPartitionedColumnMappingWrites(String resourceName, boolean statsAsJsonEnabled)
throws Exception
{
