diff --git a/config/retry/config.go b/config/retry/config.go index 89a9bc382..b1b0117e1 100644 --- a/config/retry/config.go +++ b/config/retry/config.go @@ -95,6 +95,11 @@ func NewConfig(name string, metric *prometheus.Observer, backoffFnCfg *BackoffFn } } +// Base returns the base time of the backoff function. +func (c *Config) Base() int { + return c.fnCfg.base +} + func (c *Config) String() string { return c.name } diff --git a/internal/locate/region_cache_test.go b/internal/locate/region_cache_test.go index c84c4e77b..020a4aa84 100644 --- a/internal/locate/region_cache_test.go +++ b/internal/locate/region_cache_test.go @@ -1057,7 +1057,7 @@ func (s *testRegionCacheSuite) TestRegionEpochOnTiFlash() { r := ctxTiFlash.Meta reqSend := NewRegionRequestSender(s.cache, nil) regionErr := &errorpb.Error{EpochNotMatch: &errorpb.EpochNotMatch{CurrentRegions: []*metapb.Region{r}}} - reqSend.onRegionError(s.bo, ctxTiFlash, nil, regionErr) + reqSend.onRegionError(s.bo, ctxTiFlash, nil, regionErr, nil) // check leader read should not go to tiflash lctx, err = s.cache.GetTiKVRPCContext(s.bo, loc1.Region, kv.ReplicaReadLeader, 0) @@ -1694,14 +1694,14 @@ func (s *testRegionCacheSuite) TestShouldNotRetryFlashback() { s.NotNil(ctx) s.NoError(err) reqSend := NewRegionRequestSender(s.cache, nil) - shouldRetry, err := reqSend.onRegionError(s.bo, ctx, nil, &errorpb.Error{FlashbackInProgress: &errorpb.FlashbackInProgress{}}) + shouldRetry, err := reqSend.onRegionError(s.bo, ctx, nil, &errorpb.Error{FlashbackInProgress: &errorpb.FlashbackInProgress{}}, nil) s.Error(err) s.False(shouldRetry) - shouldRetry, err = reqSend.onRegionError(s.bo, ctx, nil, &errorpb.Error{FlashbackNotPrepared: &errorpb.FlashbackNotPrepared{}}) + shouldRetry, err = reqSend.onRegionError(s.bo, ctx, nil, &errorpb.Error{FlashbackNotPrepared: &errorpb.FlashbackNotPrepared{}}, nil) s.Error(err) s.False(shouldRetry) - shouldRetry, err = reqSend.onRegionError(s.bo, ctx, nil, &errorpb.Error{BucketVersionNotMatch: &errorpb.BucketVersionNotMatch{Keys: [][]byte{[]byte("a")}, Version: 1}}) + shouldRetry, err = reqSend.onRegionError(s.bo, ctx, nil, &errorpb.Error{BucketVersionNotMatch: &errorpb.BucketVersionNotMatch{Keys: [][]byte{[]byte("a")}, Version: 1}}, nil) s.Nil(err) s.False(shouldRetry) ctx.Region.GetID() diff --git a/internal/locate/region_request.go b/internal/locate/region_request.go index 214a4a05b..58b0e3329 100644 --- a/internal/locate/region_request.go +++ b/internal/locate/region_request.go @@ -1097,14 +1097,21 @@ func (s *replicaSelector) onSendFailure(bo *retry.Backoffer, err error) { s.state.onSendFailure(bo, s, err) } -func (s *replicaSelector) onReadReqConfigurableTimeout(req *tikvrpc.Request) bool { +func isReadWithTinyTimeout(req *tikvrpc.Request) bool { if req.MaxExecutionDurationMs >= uint64(client.ReadTimeoutShort.Milliseconds()) { - // Configurable timeout should less than `ReadTimeoutShort`. 
return false } switch req.Type { case tikvrpc.CmdGet, tikvrpc.CmdBatchGet, tikvrpc.CmdScan, tikvrpc.CmdCop, tikvrpc.CmdBatchCop, tikvrpc.CmdCopStream: + return true + default: + return false + } +} + +func (s *replicaSelector) onReadReqConfigurableTimeout(req *tikvrpc.Request) bool { + if isReadWithTinyTimeout(req) { if target := s.targetReplica(); target != nil { target.deadlineErrUsingConfTimeout = true } @@ -1113,10 +1120,8 @@ func (s *replicaSelector) onReadReqConfigurableTimeout(req *tikvrpc.Request) boo s.state = &tryFollower{leaderIdx: accessLeader.leaderIdx, lastIdx: accessLeader.leaderIdx} } return true - default: - // Only work for read requests, return false for non-read requests. - return false } + return false } func (s *replicaSelector) checkLiveness(bo *retry.Backoffer, accessReplica *replica) livenessState { @@ -1203,6 +1208,7 @@ func (s *replicaSelector) updateLeader(leader *metapb.Peer) { func (s *replicaSelector) onServerIsBusy( bo *retry.Backoffer, ctx *RPCContext, req *tikvrpc.Request, serverIsBusy *errorpb.ServerIsBusy, + onServerBusyFastRetry func(string), ) (shouldRetry bool, err error) { if serverIsBusy.EstimatedWaitMs != 0 && ctx != nil && ctx.Store != nil { estimatedWait := time.Duration(serverIsBusy.EstimatedWaitMs) * time.Millisecond @@ -1240,6 +1246,14 @@ func (s *replicaSelector) onServerIsBusy( return true, nil } } + if onServerBusyFastRetry != nil && isReadWithTinyTimeout(req) { + if target := s.targetReplica(); target != nil { + // avoid retrying on this replica again + target.deadlineErrUsingConfTimeout = true + } + onServerBusyFastRetry(serverIsBusy.GetReason()) + return true, nil + } err = bo.Backoff(retry.BoTiKVServerBusy, errors.Errorf("server is busy, ctx: %v", ctx)) if err != nil { return false, err @@ -1397,6 +1411,48 @@ func (s *RegionRequestSender) SendReqCtx( }() } + type backoffArgs struct { + cfg *retry.Config + err error + } + pendingBackoffs := make(map[string]*backoffArgs) + delayBoTiKVServerBusy := func(addr string, reason string) { + if _, ok := pendingBackoffs[addr]; !ok { + pendingBackoffs[addr] = &backoffArgs{ + cfg: retry.BoTiKVServerBusy, + err: errors.Errorf("server is busy, reason: %s", reason), + } + } + } + backoffOnRetry := func(addr string) error { + if args, ok := pendingBackoffs[addr]; ok { + delete(pendingBackoffs, addr) + logutil.Logger(bo.GetCtx()).Warn( + "apply pending backoff on retry", + zap.String("target", addr), + zap.String("bo", args.cfg.String()), + zap.String("err", args.err.Error())) + return bo.Backoff(args.cfg, args.err) + } + return nil + } + backoffOnFakeErr := func() error { + var args *backoffArgs + // if there are multiple pending backoffs, choose the one with the largest base duration. + for _, it := range pendingBackoffs { + if args == nil || args.cfg.Base() < it.cfg.Base() { + args = it + } + } + if args != nil { + logutil.Logger(bo.GetCtx()).Warn( + "apply pending backoff on pseudo region error", + zap.String("bo", args.cfg.String()), + zap.String("err", args.err.Error())) + return bo.Backoff(args.cfg, args.err) + } + return nil + } totalErrors := make(map[string]int) for { if retryTimes > 0 { @@ -1429,6 +1485,9 @@ func (s *RegionRequestSender) SendReqCtx( // TODO: Change the returned error to something like "region missing in cache", // and handle this error like EpochNotMatch, which means to re-split the request and retry. 
+ if err = backoffOnFakeErr(); err != nil { + return nil, nil, retryTimes, err + } s.logSendReqError(bo, "throwing pseudo region error due to no replica available", regionID, retryTimes, req, totalErrors) resp, err = tikvrpc.GenRegionErrorResp(req, &errorpb.Error{EpochNotMatch: &errorpb.EpochNotMatch{}}) return resp, nil, retryTimes, err @@ -1460,6 +1519,9 @@ func (s *RegionRequestSender) SendReqCtx( return nil, nil, retryTimes, err } + if err = backoffOnRetry(rpcCtx.Addr); err != nil { + return nil, nil, retryTimes, err + } var retry bool resp, retry, err = s.sendReqToRegion(bo, rpcCtx, req, timeout) req.IsRetryRequest = true @@ -1499,7 +1561,9 @@ func (s *RegionRequestSender) SendReqCtx( if regionErr != nil { regionErrLogging := regionErrorToLogging(rpcCtx.Peer.GetId(), regionErr) totalErrors[regionErrLogging]++ - retry, err = s.onRegionError(bo, rpcCtx, req, regionErr) + retry, err = s.onRegionError(bo, rpcCtx, req, regionErr, func(reason string) { + delayBoTiKVServerBusy(rpcCtx.Addr, reason) + }) if err != nil { msg := fmt.Sprintf("send request on region error failed, err: %v", err.Error()) s.logSendReqError(bo, msg, regionID, retryTimes, req, totalErrors) @@ -1962,6 +2026,7 @@ func isDeadlineExceeded(e *errorpb.Error) bool { func (s *RegionRequestSender) onRegionError( bo *retry.Backoffer, ctx *RPCContext, req *tikvrpc.Request, regionErr *errorpb.Error, + onServerBusyFastRetry func(string), ) (shouldRetry bool, err error) { if span := opentracing.SpanFromContext(bo.GetCtx()); span != nil && span.Tracer() != nil { span1 := span.Tracer().StartSpan("tikv.onRegionError", opentracing.ChildOf(span.Context())) @@ -2126,7 +2191,7 @@ func (s *RegionRequestSender) onRegionError( } } if s.replicaSelector != nil { - return s.replicaSelector.onServerIsBusy(bo, ctx, req, serverIsBusy) + return s.replicaSelector.onServerIsBusy(bo, ctx, req, serverIsBusy, onServerBusyFastRetry) } logutil.Logger(bo.GetCtx()).Warn( "tikv reports `ServerIsBusy` retry later", diff --git a/internal/locate/region_request3_test.go b/internal/locate/region_request3_test.go index b5486bc76..49baa99c3 100644 --- a/internal/locate/region_request3_test.go +++ b/internal/locate/region_request3_test.go @@ -992,7 +992,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestLoadBasedReplicaRead() { // Receive a ServerIsBusy error replicaSelector.onServerIsBusy(bo, rpcCtx, req, &errorpb.ServerIsBusy{ EstimatedWaitMs: 500, - }) + }, nil) rpcCtx, err = replicaSelector.next(bo) s.Nil(err) @@ -1003,7 +1003,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestLoadBasedReplicaRead() { replicaSelector.onServerIsBusy(bo, rpcCtx, req, &errorpb.ServerIsBusy{ EstimatedWaitMs: 800, - }) + }, nil) rpcCtx, err = replicaSelector.next(bo) s.Nil(err) @@ -1016,7 +1016,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestLoadBasedReplicaRead() { // All peers are too busy replicaSelector.onServerIsBusy(bo, rpcCtx, req, &errorpb.ServerIsBusy{ EstimatedWaitMs: 150, - }) + }, nil) lessBusyPeer := rpcCtx.Peer.Id // Then, send to the leader again with no threshold. 
@@ -1620,3 +1620,83 @@ func (s *testRegionRequestToThreeStoresSuite) TestDoNotTryUnreachableLeader() { // `tryFollower` always try the local peer firstly s.Equal(follower.addr, string(resp.Resp.(*kvrpcpb.GetResponse).Value)) } + +func (s *testRegionRequestToThreeStoresSuite) TestRetryOnServerBusy() { + key := []byte("key") + region, err := s.regionRequestSender.regionCache.findRegionByKey(s.bo, key, false) + s.Nil(err) + regionStore := region.getStore() + leader, _, _, _ := region.WorkStorePeer(regionStore) + follower, _, _, _ := region.FollowerStorePeer(regionStore, 0, &storeSelectorOp{}) + + req := tikvrpc.NewReplicaReadRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{Key: key}, kv.ReplicaReadLeader, nil) + req.ReadReplicaScope = oracle.GlobalTxnScope + req.TxnScope = oracle.GlobalTxnScope + req.EnableStaleWithMixedReplicaRead() + + var ( + bo *retry.Backoffer + resp *tikvrpc.Response + regionErr *errorpb.Error + ) + + // the target follower is busy, try leader immediately and return value + s.regionRequestSender.client = &fnClient{fn: func(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (response *tikvrpc.Response, err error) { + if addr != follower.addr { + return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{Value: []byte(addr)}}, nil + } + return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{RegionError: &errorpb.Error{ServerIsBusy: &errorpb.ServerIsBusy{EstimatedWaitMs: 2000, Reason: "mock server busy"}}}}, nil + }} + bo = retry.NewBackoffer(context.Background(), -1) + resp, _, _, err = s.regionRequestSender.SendReqCtx(bo, req, region.VerID(), time.Second, tikvrpc.TiKV, WithMatchLabels(follower.labels)) + s.Nil(err) + regionErr, _ = resp.GetRegionError() + s.Nil(regionErr) + s.Equal(leader.addr, string(resp.Resp.(*kvrpcpb.GetResponse).Value)) + s.Equal(0, bo.GetBackoffTimes()["tikvServerBusy"]) + + // try target follower, fallback to leader on data-not-ready error, then try the follower again and return value + s.regionRequestSender.client = &fnClient{fn: func(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (response *tikvrpc.Response, err error) { + if req.StaleRead && addr == follower.addr { + return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{RegionError: &errorpb.Error{DataIsNotReady: &errorpb.DataIsNotReady{}}}}, nil + } + if addr == leader.addr { + return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{RegionError: &errorpb.Error{ServerIsBusy: &errorpb.ServerIsBusy{EstimatedWaitMs: 2000, Reason: "mock server busy"}}}}, nil + } + return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{Value: []byte(addr)}}, nil + }} + bo = retry.NewBackoffer(context.Background(), -1) + resp, _, _, err = s.regionRequestSender.SendReqCtx(bo, req, region.VerID(), time.Second, tikvrpc.TiKV, WithMatchLabels(follower.labels)) + s.Nil(err) + regionErr, _ = resp.GetRegionError() + s.Nil(regionErr) + s.Equal(follower.addr, string(resp.Resp.(*kvrpcpb.GetResponse).Value)) + s.Equal(0, bo.GetBackoffTimes()["tikvServerBusy"]) + + // all peers are busy, return fake region error with final server-busy backoff + s.regionRequestSender.client = &fnClient{fn: func(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (response *tikvrpc.Response, err error) { + return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{RegionError: &errorpb.Error{ServerIsBusy: &errorpb.ServerIsBusy{EstimatedWaitMs: 2000, Reason: "mock server busy"}}}}, nil + }} + bo = retry.NewBackoffer(context.Background(), -1) + resp, _, _, err = 
s.regionRequestSender.SendReqCtx(bo, req, region.VerID(), time.Second, tikvrpc.TiKV, WithMatchLabels(follower.labels))
+	s.Nil(err)
+	regionErr, _ = resp.GetRegionError()
+	s.NotNil(regionErr)
+	s.True(IsFakeRegionError(regionErr))
+	s.Equal(1, bo.GetBackoffTimes()["tikvServerBusy"])
+
+	// followers are not initialized and leader is busy, return fake region error with final server-busy backoff
+	s.regionRequestSender.client = &fnClient{fn: func(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (response *tikvrpc.Response, err error) {
+		if addr == leader.addr {
+			return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{RegionError: &errorpb.Error{ServerIsBusy: &errorpb.ServerIsBusy{EstimatedWaitMs: 2000, Reason: "mock server busy"}}}}, nil
+		}
+		return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{RegionError: &errorpb.Error{RegionNotInitialized: &errorpb.RegionNotInitialized{}}}}, nil
+	}}
+	bo = retry.NewBackoffer(context.Background(), -1)
+	resp, _, _, err = s.regionRequestSender.SendReqCtx(bo, req, region.VerID(), time.Second, tikvrpc.TiKV, WithMatchLabels(follower.labels))
+	s.Nil(err)
+	regionErr, _ = resp.GetRegionError()
+	s.NotNil(regionErr)
+	s.True(IsFakeRegionError(regionErr))
+	s.Equal(1, bo.GetBackoffTimes()["tikvServerBusy"])
+}
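
The mechanism added above can be read in isolation as follows: for read requests with a short configurable timeout, a ServerIsBusy reply no longer triggers an immediate tikvServerBusy backoff; the backoff is recorded per store address and applied only if the sender later retries that same store (backoffOnRetry), or once, using the pending entry with the largest base duration, before the pseudo EpochNotMatch error is returned (backoffOnFakeErr). The sketch below is a minimal standalone illustration of that bookkeeping, not part of the patch; the type and function names (pendingBackoff, recordBusy, applyOnRetry, applyOnFakeError) are invented for the example, whereas in the change itself the state lives in the pendingBackoffs map and the closures inside SendReqCtx.

package main

import (
	"fmt"
	"time"
)

// backoffArgs mirrors the shape kept in SendReqCtx: which backoff to apply
// later and why. A plain duration stands in for retry.Config.Base() here.
type backoffArgs struct {
	base   time.Duration
	reason string
}

// pendingBackoff defers server-busy backoffs: instead of sleeping as soon as
// one store reports ServerIsBusy, the owed sleep is recorded per store
// address and only paid when that store is retried, or when every replica has
// failed and a pseudo region error is about to be returned.
type pendingBackoff struct {
	pending map[string]backoffArgs
}

func newPendingBackoff() *pendingBackoff {
	return &pendingBackoff{pending: make(map[string]backoffArgs)}
}

// recordBusy notes that addr answered with ServerIsBusy but does not sleep.
func (p *pendingBackoff) recordBusy(addr, reason string, base time.Duration) {
	if _, ok := p.pending[addr]; !ok {
		p.pending[addr] = backoffArgs{base: base, reason: reason}
	}
}

// applyOnRetry returns the sleep owed to addr, if any, right before the
// request is re-sent to that address, and clears the record.
func (p *pendingBackoff) applyOnRetry(addr string) (time.Duration, bool) {
	args, ok := p.pending[addr]
	if !ok {
		return 0, false
	}
	delete(p.pending, addr)
	return args.base, true
}

// applyOnFakeError picks the pending entry with the largest base duration, so
// that when no replica is left and a pseudo EpochNotMatch error is thrown, at
// least one server-busy backoff is still honored and callers do not spin.
func (p *pendingBackoff) applyOnFakeError() (time.Duration, bool) {
	var best backoffArgs
	found := false
	for _, args := range p.pending {
		if !found || args.base > best.base {
			best, found = args, true
		}
	}
	return best.base, found
}

func main() {
	p := newPendingBackoff()
	p.recordBusy("store-1:20160", "mock server busy", 2*time.Millisecond)
	p.recordBusy("store-2:20160", "mock server busy", 10*time.Millisecond)

	// Retrying a store that never reported busy costs nothing.
	if _, ok := p.applyOnRetry("store-3:20160"); !ok {
		fmt.Println("store-3: nothing pending, retry immediately")
	}
	// Only when every replica has failed is a single, largest backoff applied.
	if d, ok := p.applyOnFakeError(); ok {
		fmt.Printf("pseudo region error: back off %v before returning\n", d)
	}
}

This is the behavior the new TestRetryOnServerBusy cases assert: zero tikvServerBusy backoffs while another replica can still answer, and exactly one when every replica is busy or uninitialized.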