Skip to content

Commit

Permalink
planner: refactor a few methods about cost calculation (#33681)
Browse files Browse the repository at this point in the history
close #33722
  • Loading branch information
qw4990 committed Apr 6, 2022
1 parent 1c658e5 commit bd8c710
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 29 deletions.
4 changes: 4 additions & 0 deletions planner/core/physical_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,10 @@ type PhysicalIndexLookUpReader struct {

// Used by partition table.
PartitionInfo PartitionInfo

// required by cost calculation
expectedCnt uint64
keepOrder bool
}

// Clone implements PhysicalPlan interface.
Expand Down
66 changes: 37 additions & 29 deletions planner/core/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -442,16 +442,15 @@ func (p *PhysicalIndexJoin) attach2Task(tasks ...task) task {
}
t := &rootTask{
p: p,
cst: p.GetCost(outerTask, innerTask),
cst: p.GetCost(outerTask.count(), innerTask.count(), outerTask.cost(), innerTask.cost()),
}
p.cost = t.cost()
return t
}

// GetCost computes the cost of index join operator and its children.
func (p *PhysicalIndexJoin) GetCost(outerTask, innerTask task) float64 {
func (p *PhysicalIndexJoin) GetCost(outerCnt, innerCnt float64, outerCost, innerCost float64) float64 {
var cpuCost float64
outerCnt, innerCnt := outerTask.count(), innerTask.count()
sessVars := p.ctx.GetSessionVars()
// Add the cost of evaluating outer filter, since inner filter of index join
// is always empty, we can simply tell whether outer filter is empty using the
Expand Down Expand Up @@ -494,8 +493,8 @@ func (p *PhysicalIndexJoin) GetCost(outerTask, innerTask task) float64 {
// since the executor is pipelined and not all workers are always in full load.
memoryCost := innerConcurrency * (batchSize * distinctFactor) * innerCnt * sessVars.MemoryFactor
// Cost of inner child plan, i.e, mainly I/O and network cost.
innerPlanCost := outerCnt * innerTask.cost()
return outerTask.cost() + innerPlanCost + cpuCost + memoryCost
innerPlanCost := outerCnt * innerCost
return outerCost + innerPlanCost + cpuCost + memoryCost
}

func getAvgRowSize(stats *property.StatsInfo, schema *expression.Schema) (size float64) {
Expand Down Expand Up @@ -916,56 +915,66 @@ func (p *PhysicalMergeJoin) attach2Task(tasks ...task) task {
return t
}

func buildIndexLookUpTask(ctx sessionctx.Context, t *copTask) *rootTask {
newTask := &rootTask{cst: t.cst}
// GetCost computes cost of index lookup operator itself.
func (p *PhysicalIndexLookUpReader) GetCost() (cost float64) {
indexPlan, tablePlan := p.indexPlan, p.tablePlan
ctx := p.ctx
sessVars := ctx.GetSessionVars()
p := PhysicalIndexLookUpReader{
tablePlan: t.tablePlan,
indexPlan: t.indexPlan,
ExtraHandleCol: t.extraHandleCol,
CommonHandleCols: t.commonHandleCols,
}.Init(ctx, t.tablePlan.SelectBlockOffset())
p.PartitionInfo = t.partitionInfo
setTableScanToTableRowIDScan(p.tablePlan)
p.stats = t.tablePlan.statsInfo()
// Add cost of building table reader executors. Handles are extracted in batch style,
// each handle is a range, the CPU cost of building copTasks should be:
// (indexRows / batchSize) * batchSize * CPUFactor
// Since we don't know the number of copTasks built, ignore these network cost now.
indexRows := t.indexPlan.statsInfo().RowCount
indexRows := indexPlan.statsInfo().RowCount
idxCst := indexRows * sessVars.CPUFactor
// if the expectCnt is below the paging threshold, using paging API, recalculate idxCst.
// paging API reduces the count of index and table rows, however introduces more seek cost.
if ctx.GetSessionVars().EnablePaging && t.expectCnt > 0 && t.expectCnt <= paging.Threshold {
if ctx.GetSessionVars().EnablePaging && p.expectedCnt > 0 && p.expectedCnt <= paging.Threshold {
p.Paging = true
pagingCst := calcPagingCost(ctx, t)
pagingCst := calcPagingCost(ctx, p.indexPlan, p.expectedCnt)
// prevent enlarging the cost because we take paging as a better plan,
// if the cost is enlarged, it'll be easier to go another plan.
idxCst = math.Min(idxCst, pagingCst)
}
newTask.cst += idxCst
cost += idxCst
// Add cost of worker goroutines in index lookup.
numTblWorkers := float64(sessVars.IndexLookupConcurrency())
newTask.cst += (numTblWorkers + 1) * sessVars.ConcurrencyFactor
cost += (numTblWorkers + 1) * sessVars.ConcurrencyFactor
// When building table reader executor for each batch, we would sort the handles. CPU
// cost of sort is:
// CPUFactor * batchSize * Log2(batchSize) * (indexRows / batchSize)
indexLookupSize := float64(sessVars.IndexLookupSize)
batchSize := math.Min(indexLookupSize, indexRows)
if batchSize > 2 {
sortCPUCost := (indexRows * math.Log2(batchSize) * sessVars.CPUFactor) / numTblWorkers
newTask.cst += sortCPUCost
cost += sortCPUCost
}
// Also, we need to sort the retrieved rows if index lookup reader is expected to return
// ordered results. Note that row count of these two sorts can be different, if there are
// operators above table scan.
tableRows := t.tablePlan.statsInfo().RowCount
tableRows := tablePlan.statsInfo().RowCount
selectivity := tableRows / indexRows
batchSize = math.Min(indexLookupSize*selectivity, tableRows)
if t.keepOrder && batchSize > 2 {
if p.keepOrder && batchSize > 2 {
sortCPUCost := (tableRows * math.Log2(batchSize) * sessVars.CPUFactor) / numTblWorkers
newTask.cst += sortCPUCost
cost += sortCPUCost
}
return
}

func buildIndexLookUpTask(ctx sessionctx.Context, t *copTask) *rootTask {
newTask := &rootTask{cst: t.cst}
p := PhysicalIndexLookUpReader{
tablePlan: t.tablePlan,
indexPlan: t.indexPlan,
ExtraHandleCol: t.extraHandleCol,
CommonHandleCols: t.commonHandleCols,
expectedCnt: t.expectCnt,
keepOrder: t.keepOrder,
}.Init(ctx, t.tablePlan.SelectBlockOffset())
p.PartitionInfo = t.partitionInfo
setTableScanToTableRowIDScan(p.tablePlan)
p.stats = t.tablePlan.statsInfo()
newTask.cst += p.GetCost()
p.cost = newTask.cst

// Do not inject the extra Projection even if t.needExtraProj is set, or the schema between the phase-1 agg and
Expand Down Expand Up @@ -1004,11 +1013,10 @@ func extractRows(p PhysicalPlan) float64 {
}

// calcPagingCost calculates the cost for paging processing which may increase the seekCnt and reduce scanned rows.
func calcPagingCost(ctx sessionctx.Context, t *copTask) float64 {
func calcPagingCost(ctx sessionctx.Context, indexPlan PhysicalPlan, expectCnt uint64) float64 {
sessVars := ctx.GetSessionVars()
indexRows := t.indexPlan.statsInfo().RowCount
expectCnt := t.expectCnt
sourceRows := extractRows(t.indexPlan)
indexRows := indexPlan.StatsCount()
sourceRows := extractRows(indexPlan)
// with paging, the scanned rows is always less than or equal to source rows.
if uint64(sourceRows) < expectCnt {
expectCnt = uint64(sourceRows)
Expand Down

0 comments on commit bd8c710

Please sign in to comment.