Unnecessary remote exchange for DISTINCT #11251

Open
haozhun opened this issue Aug 11, 2018 · 9 comments
@haozhun (Contributor) commented Aug 11, 2018

Is this a bug?

Is this intentional to avoid skew and low parallelism? If so, what's the long term plan?

-- This issue affects bucket-by-bucket execution.
-- However, the issue itself is unrelated to bucket-by-bucket execution.
-- Bucket-by-bucket execution also isn't necessary to reproduce it.
set session colocated_join=true;
set session grouped_execution_for_aggregation=true;
set session concurrent_lifespans_per_task=1;
-- Use legacy mark-distinct strategy
-- (Doesn't reproduce with new strategy)
presto:di> set session use_mark_distinct=true;
presto:di> create table test_hjin_md with (partitioned_by=array['ds'], bucketed_by=array['x'], bucket_count=5) as select orderkey x, orderstatus status, '2019-01-01' ds from tpch.tiny.orders;
CREATE TABLE: 15000 rows
presto:di> explain (type distributed) select x, count(status), array_agg(distinct status) from test_hjin_md group by x;
                                                                           Query Plan
----------------------------------------------------------------------------------------------------------------------------------------------------------------
 Fragment 0 [SINGLE]
     Output layout: [x, count, array_agg]
     Output partitioning: SINGLE []
     Execution Flow: UNGROUPED_EXECUTION
     - Output[x, _col1, _col2] => [x:bigint, count:bigint, array_agg:array(varchar)]
             _col1 := count
             _col2 := array_agg
         - RemoteSource[1] => [x:bigint, count:bigint, array_agg:array(varchar)]

 Fragment 1 [HASH]
     Output layout: [x, count, array_agg]
     Output partitioning: SINGLE []
     Execution Flow: UNGROUPED_EXECUTION
     - Aggregate(FINAL)[x] => [x:bigint, count:bigint, array_agg:array(varchar)]
             count := "count"("count_14")
             array_agg := "array_agg"("array_agg_15")
         - LocalExchange[HASH][$hashvalue] ("x") => x:bigint, count_14:bigint, array_agg_15:array(varchar), $hashvalue:bigint
             - RemoteSource[2] => [x:bigint, count_14:bigint, array_agg_15:array(varchar), $hashvalue_16:bigint]

 Fragment 2 [HASH]
     Output layout: [x, count_14, array_agg_15, $hashvalue_20]
     Output partitioning: HASH [x][$hashvalue_20]
     Execution Flow: UNGROUPED_EXECUTION
     - Project[] => [x:bigint, count_14:bigint, array_agg_15:array(varchar), $hashvalue_20:bigint]
             $hashvalue_20 := "combine_hash"(bigint '0', COALESCE("$operator$hash_code"("x"), 0))
         - Aggregate(PARTIAL)[x] => [x:bigint, count_14:bigint, array_agg_15:array(varchar)]
                 count_14 := "count"("status")
                 array_agg_15 := "array_agg"("status") (mask = status$distinct)
             - MarkDistinct[distinct=x:bigint, status:varchar marker=status$distinct] => [x:bigint, status:varchar, status$distinct:boolean]
                 - Project[] => [x:bigint, status:varchar]
                     - LocalExchange[HASH][$hashvalue_17] ("x", "status") => x:bigint, status:varchar, $hashvalue_17:bigint
                         - RemoteSource[3] => [x:bigint, status:varchar, $hashvalue_18:bigint]

 Fragment 3 [prism:HivePartitioningHandle{bucketCount=5, hiveTypes=[bigint]}]
     Output layout: [x, status, $hashvalue_19]
     Output partitioning: HASH [x, status][$hashvalue_19]
     Execution Flow: UNGROUPED_EXECUTION
     - ScanProject[table = prism:di:test_hjin_md, originalConstraint = true] => [x:bigint, status:varchar, $hashvalue_19:bigint]
             Cost: {rows: ? (?), cpu: ?, memory: 0.00, network: 0.00}/{rows: ? (?), cpu: ?, memory: 0.00, network: 0.00}
             $hashvalue_19 := "combine_hash"("combine_hash"(bigint '0', COALESCE("$operator$hash_code"("x"), 0)), COALESCE("$operator$hash_code"("status"), 0))
             LAYOUT: di.test_hjin_md
             x := HiveColumnHandle{name=x, hiveType=bigint, hiveColumnIndex=0, columnType=REGULAR}
             status := HiveColumnHandle{name=status, hiveType=string, hiveColumnIndex=1, columnType=REGULAR}
             HiveColumnHandle{name=ds, hiveType=string, hiveColumnIndex=-1, columnType=PARTITION_KEY}
                 :: [[2019-01-01]]

For reference, this is how the plan looks without DISTINCT:

presto:di> explain (type distributed) select x, count(status) from test_hjin_md group by x;
                                                         Query Plan
-----------------------------------------------------------------------------------------------------------------------------
 Fragment 0 [SINGLE]
     Output layout: [x, count]
     Output partitioning: SINGLE []
     Execution Flow: UNGROUPED_EXECUTION
     - Output[x, _col1] => [x:bigint, count:bigint]
             _col1 := count
         - RemoteSource[1] => [x:bigint, count:bigint]

 Fragment 1 [prism:HivePartitioningHandle{bucketCount=5, hiveTypes=[bigint]}]
     Output layout: [x, count]
     Output partitioning: SINGLE []
     Execution Flow: GROUPED_EXECUTION
     - Aggregate(FINAL)[x] => [x:bigint, count:bigint]
             Cost: {rows: ? (?), cpu: ?, memory: ?, network: 0.00}
             count := "count"("count_9")
         - LocalExchange[HASH][$hashvalue] ("x") => x:bigint, count_9:bigint, $hashvalue:bigint
                 Cost: {rows: ? (?), cpu: ?, memory: ?, network: 0.00}
             - Project[] => [x:bigint, count_9:bigint, $hashvalue_10:bigint]
                     Cost: {rows: ? (?), cpu: ?, memory: ?, network: 0.00}
                     $hashvalue_10 := "combine_hash"(bigint '0', COALESCE("$operator$hash_code"("x"), 0))
                 - Aggregate(PARTIAL)[x] => [x:bigint, count_9:bigint]
                         Cost: {rows: ? (?), cpu: ?, memory: ?, network: 0.00}
                         count_9 := "count"("status")
                     - TableScan[prism:di:test_hjin_md, originalConstraint = true] => [x:bigint, status:varchar]
                             Cost: {rows: ? (?), cpu: ?, memory: 0.00, network: 0.00}
                             LAYOUT: di.test_hjin_md
                             x := HiveColumnHandle{name=x, hiveType=bigint, hiveColumnIndex=0, columnType=REGULAR}
                             status := HiveColumnHandle{name=status, hiveType=string, hiveColumnIndex=1, columnType=REGULAR}
                             HiveColumnHandle{name=ds, hiveType=string, hiveColumnIndex=-1, columnType=PARTITION_KEY}
                                 :: [[2019-01-01]]
@findepi (Contributor) commented Aug 13, 2018

@martint , do you intend to change the default for use_mark_distinct?

@martint (Contributor) commented Aug 13, 2018

There are many queries that fail or have worse performance due to lack of parallelism when use_mark_distinct is off. We can’t yet change the default.

@arhimondr (Member)

Currently for MarkDistinct (

!child.getProperties().isStreamPartitionedOn(node.getDistinctSymbols())) {
) the existing partitioning is not honored the way it is for the Aggregation node (
else if (!child.getProperties().isStreamPartitionedOn(partitioningRequirement) && !child.getProperties().isNodePartitionedOn(partitioningRequirement)) {
)

A similar issue for window functions was resolved by #12122.

This prevents us from using grouped execution efficiently.
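To make the contrast concrete, here is a simplified, non-verbatim sketch of the two checks side by side; addRemotePartitionedExchange is a hypothetical shorthand for the partitioned remote exchange that the planner inserts.

// Aggregation: the existing node-level partitioning is accepted, so an
// exchange is added only when the source is partitioned on neither level.
if (!child.getProperties().isStreamPartitionedOn(partitioningRequirement) &&
        !child.getProperties().isNodePartitionedOn(partitioningRequirement)) {
    child = addRemotePartitionedExchange(child, partitioningRequirement);
}

// MarkDistinct: only stream partitioning is checked, so a source that is
// node-partitioned on the distinct symbols (e.g. a bucketed table) still
// gets an extra remote exchange.
if (!child.getProperties().isStreamPartitionedOn(node.getDistinctSymbols())) {
    child = addRemotePartitionedExchange(child, node.getDistinctSymbols());
}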

@arhimondr (Member)

CC: @wenleix @rschlussel @shixuan-fan

@arhimondr (Member) commented Apr 17, 2019

I recently found that queries like

SELECT 
  user_id,
  count(distinct item_id),
  count(distinct page_id),
  count(distinct ... ),
  count(distinct ... ),
  count(distinct ... ),
  count(distinct ... )
FROM 
  ...
GROUP BY
  user_id

are very common.

Currently the high-level plan for this query will be:

[partition by user_id + item_id] -> [mark distinct user_id + item_id] -> [partition by user_id + page_id] -> [mark distinct user_id + page_id] -> ... -> [partition by user_id] -> [aggregate by user id]

This requires an extra shuffle at every step, which is very expensive.

In theory, the input can be partitioned once by user_id, and then all the distincts and the aggregation itself can be executed in a single stage.
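If that were done, only one shuffle would be needed up front; in the same notation as above, the shape would look roughly like:

[partition by user_id] -> [mark distinct user_id + item_id] -> [mark distinct user_id + page_id] -> ... -> [aggregate by user_id]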

I think the concern @martint expressed:

There are many queries that fail or have worse performance due to lack of parallelism when use_mark_distinct is off. We can’t yet change the default.

does not apply in this particular case, as the aggregation by user_id must be done at the final step anyway. So, if there's skew on user_id, the whole processing will be skewed by the final aggregation regardless.

The proposal is to change the condition here:

!child.getProperties().isStreamPartitionedOn(node.getDistinctSymbols())) {

to

if (child.getProperties().isSingleNode() ||
                    !child.getProperties().isStreamPartitionedOn(node.getDistinctSymbols()) &&
                    !child.getProperties().isNodePartitionedOn(preferredChildProperties.getGlobalProperties().get().getPartitioningProperties().get().getPartitioningColumns())) {

(of course with all the safety checks like preferredChildProperties.getGlobalProperties().isPresent(), etc.)

The additional check effectively verifies that when the parent partitioning requirement is already satisfied, there is no need to change the partitioning just for the mark distinct.
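For illustration, here is a sketch of the proposed condition with the Optional safety checks from the parenthetical spelled out; the surrounding visitor method and the exchange insertion itself are elided, and this is not a tested patch.

// Sketch only: the proposed MarkDistinct condition with the isPresent()
// checks written out; the body of the if-block stays the same as today
// (insert a remote hash exchange on node.getDistinctSymbols()).
boolean parentPartitioningSatisfied =
        preferredChildProperties.getGlobalProperties().isPresent() &&
        preferredChildProperties.getGlobalProperties().get().getPartitioningProperties().isPresent() &&
        child.getProperties().isNodePartitionedOn(
                preferredChildProperties.getGlobalProperties().get().getPartitioningProperties().get().getPartitioningColumns());

if (child.getProperties().isSingleNode() ||
        (!child.getProperties().isStreamPartitionedOn(node.getDistinctSymbols()) && !parentPartitioningSatisfied)) {
    // add the partitioned remote exchange on the distinct symbols, as in the current code
}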

@wenleix , @rschlussel Thoughts?

@wenleix (Contributor) commented Apr 17, 2019

So this is indeed an interesting problem. We will label the following query shapes for convenience.

Query-Single:

SELECT custkey, COUNT(DISTINCT col)
FROM ...
GROUP BY custkey

Query-Many:

SELECT custkey, 
              COUNT(DISTINCT col1),
              COUNT(DISTINCT col2),
              ...
              COUNT(DISTINCT col100)
FROM ...
GROUP BY custkey

We will discuss two different scenarios (unbucketed table vs. bucketed table).

Unbucketed Table

When the underlying table is unbucketed and use_mark_distinct=true, MarkDistinct will request an added exchange. In this case, it makes sense to add an exchange over both custkey and col (by analogy with joins and aggregations). Partitioning only on custkey carries the potential risk of skew or an insufficient number of distinct custkey values.

So, for Query-Many over an unbucketed table, we will see 100 remote exchanges, since none of the added exchanges are compatible with each other. This is known to be very inefficient even for remote exchanges, and it is further exacerbated when exchanges are materialized.

In that case, the discussion in this issue about not inserting an exchange won't help. We should switch to use_mark_distinct=false, introduced in 78f8146. One thought is to always make the user set use_mark_distinct=false if they want to try the new exchange materialization feature :)

Bucketed Table

When the input is already partitioned on custkey (e.g. the input table is bucketed), this raises the question of whether we still need to add the exchange to partition over (custkey, col). One could argue that, similar to the aggregation and window function (#12122) cases, we should just adopt the existing partitioning.

On the other hand, one could also argue that we should just set use_mark_distinct=false.

Other thoughts

Adaptive execution? For example, we could always start the query with use_mark_distinct=false, and only switch to use_mark_distinct=true when we found ...

In general, I find this problem bears many resemblances to join/aggregation skew and the question of whether to insert an additional exchange, and it might want to be solved in the same framework.

@arhimondr (Member)

Partitioning only on custkey carries the potential risk of skew or an insufficient number of distinct custkey values.

But anyhow, the final GROUP BY custkey will have to shuffle by custkey. So, if there is skew, it will be skewed at the last stage anyway.

When the input is already partitioned on custkey (e.g. the input table is bucketed), this raises the question of whether we still need to add the exchange to partition over (custkey, col).

Again, if the downstream stage is an aggregation on custkey, I don't see why we should add an extra exchange.

The only reason I can see is memory usage skew (some custkeys may have more distinct values for col). Is that the concern?

@arhimondr (Member)

After an offline discussion with @rschlussel, we came to the conclusion that the shuffles are mainly needed to take care of memory skew when the aggregation column cardinality is low.

I think nothing has to change for MarkDistinct; instead, use_mark_distinct=false should be used when memory skew is not a problem.
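For reference, the workaround amounts to flipping the session toggle already used in the reproduce at the top of this issue, e.g. against the same table (resulting plan omitted):

set session use_mark_distinct=false;
explain (type distributed) select x, count(status), array_agg(distinct status) from test_hjin_md group by x;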

@Donderia

@arhimondr / @wenleix - If we run queries with multiple DISTINCT aggregations on Presto views built from multiple UNION ALLs, the remote exchange happens in a single stage for all the distincts:

 MarkDistinct[distinct=transactiondate:date, accountnumber:bigint marker=accountnumber$distinct][$hashvalue_210]
 │   Layout: [origintransactionid:varchar, servedmsisdn:bigint, accountnumber:bigint, transactionamount:double, accountbalance:double, filename:varchar, transactiondate:date, $hashvalue:bigint, $hashvalue_210:bigint, $hashvalue_211:bigint, $has
 │   CPU: 10.77s (10.17%), Scheduled: 13.59s (3.10%), Output: 9570301 rows (1.67GB)
 │   Input avg.: 74767.98 rows, Input std.dev.: 645.51%
 └─ MarkDistinct[distinct=transactiondate:date, origintransactionid:varchar marker=origintransactionid$distinct][$hashvalue_211]
    │   Layout: [origintransactionid:varchar, servedmsisdn:bigint, accountnumber:bigint, transactionamount:double, accountbalance:double, filename:varchar, transactiondate:date, $hashvalue:bigint, $hashvalue_210:bigint, $hashvalue_211:bigint, $
    │   CPU: 12.56s (11.86%), Scheduled: 15.32s (3.49%), Output: 9570301 rows (1.66GB)
    │   Input avg.: 74767.98 rows, Input std.dev.: 645.51%
    └─ MarkDistinct[distinct=transactiondate:date, servedmsisdn:bigint marker=servedmsisdn$distinct][$hashvalue_212]
       │   Layout: [origintransactionid:varchar, servedmsisdn:bigint, accountnumber:bigint, transactionamount:double, accountbalance:double, filename:varchar, transactiondate:date, $hashvalue:bigint, $hashvalue_210:bigint, $hashvalue_211:bigint
       │   CPU: 11.04s (10.43%), Scheduled: 13.31s (3.03%), Output: 9570301 rows (1.64GB)
       │   Input avg.: 74767.98 rows, Input std.dev.: 645.51%
       └─ MarkDistinct[distinct=transactiondate:date, filename:varchar marker=filename$distinct][$hashvalue_213]
          │   Layout: [origintransactionid:varchar, servedmsisdn:bigint, accountnumber:bigint, transactionamount:double, accountbalance:double, filename:varchar, transactiondate:date, $hashvalue:bigint, $hashvalue_210:bigint, $hashvalue_211:big
          │   CPU: 1.97s (1.86%), Scheduled: 3.02s (0.69%), Output: 9570301 rows (1.62GB)
          │   Input avg.: 74767.98 rows, Input std.dev.: 645.51%
          └─ LocalExchange[HASH][$hashvalue] ("transactiondate")
             │   Layout: [origintransactionid:varchar, servedmsisdn:bigint, accountnumber:bigint, transactionamount:double, accountbalance:double, filename:varchar, transactiondate:date, $hashvalue:bigint, $hashvalue_210:bigint, $hashvalue_211:
             │   Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: ?}
             │   CPU: 6.71s (6.34%), Scheduled: 11.00s (2.51%), Output: 9570301 rows (1.60GB)
             │   Input avg.: 37383.99 rows, Input std.dev.: 312.76%
             ├─ RemoteSource[3]
             │      Layout: [origintransactionid_5:varchar, servedmsisdn_6:bigint, accountnumber_7:bigint, transactionamount_13:double, filename_66:varchar, transactiondate_70:date, expr_73:double, $hashvalue_214:bigint, $hashvalue_215:bigint,
             │      CPU: 0.00ns (0.00%), Scheduled: 0.00ns (0.00%), Output: 0 rows (0B)
             │      Input avg.: 0.00 rows, Input std.dev.: ?%
             └─ RemoteSource[4]
                    Layout: [servedmsisdn_89:bigint, accountnumber_90:bigint, origintransactionid_101:varchar, transactionamount_106:double, accountbalance_108:double, filename_113:varchar, transactiondate_117:date, $hashvalue_229:bigint, $hash
                    CPU: 2.02s (1.90%), Scheduled: 4.58s (1.04%), Output: 9570301 rows (1.60GB)
                    Input avg.: 74767.98 rows, Input std.dev.: 209.55%

On tables, we see multiple stages for mark distinct.
