Skip to content

Commit

Permalink
planner: classify logical apply into a separate file for later pkg mo…
Browse files Browse the repository at this point in the history
…ve. (#54795)

ref #51664, ref #52714
  • Loading branch information
AilinKid committed Jul 22, 2024
1 parent 669bf4d commit c784592
Show file tree
Hide file tree
Showing 9 changed files with 218 additions and 162 deletions.
2 changes: 1 addition & 1 deletion pkg/planner/cascades/implementation_rules.go
Original file line number Diff line number Diff line change
Expand Up @@ -556,7 +556,7 @@ func (*ImplApply) Match(expr *memo.GroupExpr, prop *property.PhysicalProperty) (
// OnImplement implements ImplementationRule OnImplement interface
func (*ImplApply) OnImplement(expr *memo.GroupExpr, reqProp *property.PhysicalProperty) ([]memo.Implementation, error) {
la := expr.ExprNode.(*plannercore.LogicalApply)
join := la.GetHashJoin(reqProp)
join := plannercore.GetHashJoin(la, reqProp)
physicalApply := plannercore.PhysicalApply{
PhysicalHashJoin: *join,
OuterSchema: la.CorCols,
Expand Down
8 changes: 4 additions & 4 deletions pkg/planner/core/exhaust_physical_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -2744,12 +2744,12 @@ func MatchItems(p *property.PhysicalProperty, items []*util.ByItems) bool {
}

// GetHashJoin is public for cascades planner.
func (la *LogicalApply) GetHashJoin(prop *property.PhysicalProperty) *PhysicalHashJoin {
func GetHashJoin(la *LogicalApply, prop *property.PhysicalProperty) *PhysicalHashJoin {
return getHashJoin(&la.LogicalJoin, prop, 1, false)
}

// ExhaustPhysicalPlans implements LogicalPlan interface.
func (la *LogicalApply) ExhaustPhysicalPlans(prop *property.PhysicalProperty) ([]base.PhysicalPlan, bool, error) {
// ExhaustPhysicalPlans4LogicalApply generates the physical plan for a logical apply.
func ExhaustPhysicalPlans4LogicalApply(la *LogicalApply, prop *property.PhysicalProperty) ([]base.PhysicalPlan, bool, error) {
if !prop.AllColsFromSchema(la.Children()[0].Schema()) || prop.IsFlashProp() { // for convenient, we don't pass through any prop
la.SCtx().GetSessionVars().RaiseWarningWhenMPPEnforced(
"MPP mode may be blocked because operator `Apply` is not supported now.")
Expand All @@ -2760,7 +2760,7 @@ func (la *LogicalApply) ExhaustPhysicalPlans(prop *property.PhysicalProperty) ([
return nil, true, nil
}
disableAggPushDownToCop(la.Children()[0])
join := la.GetHashJoin(prop)
join := GetHashJoin(la, prop)
var columns = make([]*expression.Column, 0, len(la.CorCols))
for _, colColumn := range la.CorCols {
columns = append(columns, &colColumn.Column)
Expand Down
5 changes: 0 additions & 5 deletions pkg/planner/core/explain.go
Original file line number Diff line number Diff line change
Expand Up @@ -880,11 +880,6 @@ func formatWindowFuncDescs(ctx expression.EvalContext, buffer *bytes.Buffer, des
return buffer
}

// ExplainInfo implements Plan interface.
func (p *LogicalApply) ExplainInfo() string {
return p.LogicalJoin.ExplainInfo()
}

// ExplainInfo implements Plan interface.
func (ds *DataSource) ExplainInfo() string {
buffer := bytes.NewBufferString("")
Expand Down
213 changes: 211 additions & 2 deletions pkg/planner/core/logical_apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,16 @@ import (
"github.com/pingcap/tidb/pkg/expression"
"github.com/pingcap/tidb/pkg/parser/ast"
"github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tidb/pkg/planner/core/base"
"github.com/pingcap/tidb/pkg/planner/core/operator/logicalop"
fd "github.com/pingcap/tidb/pkg/planner/funcdep"
"github.com/pingcap/tidb/pkg/planner/property"
"github.com/pingcap/tidb/pkg/planner/util/coreusage"
"github.com/pingcap/tidb/pkg/planner/util/fixcontrol"
"github.com/pingcap/tidb/pkg/planner/util/optimizetrace"
"github.com/pingcap/tidb/pkg/planner/util/optimizetrace/logicaltrace"
"github.com/pingcap/tidb/pkg/types"
"github.com/pingcap/tidb/pkg/util/plancodec"
)

// LogicalApply gets one row from outer executor and gets one row from inner executor according to outer row.
Expand All @@ -32,7 +39,143 @@ type LogicalApply struct {
NoDecorrelate bool
}

// ExtractCorrelatedCols implements LogicalPlan interface.
// Init initializes LogicalApply.
func (la LogicalApply) Init(ctx base.PlanContext, offset int) *LogicalApply {
la.BaseLogicalPlan = logicalop.NewBaseLogicalPlan(ctx, plancodec.TypeApply, &la, offset)
return &la
}

// *************************** start implementation of Plan interface ***************************

// ExplainInfo implements Plan interface.
func (la *LogicalApply) ExplainInfo() string {
return la.LogicalJoin.ExplainInfo()
}

// ReplaceExprColumns implements base.LogicalPlan interface.
func (la *LogicalApply) ReplaceExprColumns(replace map[string]*expression.Column) {
la.LogicalJoin.ReplaceExprColumns(replace)
for _, coCol := range la.CorCols {
dst := replace[string(coCol.Column.HashCode())]
if dst != nil {
coCol.Column = *dst
}
}
}

// *************************** end implementation of Plan interface ***************************

// *************************** start implementation of logicalPlan interface ***************************

// HashCode inherits the BaseLogicalPlan.LogicalPlan.<0th> implementation.

// PredicatePushDown inherits the BaseLogicalPlan.LogicalPlan.<1st> implementation.

// PruneColumns implements base.LogicalPlan.<2nd> interface.
func (la *LogicalApply) PruneColumns(parentUsedCols []*expression.Column, opt *optimizetrace.LogicalOptimizeOp) (base.LogicalPlan, error) {
leftCols, rightCols := la.extractUsedCols(parentUsedCols)
allowEliminateApply := fixcontrol.GetBoolWithDefault(la.SCtx().GetSessionVars().GetOptimizerFixControlMap(), fixcontrol.Fix45822, true)
var err error
if allowEliminateApply && rightCols == nil && la.JoinType == LeftOuterJoin {
logicaltrace.ApplyEliminateTraceStep(la.Children()[1], opt)
resultPlan := la.Children()[0]
// reEnter the new child's column pruning, returning child[0] as a new child here.
return resultPlan.PruneColumns(parentUsedCols, opt)
}

// column pruning for child-1.
la.Children()[1], err = la.Children()[1].PruneColumns(rightCols, opt)
if err != nil {
return nil, err
}
addConstOneForEmptyProjection(la.Children()[1])

la.CorCols = coreusage.ExtractCorColumnsBySchema4LogicalPlan(la.Children()[1], la.Children()[0].Schema())
for _, col := range la.CorCols {
leftCols = append(leftCols, &col.Column)
}

// column pruning for child-0.
la.Children()[0], err = la.Children()[0].PruneColumns(leftCols, opt)
if err != nil {
return nil, err
}
addConstOneForEmptyProjection(la.Children()[0])
la.mergeSchema()
return la, nil
}

// FindBestTask inherits BaseLogicalPlan.LogicalPlan.<3rd> implementation.

// BuildKeyInfo inherits BaseLogicalPlan.LogicalPlan.<4th> implementation.

// PushDownTopN inherits BaseLogicalPlan.LogicalPlan.<5th> implementation.

// DeriveTopN inherits BaseLogicalPlan.LogicalPlan.<6th> implementation.

// PredicateSimplification inherits BaseLogicalPlan.LogicalPlan.<7th> implementation.

// ConstantPropagation inherits BaseLogicalPlan.LogicalPlan.<8th> implementation.

// PullUpConstantPredicates inherits BaseLogicalPlan.LogicalPlan.<9th> implementation.

// RecursiveDeriveStats inherits BaseLogicalPlan.LogicalPlan.<10th> implementation.

// DeriveStats implements base.LogicalPlan.<11th> interface.
func (la *LogicalApply) DeriveStats(childStats []*property.StatsInfo, selfSchema *expression.Schema, childSchema []*expression.Schema, colGroups [][]*expression.Column) (*property.StatsInfo, error) {
if la.StatsInfo() != nil {
// Reload GroupNDVs since colGroups may have changed.
la.StatsInfo().GroupNDVs = la.getGroupNDVs(colGroups, childStats)
return la.StatsInfo(), nil
}
leftProfile := childStats[0]
la.SetStats(&property.StatsInfo{
RowCount: leftProfile.RowCount,
ColNDVs: make(map[int64]float64, selfSchema.Len()),
})
for id, c := range leftProfile.ColNDVs {
la.StatsInfo().ColNDVs[id] = c
}
if la.JoinType == LeftOuterSemiJoin || la.JoinType == AntiLeftOuterSemiJoin {
la.StatsInfo().ColNDVs[selfSchema.Columns[selfSchema.Len()-1].UniqueID] = 2.0
} else {
for i := childSchema[0].Len(); i < selfSchema.Len(); i++ {
la.StatsInfo().ColNDVs[selfSchema.Columns[i].UniqueID] = leftProfile.RowCount
}
}
la.StatsInfo().GroupNDVs = la.getGroupNDVs(colGroups, childStats)
return la.StatsInfo(), nil
}

// ExtractColGroups implements base.LogicalPlan.<12th> interface.
func (la *LogicalApply) ExtractColGroups(colGroups [][]*expression.Column) [][]*expression.Column {
var outerSchema *expression.Schema
// Apply doesn't have RightOuterJoin.
if la.JoinType == LeftOuterJoin || la.JoinType == LeftOuterSemiJoin || la.JoinType == AntiLeftOuterSemiJoin {
outerSchema = la.Children()[0].Schema()
}
if len(colGroups) == 0 || outerSchema == nil {
return nil
}
_, offsets := outerSchema.ExtractColGroups(colGroups)
if len(offsets) == 0 {
return nil
}
extracted := make([][]*expression.Column, len(offsets))
for i, offset := range offsets {
extracted[i] = colGroups[offset]
}
return extracted
}

// PreparePossibleProperties inherits BaseLogicalPlan.LogicalPlan.<13th> implementation.

// ExhaustPhysicalPlans implements base.LogicalPlan.<14th> interface.
func (la *LogicalApply) ExhaustPhysicalPlans(prop *property.PhysicalProperty) ([]base.PhysicalPlan, bool, error) {
return ExhaustPhysicalPlans4LogicalApply(la, prop)
}

// ExtractCorrelatedCols implements base.LogicalPlan.<15th> interface.
func (la *LogicalApply) ExtractCorrelatedCols() []*expression.CorrelatedColumn {
corCols := la.LogicalJoin.ExtractCorrelatedCols()
for i := len(corCols) - 1; i >= 0; i-- {
Expand All @@ -43,7 +186,19 @@ func (la *LogicalApply) ExtractCorrelatedCols() []*expression.CorrelatedColumn {
return corCols
}

// ExtractFD implements the LogicalPlan interface.
// MaxOneRow inherits BaseLogicalPlan.LogicalPlan.<16th> implementation.

// Children inherits BaseLogicalPlan.LogicalPlan.<17th> implementation.

// SetChildren inherits BaseLogicalPlan.LogicalPlan.<18th> implementation.

// SetChild inherits BaseLogicalPlan.LogicalPlan.<19th> implementation.

// RollBackTaskMap inherits BaseLogicalPlan.LogicalPlan.<20th> implementation.

// CanPushToCop inherits BaseLogicalPlan.LogicalPlan.<21st> implementation.

// ExtractFD implements the base.LogicalPlan.<22nd> interface.
func (la *LogicalApply) ExtractFD() *fd.FDSet {
innerPlan := la.Children()[1]
// build the join correlated equal condition for apply join, this equal condition is used for deriving the transitive FD between outer and inner side.
Expand Down Expand Up @@ -78,3 +233,57 @@ func (la *LogicalApply) ExtractFD() *fd.FDSet {
return &fd.FDSet{HashCodeToUniqueID: make(map[string]int)}
}
}

// GetBaseLogicalPlan inherits BaseLogicalPlan.LogicalPlan.<23rd> implementation.

// ConvertOuterToInnerJoin inherits BaseLogicalPlan.LogicalPlan.<24th> implementation.

// *************************** end implementation of logicalPlan interface ***************************

// CanPullUpAgg checks if an apply can pull an aggregation up.
func (la *LogicalApply) CanPullUpAgg() bool {
if la.JoinType != InnerJoin && la.JoinType != LeftOuterJoin {
return false
}
if len(la.EqualConditions)+len(la.LeftConditions)+len(la.RightConditions)+len(la.OtherConditions) > 0 {
return false
}
return len(la.Children()[0].Schema().Keys) > 0
}

// DeCorColFromEqExpr checks whether it's an equal condition of form `col = correlated col`. If so we will change the decorrelated
// column to normal column to make a new equal condition.
func (la *LogicalApply) DeCorColFromEqExpr(expr expression.Expression) expression.Expression {
sf, ok := expr.(*expression.ScalarFunction)
if !ok || sf.FuncName.L != ast.EQ {
return nil
}
if col, lOk := sf.GetArgs()[0].(*expression.Column); lOk {
if corCol, rOk := sf.GetArgs()[1].(*expression.CorrelatedColumn); rOk {
ret := corCol.Decorrelate(la.Schema())
if _, ok := ret.(*expression.CorrelatedColumn); ok {
return nil
}
// We should make sure that the equal condition's left side is the join's left join key, right is the right key.
return expression.NewFunctionInternal(la.SCtx().GetExprCtx(), ast.EQ, types.NewFieldType(mysql.TypeTiny), ret, col)
}
}
if corCol, lOk := sf.GetArgs()[0].(*expression.CorrelatedColumn); lOk {
if col, rOk := sf.GetArgs()[1].(*expression.Column); rOk {
ret := corCol.Decorrelate(la.Schema())
if _, ok := ret.(*expression.CorrelatedColumn); ok {
return nil
}
// We should make sure that the equal condition's left side is the join's left join key, right is the right key.
return expression.NewFunctionInternal(la.SCtx().GetExprCtx(), ast.EQ, types.NewFieldType(mysql.TypeTiny), ret, col)
}
}
return nil
}

func (la *LogicalApply) getGroupNDVs(colGroups [][]*expression.Column, childStats []*property.StatsInfo) []property.GroupNDV {
if len(colGroups) > 0 && (la.JoinType == LeftOuterSemiJoin || la.JoinType == AntiLeftOuterSemiJoin || la.JoinType == LeftOuterJoin) {
return childStats[0].GroupNDVs
}
return nil
}
6 changes: 0 additions & 6 deletions pkg/planner/core/logical_initialize.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,6 @@ func (ds DataSource) Init(ctx base.PlanContext, offset int) *DataSource {
return &ds
}

// Init initializes LogicalApply.
func (la LogicalApply) Init(ctx base.PlanContext, offset int) *LogicalApply {
la.BaseLogicalPlan = logicalop.NewBaseLogicalPlan(ctx, plancodec.TypeApply, &la, offset)
return &la
}

// Init initializes LogicalProjection.
func (p LogicalExpand) Init(ctx base.PlanContext, offset int) *LogicalExpand {
p.BaseLogicalPlan = logicalop.NewBaseLogicalPlan(ctx, plancodec.TypeExpand, &p, offset)
Expand Down
36 changes: 0 additions & 36 deletions pkg/planner/core/rule_column_pruning.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@ import (
"github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tidb/pkg/planner/core/base"
"github.com/pingcap/tidb/pkg/planner/util"
"github.com/pingcap/tidb/pkg/planner/util/coreusage"
"github.com/pingcap/tidb/pkg/planner/util/fixcontrol"
"github.com/pingcap/tidb/pkg/planner/util/optimizetrace"
"github.com/pingcap/tidb/pkg/planner/util/optimizetrace/logicaltrace"
)
Expand Down Expand Up @@ -167,40 +165,6 @@ func (ds *DataSource) PruneColumns(parentUsedCols []*expression.Column, opt *opt
return ds, nil
}

// PruneColumns implements base.LogicalPlan interface.
func (la *LogicalApply) PruneColumns(parentUsedCols []*expression.Column, opt *optimizetrace.LogicalOptimizeOp) (base.LogicalPlan, error) {
leftCols, rightCols := la.extractUsedCols(parentUsedCols)
allowEliminateApply := fixcontrol.GetBoolWithDefault(la.SCtx().GetSessionVars().GetOptimizerFixControlMap(), fixcontrol.Fix45822, true)
var err error
if allowEliminateApply && rightCols == nil && la.JoinType == LeftOuterJoin {
logicaltrace.ApplyEliminateTraceStep(la.Children()[1], opt)
resultPlan := la.Children()[0]
// reEnter the new child's column pruning, returning child[0] as a new child here.
return resultPlan.PruneColumns(parentUsedCols, opt)
}

// column pruning for child-1.
la.Children()[1], err = la.Children()[1].PruneColumns(rightCols, opt)
if err != nil {
return nil, err
}
addConstOneForEmptyProjection(la.Children()[1])

la.CorCols = coreusage.ExtractCorColumnsBySchema4LogicalPlan(la.Children()[1], la.Children()[0].Schema())
for _, col := range la.CorCols {
leftCols = append(leftCols, &col.Column)
}

// column pruning for child-0.
la.Children()[0], err = la.Children()[0].PruneColumns(leftCols, opt)
if err != nil {
return nil, err
}
addConstOneForEmptyProjection(la.Children()[0])
la.mergeSchema()
return la, nil
}

func (*columnPruner) name() string {
return "column_prune"
}
Expand Down
Loading

0 comments on commit c784592

Please sign in to comment.