diff --git a/internal/client/client.go b/internal/client/client.go index 98d847a90..89ee1093b 100644 --- a/internal/client/client.go +++ b/internal/client/client.go @@ -596,7 +596,16 @@ func (c *RPCClient) sendRequest(ctx context.Context, addr string, req *tikvrpc.R // TiDB will not send batch commands to TiFlash, to resolve the conflict with Batch Cop Request. // tiflash/tiflash_mpp/tidb don't use BatchCommand. enableBatch := req.StoreTp == tikvrpc.TiKV + var spanConn opentracing.Span + if spanRPC != nil { + spanConn = spanRPC.Tracer().StartSpan( + "RPCClient.getConnArray", opentracing.ChildOf(spanRPC.Context()), + ) + } connArray, err := c.getConnArray(addr, enableBatch) + if spanConn != nil { + spanConn.Finish() + } if err != nil { return nil, err } @@ -657,6 +666,12 @@ func (c *RPCClient) sendRequest(ctx context.Context, addr string, req *tikvrpc.R // Or else it's a unary call. ctx1, cancel := context.WithTimeout(ctx, timeout) defer cancel() + var spanCall opentracing.Span + if spanRPC != nil { + spanCall = spanRPC.Tracer().StartSpan("tikvrpc.CallRPC", opentracing.ChildOf(spanRPC.Context())) + defer spanCall.Finish() + ctx1 = opentracing.ContextWithSpan(ctx1, spanCall) + } return tikvrpc.CallRPC(ctx1, client, req) } diff --git a/internal/client/client_batch.go b/internal/client/client_batch.go index 6a270bc6d..635896e30 100644 --- a/internal/client/client_batch.go +++ b/internal/client/client_batch.go @@ -43,6 +43,7 @@ import ( "sync/atomic" "time" + "github.com/opentracing/opentracing-go" "github.com/pingcap/kvproto/pkg/tikvpb" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" @@ -772,6 +773,12 @@ func sendBatchRequest( req *tikvpb.BatchCommandsRequest_Request, timeout time.Duration, ) (*tikvrpc.Response, error) { + var span0 opentracing.Span + if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil { + span0 = span.Tracer().StartSpan("sendBatchRequest", opentracing.ChildOf(span.Context())) + defer span0.Finish() + ctx = opentracing.ContextWithSpan(ctx, span0) + } entry := &batchCommandsEntry{ ctx: ctx, req: req, @@ -783,6 +790,10 @@ func sendBatchRequest( timer := time.NewTimer(timeout) defer timer.Stop() + var spanWaitBatch opentracing.Span + if span0 != nil { + spanWaitBatch = span0.Tracer().StartSpan("waitBatch", opentracing.ChildOf(span0.Context())) + } start := time.Now() select { case batchConn.batchCommandsCh <- entry: @@ -794,7 +805,15 @@ func sendBatchRequest( return nil, errors.WithMessage(context.DeadlineExceeded, "wait sendLoop") } metrics.TiKVBatchWaitDuration.Observe(float64(time.Since(start))) + if spanWaitBatch != nil { + spanWaitBatch.Finish() + } + var spanWaitRes opentracing.Span + if span0 != nil { + spanWaitRes = span0.Tracer().StartSpan("waitRes", opentracing.ChildOf(span0.Context())) + defer spanWaitRes.Finish() + } select { case res, ok := <-entry.res: if !ok { diff --git a/internal/locate/region_cache.go b/internal/locate/region_cache.go index 87cd06704..38d6f873e 100644 --- a/internal/locate/region_cache.go +++ b/internal/locate/region_cache.go @@ -863,6 +863,11 @@ func (c *RegionCache) GetAllValidTiFlashStores(id RegionVerID, currentStore *Sto // must be out of date and already dropped from cache or not flash store found. // `loadBalance` is an option. For batch cop, it is pointless and might cause try the failed store repeatly. func (c *RegionCache) GetTiFlashRPCContext(bo *retry.Backoffer, id RegionVerID, loadBalance bool, labelFilter LabelFilter) (*RPCContext, error) { + if span := opentracing.SpanFromContext(bo.GetCtx()); span != nil && span.Tracer() != nil { + span1 := span.Tracer().StartSpan("regionCache.GetTiFlashRPCContext", opentracing.ChildOf(span.Context())) + defer span1.Finish() + bo.SetCtx(opentracing.ContextWithSpan(bo.GetCtx(), span1)) + } ts := time.Now().Unix() cachedRegion := c.GetCachedRegionWithRLock(id) @@ -1834,6 +1839,11 @@ func (c *RegionCache) GetCachedRegionWithRLock(regionID RegionVerID) (r *Region) } func (c *RegionCache) getStoreAddr(bo *retry.Backoffer, region *Region, store *Store) (addr string, err error) { + if span := opentracing.SpanFromContext(bo.GetCtx()); span != nil && span.Tracer() != nil { + span1 := span.Tracer().StartSpan("regionCache.getStoreAddr", opentracing.ChildOf(span.Context())) + defer span1.Finish() + bo.SetCtx(opentracing.ContextWithSpan(bo.GetCtx(), span1)) + } state := store.getResolveState() switch state { case resolved, needCheck: @@ -2504,6 +2514,10 @@ func (s *Store) StoreID() uint64 { // initResolve resolves the address of the store that never resolved and returns an // empty string if it's a tombstone. func (s *Store) initResolve(bo *retry.Backoffer, c *RegionCache) (addr string, err error) { + if span := opentracing.SpanFromContext(bo.GetCtx()); span != nil && span.Tracer() != nil { + span1 := span.Tracer().StartSpan("store.initResolve", opentracing.ChildOf(span.Context())) + defer span1.Finish() + } s.resolveMutex.Lock() state := s.getResolveState() defer s.resolveMutex.Unlock() diff --git a/internal/locate/region_request.go b/internal/locate/region_request.go index 287fc175c..833bada4a 100644 --- a/internal/locate/region_request.go +++ b/internal/locate/region_request.go @@ -337,6 +337,11 @@ type accessKnownLeader struct { } func (state *accessKnownLeader) next(bo *retry.Backoffer, selector *replicaSelector) (*RPCContext, error) { + if span := opentracing.SpanFromContext(bo.GetCtx()); span != nil && span.Tracer() != nil { + span1 := span.Tracer().StartSpan("accessKnownLeader.next", opentracing.ChildOf(span.Context())) + defer span1.Finish() + bo.SetCtx(opentracing.ContextWithSpan(bo.GetCtx(), span1)) + } leader := selector.replicas[state.leaderIdx] liveness := leader.store.getLivenessState() if liveness == unreachable && selector.regionCache.enableForwarding { @@ -399,6 +404,11 @@ type tryFollower struct { } func (state *tryFollower) next(bo *retry.Backoffer, selector *replicaSelector) (*RPCContext, error) { + if span := opentracing.SpanFromContext(bo.GetCtx()); span != nil && span.Tracer() != nil { + span1 := span.Tracer().StartSpan("tryFollower.next", opentracing.ChildOf(span.Context())) + defer span1.Finish() + bo.SetCtx(opentracing.ContextWithSpan(bo.GetCtx(), span1)) + } var targetReplica *replica hasDeadlineExceededErr := false // Search replica that is not attempted from the last accessed replica @@ -462,6 +472,12 @@ type accessByKnownProxy struct { } func (state *accessByKnownProxy) next(bo *retry.Backoffer, selector *replicaSelector) (*RPCContext, error) { + if span := opentracing.SpanFromContext(bo.GetCtx()); span != nil && span.Tracer() != nil { + span1 := span.Tracer().StartSpan("accessByKnownProxy.next", opentracing.ChildOf(span.Context())) + defer span1.Finish() + bo.SetCtx(opentracing.ContextWithSpan(bo.GetCtx(), span1)) + } + leader := selector.replicas[state.leaderIdx] if leader.store.getLivenessState() == reachable { selector.regionStore.unsetProxyStoreIfNeeded(selector.region) @@ -496,6 +512,11 @@ type tryNewProxy struct { } func (state *tryNewProxy) next(bo *retry.Backoffer, selector *replicaSelector) (*RPCContext, error) { + if span := opentracing.SpanFromContext(bo.GetCtx()); span != nil && span.Tracer() != nil { + span1 := span.Tracer().StartSpan("tryNewProxy.next", opentracing.ChildOf(span.Context())) + defer span1.Finish() + bo.SetCtx(opentracing.ContextWithSpan(bo.GetCtx(), span1)) + } leader := selector.replicas[state.leaderIdx] if leader.store.getLivenessState() == reachable { selector.regionStore.unsetProxyStoreIfNeeded(selector.region) @@ -567,6 +588,11 @@ type accessFollower struct { } func (state *accessFollower) next(bo *retry.Backoffer, selector *replicaSelector) (*RPCContext, error) { + if span := opentracing.SpanFromContext(bo.GetCtx()); span != nil && span.Tracer() != nil { + span1 := span.Tracer().StartSpan("accessFollower.next", opentracing.ChildOf(span.Context())) + defer span1.Finish() + bo.SetCtx(opentracing.ContextWithSpan(bo.GetCtx(), span1)) + } replicaSize := len(selector.replicas) resetStaleRead := false if state.lastIdx < 0 { @@ -722,6 +748,11 @@ type tryIdleReplica struct { } func (state *tryIdleReplica) next(bo *retry.Backoffer, selector *replicaSelector) (*RPCContext, error) { + if span := opentracing.SpanFromContext(bo.GetCtx()); span != nil && span.Tracer() != nil { + span1 := span.Tracer().StartSpan("tryIdleReplica.next", opentracing.ChildOf(span.Context())) + defer span1.Finish() + bo.SetCtx(opentracing.ContextWithSpan(bo.GetCtx(), span1)) + } // Select a follower replica that has the lowest estimated wait duration minWait := time.Duration(math.MaxInt64) targetIdx := state.leaderIdx @@ -856,6 +887,11 @@ const maxReplicaAttempt = 10 // next creates the RPCContext of the current candidate replica. // It returns a SendError if runs out of all replicas or the cached region is invalidated. func (s *replicaSelector) next(bo *retry.Backoffer) (rpcCtx *RPCContext, err error) { + if span := opentracing.SpanFromContext(bo.GetCtx()); span != nil && span.Tracer() != nil { + span1 := span.Tracer().StartSpan("replicaSelector.next", opentracing.ChildOf(span.Context())) + defer span1.Finish() + bo.SetCtx(opentracing.ContextWithSpan(bo.GetCtx(), span1)) + } if !s.region.isValid() { metrics.TiKVReplicaSelectorFailureCounter.WithLabelValues("invalid").Inc() return nil, nil @@ -922,6 +958,14 @@ func (s *replicaSelector) refreshRegionStore() { } func (s *replicaSelector) buildRPCContext(bo *retry.Backoffer) (*RPCContext, error) { + if span := opentracing.SpanFromContext(bo.GetCtx()); span != nil && span.Tracer() != nil { + span1 := span.Tracer().StartSpan( + "replicaSelector.BuildRPCContext", + opentracing.ChildOf(span.Context()), + ) + defer span1.Finish() + bo.SetCtx(opentracing.ContextWithSpan(bo.GetCtx(), span1)) + } targetReplica, proxyReplica := s.targetReplica(), s.proxyReplica() // Backoff and retry if no replica is selected or the selected replica is stale @@ -1123,10 +1167,24 @@ func (s *RegionRequestSender) getRPCContext( et tikvrpc.EndpointType, opts ...StoreSelectorOption, ) (*RPCContext, error) { + var span1 opentracing.Span + if span := opentracing.SpanFromContext(bo.GetCtx()); span != nil && span.Tracer() != nil { + span1 = span.Tracer().StartSpan("regionRequest.getRPCContext", opentracing.ChildOf(span.Context())) + defer span1.Finish() + bo.SetCtx(opentracing.ContextWithSpan(bo.GetCtx(), span1)) + } + switch et { case tikvrpc.TiKV, tikvrpc.TiKVRemoteCoprocessor: if s.replicaSelector == nil { + var span2 opentracing.Span + if span1 != nil && span1.Tracer() != nil { + span2 = span1.Tracer().StartSpan("newReplicaSelector", opentracing.ChildOf(span1.Context())) + } selector, err := newReplicaSelector(s.regionCache, regionID, req, opts...) + if span2 != nil { + span2.Finish() + } if selector == nil || err != nil { return nil, err } @@ -1171,8 +1229,9 @@ func (s *RegionRequestSender) SendReqCtx( retryTimes int, err error, ) { + var span1 opentracing.Span if span := opentracing.SpanFromContext(bo.GetCtx()); span != nil && span.Tracer() != nil { - span1 := span.Tracer().StartSpan("regionRequest.SendReqCtx", opentracing.ChildOf(span.Context())) + span1 = span.Tracer().StartSpan("regionRequest.SendReqCtx", opentracing.ChildOf(span.Context())) defer span1.Finish() bo.SetCtx(opentracing.ContextWithSpan(bo.GetCtx(), span1)) } @@ -1298,6 +1357,9 @@ func (s *RegionRequestSender) SendReqCtx( } } + if span1 != nil { + bo.SetCtx(opentracing.ContextWithSpan(bo.GetCtx(), span1)) + } var retry bool resp, retry, err = s.sendReqToRegion(bo, rpcCtx, req, timeout) if err != nil {