diff --git a/integration_tests/main_test.go b/integration_tests/main_test.go index fcd7a050a..5998804ba 100644 --- a/integration_tests/main_test.go +++ b/integration_tests/main_test.go @@ -27,6 +27,7 @@ func TestMain(m *testing.M) { opts := []goleak.Option{ goleak.IgnoreTopFunction("github.com/pingcap/goleveldb/leveldb.(*DB).mpoolDrain"), goleak.IgnoreTopFunction("github.com/tikv/client-go/v2/txnkv/transaction.keepAlive"), // TODO: fix ttlManager goroutine leak + goleak.IgnoreTopFunction("github.com/tikv/client-go/v2/config/retry.(*Config).createBackoffFn.newBackoffFn.func2"), } goleak.VerifyTestMain(m, opts...) diff --git a/integration_tests/snapshot_test.go b/integration_tests/snapshot_test.go index a9880227b..cc13b38c2 100644 --- a/integration_tests/snapshot_test.go +++ b/integration_tests/snapshot_test.go @@ -287,12 +287,13 @@ func (s *testSnapshotSuite) TestSnapshotThreadSafe() { func (s *testSnapshotSuite) TestSnapshotRuntimeStats() { reqStats := tikv.NewRegionRequestRuntimeStats() - tikv.RecordRegionRequestRuntimeStats(reqStats.Stats, tikvrpc.CmdGet, time.Second) - tikv.RecordRegionRequestRuntimeStats(reqStats.Stats, tikvrpc.CmdGet, time.Millisecond) + reqStats.RecordRPCRuntimeStats(tikvrpc.CmdGet, time.Second) + reqStats.RecordRPCRuntimeStats(tikvrpc.CmdGet, time.Millisecond) snapshot := s.store.GetSnapshot(0) - snapshot.SetRuntimeStats(&txnkv.SnapshotRuntimeStats{}) - snapshot.MergeRegionRequestStats(reqStats.Stats) - snapshot.MergeRegionRequestStats(reqStats.Stats) + runtimeStats := &txnkv.SnapshotRuntimeStats{} + snapshot.SetRuntimeStats(runtimeStats) + snapshot.MergeRegionRequestStats(reqStats) + snapshot.MergeRegionRequestStats(reqStats) bo := tikv.NewBackofferWithVars(context.Background(), 2000, nil) err := bo.BackoffWithMaxSleepTxnLockFast(5, errors.New("test")) s.Nil(err) diff --git a/internal/client/client_batch.go b/internal/client/client_batch.go index 12dcc2805..fc052dc46 100644 --- a/internal/client/client_batch.go +++ b/internal/client/client_batch.go @@ -791,7 +791,7 @@ func sendBatchRequest( select { case batchConn.batchCommandsCh <- entry: case <-ctx.Done(): - logutil.BgLogger().Debug("send request is cancelled", + logutil.Logger(ctx).Debug("send request is cancelled", zap.String("to", addr), zap.String("cause", ctx.Err().Error())) return nil, errors.WithStack(ctx.Err()) case <-timer.C: @@ -807,7 +807,7 @@ func sendBatchRequest( return tikvrpc.FromBatchCommandsResponse(res) case <-ctx.Done(): atomic.StoreInt32(&entry.canceled, 1) - logutil.BgLogger().Debug("wait response is cancelled", + logutil.Logger(ctx).Debug("wait response is cancelled", zap.String("to", addr), zap.String("cause", ctx.Err().Error())) return nil, errors.WithStack(ctx.Err()) case <-timer.C: diff --git a/internal/locate/region_cache.go b/internal/locate/region_cache.go index b10958cd3..cbd265e84 100644 --- a/internal/locate/region_cache.go +++ b/internal/locate/region_cache.go @@ -674,7 +674,7 @@ func (c *RegionCache) GetTiKVRPCContext(bo *retry.Backoffer, id RegionVerID, rep storeFailEpoch := atomic.LoadUint32(&store.epoch) if storeFailEpoch != regionStore.storeEpochs[storeIdx] { cachedRegion.invalidate(Other) - logutil.BgLogger().Info("invalidate current region, because others failed on same store", + logutil.Logger(bo.GetCtx()).Info("invalidate current region, because others failed on same store", zap.Uint64("region", id.GetID()), zap.String("store", store.addr)) return nil, nil @@ -789,7 +789,7 @@ func (c *RegionCache) GetTiFlashRPCContext(bo *retry.Backoffer, id RegionVerID, storeFailEpoch 
:= atomic.LoadUint32(&store.epoch) if storeFailEpoch != regionStore.storeEpochs[storeIdx] { cachedRegion.invalidate(Other) - logutil.BgLogger().Info("invalidate current region, because others failed on same store", + logutil.Logger(bo.GetCtx()).Info("invalidate current region, because others failed on same store", zap.Uint64("region", id.GetID()), zap.String("store", store.addr)) // TiFlash will always try to find out a valid peer, avoiding to retry too many times. @@ -1826,7 +1826,7 @@ func (c *RegionCache) OnRegionEpochNotMatch(bo *retry.Backoffer, ctx *RPCContext (meta.GetRegionEpoch().GetConfVer() < ctx.Region.confVer || meta.GetRegionEpoch().GetVersion() < ctx.Region.ver) { err := errors.Errorf("region epoch is ahead of tikv. rpc ctx: %+v, currentRegions: %+v", ctx, currentRegions) - logutil.BgLogger().Info("region epoch is ahead of tikv", zap.Error(err)) + logutil.Logger(bo.GetCtx()).Info("region epoch is ahead of tikv", zap.Error(err)) return true, bo.Backoff(retry.BoRegionMiss, err) } } diff --git a/internal/locate/region_request.go b/internal/locate/region_request.go index 8bbce0eb5..1e02cf3a0 100644 --- a/internal/locate/region_request.go +++ b/internal/locate/region_request.go @@ -35,7 +35,6 @@ package locate import ( - "bytes" "context" "fmt" "math/rand" @@ -134,18 +133,35 @@ type RegionRequestSender struct { replicaSelector *replicaSelector failStoreIDs map[uint64]struct{} failProxyStoreIDs map[uint64]struct{} - RegionRequestRuntimeStats + Stats *RegionRequestRuntimeStats + AccessStats *ReplicaAccessStats +} + +func (s *RegionRequestSender) String() string { + if s.replicaSelector == nil { + return fmt.Sprintf("{rpcError:%v, replicaSelector: %v}", s.rpcError, s.replicaSelector) + } + return fmt.Sprintf("{rpcError:%v, replicaSelector: %v}", s.rpcError, s.replicaSelector.String()) } // RegionRequestRuntimeStats records the runtime stats of send region requests. type RegionRequestRuntimeStats struct { - Stats map[tikvrpc.CmdType]*RPCRuntimeStats + RPCStats map[tikvrpc.CmdType]*RPCRuntimeStats + RequestErrorStats +} + +// RequestErrorStats records the request error (region error and rpc error) count. +type RequestErrorStats struct { + // ErrStats records the region errors and rpc errors, and their counts. + // Attention: to avoid too many error types, ErrStats only records the first 16 different errors. + ErrStats map[string]int + OtherErrCnt int } // NewRegionRequestRuntimeStats returns a new RegionRequestRuntimeStats. -func NewRegionRequestRuntimeStats() RegionRequestRuntimeStats { - return RegionRequestRuntimeStats{ - Stats: make(map[tikvrpc.CmdType]*RPCRuntimeStats), +func NewRegionRequestRuntimeStats() *RegionRequestRuntimeStats { + return &RegionRequestRuntimeStats{ + RPCStats: make(map[tikvrpc.CmdType]*RPCRuntimeStats), } } @@ -156,14 +172,55 @@ type RPCRuntimeStats struct { Consume int64 } +// RecordRPCRuntimeStats records the rpc count and duration stats. +func (r *RegionRequestRuntimeStats) RecordRPCRuntimeStats(cmd tikvrpc.CmdType, d time.Duration) { + stat, ok := r.RPCStats[cmd] + if !ok { + r.RPCStats[cmd] = &RPCRuntimeStats{ + Count: 1, + Consume: int64(d), + } + return + } + stat.Count++ + stat.Consume += int64(d) +} + +// RecordRPCErrorStats records the request error (region error label and rpc error) info and count. +func (r *RequestErrorStats) RecordRPCErrorStats(errLabel string) { + if r.ErrStats == nil { + // lazy init to avoid unnecessary allocation. + r.ErrStats = make(map[string]int) + } + if len(r.ErrStats) < 16 { + // Avoid recording too many distinct errors.
+ r.ErrStats[errLabel]++ + } else { + r.OtherErrCnt++ + } +} + +// getErrMsg returns the error message. If the error has a cause error, the cause error's message is returned. +func getErrMsg(err error) string { + if err == nil { + return "" + } + if causeErr := errors.Cause(err); causeErr != nil { + return causeErr.Error() + } + return err.Error() +} + // String implements fmt.Stringer interface. func (r *RegionRequestRuntimeStats) String() string { + if r == nil { + return "" + } var builder strings.Builder - for k, v := range r.Stats { + for k, v := range r.RPCStats { if builder.Len() > 0 { builder.WriteByte(',') } - // append string: fmt.Sprintf("%s:{num_rpc:%v, total_time:%s}", k.String(), v.Count, util.FormatDuration(time.Duration(v.Consume))") builder.WriteString(k.String()) builder.WriteString(":{num_rpc:") builder.WriteString(strconv.FormatInt(v.Count, 10)) @@ -171,27 +228,61 @@ func (r *RegionRequestRuntimeStats) String() string { builder.WriteString(util.FormatDuration(time.Duration(v.Consume))) builder.WriteString("}") } + if errStatsStr := r.RequestErrorStats.String(); errStatsStr != "" { + builder.WriteString(", rpc_errors:") + builder.WriteString(errStatsStr) + } + return builder.String() +} + +// String implements fmt.Stringer interface. +func (r *RequestErrorStats) String() string { + if len(r.ErrStats) == 0 { + return "" + } + var builder strings.Builder + builder.WriteString("{") + for err, cnt := range r.ErrStats { + if builder.Len() > 2 { + builder.WriteString(", ") + } + builder.WriteString(err) + builder.WriteString(":") + builder.WriteString(strconv.Itoa(cnt)) + } + if r.OtherErrCnt > 0 { + builder.WriteString(", other_error:") + builder.WriteString(strconv.Itoa(r.OtherErrCnt)) + } + builder.WriteString("}") return builder.String() } // Clone returns a copy of itself. -func (r *RegionRequestRuntimeStats) Clone() RegionRequestRuntimeStats { +func (r *RegionRequestRuntimeStats) Clone() *RegionRequestRuntimeStats { newRs := NewRegionRequestRuntimeStats() - for cmd, v := range r.Stats { - newRs.Stats[cmd] = &RPCRuntimeStats{ - Count: v.Count, - Consume: v.Consume, + for k, v := range r.RPCStats { + newRs.RPCStats[k] = v + } + if len(r.ErrStats) > 0 { + newRs.ErrStats = make(map[string]int) + for k, v := range r.ErrStats { + newRs.ErrStats[k] = v } + newRs.OtherErrCnt = r.OtherErrCnt } return newRs } // Merge merges other RegionRequestRuntimeStats. -func (r *RegionRequestRuntimeStats) Merge(rs RegionRequestRuntimeStats) { - for cmd, v := range rs.Stats { - stat, ok := r.Stats[cmd] +func (r *RegionRequestRuntimeStats) Merge(rs *RegionRequestRuntimeStats) { + if rs == nil { + return + } + for cmd, v := range rs.RPCStats { + stat, ok := r.RPCStats[cmd] if !ok { - r.Stats[cmd] = &RPCRuntimeStats{ + r.RPCStats[cmd] = &RPCRuntimeStats{ Count: v.Count, Consume: v.Consume, } @@ -200,20 +291,114 @@ func (r *RegionRequestRuntimeStats) Merge(rs RegionRequestRuntimeStats) { stat.Count += v.Count stat.Consume += v.Consume } + if len(rs.ErrStats) > 0 { + if r.ErrStats == nil { + r.ErrStats = make(map[string]int) + } + for err, cnt := range rs.ErrStats { + r.ErrStats[err] += cnt + } + r.OtherErrCnt += rs.OtherErrCnt + } } -// RecordRegionRequestRuntimeStats records request runtime stats. -func RecordRegionRequestRuntimeStats(stats map[tikvrpc.CmdType]*RPCRuntimeStats, cmd tikvrpc.CmdType, d time.Duration) { - stat, ok := stats[cmd] +// ReplicaAccessStats records the replica access info.
+type ReplicaAccessStats struct { + // AccessInfos records the access info. + AccessInfos []ReplicaAccessInfo + // To avoid consuming too much memory, after more than 5 records, accesses are counted by peerID in the `OverflowAccessStat` map. + OverflowAccessStat map[uint64]*RequestErrorStats +} + +// ReplicaAccessInfo indicates the access path detail info of a request. +type ReplicaAccessInfo struct { + Peer uint64 + Store uint64 + ReqReadTp ReqReadType + Err string +} + +type ReqReadType byte + +const ( + ReqLeader ReqReadType = iota + ReqReplicaRead + ReqStaleRead +) + +func (s *ReplicaAccessStats) recordReplicaAccessInfo(staleRead, replicaRead bool, peerID, storeID uint64, err string) { + if len(s.AccessInfos) < 5 { + tp := ReqLeader + if replicaRead { + tp = ReqReplicaRead + } else if staleRead { + tp = ReqStaleRead + } + s.AccessInfos = append(s.AccessInfos, ReplicaAccessInfo{ + Peer: peerID, + Store: storeID, + ReqReadTp: tp, + Err: err, + }) + return + } + if s.OverflowAccessStat == nil { + s.OverflowAccessStat = make(map[uint64]*RequestErrorStats) + } + stat, ok := s.OverflowAccessStat[peerID] if !ok { - stats[cmd] = &RPCRuntimeStats{ - Count: 1, - Consume: int64(d), + stat = &RequestErrorStats{} + s.OverflowAccessStat[peerID] = stat + } + stat.RecordRPCErrorStats(err) +} + +// String implements fmt.Stringer interface. +func (s *ReplicaAccessStats) String() string { + if s == nil { + return "" + } + var builder strings.Builder + for i, info := range s.AccessInfos { + if i > 0 { + builder.WriteString(", ") + } + switch info.ReqReadTp { + case ReqLeader: + builder.WriteString("{") + case ReqReplicaRead: + builder.WriteString("{replica_read, ") + case ReqStaleRead: + builder.WriteString("{stale_read, ") + } + builder.WriteString("peer:") + builder.WriteString(strconv.FormatUint(info.Peer, 10)) + builder.WriteString(", store:") + builder.WriteString(strconv.FormatUint(info.Store, 10)) + builder.WriteString(", err:") + builder.WriteString(info.Err) + builder.WriteString("}") + } + if len(s.OverflowAccessStat) > 0 { + builder.WriteString(", overflow_count:{") + cnt := 0 + for peerID, stat := range s.OverflowAccessStat { + if stat == nil { + continue + } + if cnt > 0 { + builder.WriteString(", ") + } + builder.WriteString("{peer:") + builder.WriteString(strconv.FormatUint(peerID, 10)) + builder.WriteString(", error_stats:") + builder.WriteString(stat.String()) + builder.WriteString("}") + cnt++ } - return + builder.WriteString("}") } - stat.Count++ - stat.Consume += int64(d) + return builder.String() } // NewRegionRequestSender creates a new sender.
@@ -262,6 +447,16 @@ func (s *RegionRequestSender) SendReq(bo *retry.Backoffer, req *tikvrpc.Request, return resp, err } +func (s *RegionRequestSender) recordRPCAccessInfo(req *tikvrpc.Request, rpcCtx *RPCContext, err string) { + if req == nil || rpcCtx == nil || rpcCtx.Peer == nil || rpcCtx.Store == nil { + return + } + if s.AccessStats == nil { + s.AccessStats = &ReplicaAccessStats{} + } + s.AccessStats.recordReplicaAccessInfo(req.StaleRead, req.ReplicaRead, rpcCtx.Peer.GetId(), rpcCtx.Store.storeID, err) +} + type replica struct { store *Store peer *metapb.Peer @@ -294,6 +489,68 @@ type replicaSelector struct { ReplicaSelectorExperimentalOptions } +func selectorStateToString(state selectorState) string { + replicaSelectorState := "nil" + if state != nil { + switch state.(type) { + case *accessKnownLeader: + replicaSelectorState = "accessKnownLeader" + case *accessFollower: + replicaSelectorState = "accessFollower" + case *accessByKnownProxy: + replicaSelectorState = "accessByKnownProxy" + case *tryFollower: + replicaSelectorState = "tryFollower" + case *tryNewProxy: + replicaSelectorState = "tryNewProxy" + case *invalidLeader: + replicaSelectorState = "invalidLeader" + case *invalidStore: + replicaSelectorState = "invalidStore" + case *stateBase: + replicaSelectorState = "stateBase" + case nil: + replicaSelectorState = "nil" + } + } + return replicaSelectorState +} + +func (s *replicaSelector) String() string { + selectorStateStr := "nil" + if s != nil { + selectorStateStr = selectorStateToString(s.state) + } + var replicaStatus []string + cacheRegionIsValid := "unknown" + leaderPeerID := uint64(0) + if s != nil { + if s.region != nil { + leaderPeerID = s.region.GetLeaderPeerID() + if s.region.isValid() { + cacheRegionIsValid = "true" + } else { + cacheRegionIsValid = "false" + } + } + for _, replica := range s.replicas { + replicaStatus = append(replicaStatus, fmt.Sprintf("peer: %v, store: %v, isEpochStale: %v, "+ + "attempts: %v, replica-epoch: %v, store-epoch: %v, store-state: %v, store-liveness-state: %v", + replica.peer.GetId(), + replica.store.storeID, + replica.isEpochStale(), + replica.attempts, + replica.epoch, + atomic.LoadUint32(&replica.store.epoch), + replica.store.getResolveState(), + replica.store.getLivenessState(), + )) + } + } + return fmt.Sprintf("{state: %v, cacheRegionIsValid: %v, leaderPeerID: %v, replicaStatus: %v}", + selectorStateStr, cacheRegionIsValid, leaderPeerID, replicaStatus) +} + // selectorState is the interface of states of the replicaSelector. // Here is the main state transition diagram: // @@ -1110,6 +1367,8 @@ func IsFakeRegionError(err *errorpb.Error) bool { return err != nil && err.GetEpochNotMatch() != nil && len(err.GetEpochNotMatch().CurrentRegions) == 0 } +const slowLogSendReqTime = 100 * time.Millisecond + // SendReqCtx sends a request to tikv server and return response and RPCCtx of this RPC. 
func (s *RegionRequestSender) SendReqCtx( bo *retry.Backoffer, @@ -1176,6 +1435,8 @@ func (s *RegionRequestSender) SendReqCtx( s.reset() tryTimes := 0 + startTime := time.Now() + startBackOff := bo.GetTotalSleep() defer func() { if tryTimes > 0 { metrics.TiKVRequestRetryTimesHistogram.Observe(float64(tryTimes)) @@ -1194,7 +1455,6 @@ func (s *RegionRequestSender) SendReqCtx( }() } - totalErrors := make(map[string]int) for { if tryTimes > 0 { if tryTimes%100 == 0 { @@ -1222,7 +1482,9 @@ func (s *RegionRequestSender) SendReqCtx( // TODO: Change the returned error to something like "region missing in cache", // and handle this error like EpochNotMatch, which means to re-split the request and retry. - s.logSendReqError(bo, "throwing pseudo region error due to no replica available", regionID, tryTimes, req, totalErrors) + if cost := time.Since(startTime); cost > slowLogSendReqTime || cost > timeout { + s.logSendReqError(bo, "throwing pseudo region error due to no replica available", regionID, tryTimes, req, cost, bo.GetTotalSleep()-startBackOff, timeout) + } resp, err = tikvrpc.GenRegionErrorResp(req, &errorpb.Error{EpochNotMatch: &errorpb.EpochNotMatch{}}) return resp, nil, err } @@ -1257,11 +1519,20 @@ func (s *RegionRequestSender) SendReqCtx( resp, retry, err = s.sendReqToRegion(bo, rpcCtx, req, timeout) req.IsRetryRequest = true if err != nil { - msg := fmt.Sprintf("send request failed, err: %v", err.Error()) - s.logSendReqError(bo, msg, regionID, tryTimes, req, totalErrors) + if cost := time.Since(startTime); cost > slowLogSendReqTime || cost > timeout { + msg := fmt.Sprintf("send request failed, err: %v", err.Error()) + s.logSendReqError(bo, msg, regionID, tryTimes, req, cost, bo.GetTotalSleep()-startBackOff, timeout) + } return nil, nil, err } + if _, err1 := util.EvalFailpoint("afterSendReqToRegion"); err1 == nil { + if hook := bo.GetCtx().Value("sendReqToRegionFinishHook"); hook != nil { + h := hook.(func(*tikvrpc.Request, *tikvrpc.Response, error)) + h(req, resp, err) + } + } + // recheck whether the session/query is killed during the Next() boVars := bo.GetVars() if boVars != nil && boVars.Killed != nil && atomic.LoadUint32(boVars.Killed) == 1 { @@ -1283,19 +1554,21 @@ func (s *RegionRequestSender) SendReqCtx( return nil, nil, err } if regionErr != nil { - regionErrLabel := regionErrorToLabel(regionErr) - totalErrors[regionErrLabel]++ retry, err = s.onRegionError(bo, rpcCtx, req, regionErr) if err != nil { - msg := fmt.Sprintf("send request on region error failed, err: %v", err.Error()) - s.logSendReqError(bo, msg, regionID, tryTimes, req, totalErrors) + if cost := time.Since(startTime); cost > slowLogSendReqTime || cost > timeout { + msg := fmt.Sprintf("send request on region error failed, err: %v", err.Error()) + s.logSendReqError(bo, msg, regionID, tryTimes, req, cost, bo.GetTotalSleep()-startBackOff, timeout) + } return nil, nil, err } if retry { tryTimes++ continue } - s.logSendReqError(bo, "send request meet region error without retry", regionID, tryTimes, req, totalErrors) + if cost := time.Since(startTime); cost > slowLogSendReqTime || cost > timeout { + s.logSendReqError(bo, "send request meet region error without retry", regionID, tryTimes, req, cost, bo.GetTotalSleep()-startBackOff, timeout) + } } else { if s.replicaSelector != nil { s.replicaSelector.onSendSuccess() @@ -1308,74 +1581,49 @@ func (s *RegionRequestSender) SendReqCtx( } } -func (s *RegionRequestSender) logSendReqError(bo *retry.Backoffer, msg string, regionID RegionVerID, retryTimes int, req *tikvrpc.Request, 
totalErrors map[string]int) { - var replicaStatus []string - replicaSelectorState := "nil" - cacheRegionIsValid := "unknown" - if s.replicaSelector != nil { - switch s.replicaSelector.state.(type) { - case *accessKnownLeader: - replicaSelectorState = "accessKnownLeader" - case *accessFollower: - replicaSelectorState = "accessFollower" - case *accessByKnownProxy: - replicaSelectorState = "accessByKnownProxy" - case *tryFollower: - replicaSelectorState = "tryFollower" - case *tryNewProxy: - replicaSelectorState = "tryNewProxy" - case *invalidLeader: - replicaSelectorState = "invalidLeader" - case *invalidStore: - replicaSelectorState = "invalidStore" - case *stateBase: - replicaSelectorState = "stateBase" - case nil: - replicaSelectorState = "nil" - } - if s.replicaSelector.region != nil { - if s.replicaSelector.region.isValid() { - cacheRegionIsValid = "true" - } else { - cacheRegionIsValid = "false" - } - } - for _, replica := range s.replicaSelector.replicas { - replicaStatus = append(replicaStatus, fmt.Sprintf("peer: %v, store: %v, isEpochStale: %v, attempts: %v, replica-epoch: %v, store-epoch: %v, store-state: %v, store-liveness-state: %v", - replica.peer.GetId(), - replica.store.storeID, - replica.isEpochStale(), - replica.attempts, - replica.epoch, - atomic.LoadUint32(&replica.store.epoch), - replica.store.getResolveState(), - replica.store.getLivenessState(), - )) - } +func (s *RegionRequestSender) logSendReqError(bo *retry.Backoffer, msg string, regionID RegionVerID, tryTimes int, req *tikvrpc.Request, cost time.Duration, currentBackoffMs int, timeout time.Duration) { + var builder strings.Builder + // build the total round stats string. + builder.WriteString("{total-backoff: ") + builder.WriteString(util.FormatDuration(time.Duration(bo.GetTotalSleep() * int(time.Millisecond)))) + builder.WriteString(", total-backoff-times: ") + builder.WriteString(strconv.Itoa(bo.GetTotalBackoffTimes())) + if s.Stats != nil { + builder.WriteString(", total-rpc: {") + builder.WriteString(s.Stats.String()) + builder.WriteString("}") } - var totalErrorStr bytes.Buffer - for err, cnt := range totalErrors { - if totalErrorStr.Len() > 0 { - totalErrorStr.WriteString(", ") - } - totalErrorStr.WriteString(err) - totalErrorStr.WriteString(":") - totalErrorStr.WriteString(strconv.Itoa(cnt)) + builder.WriteString("}") + totalRoundStats := builder.String() + + // build the current round stats string. 
+ builder.Reset() + builder.WriteString("{time: ") + builder.WriteString(util.FormatDuration(cost)) + builder.WriteString(", backoff: ") + builder.WriteString(util.FormatDuration(time.Duration(currentBackoffMs * int(time.Millisecond)))) + builder.WriteString(", timeout: ") + builder.WriteString(util.FormatDuration(timeout)) + builder.WriteString(", req-max-exec-timeout: ") + builder.WriteString(util.FormatDuration(time.Duration(int64(req.Context.MaxExecutionDurationMs) * int64(time.Millisecond)))) + builder.WriteString(", try-times: ") + builder.WriteString(strconv.Itoa(tryTimes)) + if s.AccessStats != nil { + builder.WriteString(", replica-access: {") + builder.WriteString(s.AccessStats.String()) + builder.WriteString("}") } + builder.WriteString("}") + currentRoundStats := builder.String() logutil.Logger(bo.GetCtx()).Info(msg, zap.Uint64("req-ts", req.GetStartTS()), zap.String("req-type", req.Type.String()), zap.String("region", regionID.String()), - zap.String("region-is-valid", cacheRegionIsValid), - zap.Int("retry-times", retryTimes), zap.String("replica-read-type", req.ReplicaReadType.String()), - zap.String("replica-selector-state", replicaSelectorState), zap.Bool("stale-read", req.StaleRead), - zap.String("replica-status", strings.Join(replicaStatus, "; ")), - zap.Int("total-backoff-ms", bo.GetTotalSleep()), - zap.Int("total-backoff-times", bo.GetTotalBackoffTimes()), - zap.Uint64("max-exec-timeout-ms", req.Context.MaxExecutionDurationMs), - zap.String("total-region-errors", totalErrorStr.String())) + zap.Stringer("request-sender", s), + zap.String("total-round-stats", totalRoundStats), + zap.String("current-round-stats", currentRoundStats)) } // RPCCancellerCtxKey is context key attach rpc send cancelFunc collector to ctx. @@ -1501,7 +1749,7 @@ func (s *RegionRequestSender) sendReqToRegion(bo *retry.Backoffer, rpcCtx *RPCCo start := time.Now() resp, err = s.client.SendRequest(ctx, sendToAddr, req, timeout) if s.Stats != nil { - RecordRegionRequestRuntimeStats(s.Stats, req.Type, time.Since(start)) + s.Stats.RecordRPCRuntimeStats(req.Type, time.Since(start)) if val, fpErr := util.EvalFailpoint("tikvStoreRespResult"); fpErr == nil { if val.(bool) { if req.Type == tikvrpc.CmdCop && bo.GetTotalSleep() == 0 { @@ -1559,11 +1807,16 @@ func (s *RegionRequestSender) sendReqToRegion(bo *retry.Backoffer, rpcCtx *RPCCo if err != nil { s.rpcError = err - + if s.Stats != nil { + errStr := getErrMsg(err) + s.Stats.RecordRPCErrorStats(errStr) + s.recordRPCAccessInfo(req, rpcCtx, errStr) + } // Because in rpc logic, context.Cancel() will be transferred to rpcContext.Cancel error. For rpcContext cancel, // we need to retry the request. But for context cancel active, for example, limitExec gets the required rows, // we shouldn't retry the request, it will go to backoff and hang in retry logic. if ctx.Err() != nil && errors.Cause(ctx.Err()) == context.Canceled { + metrics.TiKVRPCErrorCounter.WithLabelValues("context-canceled", storeIDLabel(rpcCtx)).Inc() return nil, false, errors.WithStack(ctx.Err()) } @@ -1580,6 +1833,13 @@ func (s *RegionRequestSender) sendReqToRegion(bo *retry.Backoffer, rpcCtx *RPCCo return } +func storeIDLabel(rpcCtx *RPCContext) string { + if rpcCtx != nil && rpcCtx.Store != nil { + return strconv.FormatUint(rpcCtx.Store.storeID, 10) + } + return "nil" +} + func (s *RegionRequestSender) getStoreToken(st *Store, limit int64) error { // Checking limit is not thread safe, preferring this for avoiding load in loop. 
count := st.tokenCount.Load() @@ -1608,27 +1868,38 @@ func (s *RegionRequestSender) onSendFail(bo *retry.Backoffer, ctx *RPCContext, r defer span1.Finish() bo.SetCtx(opentracing.ContextWithSpan(bo.GetCtx(), span1)) } + storeLabel := storeIDLabel(ctx) // If it failed because the context is cancelled by ourself, don't retry. if errors.Cause(err) == context.Canceled { + metrics.TiKVRPCErrorCounter.WithLabelValues("context-canceled", storeLabel).Inc() return errors.WithStack(err) } else if LoadShuttingDown() > 0 { + metrics.TiKVRPCErrorCounter.WithLabelValues("shutting-down", storeLabel).Inc() return errors.WithStack(tikverr.ErrTiDBShuttingDown) } else if isCauseByDeadlineExceeded(err) { if s.replicaSelector != nil && s.replicaSelector.onReadReqConfigurableTimeout(req) { + errLabel := "read-timeout-" + strconv.FormatUint(req.MaxExecutionDurationMs, 10) + "ms" + metrics.TiKVRPCErrorCounter.WithLabelValues(errLabel, storeLabel).Inc() return nil } } if status.Code(errors.Cause(err)) == codes.Canceled { select { case <-bo.GetCtx().Done(): + metrics.TiKVRPCErrorCounter.WithLabelValues("grpc-canceled", storeLabel).Inc() return errors.WithStack(err) default: // If we don't cancel, but the error code is Canceled, it must be from grpc remote. // This may happen when tikv is killed and exiting. // Backoff and retry in this case. - logutil.BgLogger().Warn("receive a grpc cancel signal from remote", zap.Error(err)) + logutil.Logger(bo.GetCtx()).Warn("receive a grpc cancel signal from remote", zap.Error(err)) } } + if errStr := getErrMsg(err); len(errStr) > 0 { + metrics.TiKVRPCErrorCounter.WithLabelValues(errStr, storeLabel).Inc() + } else { + metrics.TiKVRPCErrorCounter.WithLabelValues("unknown", storeLabel).Inc() + } if ctx.Store != nil && ctx.Store.storeType == tikvrpc.TiFlashCompute { s.regionCache.InvalidateTiFlashComputeStoresIfGRPCError(err) @@ -1687,6 +1958,20 @@ func (s *RegionRequestSender) NeedReloadRegion(ctx *RPCContext) (need bool) { return } +// regionErrorToLogging constructs the logging content with extra information like returned leader peer id. +func regionErrorToLogging(e *errorpb.Error, errLabel string) string { + str := errLabel + if e.GetNotLeader() != nil { + notLeader := e.GetNotLeader() + if notLeader.GetLeader() != nil { + str = fmt.Sprintf("%v_with_leader_%v", str, notLeader.GetLeader().GetId()) + } else { + str = fmt.Sprintf("%v_with_no_leader", str) + } + } + return str +} + func regionErrorToLabel(e *errorpb.Error) string { if e.GetNotLeader() != nil { return "not_leader" @@ -1747,13 +2032,17 @@ func (s *RegionRequestSender) onRegionError(bo *retry.Backoffer, ctx *RPCContext bo.SetCtx(opentracing.ContextWithSpan(bo.GetCtx(), span1)) } - // NOTE: Please add the region error handler in the same order of errorpb.Error. - errLabel := regionErrorToLabel(regionErr) - metrics.TiKVRegionErrorCounter.WithLabelValues(errLabel).Inc() + regionErrLabel := regionErrorToLabel(regionErr) + metrics.TiKVRegionErrorCounter.WithLabelValues(regionErrLabel, storeIDLabel(ctx)).Inc() + if s.Stats != nil { + s.Stats.RecordRPCErrorStats(regionErrLabel) + s.recordRPCAccessInfo(req, ctx, regionErrorToLogging(regionErr, regionErrLabel)) + } + // NOTE: Please add the region error handler in the same order of errorpb.Error. if notLeader := regionErr.GetNotLeader(); notLeader != nil { // Retry if error is `NotLeader`. 
- logutil.BgLogger().Debug("tikv reports `NotLeader` retry later", + logutil.Logger(bo.GetCtx()).Debug("tikv reports `NotLeader` retry later", zap.String("notLeader", notLeader.String()), zap.String("ctx", ctx.String())) @@ -1786,7 +2075,7 @@ func (s *RegionRequestSender) onRegionError(bo *retry.Backoffer, ctx *RPCContext if regionErr.GetRecoveryInProgress() != nil { s.regionCache.InvalidateCachedRegion(ctx.Region) - logutil.BgLogger().Debug("tikv reports `RecoveryInProgress`", zap.Stringer("ctx", ctx)) + logutil.Logger(bo.GetCtx()).Debug("tikv reports `RecoveryInProgress`", zap.Stringer("ctx", ctx)) err = bo.Backoff(retry.BoRegionRecoveryInProgress, errors.Errorf("region recovery in progress, ctx: %v", ctx)) if err != nil { return false, err @@ -1798,14 +2087,14 @@ func (s *RegionRequestSender) onRegionError(bo *retry.Backoffer, ctx *RPCContext // if a request meets the FlashbackInProgress error, it should stop retrying immediately // to avoid unnecessary backoff and potential unexpected data status to the user. if regionErr.GetFlashbackInProgress() != nil { - logutil.BgLogger().Debug("tikv reports `FlashbackInProgress`", zap.Stringer("req", req), zap.Stringer("ctx", ctx)) + logutil.Logger(bo.GetCtx()).Debug("tikv reports `FlashbackInProgress`", zap.Stringer("req", req), zap.Stringer("ctx", ctx)) return false, errors.Errorf("region %d is in flashback progress", regionErr.GetFlashbackInProgress().GetRegionId()) } // This error means a second-phase flashback request is sent to a region that is not // prepared for the flashback before, it should stop retrying immediately to avoid // unnecessary backoff. if regionErr.GetFlashbackNotPrepared() != nil { - logutil.BgLogger().Debug("tikv reports `FlashbackNotPrepared`", zap.Stringer("req", req), zap.Stringer("ctx", ctx)) + logutil.Logger(bo.GetCtx()).Debug("tikv reports `FlashbackNotPrepared`", zap.Stringer("req", req), zap.Stringer("ctx", ctx)) return false, errors.Errorf("region %d is not prepared for the flashback", regionErr.GetFlashbackNotPrepared().GetRegionId()) } @@ -1817,13 +2106,13 @@ func (s *RegionRequestSender) onRegionError(bo *retry.Backoffer, ctx *RPCContext } if regionErr.GetKeyNotInRegion() != nil { - logutil.BgLogger().Debug("tikv reports `KeyNotInRegion`", zap.Stringer("req", req), zap.Stringer("ctx", ctx)) + logutil.Logger(bo.GetCtx()).Debug("tikv reports `KeyNotInRegion`", zap.Stringer("req", req), zap.Stringer("ctx", ctx)) s.regionCache.InvalidateCachedRegion(ctx.Region) return false, nil } if epochNotMatch := regionErr.GetEpochNotMatch(); epochNotMatch != nil { - logutil.BgLogger().Debug("tikv reports `EpochNotMatch` retry later", + logutil.Logger(bo.GetCtx()).Debug("tikv reports `EpochNotMatch` retry later", zap.Stringer("EpochNotMatch", epochNotMatch), zap.Stringer("ctx", ctx)) retry, err := s.regionCache.OnRegionEpochNotMatch(bo, ctx, epochNotMatch.CurrentRegions) @@ -1862,7 +2151,7 @@ func (s *RegionRequestSender) onRegionError(bo *retry.Backoffer, ctx *RPCContext // We can't know whether the request is committed or not, so it's an undetermined error too, // but we don't handle it now. if regionErr.GetStaleCommand() != nil { - logutil.BgLogger().Debug("tikv reports `StaleCommand`", zap.Stringer("ctx", ctx)) + logutil.Logger(bo.GetCtx()).Debug("tikv reports `StaleCommand`", zap.Stringer("ctx", ctx)) if s.replicaSelector != nil { // Needn't backoff because the new leader should be elected soon // and the replicaSelector will try the next peer. 
@@ -1877,7 +2166,7 @@ func (s *RegionRequestSender) onRegionError(bo *retry.Backoffer, ctx *RPCContext if storeNotMatch := regionErr.GetStoreNotMatch(); storeNotMatch != nil { // store not match - logutil.BgLogger().Debug("tikv reports `StoreNotMatch` retry later", + logutil.Logger(bo.GetCtx()).Debug("tikv reports `StoreNotMatch` retry later", zap.Stringer("storeNotMatch", storeNotMatch), zap.Stringer("ctx", ctx)) ctx.Store.markNeedCheck(s.regionCache.notifyCheckCh) @@ -1889,12 +2178,12 @@ func (s *RegionRequestSender) onRegionError(bo *retry.Backoffer, ctx *RPCContext } if regionErr.GetRaftEntryTooLarge() != nil { - logutil.BgLogger().Warn("tikv reports `RaftEntryTooLarge`", zap.Stringer("ctx", ctx)) + logutil.Logger(bo.GetCtx()).Warn("tikv reports `RaftEntryTooLarge`", zap.Stringer("ctx", ctx)) return false, errors.New(regionErr.String()) } if regionErr.GetMaxTimestampNotSynced() != nil { - logutil.BgLogger().Debug("tikv reports `MaxTimestampNotSynced`", zap.Stringer("ctx", ctx)) + logutil.Logger(bo.GetCtx()).Debug("tikv reports `MaxTimestampNotSynced`", zap.Stringer("ctx", ctx)) err = bo.Backoff(retry.BoMaxTsNotSynced, errors.Errorf("max timestamp not synced, ctx: %v", ctx)) if err != nil { return false, err @@ -1904,7 +2193,7 @@ func (s *RegionRequestSender) onRegionError(bo *retry.Backoffer, ctx *RPCContext // A read request may be sent to a peer which has not been initialized yet, we should retry in this case. if regionErr.GetRegionNotInitialized() != nil { - logutil.BgLogger().Debug("tikv reports `RegionNotInitialized` retry later", + logutil.Logger(bo.GetCtx()).Debug("tikv reports `RegionNotInitialized` retry later", zap.Uint64("store-id", ctx.Store.storeID), zap.Uint64("region-id", regionErr.GetRegionNotInitialized().GetRegionId()), zap.Stringer("ctx", ctx)) @@ -1917,7 +2206,7 @@ func (s *RegionRequestSender) onRegionError(bo *retry.Backoffer, ctx *RPCContext // The read-index can't be handled timely because the region is splitting or merging. if regionErr.GetReadIndexNotReady() != nil { - logutil.BgLogger().Debug("tikv reports `ReadIndexNotReady` retry later", + logutil.Logger(bo.GetCtx()).Debug("tikv reports `ReadIndexNotReady` retry later", zap.Uint64("store-id", ctx.Store.storeID), zap.Uint64("region-id", regionErr.GetRegionNotInitialized().GetRegionId()), zap.Stringer("ctx", ctx)) @@ -1930,7 +2219,7 @@ func (s *RegionRequestSender) onRegionError(bo *retry.Backoffer, ctx *RPCContext } if regionErr.GetProposalInMergingMode() != nil { - logutil.BgLogger().Debug("tikv reports `ProposalInMergingMode`", zap.Stringer("ctx", ctx)) + logutil.Logger(bo.GetCtx()).Debug("tikv reports `ProposalInMergingMode`", zap.Stringer("ctx", ctx)) // The region is merging and it can't provide service until merge finished, so backoff. err = bo.Backoff(retry.BoRegionScheduling, errors.Errorf("region is merging, ctx: %v", ctx)) if err != nil { @@ -1943,7 +2232,7 @@ func (s *RegionRequestSender) onRegionError(bo *retry.Backoffer, ctx *RPCContext // This error is specific to stale read and the target replica is randomly selected. If the request is sent // to the leader, the data must be ready, so we don't backoff here. 
if regionErr.GetDataIsNotReady() != nil { - logutil.BgLogger().Debug("tikv reports `DataIsNotReady` retry later", + logutil.Logger(bo.GetCtx()).Debug("tikv reports `DataIsNotReady` retry later", zap.Uint64("store-id", ctx.Store.storeID), zap.Uint64("peer-id", regionErr.GetDataIsNotReady().GetPeerId()), zap.Uint64("region-id", regionErr.GetDataIsNotReady().GetRegionId()), @@ -1970,7 +2259,7 @@ func (s *RegionRequestSender) onRegionError(bo *retry.Backoffer, ctx *RPCContext zap.Stringer("ctx", ctx)) if s.replicaSelector != nil { - if errLabel == "mismatch_peer_id" { + if regionErrLabel == "mismatch_peer_id" { s.replicaSelector.invalidateRegion() return false, nil } diff --git a/internal/locate/region_request3_test.go b/internal/locate/region_request3_test.go index c674f1218..c7c755353 100644 --- a/internal/locate/region_request3_test.go +++ b/internal/locate/region_request3_test.go @@ -1314,7 +1314,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestDoNotTryUnreachableLeader() { func (s *testRegionRequestToThreeStoresSuite) TestSendReqFirstTimeout() { leaderAddr := "" reqTargetAddrs := make(map[string]struct{}) - s.regionRequestSender.RegionRequestRuntimeStats = NewRegionRequestRuntimeStats() + s.regionRequestSender.Stats = NewRegionRequestRuntimeStats() bo := retry.NewBackoffer(context.Background(), 10000) mockRPCClient := &fnClient{fn: func(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (*tikvrpc.Response, error) { reqTargetAddrs[addr] = struct{}{} @@ -1338,7 +1338,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestSendReqFirstTimeout() { resetStats := func() { reqTargetAddrs = make(map[string]struct{}) s.regionRequestSender = NewRegionRequestSender(s.cache, mockRPCClient) - s.regionRequestSender.RegionRequestRuntimeStats = NewRegionRequestRuntimeStats() + s.regionRequestSender.Stats = NewRegionRequestRuntimeStats() } //Test different read type. @@ -1361,10 +1361,10 @@ func (s *testRegionRequestToThreeStoresSuite) TestSendReqFirstTimeout() { regionErr, err := resp.GetRegionError() s.Nil(err) s.True(IsFakeRegionError(regionErr)) - s.Equal(1, len(s.regionRequestSender.Stats)) - s.Equal(int64(3), s.regionRequestSender.Stats[tikvrpc.CmdGet].Count) // 3 rpc - s.Equal(3, len(reqTargetAddrs)) // each rpc to a different store. - s.Equal(0, bo.GetTotalBackoffTimes()) // no backoff since fast retry. + s.Equal(1, len(s.regionRequestSender.Stats.RPCStats)) + s.Equal(int64(3), s.regionRequestSender.Stats.RPCStats[tikvrpc.CmdGet].Count) // 3 rpc + s.Equal(3, len(reqTargetAddrs)) // each rpc to a different store. + s.Equal(0, bo.GetTotalBackoffTimes()) // no backoff since fast retry. // warn: must reset MaxExecutionDurationMs before retry. resetStats() if staleRead { @@ -1380,9 +1380,9 @@ func (s *testRegionRequestToThreeStoresSuite) TestSendReqFirstTimeout() { s.Nil(err) s.Nil(regionErr) s.Equal([]byte("value"), resp.Resp.(*kvrpcpb.GetResponse).Value) - s.Equal(1, len(s.regionRequestSender.Stats)) - s.Equal(int64(1), s.regionRequestSender.Stats[tikvrpc.CmdGet].Count) // 1 rpc - s.Equal(0, bo.GetTotalBackoffTimes()) // no backoff since fast retry. + s.Equal(1, len(s.regionRequestSender.Stats.RPCStats)) + s.Equal(int64(1), s.regionRequestSender.Stats.RPCStats[tikvrpc.CmdGet].Count) // 1 rpc + s.Equal(0, bo.GetTotalBackoffTimes()) // no backoff since fast retry. 
} } @@ -1416,7 +1416,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestStaleReadTryFollowerAfterTimeo return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{Value: []byte("value")}}, nil }} s.regionRequestSender = NewRegionRequestSender(s.cache, mockRPCClient) - s.regionRequestSender.RegionRequestRuntimeStats = NewRegionRequestRuntimeStats() + s.regionRequestSender.Stats = NewRegionRequestRuntimeStats() getLocFn := func() *KeyLocation { loc, err := s.regionRequestSender.regionCache.LocateKey(bo, []byte("a")) s.Nil(err) @@ -1438,9 +1438,9 @@ func (s *testRegionRequestToThreeStoresSuite) TestStaleReadTryFollowerAfterTimeo s.Nil(err) s.Nil(regionErr) s.Equal([]byte("value"), resp.Resp.(*kvrpcpb.GetResponse).Value) - s.Equal(1, len(s.regionRequestSender.Stats)) - s.Equal(int64(2), s.regionRequestSender.Stats[tikvrpc.CmdGet].Count) // 2 rpc - s.Equal(0, bo.GetTotalBackoffTimes()) // no backoff since fast retry. + s.Equal(1, len(s.regionRequestSender.Stats.RPCStats)) + s.Equal(int64(2), s.regionRequestSender.Stats.RPCStats[tikvrpc.CmdGet].Count) // 2 rpc + s.Equal(0, bo.GetTotalBackoffTimes()) // no backoff since fast retry. } func (s *testRegionRequestToThreeStoresSuite) TestRetryRequestSource() { diff --git a/internal/locate/region_request_test.go b/internal/locate/region_request_test.go index b95f15de0..484daec66 100644 --- a/internal/locate/region_request_test.go +++ b/internal/locate/region_request_test.go @@ -39,6 +39,7 @@ import ( "fmt" "math/rand" "net" + "strconv" "sync" "sync/atomic" "testing" @@ -52,6 +53,7 @@ import ( "github.com/pingcap/kvproto/pkg/mpp" "github.com/pingcap/kvproto/pkg/tikvpb" "github.com/pkg/errors" + "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" "github.com/tikv/client-go/v2/config" "github.com/tikv/client-go/v2/internal/client" @@ -744,3 +746,63 @@ func (s *testRegionRequestToSingleStoreSuite) TestBatchClientSendLoopPanic() { // batchSendLoop should not panic. s.Equal(atomic.LoadInt64(&client.BatchSendLoopPanicCounter), int64(0)) } + +func (s *testRegionRequestToSingleStoreSuite) TestRegionRequestStats() { + reqStats := NewRegionRequestRuntimeStats() + reqStats.RecordRPCRuntimeStats(tikvrpc.CmdGet, time.Second) + reqStats.RecordRPCRuntimeStats(tikvrpc.CmdGet, time.Millisecond) + reqStats.RecordRPCRuntimeStats(tikvrpc.CmdCop, time.Second*2) + reqStats.RecordRPCRuntimeStats(tikvrpc.CmdCop, time.Millisecond*200) + reqStats.RecordRPCErrorStats("context canceled") + reqStats.RecordRPCErrorStats("context canceled") + reqStats.RecordRPCErrorStats("region_not_found") + reqStats.Merge(NewRegionRequestRuntimeStats()) + reqStats2 := NewRegionRequestRuntimeStats() + reqStats2.Merge(reqStats.Clone()) + expecteds := []string{ + // Since map iteration order is random, we need to check all possible orders. 
+ "Get:{num_rpc:2, total_time:1s},Cop:{num_rpc:2, total_time:2.2s}, rpc_errors:{region_not_found:1, context canceled:2}", + "Get:{num_rpc:2, total_time:1s},Cop:{num_rpc:2, total_time:2.2s}, rpc_errors:{context canceled:2, region_not_found:1}", + "Cop:{num_rpc:2, total_time:2.2s},Get:{num_rpc:2, total_time:1s}, rpc_errors:{context canceled:2, region_not_found:1}", + "Cop:{num_rpc:2, total_time:2.2s},Get:{num_rpc:2, total_time:1s}, rpc_errors:{region_not_found:1, context canceled:2}", + } + s.Contains(expecteds, reqStats.String()) + s.Contains(expecteds, reqStats2.String()) + for i := 0; i < 50; i++ { + reqStats.RecordRPCErrorStats("err_" + strconv.Itoa(i)) + } + s.Regexp("{.*err_.*:1.*, other_error:36}", reqStats.RequestErrorStats.String()) + s.Regexp(".*num_rpc.*total_time.*, rpc_errors:{.*err.*, other_error:36}", reqStats.String()) + + access := &ReplicaAccessStats{} + access.recordReplicaAccessInfo(true, false, 1, 2, "data_not_ready") + access.recordReplicaAccessInfo(false, false, 3, 4, "not_leader") + access.recordReplicaAccessInfo(false, true, 5, 6, "server_is_Busy") + s.Equal("{stale_read, peer:1, store:2, err:data_not_ready}, {peer:3, store:4, err:not_leader}, {replica_read, peer:5, store:6, err:server_is_Busy}", access.String()) + for i := 0; i < 20; i++ { + access.recordReplicaAccessInfo(false, false, 5+uint64(i)%2, 6, "server_is_Busy") + } + expecteds = []string{ + // Since map iteration order is random, we need to check all possible orders. + "{stale_read, peer:1, store:2, err:data_not_ready}, {peer:3, store:4, err:not_leader}, {replica_read, peer:5, store:6, err:server_is_Busy}, {peer:5, store:6, err:server_is_Busy}, {peer:6, store:6, err:server_is_Busy}, overflow_count:{{peer:5, error_stats:{server_is_Busy:9}}, {peer:6, error_stats:{server_is_Busy:9}}}", + "{stale_read, peer:1, store:2, err:data_not_ready}, {peer:3, store:4, err:not_leader}, {replica_read, peer:5, store:6, err:server_is_Busy}, {peer:5, store:6, err:server_is_Busy}, {peer:6, store:6, err:server_is_Busy}, overflow_count:{{peer:6, error_stats:{server_is_Busy:9}}, {peer:5, error_stats:{server_is_Busy:9}}}", + } + s.Contains(expecteds, access.String()) +} + +type noCauseError struct { + error +} + +func (_ noCauseError) Cause() error { + return nil +} + +func TestGetErrMsg(t *testing.T) { + err := noCauseError{error: errors.New("no cause err")} + require.Equal(t, nil, errors.Cause(err)) + require.Panicsf(t, func() { + _ = errors.Cause(err).Error() + }, "should panic") + require.Equal(t, "no cause err", getErrMsg(err)) +} diff --git a/internal/retry/backoff.go b/internal/retry/backoff.go index 03ba5e38e..a2723e05b 100644 --- a/internal/retry/backoff.go +++ b/internal/retry/backoff.go @@ -133,7 +133,7 @@ func (b *Backoffer) BackoffWithMaxSleepTxnLockFast(maxSleepMs int, err error) er // and never sleep more than maxSleepMs for each sleep. 
func (b *Backoffer) BackoffWithCfgAndMaxSleep(cfg *Config, maxSleepMs int, err error) error { if strings.Contains(err.Error(), tikverr.MismatchClusterID) { - logutil.BgLogger().Fatal("critical error", zap.Error(err)) + logutil.Logger(b.ctx).Fatal("critical error", zap.Error(err)) } select { case <-b.ctx.Done(): @@ -176,7 +176,7 @@ func (b *Backoffer) BackoffWithCfgAndMaxSleep(cfg *Config, maxSleepMs int, err e errMsg += fmt.Sprintf("\nlongest sleep type: %s, time: %dms", longestSleepCfg.String(), longestSleepTime) returnedErr = longestSleepCfg.err } - logutil.BgLogger().Warn(errMsg) + logutil.Logger(b.ctx).Warn(errMsg) // Use the backoff type that contributes most to the timeout to generate a MySQL error. return errors.WithStack(returnedErr) } diff --git a/metrics/metrics.go b/metrics/metrics.go index 5db99cd33..3d7eb2526 100644 --- a/metrics/metrics.go +++ b/metrics/metrics.go @@ -50,6 +50,7 @@ var ( TiKVCoprocessorHistogram *prometheus.HistogramVec TiKVLockResolverCounter *prometheus.CounterVec TiKVRegionErrorCounter *prometheus.CounterVec + TiKVRPCErrorCounter *prometheus.CounterVec TiKVTxnWriteKVCountHistogram prometheus.Histogram TiKVTxnWriteSizeHistogram prometheus.Histogram TiKVRawkvCmdHistogram *prometheus.HistogramVec @@ -198,7 +199,15 @@ func initMetrics(namespace, subsystem string) { Subsystem: subsystem, Name: "region_err_total", Help: "Counter of region errors.", - }, []string{LblType}) + }, []string{LblType, LblStore}) + + TiKVRPCErrorCounter = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "rpc_err_total", + Help: "Counter of rpc errors.", + }, []string{LblType, LblStore}) TiKVTxnWriteKVCountHistogram = prometheus.NewHistogram( prometheus.HistogramOpts{ @@ -641,6 +650,7 @@ func RegisterMetrics() { prometheus.MustRegister(TiKVCoprocessorHistogram) prometheus.MustRegister(TiKVLockResolverCounter) prometheus.MustRegister(TiKVRegionErrorCounter) + prometheus.MustRegister(TiKVRPCErrorCounter) prometheus.MustRegister(TiKVTxnWriteKVCountHistogram) prometheus.MustRegister(TiKVTxnWriteSizeHistogram) prometheus.MustRegister(TiKVRawkvCmdHistogram) diff --git a/tikv/region.go b/tikv/region.go index 8b9576578..567595ade 100644 --- a/tikv/region.go +++ b/tikv/region.go @@ -111,11 +111,6 @@ var ( ModeTxn Mode = client.ModeTxn ) -// RecordRegionRequestRuntimeStats records request runtime stats. -func RecordRegionRequestRuntimeStats(stats map[tikvrpc.CmdType]*locate.RPCRuntimeStats, cmd tikvrpc.CmdType, d time.Duration) { - locate.RecordRegionRequestRuntimeStats(stats, cmd, d) -} - // Store contains a kv process's address. type Store = locate.Store @@ -171,7 +166,7 @@ func WithMatchLabels(labels []*metapb.StoreLabel) StoreSelectorOption { } // NewRegionRequestRuntimeStats returns a new RegionRequestRuntimeStats. -func NewRegionRequestRuntimeStats() RegionRequestRuntimeStats { +func NewRegionRequestRuntimeStats() *RegionRequestRuntimeStats { return locate.NewRegionRequestRuntimeStats() } diff --git a/txnkv/txnsnapshot/client_helper.go b/txnkv/txnsnapshot/client_helper.go index 34a6636d5..834a04f18 100644 --- a/txnkv/txnsnapshot/client_helper.go +++ b/txnkv/txnsnapshot/client_helper.go @@ -62,7 +62,7 @@ type ClientHelper struct { committedLocks *util.TSSet client client.Client resolveLite bool - locate.RegionRequestRuntimeStats + Stats *locate.RegionRequestRuntimeStats } // NewClientHelper creates a helper instance. 
@@ -81,7 +81,7 @@ func NewClientHelper(store kvstore, resolvedLocks *util.TSSet, committedLocks *u func (ch *ClientHelper) ResolveLocksWithOpts(bo *retry.Backoffer, opts txnlock.ResolveLocksOptions) (txnlock.ResolveLockResult, error) { if ch.Stats != nil { defer func(start time.Time) { - locate.RecordRegionRequestRuntimeStats(ch.Stats, tikvrpc.CmdResolveLock, time.Since(start)) + ch.Stats.RecordRPCRuntimeStats(tikvrpc.CmdResolveLock, time.Since(start)) }(time.Now()) } opts.ForRead = true @@ -103,7 +103,7 @@ func (ch *ClientHelper) ResolveLocksWithOpts(bo *retry.Backoffer, opts txnlock.R func (ch *ClientHelper) ResolveLocks(bo *retry.Backoffer, callerStartTS uint64, locks []*txnlock.Lock) (int64, error) { if ch.Stats != nil { defer func(start time.Time) { - locate.RecordRegionRequestRuntimeStats(ch.Stats, tikvrpc.CmdResolveLock, time.Since(start)) + ch.Stats.RecordRPCRuntimeStats(tikvrpc.CmdResolveLock, time.Since(start)) }(time.Now()) } msBeforeTxnExpired, resolvedLocks, committedLocks, err := ch.lockResolver.ResolveLocksForRead(bo, callerStartTS, locks, ch.resolveLite) diff --git a/txnkv/txnsnapshot/snapshot.go b/txnkv/txnsnapshot/snapshot.go index 091396e95..fbb91bf21 100644 --- a/txnkv/txnsnapshot/snapshot.go +++ b/txnkv/txnsnapshot/snapshot.go @@ -361,7 +361,7 @@ func (s *KVSnapshot) batchGetSingleRegion(bo *retry.Backoffer, batch batchKeys, cli := NewClientHelper(s.store, &s.resolvedLocks, &s.committedLocks, false) s.mu.RLock() if s.mu.stats != nil { - cli.Stats = make(map[tikvrpc.CmdType]*locate.RPCRuntimeStats) + cli.Stats = locate.NewRegionRequestRuntimeStats() defer func() { s.mergeRegionRequestStats(cli.Stats) }() @@ -587,7 +587,7 @@ func (s *KVSnapshot) get(ctx context.Context, bo *retry.Backoffer, k []byte) ([] cli := NewClientHelper(s.store, &s.resolvedLocks, &s.committedLocks, true) if s.mu.stats != nil { - cli.Stats = make(map[tikvrpc.CmdType]*locate.RPCRuntimeStats) + cli.Stats = locate.NewRegionRequestRuntimeStats() defer func() { s.mergeRegionRequestStats(cli.Stats) }() @@ -924,25 +924,17 @@ func (s *KVSnapshot) recordBackoffInfo(bo *retry.Backoffer) { } } -func (s *KVSnapshot) mergeRegionRequestStats(stats map[tikvrpc.CmdType]*locate.RPCRuntimeStats) { +func (s *KVSnapshot) mergeRegionRequestStats(rpcStats *locate.RegionRequestRuntimeStats) { s.mu.Lock() defer s.mu.Unlock() if s.mu.stats == nil { return } - if s.mu.stats.rpcStats.Stats == nil { - s.mu.stats.rpcStats.Stats = stats + if s.mu.stats.rpcStats == nil { + s.mu.stats.rpcStats = rpcStats return } - for k, v := range stats { - stat, ok := s.mu.stats.rpcStats.Stats[k] - if !ok { - s.mu.stats.rpcStats.Stats[k] = v - continue - } - stat.Count += v.Count - stat.Consume += v.Consume - } + s.mu.stats.rpcStats.Merge(rpcStats) } // SetKVReadTimeout sets timeout for individual KV read operations under this snapshot @@ -966,7 +958,7 @@ func (s *KVSnapshot) getResolveLockDetail() *util.ResolveLockDetail { // SnapshotRuntimeStats records the runtime stats of snapshot. type SnapshotRuntimeStats struct { - rpcStats locate.RegionRequestRuntimeStats + rpcStats *locate.RegionRequestRuntimeStats backoffSleepMS map[string]int backoffTimes map[string]int scanDetail *util.ScanDetail @@ -976,11 +968,9 @@ type SnapshotRuntimeStats struct { // Clone implements the RuntimeStats interface. 
func (rs *SnapshotRuntimeStats) Clone() *SnapshotRuntimeStats { - newRs := SnapshotRuntimeStats{rpcStats: locate.NewRegionRequestRuntimeStats()} - if rs.rpcStats.Stats != nil { - for k, v := range rs.rpcStats.Stats { - newRs.rpcStats.Stats[k] = v - } + newRs := SnapshotRuntimeStats{} + if rs.rpcStats != nil { + newRs.rpcStats = rs.rpcStats.Clone() } if len(rs.backoffSleepMS) > 0 { newRs.backoffSleepMS = make(map[string]int) @@ -1010,9 +1000,9 @@ func (rs *SnapshotRuntimeStats) Clone() *SnapshotRuntimeStats { // Merge implements the RuntimeStats interface. func (rs *SnapshotRuntimeStats) Merge(other *SnapshotRuntimeStats) { - if other.rpcStats.Stats != nil { - if rs.rpcStats.Stats == nil { - rs.rpcStats.Stats = make(map[tikvrpc.CmdType]*locate.RPCRuntimeStats, len(other.rpcStats.Stats)) + if other.rpcStats != nil { + if rs.rpcStats == nil { + rs.rpcStats = locate.NewRegionRequestRuntimeStats() } rs.rpcStats.Merge(other.rpcStats) } @@ -1035,7 +1025,9 @@ func (rs *SnapshotRuntimeStats) Merge(other *SnapshotRuntimeStats) { // String implements fmt.Stringer interface. func (rs *SnapshotRuntimeStats) String() string { var buf bytes.Buffer - buf.WriteString(rs.rpcStats.String()) + if rs.rpcStats != nil { + buf.WriteString(rs.rpcStats.String()) + } for k, v := range rs.backoffTimes { if buf.Len() > 0 { buf.WriteByte(',') diff --git a/txnkv/txnsnapshot/test_probe.go b/txnkv/txnsnapshot/test_probe.go index e1d4d9310..521d6498d 100644 --- a/txnkv/txnsnapshot/test_probe.go +++ b/txnkv/txnsnapshot/test_probe.go @@ -18,7 +18,6 @@ import ( "github.com/pingcap/kvproto/pkg/kvrpcpb" "github.com/tikv/client-go/v2/internal/locate" "github.com/tikv/client-go/v2/internal/retry" - "github.com/tikv/client-go/v2/tikvrpc" ) // SnapshotProbe exposes some snapshot utilities for testing purpose. @@ -27,8 +26,8 @@ type SnapshotProbe struct { } // MergeRegionRequestStats merges RPC runtime stats into snapshot's stats. -func (s SnapshotProbe) MergeRegionRequestStats(stats map[tikvrpc.CmdType]*locate.RPCRuntimeStats) { - s.mergeRegionRequestStats(stats) +func (s SnapshotProbe) MergeRegionRequestStats(rpcStats *locate.RegionRequestRuntimeStats) { + s.mergeRegionRequestStats(rpcStats) } // RecordBackoffInfo records backoff stats into snapshot's stats.