Skip to content

Commit

Permalink
Use virtual bucketing for temporary tables
Browse files Browse the repository at this point in the history
  • Loading branch information
jaystarshot committed Jun 4, 2024
1 parent 37c9f33 commit 232bb3c
Show file tree
Hide file tree
Showing 7 changed files with 60 additions and 6 deletions.
14 changes: 14 additions & 0 deletions presto-docs/src/main/sphinx/admin/cte-materialization.rst
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,20 @@ Flag to enable or disable the pushdown of common filters and projects into the m

Use the ``cte_filter_and_projection_pushdown_enabled`` session property to set on a per-query basis.

``hive.cte-virtual-bucket-count``
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

* **Type:** ``integer``
* **Default value:** ``128``

The number of buckets to be used for materializing CTEs in queries.
This setting determines how many buckets should be used when materializing the CTEs, potentially affecting the performance of queries involving CTE materialization.
A higher number of partitions might improve parallelism but also increases overhead in terms of memory and network communication.

Recommended value: 4 - 10x times the size of the cluster.

Use the ``hive.cte_virtual_bucket_count`` session property to set on a per-query basis.

``hive.temporary-table-storage-format``
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.facebook.presto.hive.metastore.Column;
import com.facebook.presto.hive.metastore.Table;
import com.facebook.presto.spi.ColumnHandle;
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.PrestoException;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
Expand All @@ -48,7 +49,9 @@
import static com.facebook.presto.hive.BucketFunctionType.HIVE_COMPATIBLE;
import static com.facebook.presto.hive.HiveColumnHandle.BUCKET_COLUMN_NAME;
import static com.facebook.presto.hive.HiveErrorCode.HIVE_INVALID_METADATA;
import static com.facebook.presto.hive.HiveSessionProperties.getCteVirtualBucketCount;
import static com.facebook.presto.hive.HiveUtil.getRegularColumnHandles;
import static com.facebook.presto.hive.metastore.PrestoTableType.TEMPORARY_TABLE;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.ImmutableMap.toImmutableMap;
Expand Down Expand Up @@ -279,10 +282,13 @@ private static int hashBytes(int initialValue, Slice bytes)
return result;
}

public static Optional<HiveBucketHandle> getHiveBucketHandle(Table table)
public static Optional<HiveBucketHandle> getHiveBucketHandle(ConnectorSession session, Table table)
{
Optional<HiveBucketProperty> hiveBucketProperty = table.getStorage().getBucketProperty();
if (!hiveBucketProperty.isPresent()) {
if (table.getTableType().equals(TEMPORARY_TABLE)) {
return Optional.of(HiveBucketHandle.createVirtualBucketHandle(getCteVirtualBucketCount(session)));
}
return Optional.empty();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,8 @@ public class HiveClientConfig
private HiveStorageFormat temporaryTableStorageFormat = ORC;
private HiveCompressionCodec temporaryTableCompressionCodec = HiveCompressionCodec.SNAPPY;
private boolean shouldCreateEmptyBucketFilesForTemporaryTable;

private int cteVirtualBucketCount = 128;
private boolean usePageFileForHiveUnsupportedType = true;

private boolean pushdownFilterEnabled;
Expand Down Expand Up @@ -1334,6 +1336,20 @@ public boolean isParquetPushdownFilterEnabled()
return parquetPushdownFilterEnabled;
}

@Config("hive.cte-virtual-bucket-count")
@ConfigDescription("Number of buckets allocated per materialized CTE. (Recommended value: 4 - 10x times the size of the cluster)")
public HiveClientConfig setCteVirtualBucketCount(int cteVirtualBucketCount)
{
this.cteVirtualBucketCount = cteVirtualBucketCount;
return this;
}

@NotNull
public int getCteVirtualBucketCount()
{
return cteVirtualBucketCount;
}

@Config("hive.parquet.pushdown-filter-enabled")
@ConfigDescription("Experimental: enable complex filter pushdown for Parquet")
public HiveClientConfig setParquetPushdownFilterEnabled(boolean parquetPushdownFilterEnabled)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2993,7 +2993,7 @@ public Optional<ConnectorNewTableLayout> getInsertLayout(ConnectorSession sessio
Table table = metastore.getTable(metastoreContext, hiveTableHandle)
.orElseThrow(() -> new TableNotFoundException(tableName));

Optional<HiveBucketHandle> hiveBucketHandle = getHiveBucketHandle(table);
Optional<HiveBucketHandle> hiveBucketHandle = getHiveBucketHandle(session, table);
if (!hiveBucketHandle.isPresent()) {
return Optional.empty();
}
Expand Down Expand Up @@ -3040,7 +3040,7 @@ public Optional<ConnectorNewTableLayout> getPreferredShuffleLayoutForInsert(Conn
Table table = metastore.getTable(metastoreContext, tableName.getSchemaName(), tableName.getTableName())
.orElseThrow(() -> new TableNotFoundException(tableName));

Optional<HiveBucketHandle> hiveBucketHandle = getHiveBucketHandle(table);
Optional<HiveBucketHandle> hiveBucketHandle = getHiveBucketHandle(session, table);
if (hiveBucketHandle.isPresent()) {
// For bucketed table, table partitioning (i.e. the bucketing scheme) should be respected,
// and there is no additional preferred shuffle partitioning
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -329,10 +329,10 @@ private Optional<HiveBucketHandle> getBucketHandle(
{
// never ignore table bucketing for temporary tables as those are created such explicitly by the engine request
if (table.getTableType().equals(TEMPORARY_TABLE)) {
return getHiveBucketHandle(table);
return getHiveBucketHandle(session, table);
}

Optional<HiveBucketHandle> hiveBucketHandle = getHiveBucketHandle(table);
Optional<HiveBucketHandle> hiveBucketHandle = getHiveBucketHandle(session, table);
if (!hiveBucketHandle.isPresent() || shouldIgnoreTableBucketing(session)) {
return Optional.empty();
}
Expand Down Expand Up @@ -382,7 +382,7 @@ public HivePartitionResult getPartitions(SemiTransactionalHiveMetastore metastor
.map(partition -> partition.orElseThrow(() -> new VerifyException("partition must exist")))
.collect(toImmutableList());

Optional<HiveBucketHandle> bucketHandle = shouldIgnoreTableBucketing(session) ? Optional.empty() : getHiveBucketHandle(table);
Optional<HiveBucketHandle> bucketHandle = shouldIgnoreTableBucketing(session) ? Optional.empty() : getHiveBucketHandle(session, table);
return new HivePartitionResult(
ImmutableList.copyOf(partitionColumns),
table.getDataColumns(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ public final class HiveSessionProperties
public static final String PARQUET_PUSHDOWN_FILTER_ENABLED = "parquet_pushdown_filter_enabled";
public static final String ADAPTIVE_FILTER_REORDERING_ENABLED = "adaptive_filter_reordering_enabled";
public static final String VIRTUAL_BUCKET_COUNT = "virtual_bucket_count";
public static final String CTE_VIRTUAL_BUCKET_COUNT = "cte_virtual_bucket_count";
public static final String MAX_BUCKETS_FOR_GROUPED_EXECUTION = "max_buckets_for_grouped_execution";
public static final String OFFLINE_DATA_DEBUG_MODE_ENABLED = "offline_data_debug_mode_enabled";
public static final String FAIL_FAST_ON_INSERT_INTO_IMMUTABLE_PARTITIONS_ENABLED = "fail_fast_on_insert_into_immutable_partitions_enabled";
Expand Down Expand Up @@ -399,6 +400,11 @@ public HiveSessionProperties(HiveClientConfig hiveClientConfig, OrcFileWriterCon
"Number of virtual bucket assigned for unbucketed tables",
0,
false),
integerProperty(
CTE_VIRTUAL_BUCKET_COUNT,
"Number of virtual bucket assigned for bucketed cte materialization temporary tables",
hiveClientConfig.getCteVirtualBucketCount(),
false),
integerProperty(
MAX_BUCKETS_FOR_GROUPED_EXECUTION,
"maximum total buckets to allow using grouped execution",
Expand Down Expand Up @@ -901,6 +907,15 @@ public static int getVirtualBucketCount(ConnectorSession session)
return virtualBucketCount;
}

public static int getCteVirtualBucketCount(ConnectorSession session)
{
int virtualBucketCount = session.getProperty(CTE_VIRTUAL_BUCKET_COUNT, Integer.class);
if (virtualBucketCount < 0) {
throw new PrestoException(INVALID_SESSION_PROPERTY, format("%s must not be negative: %s", CTE_VIRTUAL_BUCKET_COUNT, virtualBucketCount));
}
return virtualBucketCount;
}

public static boolean isOfflineDataDebugModeEnabled(ConnectorSession session)
{
return session.getProperty(OFFLINE_DATA_DEBUG_MODE_ENABLED, Boolean.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ public void testDefaults()
.setParquetQuickStatsFileMetadataFetchTimeout(new Duration(60, TimeUnit.SECONDS))
.setMaxConcurrentQuickStatsCalls(100)
.setMaxConcurrentParquetQuickStatsCalls(500)
.setCteVirtualBucketCount(128)
.setAffinitySchedulingFileSectionSize(new DataSize(256, MEGABYTE)));
}

Expand Down Expand Up @@ -289,6 +290,7 @@ public void testExplicitPropertyMappings()
.put("hive.quick-stats.parquet.file-metadata-fetch-timeout", "30s")
.put("hive.quick-stats.parquet.max-concurrent-calls", "399")
.put("hive.quick-stats.max-concurrent-calls", "101")
.put("hive.cte-virtual-bucket-count", "256")
.put("hive.affinity-scheduling-file-section-size", "512MB")
.build();

Expand Down Expand Up @@ -410,6 +412,7 @@ public void testExplicitPropertyMappings()
.setParquetQuickStatsFileMetadataFetchTimeout(new Duration(30, TimeUnit.SECONDS))
.setMaxConcurrentParquetQuickStatsCalls(399)
.setMaxConcurrentQuickStatsCalls(101)
.setCteVirtualBucketCount(256)
.setAffinitySchedulingFileSectionSize(new DataSize(512, MEGABYTE));

ConfigAssertions.assertFullMapping(properties, expected);
Expand Down

0 comments on commit 232bb3c

Please sign in to comment.