
Weighted Split Scheduling #16668

Merged
merged 2 commits into prestodb:master on Nov 29, 2021

Conversation


@pettyjamesm pettyjamesm commented Aug 31, 2021

Docs are pending but basically this change:

  • Adds a SplitWeight field to ConnectorSplit, which allows connectors to indicate “this split is smaller than normal by a factor of X” (for Hive, currently the only implementation, this is based on size in bytes, i.e. small files)
  • Changes NodeScheduler and related classes to assign splits to workers based on their weight instead of the split count alone

The effect of the above is that when splits are sized appropriately, no behavior changes, but when splits are small (i.e. when the Hive connector is processing small files) the worker split queues are allowed to be deeper to compensate, which significantly improves performance.

Description of Changes

Changes to presto-spi and presto-main

ConnectorSplits now carry a SplitWeight, which by default returns a "standard" split weight but which can be overridden by connector implementations to influence split scheduling behaviors.

All NodeScheduler split assignment decisions are now based on per-node and per-task queued split weight totals instead of split counts, except for "task unacknowledged split counts" (a pre-existing behavior controlled by NodeSchedulerConfig(node-scheduler.max-unacknowledged-splits-per-task)). That configuration is now much more significant, since it can be used to bound how large the individual task update requests sent from the coordinator to workers can get when a large number of splits with small weights are scheduled.
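As a rough illustration of the scheduling change (class and method names here are invented for the sketch, not the actual presto-main implementation), a node-level limit previously expressed as a split count can be translated into a weight budget, which naturally admits more splits when they carry small weights:

```java
// Illustrative sketch of weight-based assignment: a node accepts a split
// while its total assigned weight stays under a limit expressed in units
// of "standard" splits. Names are hypothetical, not the real classes.
public class WeightedAssignmentSketch
{
    static final long STANDARD_WEIGHT = 100; // raw units per standard split

    private final long maxWeightPerNode;
    private long assignedWeight;

    public WeightedAssignmentSketch(int maxSplitsPerNode)
    {
        // The existing "split count" config is translated into an
        // equivalent standard-weight total.
        this.maxWeightPerNode = maxSplitsPerNode * STANDARD_WEIGHT;
    }

    public boolean tryAssign(long splitWeight)
    {
        if (assignedWeight + splitWeight > maxWeightPerNode) {
            return false;
        }
        assignedWeight += splitWeight;
        return true;
    }

    public static void main(String[] args)
    {
        // With a budget of 2 "standard" splits, four half-weight splits fit
        WeightedAssignmentSketch node = new WeightedAssignmentSketch(2);
        int assigned = 0;
        while (node.tryAssign(50)) {
            assigned++;
        }
        System.out.println(assigned); // prints 4
    }
}
```

When every split carries the standard weight of 100, the budget degrades to exactly the old split-count behavior.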

Changes to presto-hive

A version of split weighting for the hive connector is included, which is enabled by default but can be disabled by
setting the Hive session property size_based_split_weights_enabled=false or the hive configuration property hive.size-based-split-weights-enabled=false. When disabled, all splits are assigned the standard weight.

When enabled, splits are assigned a weight based on their size in bytes relative to hive.max-split-size. Splits that are 1/2 of the max split size will be weighted as 1/2 of the standard split weight. In this implementation, no split will be assigned a weight smaller than the value set by the minimum_assigned_split_weight Hive session property or the hive.minimum-assigned-split-weight configuration property (default: 0.05). This provides a mechanism to control how aggressively the scheduler responds to the presence of small files. With the current standard split weight of 100, the default minimum corresponds to a raw weight of 5, meaning split queues will at most be scheduled 20x deeper when all splits are smaller than 1/20th of the max split size.

Currently, splits that are larger than the hive.max-split-size value (eg: unsplittable files) are also assigned the standard split weight, such that any assigned weight always falls between the minimum assigned weight and the standard weight. This is an implementation choice for the Hive connector, not a strict requirement on the behavior that connectors might choose to implement in the future.
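The clamping described above can be sketched as follows (a simplified illustration with a hypothetical helper name, not the actual HiveSplit weighting code):

```java
public class SizeBasedSplitWeightSketch
{
    // Returns a proportional weight in [minimumWeight, 1.0], where 1.0 is
    // the "standard" split weight. Splits at or above the target split size
    // get the standard weight; smaller splits are weighted by their size,
    // floored at the configured minimum (default 0.05 per the description).
    static double weightForSplit(long splitSizeBytes, long maxSplitSizeBytes, double minimumWeight)
    {
        double proportion = (double) splitSizeBytes / maxSplitSizeBytes;
        return Math.max(minimumWeight, Math.min(1.0, proportion));
    }

    public static void main(String[] args)
    {
        long maxSplitSize = 64L * 1024 * 1024; // e.g. hive.max-split-size = 64MB
        // A 32MB split gets half the standard weight
        System.out.println(weightForSplit(32L * 1024 * 1024, maxSplitSize, 0.05)); // prints 0.5
        // A 1KB split is floored at the minimum
        System.out.println(weightForSplit(1024, maxSplitSize, 0.05)); // prints 0.05
    }
}
```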

Benchmarks

TPCH scale factor 10GB suite datasets were generated in both Parquet and JSON:

| Dataset        | Rows Per File | Typical Resulting File Size | Compression  |
|----------------|---------------|-----------------------------|--------------|
| Parquet Normal | 10M           | 210MB                       | Snappy       |
| Parquet Small  | 3,000         | 140KB                       | Snappy       |
| JSON Normal    | 10M           | 2GB                         | Uncompressed |
| JSON Small     | 3,000         | 1MB                         | Uncompressed |
  • The relatively small 10GB scale factor was chosen because otherwise the small file datasets generated far too many S3 objects and the data generation process would fail because of S3 throttling
  • The JSON files were uncompressed so that the normal-file dataset splits would still be considered splittable. Compressed files are not splittable, and sending a few large splits to workers for single-threaded processing would not have been effective for comparing the effect of scheduler behavior.

TPCH suite execution time geomean measurements, collected on a cluster of r5.8xlarge instances, with 5 worker nodes (and one coordinator):

Baseline

| File Format | Small File Geomean | Normal File Geomean |
|-------------|--------------------|---------------------|
| Parquet     | 32.86              | 2.6                 |
| JSON        | 32.18              | 5.4                 |

Improved

| File Format | Small File Geomean | Small File Improvement | Normal File Geomean | Normal File Improvement |
|-------------|--------------------|------------------------|---------------------|-------------------------|
| Parquet     | 14.04              | ~2.34x                 | 2.61                | unchanged               |
| JSON        | 8.73               | ~3.68x                 | 5.33                | unchanged               |

Note that before weighted scheduling, both Parquet and JSON small files performed about the same because both were bottlenecked on split scheduling throughput and latency. With weighted scheduling enabled, the bottleneck becomes worker I/O and decoding throughput, so Parquet and JSON perform differently as a result.

== RELEASE NOTES ==

General Changes
* Add support for scheduling splits to workers based on a computed weight for each split

Hive Changes
* Add support for weighting splits according to their file size, allowing deeper worker split queues when files are small


@yulongfufu yulongfufu commented Sep 1, 2021

@pettyjamesm Hi, I glanced over the PR. It seems that you want to add a weight according to the file size (Hive) and schedule by split weight instead of split count. I guess it may resolve the skew problem; do we have any test report? In addition, is it also useful for splittable formats, e.g. Parquet? :)


@pettyjamesm pettyjamesm commented Sep 1, 2021

> I glanced over the PR, it seems that you want to add the weight according to the file size(hive), and schedule it by split weight instead of split count.

That's correct, all scheduling decisions now operate based on split weights (except for NodeSchedulerConfig(node-scheduler.max-unacknowledged-splits-per-task), which is now much more important since it controls the number of splits sent to workers in a single task update request, i.e. the task update request size). Connectors can choose to implement logic that determines the weight of a given split to influence the effective queue depths. The implementation I've added here for Hive uses the split size in bytes compared to the Hive target split size (from HiveClientConfig(hive.max-split-size)) to assign weights between the minimum allowed weight (controlled via the new config: HiveClientConfig(hive.minimum-assigned-split-weight)) and the standard weight. Right now the Hive implementation will never weight splits above the standard weight, but the implementation in presto-main does not forbid doing so.

> does we have any test report? In addition, is it also useful to the divisible format e.g. parquet ?

I'll add the details of the experiment setup I used to the PR description shortly, but I tested this with JSON and Parquet small files and saw a ~2.5x improvement for parquet and ~4x improvement for JSON on the entire TPCH suite for small files with these changes and no change for "normal" size files (as expected). The parquet performance is less dramatic than the JSON scenario because the individual small reads against S3 for each data page are much less efficient than a single streaming read like you get with JSON. In either case, this change dramatically increases worker utilization whereas before the workers would complete work more quickly than the coordinator could deliver splits, which is the problem the scheduler side change attempts to address. If you couple it with smarter parquet I/O strategies, then the parquet small file performance can increase even more dramatically (in one specific scenario tested, pre-reading parquet files < 1MB in size into memory improved by ~8x).

@pettyjamesm pettyjamesm marked this pull request as ready for review Sep 1, 2021
@pettyjamesm pettyjamesm requested a review from highker Sep 1, 2021
@pettyjamesm pettyjamesm force-pushed the weighted-split-scheduling branch 2 times, most recently from eb2efad to e1bc2b3 Compare Sep 2, 2021

@yingsu00 yingsu00 commented Sep 3, 2021

@pettyjamesm What cluster size was the benchmark run on? And how many files were there?


@yulongfufu yulongfufu commented Sep 3, 2021

> saw a ~2.5x improvement for parquet and ~4x improvement

It's a great test result! We have also done some work on small files: we don't change HiveClientConfig(hive.max-split-size) dynamically. Instead, we combine several small splits into a multi-split until its size meets maxSplitBytes (e.g. 64MB), then read these splits at the same time. Our test results are also very good, but the approach conflicts with yours somewhat, so I need to think about how to merge your feature into our company branch.
Looking forward to your merge, and I will test it in our company.


@pettyjamesm pettyjamesm commented Sep 3, 2021

> What was the cluster size the benchmark was ran on? And how many files were there?

@yingsu00 The cluster used r5.8xlarge instances with 5 worker nodes and one coordinator. I don't have the datasets anymore to confirm the exact total file count in the 6 TPCH tables, but metrics I still have lying around for individual queries corroborate that the file counts matched the TPCH SF10 schema row counts. The small file datasets limited each file to 3,000 rows and the "normal" datasets limited each file to 10M rows, meaning, for example, that the lineitem table contained 60M rows comprising ~20,000 files in the small file dataset and 6 files in the "normal" dataset.

@pettyjamesm pettyjamesm force-pushed the weighted-split-scheduling branch 3 times, most recently from 7017ad7 to 6122d94 Compare Sep 21, 2021
@tdcmeehan tdcmeehan added the rm label Sep 27, 2021

@kewang1024 kewang1024 left a comment

Can we resolve the conflict and update the PR to the newest version (from trinodb/trino#9059)?

@@ -587,7 +591,28 @@ public HiveSessionProperties(HiveClientConfig hiveClientConfig, OrcFileWriterCon
ENABLE_LOOSE_MEMORY_BASED_ACCOUNTING,
"Enable loose memory accounting to avoid OOMing existing queries",
hiveClientConfig.isLooseMemoryAccountingEnabled(),
false));
false),
booleanProperty(

@kewang1024 kewang1024 Oct 5, 2021


Since this feature can be controlled at the per-query level, the node split stats (getTotalSplitsWeight / getQueuedSplitsWeightForStage) would not actually be accurate.

For example, suppose the feature is turned off by default and only one query has it turned on. For that query, the split stats from previous queries (which are still running) are using a different split weight calculation standard than this query.


@pettyjamesm pettyjamesm Oct 5, 2021


In an environment with some queries enabling this feature and other queries not enabling it:

  • It's true that different weight calculations would be used for the splits associated with the queries (disabled: all splits get the "standard" weight; enabled: each split's weight is calculated based on size in bytes)
  • I wouldn't say that this means the node split stats are inaccurate: at all times the values are computed as the sum of the weights of the splits assigned, it's just that the weights have been computed differently between queries. It's worth noting that getQueuedSplitsWeightForStage() is a per-task metric, so there will be no interaction between queries on that metric, but the node level getTotalSplitsWeight() will have an interaction between the two queries. In a sense this is unavoidable so long as this setting exists, but I don't think it's actually a problem, since the current implementation will only weight splits as "smaller than standard" when enabled and never "larger than standard". The net effect is that queries with small files will still be allowed a higher level of parallelism than before, even when other queries with small files have this feature disabled.


@pettyjamesm pettyjamesm commented Oct 5, 2021

> Can we resolve the conflict and update the PR to the newest (From trinodb/trino#9059)

Yeah, I'm hoping to drive the conversations in that PR to a conclusion before re-syncing the two PRs, since the effort of revising both open PRs as feedback comes in is high, and it sounds like there will be at least one more round of changes to come.


@kewang1024 kewang1024 commented Oct 5, 2021

> saw a ~2.5x improvement for parquet and ~4x improvement

> It's a great test result! And we also do some work when it comes to small files: we don't change the size of HiveClientConfig(hive.max-split-size) dynamically. Instead, we combine some small splits to a mutiple-split until it's size meet the maxSplitBytes(e.g. 64MB). Then we read these splits at the same time. Our test result is also very good. But it has some conflict to yours. I need to think how to merge your feature to our company branch. Looking forward to your merge, and I will test it in our company.

Hey @yulongfufu, that sounds interesting. Can you share your PR with us as well?


@yulongfufu yulongfufu commented Oct 8, 2021

> can you share with us your PR as well?

I picked a demo, #16850, from my branch.
For now it is limited to Hive (of course, it can be amended to fit all connectors). Please feel free to raise any questions, and I will do more work to improve it.

@pettyjamesm pettyjamesm force-pushed the weighted-split-scheduling branch 4 times, most recently from 3c5c13e to d3da02d Compare Oct 11, 2021
@yingsu00 yingsu00 self-requested a review Oct 14, 2021

@yingsu00 yingsu00 left a comment

Hi @pettyjamesm thanks for submitting this fix.

The PR basically looks good, but there is a small catch:
You introduced split weight but still kept the node-scheduler.max-splits-per-node config property. Now the per-node split quota is (100 * max-splits-per-node). However, since small splits get a small weight while large splits get 100, the number of splits scheduled to a node may well exceed node-scheduler.max-splits-per-node. This contradicts its literal meaning: e.g. you set node-scheduler.max-splits-per-node to 100, but a node may get up to 2000 splits. So the "max" is no longer the max splits per node, which may be confusing for users.

How about we change node-scheduler.max-splits-per-node to node-scheduler.max-weighted-splits-per-node and make the default weight 1 for the maximal size? Small splits would have weights less than 1, e.g. 0.05. That way the split count per node may be higher, but the weighted total would never exceed node-scheduler.max-weighted-splits-per-node. What do you think?

checkArgument(maxSplitsPerNode >= maxPendingSplitsPerTask, "maxSplitsPerNode must be >= maxPendingSplitsPerTask");
this.maxSplitsWeightPerNode = SplitWeight.standardWeightOfSplitCount(maxSplitsPerNode);

@yingsu00 yingsu00 Oct 15, 2021


nit: static import SplitWeight.standardWeightOfSplitCount
the next line too


public TestSplitLocal()
{
    this(SplitWeight.standard());
}

@yingsu00 yingsu00 Oct 15, 2021


I see the tests only cover the standard weight. Is there any test covering the non-standard case? If not, can we add some tests?

@pettyjamesm pettyjamesm force-pushed the weighted-split-scheduling branch 4 times, most recently from 9596ba9 to 8e081dc Compare Nov 5, 2021
@@ -25,6 +26,7 @@
public final class SplitWeight
{
private static final long UNIT_VALUE = 100;

@yingsu00 yingsu00 Nov 11, 2021


I really don't think using long is a good idea.

First of all, by using long with UNIT_VALUE = 100, it loses precision and finer granularity, since it can only represent 100 granules.

Secondly, this could regress performance instead of improving it as you originally intended. Since you used long in the SPI and double in the connector, you have to convert the value between the two:

public static SplitWeight fromProportion(double weight)
{
    return fromRawValue((long) Math.ceil(weight * UNIT_VALUE));
}

The use of Math.ceil() is in fact much more expensive than ADD instructions on double-precision floating point numbers. An ADD on double takes 4 cycles on an Intel CPU (Nehalem), while ceil() or floor() takes tens or hundreds of cycles: https://stackoverflow.com/questions/23203710/floating-point-operations-per-cycle-intel . You also have to do a multiplication, weight * UNIT_VALUE, instead of a simple assignment, and multiplication is more expensive than ADD on double or integer types on most CPUs. None of this needs to happen if you just use double in both systems: you can simply pass the value from the connector to the scheduler, and the only arithmetic operation you need is mostly ADD, which should be much faster than this conversion. Furthermore, the double arithmetic is the least thing you need to worry about in this scenario: listing a directory is many orders of magnitude more expensive than adding even millions of doubles. In the past I never saw split-assignment-related operations become CPU bottlenecks. You are welcome to write a JMH benchmark to test this.

Thirdly, it adds extra logic and complexity to the code by maintaining two weight systems. The code becomes more complex, and developers get confused.

Based on the above, I think having two weight types is a bad idea. I don't know why the Trino side suggested this; is there something I'm missing? If there are no particular reasons, I would strongly suggest unifying the two weighting systems.


@pettyjamesm pettyjamesm Nov 12, 2021


Let me try to walk through the thinking about how we ended up with this version of SplitWeight representations in the original PR and then through the changes made in the course of the Trino PR revisions.

First, I want to highlight the different places and characteristics of where these weight values are used:

  1. The SPI: Each split must be able to assign a weight value, and connectors not supporting split weighting must have a default "standard" weight. Ideally, the "standard" weight and general weight calculation system should be intuitive and avoid creating compatibility headaches in the future.
  2. The NodeScheduler and supporting classes like NodeTaskMap, NodeAssignmentStats, and split placement logic. This logic does a few things that start to create some real constraints on possible solutions:
    • The coordinator's view of a current task must track the combined weight of splits that are queued locally, queued remotely, and running, and it performs arithmetic on those separate weight sums to combine the locally-queued weight with the weights from the last observed remote task status update.
    • The task-level accounting then contributes its computed view of the current task, via deltas from previous values, into the NodeTaskMap, which tracks the current total weight running on a node through these incrementally computed deltas for all tasks on a given node, and does so over the course of that node's entire lifetime.
    • Failing to accurately account for weights is a risk. This introduces the requirement that split weight arithmetic must not lose accuracy while performing arbitrarily ordered additions and subtractions of combined sums and deltas between them; otherwise a node's state would drift away from the "correct" value and cause bad scheduling decisions in either direction (i.e. a node looks "full" when it's actually not running anything, or "empty" when it's actually fully loaded).
    • Split scheduling is latency sensitive, especially when splits are small, and the task / node state tracking updates all occur within critical sections or compareAndSet operations, so the overhead of calculation should be minimized where possible. For what it's worth, I think double would be fine here, but BigDecimal would be a performance risk.
    • The scheduler only needs to understand the concept of a "standard" weight in order to translate the previous configuration properties that expressed "split counts" into equivalent "standard weight sum" values
  3. Task and pipeline status updates inside of split assignments must also accumulate and perform arithmetic operations to determine the worker's view of currently queued / running / completed weights, which is similar to the coordinator-side problem above. It's also similar in that these accounting points happen in critical paths that are expected to be somewhat contended. Workers need not understand anything about the definition of "standard weight". The final bit of complexity that task status updates introduce, however, is that however the state is computed and represented, it must be possible to report the current computed value back to the coordinator in JSON format. This causes more problems, because floating point values must lose at least some precision when serialized to strings (e.g. 0.33333...) and might need a large number of digits to lose only an acceptable amount of precision.

With the above constraints, I chose to use a single long field in the first iteration of this PR, with a "standard" value of 100. That choice solved the problems associated with items 2 and 3 above, because arbitrarily ordered arithmetic over single and aggregated weights is not a problem with integer arithmetic, calculations are cheap to perform, and values serialize correctly and succinctly in JSON. double would probably perform sufficiently well, but risked precision loss beyond just that introduced by serialization. BigDecimal would probably have been able to reach a reasonable amount of precision such that even with JSON serialization the loss was negligible, but continuously performing arbitrary-precision arithmetic would have been a performance risk for the inner loop of split scheduling and the critical sections of the task and node status updates. Unfortunately, this initially forced me to compromise on item 1.
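The accuracy argument can be made concrete with a small sketch (illustrative only, not the actual NodeTaskMap code): with an integer raw representation, task-level deltas applied in any order reconstruct the node total exactly, which is what the delta-based accounting relies on.

```java
import java.util.concurrent.atomic.AtomicLong;

// Illustrative delta-based accounting: each task reports only the change
// in its total split weight, and the node total is maintained from those
// deltas. Integer arithmetic guarantees the total returns to exactly zero
// once every assigned weight has been subtracted back out, regardless of
// the order in which the deltas arrive.
public class NodeWeightAccountingSketch
{
    private final AtomicLong nodeTotalWeight = new AtomicLong();

    public void applyTaskDelta(long delta)
    {
        nodeTotalWeight.addAndGet(delta);
    }

    public long getTotalWeight()
    {
        return nodeTotalWeight.get();
    }

    public static void main(String[] args)
    {
        NodeWeightAccountingSketch node = new NodeWeightAccountingSketch();
        node.applyTaskDelta(100);  // task A assigned a standard split
        node.applyTaskDelta(5);    // task B assigned a tiny split
        node.applyTaskDelta(-100); // task A completed
        node.applyTaskDelta(-5);   // task B completed
        System.out.println(node.getTotalWeight()); // prints 0
    }
}
```

With floating point raw values, repeated sums and differences of this kind can accumulate rounding error, which is the drift risk described above.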

In the Trino PR the initial pushback on this attempt was similar:

  • Computing weights "relative to 100" is awkward and unintuitive; computing them relative to 1.0 is more natural
  • Exposing a magic hard-coded constant of 100 as a "standard" value would be cemented into the SPI contract after that point and hard to change
  • What if 100 "granules" isn't enough and we want to be able to express weights at a higher resolution (e.g. 1,000 or 1M)?

The resolution was what you see updated in the latest version of this PR:

  • The internal representation remains a long to avoid loss of accuracy in arithmetic and serialization
  • The preferred construction of weight values for connectors is via SplitWeight.fromProportion(double), which is more intuitive but still allows changes to the granularity in the future if necessary. The performance of the one-time conversion from double to long at the point of creation in connectors is less critical than the performance of the sections doing arithmetic on the internal representation within the engine.

That leaves the last remaining item: is 100 enough granularity? In my experience with this setting, yes. Remember, the weighting mechanism is designed to make the coordinator split assignment rate adaptive, to avoid bottlenecking on split scheduling latency, which leaves workers underutilized. Once workers are given sufficiently many splits to stay busy, the goal is accomplished and no further granularity is especially important. This is incidentally part of the reason the default hive.minimum-assigned-split-weight is 0.05 (1/20 of a standard split), or 20x more splits allowed, and not 100x more splits. Different environments might have different needs, so if in the future we decide that 1,000 or 1M should become the definition of "standard", that's still an option with the solution in this current PR.
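Putting the resolution together, the shape described above looks roughly like the following (a simplified sketch built around the fromProportion snippet quoted earlier, not the full SPI class):

```java
public final class SplitWeightSketch
{
    // Internal raw granularity: one "standard" split = 100 raw units.
    // Kept as an internal detail so the granularity could change later
    // without breaking the SPI contract.
    private static final long UNIT_VALUE = 100;

    private final long rawValue;

    private SplitWeightSketch(long rawValue)
    {
        this.rawValue = rawValue;
    }

    public static SplitWeightSketch standard()
    {
        return new SplitWeightSketch(UNIT_VALUE);
    }

    // Connectors express weights relative to 1.0; the one-time ceil()
    // conversion happens at split creation, not in scheduler hot paths.
    public static SplitWeightSketch fromProportion(double weight)
    {
        return new SplitWeightSketch((long) Math.ceil(weight * UNIT_VALUE));
    }

    public long getRawValue()
    {
        return rawValue;
    }

    public static void main(String[] args)
    {
        System.out.println(standard().getRawValue());           // prints 100
        System.out.println(fromProportion(0.05).getRawValue()); // prints 5
    }
}
```

Scheduler and worker accounting then operate purely on the long raw values, so summation order never affects the result.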

@pettyjamesm pettyjamesm force-pushed the weighted-split-scheduling branch 3 times, most recently from 8ffdeab to a5d716f Compare Nov 12, 2021

@yingsu00 yingsu00 commented Nov 13, 2021

@pettyjamesm Thanks for the explanation. I see the existing split weights you added in May 2020 were using long, so changing to another type may be overkill, and comparing double and long may lose precision. So I'm fine with this approach now.

One more catch: the "split" semantics have now changed to "standard split" in the related configurations, like node-scheduler.max-splits-per-node and node-scheduler.max-pending-splits-per-task. We need to at least add descriptions to these configs, if not rename them to node-scheduler.max-standard-splits-per-node and node-scheduler.max-pending-standard-splits-per-task. Otherwise the actual split count may exceed these config values and users would be confused.

Also would you please merge the last 3 commits to the previous ones?


@pettyjamesm pettyjamesm commented Nov 15, 2021

the "split" semantics now changed to "standard split" in the related configurations like node-scheduler.max-splits-per-node and node-scheduler.max-pending-splits-per-task. We need to at least add descriptions to these configs, if not changing the names of these configs to node-scheduler.max-standard-splits-per-node and node-scheduler.max-pending-standard-splits-per-task. Otherwise the actual splits count may exceed these config values and users would be confused.

Agreed, the properties are no longer appropriately named. I'll change the config property names, keep the old names as @LegacyConfig, and add appropriate descriptions.

> Also would you please merge the last 3 commits to the previous ones?

Sure thing. My plan was to keep adding incremental commits to make it easier to track the PR changes through review, but to collapse the commits once the changes were more or less ready to merge. My plan was to end up with two commits: one for presto-main and another for presto-hive.


@yingsu00 yingsu00 commented Nov 16, 2021

@pettyjamesm thank you! I've been hesitating over whether we should rename these properties or just add descriptions with the existing names. If we change the property names, existing deployed clusters might fail because of these changes. @tdcmeehan, what's your take on this?


@yingsu00 yingsu00 commented Nov 18, 2021

@pettyjamesm Since @tdcmeehan seems busy, let's choose the least intrusive way and just add descriptions to these properties for now. We can always change their names in the future. What do you think?


@pettyjamesm pettyjamesm commented Nov 18, 2021

@yingsu00 sure, I'll revert the property name changes but leave in the updated config description annotations and documentation changes (with the property name references changed to the originals, of course). Shall I go ahead and squash down the PR commits into the two commits now, or after you have a chance to review the final documentation commit separately?

@pettyjamesm pettyjamesm force-pushed the weighted-split-scheduling branch from 9148911 to 79107ac Compare Nov 18, 2021

@yingsu00 yingsu00 commented Nov 21, 2021

@pettyjamesm the updated description looks good to me. Now let's squash the last 4 commits into the first 5, or if you prefer 2 commits that's also fine. Thank you!

pettyjamesm added 2 commits Nov 22, 2021
This change adds the notion of a SplitWeight to the presto-spi, a
concept which allows connectors to indicate the amount of work
associated with a given split, relative to some "standard" split.

Connectors can choose to assign that concept of a weight based on
whatever heuristics might be appropriate within the context of that
connector, such as size in bytes or some other metric that might
be known about the split at the time that it is generated. The
calculation need not be fully precise, but care should be taken
when implementing weight calculations in connectors to avoid using
weights that are very small or very large.

On the presto-main side, split weights are used to inform how many
splits are allowed to be running on a given node or queued on a
given task (still configured by: node-scheduler.max-splits-per-node
and node-scheduler.max-pending-splits-per-task respectively). Those
values are now interpreted to be relative to one "standard" split
weight. When all splits are assigned the standard weight, the
scheduler will behave the same way as before. However, when splits
are assigned weights that are smaller, the scheduler will allow
more of them to be assigned or queued to tasks. In effect, this
allows the coordinator to assign enough splits to workers for them
to stay busy between batches of split assignments when the amount
of time that workers take to complete individual splits is too
short.

In order to control the maximum number of splits that might be
delivered in a single task update request, the existing config
parameter node-scheduler.max-unacknowledged-splits-per-task still
controls the absolute count of splits that the coordinator will
allow to be assigned to a given task that have not yet been
acknowledged by the worker as having been received. This can be
especially important if splits themselves have a large serialized
JSON representation, in which case sending a large number of small
splits (by weight) could create huge task update requests to be
sent.

Implements split weighting for HiveSplit instances based on their
size. This behavior is enabled by default and can be disabled by
setting the session property `size_based_split_weights_enabled=false`
or the hive configuration property `hive.size-based-split-weights-enabled=false`.

When enabled, splits are assigned their weight based on their size
in bytes relative to `hive.max-split-size`. Splits that are 1/2 of
the max split size will be weighted as 1/2 of the standard split weight.

Splits that are larger in size than the hive target split size
(eg: unsplittable files) are still assigned the standard split weight
to avoid scheduler interactions that might be harmful when extremely
large weights are calculated. This is a conservative decision designed
to prevent existing workloads from regressing, but might be worth
revisiting in the future.

Finally, when size based split weights are enabled, no split will be
assigned a weight smaller than the value set by the
`minimum_assigned_split_weight` session property or the
`hive.minimum-assigned-split-weight` configuration property which
defaults to 0.05 (proportional to 1.0, the weight of a standard split).
This provides a mechanism to control how aggressively the scheduler
will respond to the presence of small files. With the default
configuration, files smaller than 5% of the target split size will
still be assigned 5% of the standard split weight, allowing a
maximum of 20x more splits to be running or queued.
@pettyjamesm pettyjamesm force-pushed the weighted-split-scheduling branch from 79107ac to cccfe44 Compare Nov 22, 2021

@pettyjamesm pettyjamesm commented Nov 22, 2021

@yingsu00 changes have been squashed down into two commits: one for the changes in the engine core and another for the connector implementation in presto-hive.


@yingsu00 yingsu00 commented Nov 23, 2021

@pettyjamesm Looks good, except I found that NodeSchedulerConfig.java was not updated. Will you please update the ConfigDescription for all 3 mentioned properties?
Btw, I think making node-scheduler.max-unacknowledged-splits-per-task an absolute split count while max-pending-splits-per-task and max-splits-per-node are "relative" counts may confuse users. Was there any particular reason for that?


@pettyjamesm pettyjamesm commented Nov 23, 2021

> I found that NodeSchedulerConfig.java was not updated. Will you please update the ConfigDescription for all 3 mentioned properties?

Are you referring to @ConfigDescription annotations? I added those here for the two split scheduling properties to mention the semantics in terms of split weighting.

The third (existing and unchanged) configuration property, node-scheduler.max-unacknowledged-splits-per-task, is indeed still interpreted as an absolute split count and intentionally does not consider split weights. The reasoning is that "unacknowledged" splits are coordinator-local splits that will be sent in the next task update request, which means that having too many splits (in terms of count) could generate very large task update requests when serialized as JSON, and that setting is intended to control exactly that.

It was a pre-existing change that I made in advance of this PR because it laid some of the necessary groundwork in terms of refactoring, but the intention was specifically to control the kind of behavior that can now occur with weighted scheduling potentially assigning many more splits to nodes. In any case, it has an existing @ConfigDescription which remains accurate as-is even with this PR merged.


@pettyjamesm pettyjamesm commented Nov 25, 2021

@yingsu00 @tdcmeehan any objections to calling this ready to merge (and merging)?


@pettyjamesm pettyjamesm commented Nov 29, 2021

Confirmed with @yingsu00 via slack that she's in favor of proceeding to merge this, merging now.

@pettyjamesm pettyjamesm merged commit adab448 into prestodb:master Nov 29, 2021
40 checks passed
@pettyjamesm pettyjamesm deleted the weighted-split-scheduling branch Nov 29, 2021
@ajaygeorge ajaygeorge mentioned this pull request Dec 13, 2021
1 task