From 3a7f815d4246e4192f5042dead2d25f7ec04b889 Mon Sep 17 00:00:00 2001
From: iosmanthus
Date: Thu, 7 Dec 2023 14:59:15 +0800
Subject: [PATCH 1/5] add more trace for getRPCContext

Signed-off-by: iosmanthus
---
 internal/locate/region_cache.go   | 15 ++++++++
 internal/locate/region_request.go | 64 ++++++++++++++++++++++++++++++-
 2 files changed, 77 insertions(+), 2 deletions(-)

diff --git a/internal/locate/region_cache.go b/internal/locate/region_cache.go
index 87cd06704..aba27a331 100644
--- a/internal/locate/region_cache.go
+++ b/internal/locate/region_cache.go
@@ -863,6 +863,11 @@ func (c *RegionCache) GetAllValidTiFlashStores(id RegionVerID, currentStore *Sto
 // must be out of date and already dropped from cache or not flash store found.
 // `loadBalance` is an option. For batch cop, it is pointless and might cause try the failed store repeatly.
 func (c *RegionCache) GetTiFlashRPCContext(bo *retry.Backoffer, id RegionVerID, loadBalance bool, labelFilter LabelFilter) (*RPCContext, error) {
+	if span := opentracing.SpanFromContext(bo.GetCtx()); span != nil && span.Tracer() != nil {
+		span1 := span.Tracer().StartSpan("regionCache.GetTiFlashRPCContext", opentracing.ChildOf(span.Context()))
+		defer span1.Finish()
+		bo.SetCtx(opentracing.ContextWithSpan(bo.GetCtx(), span1))
+	}
 	ts := time.Now().Unix()
 
 	cachedRegion := c.GetCachedRegionWithRLock(id)
@@ -1834,6 +1839,11 @@ func (c *RegionCache) GetCachedRegionWithRLock(regionID RegionVerID) (r *Region)
 }
 
 func (c *RegionCache) getStoreAddr(bo *retry.Backoffer, region *Region, store *Store) (addr string, err error) {
+	if span := opentracing.SpanFromContext(bo.GetCtx()); span != nil && span.Tracer() != nil {
+		span1 := span.Tracer().StartSpan("regionCache.getStoreAddr", opentracing.ChildOf(span.Context()))
+		defer span1.Finish()
+		bo.SetCtx(opentracing.ContextWithSpan(bo.GetCtx(), span1))
+	}
 	state := store.getResolveState()
 	switch state {
 	case resolved, needCheck:
@@ -2504,6 +2514,11 @@ func (s *Store) StoreID() uint64 {
 // initResolve resolves the address of the store that never resolved and returns an
 // empty string if it's a tombstone.
 func (s *Store) initResolve(bo *retry.Backoffer, c *RegionCache) (addr string, err error) {
+	if span := opentracing.SpanFromContext(bo.GetCtx()); span != nil && span.Tracer() != nil {
+		span1 := span.Tracer().StartSpan("store.initResolve", opentracing.ChildOf(span.Context()))
+		defer span1.Finish()
+		bo.SetCtx(opentracing.ContextWithSpan(bo.GetCtx(), span1))
+	}
 	s.resolveMutex.Lock()
 	state := s.getResolveState()
 	defer s.resolveMutex.Unlock()
diff --git a/internal/locate/region_request.go b/internal/locate/region_request.go
index 287fc175c..c11840434 100644
--- a/internal/locate/region_request.go
+++ b/internal/locate/region_request.go
@@ -337,6 +337,11 @@ type accessKnownLeader struct {
 }
 
 func (state *accessKnownLeader) next(bo *retry.Backoffer, selector *replicaSelector) (*RPCContext, error) {
+	if span := opentracing.SpanFromContext(bo.GetCtx()); span != nil && span.Tracer() != nil {
+		span1 := span.Tracer().StartSpan("accessKnownLeader.next", opentracing.ChildOf(span.Context()))
+		defer span1.Finish()
+		bo.SetCtx(opentracing.ContextWithSpan(bo.GetCtx(), span1))
+	}
 	leader := selector.replicas[state.leaderIdx]
 	liveness := leader.store.getLivenessState()
 	if liveness == unreachable && selector.regionCache.enableForwarding {
@@ -399,6 +404,11 @@ type tryFollower struct {
 }
 
 func (state *tryFollower) next(bo *retry.Backoffer, selector *replicaSelector) (*RPCContext, error) {
+	if span := opentracing.SpanFromContext(bo.GetCtx()); span != nil && span.Tracer() != nil {
+		span1 := span.Tracer().StartSpan("tryFollower.next", opentracing.ChildOf(span.Context()))
+		defer span1.Finish()
+		bo.SetCtx(opentracing.ContextWithSpan(bo.GetCtx(), span1))
+	}
 	var targetReplica *replica
 	hasDeadlineExceededErr := false
 	// Search replica that is not attempted from the last accessed replica
@@ -462,6 +472,12 @@ type accessByKnownProxy struct {
 }
 
 func (state *accessByKnownProxy) next(bo *retry.Backoffer, selector *replicaSelector) (*RPCContext, error) {
+	if span := opentracing.SpanFromContext(bo.GetCtx()); span != nil && span.Tracer() != nil {
+		span1 := span.Tracer().StartSpan("accessByKnownProxy.next", opentracing.ChildOf(span.Context()))
+		defer span1.Finish()
+		bo.SetCtx(opentracing.ContextWithSpan(bo.GetCtx(), span1))
+	}
+
 	leader := selector.replicas[state.leaderIdx]
 	if leader.store.getLivenessState() == reachable {
 		selector.regionStore.unsetProxyStoreIfNeeded(selector.region)
@@ -496,6 +512,11 @@ type tryNewProxy struct {
 }
 
 func (state *tryNewProxy) next(bo *retry.Backoffer, selector *replicaSelector) (*RPCContext, error) {
+	if span := opentracing.SpanFromContext(bo.GetCtx()); span != nil && span.Tracer() != nil {
+		span1 := span.Tracer().StartSpan("tryNewProxy.next", opentracing.ChildOf(span.Context()))
+		defer span1.Finish()
+		bo.SetCtx(opentracing.ContextWithSpan(bo.GetCtx(), span1))
+	}
 	leader := selector.replicas[state.leaderIdx]
 	if leader.store.getLivenessState() == reachable {
 		selector.regionStore.unsetProxyStoreIfNeeded(selector.region)
@@ -567,6 +588,11 @@ type accessFollower struct {
 }
 
 func (state *accessFollower) next(bo *retry.Backoffer, selector *replicaSelector) (*RPCContext, error) {
+	if span := opentracing.SpanFromContext(bo.GetCtx()); span != nil && span.Tracer() != nil {
+		span1 := span.Tracer().StartSpan("accessFollower.next", opentracing.ChildOf(span.Context()))
+		defer span1.Finish()
+		bo.SetCtx(opentracing.ContextWithSpan(bo.GetCtx(), span1))
+	}
 	replicaSize := len(selector.replicas)
 	resetStaleRead := false
 	if state.lastIdx < 0 {
@@ -722,6 +748,11 @@ type tryIdleReplica struct {
 }
 
 func (state *tryIdleReplica) next(bo *retry.Backoffer, selector *replicaSelector) (*RPCContext, error) {
+	if span := opentracing.SpanFromContext(bo.GetCtx()); span != nil && span.Tracer() != nil {
+		span1 := span.Tracer().StartSpan("tryIdleReplica.next", opentracing.ChildOf(span.Context()))
+		defer span1.Finish()
+		bo.SetCtx(opentracing.ContextWithSpan(bo.GetCtx(), span1))
+	}
 	// Select a follower replica that has the lowest estimated wait duration
 	minWait := time.Duration(math.MaxInt64)
 	targetIdx := state.leaderIdx
@@ -774,7 +805,12 @@ type invalidStore struct {
 	stateBase
 }
 
-func (state *invalidStore) next(_ *retry.Backoffer, _ *replicaSelector) (*RPCContext, error) {
+func (state *invalidStore) next(bo *retry.Backoffer, _ *replicaSelector) (*RPCContext, error) {
+	if span := opentracing.SpanFromContext(context.Background()); span != nil && span.Tracer() != nil {
+		span1 := span.Tracer().StartSpan("invalidStore.next", opentracing.ChildOf(span.Context()))
+		defer span1.Finish()
+		bo.SetCtx(opentracing.ContextWithSpan(bo.GetCtx(), span1))
+	}
 	metrics.TiKVReplicaSelectorFailureCounter.WithLabelValues("invalidStore").Inc()
 	return nil, nil
 }
@@ -785,7 +821,12 @@ type invalidLeader struct {
 	stateBase
 }
 
-func (state *invalidLeader) next(_ *retry.Backoffer, _ *replicaSelector) (*RPCContext, error) {
+func (state *invalidLeader) next(bo *retry.Backoffer, _ *replicaSelector) (*RPCContext, error) {
+	if span := opentracing.SpanFromContext(bo.GetCtx()); span != nil && span.Tracer() != nil {
+		span1 := span.Tracer().StartSpan("invalidLeader.next", opentracing.ChildOf(span.Context()))
+		defer span1.Finish()
+		bo.SetCtx(opentracing.ContextWithSpan(bo.GetCtx(), span1))
+	}
 	metrics.TiKVReplicaSelectorFailureCounter.WithLabelValues("invalidLeader").Inc()
 	return nil, nil
 }
@@ -856,6 +897,11 @@ const maxReplicaAttempt = 10
 // next creates the RPCContext of the current candidate replica.
 // It returns a SendError if runs out of all replicas or the cached region is invalidated.
 func (s *replicaSelector) next(bo *retry.Backoffer) (rpcCtx *RPCContext, err error) {
+	if span := opentracing.SpanFromContext(bo.GetCtx()); span != nil && span.Tracer() != nil {
+		span1 := span.Tracer().StartSpan("replicaSelector.next", opentracing.ChildOf(span.Context()))
+		defer span1.Finish()
+		bo.SetCtx(opentracing.ContextWithSpan(bo.GetCtx(), span1))
+	}
 	if !s.region.isValid() {
 		metrics.TiKVReplicaSelectorFailureCounter.WithLabelValues("invalid").Inc()
 		return nil, nil
@@ -922,6 +968,14 @@ func (s *replicaSelector) refreshRegionStore() {
 }
 
 func (s *replicaSelector) buildRPCContext(bo *retry.Backoffer) (*RPCContext, error) {
+	if span := opentracing.SpanFromContext(bo.GetCtx()); span != nil && span.Tracer() != nil {
+		span1 := span.Tracer().StartSpan(
+			"replicaSelector.BuildRPCContext",
+			opentracing.ChildOf(span.Context()),
+		)
+		defer span1.Finish()
+		bo.SetCtx(opentracing.ContextWithSpan(bo.GetCtx(), span1))
+	}
 	targetReplica, proxyReplica := s.targetReplica(), s.proxyReplica()
 
 	// Backoff and retry if no replica is selected or the selected replica is stale
@@ -1123,6 +1177,12 @@ func (s *RegionRequestSender) getRPCContext(
 	et tikvrpc.EndpointType,
 	opts ...StoreSelectorOption,
 ) (*RPCContext, error) {
+	if span := opentracing.SpanFromContext(bo.GetCtx()); span != nil && span.Tracer() != nil {
+		span1 := span.Tracer().StartSpan("regionRequest.getRPCContext", opentracing.ChildOf(span.Context()))
+		defer span1.Finish()
+		bo.SetCtx(opentracing.ContextWithSpan(bo.GetCtx(), span1))
+	}
+
 	switch et {
 	case tikvrpc.TiKV, tikvrpc.TiKVRemoteCoprocessor:
 		if s.replicaSelector == nil {

From c69ffd18bf8f4ff36cf7be06afbed05c1a51fd54 Mon Sep 17 00:00:00 2001
From: iosmanthus
Date: Thu, 7 Dec 2023 15:43:56 +0800
Subject: [PATCH 2/5] remove trace for invalidStore/Leader

Signed-off-by: iosmanthus
---
 internal/locate/region_request.go | 10 ----------
 1 file changed, 10 deletions(-)

diff --git a/internal/locate/region_request.go b/internal/locate/region_request.go
index c11840434..2dd84be21 100644
--- a/internal/locate/region_request.go
+++ b/internal/locate/region_request.go
@@ -806,11 +806,6 @@ type invalidStore struct {
 }
 
 func (state *invalidStore) next(bo *retry.Backoffer, _ *replicaSelector) (*RPCContext, error) {
-	if span := opentracing.SpanFromContext(context.Background()); span != nil && span.Tracer() != nil {
-		span1 := span.Tracer().StartSpan("invalidStore.next", opentracing.ChildOf(span.Context()))
-		defer span1.Finish()
-		bo.SetCtx(opentracing.ContextWithSpan(bo.GetCtx(), span1))
-	}
 	metrics.TiKVReplicaSelectorFailureCounter.WithLabelValues("invalidStore").Inc()
 	return nil, nil
 }
@@ -822,11 +817,6 @@ type invalidLeader struct {
 }
 
 func (state *invalidLeader) next(bo *retry.Backoffer, _ *replicaSelector) (*RPCContext, error) {
-	if span := opentracing.SpanFromContext(bo.GetCtx()); span != nil && span.Tracer() != nil {
-		span1 := span.Tracer().StartSpan("invalidLeader.next", opentracing.ChildOf(span.Context()))
-		defer span1.Finish()
-		bo.SetCtx(opentracing.ContextWithSpan(bo.GetCtx(), span1))
-	}
 	metrics.TiKVReplicaSelectorFailureCounter.WithLabelValues("invalidLeader").Inc()
 	return nil, nil
 }

From fa7ce465b226cceaff1a5ddd7be5412abc65ba5e Mon Sep 17 00:00:00 2001
From: iosmanthus
Date: Thu, 7 Dec 2023 16:04:58 +0800
Subject: [PATCH 3/5] address comments from zhousir

Signed-off-by: iosmanthus
---
 internal/locate/region_request.go | 14 +++++++++++---
 1 file changed, 11 insertions(+), 3 deletions(-)

diff --git a/internal/locate/region_request.go b/internal/locate/region_request.go
index 2dd84be21..81aeaef93 100644
--- a/internal/locate/region_request.go
+++ b/internal/locate/region_request.go
@@ -805,7 +805,7 @@ type invalidStore struct {
 	stateBase
 }
 
-func (state *invalidStore) next(bo *retry.Backoffer, _ *replicaSelector) (*RPCContext, error) {
+func (state *invalidStore) next(_ *retry.Backoffer, _ *replicaSelector) (*RPCContext, error) {
 	metrics.TiKVReplicaSelectorFailureCounter.WithLabelValues("invalidStore").Inc()
 	return nil, nil
 }
@@ -816,7 +816,7 @@ type invalidLeader struct {
 	stateBase
 }
 
-func (state *invalidLeader) next(bo *retry.Backoffer, _ *replicaSelector) (*RPCContext, error) {
+func (state *invalidLeader) next(_ *retry.Backoffer, _ *replicaSelector) (*RPCContext, error) {
 	metrics.TiKVReplicaSelectorFailureCounter.WithLabelValues("invalidLeader").Inc()
 	return nil, nil
 }
@@ -1167,8 +1167,9 @@ func (s *RegionRequestSender) getRPCContext(
 	et tikvrpc.EndpointType,
 	opts ...StoreSelectorOption,
 ) (*RPCContext, error) {
+	var span1 opentracing.Span
 	if span := opentracing.SpanFromContext(bo.GetCtx()); span != nil && span.Tracer() != nil {
-		span1 := span.Tracer().StartSpan("regionRequest.getRPCContext", opentracing.ChildOf(span.Context()))
+		span1 = span.Tracer().StartSpan("regionRequest.getRPCContext", opentracing.ChildOf(span.Context()))
 		defer span1.Finish()
 		bo.SetCtx(opentracing.ContextWithSpan(bo.GetCtx(), span1))
 	}
@@ -1176,7 +1177,14 @@ func (s *RegionRequestSender) getRPCContext(
 	switch et {
 	case tikvrpc.TiKV, tikvrpc.TiKVRemoteCoprocessor:
 		if s.replicaSelector == nil {
+			var span2 opentracing.Span
+			if span1 != nil && span1.Tracer() != nil {
+				span2 = span1.Tracer().StartSpan("newReplicaSelector", opentracing.ChildOf(span1.Context()))
+			}
 			selector, err := newReplicaSelector(s.regionCache, regionID, req, opts...)
+			if span2 != nil {
+				span2.Finish()
+			}
 			if selector == nil || err != nil {
 				return nil, err
 			}

From 3658c03e208ca9b5f037448294d756b1b9c1ace5 Mon Sep 17 00:00:00 2001
From: iosmanthus
Date: Fri, 8 Dec 2023 16:18:27 +0800
Subject: [PATCH 4/5] track down RPCClient.sendRequest

Signed-off-by: iosmanthus
---
 internal/client/client.go | 16 ++++++++++++++++
 1 file changed, 16 insertions(+)

diff --git a/internal/client/client.go b/internal/client/client.go
index 98d847a90..1c7e61234 100644
--- a/internal/client/client.go
+++ b/internal/client/client.go
@@ -596,7 +596,17 @@ func (c *RPCClient) sendRequest(ctx context.Context, addr string, req *tikvrpc.R
 	// TiDB will not send batch commands to TiFlash, to resolve the conflict with Batch Cop Request.
 	// tiflash/tiflash_mpp/tidb don't use BatchCommand.
 	enableBatch := req.StoreTp == tikvrpc.TiKV
+	var spanConn opentracing.Span
+	if spanRPC != nil {
+		spanConn = spanRPC.Tracer().StartSpan(
+			"RPCClient.getConnArray", opentracing.ChildOf(spanRPC.Context()),
+		)
+		ctx = opentracing.ContextWithSpan(ctx, spanConn)
+	}
 	connArray, err := c.getConnArray(addr, enableBatch)
+	if spanConn != nil {
+		spanConn.Finish()
+	}
 	if err != nil {
 		return nil, err
 	}
@@ -657,6 +667,12 @@ func (c *RPCClient) sendRequest(ctx context.Context, addr string, req *tikvrpc.R
 	// Or else it's a unary call.
 	ctx1, cancel := context.WithTimeout(ctx, timeout)
 	defer cancel()
+	var spanCall opentracing.Span
+	if spanRPC != nil {
+		spanCall = spanRPC.Tracer().StartSpan("tikvrpc.CallRPC", opentracing.ChildOf(spanRPC.Context()))
+		defer spanCall.Finish()
+		ctx1 = opentracing.ContextWithSpan(ctx1, spanCall)
+	}
 	return tikvrpc.CallRPC(ctx1, client, req)
 }

From c3701d00955b8298513572821bc80a559a3140dd Mon Sep 17 00:00:00 2001
From: iosmanthus
Date: Fri, 8 Dec 2023 17:28:05 +0800
Subject: [PATCH 5/5] track down sendBatchRequest

Signed-off-by: iosmanthus
---
 internal/client/client.go         |  1 -
 internal/client/client_batch.go   | 19 +++++++++++++++++++
 internal/locate/region_cache.go   |  1 -
 internal/locate/region_request.go |  6 +++++-
 4 files changed, 24 insertions(+), 3 deletions(-)

diff --git a/internal/client/client.go b/internal/client/client.go
index 1c7e61234..89ee1093b 100644
--- a/internal/client/client.go
+++ b/internal/client/client.go
@@ -601,7 +601,6 @@ func (c *RPCClient) sendRequest(ctx context.Context, addr string, req *tikvrpc.R
 		spanConn = spanRPC.Tracer().StartSpan(
 			"RPCClient.getConnArray", opentracing.ChildOf(spanRPC.Context()),
 		)
-		ctx = opentracing.ContextWithSpan(ctx, spanConn)
 	}
 	connArray, err := c.getConnArray(addr, enableBatch)
 	if spanConn != nil {
diff --git a/internal/client/client_batch.go b/internal/client/client_batch.go
index 6a270bc6d..635896e30 100644
--- a/internal/client/client_batch.go
+++ b/internal/client/client_batch.go
@@ -43,6 +43,7 @@ import (
 	"sync/atomic"
 	"time"
 
+	"github.com/opentracing/opentracing-go"
 	"github.com/pingcap/kvproto/pkg/tikvpb"
 	"github.com/pkg/errors"
 	"github.com/prometheus/client_golang/prometheus"
@@ -772,6 +773,12 @@ func sendBatchRequest(
 	req *tikvpb.BatchCommandsRequest_Request,
 	timeout time.Duration,
 ) (*tikvrpc.Response, error) {
+	var span0 opentracing.Span
+	if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
+		span0 = span.Tracer().StartSpan("sendBatchRequest", opentracing.ChildOf(span.Context()))
+		defer span0.Finish()
+		ctx = opentracing.ContextWithSpan(ctx, span0)
+	}
 	entry := &batchCommandsEntry{
 		ctx: ctx,
 		req: req,
@@ -783,6 +790,10 @@ func sendBatchRequest(
 	timer := time.NewTimer(timeout)
 	defer timer.Stop()
 
+	var spanWaitBatch opentracing.Span
+	if span0 != nil {
+		spanWaitBatch = span0.Tracer().StartSpan("waitBatch", opentracing.ChildOf(span0.Context()))
+	}
 	start := time.Now()
 	select {
 	case batchConn.batchCommandsCh <- entry:
@@ -794,7 +805,15 @@ func sendBatchRequest(
 		return nil, errors.WithMessage(context.DeadlineExceeded, "wait sendLoop")
 	}
 	metrics.TiKVBatchWaitDuration.Observe(float64(time.Since(start)))
+	if spanWaitBatch != nil {
+		spanWaitBatch.Finish()
+	}
 
+	var spanWaitRes opentracing.Span
+	if span0 != nil {
+		spanWaitRes = span0.Tracer().StartSpan("waitRes", opentracing.ChildOf(span0.Context()))
+		defer spanWaitRes.Finish()
+	}
 	select {
 	case res, ok := <-entry.res:
 		if !ok {
diff --git a/internal/locate/region_cache.go b/internal/locate/region_cache.go
index aba27a331..38d6f873e 100644
--- a/internal/locate/region_cache.go
+++ b/internal/locate/region_cache.go
@@ -2517,7 +2517,6 @@ func (s *Store) initResolve(bo *retry.Backoffer, c *RegionCache) (addr string, e
 	if span := opentracing.SpanFromContext(bo.GetCtx()); span != nil && span.Tracer() != nil {
 		span1 := span.Tracer().StartSpan("store.initResolve", opentracing.ChildOf(span.Context()))
 		defer span1.Finish()
-		bo.SetCtx(opentracing.ContextWithSpan(bo.GetCtx(), span1))
 	}
 	s.resolveMutex.Lock()
 	state := s.getResolveState()
diff --git a/internal/locate/region_request.go b/internal/locate/region_request.go
index 81aeaef93..833bada4a 100644
--- a/internal/locate/region_request.go
+++ b/internal/locate/region_request.go
@@ -1229,8 +1229,9 @@ func (s *RegionRequestSender) SendReqCtx(
 	retryTimes int,
 	err error,
 ) {
+	var span1 opentracing.Span
 	if span := opentracing.SpanFromContext(bo.GetCtx()); span != nil && span.Tracer() != nil {
-		span1 := span.Tracer().StartSpan("regionRequest.SendReqCtx", opentracing.ChildOf(span.Context()))
+		span1 = span.Tracer().StartSpan("regionRequest.SendReqCtx", opentracing.ChildOf(span.Context()))
 		defer span1.Finish()
 		bo.SetCtx(opentracing.ContextWithSpan(bo.GetCtx(), span1))
 	}
@@ -1356,6 +1357,9 @@ func (s *RegionRequestSender) SendReqCtx(
 		}
 	}
 
+	if span1 != nil {
+		bo.SetCtx(opentracing.ContextWithSpan(bo.GetCtx(), span1))
+	}
 	var retry bool
 	resp, retry, err = s.sendReqToRegion(bo, rpcCtx, req, timeout)
 	if err != nil {
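
Every hunk in this series repeats one tracing idiom: look up the parent span in the current context, start a named child span only if a parent exists, finish it on return, and put the child back into the context so downstream calls nest under it (the Backoffer-based functions do this through bo.GetCtx()/bo.SetCtx instead of returning a new context, which is also why patch 3 hoists span1 out of the if so later code can test whether tracing is active). Below is a minimal, self-contained sketch of that idiom against the opentracing-go API; the startChildSpan helper and doWork function are illustrative names invented for this example, not part of the patch set:

	package main

	import (
		"context"
		"fmt"

		"github.com/opentracing/opentracing-go"
		"github.com/opentracing/opentracing-go/mocktracer"
	)

	// startChildSpan mirrors the guarded pattern used throughout the series:
	// a child span is started only when the incoming context already carries
	// a span, so untraced requests pay no tracing cost. The caller must
	// Finish() the returned span when it is non-nil.
	func startChildSpan(ctx context.Context, op string) (context.Context, opentracing.Span) {
		if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
			child := span.Tracer().StartSpan(op, opentracing.ChildOf(span.Context()))
			return opentracing.ContextWithSpan(ctx, child), child
		}
		return ctx, nil
	}

	func doWork(ctx context.Context) {
		ctx, span := startChildSpan(ctx, "doWork")
		if span != nil {
			defer span.Finish()
		}
		_ = ctx // pass ctx on so deeper calls nest under "doWork"
	}

	func main() {
		tracer := mocktracer.New()
		root := tracer.StartSpan("root")
		ctx := opentracing.ContextWithSpan(context.Background(), root)
		doWork(ctx)
		root.Finish()
		// Expect two finished spans: "doWork" then "root".
		for _, s := range tracer.FinishedSpans() {
			fmt.Println(s.OperationName)
		}
	}

Note that this nesting is exactly why the series works: SpanFromContext(context.Background()) always returns nil, so the invalidStore span added in patch 1 could never fire, and patches 2 and 3 drop it rather than fix it.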