Skip to content

Commit

Permalink
Implement CREATE OR REPLACE TABLE for delta lake connector
Browse files Browse the repository at this point in the history
  • Loading branch information
Praveen2112 committed Apr 11, 2024
1 parent 5fb2e4e commit 26b0e08
Show file tree
Hide file tree
Showing 14 changed files with 1,036 additions and 45 deletions.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.List;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.OptionalLong;

import static com.google.common.collect.ImmutableList.toImmutableList;
import static io.trino.plugin.deltalake.DeltaLakeColumnType.PARTITION_KEY;
Expand All @@ -38,6 +39,8 @@ public record DeltaLakeOutputTableHandle(
String schemaString,
ColumnMappingMode columnMappingMode,
OptionalInt maxColumnId,
boolean replace,
OptionalLong readVersion,
ProtocolEntry protocolEntry)
implements ConnectorOutputTableHandle
{
Expand All @@ -53,6 +56,7 @@ public record DeltaLakeOutputTableHandle(
requireNonNull(schemaString, "schemaString is null");
requireNonNull(columnMappingMode, "columnMappingMode is null");
requireNonNull(maxColumnId, "maxColumnId is null");
requireNonNull(readVersion, "readVersion is null");
requireNonNull(protocolEntry, "protocolEntry is null");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ public interface DeltaLakeMetastore

void createTable(Table table, PrincipalPrivileges principalPrivileges);

void replaceTable(Table table, PrincipalPrivileges principalPrivileges);

void dropTable(SchemaTableName schemaTableName, String tableLocation, boolean deleteData);

void renameTable(SchemaTableName from, SchemaTableName to);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,12 @@ public void createTable(Table table, PrincipalPrivileges principalPrivileges)
delegate.createTable(table, principalPrivileges);
}

@Override
public void replaceTable(Table table, PrincipalPrivileges principalPrivileges)
{
delegate.replaceTable(table.getDatabaseName(), table.getTableName(), table, principalPrivileges);
}

@Override
public void dropTable(SchemaTableName schemaTableName, String tableLocation, boolean deleteData)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.Futures;
import io.airlift.concurrent.MoreFutures;
import io.airlift.units.DataSize;
import io.airlift.units.Duration;
Expand Down Expand Up @@ -45,6 +46,7 @@
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand All @@ -67,13 +69,16 @@
import static io.trino.SystemSessionProperties.ENABLE_DYNAMIC_FILTERING;
import static io.trino.SystemSessionProperties.JOIN_DISTRIBUTION_TYPE;
import static io.trino.plugin.base.util.Closables.closeAllSuppress;
import static io.trino.plugin.deltalake.DeltaLakeMetadata.CREATE_OR_REPLACE_TABLE_AS_OPERATION;
import static io.trino.plugin.deltalake.DeltaLakeMetadata.CREATE_OR_REPLACE_TABLE_OPERATION;
import static io.trino.plugin.deltalake.DeltaLakeQueryRunner.DELTA_CATALOG;
import static io.trino.plugin.deltalake.DeltaLakeQueryRunner.createDockerizedDeltaLakeQueryRunner;
import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.EXTENDED_STATISTICS_COLLECT_ON_WRITE;
import static io.trino.plugin.deltalake.TestingDeltaLakeUtils.getConnectorService;
import static io.trino.plugin.deltalake.TestingDeltaLakeUtils.getTableActiveFiles;
import static io.trino.plugin.deltalake.transactionlog.TransactionLogUtil.TRANSACTION_LOG_DIRECTORY;
import static io.trino.plugin.hive.TestingThriftHiveMetastoreBuilder.testingThriftHiveMetastoreBuilder;
import static io.trino.testing.QueryAssertions.assertEqualsIgnoreOrder;
import static io.trino.testing.QueryAssertions.getTrinoExceptionCause;
import static io.trino.testing.TestingAccessControlManager.TestingPrivilegeType.DELETE_TABLE;
import static io.trino.testing.TestingAccessControlManager.TestingPrivilegeType.DROP_TABLE;
Expand Down Expand Up @@ -2294,6 +2299,140 @@ public void testConcurrentInsertsReconciliationForBlindInserts()
testConcurrentInsertsReconciliationForBlindInserts(true);
}

@Test
public void testCreateOrReplaceTable()
{
try (TestTable table = new TestTable(getQueryRunner()::execute, "test_table", " AS SELECT BIGINT '42' a, DOUBLE '-38.5' b")) {
assertThat(query("SELECT CAST(a AS bigint), b FROM " + table.getName()))
.matches("VALUES (BIGINT '42', -385e-1)");

assertUpdate("CREATE OR REPLACE TABLE %s (a bigint, b double)".formatted(table.getName()));
assertQueryReturnsEmptyResult("SELECT * FROM " + table.getName());

assertTableVersion(table.getName(), 1L);
assertTableOperation(table.getName(), 1, CREATE_OR_REPLACE_TABLE_OPERATION);
}
}

@Test
public void testCreateOrReplaceTableAs()
{
try (TestTable table = new TestTable(getQueryRunner()::execute, "test_table", " AS SELECT BIGINT '42' a, DOUBLE '-38.5' b")) {
assertThat(query("SELECT CAST(a AS bigint), b FROM " + table.getName()))
.matches("VALUES (BIGINT '42', -385e-1)");

assertUpdate("CREATE OR REPLACE TABLE %s AS SELECT BIGINT '-53' a, DOUBLE '49.6' b".formatted(table.getName()), 1);
assertThat(query("SELECT CAST(a AS bigint), b FROM " + table.getName()))
.matches("VALUES (BIGINT '-53', 496e-1)");

assertTableVersion(table.getName(), 1L);
assertTableOperation(table.getName(), 1, CREATE_OR_REPLACE_TABLE_AS_OPERATION);
}
}

@Test
public void testCreateOrReplaceTableChangeColumnNamesAndTypes()
{
try (TestTable table = new TestTable(getQueryRunner()::execute, "test_table", " AS SELECT BIGINT '42' a, DOUBLE '-38.5' b")) {
assertThat(query("SELECT CAST(a AS bigint), b FROM " + table.getName()))
.matches("VALUES (BIGINT '42', -385e-1)");

assertUpdate("CREATE OR REPLACE TABLE " + table.getName() + " AS SELECT VARCHAR 'test' c, VARCHAR 'test2' d", 1);
assertThat(query("SELECT c, d FROM " + table.getName()))
.matches("VALUES (VARCHAR 'test', VARCHAR 'test2')");

assertTableVersion(table.getName(), 1L);
assertTableOperation(table.getName(), 1, CREATE_OR_REPLACE_TABLE_AS_OPERATION);
}
}

@RepeatedTest(3)
// Test from BaseConnectorTest
public void testCreateOrReplaceTableConcurrently()
throws Exception
{
int threads = 4;
int numOfCreateOrReplaceStatements = 4;
int numOfReads = 16;
CyclicBarrier barrier = new CyclicBarrier(threads + 1);
ExecutorService executor = newFixedThreadPool(threads + 1);
List<Future<?>> futures = new ArrayList<>();
try (TestTable table = new TestTable(getQueryRunner()::execute, "test_create_or_replace", "(col integer)")) {
String tableName = table.getName();

getQueryRunner().execute("CREATE OR REPLACE TABLE " + tableName + " AS SELECT 1 a");
assertThat(query("SELECT * FROM " + tableName)).matches("VALUES 1");

// One thread submits some CREATE OR REPLACE statements
futures.add(executor.submit(() -> {
barrier.await(30, SECONDS);
IntStream.range(0, numOfCreateOrReplaceStatements).forEach(index -> {
try {
getQueryRunner().execute("CREATE OR REPLACE TABLE " + tableName + " AS SELECT * FROM (VALUES (1), (2)) AS t(a) ");
} catch (Exception e) {
RuntimeException trinoException = getTrinoExceptionCause(e);
try {
throw new AssertionError("Unexpected concurrent CREATE OR REPLACE failure", trinoException);
} catch (Throwable verifyFailure) {
if (verifyFailure != e) {
verifyFailure.addSuppressed(e);
}
throw verifyFailure;
}
}
});
return null;
}));
// Other 4 threads continue try to read the same table, none of the reads should fail.
IntStream.range(0, threads)
.forEach(threadNumber -> futures.add(executor.submit(() -> {
barrier.await(30, SECONDS);
IntStream.range(0, numOfReads).forEach(readIndex -> {
try {
MaterializedResult result = computeActual("SELECT * FROM " + tableName);
if (result.getRowCount() == 1) {
assertEqualsIgnoreOrder(result.getMaterializedRows(), List.of(new MaterializedRow(List.of(1))));
}
else {
assertEqualsIgnoreOrder(result.getMaterializedRows(), List.of(new MaterializedRow(List.of(1)), new MaterializedRow(List.of(2))));
}
}
catch (Exception e) {
RuntimeException trinoException = getTrinoExceptionCause(e);
try {
throw new AssertionError("Unexpected concurrent CREATE OR REPLACE failure", trinoException);
}
catch (Throwable verifyFailure) {
if (verifyFailure != e) {
verifyFailure.addSuppressed(e);
}
throw verifyFailure;
}
}
});
return null;
})));
futures.forEach(Futures::getUnchecked);
getQueryRunner().execute("CREATE OR REPLACE TABLE " + tableName + " AS SELECT * FROM (VALUES (1), (2), (3)) AS t(a)");
assertThat(query("SELECT * FROM " + tableName)).matches("VALUES 1, 2, 3");
}
finally {
executor.shutdownNow();
executor.awaitTermination(30, SECONDS);
}
}

private void assertTableVersion(String tableName, long version)
{
assertThat(computeScalar(format("SELECT max(version) FROM \"%s$history\"", tableName))).isEqualTo(version);
}

private void assertTableOperation(String tableName, long version, String operation)
{
assertQuery("SELECT operation FROM \"%s$history\" WHERE version = %s".formatted(tableName, version),
"VALUES '%s'".formatted(operation));
}

private void testConcurrentInsertsReconciliationForBlindInserts(boolean partitioned)
throws Exception
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -421,6 +421,37 @@ public void testReadV2CheckpointParquet()
.build());
}

@Test
public void testCreateOrReplaceTable()
{
assertFileSystemAccesses("CREATE OR REPLACE TABLE test_create_or_replace (id VARCHAR, age INT)", ImmutableMultiset.of());

assertFileSystemAccesses("CREATE OR REPLACE TABLE test_create_or_replace (id VARCHAR, age INT)",
ImmutableMultiset.<CacheOperation>builder()
.add(new CacheOperation("Alluxio.readCached", "00000000000000000000.json", 0, 762))
.add(new CacheOperation("Alluxio.readExternal", "00000000000000000000.json", 0, 762))
.add(new CacheOperation("Alluxio.writeCache", "00000000000000000000.json", 0, 762))
.build());
assertUpdate("DROP TABLE test_create_or_replace");
}

@Test
public void testCreateOrReplaceTableAsSelect()
{
assertFileSystemAccesses(
"CREATE OR REPLACE TABLE test_create_or_replace_as_select AS SELECT 1 col_name", ImmutableMultiset.of());

assertFileSystemAccesses(
"CREATE OR REPLACE TABLE test_create_or_replace_as_select AS SELECT 1 col_name",
ImmutableMultiset.<CacheOperation>builder()
.add(new CacheOperation("Alluxio.readCached", "00000000000000000000.json", 0, 1004))
.add(new CacheOperation("Alluxio.readExternal", "00000000000000000000.json", 0, 1004))
.add(new CacheOperation("Alluxio.writeCache", "00000000000000000000.json", 0, 1004))
.build());

assertUpdate("DROP TABLE test_create_or_replace_as_select");
}

private void assertFileSystemAccesses(@Language("SQL") String query, Multiset<CacheOperation> expectedCacheAccesses)
{
assertUpdate("CALL system.flush_metadata_cache()");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1001,6 +1001,7 @@ private void testCorruptedTableLocation(String tableName, Path tableLocation, bo
assertQueryFails("COMMENT ON COLUMN " + tableName + ".foo IS NULL", "Metadata not found in transaction log for tpch." + tableName);
assertQueryFails("CALL system.vacuum(CURRENT_SCHEMA, '" + tableName + "', '7d')", "Metadata not found in transaction log for tpch." + tableName);
assertQueryFails("SELECT * FROM TABLE(system.table_changes('tpch', '" + tableName + "'))", "Metadata not found in transaction log for tpch." + tableName);
assertQueryFails("CREATE OR REPLACE TABLE " + tableName + " (id INTEGER)", "Metadata not found in transaction log for tpch." + tableName);
assertQuerySucceeds("CALL system.drop_extended_stats(CURRENT_SCHEMA, '" + tableName + "')");

// Avoid failing metadata queries
Expand Down

0 comments on commit 26b0e08

Please sign in to comment.