store/tikv: move Backoffer into a single package #24525

Merged · 10 commits · May 13, 2021
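
The diff repeats one migration pattern across call sites: exported backoff knobs on package tikv (tikv.GcResolveLockMaxBackoff, tikv.GcOneRegionMaxBackoff, tikv.BoTiKVRPC, ...) give way to package-local constants, purpose-named constructors such as tikv.NewGcResolveLockMaxBackoffer, and typed methods such as BackoffTiKVRPC, so the Backoffer internals can move into their own package. Below is a minimal, self-contained sketch of that shape; the budget value and the fixed sleep step are assumptions standing in for the real exponential backoff in store/tikv/retry.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// Backoffer sketches the shape this PR moves toward: retry budgets live
// behind purpose-named constructors and typed methods, so callers stop
// importing exported constants from the tikv package.
type Backoffer struct {
	ctx     context.Context
	maxMs   int // total sleep budget in milliseconds
	sleptMs int // sleep spent so far
}

func newBackoffer(ctx context.Context, maxMs int) *Backoffer {
	return &Backoffer{ctx: ctx, maxMs: maxMs}
}

// NewGcResolveLockMaxBackoffer mirrors the constructor used in the diff;
// the budget value here is an assumption made for illustration.
func NewGcResolveLockMaxBackoffer(ctx context.Context) *Backoffer {
	const gcResolveLockMaxBackoff = 100000
	return newBackoffer(ctx, gcResolveLockMaxBackoff)
}

// BackoffTiKVRPC mirrors the typed method that replaces
// bo.Backoff(tikv.BoTiKVRPC, err) at the call sites below; the real
// implementation grows its sleep exponentially, a fixed step keeps
// the sketch short.
func (b *Backoffer) BackoffTiKVRPC(cause error) error {
	if b.sleptMs >= b.maxMs {
		return fmt.Errorf("backoff budget exhausted: %w", cause)
	}
	const stepMs = 10
	b.sleptMs += stepMs
	select {
	case <-time.After(stepMs * time.Millisecond):
		return nil // slept; caller retries
	case <-b.ctx.Done():
		return b.ctx.Err()
	}
}

func main() {
	bo := NewGcResolveLockMaxBackoffer(context.Background())
	if err := bo.BackoffTiKVRPC(fmt.Errorf("recv stream response error")); err != nil {
		fmt.Println("giving up:", err)
	}
}
```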
4 changes: 2 additions & 2 deletions store/copr/batch_coprocessor.go
@@ -177,7 +177,7 @@ func (c *CopClient) sendBatch(ctx context.Context, req *kv.Request, vars *tikv.V
if req.KeepOrder || req.Desc {
return copErrorResponse{errors.New("batch coprocessor cannot prove keep order or desc property")}
}
- ctx = context.WithValue(ctx, tikv.TxnStartKey, req.StartTs)
+ ctx = context.WithValue(ctx, tikv.TxnStartKey(), req.StartTs)
bo := newBackofferWithVars(ctx, copBuildTaskMaxBackoff, vars)
ranges := toTiKVKeyRanges(req.KeyRanges)
tasks, err := buildBatchCopTasks(bo, c.store.GetRegionCache(), ranges, req.StoreType)
@@ -381,7 +381,7 @@ func (b *batchCopIterator) handleStreamedBatchCopResponse(ctx context.Context, b
return nil
}

- if err1 := bo.Backoff(tikv.BoTiKVRPC, errors.Errorf("recv stream response error: %v, task store addr: %s", err, task.storeAddr)); err1 != nil {
+ if err1 := bo.b.BackoffTiKVRPC(errors.Errorf("recv stream response error: %v, task store addr: %s", err, task.storeAddr)); err1 != nil {
return errors.Trace(err)
}

11 changes: 7 additions & 4 deletions store/copr/coprocessor.go
@@ -72,7 +72,7 @@ func (c *CopClient) Send(ctx context.Context, req *kv.Request, variables interfa
logutil.BgLogger().Debug("send batch requests")
return c.sendBatch(ctx, req, vars)
}
- ctx = context.WithValue(ctx, tikv.TxnStartKey, req.StartTs)
+ ctx = context.WithValue(ctx, tikv.TxnStartKey(), req.StartTs)
bo := newBackofferWithVars(ctx, copBuildTaskMaxBackoff, vars)
ranges := toTiKVKeyRanges(req.KeyRanges)
tasks, err := buildCopTasks(bo, c.store.GetRegionCache(), ranges, req)
@@ -829,11 +829,14 @@ func (worker *copIteratorWorker) handleCopStreamResult(bo *backoffer, rpcCtx *ti
return nil, nil
}

- boRPCType := tikv.BoTiKVRPC
+ err1 := errors.Errorf("recv stream response error: %v, task: %s", err, task)
if task.storeType == kv.TiFlash {
- boRPCType = tikv.BoTiFlashRPC
+ err1 = bo.Backoff(tikv.BoTiFlashRPC, err1)
+ } else {
+ err1 = bo.b.BackoffTiKVRPC(err1)
}
- if err1 := bo.Backoff(boRPCType, errors.Errorf("recv stream response error: %v, task: %s", err, task)); err1 != nil {
+
+ if err1 != nil {
return nil, errors.Trace(err)
}

4 changes: 2 additions & 2 deletions store/copr/mpp.go
@@ -55,7 +55,7 @@ func (c *MPPClient) selectAllTiFlashStore() []kv.MPPTaskMeta {

// ConstructMPPTasks receives ScheduleRequest, which is in fact a collection of kv ranges. We allocate MPPTaskMeta for them and return.
func (c *MPPClient) ConstructMPPTasks(ctx context.Context, req *kv.MPPBuildTasksRequest) ([]kv.MPPTaskMeta, error) {
- ctx = context.WithValue(ctx, tikv.TxnStartKey, req.StartTS)
+ ctx = context.WithValue(ctx, tikv.TxnStartKey(), req.StartTS)
bo := newBackofferWithVars(ctx, copBuildTaskMaxBackoff, nil)
if req.KeyRanges == nil {
return c.selectAllTiFlashStore(), nil
@@ -343,7 +343,7 @@ func (m *mppIterator) establishMPPConns(bo *backoffer, req *kv.MPPDispatchReques
return
}

- if err1 := bo.Backoff(tikv.BoTiKVRPC, errors.Errorf("recv stream response error: %v", err)); err1 != nil {
+ if err1 := bo.b.BackoffTiKVRPC(errors.Errorf("recv stream response error: %v", err)); err1 != nil {
Contributor:
Note: I believe it's actually a bug for TiFlash requests to use TiKV backoff. This makes the timeout error message "TiKV server timeout", which is incorrect. (This needn't be fixed in this PR.)

Contributor Author:

Yes, you are right.
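
A self-contained sketch of the fix described above (not part of this PR): route the error through a backoff variant chosen by store type, so the exhaustion message names the engine that actually timed out. All identifiers here are illustrative, not tidb's actual API.

```go
package main

import (
	"errors"
	"fmt"
)

// storeKind distinguishes the engine a stream request targets.
type storeKind int

const (
	kindTiKV storeKind = iota
	kindTiFlash
)

// backoffRecvError stands in for the BackoffTiKVRPC call above: picking
// the branch by store type means a TiFlash stream error surfaces as a
// TiFlash timeout instead of the misleading "TiKV server timeout".
func backoffRecvError(kind storeKind, cause error) error {
	if kind == kindTiFlash {
		return fmt.Errorf("TiFlash server timeout: %w", cause)
	}
	return fmt.Errorf("TiKV server timeout: %w", cause)
}

func main() {
	err := backoffRecvError(kindTiFlash, errors.New("recv stream response error"))
	fmt.Println(err) // TiFlash server timeout: recv stream response error
}
```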

if errors.Cause(err) == context.Canceled {
logutil.BgLogger().Info("stream recv timeout", zap.Error(err))
} else {
4 changes: 3 additions & 1 deletion store/driver/tikv_driver.go
@@ -206,6 +206,8 @@ var (
ldflagGetEtcdAddrsFromConfig = "0" // 1:Yes, otherwise:No
)

+ const getAllMembersBackoff = 5000
+
// EtcdAddrs returns etcd server addresses.
func (s *tikvStore) EtcdAddrs() ([]string, error) {
if s.etcdAddrs == nil {
@@ -220,7 +222,7 @@ func (s *tikvStore) EtcdAddrs() ([]string, error) {
}

ctx := context.Background()
- bo := tikv.NewBackoffer(ctx, tikv.GetAllMembersBackoff)
+ bo := tikv.NewBackoffer(ctx, getAllMembersBackoff)
etcdAddrs := make([]string, 0)
pdClient := s.GetPDClient()
if pdClient == nil {
14 changes: 8 additions & 6 deletions store/gcworker/gc_worker.go
@@ -1050,7 +1050,7 @@ func (w *GCWorker) resolveLocksForRange(ctx context.Context, safePoint uint64, s

var stat tikv.RangeTaskStat
key := startKey
- bo := tikv.NewBackofferWithVars(ctx, tikv.GcResolveLockMaxBackoff, nil)
+ bo := tikv.NewGcResolveLockMaxBackoffer(ctx)
failpoint.Inject("setGcResolveMaxBackoff", func(v failpoint.Value) {
sleep := v.(int)
// cooperate with github.com/pingcap/tidb/store/tikv/invalidCacheAndRetry
@@ -1147,7 +1147,7 @@ retryScanAndResolve:
if len(key) == 0 || (len(endKey) != 0 && bytes.Compare(key, endKey) >= 0) {
break
}
- bo = tikv.NewBackofferWithVars(ctx, tikv.GcResolveLockMaxBackoff, nil)
+ bo = tikv.NewGcResolveLockMaxBackoffer(ctx)
failpoint.Inject("setGcResolveMaxBackoff", func(v failpoint.Value) {
sleep := v.(int)
bo = tikv.NewBackofferWithVars(ctx, sleep, nil)
@@ -1460,7 +1460,7 @@ func (w *GCWorker) resolveLocksAcrossRegions(ctx context.Context, locks []*tikv.
failpoint.Return(errors.New("injectedError"))
})

- bo := tikv.NewBackofferWithVars(ctx, tikv.GcResolveLockMaxBackoff, nil)
+ bo := tikv.NewGcResolveLockMaxBackoffer(ctx)

for {
if len(locks) == 0 {
@@ -1496,18 +1496,20 @@ func (w *GCWorker) resolveLocksAcrossRegions(ctx context.Context, locks []*tikv.
}

// Recreate backoffer for next region
- bo = tikv.NewBackofferWithVars(ctx, tikv.GcResolveLockMaxBackoff, nil)
+ bo = tikv.NewGcResolveLockMaxBackoffer(ctx)
locks = locks[len(locksInRegion):]
}

return nil
}

+ const gcOneRegionMaxBackoff = 20000
+
func (w *GCWorker) uploadSafePointToPD(ctx context.Context, safePoint uint64) error {
var newSafePoint uint64
var err error

- bo := tikv.NewBackofferWithVars(ctx, tikv.GcOneRegionMaxBackoff, nil)
+ bo := tikv.NewBackofferWithVars(ctx, gcOneRegionMaxBackoff, nil)
for {
newSafePoint, err = w.pdClient.UpdateGCSafePoint(ctx, safePoint)
if err != nil {
@@ -1544,7 +1546,7 @@ func (w *GCWorker) doGCForRange(ctx context.Context, startKey []byte, endKey []b
}()
key := startKey
for {
- bo := tikv.NewBackofferWithVars(ctx, tikv.GcOneRegionMaxBackoff, nil)
+ bo := tikv.NewBackofferWithVars(ctx, gcOneRegionMaxBackoff, nil)
loc, err := w.tikvStore.GetRegionCache().LocateKey(bo, key)
if err != nil {
return stat, errors.Trace(err)
5 changes: 3 additions & 2 deletions store/gcworker/gc_worker_test.go
@@ -43,6 +43,7 @@ import (
"github.com/pingcap/tidb/store/tikv/mockstore/mocktikv"
"github.com/pingcap/tidb/store/tikv/oracle"
"github.com/pingcap/tidb/store/tikv/oracle/oracles"
"github.com/pingcap/tidb/store/tikv/retry"
"github.com/pingcap/tidb/store/tikv/tikvrpc"
pd "github.com/tikv/pd/client"
)
@@ -412,7 +413,7 @@ func (s *testGCWorkerSuite) TestStatusVars(c *C) {

func (s *testGCWorkerSuite) TestDoGCForOneRegion(c *C) {
ctx := context.Background()
- bo := tikv.NewBackofferWithVars(ctx, tikv.GcOneRegionMaxBackoff, nil)
+ bo := tikv.NewBackofferWithVars(ctx, gcOneRegionMaxBackoff, nil)
loc, err := s.tikvStore.GetRegionCache().LocateKey(bo, []byte(""))
c.Assert(err, IsNil)
var regionErr *errorpb.Error
@@ -943,7 +944,7 @@ func (s *testGCWorkerSuite) TestResolveLockRangeMeetRegionEnlargeCausedByRegionM
mCluster.Merge(s.initRegion.regionID, region2)
regionMeta, _ := mCluster.GetRegion(s.initRegion.regionID)
err := s.tikvStore.GetRegionCache().OnRegionEpochNotMatch(
- tikv.NewNoopBackoff(context.Background()),
+ retry.NewNoopBackoff(context.Background()),
&tikv.RPCContext{Region: regionID, Store: &tikv.Store{}},
[]*metapb.Region{regionMeta})
c.Assert(err, IsNil)