Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Restrict size of automatically broadcasted tables #11667

Merged
merged 3 commits into from
Oct 11, 2018

Conversation

sopel39
Copy link
Contributor

@sopel39 sopel39 commented Oct 9, 2018

Add join-max-broadcast-table-size property

Property defines maximum size of a table
that can be broadcast for JOIN (0 disables the limit).

Supersedes #11570

@sopel39
Copy link
Contributor Author

sopel39 commented Oct 9, 2018

I've done benchmarks with:

  • red <- current stats calculator with broadcast table size limit = 100MB
  • green <- broadcast limit disabled
  • blue <- upper estimate stats calculator with broadcast table size limit = 100MB

TPCH
TPCH
TPCH
TPCDS
TPCDS
TPCDS
TPCDS
TPCDS

@sopel39
Copy link
Contributor Author

sopel39 commented Oct 9, 2018

Observations:

  1. upper estimate calculator does bring more problems than benefits
  2. upper estimate calculator improves performance (vs disabled and current "probabilistic" stats calculator) for queries: tpcds/q22, tpcds/q72, tpcds/q95, tpcds/q76. This suggest that for those queries we are overestimating number of rows for build side.
  3. using current "probabilistic" calculator with broadcast table size limit improves performance (vs no limit) in queries: tpch/q09, tpcds/q80, tpcds/q81, tpcds/q64, tpcds/q40, tpcds/q24_1, tpcds/q24_2
  4. using current "probabilistic" calculator with broadcast table size limit degrades performance for some queries: tpch/q09, tpcds/q82, tpcds/q72, tpcds/q85, tpcds/q43, tpcds/q50, tpcds/q37.
    Overall averaged improvement is 0.029x and weighted by query time is 0.17x (for individual query the value is from 1 to inf [if perf improved] or from -inf to -1 [if perf degraded]).
  5. We greatly underestimate join build side construction, see query tpch/09. Will create separate issue for that

Conclusions:

  • I would land toggle for limiting broadcast build side (using "normal" stats calculator) as it slightly improves overall performance (based on TPCH/TPCDS benchmarks) but provides better cluster resource utilization (memory and concurrency).
    It can however greatly improve performance for individual queries and prevent them from failing on memory limit. Such toggle is available in other engines (e.g: spark, Netizza) too.

Additional steps for the future:

@@ -149,6 +151,15 @@ public SystemSessionProperties(
false,
value -> JoinDistributionType.valueOf(((String) value).toUpperCase()),
JoinDistributionType::name),
new PropertyMetadata<>(
JOIN_MAX_BROADCAST_TABLE_SIZE,
"Maximum size of a table that can be broadcast for JOIN (0 disables the limit). The most pessimistic size estimate is taken when doing comparison.",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The most pessimistic size estimate is taken when doing comparison. - This doesn't seem to be the case, is it?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we make it nullable? So if the property is not set the limit is disabled. 0 rather implies that the broadcast should never be chosen.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

0 disables the limit

It should be rather +INF to disable limit

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The most pessimistic size estimate is taken when doing comparison.

I would prefer to not to go to such details.

Maximum size

Please update to Maximum estimated size

Copy link
Contributor

@mbasmanova mbasmanova left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@sopel39 Karol, I like this change. It is quite straightforward. When you refer to a similar option in Spark, are you thinking about spark.sql.autoBroadcastJoinThreshold ?

https://spark.apache.org/docs/latest/sql-programming-guide.html
Configures the maximum size in bytes for a table that will be broadcast to all worker nodes when performing a join. By setting this value to -1 broadcasting can be disabled. Note that currently statistics are only supported for Hive Metastore tables where the command ANALYZE TABLE COMPUTE STATISTICS noscan has been run.

@@ -149,6 +151,15 @@ public SystemSessionProperties(
false,
value -> JoinDistributionType.valueOf(((String) value).toUpperCase()),
JoinDistributionType::name),
new PropertyMetadata<>(
JOIN_MAX_BROADCAST_TABLE_SIZE,
"Maximum size of a table that can be broadcast for JOIN (0 disables the limit). The most pessimistic size estimate is taken when doing comparison.",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we make it nullable? So if the property is not set the limit is disabled. 0 rather implies that the broadcast should never be chosen.

}

@Config("join-max-broadcast-table-size")
@MinDataSize("0MB")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is extra. DataSize cannot be negative.

double buildSideSizeInBytes = buildSideStatsEstimate.getOutputSizeInBytes(buildSide.getOutputSymbols(), context.getSymbolAllocator().getTypes());
JoinDistributionType joinDistributionType = getJoinDistributionType(context.getSession());
Optional<DataSize> joinMaxBroadcastTableSize = getJoinMaxBroadcastTableSize(context.getSession());
return joinDistributionType.canReplicate()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shortcircuit this check

JoinDistributionType joinDistributionType = getJoinDistributionType(context.getSession());
Optional<DataSize> joinMaxBroadcastTableSize = getJoinMaxBroadcastTableSize(context.getSession());
return joinDistributionType.canReplicate()
&& (!joinMaxBroadcastTableSize.isPresent() || buildSideSizeInBytes <= joinMaxBroadcastTableSize.get().toBytes());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shortcircuit joinMaxBroadcastTableSize.isPresent() before doing anything else

p.join(
INNER,
p.values(new PlanNodeId("valuesA"), aRows, p.symbol("A1", BIGINT)),
p.filter(TRUE_LITERAL, p.values(new PlanNodeId("valuesB"), bRows, p.symbol("B1", BIGINT))),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not just p.values?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's leftover from upper boundary stats calculator.

p.join(
INNER,
p.values(new PlanNodeId("valuesA"), aRows, p.symbol("A1", BIGINT)),
p.filter(TRUE_LITERAL, p.values(new PlanNodeId("valuesB"), bRows, p.symbol("B1", BIGINT))),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ditto about filter

p.join(
INNER,
p.values(new PlanNodeId("valuesA"), aRows, p.symbol("A1")),
p.filter(TRUE_LITERAL, p.values(new PlanNodeId("valuesB"), bRows, p.symbol("B1"))),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

p.join(
INNER,
p.values(new PlanNodeId("valuesA"), aRows, p.symbol("A1")),
p.filter(TRUE_LITERAL, p.values(new PlanNodeId("valuesB"), bRows, p.symbol("B1"))),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ditto

Copy link
Contributor

@kokosing kokosing left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

% comments

@@ -149,6 +151,15 @@ public SystemSessionProperties(
false,
value -> JoinDistributionType.valueOf(((String) value).toUpperCase()),
JoinDistributionType::name),
new PropertyMetadata<>(
JOIN_MAX_BROADCAST_TABLE_SIZE,
"Maximum size of a table that can be broadcast for JOIN (0 disables the limit). The most pessimistic size estimate is taken when doing comparison.",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

0 disables the limit

It should be rather +INF to disable limit

@@ -149,6 +151,15 @@ public SystemSessionProperties(
false,
value -> JoinDistributionType.valueOf(((String) value).toUpperCase()),
JoinDistributionType::name),
new PropertyMetadata<>(
JOIN_MAX_BROADCAST_TABLE_SIZE,
"Maximum size of a table that can be broadcast for JOIN (0 disables the limit). The most pessimistic size estimate is taken when doing comparison.",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The most pessimistic size estimate is taken when doing comparison.

I would prefer to not to go to such details.

Maximum size

Please update to Maximum estimated size

private PlanNode getCostBasedJoin(JoinNode joinNode, Context context)
{
CostProvider costProvider = context.getCostProvider();
List<PlanNodeWithCost> possibleJoinNodes = new ArrayList<>();

if (!mustPartition(joinNode)) {
if (!mustPartition(joinNode) && canReplicate(joinNode, context)) {
possibleJoinNodes.add(getJoinNodeWithCost(costProvider, joinNode.withDistributionType(REPLICATED)));
}
if (!mustReplicate(joinNode, context)) {
possibleJoinNodes.add(getJoinNodeWithCost(costProvider, joinNode.withDistributionType(PARTITIONED)));
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

above two ifs can be extracted as method and reused below

@findepi findepi self-assigned this Oct 11, 2018
Property defines maximum estimated size of a table
that can be broadcast for JOIN. This is used to
prevent CBO from broadcasting large tables which
could saturate the cluster. Additionally,
performance penality is small if CBO incorrectly
chooses to broadcast small table.

Related PR: prestodb#11667
@sopel39
Copy link
Contributor Author

sopel39 commented Oct 11, 2018

@mbasmanova that's the one (autoBroadcastJoinThreshold)

@sopel39 sopel39 merged commit b45c5a8 into prestodb:master Oct 11, 2018
sopel39 added a commit that referenced this pull request Oct 11, 2018
Property defines maximum estimated size of a table
that can be broadcast for JOIN. This is used to
prevent CBO from broadcasting large tables which
could saturate the cluster. Additionally,
performance penality is small if CBO incorrectly
chooses to broadcast small table.

Related PR: #11667
@sopel39 sopel39 deleted the ks/join_max_broadcast_size branch October 12, 2018 10:25
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

6 participants