Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

planner: refactor a few methods about cost calculation #33681

Merged
merged 6 commits into from
Apr 6, 2022
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 4 additions & 0 deletions planner/core/physical_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,10 @@ type PhysicalIndexLookUpReader struct {

// Used by partition table.
PartitionInfo PartitionInfo

// required by cost calculation
expectedCnt uint64
keepOrder bool
}

// Clone implements PhysicalPlan interface.
Expand Down
66 changes: 37 additions & 29 deletions planner/core/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -442,16 +442,15 @@ func (p *PhysicalIndexJoin) attach2Task(tasks ...task) task {
}
t := &rootTask{
p: p,
cst: p.GetCost(outerTask, innerTask),
cst: p.GetCost(outerTask.count(), innerTask.count(), outerTask.cost(), innerTask.cost()),
}
p.cost = t.cost()
return t
}

// GetCost computes the cost of index join operator and its children.
func (p *PhysicalIndexJoin) GetCost(outerTask, innerTask task) float64 {
func (p *PhysicalIndexJoin) GetCost(outerCnt, innerCnt float64, outerCost, innerCost float64) float64 {
var cpuCost float64
outerCnt, innerCnt := outerTask.count(), innerTask.count()
sessVars := p.ctx.GetSessionVars()
// Add the cost of evaluating outer filter, since inner filter of index join
// is always empty, we can simply tell whether outer filter is empty using the
Expand Down Expand Up @@ -494,8 +493,8 @@ func (p *PhysicalIndexJoin) GetCost(outerTask, innerTask task) float64 {
// since the executor is pipelined and not all workers are always in full load.
memoryCost := innerConcurrency * (batchSize * distinctFactor) * innerCnt * sessVars.MemoryFactor
// Cost of inner child plan, i.e, mainly I/O and network cost.
innerPlanCost := outerCnt * innerTask.cost()
return outerTask.cost() + innerPlanCost + cpuCost + memoryCost
innerPlanCost := outerCnt * innerCost
return outerCost + innerPlanCost + cpuCost + memoryCost
}

func getAvgRowSize(stats *property.StatsInfo, schema *expression.Schema) (size float64) {
Expand Down Expand Up @@ -916,56 +915,66 @@ func (p *PhysicalMergeJoin) attach2Task(tasks ...task) task {
return t
}

func buildIndexLookUpTask(ctx sessionctx.Context, t *copTask) *rootTask {
newTask := &rootTask{cst: t.cst}
// GetCost computes cost of index lookup operator itself.
func (p *PhysicalIndexLookUpReader) GetCost() (cost float64) {
indexPlan, tablePlan := p.indexPlan, p.tablePlan
ctx := p.ctx
sessVars := ctx.GetSessionVars()
p := PhysicalIndexLookUpReader{
tablePlan: t.tablePlan,
indexPlan: t.indexPlan,
ExtraHandleCol: t.extraHandleCol,
CommonHandleCols: t.commonHandleCols,
}.Init(ctx, t.tablePlan.SelectBlockOffset())
p.PartitionInfo = t.partitionInfo
setTableScanToTableRowIDScan(p.tablePlan)
p.stats = t.tablePlan.statsInfo()
// Add cost of building table reader executors. Handles are extracted in batch style,
// each handle is a range, the CPU cost of building copTasks should be:
// (indexRows / batchSize) * batchSize * CPUFactor
// Since we don't know the number of copTasks built, ignore these network cost now.
indexRows := t.indexPlan.statsInfo().RowCount
indexRows := indexPlan.statsInfo().RowCount
idxCst := indexRows * sessVars.CPUFactor
// if the expectCnt is below the paging threshold, using paging API, recalculate idxCst.
// paging API reduces the count of index and table rows, however introduces more seek cost.
if ctx.GetSessionVars().EnablePaging && t.expectCnt > 0 && t.expectCnt <= paging.Threshold {
if ctx.GetSessionVars().EnablePaging && p.expectedCnt > 0 && p.expectedCnt <= paging.Threshold {
p.Paging = true
pagingCst := calcPagingCost(ctx, t)
pagingCst := calcPagingCost(ctx, p.indexPlan, p.expectedCnt)
// prevent enlarging the cost because we take paging as a better plan,
// if the cost is enlarged, it'll be easier to go another plan.
idxCst = math.Min(idxCst, pagingCst)
}
newTask.cst += idxCst
cost += idxCst
// Add cost of worker goroutines in index lookup.
numTblWorkers := float64(sessVars.IndexLookupConcurrency())
newTask.cst += (numTblWorkers + 1) * sessVars.ConcurrencyFactor
cost += (numTblWorkers + 1) * sessVars.ConcurrencyFactor
// When building table reader executor for each batch, we would sort the handles. CPU
// cost of sort is:
// CPUFactor * batchSize * Log2(batchSize) * (indexRows / batchSize)
indexLookupSize := float64(sessVars.IndexLookupSize)
batchSize := math.Min(indexLookupSize, indexRows)
if batchSize > 2 {
sortCPUCost := (indexRows * math.Log2(batchSize) * sessVars.CPUFactor) / numTblWorkers
newTask.cst += sortCPUCost
cost += sortCPUCost
}
// Also, we need to sort the retrieved rows if index lookup reader is expected to return
// ordered results. Note that row count of these two sorts can be different, if there are
// operators above table scan.
tableRows := t.tablePlan.statsInfo().RowCount
tableRows := tablePlan.statsInfo().RowCount
selectivity := tableRows / indexRows
batchSize = math.Min(indexLookupSize*selectivity, tableRows)
if t.keepOrder && batchSize > 2 {
if p.keepOrder && batchSize > 2 {
sortCPUCost := (tableRows * math.Log2(batchSize) * sessVars.CPUFactor) / numTblWorkers
newTask.cst += sortCPUCost
cost += sortCPUCost
}
return
}

func buildIndexLookUpTask(ctx sessionctx.Context, t *copTask) *rootTask {
newTask := &rootTask{cst: t.cst}
p := PhysicalIndexLookUpReader{
tablePlan: t.tablePlan,
indexPlan: t.indexPlan,
ExtraHandleCol: t.extraHandleCol,
CommonHandleCols: t.commonHandleCols,
expectedCnt: t.expectCnt,
keepOrder: t.keepOrder,
}.Init(ctx, t.tablePlan.SelectBlockOffset())
p.PartitionInfo = t.partitionInfo
setTableScanToTableRowIDScan(p.tablePlan)
p.stats = t.tablePlan.statsInfo()
newTask.cst += p.GetCost()
p.cost = newTask.cst

// Do not inject the extra Projection even if t.needExtraProj is set, or the schema between the phase-1 agg and
Expand Down Expand Up @@ -1004,11 +1013,10 @@ func extractRows(p PhysicalPlan) float64 {
}

// calcPagingCost calculates the cost for paging processing which may increase the seekCnt and reduce scanned rows.
func calcPagingCost(ctx sessionctx.Context, t *copTask) float64 {
func calcPagingCost(ctx sessionctx.Context, indexPlan PhysicalPlan, expectCnt uint64) float64 {
sessVars := ctx.GetSessionVars()
indexRows := t.indexPlan.statsInfo().RowCount
expectCnt := t.expectCnt
sourceRows := extractRows(t.indexPlan)
indexRows := indexPlan.StatsCount()
sourceRows := extractRows(indexPlan)
// with paging, the scanned rows is always less than or equal to source rows.
if uint64(sourceRows) < expectCnt {
expectCnt = uint64(sourceRows)
Expand Down