Skip to content

Commit

Permalink
planner: stream agg should not be pushed to double read (#12443)
Browse files Browse the repository at this point in the history
For the following two reasons, we should not push stream agg down to a double read:
- The aggregate will lose the handle information.
- There's no sort operator. The second read is ordered by pk, not by index.
  • Loading branch information
winoros committed Oct 12, 2019
1 parent 1f3af1e commit 45bc789
Show file tree
Hide file tree
Showing 10 changed files with 51 additions and 36 deletions.
4 changes: 2 additions & 2 deletions cmd/explaintest/r/explain.result
Expand Up @@ -28,13 +28,13 @@ set session tidb_hashagg_partial_concurrency = 1;
set session tidb_hashagg_final_concurrency = 1;
explain select group_concat(a) from t group by id;
id count task operator info
StreamAgg_8 8000.00 root group by:Column#6, funcs:group_concat(Column#0, ",")
StreamAgg_8 8000.00 root group by:Column#7, funcs:group_concat(Column#6, ",")
└─Projection_18 10000.00 root cast(Column#2), Column#1
└─TableReader_15 10000.00 root data:TableScan_14
└─TableScan_14 10000.00 cop[tikv] table:t, range:[-inf,+inf], keep order:true, stats:pseudo
explain select group_concat(a, b) from t group by id;
id count task operator info
StreamAgg_8 8000.00 root group by:Column#6, funcs:group_concat(Column#0, Column#0, ",")
StreamAgg_8 8000.00 root group by:Column#8, funcs:group_concat(Column#6, Column#7, ",")
└─Projection_18 10000.00 root cast(Column#2), cast(Column#3), Column#1
└─TableReader_15 10000.00 root data:TableScan_14
└─TableScan_14 10000.00 cop[tikv] table:t, range:[-inf,+inf], keep order:true, stats:pseudo
Expand Down
2 changes: 1 addition & 1 deletion cmd/explaintest/r/explain_complex.result
Expand Up @@ -179,7 +179,7 @@ CREATE TABLE `tbl_008` (`a` int, `b` int);
CREATE TABLE `tbl_009` (`a` int, `b` int);
explain select sum(a) from (select * from tbl_001 union all select * from tbl_002 union all select * from tbl_003 union all select * from tbl_004 union all select * from tbl_005 union all select * from tbl_006 union all select * from tbl_007 union all select * from tbl_008 union all select * from tbl_009) x group by b;
id count task operator info
HashAgg_34 72000.00 root group by:Column#50, funcs:sum(Column#0)
HashAgg_34 72000.00 root group by:Column#51, funcs:sum(Column#50)
└─Projection_63 90000.00 root cast(Column#46), Column#47
└─Union_35 90000.00 root
├─TableReader_38 10000.00 root data:TableScan_37
Expand Down
2 changes: 1 addition & 1 deletion cmd/explaintest/r/explain_complex_stats.result
Expand Up @@ -205,7 +205,7 @@ CREATE TABLE tbl_009 (a int, b int);
load stats 's/explain_complex_stats_tbl_009.json';
explain select sum(a) from (select * from tbl_001 union all select * from tbl_002 union all select * from tbl_003 union all select * from tbl_004 union all select * from tbl_005 union all select * from tbl_006 union all select * from tbl_007 union all select * from tbl_008 union all select * from tbl_009) x group by b;
id count task operator info
HashAgg_34 18000.00 root group by:Column#50, funcs:sum(Column#0)
HashAgg_34 18000.00 root group by:Column#51, funcs:sum(Column#50)
└─Projection_63 18000.00 root cast(Column#46), Column#47
└─Union_35 18000.00 root
├─TableReader_38 2000.00 root data:TableScan_37
Expand Down
6 changes: 3 additions & 3 deletions cmd/explaintest/r/explain_easy.result
Expand Up @@ -90,7 +90,7 @@ TableReader_7 0.33 root data:Selection_6
└─TableScan_5 1.00 cop[tikv] table:t1, range:[1,1], keep order:false, stats:pseudo
explain select sum(t1.c1 in (select c1 from t2)) from t1;
id count task operator info
StreamAgg_12 1.00 root funcs:sum(Column#0)
StreamAgg_12 1.00 root funcs:sum(Column#11)
└─Projection_23 10000.00 root cast(Column#8)
└─HashLeftJoin_22 10000.00 root CARTESIAN left outer semi join, inner:IndexReader_21, other cond:eq(Column#1, Column#4)
├─IndexReader_17 10000.00 root index:IndexScan_16
Expand Down Expand Up @@ -217,7 +217,7 @@ StreamAgg_11 1.00 root funcs:count(1)
set @@session.tidb_opt_insubq_to_join_and_agg=0;
explain select sum(t1.c1 in (select c1 from t2)) from t1;
id count task operator info
StreamAgg_12 1.00 root funcs:sum(Column#0)
StreamAgg_12 1.00 root funcs:sum(Column#11)
└─Projection_23 10000.00 root cast(Column#8)
└─HashLeftJoin_22 10000.00 root CARTESIAN left outer semi join, inner:IndexReader_21, other cond:eq(Column#1, Column#4)
├─IndexReader_17 10000.00 root index:IndexScan_16
Expand All @@ -235,7 +235,7 @@ Projection_6 10000.00 root Column#8
└─TableScan_12 10000.00 cop[tikv] table:t2, range:[-inf,+inf], keep order:false, stats:pseudo
explain select sum(6 in (select c2 from t2)) from t1;
id count task operator info
StreamAgg_12 1.00 root funcs:sum(Column#0)
StreamAgg_12 1.00 root funcs:sum(Column#11)
└─Projection_22 10000.00 root cast(Column#8)
└─HashLeftJoin_21 10000.00 root CARTESIAN left outer semi join, inner:TableReader_20
├─IndexReader_17 10000.00 root index:IndexScan_16
Expand Down
4 changes: 2 additions & 2 deletions cmd/explaintest/r/select.result
Expand Up @@ -386,7 +386,7 @@ Projection_8 10000.00 root and(or(or(gt(Column#9, 1), ne(Column#1, Column#8)), i
└─HashLeftJoin_9 10000.00 root CARTESIAN inner join, inner:StreamAgg_16
├─TableReader_12 10000.00 root data:TableScan_11
│ └─TableScan_11 10000.00 cop[tikv] table:t1, range:[-inf,+inf], keep order:false, stats:pseudo
└─StreamAgg_16 1.00 root funcs:firstrow(Column#0), count(distinct Column#0), sum(Column#0), count(1)
└─StreamAgg_16 1.00 root funcs:firstrow(Column#15), count(distinct Column#16), sum(Column#17), count(1)
└─Projection_26 10000.00 root Column#4, Column#4, cast(isnull(Column#4))
└─TableReader_23 10000.00 root data:TableScan_22
└─TableScan_22 10000.00 cop[tikv] table:t2, range:[-inf,+inf], keep order:false, stats:pseudo
Expand All @@ -396,7 +396,7 @@ Projection_8 10000.00 root or(and(and(le(Column#9, 1), eq(Column#1, Column#8)),
└─HashLeftJoin_9 10000.00 root CARTESIAN inner join, inner:StreamAgg_16
├─TableReader_12 10000.00 root data:TableScan_11
│ └─TableScan_11 10000.00 cop[tikv] table:t1, range:[-inf,+inf], keep order:false, stats:pseudo
└─StreamAgg_16 1.00 root funcs:firstrow(Column#0), count(distinct Column#0), sum(Column#0), count(1)
└─StreamAgg_16 1.00 root funcs:firstrow(Column#15), count(distinct Column#16), sum(Column#17), count(1)
└─Projection_26 10000.00 root Column#4, Column#4, cast(isnull(Column#4))
└─TableReader_23 10000.00 root data:TableScan_22
└─TableScan_22 10000.00 cop[tikv] table:t2, range:[-inf,+inf], keep order:false, stats:pseudo
Expand Down
16 changes: 8 additions & 8 deletions cmd/explaintest/r/tpch.result
Expand Up @@ -251,7 +251,7 @@ limit 10;
id count task operator info
Projection_14 10.00 root Column#18, Column#35, Column#13, Column#16
└─TopN_17 10.00 root Column#35:desc, Column#13:asc, offset:0, count:10
└─HashAgg_23 40252367.98 root group by:Column#45, Column#46, Column#47, funcs:sum(Column#0), firstrow(Column#0), firstrow(Column#0), firstrow(Column#0)
└─HashAgg_23 40252367.98 root group by:Column#49, Column#50, Column#51, funcs:sum(Column#45), firstrow(Column#46), firstrow(Column#47), firstrow(Column#48)
└─Projection_79 91515927.49 root mul(Column#23, minus(1, Column#24)), Column#13, Column#16, Column#18, Column#18, Column#13, Column#16
└─IndexHashJoin_38 91515927.49 root inner join, inner:IndexLookUp_28, outer key:Column#9, inner key:Column#18
├─HashRightJoin_69 22592975.51 root inner join, inner:TableReader_75, equal:[eq(Column#1, Column#10)]
Expand Down Expand Up @@ -345,7 +345,7 @@ revenue desc;
id count task operator info
Sort_23 5.00 root Column#51:desc
└─Projection_25 5.00 root Column#43, Column#49
└─HashAgg_28 5.00 root group by:Column#52, funcs:sum(Column#0), firstrow(Column#0)
└─HashAgg_28 5.00 root group by:Column#54, funcs:sum(Column#52), firstrow(Column#53)
└─Projection_86 11822812.50 root mul(Column#23, minus(1, Column#24)), Column#43, Column#43
└─HashLeftJoin_38 11822812.50 root inner join, inner:TableReader_84, equal:[eq(Column#38, Column#4) eq(Column#10, Column#1)]
├─IndexMergeJoin_49 11822812.50 root inner join, inner:TableReader_47, outer key:Column#18, inner key:Column#9
Expand Down Expand Up @@ -518,7 +518,7 @@ o_year;
id count task operator info
Sort_29 719.02 root Column#67:asc
└─Projection_31 719.02 root Column#62, div(Column#65, Column#66)
└─HashAgg_34 719.02 root group by:Column#75, funcs:sum(Column#0), sum(Column#0), firstrow(Column#0)
└─HashAgg_34 719.02 root group by:Column#78, funcs:sum(Column#75), sum(Column#76), firstrow(Column#77)
└─Projection_123 563136.02 root case(eq(Column#64, "INDIA"), Column#63, 0), Column#63, Column#62, Column#62
└─Projection_35 563136.02 root extract("YEAR", Column#38), mul(Column#22, minus(1, Column#23)), Column#56
└─HashLeftJoin_45 563136.02 root inner join, inner:TableReader_121, equal:[eq(Column#13, Column#55)]
Expand Down Expand Up @@ -660,7 +660,7 @@ limit 20;
id count task operator info
Projection_17 20.00 root Column#1, Column#2, Column#39, Column#6, Column#36, Column#3, Column#5, Column#8
└─TopN_20 20.00 root Column#39:desc, offset:0, count:20
└─HashAgg_26 3017307.69 root group by:Column#51, Column#52, Column#53, Column#54, Column#55, Column#56, Column#57, funcs:sum(Column#0), firstrow(Column#0), firstrow(Column#0), firstrow(Column#0), firstrow(Column#0), firstrow(Column#0), firstrow(Column#0), firstrow(Column#0)
└─HashAgg_26 3017307.69 root group by:Column#59, Column#60, Column#61, Column#62, Column#63, Column#64, Column#65, funcs:sum(Column#51), firstrow(Column#52), firstrow(Column#53), firstrow(Column#54), firstrow(Column#55), firstrow(Column#56), firstrow(Column#57), firstrow(Column#58)
└─Projection_67 12222016.17 root mul(Column#23, minus(1, Column#24)), Column#1, Column#2, Column#3, Column#5, Column#6, Column#8, Column#36, Column#1, Column#2, Column#6, Column#5, Column#36, Column#3, Column#8
└─IndexMergeJoin_39 12222016.17 root inner join, inner:Projection_37, outer key:Column#9, inner key:Column#18
├─HashLeftJoin_44 3017307.69 root inner join, inner:TableReader_63, equal:[eq(Column#1, Column#10)]
Expand Down Expand Up @@ -716,7 +716,7 @@ id count task operator info
Projection_57 1304801.67 root Column#1, Column#18
└─Sort_58 1304801.67 root Column#18:desc
└─Selection_60 1304801.67 root gt(Column#18, NULL)
└─HashAgg_63 1631002.09 root group by:Column#46, funcs:sum(Column#0), firstrow(Column#0)
└─HashAgg_63 1631002.09 root group by:Column#49, funcs:sum(Column#47), firstrow(Column#48)
└─Projection_89 1631002.09 root mul(Column#4, cast(Column#3)), Column#1, Column#1
└─HashRightJoin_67 1631002.09 root inner join, inner:HashRightJoin_80, equal:[eq(Column#7, Column#2)]
├─HashRightJoin_80 20000.00 root inner join, inner:TableReader_85, equal:[eq(Column#14, Column#10)]
Expand Down Expand Up @@ -769,7 +769,7 @@ l_shipmode;
id count task operator info
Sort_9 1.00 root Column#29:asc
└─Projection_11 1.00 root Column#24, Column#27, Column#28
└─HashAgg_14 1.00 root group by:Column#34, funcs:sum(Column#0), sum(Column#0), firstrow(Column#0)
└─HashAgg_14 1.00 root group by:Column#37, funcs:sum(Column#34), sum(Column#35), firstrow(Column#36)
└─Projection_40 10023369.01 root cast(case(or(eq(Column#6, "1-URGENT"), eq(Column#6, "2-HIGH")), 1, 0)), cast(case(and(ne(Column#6, "1-URGENT"), ne(Column#6, "2-HIGH")), 1, 0)), Column#24, Column#24
└─IndexMergeJoin_22 10023369.01 root inner join, inner:TableReader_20, outer key:Column#10, inner key:Column#1
├─TableReader_36 10023369.01 root data:Selection_35
Expand Down Expand Up @@ -840,7 +840,7 @@ and l_shipdate >= '1996-12-01'
and l_shipdate < date_add('1996-12-01', interval '1' month);
id count task operator info
Projection_8 1.00 root div(mul(100.00, Column#27), Column#28)
└─StreamAgg_13 1.00 root funcs:sum(Column#0), sum(Column#0)
└─StreamAgg_13 1.00 root funcs:sum(Column#31), sum(Column#32)
└─Projection_41 4121984.49 root case(like(Column#22, "PROMO%", 92), mul(Column#6, minus(1, Column#7)), 0), mul(Column#6, minus(1, Column#7))
└─IndexMergeJoin_36 4121984.49 root inner join, inner:TableReader_34, outer key:Column#2, inner key:Column#18
├─TableReader_27 4121984.49 root data:Selection_26
Expand Down Expand Up @@ -1090,7 +1090,7 @@ and l_shipmode in ('AIR', 'AIR REG')
and l_shipinstruct = 'DELIVER IN PERSON'
);
id count task operator info
StreamAgg_13 1.00 root funcs:sum(Column#0)
StreamAgg_13 1.00 root funcs:sum(Column#29)
└─Projection_46 6286493.79 root mul(Column#6, minus(1, Column#7))
└─IndexMergeJoin_41 6286493.79 root inner join, inner:TableReader_39, outer key:Column#2, inner key:Column#18, other cond:or(and(and(eq(Column#21, "Brand#52"), in(Column#24, "SM CASE", "SM BOX", "SM PACK", "SM PKG")), and(ge(Column#5, 4), and(le(Column#5, 14), le(Column#23, 5)))), or(and(and(eq(Column#21, "Brand#11"), in(Column#24, "MED BAG", "MED BOX", "MED PKG", "MED PACK")), and(ge(Column#5, 18), and(le(Column#5, 28), le(Column#23, 10)))), and(and(eq(Column#21, "Brand#51"), in(Column#24, "LG CASE", "LG BOX", "LG PACK", "LG PKG")), and(ge(Column#5, 29), and(le(Column#5, 39), le(Column#23, 15))))))
├─TableReader_29 6286493.79 root data:Selection_28
Expand Down
7 changes: 4 additions & 3 deletions planner/core/rule_inject_extra_projection.go
Expand Up @@ -99,9 +99,10 @@ func injectProjBelowAgg(aggPlan PhysicalPlan, aggFuncs []*aggregation.AggFuncDes
}
projExprs = append(projExprs, arg)
newArg := &expression.Column{
RetType: arg.GetType(),
ColName: model.NewCIStr(fmt.Sprintf("col_%d", len(projSchemaCols))),
Index: cursor,
UniqueID: aggPlan.SCtx().GetSessionVars().AllocPlanColumnID(),
RetType: arg.GetType(),
ColName: model.NewCIStr(fmt.Sprintf("col_%d", len(projSchemaCols))),
Index: cursor,
}
projSchemaCols = append(projSchemaCols, newArg)
f.Args[i] = newArg
Expand Down
38 changes: 23 additions & 15 deletions planner/core/task.go
Expand Up @@ -998,23 +998,31 @@ func (p *PhysicalStreamAgg) attach2Task(tasks ...task) task {
t := tasks[0].copy()
inputRows := t.count()
if cop, ok := t.(*copTask); ok {
// copToFlash means whether the cop task is run on flash storage
copToFlash := isFlashCopTask(cop)
partialAgg, finalAgg := p.newPartialAggregate(copToFlash)
if partialAgg != nil {
if cop.tablePlan != nil {
cop.finishIndexPlan()
partialAgg.SetChildren(cop.tablePlan)
cop.tablePlan = partialAgg
} else {
partialAgg.SetChildren(cop.indexPlan)
cop.indexPlan = partialAgg
// We should not push agg down across a double read, since the data of the second read is ordered by handle instead of by index.
// The `extraHandleCol` is added if the double read needs to keep order, so we just use it to decide
// whether the following plan is a double read with order preserved.
if cop.extraHandleCol == nil {
copToFlash := isFlashCopTask(cop)
partialAgg, finalAgg := p.newPartialAggregate(copToFlash)
if partialAgg != nil {
if cop.tablePlan != nil {
cop.finishIndexPlan()
partialAgg.SetChildren(cop.tablePlan)
cop.tablePlan = partialAgg
} else {
partialAgg.SetChildren(cop.indexPlan)
cop.indexPlan = partialAgg
}
cop.addCost(p.GetCost(inputRows, false))
}
cop.addCost(p.GetCost(inputRows, false))
t = finishCopTask(p.ctx, cop)
inputRows = t.count()
attachPlan2Task(finalAgg, t)
} else {
t = finishCopTask(p.ctx, cop)
inputRows = t.count()
attachPlan2Task(p, t)
}
t = finishCopTask(p.ctx, cop)
inputRows = t.count()
attachPlan2Task(finalAgg, t)
} else {
attachPlan2Task(p, t)
}
Expand Down
3 changes: 2 additions & 1 deletion planner/core/testdata/plan_suite_in.json
Expand Up @@ -446,7 +446,8 @@
{"SQL": "select /*+ HASH_AGG() */ t1.a from t t1 where t1.a < any(select t2.b from t t2)"},
{"SQL": "select /*+ hash_agg() */ t1.a from t t1 where t1.a != any(select t2.b from t t2)"},
{"SQL": "select /*+ hash_agg() */ t1.a from t t1 where t1.a = all(select t2.b from t t2)"},
{"SQL": "select /*+ STREAM_AGG() */ sum(t1.a) from t t1 join t t2 on t1.b = t2.b group by t1.b", "AggPushDown": true}
{"SQL": "select /*+ STREAM_AGG() */ sum(t1.a) from t t1 join t t2 on t1.b = t2.b group by t1.b", "AggPushDown": true},
{"SQL": "select /*+ STREAM_AGG() */ e, sum(b) from t group by e"}
]
},
{
Expand Down
5 changes: 5 additions & 0 deletions planner/core/testdata/plan_suite_out.json
Expand Up @@ -1153,6 +1153,11 @@
"SQL": "select /*+ STREAM_AGG() */ sum(t1.a) from t t1 join t t2 on t1.b = t2.b group by t1.b",
"Best": "LeftHashJoin{TableReader(Table(t))->TableReader(Table(t))->Sort->Projection->StreamAgg}(Column#14,Column#2)->HashAgg",
"Warning": "[planner:1815]Optimizer Hint STREAM_AGG is inapplicable"
},
{
"SQL": "select /*+ STREAM_AGG() */ e, sum(b) from t group by e",
"Best": "TableReader(Table(t))->Sort->Projection->StreamAgg->Projection",
"Warning": ""
}
]
},
Expand Down

0 comments on commit 45bc789

Please sign in to comment.