Skip to content

Commit

Permalink
planner: supply Task memory usage and correct some accumulation (#38638)
Browse files Browse the repository at this point in the history
ref #37632
  • Loading branch information
fzzf678 committed Nov 2, 2022
1 parent 60b2aa6 commit acf1dcf
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 12 deletions.
3 changes: 3 additions & 0 deletions parser/model/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,9 @@ func (c *ColumnInfo) GetTypeDesc() string {
return desc
}

// EmptyColumnInfoSize is the memory usage of ColumnInfoSize
const EmptyColumnInfoSize = int64(unsafe.Sizeof(ColumnInfo{}))

// FindColumnInfo finds ColumnInfo in cols by name.
func FindColumnInfo(cols []*ColumnInfo, name string) *ColumnInfo {
name = strings.ToLower(name)
Expand Down
25 changes: 13 additions & 12 deletions planner/core/physical_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,10 +209,7 @@ func (p *PhysicalTableReader) MemoryUsage() (sum int64) {
if p.tablePlan != nil {
sum += p.tablePlan.MemoryUsage()
}

for _, plan := range p.TablePlans {
sum += plan.MemoryUsage()
}
// since TablePlans is the flats of tablePlan, so we don't count it
for _, pInfo := range p.PartitionInfos {
sum += pInfo.MemoryUsage()
}
Expand Down Expand Up @@ -540,12 +537,7 @@ func (p *PhysicalIndexLookUpReader) MemoryUsage() (sum int64) {
sum += p.PushedLimit.MemoryUsage()
}

for _, plan := range p.IndexPlans {
sum += plan.MemoryUsage()
}
for _, plan := range p.TablePlans {
sum += plan.MemoryUsage()
}
// since IndexPlans and TablePlans are the flats of indexPlan and tablePlan, so we don't count it
for _, col := range p.CommonHandleCols {
sum += col.MemoryUsage()
}
Expand Down Expand Up @@ -732,7 +724,7 @@ func (p *PhysicalIndexScan) MemoryUsage() (sum int64) {
}

sum = emptyPhysicalIndexScanSize + p.physicalSchemaProducer.MemoryUsage() + int64(cap(p.IdxColLens))*size.SizeOfInt +
p.DBName.MemoryUsage() + int64(len(p.rangeInfo))
p.DBName.MemoryUsage() + int64(len(p.rangeInfo)) + int64(len(p.Columns))*model.EmptyColumnInfoSize
if p.TableAsName != nil {
sum += p.TableAsName.MemoryUsage()
}
Expand All @@ -742,6 +734,9 @@ func (p *PhysicalIndexScan) MemoryUsage() (sum int64) {
if p.prop != nil {
sum += p.prop.MemoryUsage()
}
if p.dataSourceSchema != nil {
sum += p.dataSourceSchema.MemoryUsage()
}
// slice memory usage
for _, cond := range p.AccessCondition {
sum += cond.MemoryUsage()
Expand Down Expand Up @@ -1198,7 +1193,10 @@ func (p *basePhysicalJoin) MemoryUsage() (sum int64) {
return
}

sum = emptyBasePhysicalJoinSize + p.physicalSchemaProducer.MemoryUsage() + int64(cap(p.IsNullEQ))*size.SizeOfBool
sum = emptyBasePhysicalJoinSize + p.physicalSchemaProducer.MemoryUsage() + int64(cap(p.IsNullEQ))*size.SizeOfBool +
int64(cap(p.LeftConditions)+cap(p.RightConditions)+cap(p.OtherConditions))*size.SizeOfInterface +
int64(cap(p.OuterJoinKeys)+cap(p.InnerJoinKeys)+cap(p.LeftJoinKeys)+cap(p.RightNAJoinKeys)+cap(p.LeftNAJoinKeys)+
cap(p.RightNAJoinKeys))*size.SizeOfPointer + int64(cap(p.DefaultValues))*types.EmptyDatumSize

for _, cond := range p.LeftConditions {
sum += cond.MemoryUsage()
Expand Down Expand Up @@ -1369,6 +1367,9 @@ func (p *PhysicalIndexJoin) MemoryUsage() (sum int64) {

sum = p.basePhysicalJoin.MemoryUsage() + size.SizeOfInterface*2 + size.SizeOfSlice*4 +
int64(cap(p.KeyOff2IdxOff)+cap(p.IdxColLens))*size.SizeOfInt + size.SizeOfPointer
if p.innerTask != nil {
sum += p.innerTask.MemoryUsage()
}
if p.CompareFilters != nil {
sum += p.CompareFilters.MemoryUsage()
}
Expand Down
64 changes: 64 additions & 0 deletions planner/core/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"github.com/pingcap/tidb/util/mathutil"
"github.com/pingcap/tidb/util/paging"
"github.com/pingcap/tidb/util/plancodec"
"github.com/pingcap/tidb/util/size"
"github.com/pingcap/tipb/go-tipb"
"go.uber.org/zap"
)
Expand All @@ -54,6 +55,7 @@ type task interface {
plan() PhysicalPlan
invalid() bool
convertToRootTask(ctx sessionctx.Context) *rootTask
MemoryUsage() int64
}

// copTask is a task that runs in a distributed kv store.
Expand Down Expand Up @@ -173,6 +175,42 @@ func (t *copTask) getStoreType() kv.StoreType {
return kv.TiKV
}

// MemoryUsage return the memory usage of copTask
func (t *copTask) MemoryUsage() (sum int64) {
if t == nil {
return
}

sum = size.SizeOfInterface*(2+int64(cap(t.idxMergePartPlans)+cap(t.rootTaskConds))) + size.SizeOfBool*3 + size.SizeOfUint64 +
size.SizeOfPointer*(3+int64(cap(t.commonHandleCols)+cap(t.tblCols))) + size.SizeOfSlice*4 + t.partitionInfo.MemoryUsage()
if t.indexPlan != nil {
sum += t.indexPlan.MemoryUsage()
}
if t.tablePlan != nil {
sum += t.tablePlan.MemoryUsage()
}
if t.originSchema != nil {
sum += t.originSchema.MemoryUsage()
}
if t.extraHandleCol != nil {
sum += t.extraHandleCol.MemoryUsage()
}

for _, col := range t.commonHandleCols {
sum += col.MemoryUsage()
}
for _, col := range t.tblCols {
sum += col.MemoryUsage()
}
for _, p := range t.idxMergePartPlans {
sum += p.MemoryUsage()
}
for _, expr := range t.rootTaskConds {
sum += expr.MemoryUsage()
}
return
}

func (p *basePhysicalPlan) attach2Task(tasks ...task) task {
t := tasks[0].convertToRootTask(p.ctx)
return attachPlan2Task(p.self, t)
Expand Down Expand Up @@ -757,6 +795,19 @@ func (t *rootTask) plan() PhysicalPlan {
return t.p
}

// MemoryUsage return the memory usage of rootTask
func (t *rootTask) MemoryUsage() (sum int64) {
if t == nil {
return
}

sum = size.SizeOfInterface + size.SizeOfBool
if t.p != nil {
sum += t.p.MemoryUsage()
}
return sum
}

func (p *PhysicalLimit) attach2Task(tasks ...task) task {
t := tasks[0].copy()
sunk := false
Expand Down Expand Up @@ -1887,6 +1938,19 @@ func (t *mppTask) convertToRootTask(ctx sessionctx.Context) *rootTask {
return t.copy().(*mppTask).convertToRootTaskImpl(ctx)
}

// MemoryUsage return the memory usage of mppTask
func (t *mppTask) MemoryUsage() (sum int64) {
if t == nil {
return
}

sum = size.SizeOfInterface + size.SizeOfInt + size.SizeOfSlice + int64(cap(t.hashCols))*size.SizeOfPointer
if t.p != nil {
sum += t.p.MemoryUsage()
}
return
}

func collectPartitionInfosFromMPPPlan(p *PhysicalTableReader, mppPlan PhysicalPlan) {
switch x := mppPlan.(type) {
case *PhysicalTableScan:
Expand Down

0 comments on commit acf1dcf

Please sign in to comment.