Skip to content

Commit

Permalink
util: fix panic cause by concurrent map iteration and write in memory…
Browse files Browse the repository at this point in the history
… usage alarm (#38729)

close #38703
  • Loading branch information
mengxin9014 committed Nov 2, 2022
1 parent 1c5b837 commit ba175c4
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 25 deletions.
4 changes: 2 additions & 2 deletions planner/core/common_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -869,8 +869,8 @@ func getRuntimeInfo(ctx sessionctx.Context, p Plan, runtimeStatsColl *execdetail
if runtimeStatsColl != nil && runtimeStatsColl.ExistsCopStats(explainID) {
copStats = runtimeStatsColl.GetCopStats(explainID)
}
memTracker = ctx.GetSessionVars().StmtCtx.MemTracker.SearchTrackerWithoutLock(p.ID())
diskTracker = ctx.GetSessionVars().StmtCtx.DiskTracker.SearchTrackerWithoutLock(p.ID())
memTracker = ctx.GetSessionVars().StmtCtx.MemTracker.SearchTrackerWithLock(p.ID())
diskTracker = ctx.GetSessionVars().StmtCtx.DiskTracker.SearchTrackerWithLock(p.ID())
return
}

Expand Down
44 changes: 21 additions & 23 deletions util/execdetails/execdetails.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,17 +32,17 @@ import (

// ExecDetails contains execution detail information.
type ExecDetails struct {
CalleeAddress string
CopTime time.Duration
BackoffTime time.Duration
LockKeysDuration time.Duration
BackoffSleep map[string]time.Duration
BackoffTimes map[string]int
RequestCount int
CommitDetail *util.CommitDetails
LockKeysDetail *util.LockKeysDetails
ScanDetail *util.ScanDetail
BackoffSleep map[string]time.Duration
BackoffTimes map[string]int
CalleeAddress string
TimeDetail util.TimeDetail
RequestCount int
CopTime time.Duration
BackoffTime time.Duration
LockKeysDuration time.Duration
}

type stmtExecDetailKeyType struct{}
Expand Down Expand Up @@ -318,9 +318,9 @@ func (d ExecDetails) ToZapFields() (fields []zap.Field) {
}

type basicCopRuntimeStats struct {
BasicRuntimeStats
threads int32
storeType string
BasicRuntimeStats
threads int32
}

// String implements the RuntimeStats interface.
Expand Down Expand Up @@ -359,8 +359,6 @@ func (*basicCopRuntimeStats) Tp() int {

// CopRuntimeStats collects cop tasks' execution info.
type CopRuntimeStats struct {
sync.Mutex

// stats stores the runtime statistics of coprocessor tasks.
// The key of the map is the tikv-server address. Because a tikv-server can
// have many region leaders, several coprocessor tasks can be sent to the
Expand All @@ -370,28 +368,30 @@ type CopRuntimeStats struct {
scanDetail *util.ScanDetail
// do not use kv.StoreType because it will meet cycle import error
storeType string
// count CopRuntimeStats total rows
totalRows int64
sync.Mutex
}

// RecordOneCopTask records a specific cop tasks's execution detail.
func (crs *CopRuntimeStats) RecordOneCopTask(address string, summary *tipb.ExecutorExecutionSummary) {
crs.Lock()
defer crs.Unlock()
currentRows := int64(*summary.NumProducedRows)
crs.totalRows += currentRows
crs.stats[address] = append(crs.stats[address],
&basicCopRuntimeStats{BasicRuntimeStats: BasicRuntimeStats{loop: int32(*summary.NumIterations),
consume: int64(*summary.TimeProcessedNs),
rows: int64(*summary.NumProducedRows)},
rows: currentRows},
threads: int32(summary.GetConcurrency()),
storeType: crs.storeType})
}

// GetActRows return total rows of CopRuntimeStats.
func (crs *CopRuntimeStats) GetActRows() (totalRows int64) {
for _, instanceStats := range crs.stats {
for _, stat := range instanceStats {
totalRows += stat.rows
}
}
return totalRows
crs.Lock()
defer crs.Unlock()
return crs.totalRows
}

// MergeBasicStats traverses basicCopRuntimeStats in the CopRuntimeStats and collects some useful information.
Expand Down Expand Up @@ -635,9 +635,9 @@ func (e *BasicRuntimeStats) GetTime() int64 {

// RuntimeStatsColl collects executors's execution info.
type RuntimeStatsColl struct {
mu sync.Mutex
rootStats map[int]*RootRuntimeStats
copStats map[int]*CopRuntimeStats
mu sync.Mutex
}

// NewRuntimeStatsColl creates new executor collector.
Expand Down Expand Up @@ -786,10 +786,8 @@ func NewConcurrencyInfo(name string, num int) *ConcurrencyInfo {

// RuntimeStatsWithConcurrencyInfo is the BasicRuntimeStats with ConcurrencyInfo.
type RuntimeStatsWithConcurrencyInfo struct {
// protect concurrency
sync.Mutex
// executor concurrency information
concurrency []*ConcurrencyInfo
sync.Mutex
}

// Tp implements the RuntimeStats interface.
Expand Down Expand Up @@ -840,8 +838,8 @@ func (*RuntimeStatsWithConcurrencyInfo) Merge(RuntimeStats) {}
// RuntimeStatsWithCommit is the RuntimeStats with commit detail.
type RuntimeStatsWithCommit struct {
Commit *util.CommitDetails
TxnCnt int
LockKeys *util.LockKeysDetails
TxnCnt int
}

// Tp implements the RuntimeStats interface.
Expand Down
14 changes: 14 additions & 0 deletions util/memory/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -563,6 +563,20 @@ func (t *Tracker) SearchTrackerWithoutLock(label int) *Tracker {
return nil
}

// SearchTrackerWithLock searches the specific tracker under this tracker with lock.
func (t *Tracker) SearchTrackerWithLock(label int) *Tracker {
t.mu.Lock()
defer t.mu.Unlock()
if t.label == label {
return t
}
children := t.mu.children[label]
if len(children) > 0 {
return children[0]
}
return nil
}

// SearchTrackerConsumedMoreThanNBytes searches the specific tracker that consumes more than NBytes.
func (t *Tracker) SearchTrackerConsumedMoreThanNBytes(limit int64) (res []*Tracker) {
t.mu.Lock()
Expand Down

0 comments on commit ba175c4

Please sign in to comment.