Weighted Split Scheduling #16668

Merged: 2 commits, Nov 29, 2021
presto-docs/src/main/sphinx/admin/properties.rst (14 additions, 9 deletions)
@@ -555,14 +555,19 @@ Node Scheduler Properties
 * **Type:** ``integer``
 * **Default value:** ``100``

-The target value for the total number of splits that can be running for
-each worker node.
+The target value for the number of splits that can be running for
+each worker node, assuming all splits have the standard split weight.

 Using a higher value is recommended if queries are submitted in large batches
 (e.g., running a large group of reports periodically) or for connectors that
-produce many splits that complete quickly. Increasing this value may improve
-query latency by ensuring that the workers have enough splits to keep them
-fully utilized.
+produce many splits that complete quickly but do not support assigning split
+weight values to express that to the split scheduler. Increasing this value
+may improve query latency by ensuring that the workers have enough splits to
+keep them fully utilized.
+
+When connectors do support weight based split scheduling, the number of splits
+assigned will depend on the weight of the individual splits. If splits are
+small, more of them are allowed to be assigned to each worker to compensate.

 Setting this too high will waste memory and may result in lower performance
 due to splits not being balanced across workers. Ideally, it should be set
@@ -575,10 +580,10 @@ Node Scheduler Properties
 * **Type:** ``integer``
 * **Default value:** ``10``

-The number of outstanding splits that can be queued for each worker node
-for a single stage of a query, even when the node is already at the limit for
-total number of splits. Allowing a minimum number of splits per stage is
-required to prevent starvation and deadlocks.
+The number of outstanding splits with the standard split weight that can be
+queued for each worker node for a single stage of a query, even when the
+node is already at the limit for total number of splits. Allowing a minimum
+number of splits per stage is required to prevent starvation and deadlocks.

 This value must be smaller than ``node-scheduler.max-splits-per-node``,
 will usually be increased for the same reasons, and has similar drawbacks
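As a rough illustration of the weighted accounting the revised documentation describes (a sketch of the arithmetic only, not the scheduler's actual code; the 0.05 floor is the ``hive.minimum-assigned-split-weight`` default introduced below):

```java
public class WeightedSchedulingExample
{
    public static void main(String[] args)
    {
        // node-scheduler.max-splits-per-node, measured in standard-weight units
        double perNodeTarget = 100.0;

        // 100 standard-weight splits fit under the target...
        System.out.println(admissibleSplits(perNodeTarget, 1.0));
        // ...but up to 2000 tiny splits at the 0.05 minimum weight
        System.out.println(admissibleSplits(perNodeTarget, 0.05));
    }

    // How many splits of a fixed weight fit before their summed weight reaches the target
    private static long admissibleSplits(double perNodeTarget, double splitWeight)
    {
        return (long) Math.floor(perNodeTarget / splitWeight);
    }
}
```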
HiveClientConfig.java
@@ -198,6 +198,9 @@ public class HiveClientConfig
     private boolean verboseRuntimeStatsEnabled;

+    private boolean sizeBasedSplitWeightsEnabled = true;
+    private double minimumAssignedSplitWeight = 0.05;
+
     public int getMaxInitialSplits()
     {
         return maxInitialSplits;
@@ -1682,4 +1685,31 @@ public int getMaterializedViewMissingPartitionsThreshold()
     {
         return this.materializedViewMissingPartitionsThreshold;
     }

+    @Config("hive.size-based-split-weights-enabled")
+    public HiveClientConfig setSizeBasedSplitWeightsEnabled(boolean sizeBasedSplitWeightsEnabled)
+    {
+        this.sizeBasedSplitWeightsEnabled = sizeBasedSplitWeightsEnabled;
+        return this;
+    }
+
+    public boolean isSizeBasedSplitWeightsEnabled()
+    {
+        return sizeBasedSplitWeightsEnabled;
+    }
+
+    @Config("hive.minimum-assigned-split-weight")
+    @ConfigDescription("Minimum weight that a split can be assigned when size based split weights are enabled")
+    public HiveClientConfig setMinimumAssignedSplitWeight(double minimumAssignedSplitWeight)
+    {
+        this.minimumAssignedSplitWeight = minimumAssignedSplitWeight;
+        return this;
+    }
+
+    @DecimalMax("1.0") // standard split weight
+    @DecimalMin(value = "0", inclusive = false)
+    public double getMinimumAssignedSplitWeight()
+    {
+        return minimumAssignedSplitWeight;
+    }
 }
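For reference, a minimal sketch of how the two new knobs round-trip through the config class above (test-style usage under the assumption that the class lives in `com.facebook.presto.hive`; not part of the PR itself):

```java
import com.facebook.presto.hive.HiveClientConfig;

public class HiveSplitWeightConfigExample
{
    public static void main(String[] args)
    {
        // hive.size-based-split-weights-enabled and hive.minimum-assigned-split-weight
        // bind to these setters via the @Config annotations
        HiveClientConfig config = new HiveClientConfig()
                .setSizeBasedSplitWeightsEnabled(true)
                .setMinimumAssignedSplitWeight(0.1); // must be > 0 and <= 1.0 per the bean validation above

        System.out.println(config.isSizeBasedSplitWeightsEnabled()); // true
        System.out.println(config.getMinimumAssignedSplitWeight());  // 0.1
    }
}
```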
HiveSessionProperties.java
@@ -129,6 +129,8 @@ public final class HiveSessionProperties
     public static final String VERBOSE_RUNTIME_STATS_ENABLED = "verbose_runtime_stats_enabled";
     private static final String DWRF_WRITER_STRIPE_CACHE_ENABLED = "dwrf_writer_stripe_cache_enabled";
     private static final String DWRF_WRITER_STRIPE_CACHE_SIZE = "dwrf_writer_stripe_cache_size";
+    public static final String SIZE_BASED_SPLIT_WEIGHTS_ENABLED = "size_based_split_weights_enabled";
+    public static final String MINIMUM_ASSIGNED_SPLIT_WEIGHT = "minimum_assigned_split_weight";

     private final List<PropertyMetadata<?>> sessionProperties;

@@ -617,7 +619,27 @@ public HiveSessionProperties(HiveClientConfig hiveClientConfig, OrcFileWriterCon
                         DWRF_WRITER_STRIPE_CACHE_SIZE,
                         "Maximum size of DWRF stripe cache to be held in memory",
                         orcFileWriterConfig.getDwrfStripeCacheMaxSize(),
-                        false));
+                        false),
+                booleanProperty(
+                        SIZE_BASED_SPLIT_WEIGHTS_ENABLED,
+                        "Enable estimating split weights based on size in bytes",
+                        hiveClientConfig.isSizeBasedSplitWeightsEnabled(),
+                        false),
+                new PropertyMetadata<>(
+                        MINIMUM_ASSIGNED_SPLIT_WEIGHT,
+                        "Minimum assigned split weight when size based split weighting is enabled",
+                        DOUBLE,
+                        Double.class,
+                        hiveClientConfig.getMinimumAssignedSplitWeight(),
+                        false,
+                        value -> {
+                            double doubleValue = ((Number) value).doubleValue();
+                            if (!Double.isFinite(doubleValue) || doubleValue <= 0 || doubleValue > 1) {
+                                throw new PrestoException(INVALID_SESSION_PROPERTY, format("%s must be > 0 and <= 1.0: %s", MINIMUM_ASSIGNED_SPLIT_WEIGHT, value));
+                            }
+                            return doubleValue;
+                        },
+                        value -> value));
     }

     public List<PropertyMetadata<?>> getSessionProperties()
@@ -1077,4 +1099,14 @@ public static DataSize getDwrfWriterStripeCacheeMaxSize(ConnectorSession session
     {
         return session.getProperty(DWRF_WRITER_STRIPE_CACHE_SIZE, DataSize.class);
     }

+    public static boolean isSizeBasedSplitWeightsEnabled(ConnectorSession session)
+    {
+        return session.getProperty(SIZE_BASED_SPLIT_WEIGHTS_ENABLED, Boolean.class);
+    }
+
+    public static double getMinimumAssignedSplitWeight(ConnectorSession session)
+    {
+        return session.getProperty(MINIMUM_ASSIGNED_SPLIT_WEIGHT, Double.class);
+    }
 }
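The decoder lambda in the constructor above is what enforces the bounds when the session property is set; a standalone sketch of the same check (hypothetical helper class, shown only to make the accepted range explicit):

```java
public class MinimumSplitWeightValidation
{
    // Mirrors the PropertyMetadata decoder above: values outside (0, 1.0] are rejected
    static double validate(Object value)
    {
        double doubleValue = ((Number) value).doubleValue();
        if (!Double.isFinite(doubleValue) || doubleValue <= 0 || doubleValue > 1) {
            throw new IllegalArgumentException("minimum_assigned_split_weight must be > 0 and <= 1.0: " + value);
        }
        return doubleValue;
    }

    public static void main(String[] args)
    {
        System.out.println(validate(0.05)); // ok: 0.05
        System.out.println(validate(1.0));  // ok: 1.0 (standard weight)
        System.out.println(validate(0.0));  // throws IllegalArgumentException
    }
}
```

Both properties can then be read per query through the static accessors above, overriding the `hive.*` config defaults for a single session.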
HiveSplit.java
@@ -18,6 +18,7 @@
 import com.facebook.presto.spi.ConnectorSplit;
 import com.facebook.presto.spi.HostAddress;
 import com.facebook.presto.spi.PrestoException;
+import com.facebook.presto.spi.SplitWeight;
 import com.facebook.presto.spi.schedule.NodeSelectionStrategy;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
@@ -65,6 +66,7 @@ public class HiveSplit
     private final Optional<EncryptionInformation> encryptionInformation;
     private final Map<String, String> customSplitInfo;
     private final Set<ColumnHandle> redundantColumnDomains;
+    private final SplitWeight splitWeight;

     @JsonCreator
     public HiveSplit(
@@ -90,7 +92,8 @@ public HiveSplit(
             @JsonProperty("cacheQuota") CacheQuotaRequirement cacheQuotaRequirement,
             @JsonProperty("encryptionMetadata") Optional<EncryptionInformation> encryptionInformation,
             @JsonProperty("customSplitInfo") Map<String, String> customSplitInfo,
-            @JsonProperty("redundantColumnDomains") Set<ColumnHandle> redundantColumnDomains)
+            @JsonProperty("redundantColumnDomains") Set<ColumnHandle> redundantColumnDomains,
+            @JsonProperty("splitWeight") SplitWeight splitWeight)
     {
         checkArgument(start >= 0, "start must be positive");
         checkArgument(length >= 0, "length must be positive");
@@ -136,6 +139,7 @@ public HiveSplit(
         this.encryptionInformation = encryptionInformation;
         this.customSplitInfo = ImmutableMap.copyOf(requireNonNull(customSplitInfo, "customSplitInfo is null"));
         this.redundantColumnDomains = ImmutableSet.copyOf(redundantColumnDomains);
+        this.splitWeight = requireNonNull(splitWeight, "splitWeight is null");
     }

     @JsonProperty
@@ -296,6 +300,13 @@ public Set<ColumnHandle> getRedundantColumnDomains()
         return redundantColumnDomains;
     }

+    @JsonProperty
+    @Override
+    public SplitWeight getSplitWeight()
+    {
+        return splitWeight;
+    }
+
     @Override
     public Object getInfo()
     {
HiveSplitSource.java
@@ -18,6 +18,7 @@
 import com.facebook.presto.hive.InternalHiveSplit.InternalHiveBlock;
 import com.facebook.presto.hive.util.AsyncQueue;
 import com.facebook.presto.hive.util.AsyncQueue.BorrowResult;
+import com.facebook.presto.hive.util.SizeBasedSplitWeightProvider;
 import com.facebook.presto.spi.ConnectorSession;
 import com.facebook.presto.spi.ConnectorSplit;
 import com.facebook.presto.spi.ConnectorSplitSource;
@@ -55,6 +56,8 @@
 import static com.facebook.presto.hive.HiveErrorCode.HIVE_UNKNOWN_ERROR;
 import static com.facebook.presto.hive.HiveSessionProperties.getMaxInitialSplitSize;
 import static com.facebook.presto.hive.HiveSessionProperties.getMaxSplitSize;
+import static com.facebook.presto.hive.HiveSessionProperties.getMinimumAssignedSplitWeight;
+import static com.facebook.presto.hive.HiveSessionProperties.isSizeBasedSplitWeightsEnabled;
 import static com.facebook.presto.hive.HiveSplitSource.StateKind.CLOSED;
 import static com.facebook.presto.hive.HiveSplitSource.StateKind.FAILED;
 import static com.facebook.presto.hive.HiveSplitSource.StateKind.INITIAL;
@@ -94,6 +97,7 @@ class HiveSplitSource

     private final CounterStat highMemorySplitSourceCounter;
     private final AtomicBoolean loggedHighMemoryWarning = new AtomicBoolean();
+    private final HiveSplitWeightProvider splitWeightProvider;

     private HiveSplitSource(
             ConnectorSession session,
@@ -121,6 +125,7 @@ private HiveSplitSource(
         this.maxInitialSplitSize = getMaxInitialSplitSize(session);
         this.useRewindableSplitSource = useRewindableSplitSource;
         this.remainingInitialSplits = new AtomicInteger(maxInitialSplits);
+        this.splitWeightProvider = isSizeBasedSplitWeightsEnabled(session) ? new SizeBasedSplitWeightProvider(getMinimumAssignedSplitWeight(session), maxSplitSize) : HiveSplitWeightProvider.uniformStandardWeightProvider();
     }

     public static HiveSplitSource allAtOnce(
@@ -506,7 +511,8 @@ else if (maxSplitBytes * 2 >= remainingBlockBytes) {
                             cacheQuotaRequirement,
                             internalSplit.getEncryptionInformation(),
                             internalSplit.getCustomSplitInfo(),
-                            internalSplit.getPartitionInfo().getRedundantColumnDomains()));
+                            internalSplit.getPartitionInfo().getRedundantColumnDomains(),
+                            splitWeightProvider.weightForSplitSizeInBytes(splitBytes)));

             internalSplit.increaseStart(splitBytes);

HiveSplitWeightProvider.java (new file)
@@ -0,0 +1,26 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.hive;

import com.facebook.presto.spi.SplitWeight;

public interface HiveSplitWeightProvider
{
    SplitWeight weightForSplitSizeInBytes(long splitSizeInBytes);

    static HiveSplitWeightProvider uniformStandardWeightProvider()
    {
        return (splitSizeInBytes) -> SplitWeight.standard();
    }
}
SizeBasedSplitWeightProvider.java (new file)
@@ -0,0 +1,46 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.hive.util;

import com.facebook.presto.hive.HiveSplitWeightProvider;
import com.facebook.presto.spi.SplitWeight;
import io.airlift.units.DataSize;

import static com.google.common.base.Preconditions.checkArgument;
import static java.util.Objects.requireNonNull;

public class SizeBasedSplitWeightProvider
        implements HiveSplitWeightProvider
{
    private final double minimumWeight;
    // The configured size used to break files into splits; this size
    // corresponds to a "standard" weight proportion of 1.0
    private final double targetSplitSizeInBytes;

    public SizeBasedSplitWeightProvider(double minimumWeight, DataSize targetSplitSize)
    {
        checkArgument(Double.isFinite(minimumWeight) && minimumWeight > 0 && minimumWeight <= 1, "minimumWeight must be > 0 and <= 1, found: %s", minimumWeight);
        this.minimumWeight = minimumWeight;
        long targetSizeInBytes = requireNonNull(targetSplitSize, "targetSplitSize is null").toBytes();
        checkArgument(targetSizeInBytes > 0, "targetSplitSize must be > 0, found: %s", targetSplitSize);
        this.targetSplitSizeInBytes = (double) targetSizeInBytes;
    }

    @Override
    public SplitWeight weightForSplitSizeInBytes(long splitSizeInBytes)
    {
        // Clamp the value between the minimum weight and 1.0 (standard weight)
        return SplitWeight.fromProportion(Math.min(Math.max(splitSizeInBytes / targetSplitSizeInBytes, minimumWeight), 1.0));
    }
}
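To make the clamping behavior concrete, a standalone arithmetic sketch (the 64MB target below is an assumed example value; in practice it comes from the session's max split size, while 0.05 is the default minimum weight):

```java
public class SplitWeightClampingExample
{
    public static void main(String[] args)
    {
        double targetSplitSizeInBytes = 64.0 * 1024 * 1024; // assumed target split size of 64MB
        double minimumWeight = 0.05;                        // hive.minimum-assigned-split-weight default

        for (long splitSizeInBytes : new long[] {128L << 20, 16L << 20, 1L << 20}) {
            // Same clamp as weightForSplitSizeInBytes above
            double proportion = Math.min(Math.max(splitSizeInBytes / targetSplitSizeInBytes, minimumWeight), 1.0);
            System.out.printf("%3d MB -> weight %.6f%n", splitSizeInBytes >> 20, proportion);
        }
        // Prints:
        // 128 MB -> weight 1.000000  (raw 2.0, clamped down to the standard weight)
        //  16 MB -> weight 0.250000
        //   1 MB -> weight 0.050000  (raw 0.015625, clamped up to the minimum)
    }
}
```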
@@ -27,6 +27,7 @@
 import com.facebook.presto.spi.ConnectorPageSource;
 import com.facebook.presto.spi.SchemaTableName;
 import com.facebook.presto.spi.SplitContext;
+import com.facebook.presto.spi.SplitWeight;
 import com.facebook.presto.spi.TableHandle;
 import com.facebook.presto.testing.TestingConnectorSession;
 import com.google.common.collect.ImmutableList;
@@ -153,7 +154,8 @@ private static ConnectorPageSource createTestingPageSource(HiveTransactionHandle
                 NO_CACHE_REQUIREMENT,
                 Optional.empty(),
                 ImmutableMap.of(),
-                ImmutableSet.of());
+                ImmutableSet.of(),
+                SplitWeight.standard());

         TableHandle tableHandle = new TableHandle(
                 new ConnectorId(HIVE_CATALOG),
@@ -156,7 +156,9 @@ public void testDefaults()
                 .setVerboseRuntimeStatsEnabled(false)
                 .setPartitionLeaseDuration(new Duration(0, TimeUnit.SECONDS))
                 .setMaterializedViewMissingPartitionsThreshold(100)
-                .setLooseMemoryAccountingEnabled(false));
+                .setLooseMemoryAccountingEnabled(false)
+                .setSizeBasedSplitWeightsEnabled(true)
+                .setMinimumAssignedSplitWeight(0.05));
     }

     @Test
@@ -274,6 +276,8 @@ public void testExplicitPropertyMappings()
                 .put("hive.loose-memory-accounting-enabled", "true")
                 .put("hive.verbose-runtime-stats-enabled", "true")
                 .put("hive.materialized-view-missing-partitions-threshold", "50")
+                .put("hive.size-based-split-weights-enabled", "false")
+                .put("hive.minimum-assigned-split-weight", "1.0")
                 .build();

         HiveClientConfig expected = new HiveClientConfig()
@@ -387,7 +391,9 @@ public void testExplicitPropertyMappings()
                 .setVerboseRuntimeStatsEnabled(true)
                 .setPartitionLeaseDuration(new Duration(4, TimeUnit.HOURS))
                 .setMaterializedViewMissingPartitionsThreshold(50)
-                .setLooseMemoryAccountingEnabled(true);
+                .setLooseMemoryAccountingEnabled(true)
+                .setSizeBasedSplitWeightsEnabled(false)
+                .setMinimumAssignedSplitWeight(1.0);

         ConfigAssertions.assertFullMapping(properties, expected);
     }
@@ -31,6 +31,7 @@
 import com.facebook.presto.spi.ConnectorPageSource;
 import com.facebook.presto.spi.ConnectorSession;
 import com.facebook.presto.spi.SchemaTableName;
+import com.facebook.presto.spi.SplitWeight;
 import com.facebook.presto.spi.TableHandle;
 import com.facebook.presto.sql.analyzer.FeaturesConfig;
 import com.facebook.presto.sql.gen.JoinCompiler;
@@ -255,7 +256,8 @@ private static ConnectorPageSource createPageSource(HiveTransactionHandle transa
                 NO_CACHE_REQUIREMENT,
                 Optional.empty(),
                 ImmutableMap.of(),
-                ImmutableSet.of());
+                ImmutableSet.of(),
+                SplitWeight.standard());

         TableHandle tableHandle = new TableHandle(
                 new ConnectorId(HIVE_CATALOG),
@@ -15,6 +15,7 @@

 import com.facebook.presto.hive.metastore.Storage;
 import com.facebook.presto.hive.metastore.StorageFormat;
+import com.facebook.presto.spi.SplitWeight;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
@@ -71,7 +72,8 @@ public void testGenerateCacheQuota()
                 NO_CACHE_REQUIREMENT,
                 Optional.empty(),
                 ImmutableMap.of(),
-                ImmutableSet.of());
+                ImmutableSet.of(),
+                SplitWeight.standard());

         CacheQuota cacheQuota = HivePageSourceProvider.generateCacheQuota(split);
         CacheQuota expectedCacheQuota = new CacheQuota(".", Optional.empty());
@@ -106,7 +108,8 @@ public void testGenerateCacheQuota()
                 new CacheQuotaRequirement(PARTITION, Optional.of(DataSize.succinctDataSize(1, DataSize.Unit.MEGABYTE))),
                 Optional.empty(),
                 ImmutableMap.of(),
-                ImmutableSet.of());
+                ImmutableSet.of(),
+                SplitWeight.standard());

         cacheQuota = HivePageSourceProvider.generateCacheQuota(split);
         expectedCacheQuota = new CacheQuota(SCHEMA_NAME + "." + TABLE_NAME + "." + PARTITION_NAME, Optional.of(DataSize.succinctDataSize(1, DataSize.Unit.MEGABYTE)));