planner: change predicateColumnCollector to columnStatsUsageCollector and collect histogram-needed columns #30671

Merged: 14 commits, Dec 25, 2021
8 changes: 4 additions & 4 deletions expression/util.go
@@ -166,16 +166,16 @@ func extractColumns(result []*Column, expr Expression, filter func(*Column) bool
return result
}

// ExtractColumnsAndCorColumns extracts columns and correlated columns from `expr` and append them to `result`.
func ExtractColumnsAndCorColumns(result []*Column, expr Expression) []*Column {
// extractColumnsAndCorColumns extracts columns and correlated columns from `expr` and append them to `result`.
func extractColumnsAndCorColumns(result []*Column, expr Expression) []*Column {
switch v := expr.(type) {
case *Column:
result = append(result, v)
case *CorrelatedColumn:
result = append(result, &v.Column)
case *ScalarFunction:
for _, arg := range v.GetArgs() {
result = ExtractColumnsAndCorColumns(result, arg)
result = extractColumnsAndCorColumns(result, arg)
}
}
return result
@@ -184,7 +184,7 @@ func ExtractColumnsAndCorColumns(result []*Column, expr Expression) []*Column {
// ExtractColumnsAndCorColumnsFromExpressions extracts columns and correlated columns from expressions and append them to `result`.
func ExtractColumnsAndCorColumnsFromExpressions(result []*Column, list []Expression) []*Column {
for _, expr := range list {
result = ExtractColumnsAndCorColumns(result, expr)
result = extractColumnsAndCorColumns(result, expr)
}
return result
}
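
The recursion itself is unchanged here; only the exported surface of the package shrinks, since the single-expression variant becomes package-private and callers go through the slice-based entry point. Below is a hedged, self-contained toy model of the extraction pattern the diff keeps: walk an expression tree and append every base or correlated column into a caller-provided slice, so a scratch buffer can be reused across calls the way the collector reuses c.cols[:0]. The column, corColumn and scalarFunc types are stand-ins for illustration, not TiDB's expression package.

package main

import "fmt"

// Toy stand-ins for expression.Column, expression.CorrelatedColumn and
// expression.ScalarFunction; the real types live in TiDB's expression package.
type column struct{ name string }
type corColumn struct{ column }
type scalarFunc struct{ args []interface{} }

// extractColumnsAndCorColumns mirrors the recursion above: base columns and the
// column inside a correlated column are appended; scalar functions recurse into
// their arguments.
func extractColumnsAndCorColumns(result []*column, expr interface{}) []*column {
    switch v := expr.(type) {
    case *column:
        result = append(result, v)
    case *corColumn:
        result = append(result, &v.column)
    case *scalarFunc:
        for _, arg := range v.args {
            result = extractColumnsAndCorColumns(result, arg)
        }
    }
    return result
}

func main() {
    expr := &scalarFunc{args: []interface{}{
        &column{name: "t.a"},
        &scalarFunc{args: []interface{}{
            &corColumn{column{name: "t.b"}},
        }},
    }}
    // Reuse a pre-allocated scratch slice, as the collector does with c.cols[:0].
    scratch := make([]*column, 0, 8)
    for _, col := range extractColumnsAndCorColumns(scratch[:0], expr) {
        fmt.Println(col.name) // prints t.a then t.b
    }
}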
296 changes: 165 additions & 131 deletions planner/core/collect_column_stats_usage.go
@@ -19,27 +19,49 @@ import (
"github.com/pingcap/tidb/parser/model"
)

// predicateColumnCollector collects predicate columns from logical plan. Predicate columns are the columns whose statistics
// are utilized when making query plans, which usually occur in where conditions, join conditions and so on.
type predicateColumnCollector struct {
// colMap maps expression.Column.UniqueID to the table columns whose statistics are utilized to calculate statistics of the column.
colMap map[int64]map[model.TableColumnID]struct{}
const (
collectPredicateColumns uint64 = 1 << iota
collectHistNeededColumns
)

// columnStatsUsageCollector collects predicate columns and/or histogram-needed columns from logical plan.
// Predicate columns are the columns whose statistics are utilized when making query plans, which usually occur in where conditions, join conditions and so on.
// Histogram-needed columns are the columns whose histograms are utilized when making query plans, which usually occur in the conditions pushed down to DataSource.
// The set of histogram-needed columns is the subset of that of predicate columns.
type columnStatsUsageCollector struct {
// collectMode indicates whether to collect predicate columns and/or histogram-needed columns
collectMode uint64
// predicateCols records predicate columns.
predicateCols map[model.TableColumnID]struct{}
// colMap maps expression.Column.UniqueID to the table columns whose statistics may be utilized to calculate statistics of the column.
// It is used for collecting predicate columns.
// For example, in `select count(distinct a, b) as e from t`, the count of column `e` is calculated as `max(ndv(t.a), ndv(t.b))` if
// we don't know `ndv(t.a, t.b)`(see (*LogicalAggregation).DeriveStats and getColsNDV for details). So when calculating the statistics
// of column `e`, we may use the statistics of column `t.a` and `t.b`.
colMap map[int64]map[model.TableColumnID]struct{}
// histNeededCols records histogram-needed columns
histNeededCols map[model.TableColumnID]struct{}
// cols is used to store columns collected from expressions and saves some allocation.
cols []*expression.Column
}

func newPredicateColumnCollector() *predicateColumnCollector {
return &predicateColumnCollector{
colMap: make(map[int64]map[model.TableColumnID]struct{}),
predicateCols: make(map[model.TableColumnID]struct{}),
func newColumnStatsUsageCollector(collectMode uint64) *columnStatsUsageCollector {
collector := &columnStatsUsageCollector{
collectMode: collectMode,
// Pre-allocate a slice to reduce allocation, 8 doesn't have special meaning.
cols: make([]*expression.Column, 0, 8),
}
if collectMode&collectPredicateColumns != 0 {
collector.predicateCols = make(map[model.TableColumnID]struct{})
collector.colMap = make(map[int64]map[model.TableColumnID]struct{})
}
if collectMode&collectHistNeededColumns != 0 {
collector.histNeededCols = make(map[model.TableColumnID]struct{})
}
return collector
}

func (c *predicateColumnCollector) addPredicateColumn(col *expression.Column) {
func (c *columnStatsUsageCollector) addPredicateColumn(col *expression.Column) {
tblColIDs, ok := c.colMap[col.UniqueID]
if !ok {
// It may happen if some leaf of logical plan is LogicalMemTable/LogicalShow/LogicalShowDDLJobs.
@@ -50,21 +72,14 @@ func (c *predicateColumnCollector) addPredicateColumn(col *expression.Column) {
}
}

func (c *predicateColumnCollector) addPredicateColumnsFromExpression(expr expression.Expression) {
cols := expression.ExtractColumnsAndCorColumns(c.cols[:0], expr)
for _, col := range cols {
c.addPredicateColumn(col)
}
}

func (c *predicateColumnCollector) addPredicateColumnsFromExpressions(list []expression.Expression) {
func (c *columnStatsUsageCollector) addPredicateColumnsFromExpressions(list []expression.Expression) {
cols := expression.ExtractColumnsAndCorColumnsFromExpressions(c.cols[:0], list)
for _, col := range cols {
c.addPredicateColumn(col)
}
}

func (c *predicateColumnCollector) updateColMap(col *expression.Column, relatedCols []*expression.Column) {
func (c *columnStatsUsageCollector) updateColMap(col *expression.Column, relatedCols []*expression.Column) {
if _, ok := c.colMap[col.UniqueID]; !ok {
c.colMap[col.UniqueID] = map[model.TableColumnID]struct{}{}
}
@@ -80,15 +95,11 @@ }
}
}

func (c *predicateColumnCollector) updateColMapFromExpression(col *expression.Column, expr expression.Expression) {
c.updateColMap(col, expression.ExtractColumnsAndCorColumns(c.cols[:0], expr))
}

func (c *predicateColumnCollector) updateColMapFromExpressions(col *expression.Column, list []expression.Expression) {
func (c *columnStatsUsageCollector) updateColMapFromExpressions(col *expression.Column, list []expression.Expression) {
c.updateColMap(col, expression.ExtractColumnsAndCorColumnsFromExpressions(c.cols[:0], list))
}

func (ds *DataSource) updateColMapAndAddPredicateColumns(c *predicateColumnCollector) {
func (c *columnStatsUsageCollector) collectPredicateColumnsForDataSource(ds *DataSource) {
tblID := ds.TableInfo().ID
for _, col := range ds.Schema().Columns {
tblColID := model.TableColumnID{TableID: tblID, ColumnID: col.ID}
@@ -98,7 +109,7 @@ func (ds *DataSource) updateColMapAndAddPredicateColumns(c *predicateColumnColle
c.addPredicateColumnsFromExpressions(ds.pushedDownConds)
}

func (p *LogicalJoin) updateColMapAndAddPredicateColumns(c *predicateColumnCollector) {
func (c *columnStatsUsageCollector) collectPredicateColumnsForJoin(p *LogicalJoin) {
// The only schema change is merging two schemas so there is no new column.
// Assume statistics of all the columns in EqualConditions/LeftConditions/RightConditions/OtherConditions are needed.
exprs := make([]expression.Expression, 0, len(p.EqualConditions)+len(p.LeftConditions)+len(p.RightConditions)+len(p.OtherConditions))
@@ -117,7 +128,7 @@ func (p *LogicalJoin) updateColMapAndAddPredicateColumns(c *predicateColumnColle
c.addPredicateColumnsFromExpressions(exprs)
}

func (p *LogicalUnionAll) updateColMapAndAddPredicateColumns(c *predicateColumnCollector) {
func (c *columnStatsUsageCollector) collectPredicateColumnsForUnionAll(p *LogicalUnionAll) {
// statistics of the ith column of UnionAll come from statistics of the ith column of each child.
schemas := make([]*expression.Schema, 0, len(p.Children()))
relatedCols := make([]*expression.Column, 0, len(p.Children()))
@@ -133,120 +144,143 @@ func (p *LogicalUnionAll) updateColMapAndAddPredicateColumns(c *predicateColumnC
}
}

func (c *predicateColumnCollector) collectFromPlan(lp LogicalPlan) {
func (c *columnStatsUsageCollector) addHistNeededColumns(ds *DataSource) {
columns := expression.ExtractColumnsFromExpressions(c.cols[:0], ds.pushedDownConds, nil)
for _, col := range columns {
tblColID := model.TableColumnID{TableID: ds.physicalTableID, ColumnID: col.ID}
c.histNeededCols[tblColID] = struct{}{}
}
}

func (c *columnStatsUsageCollector) collectFromPlan(lp LogicalPlan) {
for _, child := range lp.Children() {
c.collectFromPlan(child)
}
switch x := lp.(type) {
case *DataSource:
x.updateColMapAndAddPredicateColumns(c)
case *LogicalIndexScan:
x.Source.updateColMapAndAddPredicateColumns(c)
// TODO: Is it redundant to add predicate columns from LogicalIndexScan.AccessConds? Is LogicalIndexScan.AccessConds a subset of LogicalIndexScan.Source.pushedDownConds.
c.addPredicateColumnsFromExpressions(x.AccessConds)
case *LogicalTableScan:
x.Source.updateColMapAndAddPredicateColumns(c)
// TODO: Is it redundant to add predicate columns from LogicalTableScan.AccessConds? Is LogicalTableScan.AccessConds a subset of LogicalTableScan.Source.pushedDownConds.
c.addPredicateColumnsFromExpressions(x.AccessConds)
case *TiKVSingleGather:
// TODO: Is it redundant?
x.Source.updateColMapAndAddPredicateColumns(c)
case *LogicalProjection:
// Schema change from children to self.
schema := x.Schema()
for i, expr := range x.Exprs {
c.updateColMapFromExpression(schema.Columns[i], expr)
}
case *LogicalSelection:
// Though the conditions in LogicalSelection are complex conditions which cannot be pushed down to DataSource, we still
// regard statistics of the columns in the conditions as needed.
c.addPredicateColumnsFromExpressions(x.Conditions)
case *LogicalAggregation:
// Just assume statistics of all the columns in GroupByItems are needed.
c.addPredicateColumnsFromExpressions(x.GroupByItems)
// Schema change from children to self.
schema := x.Schema()
for i, aggFunc := range x.AggFuncs {
c.updateColMapFromExpressions(schema.Columns[i], aggFunc.Args)
}
case *LogicalWindow:
// Statistics of the columns in LogicalWindow.PartitionBy are used in optimizeByShuffle4Window.
// It seems that we don't use statistics of the columns in LogicalWindow.OrderBy currently?
for _, item := range x.PartitionBy {
c.addPredicateColumn(item.Col)
}
// Schema change from children to self.
windowColumns := x.GetWindowResultColumns()
for i, col := range windowColumns {
c.updateColMapFromExpressions(col, x.WindowFuncDescs[i].Args)
}
case *LogicalJoin:
x.updateColMapAndAddPredicateColumns(c)
case *LogicalApply:
x.updateColMapAndAddPredicateColumns(c)
// Assume statistics of correlated columns are needed.
// Correlated columns can be found in LogicalApply.Children()[0].Schema(). Since we already visit LogicalApply.Children()[0],
// correlated columns must have existed in predicateColumnCollector.colMap.
for _, corCols := range x.CorCols {
c.addPredicateColumn(&corCols.Column)
}
case *LogicalSort:
// Assume statistics of all the columns in ByItems are needed.
for _, item := range x.ByItems {
c.addPredicateColumnsFromExpression(item.Expr)
}
case *LogicalTopN:
// Assume statistics of all the columns in ByItems are needed.
for _, item := range x.ByItems {
c.addPredicateColumnsFromExpression(item.Expr)
}
case *LogicalUnionAll:
x.updateColMapAndAddPredicateColumns(c)
case *LogicalPartitionUnionAll:
x.updateColMapAndAddPredicateColumns(c)
case *LogicalCTE:
// Visit seedPartLogicalPlan and recursivePartLogicalPlan first.
c.collectFromPlan(x.cte.seedPartLogicalPlan)
if x.cte.recursivePartLogicalPlan != nil {
c.collectFromPlan(x.cte.recursivePartLogicalPlan)
}
// Schema change from seedPlan/recursivePlan to self.
columns := x.Schema().Columns
seedColumns := x.cte.seedPartLogicalPlan.Schema().Columns
var recursiveColumns []*expression.Column
if x.cte.recursivePartLogicalPlan != nil {
recursiveColumns = x.cte.recursivePartLogicalPlan.Schema().Columns
}
relatedCols := make([]*expression.Column, 0, 2)
for i, col := range columns {
relatedCols = append(relatedCols[:0], seedColumns[i])
if recursiveColumns != nil {
relatedCols = append(relatedCols, recursiveColumns[i])
if c.collectMode&collectPredicateColumns != 0 {
switch x := lp.(type) {
case *DataSource:
c.collectPredicateColumnsForDataSource(x)
case *LogicalIndexScan:
c.collectPredicateColumnsForDataSource(x.Source)
c.addPredicateColumnsFromExpressions(x.AccessConds)
case *LogicalTableScan:
c.collectPredicateColumnsForDataSource(x.Source)
c.addPredicateColumnsFromExpressions(x.AccessConds)
case *LogicalProjection:
// Schema change from children to self.
schema := x.Schema()
for i, expr := range x.Exprs {
c.updateColMapFromExpressions(schema.Columns[i], []expression.Expression{expr})
}
c.updateColMap(col, relatedCols)
}
// If IsDistinct is true, then we use getColsNDV to calculate row count(see (*LogicalCTE).DeriveStat). In this case
// statistics of all the columns are needed.
if x.cte.IsDistinct {
for _, col := range columns {
c.addPredicateColumn(col)
case *LogicalSelection:
// Though the conditions in LogicalSelection are complex conditions which cannot be pushed down to DataSource, we still
// regard statistics of the columns in the conditions as needed.
c.addPredicateColumnsFromExpressions(x.Conditions)
case *LogicalAggregation:
// Just assume statistics of all the columns in GroupByItems are needed.
c.addPredicateColumnsFromExpressions(x.GroupByItems)
// Schema change from children to self.
schema := x.Schema()
for i, aggFunc := range x.AggFuncs {
c.updateColMapFromExpressions(schema.Columns[i], aggFunc.Args)
}
case *LogicalWindow:
// Statistics of the columns in LogicalWindow.PartitionBy are used in optimizeByShuffle4Window.
// We don't use statistics of the columns in LogicalWindow.OrderBy currently.
for _, item := range x.PartitionBy {
c.addPredicateColumn(item.Col)
}
// Schema change from children to self.
windowColumns := x.GetWindowResultColumns()
for i, col := range windowColumns {
c.updateColMapFromExpressions(col, x.WindowFuncDescs[i].Args)
}
case *LogicalJoin:
c.collectPredicateColumnsForJoin(x)
case *LogicalApply:
c.collectPredicateColumnsForJoin(&x.LogicalJoin)
// Assume statistics of correlated columns are needed.
// Correlated columns can be found in LogicalApply.Children()[0].Schema(). Since we already visit LogicalApply.Children()[0],
// correlated columns must have existed in columnStatsUsageCollector.colMap.
for _, corCols := range x.CorCols {
c.addPredicateColumn(&corCols.Column)
}
case *LogicalSort:
// Assume statistics of all the columns in ByItems are needed.
for _, item := range x.ByItems {
c.addPredicateColumnsFromExpressions([]expression.Expression{item.Expr})
}
case *LogicalTopN:
// Assume statistics of all the columns in ByItems are needed.
for _, item := range x.ByItems {
c.addPredicateColumnsFromExpressions([]expression.Expression{item.Expr})
}
case *LogicalUnionAll:
c.collectPredicateColumnsForUnionAll(x)
case *LogicalPartitionUnionAll:
c.collectPredicateColumnsForUnionAll(&x.LogicalUnionAll)
case *LogicalCTE:
// Visit seedPartLogicalPlan and recursivePartLogicalPlan first.
c.collectFromPlan(x.cte.seedPartLogicalPlan)
if x.cte.recursivePartLogicalPlan != nil {
c.collectFromPlan(x.cte.recursivePartLogicalPlan)
}
// Schema change from seedPlan/recursivePlan to self.
columns := x.Schema().Columns
seedColumns := x.cte.seedPartLogicalPlan.Schema().Columns
var recursiveColumns []*expression.Column
if x.cte.recursivePartLogicalPlan != nil {
recursiveColumns = x.cte.recursivePartLogicalPlan.Schema().Columns
}
relatedCols := make([]*expression.Column, 0, 2)
for i, col := range columns {
relatedCols = append(relatedCols[:0], seedColumns[i])
if recursiveColumns != nil {
relatedCols = append(relatedCols, recursiveColumns[i])
}
c.updateColMap(col, relatedCols)
}
// If IsDistinct is true, then we use getColsNDV to calculate row count(see (*LogicalCTE).DeriveStat). In this case
// statistics of all the columns are needed.
if x.cte.IsDistinct {
for _, col := range columns {
c.addPredicateColumn(col)
}
}
case *LogicalCTETable:
// Schema change from seedPlan to self.
for i, col := range x.Schema().Columns {
c.updateColMap(col, []*expression.Column{x.seedSchema.Columns[i]})
}
}
case *LogicalCTETable:
// Schema change from seedPlan to self.
for i, col := range x.Schema().Columns {
c.updateColMap(col, []*expression.Column{x.seedSchema.Columns[i]})
}
if c.collectMode&collectHistNeededColumns != 0 {
// Histogram-needed columns are the columns which occur in the conditions pushed down to DataSource.
// We don't consider LogicalCTE because seedLogicalPlan and recursiveLogicalPlan haven't got logical optimization
// yet(seedLogicalPlan and recursiveLogicalPlan are optimized in DeriveStats phase). Without logical optimization,
// there is no condition pushed down to DataSource so no histogram-needed column can be collected.
switch x := lp.(type) {
case *DataSource:
c.addHistNeededColumns(x)
case *LogicalIndexScan:
c.addHistNeededColumns(x.Source)
case *LogicalTableScan:
c.addHistNeededColumns(x.Source)
}
}
}

// CollectPredicateColumnsForTest collects predicate columns from logical plan. It is only for test.
func CollectPredicateColumnsForTest(lp LogicalPlan) []model.TableColumnID {
collector := newPredicateColumnCollector()
// CollectColumnStatsUsage collects column stats usage from logical plan.
// The first return value is predicate columns and the second return value is histogram-needed columns.
func CollectColumnStatsUsage(lp LogicalPlan) ([]model.TableColumnID, []model.TableColumnID) {
collector := newColumnStatsUsageCollector(collectPredicateColumns | collectHistNeededColumns)
collector.collectFromPlan(lp)
tblColIDs := make([]model.TableColumnID, 0, len(collector.predicateCols))
for tblColID := range collector.predicateCols {
tblColIDs = append(tblColIDs, tblColID)
set2slice := func(set map[model.TableColumnID]struct{}) []model.TableColumnID {
ret := make([]model.TableColumnID, 0, len(set))
for tblColID := range set {
ret = append(ret, tblColID)
}
return ret
}
return tblColIDs
return set2slice(collector.predicateCols), set2slice(collector.histNeededCols)
}
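
Putting the pieces together: CollectColumnStatsUsage walks the plan once and returns the predicate-column set and the histogram-needed set as slices, and the collectMode bits decide which maps the collector allocates at all. Below is a hedged, self-contained toy model of those two mechanics, bit flags gating map allocation and a set-to-slice conversion at the end. The toyCollector, newToyCollector and tableColumnID names are stand-ins for illustration, not the planner package.

package main

import "fmt"

const (
    collectPredicateColumns uint64 = 1 << iota
    collectHistNeededColumns
)

// tableColumnID is a stand-in for model.TableColumnID.
type tableColumnID struct{ tableID, columnID int64 }

// toyCollector mirrors the allocation strategy of newColumnStatsUsageCollector:
// each result set exists only when its mode bit is set.
type toyCollector struct {
    mode           uint64
    predicateCols  map[tableColumnID]struct{}
    histNeededCols map[tableColumnID]struct{}
}

func newToyCollector(mode uint64) *toyCollector {
    c := &toyCollector{mode: mode}
    if mode&collectPredicateColumns != 0 {
        c.predicateCols = make(map[tableColumnID]struct{})
    }
    if mode&collectHistNeededColumns != 0 {
        c.histNeededCols = make(map[tableColumnID]struct{})
    }
    return c
}

// set2slice has the same shape as the helper inside CollectColumnStatsUsage.
func set2slice(set map[tableColumnID]struct{}) []tableColumnID {
    ret := make([]tableColumnID, 0, len(set))
    for id := range set {
        ret = append(ret, id)
    }
    return ret
}

func main() {
    c := newToyCollector(collectPredicateColumns | collectHistNeededColumns)
    c.predicateCols[tableColumnID{tableID: 1, columnID: 2}] = struct{}{}
    c.histNeededCols[tableColumnID{tableID: 1, columnID: 2}] = struct{}{}
    fmt.Println(set2slice(c.predicateCols), set2slice(c.histNeededCols))
}

Because histogram-needed columns are collected only from conditions already pushed down to DataSource, a caller that needs just that set could pass collectHistNeededColumns alone and skip the colMap bookkeeping, which the constructor above allocates only in predicate-column mode.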