Skip to content

Commit

Permalink
client: use Tracer from Span and fix unfinished span (#7860)
Browse files Browse the repository at this point in the history
close #7861

Signed-off-by: Cabinfever_B <cabinfeveroier@gmail.com>

Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
  • Loading branch information
CabinfeverB and ti-chi-bot[bot] committed Mar 5, 2024
1 parent 41012b5 commit ba33aa5
Show file tree
Hide file tree
Showing 8 changed files with 84 additions and 72 deletions.
64 changes: 32 additions & 32 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -773,10 +773,10 @@ func (c *client) GetTSAsync(ctx context.Context) TSFuture {
}

func (c *client) GetLocalTSAsync(ctx context.Context, dcLocation string) TSFuture {
defer trace.StartRegion(ctx, "GetLocalTSAsync").End()
if span := opentracing.SpanFromContext(ctx); span != nil {
span = opentracing.StartSpan("GetLocalTSAsync", opentracing.ChildOf(span.Context()))
ctx = opentracing.ContextWithSpan(ctx, span)
defer trace.StartRegion(ctx, "pdclient.GetLocalTSAsync").End()
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span = span.Tracer().StartSpan("pdclient.GetLocalTSAsync", opentracing.ChildOf(span.Context()))
defer span.Finish()
}

req := tsoReqPool.Get().(*tsoRequest)
Expand Down Expand Up @@ -875,8 +875,8 @@ func handleRegionResponse(res *pdpb.GetRegionResponse) *Region {
}

func (c *client) GetRegionFromMember(ctx context.Context, key []byte, memberURLs []string, opts ...GetRegionOption) (*Region, error) {
if span := opentracing.SpanFromContext(ctx); span != nil {
span = opentracing.StartSpan("pdclient.GetRegionFromMember", opentracing.ChildOf(span.Context()))
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span = span.Tracer().StartSpan("pdclient.GetRegionFromMember", opentracing.ChildOf(span.Context()))
defer span.Finish()
}
start := time.Now()
Expand Down Expand Up @@ -913,8 +913,8 @@ func (c *client) GetRegionFromMember(ctx context.Context, key []byte, memberURLs
}

func (c *client) GetRegion(ctx context.Context, key []byte, opts ...GetRegionOption) (*Region, error) {
if span := opentracing.SpanFromContext(ctx); span != nil {
span = opentracing.StartSpan("pdclient.GetRegion", opentracing.ChildOf(span.Context()))
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span = span.Tracer().StartSpan("pdclient.GetRegion", opentracing.ChildOf(span.Context()))
defer span.Finish()
}
start := time.Now()
Expand Down Expand Up @@ -951,8 +951,8 @@ func (c *client) GetRegion(ctx context.Context, key []byte, opts ...GetRegionOpt
}

func (c *client) GetPrevRegion(ctx context.Context, key []byte, opts ...GetRegionOption) (*Region, error) {
if span := opentracing.SpanFromContext(ctx); span != nil {
span = opentracing.StartSpan("pdclient.GetPrevRegion", opentracing.ChildOf(span.Context()))
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span = span.Tracer().StartSpan("pdclient.GetPrevRegion", opentracing.ChildOf(span.Context()))
defer span.Finish()
}
start := time.Now()
Expand Down Expand Up @@ -989,8 +989,8 @@ func (c *client) GetPrevRegion(ctx context.Context, key []byte, opts ...GetRegio
}

func (c *client) GetRegionByID(ctx context.Context, regionID uint64, opts ...GetRegionOption) (*Region, error) {
if span := opentracing.SpanFromContext(ctx); span != nil {
span = opentracing.StartSpan("pdclient.GetRegionByID", opentracing.ChildOf(span.Context()))
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span = span.Tracer().StartSpan("pdclient.GetRegionByID", opentracing.ChildOf(span.Context()))
defer span.Finish()
}
start := time.Now()
Expand Down Expand Up @@ -1027,8 +1027,8 @@ func (c *client) GetRegionByID(ctx context.Context, regionID uint64, opts ...Get
}

func (c *client) ScanRegions(ctx context.Context, key, endKey []byte, limit int, opts ...GetRegionOption) ([]*Region, error) {
if span := opentracing.SpanFromContext(ctx); span != nil {
span = opentracing.StartSpan("pdclient.ScanRegions", opentracing.ChildOf(span.Context()))
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span = span.Tracer().StartSpan("pdclient.ScanRegions", opentracing.ChildOf(span.Context()))
defer span.Finish()
}
start := time.Now()
Expand Down Expand Up @@ -1102,8 +1102,8 @@ func handleRegionsResponse(resp *pdpb.ScanRegionsResponse) []*Region {
}

func (c *client) GetStore(ctx context.Context, storeID uint64) (*metapb.Store, error) {
if span := opentracing.SpanFromContext(ctx); span != nil {
span = opentracing.StartSpan("pdclient.GetStore", opentracing.ChildOf(span.Context()))
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span = span.Tracer().StartSpan("pdclient.GetStore", opentracing.ChildOf(span.Context()))
defer span.Finish()
}
start := time.Now()
Expand Down Expand Up @@ -1146,8 +1146,8 @@ func (c *client) GetAllStores(ctx context.Context, opts ...GetStoreOption) ([]*m
opt(options)
}

if span := opentracing.SpanFromContext(ctx); span != nil {
span = opentracing.StartSpan("pdclient.GetAllStores", opentracing.ChildOf(span.Context()))
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span = span.Tracer().StartSpan("pdclient.GetAllStores", opentracing.ChildOf(span.Context()))
defer span.Finish()
}
start := time.Now()
Expand All @@ -1173,8 +1173,8 @@ func (c *client) GetAllStores(ctx context.Context, opts ...GetStoreOption) ([]*m
}

func (c *client) UpdateGCSafePoint(ctx context.Context, safePoint uint64) (uint64, error) {
if span := opentracing.SpanFromContext(ctx); span != nil {
span = opentracing.StartSpan("pdclient.UpdateGCSafePoint", opentracing.ChildOf(span.Context()))
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span = span.Tracer().StartSpan("pdclient.UpdateGCSafePoint", opentracing.ChildOf(span.Context()))
defer span.Finish()
}
start := time.Now()
Expand Down Expand Up @@ -1204,8 +1204,8 @@ func (c *client) UpdateGCSafePoint(ctx context.Context, safePoint uint64) (uint6
// determine the safepoint for multiple services, it does not trigger a GC
// job. Use UpdateGCSafePoint to trigger the GC job if needed.
func (c *client) UpdateServiceGCSafePoint(ctx context.Context, serviceID string, ttl int64, safePoint uint64) (uint64, error) {
if span := opentracing.SpanFromContext(ctx); span != nil {
span = opentracing.StartSpan("pdclient.UpdateServiceGCSafePoint", opentracing.ChildOf(span.Context()))
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span = span.Tracer().StartSpan("pdclient.UpdateServiceGCSafePoint", opentracing.ChildOf(span.Context()))
defer span.Finish()
}

Expand Down Expand Up @@ -1234,8 +1234,8 @@ func (c *client) UpdateServiceGCSafePoint(ctx context.Context, serviceID string,
}

func (c *client) ScatterRegion(ctx context.Context, regionID uint64) error {
if span := opentracing.SpanFromContext(ctx); span != nil {
span = opentracing.StartSpan("pdclient.ScatterRegion", opentracing.ChildOf(span.Context()))
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span = span.Tracer().StartSpan("pdclient.ScatterRegion", opentracing.ChildOf(span.Context()))
defer span.Finish()
}
return c.scatterRegionsWithGroup(ctx, regionID, "")
Expand Down Expand Up @@ -1268,16 +1268,16 @@ func (c *client) scatterRegionsWithGroup(ctx context.Context, regionID uint64, g
}

func (c *client) ScatterRegions(ctx context.Context, regionsID []uint64, opts ...RegionsOption) (*pdpb.ScatterRegionResponse, error) {
if span := opentracing.SpanFromContext(ctx); span != nil {
span = opentracing.StartSpan("pdclient.ScatterRegions", opentracing.ChildOf(span.Context()))
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span = span.Tracer().StartSpan("pdclient.ScatterRegions", opentracing.ChildOf(span.Context()))
defer span.Finish()
}
return c.scatterRegionsWithOptions(ctx, regionsID, opts...)
}

func (c *client) SplitAndScatterRegions(ctx context.Context, splitKeys [][]byte, opts ...RegionsOption) (*pdpb.SplitAndScatterRegionsResponse, error) {
if span := opentracing.SpanFromContext(ctx); span != nil {
span = opentracing.StartSpan("pdclient.SplitAndScatterRegions", opentracing.ChildOf(span.Context()))
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span = span.Tracer().StartSpan("pdclient.SplitAndScatterRegions", opentracing.ChildOf(span.Context()))
defer span.Finish()
}
start := time.Now()
Expand All @@ -1304,8 +1304,8 @@ func (c *client) SplitAndScatterRegions(ctx context.Context, splitKeys [][]byte,
}

func (c *client) GetOperator(ctx context.Context, regionID uint64) (*pdpb.GetOperatorResponse, error) {
if span := opentracing.SpanFromContext(ctx); span != nil {
span = opentracing.StartSpan("pdclient.GetOperator", opentracing.ChildOf(span.Context()))
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span = span.Tracer().StartSpan("pdclient.GetOperator", opentracing.ChildOf(span.Context()))
defer span.Finish()
}
start := time.Now()
Expand All @@ -1327,8 +1327,8 @@ func (c *client) GetOperator(ctx context.Context, regionID uint64) (*pdpb.GetOpe

// SplitRegions split regions by given split keys
func (c *client) SplitRegions(ctx context.Context, splitKeys [][]byte, opts ...RegionsOption) (*pdpb.SplitRegionsResponse, error) {
if span := opentracing.SpanFromContext(ctx); span != nil {
span = opentracing.StartSpan("pdclient.SplitRegions", opentracing.ChildOf(span.Context()))
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span = span.Tracer().StartSpan("pdclient.SplitRegions", opentracing.ChildOf(span.Context()))
defer span.Finish()
}
start := time.Now()
Expand Down
8 changes: 4 additions & 4 deletions client/gc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ type GCClient interface {

// UpdateGCSafePointV2 update gc safe point for the given keyspace.
func (c *client) UpdateGCSafePointV2(ctx context.Context, keyspaceID uint32, safePoint uint64) (uint64, error) {
if span := opentracing.SpanFromContext(ctx); span != nil {
span = opentracing.StartSpan("pdclient.UpdateGCSafePointV2", opentracing.ChildOf(span.Context()))
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span = span.Tracer().StartSpan("pdclient.UpdateGCSafePointV2", opentracing.ChildOf(span.Context()))
defer span.Finish()
}
start := time.Now()
Expand Down Expand Up @@ -63,8 +63,8 @@ func (c *client) UpdateGCSafePointV2(ctx context.Context, keyspaceID uint32, saf

// UpdateServiceSafePointV2 update service safe point for the given keyspace.
func (c *client) UpdateServiceSafePointV2(ctx context.Context, keyspaceID uint32, serviceID string, ttl int64, safePoint uint64) (uint64, error) {
if span := opentracing.SpanFromContext(ctx); span != nil {
span = opentracing.StartSpan("pdclient.UpdateServiceSafePointV2", opentracing.ChildOf(span.Context()))
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span = span.Tracer().StartSpan("pdclient.UpdateServiceSafePointV2", opentracing.ChildOf(span.Context()))
defer span.Finish()
}
start := time.Now()
Expand Down
12 changes: 6 additions & 6 deletions client/keyspace_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ func (c *client) keyspaceClient() keyspacepb.KeyspaceClient {

// LoadKeyspace loads and returns target keyspace's metadata.
func (c *client) LoadKeyspace(ctx context.Context, name string) (*keyspacepb.KeyspaceMeta, error) {
if span := opentracing.SpanFromContext(ctx); span != nil {
span = opentracing.StartSpan("keyspaceClient.LoadKeyspace", opentracing.ChildOf(span.Context()))
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span = span.Tracer().StartSpan("keyspaceClient.LoadKeyspace", opentracing.ChildOf(span.Context()))
defer span.Finish()
}
start := time.Now()
Expand Down Expand Up @@ -84,8 +84,8 @@ func (c *client) LoadKeyspace(ctx context.Context, name string) (*keyspacepb.Key
//
// Updated keyspace meta will be returned.
func (c *client) UpdateKeyspaceState(ctx context.Context, id uint32, state keyspacepb.KeyspaceState) (*keyspacepb.KeyspaceMeta, error) {
if span := opentracing.SpanFromContext(ctx); span != nil {
span = opentracing.StartSpan("keyspaceClient.UpdateKeyspaceState", opentracing.ChildOf(span.Context()))
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span = span.Tracer().StartSpan("keyspaceClient.UpdateKeyspaceState", opentracing.ChildOf(span.Context()))
defer span.Finish()
}
start := time.Now()
Expand Down Expand Up @@ -123,8 +123,8 @@ func (c *client) WatchKeyspaces(ctx context.Context) (chan []*keyspacepb.Keyspac

// GetAllKeyspaces get all keyspaces metadata.
func (c *client) GetAllKeyspaces(ctx context.Context, startID uint32, limit uint32) ([]*keyspacepb.KeyspaceMeta, error) {
if span := opentracing.SpanFromContext(ctx); span != nil {
span = opentracing.StartSpan("keyspaceClient.GetAllKeyspaces", opentracing.ChildOf(span.Context()))
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span = span.Tracer().StartSpan("keyspaceClient.GetAllKeyspaces", opentracing.ChildOf(span.Context()))
defer span.Finish()
}
start := time.Now()
Expand Down
8 changes: 4 additions & 4 deletions client/meta_storage_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,8 @@ func (c *client) Put(ctx context.Context, key, value []byte, opts ...OpOption) (
opt(options)
}

if span := opentracing.SpanFromContext(ctx); span != nil {
span = opentracing.StartSpan("pdclient.Put", opentracing.ChildOf(span.Context()))
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span = span.Tracer().StartSpan("pdclient.Put", opentracing.ChildOf(span.Context()))
defer span.Finish()
}
start := time.Now()
Expand Down Expand Up @@ -148,8 +148,8 @@ func (c *client) Get(ctx context.Context, key []byte, opts ...OpOption) (*meta_s
options.rangeEnd = getPrefix(key)
}

if span := opentracing.SpanFromContext(ctx); span != nil {
span = opentracing.StartSpan("pdclient.Get", opentracing.ChildOf(span.Context()))
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span = span.Tracer().StartSpan("pdclient.Get", opentracing.ChildOf(span.Context()))
defer span.Finish()
}
start := time.Now()
Expand Down
35 changes: 10 additions & 25 deletions client/tso_dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func (c *tsoClient) dispatchRequest(ctx context.Context, dcLocation string, requ
return err
}

defer trace.StartRegion(request.requestCtx, "tsoReqEnqueue").End()
defer trace.StartRegion(request.requestCtx, "pdclient.tsoReqEnqueue").End()
select {
case <-ctx.Done():
return ctx.Err()
Expand All @@ -104,7 +104,7 @@ func (req *tsoRequest) Wait() (physical int64, logical int64, err error) {
cmdDurationTSOAsyncWait.Observe(start.Sub(req.start).Seconds())
select {
case err = <-req.done:
defer trace.StartRegion(req.requestCtx, "tsoReqDone").End()
defer trace.StartRegion(req.requestCtx, "pdclient.tsoReqDone").End()
err = errors.WithStack(err)
defer tsoReqPool.Put(req)
if err != nil {
Expand Down Expand Up @@ -350,7 +350,6 @@ func (c *tsoClient) handleDispatcher(
cancel context.CancelFunc
// addr -> connectionContext
connectionCtxs sync.Map
opts []opentracing.StartSpanOption
)
defer func() {
log.Info("[tso] exit tso dispatcher", zap.String("dc-location", dc))
Expand Down Expand Up @@ -499,8 +498,7 @@ tsoBatchLoop:
return
case tsDeadlineCh.(chan *deadline) <- dl:
}
opts = extractSpanReference(tbc, opts[:0])
err = c.processRequests(stream, dc, tbc, opts)
err = c.processRequests(stream, dc, tbc)
close(done)
// If error happens during tso stream handling, reset stream and run the next trial.
if err != nil {
Expand Down Expand Up @@ -758,26 +756,16 @@ func (c *tsoClient) tryConnectToTSOWithProxy(dispatcherCtx context.Context, dc s
return nil
}

func extractSpanReference(tbc *tsoBatchController, opts []opentracing.StartSpanOption) []opentracing.StartSpanOption {
for _, req := range tbc.getCollectedRequests() {
if span := opentracing.SpanFromContext(req.requestCtx); span != nil {
opts = append(opts, opentracing.ChildOf(span.Context()))
}
}
return opts
}

func (c *tsoClient) processRequests(
stream tsoStream, dcLocation string, tbc *tsoBatchController, opts []opentracing.StartSpanOption,
stream tsoStream, dcLocation string, tbc *tsoBatchController,
) error {
if len(opts) > 0 {
span := opentracing.StartSpan("pdclient.processRequests", opts...)
defer span.Finish()
}

requests := tbc.getCollectedRequests()
for _, req := range requests {
defer trace.StartRegion(req.requestCtx, "tsoReqSend").End()
defer trace.StartRegion(req.requestCtx, "pdclient.tsoReqSend").End()
if span := opentracing.SpanFromContext(req.requestCtx); span != nil && span.Tracer() != nil {
span = span.Tracer().StartSpan("pdclient.processRequests", opentracing.ChildOf(span.Context()))
defer span.Finish()
}
}
count := int64(len(requests))
reqKeyspaceGroupID := c.svcDiscovery.GetKeyspaceGroupID()
Expand Down Expand Up @@ -849,11 +837,8 @@ func (c *tsoClient) compareAndSwapTS(

func (c *tsoClient) finishRequest(requests []*tsoRequest, physical, firstLogical int64, suffixBits uint32, err error) {
for i := 0; i < len(requests); i++ {
if span := opentracing.SpanFromContext(requests[i].requestCtx); span != nil {
span.Finish()
}
requests[i].physical, requests[i].logical = physical, tsoutil.AddLogical(firstLogical, int64(i), suffixBits)
defer trace.StartRegion(requests[i].requestCtx, "tsoReqDequeue").End()
defer trace.StartRegion(requests[i].requestCtx, "pdclient.tsoReqDequeue").End()
requests[i].done <- err
}
}
18 changes: 18 additions & 0 deletions tests/integrations/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ import (
"time"

"github.com/docker/go-units"
"github.com/opentracing/basictracer-go"
"github.com/opentracing/opentracing-go"
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/meta_storagepb"
"github.com/pingcap/kvproto/pkg/metapb"
Expand Down Expand Up @@ -469,6 +471,22 @@ func TestGlobalAndLocalTSO(t *testing.T) {
re.NoError(err)
re.NoError(failpoint.Disable("github.com/tikv/pd/client/skipUpdateMember"))

recorder := basictracer.NewInMemoryRecorder()
tracer := basictracer.New(recorder)
span := tracer.StartSpan("trace")
ctx = opentracing.ContextWithSpan(ctx, span)
future := cli.GetLocalTSAsync(ctx, "error-dc")
spans := recorder.GetSpans()
re.Len(spans, 1)
_, _, err = future.Wait()
re.Error(err)
spans = recorder.GetSpans()
re.Len(spans, 1)
_, _, err = cli.GetTS(ctx)
re.NoError(err)
spans = recorder.GetSpans()
re.Len(spans, 3)

// Test the TSO follower proxy while enabling the Local TSO.
cli.UpdateOption(pd.EnableTSOFollowerProxy, true)
// Sleep a while here to prevent from canceling the ongoing TSO request.
Expand Down
3 changes: 2 additions & 1 deletion tests/integrations/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,8 @@ require (
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/oleiade/reflections v1.0.1 // indirect
github.com/opentracing/opentracing-go v1.2.0 // indirect
github.com/opentracing/basictracer-go v1.1.0
github.com/opentracing/opentracing-go v1.2.0
github.com/pelletier/go-toml/v2 v2.0.8 // indirect
github.com/petermattis/goid v0.0.0-20211229010228-4d14c490ee36 // indirect
github.com/phf/go-queue v0.0.0-20170504031614-9abe38d0371d // indirect
Expand Down

0 comments on commit ba33aa5

Please sign in to comment.