
Consider memory during writer scaling #18771

Merged

merged 6 commits from scale_writer_max_memory into trinodb:master on Sep 25, 2023

Conversation

@gaurav8297 (Member) commented Aug 22, 2023

Description

Benchmarks (Unpartitioned)

Before:

trino> set session query_max_memory_per_node = '3GB';
SET SESSION
trino:insert_demo> insert into lineitem_unpart_2 select * from tpch_sf1000_orc.lineitem;

Query 20230829_001613_00003_7gygv, FAILED, 7 nodes
Splits: 903 total, 16 done (1.77%)
11.32 [36.1M rows, 13.1GB] [3.19M rows/s, 1.16GB/s]

Query 20230829_001613_00003_7gygv failed: Query exceeded per-node memory limit of 3GB [Allocated: 3.00GB, Delta: 1010.93kB, Top Consumers: {TableScanOperator=2.51GB, TableWriterOperator=503.44MB, AggregationOperator=115.58kB}]

After:

trino> set session query_max_memory_per_node = '3GB';
SET SESSION
trino:insert_demo> insert into lineitem_unpart select * from tpch_sf1000_orc.lineitem;
INSERT: 5999989709 rows

Query 20230828_224641_00005_ftzqd, FINISHED, 7 nodes
Splits: 3,116 total, 3,116 done (100.00%)
8:52 [6B rows, 172GB] [11.3M rows/s, 330MB/s]

Benchmarks (Partitioned)

Before:

trino:insert_demo> set session query_max_memory_per_node = '6GB';
SET SESSION
trino:insert_demo> insert into lineitem_few_skewed_partitions select orderkey, partkey, suppkey, linenumber, quantity, extendedprice, discount, tax, returnflag, linestatus, commitdate, receiptdate, shipinstruct, comment, shipdate, shipmode from tpch_sf1000_orc_part.lineitem;

Query 20230831_032052_00012_tus9r, FAILED, 7 nodes
Splits: 1,779 total, 48 done (2.70%)
21.84 [196M rows, 17.3GB] [8.97M rows/s, 811MB/s]

Query 20230831_032052_00012_tus9r failed: Query exceeded per-node memory limit of 6GB [Allocated: 6.00GB, Delta: 3.00MB, Top Consumers: {ScanFilterAndProjectOperator=3.18GB, TableWriterOperator=2.35GB, PagePartitioner=454.33MB}]

After:

trino:insert_demo> set session query_max_memory_per_node = '6GB';
SET SESSION
trino:insert_demo> insert into lineitem_few_skewed_partitions select orderkey, partkey, suppkey, linenumber, quantity, extendedprice, discount, tax, returnflag, linestatus, commitdate, receiptdate, shipinstruct, comment, shipdate, shipmode from tpch_sf1000_orc_part.lineitem;
INSERT: 5999989709 rows

Query 20230831_031245_00014_dhrst, FINISHED, 7 nodes
Splits: 5,463 total, 5,463 done (100.00%)
11:48 [6B rows, 159GB] [8.48M rows/s, 230MB/s]

However, note that this query takes ~4 minutes with 64GB query_max_memory_per_node.


import static java.util.Objects.requireNonNull;
import static java.util.function.Function.identity;

@ThreadSafe
public class LocalExchange
{
private static final int SCALE_WRITERS_MAX_PARTITIONS_PER_WRITER = 128;
public static final double SCALE_WRITERS_MAX_MEMORY_RATIO = 0.7;
Member:

Maybe 0.2 or 0.5? IMO totalMemoryUsed should be pretty accurate during scale-up, because the writers are supposed to be full.

Should this also account for how many writers we can add in a single iteration (e.g. scaling by 2x or 1.5x)?
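For context, here is a minimal sketch of the kind of memory gate being discussed, with the per-iteration growth factored in; the class and method names are illustrative assumptions, not the actual Trino implementation:

final class WriterScalingMemoryGate
{
    // Same ratio as SCALE_WRITERS_MAX_MEMORY_RATIO in the snippet above; the value is only illustrative here.
    static final double MAX_MEMORY_RATIO = 0.7;

    static boolean canScaleUp(long writerMemoryUsed, long queryMaxMemoryPerNode, double perIterationScaleFactor)
    {
        // If the current writers are full, scaling the writer count by perIterationScaleFactor
        // (e.g. 1.5x or 2x) is expected to scale writer memory by roughly the same factor.
        double projectedWriterMemory = writerMemoryUsed * perIterationScaleFactor;
        return projectedWriterMemory <= queryMaxMemoryPerNode * MAX_MEMORY_RATIO;
    }
}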

@gaurav8297 gaurav8297 force-pushed the scale_writer_max_memory branch 6 times, most recently from e82d241 to 212804c Compare August 28, 2023 02:49
@gaurav8297 gaurav8297 marked this pull request as ready for review August 28, 2023 02:55
// because preferred write partitioning is always enabled for local exchange thus partitioned inserts will never
// use this property. Hence, there is no risk in terms of more numbers of physical writers which can cause high
// resource utilization.
private int scaleWritersMaxWriterCount = min(getAvailablePhysicalProcessorCount(), 32);
private int scaleWritersMaxWriterCount = min(getAvailablePhysicalProcessorCount(), 32) * 2;
Member:

The commit message says "same value as the number of worker threads", but maxWorkerThreads is Runtime.getRuntime().availableProcessors() * 2, which is a different number.

Member Author:

@sopel39 Should we set this to the same number? I think it makes sense in case there are more cores.
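To make the difference concrete, here is a small standalone sketch comparing the two defaults under discussion; the physical core estimate below is only an assumption, since Trino detects the physical processor count separately:

public class WriterCountDefaults
{
    public static void main(String[] args)
    {
        int logicalProcessors = Runtime.getRuntime().availableProcessors();
        // task.max-worker-threads defaults to twice the logical processor count.
        int maxWorkerThreads = logicalProcessors * 2;
        // The PR's default doubles the physical processor count, capped at 32. With SMT
        // (two logical per physical core) or with more than 32 physical cores, the two differ.
        int assumedPhysicalProcessors = Math.max(1, logicalProcessors / 2);
        int scaleWritersMaxWriterCount = Math.min(assumedPhysicalProcessors, 32) * 2;
        System.out.printf("maxWorkerThreads=%d, scaleWritersMaxWriterCount=%d%n",
                maxWorkerThreads, scaleWritersMaxWriterCount);
    }
}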

@gaurav8297 gaurav8297 force-pushed the scale_writer_max_memory branch 4 times, most recently from c63376b to 0c38e6e Compare September 1, 2023 06:46
@sopel39 (Member) left a comment:

I think it's fine, but it becomes pretty sophisticated.

I was thinking about something simpler, like task.scale-writers.rebalance-multiplier=2, where we can essentially multiply partition writers up to a factor decided by the user.

// use this property. Hence, there is no risk in terms of more numbers of physical writers which can cause high
// resource utilization.
private int scaleWritersMaxWriterCount = min(getAvailablePhysicalProcessorCount(), 32);
// Set the value of default max writer count to the number of processors * 2 and cap it to 64. We can set this value
Member:

As discussed, we might simplify the properties to:

min-writer-count
max-writer-count
scale-writers.enabled

(and remove the partition writer count property)

Member Author:

I'm creating another PR for this

@gaurav8297 gaurav8297 force-pushed the scale_writer_max_memory branch 2 times, most recently from f0cda79 to 804cec8 Compare September 11, 2023 05:46
@gaurav8297 gaurav8297 changed the title from "Consider memory during scaling" to "Consider memory during writer scaling" Sep 12, 2023
@gaurav8297 gaurav8297 force-pushed the scale_writer_max_memory branch 3 times, most recently from de7d6ba to 9b8da1a Compare September 15, 2023 23:26
If we don't reset, it's possible that an unskewed partition eventually gets scaled, since partitionDataSizeSinceLastRebalance gets accumulated over multiple rebalance cycles. This causes unnecessary partitions to get overscaled, which can lead to high memory usage.
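A simplified sketch of the reset this commit describes (not the actual SkewedPartitionRebalancer; names are assumptions based on the message above):

import java.util.Arrays;

final class PartitionDataSinceLastRebalance
{
    private final long[] partitionDataSizeSinceLastRebalance;

    PartitionDataSinceLastRebalance(int partitionCount)
    {
        this.partitionDataSizeSinceLastRebalance = new long[partitionCount];
    }

    void recordWrite(int partition, long bytes)
    {
        partitionDataSizeSinceLastRebalance[partition] += bytes;
    }

    void onRebalanceCycleEnd()
    {
        // Clear the per-partition counters after every rebalance cycle. Without this reset, a
        // partition with steady but unskewed writes would eventually look skewed simply by
        // accumulating bytes across many cycles and would get scaled unnecessarily.
        Arrays.fill(partitionDataSizeSinceLastRebalance, 0);
    }
}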
This session property specifies the minimum data processed to trigger skewed partition rebalancing in local and remote exchange.

Currently, the value of this property is too small, and since we no longer work with physicalWrittenBytes during scaling, we can increase it. This will also make scaling a bit less aggressive.
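A sketch of the gate this property controls, with the same caveat that the names and threshold handling here are assumptions: rebalancing is only attempted once enough data has been processed since the previous rebalance, so a larger value makes scaling less aggressive.

final class RebalanceTrigger
{
    private final long minDataProcessedRebalanceThreshold;
    private long dataProcessedSinceLastRebalance;

    RebalanceTrigger(long minDataProcessedRebalanceThreshold)
    {
        this.minDataProcessedRebalanceThreshold = minDataProcessedRebalanceThreshold;
    }

    boolean shouldRebalance(long newlyProcessedBytes)
    {
        dataProcessedSinceLastRebalance += newlyProcessedBytes;
        if (dataProcessedSinceLastRebalance < minDataProcessedRebalanceThreshold) {
            return false;
        }
        // Enough data has accumulated; allow one rebalance and start counting again.
        dataProcessedSinceLastRebalance = 0;
        return true;
    }
}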
@gaurav8297 gaurav8297 force-pushed the scale_writer_max_memory branch 2 times, most recently from e83e2f5 to 7999410 Compare September 19, 2023 01:31
@sopel39 (Member) left a comment:

lgtm % comments

@@ -581,10 +585,14 @@ public LocalExecutionPlan plan(
int taskCount = getTaskCount(partitioningScheme);
if (checkCanScalePartitionsRemotely(taskContext.getSession(), taskCount, partitioningScheme.getPartitioning().getHandle(), nodePartitioningManager)) {
partitionFunction = createPartitionFunction(taskContext.getSession(), nodePartitioningManager, partitioningScheme, partitionChannelTypes);
// Consider memory while calculating the number of writers. This is to avoid creating too many task buckets.
int partitionedWriterCount = min(
Member:

Just a comment: we kind of have no visibility into partitionedWriterCount.

Member Author:

WDYM?

Member:

I mean that it's a value that is computed internally, but the user isn't really aware how big it is. Hence it could potentially be surprising.

core/trino-main/src/main/java/io/trino/util/MoreMath.java (comment outdated, resolved)
Add a maxRebalancedPartitions limit to SkewedPartitionRebalancer so that in extreme cases we don't over-scale, which can cause high memory utilization.

The value of maxRebalancedPartitions is calculated using the max memory allowed per node.
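The shape of that calculation, sketched with an assumed per-writer memory constant (the actual constant lives in the PR and is not reproduced here; see also the getScaleWritersMaxSkewedPartitions snippet later in this conversation):

final class MaxRebalancedPartitionsLimit
{
    // Assumed placeholder for the per-partition-writer memory budget.
    private static final long MAX_MEMORY_PER_PARTITION_WRITER = 256L * 1024 * 1024;

    // Cap how many partitions may be rebalanced so that, even under extreme skew, the writers
    // created by rebalancing still fit within the per-node query memory limit.
    static int forNode(long queryMaxMemoryPerNodeBytes)
    {
        return (int) Math.ceil((double) queryMaxMemoryPerNodeBytes / MAX_MEMORY_PER_PARTITION_WRITER);
    }
}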
Since writers are both CPU and IO bound, we should increase them to the same value as the number of worker threads.
@sopel39 sopel39 merged commit 8bba6ff into trinodb:master Sep 25, 2023
91 checks passed
@sopel39 (Member) commented Sep 25, 2023

merged, thanks!

@sopel39 sopel39 mentioned this pull request Sep 25, 2023
public static int getScaleWritersMaxSkewedPartitions(Session session)
{
    // Set the value of maxSkewedPartitions to scale to 60% of maximum number of writers possible per node.
    return (int) (ceil((double) getQueryMaxMemoryPerNode(session).toBytes() / MAX_MEMORY_PER_PARTITION_WRITER) * 0.60);
}
Member:

Nit: add a comment explaining where the 60% came from.

// Keep the maxPartitionsToRebalance at least at the writer count so that single-partition writes do
// not suffer from skewness and can scale uniformly across all writers. Additionally, note that
// maxWriterCount is calculated taking memory into account. So, it is safe to set the
// maxPartitionsToRebalance to the maximum number of writers.
Member:

So, it is safe to set the maxPartitionsToRebalance to maximum number of writers.

That kind of depends on the order, i.e. whether you scale first or see all partitions first.

@@ -3497,24 +3503,29 @@ private int getWriterCount(Session session, WriterScalingOptions connectorScalin
return 1;
}

int maxWritersBasedOnMemory = getMaxPartitionWritersBasedOnMemory(session);
Member:

nit: just getMaxWritersBasedOnMemory

if (partitioningScheme.isPresent()) {
// The default value of partitioned writer count is 32 which is high enough to use it
// for both cases when scaling is enabled or not. Additionally, it doesn't lead to too many
// small files since when scaling is disabled only single writer will handle a single partition.
int partitionedWriterCount = getTaskWriterCount(session);
@sopel39 (Member) commented Sep 25, 2023:

Shouldn't it be getTaskPartitionedWriterCount, as it was getTaskPartitionedWriterCount(session) before? This is the unpartitioned, unscaled case.

@github-actions github-actions bot added this to the 427 milestone Sep 25, 2023
.map(writerCount -> min(writerCount, getTaskPartitionedWriterCount(session)))
.orElse(getTaskPartitionedWriterCount(session));
}
return getTaskPartitionedWriterCount(session);
// Consider memory while calculating writer count.
Member:

You could extract min(partitionedWriterCount, previousPowerOfTwo(maxWritersBasedOnMemory)) and use it both here and in the plan method. Then it would be obvious that it's the same thing and where it comes from.
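A sketch of the extraction being suggested (helper and class names are made up here): one shared method applies the memory cap, so the power-of-two rounding is defined in a single place and reused by getWriterCount and the plan method.

final class WriterCountLimits
{
    static int capWriterCountByMemory(int partitionedWriterCount, int maxWritersBasedOnMemory)
    {
        return Math.min(partitionedWriterCount, previousPowerOfTwo(maxWritersBasedOnMemory));
    }

    // Largest power of two less than or equal to n (for n >= 1); this mirrors the intent of
    // the MoreMath.previousPowerOfTwo helper referenced in this PR.
    static int previousPowerOfTwo(int n)
    {
        return Integer.highestOneBit(n);
    }
}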

Labels: cla-signed, hive (Hive connector)