*: use approximate algorithm to calculate p90 in slowlog. (#44269) (#…
ti-chi-bot committed Aug 4, 2023
1 parent 48909a1 commit 385d1f1
Showing 12 changed files with 374 additions and 109 deletions.
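The "approximate algorithm" here is the t-digest from github.com/influxdata/tdigest, which this commit adds as a dependency (see the DEPS.bzl and go.mod hunks below). Instead of retaining every per-task sample and sorting the slice to pick the p90/p95 element, a t-digest keeps a small, mergeable summary of the distribution and answers quantile queries approximately. The snippet below is a minimal, standalone illustration of the library; it is not TiDB code, and how execdetails wires the digest up internally is assumed.

```go
package main

import (
	"fmt"
	"time"

	"github.com/influxdata/tdigest"
)

func main() {
	// Feed coprocessor response times into a t-digest instead of keeping a slice.
	td := tdigest.New()
	for _, d := range []time.Duration{time.Millisecond, 5 * time.Millisecond, time.Second} {
		td.Add(float64(d), 1) // sample value and weight
	}
	// Quantile queries are approximate but need only a small, fixed amount of state.
	p90 := time.Duration(td.Quantile(0.9))
	fmt.Println("approx p90:", p90)
}
```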
7 changes: 7 additions & 0 deletions DEPS.bzl
@@ -1959,6 +1959,13 @@ def go_deps():
sum = "h1:O08dwjOwv9CYlJJEUZKAazSoQDKlsN34Bq3dnhqhyVI=",
version = "v0.0.0-20170331210902-15e594fc09f1",
)
go_repository(
name = "com_github_influxdata_tdigest",
build_file_proto_mode = "disable",
importpath = "github.com/influxdata/tdigest",
sum = "h1:XpFptwYmnEKUqmkcDjrzffswZ3nvNeevbUSLPP/ZzIY=",
version = "v0.0.1",
)

go_repository(
name = "com_github_iris_contrib_blackfriday",
1 change: 0 additions & 1 deletion distsql/BUILD.bazel
@@ -46,7 +46,6 @@ go_library(
"@com_github_tikv_client_go_v2//tikvrpc/interceptor",
"@org_golang_google_grpc//metadata",
"@org_golang_x_exp//maps",
"@org_golang_x_exp//slices",
"@org_uber_go_zap//:zap",
],
)
10 changes: 6 additions & 4 deletions distsql/distsql_test.go
@@ -111,14 +111,16 @@ func TestSelectResultRuntimeStats(t *testing.T) {
basic := stmtStats.GetBasicRuntimeStats(1)
basic.Record(time.Second, 20)
s1 := &selectResultRuntimeStats{
copRespTime: []time.Duration{time.Second, time.Millisecond},
procKeys: []int64{100, 200},
backoffSleep: map[string]time.Duration{"RegionMiss": time.Millisecond},
totalProcessTime: time.Second,
totalWaitTime: time.Second,
rpcStat: tikv.NewRegionRequestRuntimeStats(),
distSQLConcurrency: 15,
}
s1.copRespTime.Add(execdetails.Duration(time.Second))
s1.copRespTime.Add(execdetails.Duration(time.Millisecond))
s1.procKeys.Add(100)
s1.procKeys.Add(200)

s2 := *s1
stmtStats.RegisterStats(1, s1)
@@ -141,13 +143,13 @@ func TestSelectResultRuntimeStats(t *testing.T) {
require.Equal(t, expect, stats.String())

s1 = &selectResultRuntimeStats{
copRespTime: []time.Duration{time.Second},
procKeys: []int64{100},
backoffSleep: map[string]time.Duration{"RegionMiss": time.Millisecond},
totalProcessTime: time.Second,
totalWaitTime: time.Second,
rpcStat: tikv.NewRegionRequestRuntimeStats(),
}
s1.copRespTime.Add(execdetails.Duration(time.Second))
s1.procKeys.Add(100)
expect = "cop_task: {num: 1, max: 1s, proc_keys: 100, tot_proc: 1s, tot_wait: 1s, copr_cache_hit_ratio: 0.00}, backoff{RegionMiss: 1ms}"
require.Equal(t, expect, s1.String())
}
58 changes: 26 additions & 32 deletions distsql/select_result.go
@@ -46,7 +46,6 @@ import (
"github.com/tikv/client-go/v2/tikvrpc"
"go.uber.org/zap"
"golang.org/x/exp/maps"
"golang.org/x/exp/slices"
)

var (
@@ -156,8 +155,8 @@ func (r *selectResult) fetchResp(ctx context.Context) error {
defer func() {
if r.stats != nil {
// Ignore internal sql.
if !r.ctx.GetSessionVars().InRestrictedSQL && len(r.stats.copRespTime) > 0 {
ratio := float64(r.stats.CoprCacheHitNum) / float64(len(r.stats.copRespTime))
if !r.ctx.GetSessionVars().InRestrictedSQL && r.stats.copRespTime.Size() > 0 {
ratio := float64(r.stats.CoprCacheHitNum) / float64(r.stats.copRespTime.Size())
if ratio >= 1 {
telemetry.CurrentCoprCacheHitRatioGTE100Count.Inc()
}
@@ -467,8 +466,8 @@ type CopRuntimeStats interface {
}

type selectResultRuntimeStats struct {
copRespTime []time.Duration
procKeys []int64
copRespTime execdetails.Percentile[execdetails.Duration]
procKeys execdetails.Percentile[execdetails.Int64]
backoffSleep map[string]time.Duration
totalProcessTime time.Duration
totalWaitTime time.Duration
@@ -478,11 +477,11 @@ type selectResultRuntimeStats struct {
}

func (s *selectResultRuntimeStats) mergeCopRuntimeStats(copStats *copr.CopRuntimeStats, respTime time.Duration) {
s.copRespTime = append(s.copRespTime, respTime)
s.copRespTime.Add(execdetails.Duration(respTime))
if copStats.ScanDetail != nil {
s.procKeys = append(s.procKeys, copStats.ScanDetail.ProcessedKeys)
s.procKeys.Add(execdetails.Int64(copStats.ScanDetail.ProcessedKeys))
} else {
s.procKeys = append(s.procKeys, 0)
s.procKeys.Add(0)
}
maps.Copy(s.backoffSleep, copStats.BackoffSleep)
s.totalProcessTime += copStats.TimeDetail.ProcessTime
@@ -495,15 +494,15 @@ func (s *selectResultRuntimeStats) mergeCopRuntimeStats(copStats *copr.CopRuntim

func (s *selectResultRuntimeStats) Clone() execdetails.RuntimeStats {
newRs := selectResultRuntimeStats{
copRespTime: make([]time.Duration, 0, len(s.copRespTime)),
procKeys: make([]int64, 0, len(s.procKeys)),
copRespTime: execdetails.Percentile[execdetails.Duration]{},
procKeys: execdetails.Percentile[execdetails.Int64]{},
backoffSleep: make(map[string]time.Duration, len(s.backoffSleep)),
rpcStat: tikv.NewRegionRequestRuntimeStats(),
distSQLConcurrency: s.distSQLConcurrency,
CoprCacheHitNum: s.CoprCacheHitNum,
}
newRs.copRespTime = append(newRs.copRespTime, s.copRespTime...)
newRs.procKeys = append(newRs.procKeys, s.procKeys...)
newRs.copRespTime.MergePercentile(&s.copRespTime)
newRs.procKeys.MergePercentile(&s.procKeys)
for k, v := range s.backoffSleep {
newRs.backoffSleep[k] += v
}
@@ -518,8 +517,8 @@ func (s *selectResultRuntimeStats) Merge(rs execdetails.RuntimeStats) {
if !ok {
return
}
s.copRespTime = append(s.copRespTime, other.copRespTime...)
s.procKeys = append(s.procKeys, other.procKeys...)
s.copRespTime.MergePercentile(&other.copRespTime)
s.procKeys.MergePercentile(&other.procKeys)

for k, v := range other.backoffSleep {
s.backoffSleep[k] += v
@@ -533,31 +532,26 @@ func (s *selectResultRuntimeStats) Merge(rs execdetails.RuntimeStats) {
func (s *selectResultRuntimeStats) String() string {
buf := bytes.NewBuffer(nil)
rpcStat := s.rpcStat
if len(s.copRespTime) > 0 {
size := len(s.copRespTime)
if s.copRespTime.Size() > 0 {
size := s.copRespTime.Size()
if size == 1 {
buf.WriteString(fmt.Sprintf("cop_task: {num: 1, max: %v, proc_keys: %v", execdetails.FormatDuration(s.copRespTime[0]), s.procKeys[0]))
buf.WriteString(fmt.Sprintf("cop_task: {num: 1, max: %v, proc_keys: %v", execdetails.FormatDuration(time.Duration(s.copRespTime.GetPercentile(0))), s.procKeys.GetPercentile(0)))
} else {
slices.Sort(s.copRespTime)
vMax, vMin := s.copRespTime[size-1], s.copRespTime[0]
vP95 := s.copRespTime[size*19/20]
sum := 0.0
for _, t := range s.copRespTime {
sum += float64(t)
}
vMax, vMin := s.copRespTime.GetMax(), s.copRespTime.GetMin()
vP95 := s.copRespTime.GetPercentile(0.95)
sum := s.copRespTime.Sum()
vAvg := time.Duration(sum / float64(size))

slices.Sort(s.procKeys)
keyMax := s.procKeys[size-1]
keyP95 := s.procKeys[size*19/20]
keyMax := s.procKeys.GetMax()
keyP95 := s.procKeys.GetPercentile(0.95)
buf.WriteString(fmt.Sprintf("cop_task: {num: %v, max: %v, min: %v, avg: %v, p95: %v", size,
execdetails.FormatDuration(vMax), execdetails.FormatDuration(vMin),
execdetails.FormatDuration(vAvg), execdetails.FormatDuration(vP95)))
execdetails.FormatDuration(time.Duration(vMax.GetFloat64())), execdetails.FormatDuration(time.Duration(vMin.GetFloat64())),
execdetails.FormatDuration(vAvg), execdetails.FormatDuration(time.Duration(vP95))))
if keyMax > 0 {
buf.WriteString(", max_proc_keys: ")
buf.WriteString(strconv.FormatInt(keyMax, 10))
buf.WriteString(strconv.FormatInt(int64(keyMax), 10))
buf.WriteString(", p95_proc_keys: ")
buf.WriteString(strconv.FormatInt(keyP95, 10))
buf.WriteString(strconv.FormatInt(int64(keyP95), 10))
}
}
if s.totalProcessTime > 0 {
@@ -579,7 +573,7 @@ func (s *selectResultRuntimeStats) String() string {
}
if config.GetGlobalConfig().TiKVClient.CoprCache.CapacityMB > 0 {
buf.WriteString(fmt.Sprintf(", copr_cache_hit_ratio: %v",
strconv.FormatFloat(float64(s.CoprCacheHitNum)/float64(len(s.copRespTime)), 'f', 2, 64)))
strconv.FormatFloat(float64(s.CoprCacheHitNum)/float64(s.copRespTime.Size()), 'f', 2, 64)))
} else {
buf.WriteString(", copr_cache: disabled")
}
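The generic execdetails.Percentile type that select_result.go now relies on is not part of this excerpt (it sits among the other changed files). Based only on the call sites above (Add, Size, Sum, GetMin, GetMax, GetPercentile, MergePercentile), a plausible sketch looks like the following; the field names, constraint, and placement are assumptions, not the actual implementation.

```go
package execdetails // hypothetical placement; the real file is among the 12 changed files

import "github.com/influxdata/tdigest"

// Duration and Int64 are assumed to be thin wrappers exposing a float64 view,
// so one generic Percentile can sketch both response times and key counts.
type Duration int64

func (d Duration) GetFloat64() float64 { return float64(d) }

type Int64 int64

func (i Int64) GetFloat64() float64 { return float64(i) }

type valuer interface{ GetFloat64() float64 }

// Percentile keeps min/max/sum/count exactly and delegates quantile queries
// to a t-digest, so memory stays bounded regardless of the number of cop tasks.
type Percentile[T valuer] struct {
	digest *tdigest.TDigest
	minVal T
	maxVal T
	sum    float64
	size   int
}

func (p *Percentile[T]) Add(v T) {
	if p.digest == nil {
		p.digest = tdigest.New()
	}
	f := v.GetFloat64()
	if p.size == 0 || f < p.minVal.GetFloat64() {
		p.minVal = v
	}
	if p.size == 0 || f > p.maxVal.GetFloat64() {
		p.maxVal = v
	}
	p.sum += f
	p.size++
	p.digest.Add(f, 1)
}

func (p *Percentile[T]) Size() int    { return p.size }
func (p *Percentile[T]) Sum() float64 { return p.sum }
func (p *Percentile[T]) GetMin() T    { return p.minVal }
func (p *Percentile[T]) GetMax() T    { return p.maxVal }

// GetPercentile returns an approximate quantile, e.g. 0.95 for p95.
func (p *Percentile[T]) GetPercentile(q float64) float64 {
	if p.digest == nil {
		return 0
	}
	return p.digest.Quantile(q)
}

// MergePercentile (used by Clone and Merge above) would fold another
// Percentile's state into this one; it is omitted here because the merge
// strategy is not visible in this diff.
```

The payoff shows up in String(): the old code sorted copRespTime and procKeys on every call and indexed size*19/20 for p95, while the new code asks the digest directly and reads max/min from the tracked extremes.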
2 changes: 2 additions & 0 deletions go.mod
@@ -52,6 +52,7 @@ require (
github.com/gostaticanalysis/forcetypeassert v0.1.0
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0
github.com/iancoleman/strcase v0.2.0
github.com/influxdata/tdigest v0.0.1
github.com/jarcoal/httpmock v1.2.0
github.com/jedib0t/go-pretty/v6 v6.2.2
github.com/jingyugao/rowserrcheck v1.1.1
@@ -248,6 +249,7 @@ require (
golang.org/x/exp/typeparams v0.0.0-20220827204233-334a2380cb91 // indirect
golang.org/x/mod v0.7.0 // indirect
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
gonum.org/v1/gonum v0.8.2 // indirect
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/genproto v0.0.0-20221202195650-67e5cbc046fd // indirect
google.golang.org/protobuf v1.28.1 // indirect
5 changes: 5 additions & 0 deletions go.sum
@@ -529,6 +529,8 @@ github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANyt
github.com/inconshreveable/mousetrap v1.0.1 h1:U3uMjPSQEBMNp1lFxmllqCPM6P5u/Xq7Pgzkat/bFNc=
github.com/inconshreveable/mousetrap v1.0.1/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw=
github.com/influxdata/influxdb v0.0.0-20170331210902-15e594fc09f1/go.mod h1:qZna6X/4elxqT3yI9iZYdZrWWdeFOOprn86kgg4+IzY=
github.com/influxdata/tdigest v0.0.1 h1:XpFptwYmnEKUqmkcDjrzffswZ3nvNeevbUSLPP/ZzIY=
github.com/influxdata/tdigest v0.0.1/go.mod h1:Z0kXnxzbTC2qrx4NaIzYkE1k66+6oEDQTvL95hQFh5Y=
github.com/iris-contrib/blackfriday v2.0.0+incompatible/go.mod h1:UzZ2bDEoaSGPbkg6SAB4att1aAwTmVIx/5gCVqeyUdI=
github.com/iris-contrib/go.uuid v2.0.0+incompatible/go.mod h1:iz2lgM/1UnEf1kP0L/+fafWORmlnuysV2EMP8MW+qe0=
github.com/iris-contrib/i18n v0.0.0-20171121225848-987a633949d0/go.mod h1:pMCz62A0xJL6I+umB2YTlFRwWXaDFA0jy+5HzGiJjqI=
@@ -1392,7 +1394,10 @@ golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8T
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 h1:H2TDz8ibqkAF6YGhCdN3jS9O0/s90v0rJh3X/OLHEUk=
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2/go.mod h1:K8+ghG5WaK9qNqU5K3HdILfMLy1f3aNYFI/wnl100a8=
gonum.org/v1/gonum v0.0.0-20180816165407-929014505bf4/go.mod h1:Y+Yx5eoAFn32cQvJDxZx5Dpnq+c3wtXuadVZAcxbbBo=
gonum.org/v1/gonum v0.0.0-20181121035319-3f7ecaa7e8ca/go.mod h1:Y+Yx5eoAFn32cQvJDxZx5Dpnq+c3wtXuadVZAcxbbBo=
gonum.org/v1/gonum v0.8.2 h1:CCXrcPKiGGotvnN6jfUsKk4rRqm7q09/YbKb5xCEvtM=
gonum.org/v1/gonum v0.8.2/go.mod h1:oe/vMfY3deqTw+1EZJhuvEW2iwGF1bW9wwu7XCu0+v0=
gonum.org/v1/netlib v0.0.0-20181029234149-ec6d1f5cefe6/go.mod h1:wa6Ws7BG/ESfp6dHfk7C6KdzKA7wR7u/rKwOGE66zvw=
gonum.org/v1/netlib v0.0.0-20190313105609-8cb42192e0e0/go.mod h1:wa6Ws7BG/ESfp6dHfk7C6KdzKA7wR7u/rKwOGE66zvw=
gonum.org/v1/plot v0.0.0-20190515093506-e2840ee46a6b/go.mod h1:Wt8AAjI+ypCyYX3nZBvf6cAIx93T+c/OS2HFAYskSZc=
google.golang.org/api v0.3.1/go.mod h1:6wY9I6uQWHQ8EM57III9mq/AjF+i8G65rmVagqKMtkk=
2 changes: 1 addition & 1 deletion sessionctx/stmtctx/BUILD.bazel
@@ -20,7 +20,6 @@ go_library(
"@com_github_pingcap_errors//:errors",
"@com_github_tikv_client_go_v2//tikvrpc",
"@com_github_tikv_client_go_v2//util",
"@org_golang_x_exp//slices",
"@org_uber_go_atomic//:atomic",
"@org_uber_go_zap//:zap",
],
@@ -35,6 +34,7 @@ go_test(
],
embed = [":stmtctx"],
flaky = True,
shard_count = 5,
deps = [
"//kv",
"//sessionctx/variable",
86 changes: 27 additions & 59 deletions sessionctx/stmtctx/stmtctx.go
@@ -39,7 +39,6 @@ import (
"github.com/tikv/client-go/v2/util"
atomic2 "go.uber.org/atomic"
"go.uber.org/zap"
"golang.org/x/exp/slices"
)

const (
@@ -212,7 +211,7 @@ type StatementContext struct {
warnings []SQLWarn
errorCount uint16
execDetails execdetails.ExecDetails
allExecDetails []*execdetails.DetailsNeedP90
detailsSummary execdetails.P90Summary
}
// PrevAffectedRows is the affected-rows value(DDL is 0, DML is the number of affected rows).
PrevAffectedRows int64
@@ -877,7 +876,7 @@ func (sc *StatementContext) resetMuForRetry() {
sc.mu.errorCount = 0
sc.mu.warnings = nil
sc.mu.execDetails = execdetails.ExecDetails{}
sc.mu.allExecDetails = make([]*execdetails.DetailsNeedP90, 0, 4)
sc.mu.detailsSummary.Reset()
}

// ResetForRetry resets the changed states during execution.
@@ -901,13 +900,13 @@ func (sc *StatementContext) MergeExecDetails(details *execdetails.ExecDetails, c
sc.mu.execDetails.RequestCount++
sc.MergeScanDetail(details.ScanDetail)
sc.MergeTimeDetail(details.TimeDetail)
sc.mu.allExecDetails = append(sc.mu.allExecDetails,
&execdetails.DetailsNeedP90{
BackoffSleep: details.BackoffSleep,
BackoffTimes: details.BackoffTimes,
CalleeAddress: details.CalleeAddress,
TimeDetail: details.TimeDetail,
})
detail := &execdetails.DetailsNeedP90{
BackoffSleep: details.BackoffSleep,
BackoffTimes: details.BackoffTimes,
CalleeAddress: details.CalleeAddress,
TimeDetail: details.TimeDetail,
}
sc.mu.detailsSummary.Merge(detail)
}
if commitDetails != nil {
if sc.mu.execDetails.CommitDetail == nil {
@@ -1010,7 +1009,7 @@ func (sc *StatementContext) PushDownFlags() uint64 {
func (sc *StatementContext) CopTasksDetails() *CopTasksDetails {
sc.mu.Lock()
defer sc.mu.Unlock()
n := len(sc.mu.allExecDetails)
n := sc.mu.detailsSummary.NumCopTasks
d := &CopTasksDetails{
NumCopTasks: n,
MaxBackoffTime: make(map[string]time.Duration),
@@ -1026,57 +1025,26 @@ func (sc *StatementContext) CopTasksDetails() *CopTasksDetails {
d.AvgProcessTime = sc.mu.execDetails.TimeDetail.ProcessTime / time.Duration(n)
d.AvgWaitTime = sc.mu.execDetails.TimeDetail.WaitTime / time.Duration(n)

slices.SortFunc(sc.mu.allExecDetails, func(i, j *execdetails.DetailsNeedP90) bool {
return i.TimeDetail.ProcessTime < j.TimeDetail.ProcessTime
})
d.P90ProcessTime = sc.mu.allExecDetails[n*9/10].TimeDetail.ProcessTime
d.MaxProcessTime = sc.mu.allExecDetails[n-1].TimeDetail.ProcessTime
d.MaxProcessAddress = sc.mu.allExecDetails[n-1].CalleeAddress
d.P90ProcessTime = time.Duration((sc.mu.detailsSummary.ProcessTimePercentile.GetPercentile(0.9)))
d.MaxProcessTime = sc.mu.detailsSummary.ProcessTimePercentile.GetMax().D
d.MaxProcessAddress = sc.mu.detailsSummary.ProcessTimePercentile.GetMax().Addr

slices.SortFunc(sc.mu.allExecDetails, func(i, j *execdetails.DetailsNeedP90) bool {
return i.TimeDetail.WaitTime < j.TimeDetail.WaitTime
})
d.P90WaitTime = sc.mu.allExecDetails[n*9/10].TimeDetail.WaitTime
d.MaxWaitTime = sc.mu.allExecDetails[n-1].TimeDetail.WaitTime
d.MaxWaitAddress = sc.mu.allExecDetails[n-1].CalleeAddress

// calculate backoff details
type backoffItem struct {
callee string
sleepTime time.Duration
times int
}
backoffInfo := make(map[string][]backoffItem)
for _, ed := range sc.mu.allExecDetails {
for backoff := range ed.BackoffTimes {
backoffInfo[backoff] = append(backoffInfo[backoff], backoffItem{
callee: ed.CalleeAddress,
sleepTime: ed.BackoffSleep[backoff],
times: ed.BackoffTimes[backoff],
})
}
}
for backoff, items := range backoffInfo {
if len(items) == 0 {
d.P90WaitTime = time.Duration((sc.mu.detailsSummary.WaitTimePercentile.GetPercentile(0.9)))
d.MaxWaitTime = sc.mu.detailsSummary.WaitTimePercentile.GetMax().D
d.MaxWaitAddress = sc.mu.detailsSummary.WaitTimePercentile.GetMax().Addr

for backoff, items := range sc.mu.detailsSummary.BackoffInfo {
if items == nil {
continue
}
slices.SortFunc(items, func(i, j backoffItem) bool {
return i.sleepTime < j.sleepTime
})
n := len(items)
d.MaxBackoffAddress[backoff] = items[n-1].callee
d.MaxBackoffTime[backoff] = items[n-1].sleepTime
d.P90BackoffTime[backoff] = items[n*9/10].sleepTime

var totalTime time.Duration
totalTimes := 0
for _, it := range items {
totalTime += it.sleepTime
totalTimes += it.times
}
d.AvgBackoffTime[backoff] = totalTime / time.Duration(n)
d.TotBackoffTime[backoff] = totalTime
d.TotBackoffTimes[backoff] = totalTimes
n := items.ReqTimes
d.MaxBackoffAddress[backoff] = items.BackoffPercentile.GetMax().Addr
d.MaxBackoffTime[backoff] = items.BackoffPercentile.GetMax().D
d.P90BackoffTime[backoff] = time.Duration(items.BackoffPercentile.GetPercentile(0.9))

d.AvgBackoffTime[backoff] = items.TotBackoffTime / time.Duration(n)
d.TotBackoffTime[backoff] = items.TotBackoffTime
d.TotBackoffTimes[backoff] = items.TotBackoffTimes
}
return d
}
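On the stmtctx side, the per-statement allExecDetails slice is replaced by detailsSummary, whose type execdetails.P90Summary is also not shown in this excerpt. Below is a hypothetical shape, inferred only from the fields read in CopTasksDetails above (NumCopTasks, ProcessTimePercentile, WaitTimePercentile, and BackoffInfo entries with ReqTimes, TotBackoffTime, TotBackoffTimes, BackoffPercentile) and reusing the Percentile sketch; everything beyond those names is an assumption. The design effect is that CopTasksDetails no longer sorts the full per-task slice once per metric and once per backoff type: a statement with many cop tasks neither keeps every DetailsNeedP90 in memory nor pays a sort just to log p90 values in the slow log.

```go
package execdetails // continues the hypothetical sketch above (Percentile, valuer)

import "time"

// Minimal stand-ins for types referenced in the diff; real ones live in util/execdetails.
type TimeDetail struct {
	ProcessTime time.Duration
	WaitTime    time.Duration
}

type DetailsNeedP90 struct {
	BackoffSleep  map[string]time.Duration
	BackoffTimes  map[string]int
	CalleeAddress string
	TimeDetail    TimeDetail
}

// DurationWithAddr pairs a sample with the callee that produced it, so
// GetMax() can report both the value (.D) and the address (.Addr).
type DurationWithAddr struct {
	D    time.Duration
	Addr string
}

func (d DurationWithAddr) GetFloat64() float64 { return float64(d.D) }

// backoffSummary aggregates one backoff type across all cop tasks.
type backoffSummary struct {
	ReqTimes          int
	TotBackoffTime    time.Duration
	TotBackoffTimes   int
	BackoffPercentile Percentile[DurationWithAddr]
}

// P90Summary accumulates just enough state to answer max/p90/avg questions
// without retaining a DetailsNeedP90 per cop task.
type P90Summary struct {
	NumCopTasks           int
	ProcessTimePercentile Percentile[DurationWithAddr]
	WaitTimePercentile    Percentile[DurationWithAddr]
	BackoffInfo           map[string]*backoffSummary
}

func (s *P90Summary) Reset() {
	*s = P90Summary{BackoffInfo: make(map[string]*backoffSummary)}
}

// Merge folds one cop task's details into the summary, replacing the old
// pattern of appending to allExecDetails and sorting it in CopTasksDetails.
func (s *P90Summary) Merge(d *DetailsNeedP90) {
	if s.BackoffInfo == nil {
		s.Reset()
	}
	s.NumCopTasks++
	s.ProcessTimePercentile.Add(DurationWithAddr{d.TimeDetail.ProcessTime, d.CalleeAddress})
	s.WaitTimePercentile.Add(DurationWithAddr{d.TimeDetail.WaitTime, d.CalleeAddress})
	for backoff, times := range d.BackoffTimes {
		info, ok := s.BackoffInfo[backoff]
		if !ok {
			info = &backoffSummary{}
			s.BackoffInfo[backoff] = info
		}
		info.ReqTimes++
		info.TotBackoffTime += d.BackoffSleep[backoff]
		info.TotBackoffTimes += times
		info.BackoffPercentile.Add(DurationWithAddr{d.BackoffSleep[backoff], d.CalleeAddress})
	}
}
```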
