Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

planner: change predicateColumnCollector to columnStatsUsageCollector and collect histogram-needed columns #30671

Merged
merged 14 commits into from
Dec 25, 2021
Merged
293 changes: 173 additions & 120 deletions planner/core/collect_column_stats_usage.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,27 +19,46 @@ import (
"github.com/pingcap/tidb/parser/model"
)

// predicateColumnCollector collects predicate columns from logical plan. Predicate columns are the columns whose statistics
// are utilized when making query plans, which usually occur in where conditions, join conditions and so on.
type predicateColumnCollector struct {
// colMap maps expression.Column.UniqueID to the table columns whose statistics are utilized to calculate statistics of the column.
colMap map[int64]map[model.TableColumnID]struct{}
const (
collectPredicateColumns uint64 = 1 << iota
collectHistNeededColumns
)

// columnStatsUsageCollector collects predicate columns and/or histogram-needed columns from logical plan.
// Predicate columns are the columns whose statistics are utilized when making query plans, which usually occur in where conditions, join conditions and so on.
// Histogram-needed columns are the columns whose histograms are utilized when making query plans, which usually occur in the conditions pushed down to DataSource.
// The set of histogram-needed columns is the subset of that of predicate columns.
type columnStatsUsageCollector struct {
// collectMode indicates whether to collect predicate columns and/or histogram-needed columns
collectMode uint64
// predicateCols records predicate columns.
predicateCols map[model.TableColumnID]struct{}
// colMap maps expression.Column.UniqueID to the table columns whose statistics are utilized to calculate statistics of the column.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add more comments about the special cases to explain this? It's not easy to understand why an expression.Column.UniqueID needs more table columns to calculate.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

// It is used for collecting predicate columns.
colMap map[int64]map[model.TableColumnID]struct{}
// histNeededCols records histogram-needed columns
histNeededCols map[model.TableColumnID]struct{}
// cols is used to store columns collected from expressions and saves some allocation.
cols []*expression.Column
}

func newPredicateColumnCollector() *predicateColumnCollector {
return &predicateColumnCollector{
colMap: make(map[int64]map[model.TableColumnID]struct{}),
predicateCols: make(map[model.TableColumnID]struct{}),
func newColumnStatsUsageCollector(collectMode uint64) *columnStatsUsageCollector {
collector := &columnStatsUsageCollector{
collectMode: collectMode,
// Pre-allocate a slice to reduce allocation, 8 doesn't have special meaning.
cols: make([]*expression.Column, 0, 8),
}
if collectMode&collectPredicateColumns != 0 {
collector.predicateCols = make(map[model.TableColumnID]struct{})
collector.colMap = make(map[int64]map[model.TableColumnID]struct{})
}
if collectMode&collectHistNeededColumns != 0 {
collector.histNeededCols = make(map[model.TableColumnID]struct{})
}
return collector
}

func (c *predicateColumnCollector) addPredicateColumn(col *expression.Column) {
func (c *columnStatsUsageCollector) addPredicateColumn(col *expression.Column) {
tblColIDs, ok := c.colMap[col.UniqueID]
if !ok {
// It may happen if some leaf of logical plan is LogicalMemTable/LogicalShow/LogicalShowDDLJobs.
Expand All @@ -50,21 +69,21 @@ func (c *predicateColumnCollector) addPredicateColumn(col *expression.Column) {
}
}

func (c *predicateColumnCollector) addPredicateColumnsFromExpression(expr expression.Expression) {
func (c *columnStatsUsageCollector) addPredicateColumnsFromExpression(expr expression.Expression) {
cols := expression.ExtractColumnsAndCorColumns(c.cols[:0], expr)
for _, col := range cols {
c.addPredicateColumn(col)
}
}

func (c *predicateColumnCollector) addPredicateColumnsFromExpressions(list []expression.Expression) {
func (c *columnStatsUsageCollector) addPredicateColumnsFromExpressions(list []expression.Expression) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we merge this part? It seems the only difference is whether the expression is a slice or not.
Besides, we only need to export expression.ExtractColumnsAndCorColumnsFromExpressions, not all of them.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

cols := expression.ExtractColumnsAndCorColumnsFromExpressions(c.cols[:0], list)
for _, col := range cols {
c.addPredicateColumn(col)
}
}

func (c *predicateColumnCollector) updateColMap(col *expression.Column, relatedCols []*expression.Column) {
func (c *columnStatsUsageCollector) updateColMap(col *expression.Column, relatedCols []*expression.Column) {
if _, ok := c.colMap[col.UniqueID]; !ok {
c.colMap[col.UniqueID] = map[model.TableColumnID]struct{}{}
}
Expand All @@ -80,15 +99,15 @@ func (c *predicateColumnCollector) updateColMap(col *expression.Column, relatedC
}
}

func (c *predicateColumnCollector) updateColMapFromExpression(col *expression.Column, expr expression.Expression) {
func (c *columnStatsUsageCollector) updateColMapFromExpression(col *expression.Column, expr expression.Expression) {
c.updateColMap(col, expression.ExtractColumnsAndCorColumns(c.cols[:0], expr))
}

func (c *predicateColumnCollector) updateColMapFromExpressions(col *expression.Column, list []expression.Expression) {
func (c *columnStatsUsageCollector) updateColMapFromExpressions(col *expression.Column, list []expression.Expression) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ditto.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

c.updateColMap(col, expression.ExtractColumnsAndCorColumnsFromExpressions(c.cols[:0], list))
}

func (ds *DataSource) updateColMapAndAddPredicateColumns(c *predicateColumnCollector) {
func (ds *DataSource) updateColMapAndAddPredicateColumns(c *columnStatsUsageCollector) {
tblID := ds.TableInfo().ID
for _, col := range ds.Schema().Columns {
tblColID := model.TableColumnID{TableID: tblID, ColumnID: col.ID}
Expand All @@ -98,7 +117,7 @@ func (ds *DataSource) updateColMapAndAddPredicateColumns(c *predicateColumnColle
c.addPredicateColumnsFromExpressions(ds.pushedDownConds)
}

func (p *LogicalJoin) updateColMapAndAddPredicateColumns(c *predicateColumnCollector) {
func (p *LogicalJoin) updateColMapAndAddPredicateColumns(c *columnStatsUsageCollector) {
// The only schema change is merging two schemas so there is no new column.
// Assume statistics of all the columns in EqualConditions/LeftConditions/RightConditions/OtherConditions are needed.
exprs := make([]expression.Expression, 0, len(p.EqualConditions)+len(p.LeftConditions)+len(p.RightConditions)+len(p.OtherConditions))
Expand All @@ -117,7 +136,7 @@ func (p *LogicalJoin) updateColMapAndAddPredicateColumns(c *predicateColumnColle
c.addPredicateColumnsFromExpressions(exprs)
}

func (p *LogicalUnionAll) updateColMapAndAddPredicateColumns(c *predicateColumnCollector) {
func (p *LogicalUnionAll) updateColMapAndAddPredicateColumns(c *columnStatsUsageCollector) {
// statistics of the ith column of UnionAll come from statistics of the ith column of each child.
schemas := make([]*expression.Schema, 0, len(p.Children()))
relatedCols := make([]*expression.Column, 0, len(p.Children()))
Expand All @@ -133,119 +152,153 @@ func (p *LogicalUnionAll) updateColMapAndAddPredicateColumns(c *predicateColumnC
}
}

func (c *predicateColumnCollector) collectFromPlan(lp LogicalPlan) {
func (ds *DataSource) addHistNeededColumns(c *columnStatsUsageCollector) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we change function signature to func (c *columnStatsUsageCollector) funcName(ds *DataSource)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done.

tblID := ds.TableInfo().ID
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should it be physicalTableID?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, thinks for reminding. We can first check DataSource.isPartition. If it is true, use DataSource.physicalTableID. Otherwise use DataSource.TableInfo().ID.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see, after #30754 is merged we can just use physicalTableID.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed it and added test.

columns := expression.ExtractColumnsFromExpressions(c.cols[:0], ds.pushedDownConds, nil)
for _, col := range columns {
tblColID := model.TableColumnID{TableID: tblID, ColumnID: col.ID}
c.histNeededCols[tblColID] = struct{}{}
}
}

func (c *columnStatsUsageCollector) collectFromPlan(lp LogicalPlan) {
for _, child := range lp.Children() {
c.collectFromPlan(child)
}
switch x := lp.(type) {
case *DataSource:
x.updateColMapAndAddPredicateColumns(c)
case *LogicalIndexScan:
x.Source.updateColMapAndAddPredicateColumns(c)
// TODO: Is it redundant to add predicate columns from LogicalIndexScan.AccessConds? Is LogicalIndexScan.AccessConds a subset of LogicalIndexScan.Source.pushedDownConds.
c.addPredicateColumnsFromExpressions(x.AccessConds)
case *LogicalTableScan:
x.Source.updateColMapAndAddPredicateColumns(c)
// TODO: Is it redundant to add predicate columns from LogicalTableScan.AccessConds? Is LogicalTableScan.AccessConds a subset of LogicalTableScan.Source.pushedDownConds.
c.addPredicateColumnsFromExpressions(x.AccessConds)
case *TiKVSingleGather:
// TODO: Is it redundant?
x.Source.updateColMapAndAddPredicateColumns(c)
case *LogicalProjection:
// Schema change from children to self.
schema := x.Schema()
for i, expr := range x.Exprs {
c.updateColMapFromExpression(schema.Columns[i], expr)
}
case *LogicalSelection:
// Though the conditions in LogicalSelection are complex conditions which cannot be pushed down to DataSource, we still
// regard statistics of the columns in the conditions as needed.
c.addPredicateColumnsFromExpressions(x.Conditions)
case *LogicalAggregation:
// Just assume statistics of all the columns in GroupByItems are needed.
c.addPredicateColumnsFromExpressions(x.GroupByItems)
// Schema change from children to self.
schema := x.Schema()
for i, aggFunc := range x.AggFuncs {
c.updateColMapFromExpressions(schema.Columns[i], aggFunc.Args)
}
case *LogicalWindow:
// Statistics of the columns in LogicalWindow.PartitionBy are used in optimizeByShuffle4Window.
// It seems that we don't use statistics of the columns in LogicalWindow.OrderBy currently?
for _, item := range x.PartitionBy {
c.addPredicateColumn(item.Col)
}
// Schema change from children to self.
windowColumns := x.GetWindowResultColumns()
for i, col := range windowColumns {
c.updateColMapFromExpressions(col, x.WindowFuncDescs[i].Args)
}
case *LogicalJoin:
x.updateColMapAndAddPredicateColumns(c)
case *LogicalApply:
x.updateColMapAndAddPredicateColumns(c)
// Assume statistics of correlated columns are needed.
// Correlated columns can be found in LogicalApply.Children()[0].Schema(). Since we already visit LogicalApply.Children()[0],
// correlated columns must have existed in predicateColumnCollector.colMap.
for _, corCols := range x.CorCols {
c.addPredicateColumn(&corCols.Column)
}
case *LogicalSort:
// Assume statistics of all the columns in ByItems are needed.
for _, item := range x.ByItems {
c.addPredicateColumnsFromExpression(item.Expr)
}
case *LogicalTopN:
// Assume statistics of all the columns in ByItems are needed.
for _, item := range x.ByItems {
c.addPredicateColumnsFromExpression(item.Expr)
}
case *LogicalUnionAll:
x.updateColMapAndAddPredicateColumns(c)
case *LogicalPartitionUnionAll:
x.updateColMapAndAddPredicateColumns(c)
case *LogicalCTE:
// Visit seedPartLogicalPlan and recursivePartLogicalPlan first.
c.collectFromPlan(x.cte.seedPartLogicalPlan)
if x.cte.recursivePartLogicalPlan != nil {
c.collectFromPlan(x.cte.recursivePartLogicalPlan)
}
// Schema change from seedPlan/recursivePlan to self.
columns := x.Schema().Columns
seedColumns := x.cte.seedPartLogicalPlan.Schema().Columns
var recursiveColumns []*expression.Column
if x.cte.recursivePartLogicalPlan != nil {
recursiveColumns = x.cte.recursivePartLogicalPlan.Schema().Columns
}
relatedCols := make([]*expression.Column, 0, 2)
for i, col := range columns {
relatedCols = append(relatedCols[:0], seedColumns[i])
if recursiveColumns != nil {
relatedCols = append(relatedCols, recursiveColumns[i])
if c.collectMode&collectPredicateColumns != 0 {
switch x := lp.(type) {
case *DataSource:
x.updateColMapAndAddPredicateColumns(c)
case *LogicalIndexScan:
x.Source.updateColMapAndAddPredicateColumns(c)
c.addPredicateColumnsFromExpressions(x.AccessConds)
case *LogicalTableScan:
x.Source.updateColMapAndAddPredicateColumns(c)
c.addPredicateColumnsFromExpressions(x.AccessConds)
case *LogicalProjection:
// Schema change from children to self.
schema := x.Schema()
for i, expr := range x.Exprs {
c.updateColMapFromExpression(schema.Columns[i], expr)
}
c.updateColMap(col, relatedCols)
}
// If IsDistinct is true, then we use getColsNDV to calculate row count(see (*LogicalCTE).DeriveStat). In this case
// statistics of all the columns are needed.
if x.cte.IsDistinct {
for _, col := range columns {
c.addPredicateColumn(col)
case *LogicalSelection:
// Though the conditions in LogicalSelection are complex conditions which cannot be pushed down to DataSource, we still
// regard statistics of the columns in the conditions as needed.
c.addPredicateColumnsFromExpressions(x.Conditions)
case *LogicalAggregation:
// Just assume statistics of all the columns in GroupByItems are needed.
c.addPredicateColumnsFromExpressions(x.GroupByItems)
// Schema change from children to self.
schema := x.Schema()
for i, aggFunc := range x.AggFuncs {
c.updateColMapFromExpressions(schema.Columns[i], aggFunc.Args)
}
case *LogicalWindow:
// Statistics of the columns in LogicalWindow.PartitionBy are used in optimizeByShuffle4Window.
// It seems that we don't use statistics of the columns in LogicalWindow.OrderBy currently?
for _, item := range x.PartitionBy {
c.addPredicateColumn(item.Col)
}
// Schema change from children to self.
windowColumns := x.GetWindowResultColumns()
for i, col := range windowColumns {
c.updateColMapFromExpressions(col, x.WindowFuncDescs[i].Args)
}
case *LogicalJoin:
x.updateColMapAndAddPredicateColumns(c)
case *LogicalApply:
x.updateColMapAndAddPredicateColumns(c)
// Assume statistics of correlated columns are needed.
// Correlated columns can be found in LogicalApply.Children()[0].Schema(). Since we already visit LogicalApply.Children()[0],
// correlated columns must have existed in columnStatsUsageCollector.colMap.
for _, corCols := range x.CorCols {
c.addPredicateColumn(&corCols.Column)
}
case *LogicalSort:
// Assume statistics of all the columns in ByItems are needed.
for _, item := range x.ByItems {
c.addPredicateColumnsFromExpression(item.Expr)
}
case *LogicalTopN:
// Assume statistics of all the columns in ByItems are needed.
for _, item := range x.ByItems {
c.addPredicateColumnsFromExpression(item.Expr)
}
case *LogicalUnionAll:
x.updateColMapAndAddPredicateColumns(c)
case *LogicalPartitionUnionAll:
x.updateColMapAndAddPredicateColumns(c)
case *LogicalCTE:
// Visit seedPartLogicalPlan and recursivePartLogicalPlan first.
c.collectFromPlan(x.cte.seedPartLogicalPlan)
if x.cte.recursivePartLogicalPlan != nil {
c.collectFromPlan(x.cte.recursivePartLogicalPlan)
}
// Schema change from seedPlan/recursivePlan to self.
columns := x.Schema().Columns
seedColumns := x.cte.seedPartLogicalPlan.Schema().Columns
var recursiveColumns []*expression.Column
if x.cte.recursivePartLogicalPlan != nil {
recursiveColumns = x.cte.recursivePartLogicalPlan.Schema().Columns
}
relatedCols := make([]*expression.Column, 0, 2)
for i, col := range columns {
relatedCols = append(relatedCols[:0], seedColumns[i])
if recursiveColumns != nil {
relatedCols = append(relatedCols, recursiveColumns[i])
}
c.updateColMap(col, relatedCols)
}
// If IsDistinct is true, then we use getColsNDV to calculate row count(see (*LogicalCTE).DeriveStat). In this case
// statistics of all the columns are needed.
if x.cte.IsDistinct {
for _, col := range columns {
c.addPredicateColumn(col)
}
}
case *LogicalCTETable:
// Schema change from seedPlan to self.
for i, col := range x.Schema().Columns {
c.updateColMap(col, []*expression.Column{x.seedSchema.Columns[i]})
}
}
case *LogicalCTETable:
// Schema change from seedPlan to self.
for i, col := range x.Schema().Columns {
c.updateColMap(col, []*expression.Column{x.seedSchema.Columns[i]})
}
if c.collectMode&collectHistNeededColumns != 0 {
// Histogram-needed columns are the columns which occur in the conditions pushed down to DataSource.
// We don't consider LogicalCTE because seedLogicalPlan and recursiveLogicalPlan haven't got logical optimization
// yet(seedLogicalPlan and recursiveLogicalPlan are optimized in DeriveStats phase). Without logical optimization,
// there is no condition pushed down to DataSource so no histogram-needed column can be collected.
switch x := lp.(type) {
case *DataSource:
x.addHistNeededColumns(c)
case *LogicalIndexScan:
x.Source.addHistNeededColumns(c)
case *LogicalTableScan:
x.Source.addHistNeededColumns(c)
}
}
}

// CollectPredicateColumnsForTest collects predicate columns from logical plan. It is only for test.
func CollectPredicateColumnsForTest(lp LogicalPlan) []model.TableColumnID {
collector := newPredicateColumnCollector()
// CollectColumnStatsUsageForTest is used for test.
// If onlyHistNeeded is true, it collects histogram-needed columns from logical plan.
// Otherwise, it collects predicate columns from logical plan.
func CollectColumnStatsUsageForTest(lp LogicalPlan, onlyHistNeeded bool) []model.TableColumnID {
var collectMode uint64
if onlyHistNeeded {
collectMode = collectHistNeededColumns
} else {
collectMode = collectPredicateColumns
}
collector := newColumnStatsUsageCollector(collectMode)
collector.collectFromPlan(lp)
tblColIDs := make([]model.TableColumnID, 0, len(collector.predicateCols))
for tblColID := range collector.predicateCols {
var colSet map[model.TableColumnID]struct{}
if onlyHistNeeded {
colSet = collector.histNeededCols
} else {
colSet = collector.predicateCols
}
tblColIDs := make([]model.TableColumnID, 0, len(colSet))
for tblColID := range colSet {
tblColIDs = append(tblColIDs, tblColID)
}
return tblColIDs
Expand Down