Skip to content

Commit

Permalink
Fix GlueHiveMetastore#batchGetPartition to handle unprocessedKeys
Browse files Browse the repository at this point in the history
A BatchGetPartitionRequest is not guaranteed to return all of the
requested partitions. If Glue returns an incomplete set of partitions,
the query fails. This change handles the unprocessedKeys returned in
BatchGetPartitionResult by requesting them again.
  • Loading branch information
cacts authored and findepi committed Jan 27, 2022
1 parent 6028e90 commit ab93d28
Showing 1 changed file with 35 additions and 15 deletions.
Expand Up @@ -128,6 +128,7 @@

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Strings.isNullOrEmpty;
import static com.google.common.base.Verify.verify;
import static com.google.common.collect.Comparators.lexicographical;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.ImmutableMap.toImmutableMap;
Expand All @@ -149,6 +150,7 @@
import static java.util.Comparator.comparing;
import static java.util.Objects.requireNonNull;
import static java.util.function.UnaryOperator.identity;
import static java.util.stream.Collectors.toCollection;
import static java.util.stream.Collectors.toMap;
import static org.apache.hadoop.hive.metastore.TableType.MANAGED_TABLE;
import static org.apache.hadoop.hive.metastore.TableType.VIRTUAL_VIEW;
Expand Down Expand Up @@ -912,24 +914,42 @@ private Map<String, Optional<Partition>> getPartitionsByNames(Table table, List<
private List<Partition> batchGetPartition(Table table, List<String> partitionNames)
{
try {
List<Future<BatchGetPartitionResult>> batchGetPartitionFutures = new ArrayList<>();

for (List<String> partitionNamesBatch : Lists.partition(partitionNames, BATCH_GET_PARTITION_MAX_PAGE_SIZE)) {
List<PartitionValueList> partitionValuesBatch = mappedCopy(partitionNamesBatch, partitionName -> new PartitionValueList().withValues(toPartitionValues(partitionName)));
batchGetPartitionFutures.add(glueClient.batchGetPartitionAsync(new BatchGetPartitionRequest()
.withCatalogId(catalogId)
.withDatabaseName(table.getDatabaseName())
.withTableName(table.getTableName())
.withPartitionsToGet(partitionValuesBatch)));
}
List<PartitionValueList> pendingPartitions = partitionNames.stream()
.map(partitionName -> new PartitionValueList().withValues(toPartitionValues(partitionName)))
.collect(toCollection(ArrayList::new));

ImmutableList.Builder<Partition> resultsBuilder = ImmutableList.builderWithExpectedSize(partitionNames.size());

// Reuse immutable field instances opportunistically between partitions
GluePartitionConverter converter = new GluePartitionConverter(table);
ImmutableList.Builder<Partition> resultsBuilder = ImmutableList.builderWithExpectedSize(partitionNames.size());
for (Future<BatchGetPartitionResult> future : batchGetPartitionFutures) {
future.get().getPartitions().stream()
.map(converter)
.forEach(resultsBuilder::add);

while (!pendingPartitions.isEmpty()) {
List<Future<BatchGetPartitionResult>> batchGetPartitionFutures = new ArrayList<>();
for (List<PartitionValueList> partitions : Lists.partition(pendingPartitions, BATCH_GET_PARTITION_MAX_PAGE_SIZE)) {
batchGetPartitionFutures.add(glueClient.batchGetPartitionAsync(new BatchGetPartitionRequest()
.withCatalogId(catalogId)
.withDatabaseName(table.getDatabaseName())
.withTableName(table.getTableName())
.withPartitionsToGet(partitions)));
}
pendingPartitions.clear();

for (Future<BatchGetPartitionResult> future : batchGetPartitionFutures) {
BatchGetPartitionResult batchGetPartitionResult = future.get();
List<com.amazonaws.services.glue.model.Partition> partitions = batchGetPartitionResult.getPartitions();
List<PartitionValueList> unprocessedKeys = batchGetPartitionResult.getUnprocessedKeys();

// In the unlikely scenario where batchGetPartition call cannot make progress on retrieving partitions, avoid infinite loop
if (partitions.isEmpty()) {
verify(!unprocessedKeys.isEmpty(), "Empty unprocessedKeys for non-empty BatchGetPartitionRequest and empty partitions result");
throw new TrinoException(HIVE_METASTORE_ERROR, "Cannot make progress retrieving partitions. Unable to retrieve partitions: " + unprocessedKeys);
}

partitions.stream()
.map(converter)
.forEach(resultsBuilder::add);
pendingPartitions.addAll(unprocessedKeys);
}
}

return resultsBuilder.build();
Expand Down

0 comments on commit ab93d28

Please sign in to comment.