Fix inserting into transactional table when task_writer_count > 1
fixes: #9149
homar authored and findepi committed Jan 13, 2022
1 parent 1b0d119 commit 20a38b0
Showing 6 changed files with 131 additions and 10 deletions.
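Editor's summary (a hedged reading of the diff below): with task_writer_count > 1 and a target max file size set, HivePageSink could close a writer mid-insert and start a new file for the same data. That is safe for regular tables, whose file names carry a unique component, but ACID files are named deterministically per bucket, so rolling the file would recreate the same name. A minimal, self-contained sketch of that naming difference (illustrative only; the exact formats belong to the Hive connector and are not shown in this diff):

import java.util.UUID;

class FileNamingSketch
{
    // ACID output: deterministic per bucket, no random component
    static String acidBucketFile(int bucketId)
    {
        return String.format("bucket_%05d", bucketId); // e.g. bucket_00000
    }

    // non-transactional output: assumed unique per file, hence safe to roll over
    static String regularFile(String queryId)
    {
        return queryId + "_" + UUID.randomUUID();
    }
}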
@@ -691,8 +691,6 @@ private StreamProperties(

checkArgument(distribution != SINGLE || this.partitioningColumns.equals(Optional.of(ImmutableList.of())),
"Single stream must be partitioned on empty set");
-        checkArgument(distribution == SINGLE || !this.partitioningColumns.equals(Optional.of(ImmutableList.of())),
-                "Multiple streams must not be partitioned on empty set");

this.ordered = ordered;
checkArgument(!ordered || distribution == SINGLE, "Ordered must be a single stream");
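Why the second invariant is dropped (a hedged reading; the commit does not state it): a multi-writer insert into an unbucketed transactional table legitimately produces multiple output streams with an empty partitioning-column set, which is exactly the combination the removed checkArgument rejected. A self-contained illustration (StreamDistribution is redeclared here for the sketch, and MULTIPLE as the non-single value is an assumption):

import static com.google.common.base.Preconditions.checkArgument;

import com.google.common.collect.ImmutableList;
import java.util.List;
import java.util.Optional;

class RemovedInvariantSketch
{
    enum StreamDistribution { SINGLE, FIXED, MULTIPLE }

    public static void main(String[] args)
    {
        // the stream properties a multi-writer insert into an unbucketed transactional table yields
        StreamDistribution distribution = StreamDistribution.MULTIPLE;
        Optional<List<String>> partitioningColumns = Optional.of(ImmutableList.of());
        // the removed check: it throws for this valid combination
        checkArgument(distribution == StreamDistribution.SINGLE || !partitioningColumns.equals(Optional.of(ImmutableList.of())),
                "Multiple streams must not be partitioned on empty set");
    }
}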
@@ -352,4 +352,18 @@ public void testNewDirectoryPermissions()
{
// Alluxio metastore does not support create operations
}

+    @Override
+    public void testInsertBucketedTransactionalTableLayout()
+            throws Exception
+    {
+        // Alluxio metastore does not support insert/update/delete operations
+    }
+
+    @Override
+    public void testInsertPartitionedBucketedTransactionalTableLayout()
+            throws Exception
+    {
+        // Alluxio metastore does not support insert/update/delete operations
+    }
}
@@ -68,6 +68,7 @@ public class HivePageSink

private final HiveWriterFactory writerFactory;

+    private final boolean isTransactional;
private final int[] dataColumnInputIndex; // ordinal of columns (not counting sample weight column)
private final int[] partitionColumnsInputIndex; // ordinal of columns (not counting sample weight column)

@@ -98,6 +99,7 @@ public class HivePageSink
public HivePageSink(
HiveWriterFactory writerFactory,
List<HiveColumnHandle> inputColumns,
+            boolean isTransactional,
Optional<HiveBucketProperty> bucketProperty,
PageIndexerFactory pageIndexerFactory,
HdfsEnvironment hdfsEnvironment,
@@ -112,6 +114,7 @@ public HivePageSink(

requireNonNull(pageIndexerFactory, "pageIndexerFactory is null");

+        this.isTransactional = isTransactional;
this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null");
this.maxOpenWriters = maxOpenWriters;
this.writeVerificationExecutor = requireNonNull(writeVerificationExecutor, "writeVerificationExecutor is null");
@@ -361,7 +364,9 @@ private int[] getWriterIndexes(Page page)
HiveWriter writer = writers.get(writerIndex);
if (writer != null) {
// if current file not too big continue with the current writer
-                if (bucketFunction != null || writer.getWrittenBytes() <= targetMaxFileSize.orElse(Long.MAX_VALUE)) {
+                // for transactional tables we don't want to split output files, because bucketing is explicit or implicit
+                // and file names have no random component (e.g. bucket_00000)
+                if (bucketFunction != null || isTransactional || writer.getWrittenBytes() <= targetMaxFileSize.orElse(Long.MAX_VALUE)) {
continue;
}
// close current writer
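The heart of the fix is the new isTransactional disjunct above: size-based file rollover is skipped for transactional tables. Restated as a small predicate for readability (a sketch reusing the hunk's field names, not a refactoring present in the commit):

// May HivePageSink keep appending to the writer it already has?
private boolean keepCurrentWriter(HiveWriter writer)
{
    return bucketFunction != null // bucketed writes are pinned to one file per bucket
            || isTransactional // ACID file names are deterministic, so the file must never be rolled
            || writer.getWrittenBytes() <= targetMaxFileSize.orElse(Long.MAX_VALUE); // size-based rollover otherwise
}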
@@ -173,6 +173,7 @@ private ConnectorPageSink createPageSink(HiveWritableTableHandle handle, boolean
return new HivePageSink(
writerFactory,
handle.getInputColumns(),
+                handle.isTransactional(),
handle.getBucketProperty(),
pageIndexerFactory,
hdfsEnvironment,
@@ -171,6 +171,7 @@
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.base.Verify.verify;
import static com.google.common.collect.ImmutableList.toImmutableList;
+import static com.google.common.collect.ImmutableMap.builder;
import static com.google.common.collect.ImmutableMap.toImmutableMap;
import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static com.google.common.collect.Iterables.concat;
@@ -227,6 +228,7 @@
import static io.trino.plugin.hive.HiveTableProperties.PARTITIONED_BY_PROPERTY;
import static io.trino.plugin.hive.HiveTableProperties.SORTED_BY_PROPERTY;
import static io.trino.plugin.hive.HiveTableProperties.STORAGE_FORMAT_PROPERTY;
+import static io.trino.plugin.hive.HiveTableProperties.TRANSACTIONAL;
import static io.trino.plugin.hive.HiveTableRedirectionsProvider.NO_REDIRECTIONS;
import static io.trino.plugin.hive.HiveTestUtils.PAGE_SORTER;
import static io.trino.plugin.hive.HiveTestUtils.SESSION;
@@ -5094,10 +5096,27 @@ protected static List<ColumnMetadata> filterNonHiddenColumnMetadata(Collection<C
private void createEmptyTable(SchemaTableName schemaTableName, HiveStorageFormat hiveStorageFormat, List<Column> columns, List<Column> partitionColumns)
throws Exception
{
-        createEmptyTable(schemaTableName, hiveStorageFormat, columns, partitionColumns, Optional.empty());
+        createEmptyTable(schemaTableName, hiveStorageFormat, columns, partitionColumns, Optional.empty(), false);
}

-    private void createEmptyTable(SchemaTableName schemaTableName, HiveStorageFormat hiveStorageFormat, List<Column> columns, List<Column> partitionColumns, Optional<HiveBucketProperty> bucketProperty)
+    private void createEmptyTable(
+            SchemaTableName schemaTableName,
+            HiveStorageFormat hiveStorageFormat,
+            List<Column> columns,
+            List<Column> partitionColumns,
+            Optional<HiveBucketProperty> bucketProperty)
+            throws Exception
+    {
+        createEmptyTable(schemaTableName, hiveStorageFormat, columns, partitionColumns, bucketProperty, false);
+    }
+
+    protected void createEmptyTable(
+            SchemaTableName schemaTableName,
+            HiveStorageFormat hiveStorageFormat,
+            List<Column> columns,
+            List<Column> partitionColumns,
+            Optional<HiveBucketProperty> bucketProperty,
+            boolean isTransactional)
throws Exception
{
Path targetPath;
@@ -5113,14 +5132,18 @@ private void createEmptyTable(SchemaTableName schemaTableName, HiveStorageFormat
LocationHandle locationHandle = locationService.forNewTable(transaction.getMetastore(), session, schemaName, tableName, Optional.empty());
targetPath = locationService.getQueryWriteInfo(locationHandle).getTargetPath();

+            ImmutableMap.Builder<String, String> tableParamBuilder = ImmutableMap.<String, String>builder()
+                    .put(PRESTO_VERSION_NAME, TEST_SERVER_VERSION)
+                    .put(PRESTO_QUERY_ID_NAME, session.getQueryId());
+            if (isTransactional) {
+                tableParamBuilder.put(TRANSACTIONAL, "true");
+            }
Table.Builder tableBuilder = Table.builder()
.setDatabaseName(schemaName)
.setTableName(tableName)
.setOwner(Optional.of(tableOwner))
.setTableType(TableType.MANAGED_TABLE.name())
-                .setParameters(ImmutableMap.of(
-                        PRESTO_VERSION_NAME, TEST_SERVER_VERSION,
-                        PRESTO_QUERY_ID_NAME, session.getQueryId()))
+                .setParameters(tableParamBuilder.build())
.setDataColumns(columns)
.setPartitionColumns(partitionColumns);

@@ -5241,14 +5264,27 @@ public void testPreferredInsertLayout()
@Test
public void testInsertBucketedTableLayout()
throws Exception
{
+        insertBucketedTableLayout(false);
+    }
+
+    @Test
+    public void testInsertBucketedTransactionalTableLayout()
+            throws Exception
+    {
+        insertBucketedTableLayout(true);
+    }
+
+    protected void insertBucketedTableLayout(boolean transactional)
+            throws Exception
+    {
SchemaTableName tableName = temporaryTable("empty_bucketed_table");
try {
List<Column> columns = ImmutableList.of(
new Column("column1", HIVE_STRING, Optional.empty()),
new Column("column2", HIVE_LONG, Optional.empty()));
HiveBucketProperty bucketProperty = new HiveBucketProperty(ImmutableList.of("column1"), BUCKETING_V1, 4, ImmutableList.of());
-            createEmptyTable(tableName, ORC, columns, ImmutableList.of(), Optional.of(bucketProperty));
+            createEmptyTable(tableName, ORC, columns, ImmutableList.of(), Optional.of(bucketProperty), transactional);

try (Transaction transaction = newTransaction()) {
ConnectorMetadata metadata = transaction.getMetadata();
@@ -5277,6 +5313,19 @@ public void testInsertBucketedTableLayout()
@Test
public void testInsertPartitionedBucketedTableLayout()
throws Exception
{
+        insertPartitionedBucketedTableLayout(false);
+    }
+
+    @Test
+    public void testInsertPartitionedBucketedTransactionalTableLayout()
+            throws Exception
+    {
+        insertPartitionedBucketedTableLayout(true);
+    }
+
+    protected void insertPartitionedBucketedTableLayout(boolean transactional)
+            throws Exception
+    {
SchemaTableName tableName = temporaryTable("empty_partitioned_table");
try {
@@ -5285,7 +5334,7 @@ public void testInsertPartitionedBucketedTableLayout()
new Column("column1", HIVE_STRING, Optional.empty()),
partitioningColumn);
HiveBucketProperty bucketProperty = new HiveBucketProperty(ImmutableList.of("column1"), BUCKETING_V1, 4, ImmutableList.of());
-            createEmptyTable(tableName, ORC, columns, ImmutableList.of(partitioningColumn), Optional.of(bucketProperty));
+            createEmptyTable(tableName, ORC, columns, ImmutableList.of(partitioningColumn), Optional.of(bucketProperty), transactional);

try (Transaction transaction = newTransaction()) {
ConnectorMetadata metadata = transaction.getMetadata();
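For reference, the createEmptyTable change above marks a table transactional with a single metastore parameter, HiveTableProperties.TRANSACTIONAL. A self-contained sketch of the resulting parameter map (constant values inlined and the query id invented for illustration):

import com.google.common.collect.ImmutableMap;
import java.util.Map;

class TransactionalTableParamsSketch
{
    public static void main(String[] args)
    {
        boolean isTransactional = true;
        ImmutableMap.Builder<String, String> params = ImmutableMap.builder();
        params.put("presto_version", "testversion"); // stand-in for PRESTO_VERSION_NAME / TEST_SERVER_VERSION
        params.put("presto_query_id", "query_20220113_000000_00000_aaaaa"); // hypothetical query id
        if (isTransactional) {
            params.put("transactional", "true"); // the parameter Hive reads as the ACID switch
        }
        Map<String, String> tableParameters = params.build();
        System.out.println(tableParameters);
    }
}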
@@ -72,6 +72,8 @@
import static java.util.concurrent.TimeUnit.MINUTES;
import static java.util.stream.Collectors.joining;
import static java.util.stream.Collectors.toUnmodifiableList;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;

public class TestHiveTransactionalTable
extends HiveProductTest
@@ -1900,6 +1902,58 @@ public void testDeleteAfterMajorCompaction()
});
}

+    @Test
+    public void testUnbucketedPartitionedTransactionalTableWithTaskWriterCountGreaterThanOne()
+    {
+        unbucketedTransactionalTableWithTaskWriterCountGreaterThanOne(true);
+    }
+
+    @Test
+    public void testUnbucketedTransactionalTableWithTaskWriterCountGreaterThanOne()
+    {
+        unbucketedTransactionalTableWithTaskWriterCountGreaterThanOne(false);
+    }
+
+    private void unbucketedTransactionalTableWithTaskWriterCountGreaterThanOne(boolean isPartitioned)
+    {
+        withTemporaryTable(format("test_unbucketed%s_transactional_table_with_task_writer_count_greater_than_one", isPartitioned ? "_partitioned" : ""), true, isPartitioned, NONE, tableName -> {
+            onTrino().executeQuery(format(
+                    "CREATE TABLE %s " +
+                            "WITH (" +
+                            "format='ORC', " +
+                            "transactional=true " +
+                            "%s" +
+                            ") AS SELECT orderkey, orderstatus, totalprice, orderdate, clerk, shippriority, \"comment\", custkey, orderpriority " +
+                            "FROM tpch.sf1000.orders LIMIT 0", tableName, isPartitioned ? ", partitioned_by = ARRAY['orderpriority']" : ""));
+            onTrino().executeQuery("SET SESSION scale_writers = true");
+            onTrino().executeQuery("SET SESSION writer_min_size = '4kB'");
+            onTrino().executeQuery("SET SESSION task_writer_count = 4");
+            onTrino().executeQuery("SET SESSION hive.target_max_file_size = '1MB'");
+
+            onTrino().executeQuery(
+                    format(
+                            "INSERT INTO %s SELECT orderkey, orderstatus, totalprice, orderdate, clerk, shippriority, \"comment\", custkey, orderpriority " +
+                                    "FROM tpch.sf1000.orders LIMIT 100000", tableName));
+            assertThat(onTrino().executeQuery(format("SELECT count(*) FROM %s", tableName))).containsOnly(row(100000));
+            int numberOfCreatedFiles = onTrino().executeQuery(format("SELECT DISTINCT \"$path\" FROM %s", tableName)).getRowsCount();
+            // one file per partition is expected, since transactional writers are no longer rolled on target_max_file_size
+            int expectedNumberOfFiles = isPartitioned ? 5 : 1;
+            assertEquals(numberOfCreatedFiles, expectedNumberOfFiles, format("There should be only %s files created", expectedNumberOfFiles));
+
+            int sizeBeforeDeletion = onTrino().executeQuery(format("SELECT orderkey FROM %s", tableName)).rows().size();
+
+            onTrino().executeQuery(format("DELETE FROM %s WHERE (orderkey %% 2) = 0", tableName));
+            assertThat(onTrino().executeQuery(format("SELECT count(orderkey) FROM %s WHERE orderkey %% 2 = 0", tableName))).containsOnly(row(0));
+
+            int sizeOnTrinoWithWhere = onTrino().executeQuery(format("SELECT orderkey FROM %s WHERE orderkey %% 2 = 1", tableName)).rows().size();
+            int sizeOnHiveWithWhere = onHive().executeQuery(format("SELECT orderkey FROM %s WHERE orderkey %% 2 = 1", tableName)).rows().size();
+            int sizeOnTrinoWithoutWhere = onTrino().executeQuery(format("SELECT orderkey FROM %s", tableName)).rows().size();
+
+            assertEquals(sizeOnHiveWithWhere, sizeOnTrinoWithWhere);
+            assertEquals(sizeOnTrinoWithWhere, sizeOnTrinoWithoutWhere);
+            assertTrue(sizeBeforeDeletion > sizeOnTrinoWithoutWhere);
+        });
+    }

private void hdfsDeleteAll(String directory)
{
if (!hdfsClient.exist(directory)) {
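A note on the file-count assertion in the test above: "$path" is the Hive connector's hidden column holding the file each row was read from, so counting distinct values counts the files the insert produced, exactly one per partition once transactional writers stop rolling files. An equivalent, slightly more direct formulation (sketch; the table name is hypothetical):

// counts output files in one query instead of fetching distinct paths client-side
onTrino().executeQuery("SELECT count(DISTINCT \"$path\") FROM test_table");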
