Skip to content

Commit

Permalink
stpre/copr: fix tests caused by usage of tikv.TxnStartKey
Browse files Browse the repository at this point in the history
  • Loading branch information
AndreMouche committed May 11, 2021
1 parent 9248d1d commit e77c20b
Show file tree
Hide file tree
Showing 8 changed files with 26 additions and 18 deletions.
4 changes: 2 additions & 2 deletions store/copr/batch_coprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ func (c *CopClient) sendBatch(ctx context.Context, req *kv.Request, vars *tikv.V
if req.KeepOrder || req.Desc {
return copErrorResponse{errors.New("batch coprocessor cannot prove keep order or desc property")}
}
ctx = context.WithValue(ctx, tikv.TxnStartKey, req.StartTs)
ctx = context.WithValue(ctx, tikv.TxnStartKey(), req.StartTs)
bo := newBackofferWithVars(ctx, copBuildTaskMaxBackoff, vars)
ranges := toTiKVKeyRanges(req.KeyRanges)
tasks, err := buildBatchCopTasks(bo, c.store.GetRegionCache(), ranges, req.StoreType)
Expand Down Expand Up @@ -386,7 +386,7 @@ func (b *batchCopIterator) handleStreamedBatchCopResponse(ctx context.Context, b
return nil
}

if err1 := bo.Backoff(tikv.BoTiKVRPC, errors.Errorf("recv stream response error: %v, task store addr: %s", err, task.storeAddr)); err1 != nil {
if err1 := bo.b.BackoffTiKVRPC(errors.Errorf("recv stream response error: %v, task store addr: %s", err, task.storeAddr)); err1 != nil {
return errors.Trace(err)
}

Expand Down
11 changes: 7 additions & 4 deletions store/copr/coprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func (c *CopClient) Send(ctx context.Context, req *kv.Request, variables interfa
logutil.BgLogger().Debug("send batch requests")
return c.sendBatch(ctx, req, vars)
}
ctx = context.WithValue(ctx, tikv.TxnStartKey, req.StartTs)
ctx = context.WithValue(ctx, tikv.TxnStartKey(), req.StartTs)
bo := newBackofferWithVars(ctx, copBuildTaskMaxBackoff, vars)
ranges := toTiKVKeyRanges(req.KeyRanges)
tasks, err := buildCopTasks(bo, c.store.GetRegionCache(), ranges, req)
Expand Down Expand Up @@ -830,11 +830,14 @@ func (worker *copIteratorWorker) handleCopStreamResult(bo *backoffer, rpcCtx *ti
return nil, nil
}

boRPCType := tikv.BoTiKVRPC
err1 := errors.Errorf("recv stream response error: %v, task: %s", err, task)
if task.storeType == kv.TiFlash {
boRPCType = tikv.BoTiFlashRPC
err1 = bo.Backoff(tikv.BoTiFlashRPC, err1)
} else {
err1 = bo.b.BackoffTiKVRPC(err1)
}
if err1 := bo.Backoff(boRPCType, errors.Errorf("recv stream response error: %v, task: %s", err, task)); err1 != nil {

if err1 != nil {
return nil, errors.Trace(err)
}

Expand Down
4 changes: 2 additions & 2 deletions store/copr/mpp.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func (c *MPPClient) selectAllTiFlashStore() []kv.MPPTaskMeta {

// ConstructMPPTasks receives ScheduleRequest, which are actually collects of kv ranges. We allocates MPPTaskMeta for them and returns.
func (c *MPPClient) ConstructMPPTasks(ctx context.Context, req *kv.MPPBuildTasksRequest) ([]kv.MPPTaskMeta, error) {
ctx = context.WithValue(ctx, tikv.TxnStartKey, req.StartTS)
ctx = context.WithValue(ctx, tikv.TxnStartKey(), req.StartTS)
bo := newBackofferWithVars(ctx, copBuildTaskMaxBackoff, nil)
if req.KeyRanges == nil {
return c.selectAllTiFlashStore(), nil
Expand Down Expand Up @@ -344,7 +344,7 @@ func (m *mppIterator) establishMPPConns(bo *backoffer, req *kv.MPPDispatchReques
return
}

if err1 := bo.Backoff(tikv.BoTiKVRPC, errors.Errorf("recv stream response error: %v", err)); err1 != nil {
if err1 := bo.b.BackoffTiKVRPC(errors.Errorf("recv stream response error: %v", err)); err1 != nil {
if errors.Cause(err) == context.Canceled {
logutil.BgLogger().Info("stream recv timeout", zap.Error(err))
} else {
Expand Down
5 changes: 3 additions & 2 deletions store/gcworker/gc_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import (
"github.com/pingcap/tidb/store/tikv/mockstore/mocktikv"
"github.com/pingcap/tidb/store/tikv/oracle"
"github.com/pingcap/tidb/store/tikv/oracle/oracles"
"github.com/pingcap/tidb/store/tikv/retry"
"github.com/pingcap/tidb/store/tikv/tikvrpc"
pd "github.com/tikv/pd/client"
)
Expand Down Expand Up @@ -412,7 +413,7 @@ func (s *testGCWorkerSuite) TestStatusVars(c *C) {

func (s *testGCWorkerSuite) TestDoGCForOneRegion(c *C) {
ctx := context.Background()
bo := tikv.NewBackofferWithVars(ctx, tikv.GcOneRegionMaxBackoff, nil)
bo := tikv.NewBackofferWithVars(ctx, gcOneRegionMaxBackoff, nil)
loc, err := s.tikvStore.GetRegionCache().LocateKey(bo, []byte(""))
c.Assert(err, IsNil)
var regionErr *errorpb.Error
Expand Down Expand Up @@ -943,7 +944,7 @@ func (s *testGCWorkerSuite) TestResolveLockRangeMeetRegionEnlargeCausedByRegionM
mCluster.Merge(s.initRegion.regionID, region2)
regionMeta, _ := mCluster.GetRegion(s.initRegion.regionID)
err := s.tikvStore.GetRegionCache().OnRegionEpochNotMatch(
tikv.NewNoopBackoff(context.Background()),
retry.NewNoopBackoff(context.Background()),
&tikv.RPCContext{Region: regionID, Store: &tikv.Store{}},
[]*metapb.Region{regionMeta})
c.Assert(err, IsNil)
Expand Down
1 change: 0 additions & 1 deletion store/tikv/backoff.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ type BackoffType = retry.BackoffType
// Back off types.
const (
BoRegionMiss = retry.BoRegionMiss
BoTiKVRPC = retry.BoTiKVRPC
BoTiFlashRPC = retry.BoTiFlashRPC
BoTxnLockFast = retry.BoTxnLockFast
BoTxnLock = retry.BoTxnLock
Expand Down
2 changes: 1 addition & 1 deletion store/tikv/client_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -673,7 +673,7 @@ func (c *batchCommandsClient) recreateStreamingClient(err error, streamClient *b
break
}

err2 := b.Backoff(retry.BoTiKVRPC, err1)
err2 := b.BackoffTiKVRPC(err1)
// As timeout is set to math.MaxUint32, err2 should always be nil.
// This line is added to make the 'make errcheck' pass.
terror.Log(err2)
Expand Down
2 changes: 1 addition & 1 deletion store/tikv/region_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -580,7 +580,7 @@ func (s *RegionRequestSender) onSendFail(bo *Backoffer, ctx *RPCContext, err err
if ctx.Store != nil && ctx.Store.storeType == tikvrpc.TiFlash {
err = bo.Backoff(retry.BoTiFlashRPC, errors.Errorf("send tiflash request error: %v, ctx: %v, try next peer later", err, ctx))
} else {
err = bo.Backoff(retry.BoTiKVRPC, errors.Errorf("send tikv request error: %v, ctx: %v, try next peer later", err, ctx))
err = bo.BackoffTiKVRPC(errors.Errorf("send tikv request error: %v, ctx: %v, try next peer later", err, ctx))
}
return errors.Trace(err)
}
Expand Down
15 changes: 10 additions & 5 deletions store/tikv/retry/backoff.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ const (
func (t BackoffType) metric() prometheus.Observer {
switch t {
// TODO: distinguish tikv and tiflash in metrics
case BoTiKVRPC, BoTiFlashRPC:
case boTiKVRPC, BoTiFlashRPC:
return metrics.BackoffHistogramRPC
case BoTxnLock:
return metrics.BackoffHistogramLock
Expand Down Expand Up @@ -121,7 +121,7 @@ type BackoffType int

// Back off types.
const (
BoTiKVRPC BackoffType = iota
boTiKVRPC BackoffType = iota
BoTiFlashRPC
BoTxnLock
BoTxnLockFast
Expand All @@ -139,7 +139,7 @@ func (t BackoffType) createFn(vars *kv.Variables) func(context.Context, int) int
vars.Hook(t.String(), vars)
}
switch t {
case BoTiKVRPC, BoTiFlashRPC:
case boTiKVRPC, BoTiFlashRPC:
return NewBackoffFn(100, 2000, EqualJitter)
case BoTxnLock:
return NewBackoffFn(200, 3000, EqualJitter)
Expand All @@ -164,7 +164,7 @@ func (t BackoffType) createFn(vars *kv.Variables) func(context.Context, int) int

func (t BackoffType) String() string {
switch t {
case BoTiKVRPC:
case boTiKVRPC:
return "tikvRPC"
case BoTiFlashRPC:
return "tiflashRPC"
Expand Down Expand Up @@ -193,7 +193,7 @@ func (t BackoffType) String() string {
// TError returns pingcap/error of the backoff type.
func (t BackoffType) TError() error {
switch t {
case BoTiKVRPC:
case boTiKVRPC:
return tikverr.ErrTiKVServerTimeout
case BoTiFlashRPC:
return tikverr.ErrTiFlashServerTimeout
Expand Down Expand Up @@ -279,6 +279,11 @@ func (b *Backoffer) Backoff(typ BackoffType, err error) error {
return b.BackoffWithMaxSleep(typ, -1, err)
}

// BackoffTiKVRPC calls Backoff with boTiKVRPC.
func (b *Backoffer) BackoffTiKVRPC(err error) error {
return b.Backoff(boTiKVRPC, err)
}

// BackoffWithMaxSleep sleeps a while base on the backoffType and records the error message
// and never sleep more than maxSleepMs for each sleep.
func (b *Backoffer) BackoffWithMaxSleep(typ BackoffType, maxSleepMs int, err error) error {
Expand Down

0 comments on commit e77c20b

Please sign in to comment.