Skip to content

Commit

Permalink
store/tikv: move Backoffer into a single package (#24525)
Browse files Browse the repository at this point in the history
  • Loading branch information
AndreMouche committed May 13, 2021
1 parent 9692c13 commit 7c8ddd8
Show file tree
Hide file tree
Showing 30 changed files with 731 additions and 614 deletions.
4 changes: 2 additions & 2 deletions store/copr/batch_coprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ func (c *CopClient) sendBatch(ctx context.Context, req *kv.Request, vars *tikv.V
if req.KeepOrder || req.Desc {
return copErrorResponse{errors.New("batch coprocessor cannot prove keep order or desc property")}
}
ctx = context.WithValue(ctx, tikv.TxnStartKey, req.StartTs)
ctx = context.WithValue(ctx, tikv.TxnStartKey(), req.StartTs)
bo := newBackofferWithVars(ctx, copBuildTaskMaxBackoff, vars)
ranges := toTiKVKeyRanges(req.KeyRanges)
tasks, err := buildBatchCopTasks(bo, c.store.GetRegionCache(), ranges, req.StoreType)
Expand Down Expand Up @@ -381,7 +381,7 @@ func (b *batchCopIterator) handleStreamedBatchCopResponse(ctx context.Context, b
return nil
}

if err1 := bo.Backoff(tikv.BoTiKVRPC, errors.Errorf("recv stream response error: %v, task store addr: %s", err, task.storeAddr)); err1 != nil {
if err1 := bo.b.BackoffTiKVRPC(errors.Errorf("recv stream response error: %v, task store addr: %s", err, task.storeAddr)); err1 != nil {
return errors.Trace(err)
}

Expand Down
11 changes: 7 additions & 4 deletions store/copr/coprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func (c *CopClient) Send(ctx context.Context, req *kv.Request, variables interfa
logutil.BgLogger().Debug("send batch requests")
return c.sendBatch(ctx, req, vars)
}
ctx = context.WithValue(ctx, tikv.TxnStartKey, req.StartTs)
ctx = context.WithValue(ctx, tikv.TxnStartKey(), req.StartTs)
bo := newBackofferWithVars(ctx, copBuildTaskMaxBackoff, vars)
ranges := toTiKVKeyRanges(req.KeyRanges)
tasks, err := buildCopTasks(bo, c.store.GetRegionCache(), ranges, req)
Expand Down Expand Up @@ -829,11 +829,14 @@ func (worker *copIteratorWorker) handleCopStreamResult(bo *backoffer, rpcCtx *ti
return nil, nil
}

boRPCType := tikv.BoTiKVRPC
err1 := errors.Errorf("recv stream response error: %v, task: %s", err, task)
if task.storeType == kv.TiFlash {
boRPCType = tikv.BoTiFlashRPC
err1 = bo.Backoff(tikv.BoTiFlashRPC, err1)
} else {
err1 = bo.b.BackoffTiKVRPC(err1)
}
if err1 := bo.Backoff(boRPCType, errors.Errorf("recv stream response error: %v, task: %s", err, task)); err1 != nil {

if err1 != nil {
return nil, errors.Trace(err)
}

Expand Down
4 changes: 2 additions & 2 deletions store/copr/mpp.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func (c *MPPClient) selectAllTiFlashStore() []kv.MPPTaskMeta {

// ConstructMPPTasks receives ScheduleRequest, which are actually collects of kv ranges. We allocates MPPTaskMeta for them and returns.
func (c *MPPClient) ConstructMPPTasks(ctx context.Context, req *kv.MPPBuildTasksRequest) ([]kv.MPPTaskMeta, error) {
ctx = context.WithValue(ctx, tikv.TxnStartKey, req.StartTS)
ctx = context.WithValue(ctx, tikv.TxnStartKey(), req.StartTS)
bo := newBackofferWithVars(ctx, copBuildTaskMaxBackoff, nil)
if req.KeyRanges == nil {
return c.selectAllTiFlashStore(), nil
Expand Down Expand Up @@ -343,7 +343,7 @@ func (m *mppIterator) establishMPPConns(bo *backoffer, req *kv.MPPDispatchReques
return
}

if err1 := bo.Backoff(tikv.BoTiKVRPC, errors.Errorf("recv stream response error: %v", err)); err1 != nil {
if err1 := bo.b.BackoffTiKVRPC(errors.Errorf("recv stream response error: %v", err)); err1 != nil {
if errors.Cause(err) == context.Canceled {
logutil.BgLogger().Info("stream recv timeout", zap.Error(err))
} else {
Expand Down
4 changes: 3 additions & 1 deletion store/driver/tikv_driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,8 @@ var (
ldflagGetEtcdAddrsFromConfig = "0" // 1:Yes, otherwise:No
)

const getAllMembersBackoff = 5000

// EtcdAddrs returns etcd server addresses.
func (s *tikvStore) EtcdAddrs() ([]string, error) {
if s.etcdAddrs == nil {
Expand All @@ -220,7 +222,7 @@ func (s *tikvStore) EtcdAddrs() ([]string, error) {
}

ctx := context.Background()
bo := tikv.NewBackoffer(ctx, tikv.GetAllMembersBackoff)
bo := tikv.NewBackoffer(ctx, getAllMembersBackoff)
etcdAddrs := make([]string, 0)
pdClient := s.GetPDClient()
if pdClient == nil {
Expand Down
14 changes: 8 additions & 6 deletions store/gcworker/gc_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -1050,7 +1050,7 @@ func (w *GCWorker) resolveLocksForRange(ctx context.Context, safePoint uint64, s

var stat tikv.RangeTaskStat
key := startKey
bo := tikv.NewBackofferWithVars(ctx, tikv.GcResolveLockMaxBackoff, nil)
bo := tikv.NewGcResolveLockMaxBackoffer(ctx)
failpoint.Inject("setGcResolveMaxBackoff", func(v failpoint.Value) {
sleep := v.(int)
// cooperate with github.com/pingcap/tidb/store/tikv/invalidCacheAndRetry
Expand Down Expand Up @@ -1147,7 +1147,7 @@ retryScanAndResolve:
if len(key) == 0 || (len(endKey) != 0 && bytes.Compare(key, endKey) >= 0) {
break
}
bo = tikv.NewBackofferWithVars(ctx, tikv.GcResolveLockMaxBackoff, nil)
bo = tikv.NewGcResolveLockMaxBackoffer(ctx)
failpoint.Inject("setGcResolveMaxBackoff", func(v failpoint.Value) {
sleep := v.(int)
bo = tikv.NewBackofferWithVars(ctx, sleep, nil)
Expand Down Expand Up @@ -1460,7 +1460,7 @@ func (w *GCWorker) resolveLocksAcrossRegions(ctx context.Context, locks []*tikv.
failpoint.Return(errors.New("injectedError"))
})

bo := tikv.NewBackofferWithVars(ctx, tikv.GcResolveLockMaxBackoff, nil)
bo := tikv.NewGcResolveLockMaxBackoffer(ctx)

for {
if len(locks) == 0 {
Expand Down Expand Up @@ -1496,18 +1496,20 @@ func (w *GCWorker) resolveLocksAcrossRegions(ctx context.Context, locks []*tikv.
}

// Recreate backoffer for next region
bo = tikv.NewBackofferWithVars(ctx, tikv.GcResolveLockMaxBackoff, nil)
bo = tikv.NewGcResolveLockMaxBackoffer(ctx)
locks = locks[len(locksInRegion):]
}

return nil
}

const gcOneRegionMaxBackoff = 20000

func (w *GCWorker) uploadSafePointToPD(ctx context.Context, safePoint uint64) error {
var newSafePoint uint64
var err error

bo := tikv.NewBackofferWithVars(ctx, tikv.GcOneRegionMaxBackoff, nil)
bo := tikv.NewBackofferWithVars(ctx, gcOneRegionMaxBackoff, nil)
for {
newSafePoint, err = w.pdClient.UpdateGCSafePoint(ctx, safePoint)
if err != nil {
Expand Down Expand Up @@ -1544,7 +1546,7 @@ func (w *GCWorker) doGCForRange(ctx context.Context, startKey []byte, endKey []b
}()
key := startKey
for {
bo := tikv.NewBackofferWithVars(ctx, tikv.GcOneRegionMaxBackoff, nil)
bo := tikv.NewBackofferWithVars(ctx, gcOneRegionMaxBackoff, nil)
loc, err := w.tikvStore.GetRegionCache().LocateKey(bo, key)
if err != nil {
return stat, errors.Trace(err)
Expand Down
5 changes: 3 additions & 2 deletions store/gcworker/gc_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import (
"github.com/pingcap/tidb/store/tikv/mockstore/mocktikv"
"github.com/pingcap/tidb/store/tikv/oracle"
"github.com/pingcap/tidb/store/tikv/oracle/oracles"
"github.com/pingcap/tidb/store/tikv/retry"
"github.com/pingcap/tidb/store/tikv/tikvrpc"
pd "github.com/tikv/pd/client"
)
Expand Down Expand Up @@ -412,7 +413,7 @@ func (s *testGCWorkerSuite) TestStatusVars(c *C) {

func (s *testGCWorkerSuite) TestDoGCForOneRegion(c *C) {
ctx := context.Background()
bo := tikv.NewBackofferWithVars(ctx, tikv.GcOneRegionMaxBackoff, nil)
bo := tikv.NewBackofferWithVars(ctx, gcOneRegionMaxBackoff, nil)
loc, err := s.tikvStore.GetRegionCache().LocateKey(bo, []byte(""))
c.Assert(err, IsNil)
var regionErr *errorpb.Error
Expand Down Expand Up @@ -943,7 +944,7 @@ func (s *testGCWorkerSuite) TestResolveLockRangeMeetRegionEnlargeCausedByRegionM
mCluster.Merge(s.initRegion.regionID, region2)
regionMeta, _ := mCluster.GetRegion(s.initRegion.regionID)
err := s.tikvStore.GetRegionCache().OnRegionEpochNotMatch(
tikv.NewNoopBackoff(context.Background()),
retry.NewNoopBackoff(context.Background()),
&tikv.RPCContext{Region: regionID, Store: &tikv.Store{}},
[]*metapb.Region{regionMeta})
c.Assert(err, IsNil)
Expand Down
Loading

0 comments on commit 7c8ddd8

Please sign in to comment.