planner/core: thoroughly push down count-distinct agg in the MPP mode.
ti-srebot committed Jul 20, 2021
1 parent 16b4cc4 commit 0b6c3c9
Showing 7 changed files with 86 additions and 4 deletions.
6 changes: 5 additions & 1 deletion planner/core/exhaust_physical_plans.go
@@ -2418,7 +2418,11 @@ func (la *LogicalAggregation) tryToGetMppHashAggs(prop *property.PhysicalProperty)
         childProp := &property.PhysicalProperty{TaskTp: property.MppTaskType, ExpectedCnt: math.MaxFloat64}
         agg := NewPhysicalHashAgg(la, la.stats.ScaleByExpectCnt(prop.ExpectedCnt), childProp)
         agg.SetSchema(la.schema.Clone())
-        agg.MppRunMode = MppTiDB
+        if la.HasDistinct() {
+            agg.MppRunMode = MppScalar
+        } else {
+            agg.MppRunMode = MppTiDB
+        }
         hashAggs = append(hashAggs, agg)
     }
     return
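
This branch is the core of the change: an aggregation that contains distinct functions is no longer finished back on TiDB (MppTiDB) but planned as MppScalar, whose final phase stays inside MPP and runs as a single task. A minimal, self-contained sketch of the dispatch, using hypothetical names rather than TiDB's real types:

package main

import "fmt"

// mppRunMode is a hypothetical stand-in for the planner's MppRunMode.
type mppRunMode int

const (
	mppTiDB   mppRunMode = iota // final agg phase runs back on TiDB
	mppScalar                   // final agg phase runs in a single MPP task
)

// chooseRunMode mirrors the hunk above: distinct aggregates stay in MPP,
// since a global distinct needs all rows of a group gathered in one place.
func chooseRunMode(hasDistinct bool) mppRunMode {
	if hasDistinct {
		return mppScalar
	}
	return mppTiDB
}

func main() {
	fmt.Println(chooseRunMode(true), chooseRunMode(false)) // 1 0 (mppScalar, mppTiDB)
}
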
7 changes: 7 additions & 0 deletions planner/core/fragment.go
@@ -26,6 +26,7 @@ import (
     "github.com/pingcap/tidb/table"
     "github.com/pingcap/tidb/types"
     "github.com/pingcap/tidb/util/logutil"
+    "github.com/pingcap/tipb/go-tipb"
     "go.uber.org/zap"
 )

@@ -40,6 +41,8 @@ type Fragment struct {
     ExchangeSender *PhysicalExchangeSender // data exporter

     IsRoot bool
+
+    singleton bool // indicates if this fragment runs as a single task on one node.
 }

 type tasksAndFrags struct {
@@ -121,6 +124,7 @@ func (f *Fragment) init(p PhysicalPlan) error {
         }
         f.TableScan = x
     case *PhysicalExchangeReceiver:
+        f.singleton = x.children[0].(*PhysicalExchangeSender).ExchangeType == tipb.ExchangeType_PassThrough
         f.ExchangeReceivers = append(f.ExchangeReceivers, x)
     case *PhysicalUnionAll:
         return errors.New("unexpected union all detected")
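
A PassThrough exchange funnels the sender's whole stream into a single receiver task, so inspecting the child sender's exchange type is enough to mark the receiving fragment as a singleton. A toy version of the check, with a hypothetical enum standing in for tipb.ExchangeType:

package main

import "fmt"

// exchangeKind stands in for tipb.ExchangeType.
type exchangeKind int

const (
	passThrough   exchangeKind = iota // the whole stream converges on one receiver task
	broadcast                         // every receiver task gets a full copy
	hashPartition                     // rows are hash-distributed across receiver tasks
)

// isSingletonFragment mirrors the assignment above: only a PassThrough
// sender forces the receiving fragment down to a single task.
func isSingletonFragment(senderKind exchangeKind) bool {
	return senderKind == passThrough
}

func main() {
	fmt.Println(isSingletonFragment(passThrough), isSingletonFragment(hashPartition)) // true false
}
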
@@ -246,6 +250,9 @@ func (e *mppTaskGenerator) generateMPPTasksForFragment(f *Fragment) (tasks []*kv
         for _, r := range f.ExchangeReceivers {
             childrenTasks = append(childrenTasks, r.Tasks...)
         }
+        if f.singleton {
+            childrenTasks = childrenTasks[0:1]
+        }
         tasks = e.constructMPPTasksByChildrenTasks(childrenTasks)
     }
     if err != nil {
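
Because a singleton fragment must be instantiated exactly once, the generator keeps only the first candidate task before building the fragment's MPP tasks. A hedged sketch of that pruning over plain data (the placement type below is hypothetical, not kv.MPPTask):

package main

import "fmt"

// candidate is a hypothetical stand-in for an MPP task placement.
type candidate struct{ storeAddr string }

// pruneForSingleton mirrors childrenTasks = childrenTasks[0:1]: when the
// fragment runs as a single task, all but one placement drops out.
func pruneForSingleton(tasks []candidate, singleton bool) []candidate {
	if singleton && len(tasks) > 1 {
		return tasks[:1]
	}
	return tasks
}

func main() {
	nodes := []candidate{{"tiflash-0"}, {"tiflash-1"}, {"tiflash-2"}}
	fmt.Println(pruneForSingleton(nodes, true))  // [{tiflash-0}]
	fmt.Println(pruneForSingleton(nodes, false)) // all three remain
}
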
2 changes: 2 additions & 0 deletions planner/core/physical_plans.go
@@ -1004,6 +1004,8 @@ const (
     Mpp2Phase
     // MppTiDB runs agg on TiDB (and a partial agg on TiFlash if in 2 phase agg)
     MppTiDB
+    // MppScalar also has 2 phases. The second phase runs in a single task.
+    MppScalar
 )

 type basePhysicalAgg struct {
4 changes: 2 additions & 2 deletions planner/core/rule_eliminate_projection.go
@@ -43,14 +43,14 @@ func canProjectionBeEliminatedStrict(p *PhysicalProjection) bool {
     // to align the output schema. In the future, we can solve this incompatibility by
     // passing down the aggregation mode to TiFlash.
     if physicalAgg, ok := p.Children()[0].(*PhysicalHashAgg); ok {
-        if physicalAgg.MppRunMode == Mpp1Phase || physicalAgg.MppRunMode == Mpp2Phase {
+        if physicalAgg.MppRunMode == Mpp1Phase || physicalAgg.MppRunMode == Mpp2Phase || physicalAgg.MppRunMode == MppScalar {
             if physicalAgg.isFinalAgg() {
                 return false
             }
         }
     }
     if physicalAgg, ok := p.Children()[0].(*PhysicalStreamAgg); ok {
-        if physicalAgg.MppRunMode == Mpp1Phase || physicalAgg.MppRunMode == Mpp2Phase {
+        if physicalAgg.MppRunMode == Mpp1Phase || physicalAgg.MppRunMode == Mpp2Phase || physicalAgg.MppRunMode == MppScalar {
             if physicalAgg.isFinalAgg() {
                 return false
             }
22 changes: 22 additions & 0 deletions planner/core/task.go
@@ -1886,6 +1886,28 @@ func (p *PhysicalHashAgg) attach2TaskForMpp(tasks ...task) task {
         attachPlan2Task(finalAgg, t)
         t.addCost(p.GetCost(inputRows, true, false))
         return t
+    case MppScalar:
+        proj := p.convertAvgForMPP()
+        partialAgg, finalAgg := p.newPartialAggregate(kv.TiFlash, true)
+        if partialAgg == nil || finalAgg == nil {
+            return invalidTask
+        }
+        attachPlan2Task(partialAgg, mpp)
+        prop := &property.PhysicalProperty{TaskTp: property.MppTaskType, ExpectedCnt: math.MaxFloat64, PartitionTp: property.AnyType}
+        newMpp := mpp.enforceExchangerImpl(prop)
+        attachPlan2Task(finalAgg, newMpp)
+        if proj == nil {
+            proj = PhysicalProjection{
+                Exprs: make([]expression.Expression, 0, len(p.Schema().Columns)),
+            }.Init(p.ctx, p.statsInfo(), p.SelectBlockOffset())
+            for _, col := range p.Schema().Columns {
+                proj.Exprs = append(proj.Exprs, col)
+            }
+            proj.SetSchema(p.schema)
+        }
+        attachPlan2Task(proj, newMpp)
+        newMpp.addCost(p.GetCost(inputRows, false, true))
+        return newMpp
     default:
         return invalidTask
     }
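
The plan shape assembled in this case is: a partial hash agg on every node, a PassThrough exchange that gathers the partial results into one task, then the final agg plus a projection restoring the expected output schema. This matches the EXPLAIN output added to the testdata below, where a grouping HashAgg feeds a PassThrough ExchangeSender into the final count(distinct ...). The data flow can be simulated over plain values; a sketch under the assumption of simple comparable values, with no TiDB types involved:

package main

import "fmt"

// partialDistinct is phase 1 (runs on every node): grouping by the argument
// of count(distinct ...) deduplicates locally, shrinking what the exchange ships.
func partialDistinct(rows []string) map[string]struct{} {
	seen := make(map[string]struct{}, len(rows))
	for _, v := range rows {
		seen[v] = struct{}{}
	}
	return seen
}

// finalDistinctCount is phase 2 (runs in the single task): merge every node's
// partial set and count it. A correct global distinct count needs all
// surviving values in one place, which is why this phase must be scalar.
func finalDistinctCount(partials []map[string]struct{}) int {
	merged := make(map[string]struct{})
	for _, p := range partials {
		for v := range p {
			merged[v] = struct{}{}
		}
	}
	return len(merged)
}

func main() {
	node0 := partialDistinct([]string{"a", "b", "a"})
	node1 := partialDistinct([]string{"b", "c"})
	fmt.Println(finalDistinctCount([]map[string]struct{}{node0, node1})) // 3
}
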
5 changes: 4 additions & 1 deletion planner/core/testdata/integration_serial_suite_in.json
@@ -255,7 +255,10 @@
       "desc format = 'brief' select * from t join ( select count(distinct value), id from t group by id) as A on A.id = t.id",
       "desc format = 'brief' select * from t join ( select count(1/value), id from t group by id) as A on A.id = t.id",
       "desc format = 'brief' select /*+hash_agg()*/ sum(id) from (select value, id from t where id > value group by id, value)A group by value /*the exchange should have only one partition column: test.t.value*/",
-      "desc format = 'brief' select /*+hash_agg()*/ sum(B.value) from t as B where B.id+1 > (select count(*) from t where t.id= B.id and t.value=B.value) group by B.id /*the exchange should have only one partition column: test.t.id*/"
+      "desc format = 'brief' select /*+hash_agg()*/ sum(B.value) from t as B where B.id+1 > (select count(*) from t where t.id= B.id and t.value=B.value) group by B.id /*the exchange should have only one partition column: test.t.id*/",
+      "desc format = 'brief' select count(distinct value) from t",
+      "desc format = 'brief' select count(distinct x ) from (select count(distinct value) x from t) t",
+      "desc format = 'brief' select count(distinct value), count(value), avg(value) from t"
     ]
   },
   {
44 changes: 44 additions & 0 deletions planner/core/testdata/integration_serial_suite_out.json
@@ -2161,6 +2161,50 @@
           " └─ExchangeSender 10000.00 batchCop[tiflash] ExchangeType: HashPartition, Hash Cols: test.t.id",
           " └─TableFullScan 10000.00 batchCop[tiflash] table:B keep order:false, stats:pseudo"
         ]
       },
+      {
+        "SQL": "desc format = 'brief' select count(distinct value) from t",
+        "Plan": [
+          "TableReader 1.00 root data:ExchangeSender",
+          "└─ExchangeSender 1.00 batchCop[tiflash] ExchangeType: PassThrough",
+          "  └─Projection 1.00 batchCop[tiflash] Column#4",
+          "    └─HashAgg 1.00 batchCop[tiflash] funcs:count(distinct test.t.value)->Column#4",
+          "      └─ExchangeReceiver 1.00 batchCop[tiflash] ",
+          "        └─ExchangeSender 1.00 batchCop[tiflash] ExchangeType: PassThrough",
+          "          └─HashAgg 1.00 batchCop[tiflash] group by:test.t.value, ",
+          "            └─TableFullScan 10000.00 batchCop[tiflash] table:t keep order:false, stats:pseudo"
+        ]
+      },
+      {
+        "SQL": "desc format = 'brief' select count(distinct x ) from (select count(distinct value) x from t) t",
+        "Plan": [
+          "TableReader 1.00 root data:ExchangeSender",
+          "└─ExchangeSender 1.00 batchCop[tiflash] ExchangeType: PassThrough",
+          "  └─Projection 1.00 batchCop[tiflash] Column#5",
+          "    └─HashAgg 1.00 batchCop[tiflash] funcs:count(distinct Column#4)->Column#5",
+          "      └─ExchangeReceiver 1.00 batchCop[tiflash] ",
+          "        └─ExchangeSender 1.00 batchCop[tiflash] ExchangeType: PassThrough",
+          "          └─HashAgg 1.00 batchCop[tiflash] group by:Column#4, ",
+          "            └─Projection 1.00 batchCop[tiflash] Column#4",
+          "              └─HashAgg 1.00 batchCop[tiflash] funcs:count(distinct test.t.value)->Column#4",
+          "                └─ExchangeReceiver 1.00 batchCop[tiflash] ",
+          "                  └─ExchangeSender 1.00 batchCop[tiflash] ExchangeType: PassThrough",
+          "                    └─HashAgg 1.00 batchCop[tiflash] group by:test.t.value, ",
+          "                      └─TableFullScan 10000.00 batchCop[tiflash] table:t keep order:false, stats:pseudo"
+        ]
+      },
+      {
+        "SQL": "desc format = 'brief' select count(distinct value), count(value), avg(value) from t",
+        "Plan": [
+          "TableReader 1.00 root data:ExchangeSender",
+          "└─ExchangeSender 1.00 batchCop[tiflash] ExchangeType: PassThrough",
+          "  └─Projection 1.00 batchCop[tiflash] Column#4, Column#5, div(Column#6, cast(case(eq(Column#7, 0), 1, Column#7), decimal(20,0) BINARY))->Column#6",
+          "    └─HashAgg 1.00 batchCop[tiflash] funcs:count(distinct test.t.value)->Column#4, funcs:sum(Column#8)->Column#5, funcs:sum(Column#9)->Column#7, funcs:sum(Column#10)->Column#6",
+          "      └─ExchangeReceiver 1.00 batchCop[tiflash] ",
+          "        └─ExchangeSender 1.00 batchCop[tiflash] ExchangeType: PassThrough",
+          "          └─HashAgg 1.00 batchCop[tiflash] group by:test.t.value, funcs:count(test.t.value)->Column#8, funcs:count(test.t.value)->Column#9, funcs:sum(test.t.value)->Column#10",
+          "            └─TableFullScan 10000.00 batchCop[tiflash] table:t keep order:false, stats:pseudo"
+        ]
+      }
     ]
   },