add more trace for getRPCContext #1073

Open · wants to merge 5 commits into base: cse-region-client
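Every hunk below applies the same OpenTracing pattern: start a child span only when the incoming context already carries a span with a tracer, finish it with `defer` (or explicitly right after the traced call), and put the new span back into the context, or into the Backoffer's context, so nested calls can attach their own children. A minimal, runnable sketch of that pattern, using opentracing-go's mock tracer purely so the example is self-contained; the `traceChild` helper is hypothetical and not part of this PR, which inlines the pattern at each call site. In a real deployment whatever tracer the application has registered (for example via `opentracing.SetGlobalTracer`) records the spans, and when the context carries no span each new branch in this diff is a no-op.

```go
package main

import (
	"context"
	"fmt"

	"github.com/opentracing/opentracing-go"
	"github.com/opentracing/opentracing-go/mocktracer"
)

// traceChild starts a child span named op if ctx already carries a span.
// It returns the (possibly updated) context and a finish func that is safe
// to call even when no span was started. Illustrative helper only.
func traceChild(ctx context.Context, op string) (context.Context, func()) {
	span := opentracing.SpanFromContext(ctx)
	if span == nil || span.Tracer() == nil {
		return ctx, func() {} // tracing disabled: no-op
	}
	child := span.Tracer().StartSpan(op, opentracing.ChildOf(span.Context()))
	return opentracing.ContextWithSpan(ctx, child), child.Finish
}

func main() {
	tracer := mocktracer.New()
	root := tracer.StartSpan("root")
	ctx := opentracing.ContextWithSpan(context.Background(), root)

	ctx, finish := traceChild(ctx, "regionRequest.getRPCContext")
	_ = ctx // nested calls would receive ctx and attach their own children
	finish()
	root.Finish()

	for _, s := range tracer.FinishedSpans() {
		fmt.Println(s.OperationName)
	}
}
```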
15 changes: 15 additions & 0 deletions internal/client/client.go
@@ -596,7 +596,16 @@ func (c *RPCClient) sendRequest(ctx context.Context, addr string, req *tikvrpc.R
// TiDB will not send batch commands to TiFlash, to resolve the conflict with Batch Cop Request.
// tiflash/tiflash_mpp/tidb don't use BatchCommand.
enableBatch := req.StoreTp == tikvrpc.TiKV
var spanConn opentracing.Span
if spanRPC != nil {
spanConn = spanRPC.Tracer().StartSpan(
"RPCClient.getConnArray", opentracing.ChildOf(spanRPC.Context()),
)
}
connArray, err := c.getConnArray(addr, enableBatch)
if spanConn != nil {
spanConn.Finish()
}
if err != nil {
return nil, err
}
@@ -657,6 +666,12 @@ func (c *RPCClient) sendRequest(ctx context.Context, addr string, req *tikvrpc.R
// Or else it's a unary call.
ctx1, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
var spanCall opentracing.Span
if spanRPC != nil {
spanCall = spanRPC.Tracer().StartSpan("tikvrpc.CallRPC", opentracing.ChildOf(spanRPC.Context()))
defer spanCall.Finish()
ctx1 = opentracing.ContextWithSpan(ctx1, spanCall)
}
return tikvrpc.CallRPC(ctx1, client, req)
}

19 changes: 19 additions & 0 deletions internal/client/client_batch.go
@@ -43,6 +43,7 @@ import (
"sync/atomic"
"time"

"github.com/opentracing/opentracing-go"
"github.com/pingcap/kvproto/pkg/tikvpb"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
@@ -772,6 +773,12 @@ func sendBatchRequest(
req *tikvpb.BatchCommandsRequest_Request,
timeout time.Duration,
) (*tikvrpc.Response, error) {
var span0 opentracing.Span
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span0 = span.Tracer().StartSpan("sendBatchRequest", opentracing.ChildOf(span.Context()))
defer span0.Finish()
ctx = opentracing.ContextWithSpan(ctx, span0)
}
entry := &batchCommandsEntry{
ctx: ctx,
req: req,
@@ -783,6 +790,10 @@
timer := time.NewTimer(timeout)
defer timer.Stop()

var spanWaitBatch opentracing.Span
if span0 != nil {
spanWaitBatch = span0.Tracer().StartSpan("waitBatch", opentracing.ChildOf(span0.Context()))
}
start := time.Now()
select {
case batchConn.batchCommandsCh <- entry:
@@ -794,7 +805,15 @@
return nil, errors.WithMessage(context.DeadlineExceeded, "wait sendLoop")
}
metrics.TiKVBatchWaitDuration.Observe(float64(time.Since(start)))
if spanWaitBatch != nil {
spanWaitBatch.Finish()
}

var spanWaitRes opentracing.Span
if span0 != nil {
spanWaitRes = span0.Tracer().StartSpan("waitRes", opentracing.ChildOf(span0.Context()))
defer spanWaitRes.Finish()
}
select {
case res, ok := <-entry.res:
if !ok {
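In sendBatchRequest, the two waits that previously could not be told apart, handing the entry to the batch send loop and waiting for the response, now get their own sub-spans ("waitBatch" and "waitRes") under a "sendBatchRequest" span, so a slow enqueue can be distinguished from a slow TiKV reply. A small self-contained sketch of the same idea, with made-up sleeps standing in for the two channel operations; note that deferring Finish, as the diff does for "waitRes", also closes the span on early error returns.

```go
package main

import (
	"fmt"
	"time"

	"github.com/opentracing/opentracing-go"
	"github.com/opentracing/opentracing-go/mocktracer"
)

func doRequest(tracer opentracing.Tracer) {
	parent := tracer.StartSpan("sendBatchRequest")
	defer parent.Finish()

	// Phase 1: hand the entry to the batch send loop
	// (stands in for `batchConn.batchCommandsCh <- entry`).
	waitBatch := tracer.StartSpan("waitBatch", opentracing.ChildOf(parent.Context()))
	time.Sleep(2 * time.Millisecond)
	waitBatch.Finish() // finished explicitly once the enqueue succeeds, as in the diff

	// Phase 2: wait for the response (stands in for `<-entry.res`).
	waitRes := tracer.StartSpan("waitRes", opentracing.ChildOf(parent.Context()))
	defer waitRes.Finish() // defer also covers early error returns
	time.Sleep(5 * time.Millisecond)
}

func main() {
	tracer := mocktracer.New()
	doRequest(tracer)
	for _, s := range tracer.FinishedSpans() {
		fmt.Printf("%-16s %v\n", s.OperationName, s.FinishTime.Sub(s.StartTime))
	}
}
```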
14 changes: 14 additions & 0 deletions internal/locate/region_cache.go
@@ -863,6 +863,11 @@ func (c *RegionCache) GetAllValidTiFlashStores(id RegionVerID, currentStore *Sto
// must be out of date and already dropped from cache, or no TiFlash store was found.
// `loadBalance` is an option. For batch cop, it is pointless and might cause the failed store to be retried repeatedly.
func (c *RegionCache) GetTiFlashRPCContext(bo *retry.Backoffer, id RegionVerID, loadBalance bool, labelFilter LabelFilter) (*RPCContext, error) {
if span := opentracing.SpanFromContext(bo.GetCtx()); span != nil && span.Tracer() != nil {
span1 := span.Tracer().StartSpan("regionCache.GetTiFlashRPCContext", opentracing.ChildOf(span.Context()))
defer span1.Finish()
bo.SetCtx(opentracing.ContextWithSpan(bo.GetCtx(), span1))
}
ts := time.Now().Unix()

cachedRegion := c.GetCachedRegionWithRLock(id)
@@ -1834,6 +1839,11 @@ func (c *RegionCache) GetCachedRegionWithRLock(regionID RegionVerID) (r *Region)
}

func (c *RegionCache) getStoreAddr(bo *retry.Backoffer, region *Region, store *Store) (addr string, err error) {
if span := opentracing.SpanFromContext(bo.GetCtx()); span != nil && span.Tracer() != nil {
span1 := span.Tracer().StartSpan("regionCache.getStoreAddr", opentracing.ChildOf(span.Context()))
defer span1.Finish()
bo.SetCtx(opentracing.ContextWithSpan(bo.GetCtx(), span1))
}
state := store.getResolveState()
switch state {
case resolved, needCheck:
@@ -2504,6 +2514,10 @@ func (s *Store) StoreID() uint64 {
// initResolve resolves the address of the store that never resolved and returns an
// empty string if it's a tombstone.
func (s *Store) initResolve(bo *retry.Backoffer, c *RegionCache) (addr string, err error) {
if span := opentracing.SpanFromContext(bo.GetCtx()); span != nil && span.Tracer() != nil {
span1 := span.Tracer().StartSpan("store.initResolve", opentracing.ChildOf(span.Context()))
defer span1.Finish()
}
s.resolveMutex.Lock()
state := s.getResolveState()
defer s.resolveMutex.Unlock()
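The region_cache.go hunks above and the region_request.go hunks that follow share one propagation detail: the span-carrying context lives inside the retry.Backoffer, so each traced function reads it with bo.GetCtx(), starts its child span, and writes the updated context back with bo.SetCtx() for the helpers it later calls with the same backoffer. A toy sketch of that flow; toyBackoffer is a stand-in for client-go's internal retry.Backoffer (not importable here), and the operation names are copied from the diff purely for illustration. In the real code the nesting is deeper (replicaSelector.next, buildRPCContext, and so on sit in between), but the mechanism is the same.

```go
package main

import (
	"context"
	"fmt"

	"github.com/opentracing/opentracing-go"
	"github.com/opentracing/opentracing-go/mocktracer"
)

// toyBackoffer stands in for client-go's internal retry.Backoffer: all that
// matters here is that it carries a context exposed via GetCtx/SetCtx, so a
// span stored in it is visible to every helper that receives the same backoffer.
type toyBackoffer struct{ ctx context.Context }

func (b *toyBackoffer) GetCtx() context.Context    { return b.ctx }
func (b *toyBackoffer) SetCtx(ctx context.Context) { b.ctx = ctx }

// getRPCContext mirrors the shape of the traced functions in this diff.
func getRPCContext(bo *toyBackoffer) {
	if span := opentracing.SpanFromContext(bo.GetCtx()); span != nil && span.Tracer() != nil {
		span1 := span.Tracer().StartSpan("regionRequest.getRPCContext", opentracing.ChildOf(span.Context()))
		defer span1.Finish()
		bo.SetCtx(opentracing.ContextWithSpan(bo.GetCtx(), span1))
	}
	getStoreAddr(bo) // nested helper sees the span through the backoffer's context
}

func getStoreAddr(bo *toyBackoffer) {
	if span := opentracing.SpanFromContext(bo.GetCtx()); span != nil && span.Tracer() != nil {
		span1 := span.Tracer().StartSpan("regionCache.getStoreAddr", opentracing.ChildOf(span.Context()))
		defer span1.Finish()
	}
}

func main() {
	tracer := mocktracer.New()
	root := tracer.StartSpan("regionRequest.SendReqCtx")
	bo := &toyBackoffer{ctx: opentracing.ContextWithSpan(context.Background(), root)}

	getRPCContext(bo)
	root.Finish()

	for _, s := range tracer.FinishedSpans() {
		fmt.Printf("%s (parent %d)\n", s.OperationName, s.ParentID)
	}
}
```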
64 changes: 63 additions & 1 deletion internal/locate/region_request.go
@@ -337,6 +337,11 @@ type accessKnownLeader struct {
}

func (state *accessKnownLeader) next(bo *retry.Backoffer, selector *replicaSelector) (*RPCContext, error) {
if span := opentracing.SpanFromContext(bo.GetCtx()); span != nil && span.Tracer() != nil {
span1 := span.Tracer().StartSpan("accessKnownLeader.next", opentracing.ChildOf(span.Context()))
defer span1.Finish()
bo.SetCtx(opentracing.ContextWithSpan(bo.GetCtx(), span1))
}
leader := selector.replicas[state.leaderIdx]
liveness := leader.store.getLivenessState()
if liveness == unreachable && selector.regionCache.enableForwarding {
@@ -399,6 +404,11 @@ type tryFollower struct {
}

func (state *tryFollower) next(bo *retry.Backoffer, selector *replicaSelector) (*RPCContext, error) {
if span := opentracing.SpanFromContext(bo.GetCtx()); span != nil && span.Tracer() != nil {
span1 := span.Tracer().StartSpan("tryFollower.next", opentracing.ChildOf(span.Context()))
defer span1.Finish()
bo.SetCtx(opentracing.ContextWithSpan(bo.GetCtx(), span1))
}
var targetReplica *replica
hasDeadlineExceededErr := false
// Search replica that is not attempted from the last accessed replica
@@ -462,6 +472,12 @@ type accessByKnownProxy struct {
}

func (state *accessByKnownProxy) next(bo *retry.Backoffer, selector *replicaSelector) (*RPCContext, error) {
if span := opentracing.SpanFromContext(bo.GetCtx()); span != nil && span.Tracer() != nil {
span1 := span.Tracer().StartSpan("accessByKnownProxy.next", opentracing.ChildOf(span.Context()))
defer span1.Finish()
bo.SetCtx(opentracing.ContextWithSpan(bo.GetCtx(), span1))
}

leader := selector.replicas[state.leaderIdx]
if leader.store.getLivenessState() == reachable {
selector.regionStore.unsetProxyStoreIfNeeded(selector.region)
@@ -496,6 +512,11 @@ type tryNewProxy struct {
}

func (state *tryNewProxy) next(bo *retry.Backoffer, selector *replicaSelector) (*RPCContext, error) {
if span := opentracing.SpanFromContext(bo.GetCtx()); span != nil && span.Tracer() != nil {
span1 := span.Tracer().StartSpan("tryNewProxy.next", opentracing.ChildOf(span.Context()))
defer span1.Finish()
bo.SetCtx(opentracing.ContextWithSpan(bo.GetCtx(), span1))
}
leader := selector.replicas[state.leaderIdx]
if leader.store.getLivenessState() == reachable {
selector.regionStore.unsetProxyStoreIfNeeded(selector.region)
@@ -567,6 +588,11 @@ type accessFollower struct {
}

func (state *accessFollower) next(bo *retry.Backoffer, selector *replicaSelector) (*RPCContext, error) {
if span := opentracing.SpanFromContext(bo.GetCtx()); span != nil && span.Tracer() != nil {
span1 := span.Tracer().StartSpan("accessFollower.next", opentracing.ChildOf(span.Context()))
defer span1.Finish()
bo.SetCtx(opentracing.ContextWithSpan(bo.GetCtx(), span1))
}
replicaSize := len(selector.replicas)
resetStaleRead := false
if state.lastIdx < 0 {
@@ -722,6 +748,11 @@ type tryIdleReplica struct {
}

func (state *tryIdleReplica) next(bo *retry.Backoffer, selector *replicaSelector) (*RPCContext, error) {
if span := opentracing.SpanFromContext(bo.GetCtx()); span != nil && span.Tracer() != nil {
span1 := span.Tracer().StartSpan("tryIdleReplica.next", opentracing.ChildOf(span.Context()))
defer span1.Finish()
bo.SetCtx(opentracing.ContextWithSpan(bo.GetCtx(), span1))
}
// Select a follower replica that has the lowest estimated wait duration
minWait := time.Duration(math.MaxInt64)
targetIdx := state.leaderIdx
@@ -856,6 +887,11 @@ const maxReplicaAttempt = 10
// next creates the RPCContext of the current candidate replica.
// It returns a SendError if it runs out of all replicas or the cached region is invalidated.
func (s *replicaSelector) next(bo *retry.Backoffer) (rpcCtx *RPCContext, err error) {
if span := opentracing.SpanFromContext(bo.GetCtx()); span != nil && span.Tracer() != nil {
span1 := span.Tracer().StartSpan("replicaSelector.next", opentracing.ChildOf(span.Context()))
defer span1.Finish()
bo.SetCtx(opentracing.ContextWithSpan(bo.GetCtx(), span1))
}
if !s.region.isValid() {
metrics.TiKVReplicaSelectorFailureCounter.WithLabelValues("invalid").Inc()
return nil, nil
@@ -922,6 +958,14 @@ func (s *replicaSelector) refreshRegionStore() {
}

func (s *replicaSelector) buildRPCContext(bo *retry.Backoffer) (*RPCContext, error) {
if span := opentracing.SpanFromContext(bo.GetCtx()); span != nil && span.Tracer() != nil {
span1 := span.Tracer().StartSpan(
"replicaSelector.BuildRPCContext",
opentracing.ChildOf(span.Context()),
)
defer span1.Finish()
bo.SetCtx(opentracing.ContextWithSpan(bo.GetCtx(), span1))
}
targetReplica, proxyReplica := s.targetReplica(), s.proxyReplica()

// Backoff and retry if no replica is selected or the selected replica is stale
@@ -1123,10 +1167,24 @@ func (s *RegionRequestSender) getRPCContext(
et tikvrpc.EndpointType,
opts ...StoreSelectorOption,
) (*RPCContext, error) {
var span1 opentracing.Span
if span := opentracing.SpanFromContext(bo.GetCtx()); span != nil && span.Tracer() != nil {
span1 = span.Tracer().StartSpan("regionRequest.getRPCContext", opentracing.ChildOf(span.Context()))
defer span1.Finish()
bo.SetCtx(opentracing.ContextWithSpan(bo.GetCtx(), span1))
}

switch et {
case tikvrpc.TiKV, tikvrpc.TiKVRemoteCoprocessor:
if s.replicaSelector == nil {
var span2 opentracing.Span
if span1 != nil && span1.Tracer() != nil {
span2 = span1.Tracer().StartSpan("newReplicaSelector", opentracing.ChildOf(span1.Context()))
}
selector, err := newReplicaSelector(s.regionCache, regionID, req, opts...)
if span2 != nil {
span2.Finish()
}
if selector == nil || err != nil {
return nil, err
}
@@ -1171,8 +1229,9 @@ func (s *RegionRequestSender) SendReqCtx(
retryTimes int,
err error,
) {
var span1 opentracing.Span
if span := opentracing.SpanFromContext(bo.GetCtx()); span != nil && span.Tracer() != nil {
span1 := span.Tracer().StartSpan("regionRequest.SendReqCtx", opentracing.ChildOf(span.Context()))
span1 = span.Tracer().StartSpan("regionRequest.SendReqCtx", opentracing.ChildOf(span.Context()))
defer span1.Finish()
bo.SetCtx(opentracing.ContextWithSpan(bo.GetCtx(), span1))
}
@@ -1298,6 +1357,9 @@ func (s *RegionRequestSender) SendReqCtx(
}
}

if span1 != nil {
bo.SetCtx(opentracing.ContextWithSpan(bo.GetCtx(), span1))
}
var retry bool
resp, retry, err = s.sendReqToRegion(bo, rpcCtx, req, timeout)
if err != nil {