Skip to content

Commit

Permalink
planner: implement projection and table dual in cascades
Browse files Browse the repository at this point in the history
  • Loading branch information
eurekaka committed Jul 22, 2019
1 parent 700bf2f commit b1bf97f
Show file tree
Hide file tree
Showing 20 changed files with 200 additions and 38 deletions.
56 changes: 55 additions & 1 deletion planner/cascades/implementation_rules.go
Expand Up @@ -15,6 +15,7 @@ package cascades

import (
plannercore "github.com/pingcap/tidb/planner/core"
impl "github.com/pingcap/tidb/planner/implementation"
"github.com/pingcap/tidb/planner/memo"
"github.com/pingcap/tidb/planner/property"
)
Expand All @@ -25,7 +26,7 @@ type ImplementationRule interface {
Match(expr *memo.GroupExpr, prop *property.PhysicalProperty) (matched bool)
// OnImplement generates physical plan using this rule for current GroupExpr. Note that
// childrenReqProps of generated physical plan should be set correspondingly in this function.
OnImplement(expr *memo.GroupExpr, reqProp *property.PhysicalProperty) (impl memo.Implementation, err error)
OnImplement(expr *memo.GroupExpr, reqProp *property.PhysicalProperty) (memo.Implementation, error)
}

// GetImplementationRules gets the all the candidate implementation rules based
Expand All @@ -44,4 +45,57 @@ var implementationMap = map[memo.Operand][]ImplementationRule{
nil,
},
*/
memo.OperandTableDual: {
&ImplTableDual{},
},
memo.OperandProjection: {
&ImplProjection{},
},
}

// ImplTableDual implements LogicalTableDual as PhysicalTableDual.
type ImplTableDual struct {
}

// Match implements ImplementationRule Match interface.
func (r *ImplTableDual) Match(expr *memo.GroupExpr, prop *property.PhysicalProperty) (matched bool) {
if !prop.IsEmpty() {
return false
}
return true
}

// OnImplement implements ImplementationRule OnImplement interface.
func (r *ImplTableDual) OnImplement(expr *memo.GroupExpr, reqProp *property.PhysicalProperty) (memo.Implementation, error) {
logicProp := expr.Group.Prop
logicDual := expr.ExprNode.(*plannercore.LogicalTableDual)
dual := plannercore.PhysicalTableDual{RowCount: logicDual.RowCount}.Init(logicDual.Context(), logicProp.Stats)
dual.SetSchema(logicProp.Schema)
return impl.NewTableDualImpl(dual), nil
}

// ImplProjection implements LogicalProjection as PhysicalProjection.
type ImplProjection struct {
}

// Match implements ImplementationRule Match interface.
func (r *ImplProjection) Match(expr *memo.GroupExpr, prop *property.PhysicalProperty) (matched bool) {
return true
}

// OnImplement implements ImplementationRule OnImplement interface.
func (r *ImplProjection) OnImplement(expr *memo.GroupExpr, reqProp *property.PhysicalProperty) (memo.Implementation, error) {
logicProp := expr.Group.Prop
logicProj := expr.ExprNode.(*plannercore.LogicalProjection)
childProp, ok := logicProj.TryToGetChildProp(reqProp)
if !ok {
return nil, nil
}
proj := plannercore.PhysicalProjection{
Exprs: logicProj.Exprs,
CalculateNoDelay: logicProj.CalculateNoDelay,
AvoidColumnEvaluator: logicProj.AvoidColumnEvaluator,
}.Init(logicProj.Context(), logicProp.Stats.ScaleByExpectCnt(reqProp.ExpectedCnt), childProp)
proj.SetSchema(logicProp.Schema)
return impl.NewProjectionImpl(proj), nil
}
60 changes: 60 additions & 0 deletions planner/cascades/integration_test.go
@@ -0,0 +1,60 @@
// Copyright 2019 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package cascades_test

import (
. "github.com/pingcap/check"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/session"
"github.com/pingcap/tidb/store/mockstore"
"github.com/pingcap/tidb/util/testkit"
)

var _ = Suite(&testIntegrationSuite{})

type testIntegrationSuite struct {
store kv.Storage
tk *testkit.TestKit
}

func newStoreWithBootstrap() (kv.Storage, error) {
store, err := mockstore.NewMockTikvStore()
if err != nil {
return nil, err
}
_, err = session.BootstrapSession(store)
return store, err
}

func (s *testIntegrationSuite) SetUpSuite(c *C) {
var err error
s.store, err = newStoreWithBootstrap()
c.Assert(err, IsNil)
s.tk = testkit.NewTestKitWithInit(c, s.store)
s.tk.MustExec("set session tidb_enable_cascades_planner = 1")
}

func (s *testIntegrationSuite) TearDownSuite(c *C) {
s.store.Close()
}

func (s *testIntegrationSuite) TestSimpleProjDual(c *C) {
s.tk.MustQuery("explain select 1").Check(testkit.Rows(
"Projection_3 1.00 root 1",
"└─TableDual_4 1.00 root rows:1",
))
s.tk.MustQuery("select 1").Check(testkit.Rows(
"1",
))
}
2 changes: 1 addition & 1 deletion planner/cascades/optimize.go
Expand Up @@ -65,7 +65,7 @@ func exploreGroup(g *memo.Group) error {
}

g.Explored = true
for elem := g.Equivalents.Front(); elem != nil; elem.Next() {
for elem := g.Equivalents.Front(); elem != nil; elem = elem.Next() {
curExpr := elem.Value.(*memo.GroupExpr)
if curExpr.Explored {
continue
Expand Down
4 changes: 2 additions & 2 deletions planner/core/common_plans.go
Expand Up @@ -274,8 +274,8 @@ func (e *Execute) getPhysicalPlan(ctx context.Context, sctx sessionctx.Context,
}

func (e *Execute) rebuildRange(p Plan) error {
sctx := p.context()
sc := p.context().GetSessionVars().StmtCtx
sctx := p.Context()
sc := p.Context().GetSessionVars().StmtCtx
var err error
switch x := p.(type) {
case *PhysicalTableReader:
Expand Down
10 changes: 5 additions & 5 deletions planner/core/exhaust_physical_plans.go
Expand Up @@ -1087,10 +1087,10 @@ func (p *LogicalJoin) exhaustPhysicalPlans(prop *property.PhysicalProperty) []Ph
return joins
}

// tryToGetChildProp will check if this sort property can be pushed or not.
// TryToGetChildProp will check if this sort property can be pushed or not.
// When a sort column will be replaced by scalar function, we refuse it.
// When a sort column will be replaced by a constant, we just remove it.
func (p *LogicalProjection) tryToGetChildProp(prop *property.PhysicalProperty) (*property.PhysicalProperty, bool) {
func (p *LogicalProjection) TryToGetChildProp(prop *property.PhysicalProperty) (*property.PhysicalProperty, bool) {
newProp := &property.PhysicalProperty{TaskTp: property.RootTaskType, ExpectedCnt: prop.ExpectedCnt}
newCols := make([]property.Item, 0, len(prop.Items))
for _, col := range prop.Items {
Expand All @@ -1107,14 +1107,14 @@ func (p *LogicalProjection) tryToGetChildProp(prop *property.PhysicalProperty) (
}

func (p *LogicalProjection) exhaustPhysicalPlans(prop *property.PhysicalProperty) []PhysicalPlan {
newProp, ok := p.tryToGetChildProp(prop)
newProp, ok := p.TryToGetChildProp(prop)
if !ok {
return nil
}
proj := PhysicalProjection{
Exprs: p.Exprs,
CalculateNoDelay: p.calculateNoDelay,
AvoidColumnEvaluator: p.avoidColumnEvaluator,
CalculateNoDelay: p.CalculateNoDelay,
AvoidColumnEvaluator: p.AvoidColumnEvaluator,
}.Init(p.ctx, p.stats.ScaleByExpectCnt(prop.ExpectedCnt), newProp)
proj.SetSchema(p.schema)
return []PhysicalPlan{proj}
Expand Down
2 changes: 1 addition & 1 deletion planner/core/logical_plan_builder.go
Expand Up @@ -857,7 +857,7 @@ func (b *PlanBuilder) buildProjection4Union(ctx context.Context, u *LogicalUnion
}
}
b.optFlag |= flagEliminateProjection
proj := LogicalProjection{Exprs: exprs, avoidColumnEvaluator: true}.Init(b.ctx)
proj := LogicalProjection{Exprs: exprs, AvoidColumnEvaluator: true}.Init(b.ctx)
proj.SetSchema(u.schema.Clone())
proj.SetChildren(child)
u.children[childID] = proj
Expand Down
8 changes: 4 additions & 4 deletions planner/core/logical_plans.go
Expand Up @@ -217,18 +217,18 @@ type LogicalProjection struct {
// In *UPDATE*, we should know this to tell different projections.
calculateGenCols bool

// calculateNoDelay indicates this Projection is the root Plan and should be
// CalculateNoDelay indicates this Projection is the root Plan and should be
// calculated without delay and will not return any result to client.
// Currently it is "true" only when the current sql query is a "DO" statement.
// See "https://dev.mysql.com/doc/refman/5.7/en/do.html" for more detail.
calculateNoDelay bool
CalculateNoDelay bool

// avoidColumnRef is a temporary variable which is ONLY used to avoid
// AvoidColumnRef is a temporary variable which is ONLY used to avoid
// building columnEvaluator for the expressions of Projection which is
// built by buildProjection4Union.
// This can be removed after column pool being supported.
// Related issue: TiDB#8141(https://github.com/pingcap/tidb/issues/8141)
avoidColumnEvaluator bool
AvoidColumnEvaluator bool
}

func (p *LogicalProjection) extractCorrelatedCols() []*expression.CorrelatedColumn {
Expand Down
5 changes: 3 additions & 2 deletions planner/core/plan.go
Expand Up @@ -40,7 +40,7 @@ type Plan interface {
// replaceExprColumns replace all the column reference in the plan's expression node.
replaceExprColumns(replace map[string]*expression.Column)

context() sessionctx.Context
Context() sessionctx.Context

// property.StatsInfo will return the property.StatsInfo for this plan.
statsInfo() *property.StatsInfo
Expand Down Expand Up @@ -312,7 +312,8 @@ func (p *basePhysicalPlan) SetChild(i int, child PhysicalPlan) {
p.children[i] = child
}

func (p *basePlan) context() sessionctx.Context {
// Context implements Plan Context interface.
func (p *basePlan) Context() sessionctx.Context {
return p.ctx
}

Expand Down
2 changes: 1 addition & 1 deletion planner/core/planbuilder.go
Expand Up @@ -338,7 +338,7 @@ func (b *PlanBuilder) buildDo(ctx context.Context, v *ast.DoStmt) (Plan, error)
proj.SetChildren(p)
proj.self = proj
proj.SetSchema(schema)
proj.calculateNoDelay = true
proj.CalculateNoDelay = true
return proj, nil
}

Expand Down
2 changes: 1 addition & 1 deletion planner/core/rule_aggregation_push_down.go
Expand Up @@ -316,7 +316,7 @@ func (a *aggregationPushDownSolver) pushAggCrossUnion(agg *LogicalAggregation, u
}

func (a *aggregationPushDownSolver) optimize(ctx context.Context, p LogicalPlan) (LogicalPlan, error) {
if !p.context().GetSessionVars().AllowAggPushDown {
if !p.Context().GetSessionVars().AllowAggPushDown {
return p, nil
}
return a.aggPushDown(p)
Expand Down
12 changes: 6 additions & 6 deletions planner/core/rule_inject_extra_projection.go
Expand Up @@ -73,7 +73,7 @@ func wrapCastForAggFuncs(sctx sessionctx.Context, aggFuncs []*aggregation.AggFun
func injectProjBelowAgg(aggPlan PhysicalPlan, aggFuncs []*aggregation.AggFuncDesc, groupByItems []expression.Expression) PhysicalPlan {
hasScalarFunc := false

wrapCastForAggFuncs(aggPlan.context(), aggFuncs)
wrapCastForAggFuncs(aggPlan.Context(), aggFuncs)
for i := 0; !hasScalarFunc && i < len(aggFuncs); i++ {
for _, arg := range aggFuncs[i].Args {
_, isScalarFunc := arg.(*expression.ScalarFunction)
Expand Down Expand Up @@ -115,7 +115,7 @@ func injectProjBelowAgg(aggPlan PhysicalPlan, aggFuncs []*aggregation.AggFuncDes
}
projExprs = append(projExprs, item)
newArg := &expression.Column{
UniqueID: aggPlan.context().GetSessionVars().AllocPlanColumnID(),
UniqueID: aggPlan.Context().GetSessionVars().AllocPlanColumnID(),
RetType: item.GetType(),
ColName: model.NewCIStr(fmt.Sprintf("col_%d", len(projSchemaCols))),
Index: cursor,
Expand All @@ -130,7 +130,7 @@ func injectProjBelowAgg(aggPlan PhysicalPlan, aggFuncs []*aggregation.AggFuncDes
proj := PhysicalProjection{
Exprs: projExprs,
AvoidColumnEvaluator: false,
}.Init(aggPlan.context(), child.statsInfo().ScaleByExpectCnt(prop.ExpectedCnt), prop)
}.Init(aggPlan.Context(), child.statsInfo().ScaleByExpectCnt(prop.ExpectedCnt), prop)
proj.SetSchema(expression.NewSchema(projSchemaCols...))
proj.SetChildren(child)

Expand Down Expand Up @@ -164,7 +164,7 @@ func injectProjBelowSort(p PhysicalPlan, orderByItems []*ByItems) PhysicalPlan {
topProj := PhysicalProjection{
Exprs: topProjExprs,
AvoidColumnEvaluator: false,
}.Init(p.context(), p.statsInfo(), nil)
}.Init(p.Context(), p.statsInfo(), nil)
topProj.SetSchema(p.Schema().Clone())
topProj.SetChildren(p)

Expand All @@ -185,7 +185,7 @@ func injectProjBelowSort(p PhysicalPlan, orderByItems []*ByItems) PhysicalPlan {
}
bottomProjExprs = append(bottomProjExprs, itemExpr)
newArg := &expression.Column{
UniqueID: p.context().GetSessionVars().AllocPlanColumnID(),
UniqueID: p.Context().GetSessionVars().AllocPlanColumnID(),
RetType: itemExpr.GetType(),
ColName: model.NewCIStr(fmt.Sprintf("col_%d", len(bottomProjSchemaCols))),
Index: len(bottomProjSchemaCols),
Expand All @@ -198,7 +198,7 @@ func injectProjBelowSort(p PhysicalPlan, orderByItems []*ByItems) PhysicalPlan {
bottomProj := PhysicalProjection{
Exprs: bottomProjExprs,
AvoidColumnEvaluator: false,
}.Init(p.context(), childPlan.statsInfo().ScaleByExpectCnt(childProp.ExpectedCnt), childProp)
}.Init(p.Context(), childPlan.statsInfo().ScaleByExpectCnt(childProp.ExpectedCnt), childProp)
bottomProj.SetSchema(expression.NewSchema(bottomProjSchemaCols...))
bottomProj.SetChildren(childPlan)
p.SetChildren(bottomProj)
Expand Down
2 changes: 1 addition & 1 deletion planner/core/rule_join_reorder.go
Expand Up @@ -55,7 +55,7 @@ type jrNode struct {
}

func (s *joinReOrderSolver) optimize(ctx context.Context, p LogicalPlan) (LogicalPlan, error) {
return s.optimizeRecursive(p.context(), p)
return s.optimizeRecursive(p.Context(), p)
}

// optimizeRecursive recursively collects join groups and applies join reorder algorithm for each group.
Expand Down
2 changes: 1 addition & 1 deletion planner/core/rule_max_min_eliminate.go
Expand Up @@ -46,7 +46,7 @@ func (a *maxMinEliminator) eliminateMaxMin(p LogicalPlan) {
}

child := p.Children()[0]
ctx := p.context()
ctx := p.Context()

// If there's no column in f.GetArgs()[0], we still need limit and read data from real table because the result should NULL if the below is empty.
if len(expression.ExtractColumns(f.Args[0])) > 0 {
Expand Down
10 changes: 5 additions & 5 deletions planner/core/rule_partition_processor.go
Expand Up @@ -112,7 +112,7 @@ func (s *partitionProcessor) prune(ds *DataSource) (LogicalPlan, error) {
children := make([]LogicalPlan, 0, len(pi.Definitions))
for i, expr := range partitionExprs {
// If the select condition would never be satisified, prune that partition.
pruned, err := s.canBePruned(ds.context(), col, expr, ds.allConds)
pruned, err := s.canBePruned(ds.Context(), col, expr, ds.allConds)
if err != nil {
return nil, err
}
Expand All @@ -128,27 +128,27 @@ func (s *partitionProcessor) prune(ds *DataSource) (LogicalPlan, error) {

// Not a deep copy.
newDataSource := *ds
newDataSource.baseLogicalPlan = newBaseLogicalPlan(ds.context(), TypeTableScan, &newDataSource)
newDataSource.baseLogicalPlan = newBaseLogicalPlan(ds.Context(), TypeTableScan, &newDataSource)
newDataSource.isPartition = true
newDataSource.physicalTableID = pi.Definitions[i].ID
// There are many expression nodes in the plan tree use the original datasource
// id as FromID. So we set the id of the newDataSource with the original one to
// avoid traversing the whole plan tree to update the references.
newDataSource.id = ds.id
newDataSource.statisticTable = getStatsTable(ds.context(), ds.table.Meta(), pi.Definitions[i].ID)
newDataSource.statisticTable = getStatsTable(ds.Context(), ds.table.Meta(), pi.Definitions[i].ID)
children = append(children, &newDataSource)
}
if len(children) == 0 {
// No result after table pruning.
tableDual := LogicalTableDual{RowCount: 0}.Init(ds.context())
tableDual := LogicalTableDual{RowCount: 0}.Init(ds.Context())
tableDual.schema = ds.Schema()
return tableDual, nil
}
if len(children) == 1 {
// No need for the union all.
return children[0], nil
}
unionAll := LogicalUnionAll{}.Init(ds.context())
unionAll := LogicalUnionAll{}.Init(ds.Context())
unionAll.SetChildren(children...)
unionAll.SetSchema(ds.schema)
return unionAll, nil
Expand Down
8 changes: 4 additions & 4 deletions planner/core/rule_predicate_push_down.go
Expand Up @@ -35,14 +35,14 @@ func addSelection(p LogicalPlan, child LogicalPlan, conditions []expression.Expr
p.Children()[chIdx] = child
return
}
conditions = expression.PropagateConstant(p.context(), conditions)
conditions = expression.PropagateConstant(p.Context(), conditions)
// Return table dual when filter is constant false or null.
dual := conds2TableDual(child, conditions)
if dual != nil {
p.Children()[chIdx] = dual
return
}
selection := LogicalSelection{Conditions: conditions}.Init(p.context())
selection := LogicalSelection{Conditions: conditions}.Init(p.Context())
selection.SetChildren(child)
p.Children()[chIdx] = selection
}
Expand Down Expand Up @@ -480,9 +480,9 @@ func conds2TableDual(p LogicalPlan, conds []expression.Expression) LogicalPlan {
if !ok {
return nil
}
sc := p.context().GetSessionVars().StmtCtx
sc := p.Context().GetSessionVars().StmtCtx
if isTrue, err := con.Value.ToBool(sc); (err == nil && isTrue == 0) || con.Value.IsNull() {
dual := LogicalTableDual{}.Init(p.context())
dual := LogicalTableDual{}.Init(p.Context())
dual.SetSchema(p.Schema())
return dual
}
Expand Down

0 comments on commit b1bf97f

Please sign in to comment.