Skip to content

Commit

Permalink
Fix locking of non-partitioned transactional tables
Browse files Browse the repository at this point in the history
Without the fix we were using partition locking API in HMS even for
non-partitioned tables. It happened if during execution we successfully
completed predicate pushdown flow. Then in HiveTableHandle we had
non-empty partitions list. It contained a single element with synthetic
HivePartition object using UNPARTITIONED_ID id. It is used to pass
information about bucket filtering.

With the fix we are detecting such case in getValidWriteIds and locking
whole table instead.
  • Loading branch information
losipiuk committed Sep 6, 2021
1 parent 15c9c53 commit 8216444
Showing 1 changed file with 16 additions and 2 deletions.
Expand Up @@ -15,13 +15,15 @@

import com.google.common.collect.ImmutableList;
import io.trino.plugin.hive.HiveMetastoreClosure;
import io.trino.plugin.hive.HivePartition;
import io.trino.plugin.hive.HiveTableHandle;
import io.trino.plugin.hive.acid.AcidTransaction;
import io.trino.plugin.hive.authentication.HiveIdentity;
import io.trino.spi.connector.SchemaTableName;
import org.apache.hadoop.hive.common.ValidTxnWriteIdList;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ScheduledFuture;

Expand Down Expand Up @@ -63,13 +65,25 @@ public AcidTransaction getTransaction()

public ValidTxnWriteIdList getValidWriteIds(HiveMetastoreClosure metastore, HiveTableHandle tableHandle)
{
List<SchemaTableName> lockedTables;
List<HivePartition> lockedPartitions;

if (tableHandle.getPartitionColumns().isEmpty() || tableHandle.getPartitions().isEmpty()) {
lockedTables = ImmutableList.of(tableHandle.getSchemaTableName());
lockedPartitions = ImmutableList.of();
}
else {
lockedTables = ImmutableList.of();
lockedPartitions = tableHandle.getPartitions().get();
}

// Different calls for same table might need to lock different partitions so acquire locks every time
metastore.acquireSharedReadLock(
identity,
queryId,
transactionId,
tableHandle.getPartitions().isEmpty() ? ImmutableList.of(tableHandle.getSchemaTableName()) : ImmutableList.of(),
tableHandle.getPartitions().orElse(ImmutableList.of()));
lockedTables,
lockedPartitions);

// For repeatable reads within a query, use the same list of valid transactions for a table which have once been used
return validHiveTransactionsForTable.computeIfAbsent(tableHandle.getSchemaTableName(), schemaTableName -> new ValidTxnWriteIdList(
Expand Down

0 comments on commit 8216444

Please sign in to comment.