Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

util: fix panic cause by concurrent map iteration and write in memory usage alarm #38729

4 changes: 2 additions & 2 deletions planner/core/common_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -869,8 +869,8 @@ func getRuntimeInfo(ctx sessionctx.Context, p Plan, runtimeStatsColl *execdetail
if runtimeStatsColl != nil && runtimeStatsColl.ExistsCopStats(explainID) {
copStats = runtimeStatsColl.GetCopStats(explainID)
}
memTracker = ctx.GetSessionVars().StmtCtx.MemTracker.SearchTrackerWithoutLock(p.ID())
diskTracker = ctx.GetSessionVars().StmtCtx.DiskTracker.SearchTrackerWithoutLock(p.ID())
memTracker = ctx.GetSessionVars().StmtCtx.MemTracker.SearchTrackerWithLock(p.ID())
diskTracker = ctx.GetSessionVars().StmtCtx.DiskTracker.SearchTrackerWithLock(p.ID())
return
}

Expand Down
44 changes: 21 additions & 23 deletions util/execdetails/execdetails.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,17 +32,17 @@ import (

// ExecDetails contains execution detail information.
type ExecDetails struct {
CalleeAddress string
CopTime time.Duration
BackoffTime time.Duration
LockKeysDuration time.Duration
BackoffSleep map[string]time.Duration
BackoffTimes map[string]int
RequestCount int
CommitDetail *util.CommitDetails
LockKeysDetail *util.LockKeysDetails
ScanDetail *util.ScanDetail
BackoffSleep map[string]time.Duration
BackoffTimes map[string]int
CalleeAddress string
TimeDetail util.TimeDetail
RequestCount int
CopTime time.Duration
BackoffTime time.Duration
LockKeysDuration time.Duration
}

type stmtExecDetailKeyType struct{}
Expand Down Expand Up @@ -318,9 +318,9 @@ func (d ExecDetails) ToZapFields() (fields []zap.Field) {
}

type basicCopRuntimeStats struct {
BasicRuntimeStats
threads int32
storeType string
BasicRuntimeStats
threads int32
}

// String implements the RuntimeStats interface.
Expand Down Expand Up @@ -359,8 +359,6 @@ func (*basicCopRuntimeStats) Tp() int {

// CopRuntimeStats collects cop tasks' execution info.
type CopRuntimeStats struct {
sync.Mutex

// stats stores the runtime statistics of coprocessor tasks.
// The key of the map is the tikv-server address. Because a tikv-server can
// have many region leaders, several coprocessor tasks can be sent to the
Expand All @@ -370,28 +368,30 @@ type CopRuntimeStats struct {
scanDetail *util.ScanDetail
// do not use kv.StoreType because it will meet cycle import error
storeType string
// count CopRuntimeStats total rows
totalRows int64
sync.Mutex
}

// RecordOneCopTask records a specific cop tasks's execution detail.
func (crs *CopRuntimeStats) RecordOneCopTask(address string, summary *tipb.ExecutorExecutionSummary) {
crs.Lock()
defer crs.Unlock()
currentRows := int64(*summary.NumProducedRows)
crs.totalRows += currentRows
crs.stats[address] = append(crs.stats[address],
&basicCopRuntimeStats{BasicRuntimeStats: BasicRuntimeStats{loop: int32(*summary.NumIterations),
consume: int64(*summary.TimeProcessedNs),
rows: int64(*summary.NumProducedRows)},
rows: currentRows},
threads: int32(summary.GetConcurrency()),
storeType: crs.storeType})
}

// GetActRows return total rows of CopRuntimeStats.
func (crs *CopRuntimeStats) GetActRows() (totalRows int64) {
for _, instanceStats := range crs.stats {
for _, stat := range instanceStats {
totalRows += stat.rows
}
}
return totalRows
crs.Lock()
defer crs.Unlock()
return crs.totalRows
}

// MergeBasicStats traverses basicCopRuntimeStats in the CopRuntimeStats and collects some useful information.
Expand Down Expand Up @@ -635,9 +635,9 @@ func (e *BasicRuntimeStats) GetTime() int64 {

// RuntimeStatsColl collects executors's execution info.
type RuntimeStatsColl struct {
mu sync.Mutex
rootStats map[int]*RootRuntimeStats
copStats map[int]*CopRuntimeStats
mu sync.Mutex
}

// NewRuntimeStatsColl creates new executor collector.
Expand Down Expand Up @@ -786,10 +786,8 @@ func NewConcurrencyInfo(name string, num int) *ConcurrencyInfo {

// RuntimeStatsWithConcurrencyInfo is the BasicRuntimeStats with ConcurrencyInfo.
type RuntimeStatsWithConcurrencyInfo struct {
// protect concurrency
sync.Mutex
// executor concurrency information
concurrency []*ConcurrencyInfo
sync.Mutex
}

// Tp implements the RuntimeStats interface.
Expand Down Expand Up @@ -840,8 +838,8 @@ func (*RuntimeStatsWithConcurrencyInfo) Merge(RuntimeStats) {}
// RuntimeStatsWithCommit is the RuntimeStats with commit detail.
type RuntimeStatsWithCommit struct {
Commit *util.CommitDetails
TxnCnt int
LockKeys *util.LockKeysDetails
TxnCnt int
}

// Tp implements the RuntimeStats interface.
Expand Down
14 changes: 14 additions & 0 deletions util/memory/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -563,6 +563,20 @@ func (t *Tracker) SearchTrackerWithoutLock(label int) *Tracker {
return nil
}

// SearchTrackerWithLock searches the specific tracker under this tracker with lock.
func (t *Tracker) SearchTrackerWithLock(label int) *Tracker {
t.mu.Lock()
defer t.mu.Unlock()
if t.label == label {
return t
}
children := t.mu.children[label]
if len(children) > 0 {
return children[0]
}
return nil
}

// SearchTrackerConsumedMoreThanNBytes searches the specific tracker that consumes more than NBytes.
func (t *Tracker) SearchTrackerConsumedMoreThanNBytes(limit int64) (res []*Tracker) {
t.mu.Lock()
Expand Down