*: use an approximate algorithm to calculate p90 in slowlog (#44269)
close #44047
wshwsh12 committed Jun 13, 2023
1 parent c95682c commit 6f54a29
Showing 11 changed files with 350 additions and 108 deletions.
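
Background for reviewers: the exact, sort-based percentile math in distsql and stmtctx is replaced by an approximate one backed by the new github.com/influxdata/tdigest dependency added below, so the p90/p95 figures written to the slow log no longer require keeping and sorting every per-task sample. The snippet below only illustrates the underlying idea using the library's NewWithCompression/Add/Quantile API; it is not code from this commit.

package main

import (
	"fmt"
	"time"

	"github.com/influxdata/tdigest"
)

func main() {
	// Stream cop-task response times into a t-digest instead of storing
	// every sample in a slice and sorting it when the slow log is written.
	td := tdigest.NewWithCompression(1000)
	samples := []time.Duration{
		5 * time.Millisecond, 8 * time.Millisecond, 12 * time.Millisecond,
		20 * time.Millisecond, 35 * time.Millisecond, 900 * time.Millisecond,
	}
	for _, d := range samples {
		td.Add(float64(d), 1) // value, weight
	}
	// Quantile returns an approximation; memory stays bounded no matter
	// how many samples are added.
	fmt.Println("approximate p90:", time.Duration(td.Quantile(0.9)))
}

A t-digest trades a small error in the reported quantile for bounded memory and cheap merging, which is what makes it suitable for per-statement runtime stats.
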
7 changes: 7 additions & 0 deletions DEPS.bzl
@@ -2240,6 +2240,13 @@ def go_deps():
sum = "h1:O08dwjOwv9CYlJJEUZKAazSoQDKlsN34Bq3dnhqhyVI=",
version = "v0.0.0-20170331210902-15e594fc09f1",
)
go_repository(
name = "com_github_influxdata_tdigest",
build_file_proto_mode = "disable",
importpath = "github.com/influxdata/tdigest",
sum = "h1:XpFptwYmnEKUqmkcDjrzffswZ3nvNeevbUSLPP/ZzIY=",
version = "v0.0.1",
)

go_repository(
name = "com_github_iris_contrib_blackfriday",
1 change: 0 additions & 1 deletion distsql/BUILD.bazel
@@ -48,7 +48,6 @@ go_library(
"@com_github_tikv_client_go_v2//tikvrpc/interceptor",
"@org_golang_google_grpc//metadata",
"@org_golang_x_exp//maps",
"@org_golang_x_exp//slices",
"@org_uber_go_zap//:zap",
],
)
10 changes: 6 additions & 4 deletions distsql/distsql_test.go
@@ -111,14 +111,16 @@ func TestSelectResultRuntimeStats(t *testing.T) {
basic := stmtStats.GetBasicRuntimeStats(1)
basic.Record(time.Second, 20)
s1 := &selectResultRuntimeStats{
copRespTime: []time.Duration{time.Second, time.Millisecond},
procKeys: []int64{100, 200},
backoffSleep: map[string]time.Duration{"RegionMiss": time.Millisecond},
totalProcessTime: time.Second,
totalWaitTime: time.Second,
rpcStat: tikv.NewRegionRequestRuntimeStats(),
distSQLConcurrency: 15,
}
s1.copRespTime.Add(execdetails.Duration(time.Second))
s1.copRespTime.Add(execdetails.Duration(time.Millisecond))
s1.procKeys.Add(100)
s1.procKeys.Add(200)

s2 := *s1
stmtStats.RegisterStats(1, s1)
@@ -141,13 +143,13 @@ func TestSelectResultRuntimeStats(t *testing.T) {
require.Equal(t, expect, stats.String())

s1 = &selectResultRuntimeStats{
copRespTime: []time.Duration{time.Second},
procKeys: []int64{100},
backoffSleep: map[string]time.Duration{"RegionMiss": time.Millisecond},
totalProcessTime: time.Second,
totalWaitTime: time.Second,
rpcStat: tikv.NewRegionRequestRuntimeStats(),
}
s1.copRespTime.Add(execdetails.Duration(time.Second))
s1.procKeys.Add(100)
expect = "cop_task: {num: 1, max: 1s, proc_keys: 100, tot_proc: 1s, tot_wait: 1s, copr_cache_hit_ratio: 0.00}, backoff{RegionMiss: 1ms}"
require.Equal(t, expect, s1.String())
}
58 changes: 26 additions & 32 deletions distsql/select_result.go
@@ -49,7 +49,6 @@ import (
"github.com/tikv/client-go/v2/tikvrpc"
"go.uber.org/zap"
"golang.org/x/exp/maps"
"golang.org/x/exp/slices"
)

var (
@@ -325,7 +324,7 @@ func (r *selectResult) fetchResp(ctx context.Context) error {
defer func() {
if r.stats != nil {
// Ignore internal sql.
if !r.ctx.GetSessionVars().InRestrictedSQL && len(r.stats.copRespTime) > 0 {
if !r.ctx.GetSessionVars().InRestrictedSQL && r.stats.copRespTime.Size() > 0 {
ratio := r.stats.calcCacheHit()
if ratio >= 1 {
telemetry.CurrentCoprCacheHitRatioGTE100Count.Inc()
@@ -636,7 +635,7 @@ func (r *selectResult) Close() error {
r.stats.storeBatchedNum, r.stats.storeBatchedFallbackNum = batched, fallback
telemetryStoreBatchedCnt.Add(float64(r.stats.storeBatchedNum))
telemetryStoreBatchedFallbackCnt.Add(float64(r.stats.storeBatchedFallbackNum))
telemetryBatchedQueryTaskCnt.Add(float64(len(r.stats.copRespTime)))
telemetryBatchedQueryTaskCnt.Add(float64(r.stats.copRespTime.Size()))
}
}
r.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(r.rootPlanID, r.stats)
@@ -652,8 +651,8 @@ type CopRuntimeStats interface {
}

type selectResultRuntimeStats struct {
copRespTime []time.Duration
procKeys []int64
copRespTime execdetails.Percentile[execdetails.Duration]
procKeys execdetails.Percentile[execdetails.Int64]
backoffSleep map[string]time.Duration
totalProcessTime time.Duration
totalWaitTime time.Duration
@@ -667,11 +666,11 @@ type selectResultRuntimeStats struct {
}

func (s *selectResultRuntimeStats) mergeCopRuntimeStats(copStats *copr.CopRuntimeStats, respTime time.Duration) {
s.copRespTime = append(s.copRespTime, respTime)
s.copRespTime.Add(execdetails.Duration(respTime))
if copStats.ScanDetail != nil {
s.procKeys = append(s.procKeys, copStats.ScanDetail.ProcessedKeys)
s.procKeys.Add(execdetails.Int64(copStats.ScanDetail.ProcessedKeys))
} else {
s.procKeys = append(s.procKeys, 0)
s.procKeys.Add(0)
}
maps.Copy(s.backoffSleep, copStats.BackoffSleep)
s.totalProcessTime += copStats.TimeDetail.ProcessTime
@@ -684,8 +683,8 @@ func (s *selectResultRuntimeStats) mergeCopRuntimeStats(copStats *copr.CopRuntim

func (s *selectResultRuntimeStats) Clone() execdetails.RuntimeStats {
newRs := selectResultRuntimeStats{
copRespTime: make([]time.Duration, 0, len(s.copRespTime)),
procKeys: make([]int64, 0, len(s.procKeys)),
copRespTime: execdetails.Percentile[execdetails.Duration]{},
procKeys: execdetails.Percentile[execdetails.Int64]{},
backoffSleep: make(map[string]time.Duration, len(s.backoffSleep)),
rpcStat: tikv.NewRegionRequestRuntimeStats(),
distSQLConcurrency: s.distSQLConcurrency,
@@ -695,8 +694,8 @@ func (s *selectResultRuntimeStats) Clone() execdetails.RuntimeStats {
storeBatchedFallbackNum: s.storeBatchedFallbackNum,
buildTaskDuration: s.buildTaskDuration,
}
newRs.copRespTime = append(newRs.copRespTime, s.copRespTime...)
newRs.procKeys = append(newRs.procKeys, s.procKeys...)
newRs.copRespTime.MergePercentile(&s.copRespTime)
newRs.procKeys.MergePercentile(&s.procKeys)
for k, v := range s.backoffSleep {
newRs.backoffSleep[k] += v
}
@@ -711,8 +710,8 @@ func (s *selectResultRuntimeStats) Merge(rs execdetails.RuntimeStats) {
if !ok {
return
}
s.copRespTime = append(s.copRespTime, other.copRespTime...)
s.procKeys = append(s.procKeys, other.procKeys...)
s.copRespTime.MergePercentile(&other.copRespTime)
s.procKeys.MergePercentile(&other.procKeys)

for k, v := range other.backoffSleep {
s.backoffSleep[k] += v
@@ -735,31 +734,26 @@ func (s *selectResultRuntimeStats) Merge(rs execdetails.RuntimeStats) {
func (s *selectResultRuntimeStats) String() string {
buf := bytes.NewBuffer(nil)
rpcStat := s.rpcStat
if len(s.copRespTime) > 0 {
size := len(s.copRespTime)
if s.copRespTime.Size() > 0 {
size := s.copRespTime.Size()
if size == 1 {
fmt.Fprintf(buf, "cop_task: {num: 1, max: %v, proc_keys: %v", execdetails.FormatDuration(s.copRespTime[0]), s.procKeys[0])
fmt.Fprintf(buf, "cop_task: {num: 1, max: %v, proc_keys: %v", execdetails.FormatDuration(time.Duration(s.copRespTime.GetPercentile(0))), s.procKeys.GetPercentile(0))
} else {
slices.Sort(s.copRespTime)
vMax, vMin := s.copRespTime[size-1], s.copRespTime[0]
vP95 := s.copRespTime[size*19/20]
sum := 0.0
for _, t := range s.copRespTime {
sum += float64(t)
}
vMax, vMin := s.copRespTime.GetMax(), s.copRespTime.GetMin()
vP95 := s.copRespTime.GetPercentile(0.95)
sum := s.copRespTime.Sum()
vAvg := time.Duration(sum / float64(size))

slices.Sort(s.procKeys)
keyMax := s.procKeys[size-1]
keyP95 := s.procKeys[size*19/20]
keyMax := s.procKeys.GetMax()
keyP95 := s.procKeys.GetPercentile(0.95)
fmt.Fprintf(buf, "cop_task: {num: %v, max: %v, min: %v, avg: %v, p95: %v", size,
execdetails.FormatDuration(vMax), execdetails.FormatDuration(vMin),
execdetails.FormatDuration(vAvg), execdetails.FormatDuration(vP95))
execdetails.FormatDuration(time.Duration(vMax.GetFloat64())), execdetails.FormatDuration(time.Duration(vMin.GetFloat64())),
execdetails.FormatDuration(vAvg), execdetails.FormatDuration(time.Duration(vP95)))
if keyMax > 0 {
buf.WriteString(", max_proc_keys: ")
buf.WriteString(strconv.FormatInt(keyMax, 10))
buf.WriteString(strconv.FormatInt(int64(keyMax), 10))
buf.WriteString(", p95_proc_keys: ")
buf.WriteString(strconv.FormatInt(keyP95, 10))
buf.WriteString(strconv.FormatInt(int64(keyP95), 10))
}
}
if s.totalProcessTime > 0 {
@@ -836,7 +830,7 @@ func (*selectResultRuntimeStats) Tp() int {

func (s *selectResultRuntimeStats) calcCacheHit() float64 {
hit := s.CoprCacheHitNum
tot := len(s.copRespTime)
tot := s.copRespTime.Size()
if s.storeBatchedNum > 0 {
tot += int(s.storeBatchedNum)
}
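
Note: the execdetails.Percentile type that select_result.go now relies on is defined in util/execdetails, which is not among the files shown in this excerpt. A rough sketch of a wrapper exposing the Add / Size / Sum / GetMin / GetMax / GetPercentile / MergePercentile surface used above could look like the following; every name and detail is inferred from the call sites, not copied from the actual implementation.

// Sketch only: shape inferred from how select_result.go uses the type.
package execdetails

import (
	"time"

	"github.com/influxdata/tdigest"
)

// valueWithFloat64 is an assumed constraint; the diff calls GetFloat64()
// on the values returned by GetMax().
type valueWithFloat64 interface {
	GetFloat64() float64
}

// Duration and Int64 wrap the raw sample types used by select_result.go.
type Duration time.Duration

func (d Duration) GetFloat64() float64 { return float64(d) }

type Int64 int64

func (i Int64) GetFloat64() float64 { return float64(i) }

// Percentile keeps only scalar aggregates plus a t-digest, so quantiles
// are approximate but memory no longer grows with the number of cop tasks.
type Percentile[T valueWithFloat64] struct {
	digest *tdigest.TDigest
	minVal T
	maxVal T
	sum    float64
	size   int
}

func (p *Percentile[T]) Add(v T) {
	if p.digest == nil {
		p.digest = tdigest.NewWithCompression(1000)
	}
	f := v.GetFloat64()
	if p.size == 0 || f < p.minVal.GetFloat64() {
		p.minVal = v
	}
	if p.size == 0 || f > p.maxVal.GetFloat64() {
		p.maxVal = v
	}
	p.sum += f
	p.size++
	p.digest.Add(f, 1)
}

func (p *Percentile[T]) Size() int    { return p.size }
func (p *Percentile[T]) Sum() float64 { return p.sum }
func (p *Percentile[T]) GetMax() T    { return p.maxVal }
func (p *Percentile[T]) GetMin() T    { return p.minVal }

// GetPercentile returns an approximate quantile, e.g. 0.95 for p95.
func (p *Percentile[T]) GetPercentile(q float64) float64 {
	if p.digest == nil {
		return 0
	}
	return p.digest.Quantile(q)
}

// MergePercentile folds another tracker into this one; how the real type
// merges the two digests is not visible in this diff, so only the scalar
// aggregates are merged in this sketch.
func (p *Percentile[T]) MergePercentile(o *Percentile[T]) {
	if o.size == 0 {
		return
	}
	if p.size == 0 || o.minVal.GetFloat64() < p.minVal.GetFloat64() {
		p.minVal = o.minVal
	}
	if p.size == 0 || o.maxVal.GetFloat64() > p.maxVal.GetFloat64() {
		p.maxVal = o.maxVal
	}
	p.sum += o.sum
	p.size += o.size
}
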
2 changes: 2 additions & 0 deletions go.mod
@@ -201,6 +201,7 @@ require (
github.com/huandu/xstrings v1.3.1 // indirect
github.com/imdario/mergo v0.3.11 // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/influxdata/tdigest v0.0.1
github.com/jcmturner/aescts/v2 v2.0.0 // indirect
github.com/jcmturner/dnsutils/v2 v2.0.0 // indirect
github.com/jcmturner/gofork v1.0.0 // indirect
@@ -278,6 +279,7 @@ require (
golang.org/x/exp/typeparams v0.0.0-20230224173230-c95f2b4c22f2 // indirect
golang.org/x/mod v0.10.0 // indirect
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
gonum.org/v1/gonum v0.8.2 // indirect
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1 // indirect
google.golang.org/protobuf v1.30.0 // indirect
5 changes: 5 additions & 0 deletions go.sum
@@ -551,6 +551,8 @@ github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANyt
github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8=
github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw=
github.com/influxdata/influxdb v0.0.0-20170331210902-15e594fc09f1/go.mod h1:qZna6X/4elxqT3yI9iZYdZrWWdeFOOprn86kgg4+IzY=
github.com/influxdata/tdigest v0.0.1 h1:XpFptwYmnEKUqmkcDjrzffswZ3nvNeevbUSLPP/ZzIY=
github.com/influxdata/tdigest v0.0.1/go.mod h1:Z0kXnxzbTC2qrx4NaIzYkE1k66+6oEDQTvL95hQFh5Y=
github.com/iris-contrib/blackfriday v2.0.0+incompatible/go.mod h1:UzZ2bDEoaSGPbkg6SAB4att1aAwTmVIx/5gCVqeyUdI=
github.com/iris-contrib/go.uuid v2.0.0+incompatible/go.mod h1:iz2lgM/1UnEf1kP0L/+fafWORmlnuysV2EMP8MW+qe0=
github.com/iris-contrib/i18n v0.0.0-20171121225848-987a633949d0/go.mod h1:pMCz62A0xJL6I+umB2YTlFRwWXaDFA0jy+5HzGiJjqI=
@@ -1435,7 +1437,10 @@ golang.org/x/xerrors v0.0.0-20220517211312-f3a8303e98df/go.mod h1:K8+ghG5WaK9qNq
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 h1:H2TDz8ibqkAF6YGhCdN3jS9O0/s90v0rJh3X/OLHEUk=
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2/go.mod h1:K8+ghG5WaK9qNqU5K3HdILfMLy1f3aNYFI/wnl100a8=
gonum.org/v1/gonum v0.0.0-20180816165407-929014505bf4/go.mod h1:Y+Yx5eoAFn32cQvJDxZx5Dpnq+c3wtXuadVZAcxbbBo=
gonum.org/v1/gonum v0.0.0-20181121035319-3f7ecaa7e8ca/go.mod h1:Y+Yx5eoAFn32cQvJDxZx5Dpnq+c3wtXuadVZAcxbbBo=
gonum.org/v1/gonum v0.8.2 h1:CCXrcPKiGGotvnN6jfUsKk4rRqm7q09/YbKb5xCEvtM=
gonum.org/v1/gonum v0.8.2/go.mod h1:oe/vMfY3deqTw+1EZJhuvEW2iwGF1bW9wwu7XCu0+v0=
gonum.org/v1/netlib v0.0.0-20181029234149-ec6d1f5cefe6/go.mod h1:wa6Ws7BG/ESfp6dHfk7C6KdzKA7wR7u/rKwOGE66zvw=
gonum.org/v1/netlib v0.0.0-20190313105609-8cb42192e0e0/go.mod h1:wa6Ws7BG/ESfp6dHfk7C6KdzKA7wR7u/rKwOGE66zvw=
gonum.org/v1/plot v0.0.0-20190515093506-e2840ee46a6b/go.mod h1:Wt8AAjI+ypCyYX3nZBvf6cAIx93T+c/OS2HFAYskSZc=
google.golang.org/api v0.3.1/go.mod h1:6wY9I6uQWHQ8EM57III9mq/AjF+i8G65rmVagqKMtkk=
2 changes: 1 addition & 1 deletion sessionctx/stmtctx/BUILD.bazel
@@ -37,7 +37,7 @@ go_test(
],
embed = [":stmtctx"],
flaky = True,
shard_count = 4,
shard_count = 5,
deps = [
"//kv",
"//sessionctx/variable",
85 changes: 27 additions & 58 deletions sessionctx/stmtctx/stmtctx.go
@@ -225,7 +225,7 @@ type StatementContext struct {
extraWarnings []SQLWarn

execDetails execdetails.ExecDetails
allExecDetails []*execdetails.DetailsNeedP90
detailsSummary execdetails.P90Summary
}
// PrevAffectedRows is the affected-rows value(DDL is 0, DML is the number of affected rows).
PrevAffectedRows int64
@@ -979,7 +979,7 @@ func (sc *StatementContext) resetMuForRetry() {
sc.mu.message = ""
sc.mu.warnings = nil
sc.mu.execDetails = execdetails.ExecDetails{}
sc.mu.allExecDetails = make([]*execdetails.DetailsNeedP90, 0, 4)
sc.mu.detailsSummary.Reset()
}

// ResetForRetry resets the changed states during execution.
Expand All @@ -1003,13 +1003,13 @@ func (sc *StatementContext) MergeExecDetails(details *execdetails.ExecDetails, c
sc.mu.execDetails.RequestCount++
sc.MergeScanDetail(details.ScanDetail)
sc.MergeTimeDetail(details.TimeDetail)
sc.mu.allExecDetails = append(sc.mu.allExecDetails,
&execdetails.DetailsNeedP90{
BackoffSleep: details.BackoffSleep,
BackoffTimes: details.BackoffTimes,
CalleeAddress: details.CalleeAddress,
TimeDetail: details.TimeDetail,
})
detail := &execdetails.DetailsNeedP90{
BackoffSleep: details.BackoffSleep,
BackoffTimes: details.BackoffTimes,
CalleeAddress: details.CalleeAddress,
TimeDetail: details.TimeDetail,
}
sc.mu.detailsSummary.Merge(detail)
}
if commitDetails != nil {
if sc.mu.execDetails.CommitDetail == nil {
@@ -1112,7 +1112,7 @@ func (sc *StatementContext) PushDownFlags() uint64 {
func (sc *StatementContext) CopTasksDetails() *CopTasksDetails {
sc.mu.Lock()
defer sc.mu.Unlock()
n := len(sc.mu.allExecDetails)
n := sc.mu.detailsSummary.NumCopTasks
d := &CopTasksDetails{
NumCopTasks: n,
MaxBackoffTime: make(map[string]time.Duration),
@@ -1128,57 +1128,26 @@ func (sc *StatementContext) CopTasksDetails() *CopTasksDetails {
d.AvgProcessTime = sc.mu.execDetails.TimeDetail.ProcessTime / time.Duration(n)
d.AvgWaitTime = sc.mu.execDetails.TimeDetail.WaitTime / time.Duration(n)

slices.SortFunc(sc.mu.allExecDetails, func(i, j *execdetails.DetailsNeedP90) bool {
return i.TimeDetail.ProcessTime < j.TimeDetail.ProcessTime
})
d.P90ProcessTime = sc.mu.allExecDetails[n*9/10].TimeDetail.ProcessTime
d.MaxProcessTime = sc.mu.allExecDetails[n-1].TimeDetail.ProcessTime
d.MaxProcessAddress = sc.mu.allExecDetails[n-1].CalleeAddress
d.P90ProcessTime = time.Duration((sc.mu.detailsSummary.ProcessTimePercentile.GetPercentile(0.9)))
d.MaxProcessTime = sc.mu.detailsSummary.ProcessTimePercentile.GetMax().D
d.MaxProcessAddress = sc.mu.detailsSummary.ProcessTimePercentile.GetMax().Addr

slices.SortFunc(sc.mu.allExecDetails, func(i, j *execdetails.DetailsNeedP90) bool {
return i.TimeDetail.WaitTime < j.TimeDetail.WaitTime
})
d.P90WaitTime = sc.mu.allExecDetails[n*9/10].TimeDetail.WaitTime
d.MaxWaitTime = sc.mu.allExecDetails[n-1].TimeDetail.WaitTime
d.MaxWaitAddress = sc.mu.allExecDetails[n-1].CalleeAddress

// calculate backoff details
type backoffItem struct {
callee string
sleepTime time.Duration
times int
}
backoffInfo := make(map[string][]backoffItem)
for _, ed := range sc.mu.allExecDetails {
for backoff := range ed.BackoffTimes {
backoffInfo[backoff] = append(backoffInfo[backoff], backoffItem{
callee: ed.CalleeAddress,
sleepTime: ed.BackoffSleep[backoff],
times: ed.BackoffTimes[backoff],
})
}
}
for backoff, items := range backoffInfo {
if len(items) == 0 {
d.P90WaitTime = time.Duration((sc.mu.detailsSummary.WaitTimePercentile.GetPercentile(0.9)))
d.MaxWaitTime = sc.mu.detailsSummary.WaitTimePercentile.GetMax().D
d.MaxWaitAddress = sc.mu.detailsSummary.WaitTimePercentile.GetMax().Addr

for backoff, items := range sc.mu.detailsSummary.BackoffInfo {
if items == nil {
continue
}
slices.SortFunc(items, func(i, j backoffItem) bool {
return i.sleepTime < j.sleepTime
})
n := len(items)
d.MaxBackoffAddress[backoff] = items[n-1].callee
d.MaxBackoffTime[backoff] = items[n-1].sleepTime
d.P90BackoffTime[backoff] = items[n*9/10].sleepTime

var totalTime time.Duration
totalTimes := 0
for _, it := range items {
totalTime += it.sleepTime
totalTimes += it.times
}
d.AvgBackoffTime[backoff] = totalTime / time.Duration(n)
d.TotBackoffTime[backoff] = totalTime
d.TotBackoffTimes[backoff] = totalTimes
n := items.ReqTimes
d.MaxBackoffAddress[backoff] = items.BackoffPercentile.GetMax().Addr
d.MaxBackoffTime[backoff] = items.BackoffPercentile.GetMax().D
d.P90BackoffTime[backoff] = time.Duration(items.BackoffPercentile.GetPercentile(0.9))

d.AvgBackoffTime[backoff] = items.TotBackoffTime / time.Duration(n)
d.TotBackoffTime[backoff] = items.TotBackoffTime
d.TotBackoffTimes[backoff] = items.TotBackoffTimes
}
return d
}
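
Note: execdetails.P90Summary and DetailsNeedP90 also live in util/execdetails, outside the files shown here. Judging by how CopTasksDetails and MergeExecDetails use the summary, its shape is presumably close to the sketch below, with percentile elements carrying both a duration and the callee address (hence GetMax().D and GetMax().Addr). This is a hypothetical continuation of the Percentile sketch above (same assumed package and imports), not the committed implementation.

// Sketch only: shape inferred from stmtctx.go above. DetailsNeedP90 is the
// pre-existing per-task struct whose BackoffSleep, BackoffTimes,
// CalleeAddress, and TimeDetail fields appear in MergeExecDetails.
type DurationWithAddr struct {
	D    time.Duration
	Addr string
}

func (d DurationWithAddr) GetFloat64() float64 { return float64(d.D) }

// P90BackoffSummary aggregates one backoff type across cop tasks.
type P90BackoffSummary struct {
	ReqTimes          int
	BackoffPercentile Percentile[DurationWithAddr]
	TotBackoffTime    time.Duration
	TotBackoffTimes   int
}

// P90Summary replaces the allExecDetails slice that the old code sorted
// three times per statement (process time, wait time, backoff sleep).
type P90Summary struct {
	NumCopTasks           int
	ProcessTimePercentile Percentile[DurationWithAddr]
	WaitTimePercentile    Percentile[DurationWithAddr]
	BackoffInfo           map[string]*P90BackoffSummary
}

func (s *P90Summary) Reset() {
	*s = P90Summary{BackoffInfo: make(map[string]*P90BackoffSummary)}
}

// Merge folds one cop task's details into the running summary.
func (s *P90Summary) Merge(d *DetailsNeedP90) {
	if s.BackoffInfo == nil {
		s.BackoffInfo = make(map[string]*P90BackoffSummary)
	}
	s.NumCopTasks++
	s.ProcessTimePercentile.Add(DurationWithAddr{D: d.TimeDetail.ProcessTime, Addr: d.CalleeAddress})
	s.WaitTimePercentile.Add(DurationWithAddr{D: d.TimeDetail.WaitTime, Addr: d.CalleeAddress})
	for backoff, times := range d.BackoffTimes {
		item, ok := s.BackoffInfo[backoff]
		if !ok {
			item = &P90BackoffSummary{}
			s.BackoffInfo[backoff] = item
		}
		sleep := d.BackoffSleep[backoff]
		item.ReqTimes++
		item.BackoffPercentile.Add(DurationWithAddr{D: sleep, Addr: d.CalleeAddress})
		item.TotBackoffTime += sleep
		item.TotBackoffTimes += times
	}
}
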
