From 90887dbb31d7f27bf57c5456d323d6a7b6148e5e Mon Sep 17 00:00:00 2001 From: zyguan Date: Fri, 22 Sep 2023 12:59:55 +0800 Subject: [PATCH 1/5] retry short read reqs immediately on server busy Signed-off-by: zyguan --- internal/locate/region_cache_test.go | 8 ++-- internal/locate/region_request.go | 65 ++++++++++++++++++++++++++-- internal/retry/config.go | 5 +++ 3 files changed, 70 insertions(+), 8 deletions(-) diff --git a/internal/locate/region_cache_test.go b/internal/locate/region_cache_test.go index 6226a1c60..499adf25d 100644 --- a/internal/locate/region_cache_test.go +++ b/internal/locate/region_cache_test.go @@ -1005,7 +1005,7 @@ func (s *testRegionCacheSuite) TestRegionEpochOnTiFlash() { r := ctxTiFlash.Meta reqSend := NewRegionRequestSender(s.cache, nil) regionErr := &errorpb.Error{EpochNotMatch: &errorpb.EpochNotMatch{CurrentRegions: []*metapb.Region{r}}} - reqSend.onRegionError(s.bo, ctxTiFlash, nil, regionErr) + reqSend.onRegionError(s.bo, ctxTiFlash, nil, regionErr, nil) // check leader read should not go to tiflash lctx, err = s.cache.GetTiKVRPCContext(s.bo, loc1.Region, kv.ReplicaReadLeader, 0) @@ -1640,14 +1640,14 @@ func (s *testRegionCacheSuite) TestShouldNotRetryFlashback() { s.NotNil(ctx) s.NoError(err) reqSend := NewRegionRequestSender(s.cache, nil) - shouldRetry, err := reqSend.onRegionError(s.bo, ctx, nil, &errorpb.Error{FlashbackInProgress: &errorpb.FlashbackInProgress{}}) + shouldRetry, err := reqSend.onRegionError(s.bo, ctx, nil, &errorpb.Error{FlashbackInProgress: &errorpb.FlashbackInProgress{}}, nil) s.Error(err) s.False(shouldRetry) - shouldRetry, err = reqSend.onRegionError(s.bo, ctx, nil, &errorpb.Error{FlashbackNotPrepared: &errorpb.FlashbackNotPrepared{}}) + shouldRetry, err = reqSend.onRegionError(s.bo, ctx, nil, &errorpb.Error{FlashbackNotPrepared: &errorpb.FlashbackNotPrepared{}}, nil) s.Error(err) s.False(shouldRetry) - shouldRetry, err = reqSend.onRegionError(s.bo, ctx, nil, &errorpb.Error{BucketVersionNotMatch: &errorpb.BucketVersionNotMatch{Keys: [][]byte{[]byte("a")}, Version: 1}}) + shouldRetry, err = reqSend.onRegionError(s.bo, ctx, nil, &errorpb.Error{BucketVersionNotMatch: &errorpb.BucketVersionNotMatch{Keys: [][]byte{[]byte("a")}, Version: 1}}, nil) s.Nil(err) s.False(shouldRetry) ctx.Region.GetID() diff --git a/internal/locate/region_request.go b/internal/locate/region_request.go index 9d823e4f0..9da8d70cd 100644 --- a/internal/locate/region_request.go +++ b/internal/locate/region_request.go @@ -1384,6 +1384,48 @@ func (s *RegionRequestSender) SendReqCtx( }() } + type backoffArgs struct { + cfg *retry.Config + err error + } + pendingBackoffs := make(map[string]*backoffArgs) + delayBoTiKVServerBusy := func(addr string, reason string) { + if _, ok := pendingBackoffs[addr]; !ok { + pendingBackoffs[addr] = &backoffArgs{ + cfg: retry.BoTiKVServerBusy, + err: errors.Errorf("server is busy, reason: %s", reason), + } + } + } + backoffOnRetry := func(addr string) error { + if args, ok := pendingBackoffs[addr]; ok { + delete(pendingBackoffs, addr) + logutil.Logger(bo.GetCtx()).Warn( + "apply pending backoff on retry", + zap.String("target", addr), + zap.String("bo", args.cfg.String()), + zap.String("err", args.err.Error())) + return bo.Backoff(args.cfg, args.err) + } + return nil + } + backoffOnFakeErr := func() error { + var args *backoffArgs + // if there are multiple pending backoffs, choose the one with the largest base duration. 
+ for _, it := range pendingBackoffs { + if args == nil || args.cfg.Base() < it.cfg.Base() { + args = it + } + } + if args != nil { + logutil.Logger(bo.GetCtx()).Warn( + "apply pending backoff on pseudo region error", + zap.String("bo", args.cfg.String()), + zap.String("err", args.err.Error())) + return bo.Backoff(args.cfg, args.err) + } + return nil + } totalErrors := make(map[string]int) for { if retryTimes > 0 { @@ -1416,6 +1458,9 @@ func (s *RegionRequestSender) SendReqCtx( // TODO: Change the returned error to something like "region missing in cache", // and handle this error like EpochNotMatch, which means to re-split the request and retry. + if err = backoffOnFakeErr(); err != nil { + return nil, nil, retryTimes, err + } s.logSendReqError(bo, "throwing pseudo region error due to no replica available", regionID, retryTimes, req, totalErrors) resp, err = tikvrpc.GenRegionErrorResp(req, &errorpb.Error{EpochNotMatch: &errorpb.EpochNotMatch{}}) return resp, nil, retryTimes, err @@ -1447,6 +1492,9 @@ func (s *RegionRequestSender) SendReqCtx( s.replicaSelector.patchRequestSource(req, rpcCtx) } + if err = backoffOnRetry(rpcCtx.Addr); err != nil { + return nil, nil, retryTimes, err + } var retry bool resp, retry, err = s.sendReqToRegion(bo, rpcCtx, req, timeout) req.IsRetryRequest = true @@ -1486,7 +1534,9 @@ func (s *RegionRequestSender) SendReqCtx( if regionErr != nil { regionErrLogging := regionErrorToLogging(rpcCtx.Peer.GetId(), regionErr) totalErrors[regionErrLogging]++ - retry, err = s.onRegionError(bo, rpcCtx, req, regionErr) + retry, err = s.onRegionError(bo, rpcCtx, req, regionErr, func(reason string) { + delayBoTiKVServerBusy(rpcCtx.Addr, reason) + }) if err != nil { msg := fmt.Sprintf("send request on region error failed, err: %v", err.Error()) s.logSendReqError(bo, msg, regionID, retryTimes, req, totalErrors) @@ -1949,6 +1999,7 @@ func isDeadlineExceeded(e *errorpb.Error) bool { func (s *RegionRequestSender) onRegionError( bo *retry.Backoffer, ctx *RPCContext, req *tikvrpc.Request, regionErr *errorpb.Error, + onServerBusyFastRetry func(string), ) (shouldRetry bool, err error) { if span := opentracing.SpanFromContext(bo.GetCtx()); span != nil && span.Tracer() != nil { span1 := span.Tracer().StartSpan("tikv.onRegionError", opentracing.ChildOf(span.Context())) @@ -2107,12 +2158,18 @@ func (s *RegionRequestSender) onRegionError( } if serverIsBusy := regionErr.GetServerIsBusy(); serverIsBusy != nil { - if s.replicaSelector != nil && strings.Contains(serverIsBusy.GetReason(), "deadline is exceeded") { + if s.replicaSelector != nil { if s.replicaSelector.onReadReqConfigurableTimeout(req) { + // now `req` must be a read request with configured read-timeout (max-exec-dur < ReadTimeoutShort). + // when the `ServerBusy` is not caused by timeout, we still retry other peer immediately, and also + // delay backoff to avoid accessing busy peers too frequently. 
+ if reason := serverIsBusy.GetReason(); !strings.Contains(reason, "deadline is exceeded") { + if onServerBusyFastRetry != nil { + onServerBusyFastRetry(reason) + } + } return true, nil } - } - if s.replicaSelector != nil { return s.replicaSelector.onServerIsBusy(bo, ctx, req, serverIsBusy) } logutil.Logger(bo.GetCtx()).Warn( diff --git a/internal/retry/config.go b/internal/retry/config.go index 9c062cc75..f7d4b04d1 100644 --- a/internal/retry/config.go +++ b/internal/retry/config.go @@ -95,6 +95,11 @@ func NewConfig(name string, metric *prometheus.Observer, backoffFnCfg *BackoffFn } } +// Base returns the base time of the backoff function. +func (c *Config) Base() int { + return c.fnCfg.base +} + func (c *Config) String() string { return c.name } From 999d08a01ac6b0440e09dbf8aac0d9cd8f742df1 Mon Sep 17 00:00:00 2001 From: zyguan Date: Fri, 22 Sep 2023 14:54:41 +0800 Subject: [PATCH 2/5] be compatible with load-based replica read Signed-off-by: zyguan --- internal/locate/region_request.go | 34 ++++++++++++++----------- internal/locate/region_request3_test.go | 6 ++--- 2 files changed, 22 insertions(+), 18 deletions(-) diff --git a/internal/locate/region_request.go b/internal/locate/region_request.go index 9da8d70cd..ddb8fe9c3 100644 --- a/internal/locate/region_request.go +++ b/internal/locate/region_request.go @@ -1085,14 +1085,21 @@ func (s *replicaSelector) onSendFailure(bo *retry.Backoffer, err error) { s.state.onSendFailure(bo, s, err) } -func (s *replicaSelector) onReadReqConfigurableTimeout(req *tikvrpc.Request) bool { +func isReadWithTinyTimeout(req *tikvrpc.Request) bool { if req.MaxExecutionDurationMs >= uint64(client.ReadTimeoutShort.Milliseconds()) { - // Configurable timeout should less than `ReadTimeoutShort`. return false } switch req.Type { case tikvrpc.CmdGet, tikvrpc.CmdBatchGet, tikvrpc.CmdScan, tikvrpc.CmdCop, tikvrpc.CmdBatchCop, tikvrpc.CmdCopStream: + return true + default: + return false + } +} + +func (s *replicaSelector) onReadReqConfigurableTimeout(req *tikvrpc.Request) bool { + if isReadWithTinyTimeout(req) { if target := s.targetReplica(); target != nil { target.deadlineErrUsingConfTimeout = true } @@ -1101,10 +1108,8 @@ func (s *replicaSelector) onReadReqConfigurableTimeout(req *tikvrpc.Request) boo s.state = &tryFollower{leaderIdx: accessLeader.leaderIdx, lastIdx: accessLeader.leaderIdx} } return true - default: - // Only work for read requests, return false for non-read requests. 
- return false } + return false } func (s *replicaSelector) checkLiveness(bo *retry.Backoffer, accessReplica *replica) livenessState { @@ -1191,6 +1196,7 @@ func (s *replicaSelector) updateLeader(leader *metapb.Peer) { func (s *replicaSelector) onServerIsBusy( bo *retry.Backoffer, ctx *RPCContext, req *tikvrpc.Request, serverIsBusy *errorpb.ServerIsBusy, + onServerBusyFastRetry func(string), ) (shouldRetry bool, err error) { if serverIsBusy.EstimatedWaitMs != 0 && ctx != nil && ctx.Store != nil { estimatedWait := time.Duration(serverIsBusy.EstimatedWaitMs) * time.Millisecond @@ -1228,6 +1234,10 @@ func (s *replicaSelector) onServerIsBusy( return true, nil } } + if onServerBusyFastRetry != nil && isReadWithTinyTimeout(req) { + onServerBusyFastRetry(serverIsBusy.GetReason()) + return true, nil + } err = bo.Backoff(retry.BoTiKVServerBusy, errors.Errorf("server is busy, ctx: %v", ctx)) if err != nil { return false, err @@ -2158,19 +2168,13 @@ func (s *RegionRequestSender) onRegionError( } if serverIsBusy := regionErr.GetServerIsBusy(); serverIsBusy != nil { - if s.replicaSelector != nil { + if s.replicaSelector != nil && strings.Contains(serverIsBusy.GetReason(), "deadline is exceeded") { if s.replicaSelector.onReadReqConfigurableTimeout(req) { - // now `req` must be a read request with configured read-timeout (max-exec-dur < ReadTimeoutShort). - // when the `ServerBusy` is not caused by timeout, we still retry other peer immediately, and also - // delay backoff to avoid accessing busy peers too frequently. - if reason := serverIsBusy.GetReason(); !strings.Contains(reason, "deadline is exceeded") { - if onServerBusyFastRetry != nil { - onServerBusyFastRetry(reason) - } - } return true, nil } - return s.replicaSelector.onServerIsBusy(bo, ctx, req, serverIsBusy) + } + if s.replicaSelector != nil { + return s.replicaSelector.onServerIsBusy(bo, ctx, req, serverIsBusy, onServerBusyFastRetry) } logutil.Logger(bo.GetCtx()).Warn( "tikv reports `ServerIsBusy` retry later", diff --git a/internal/locate/region_request3_test.go b/internal/locate/region_request3_test.go index 41dd69b6b..61572699f 100644 --- a/internal/locate/region_request3_test.go +++ b/internal/locate/region_request3_test.go @@ -982,7 +982,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestLoadBasedReplicaRead() { // Receive a ServerIsBusy error replicaSelector.onServerIsBusy(bo, rpcCtx, req, &errorpb.ServerIsBusy{ EstimatedWaitMs: 500, - }) + }, nil) rpcCtx, err = replicaSelector.next(bo) s.Nil(err) @@ -993,7 +993,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestLoadBasedReplicaRead() { replicaSelector.onServerIsBusy(bo, rpcCtx, req, &errorpb.ServerIsBusy{ EstimatedWaitMs: 800, - }) + }, nil) rpcCtx, err = replicaSelector.next(bo) s.Nil(err) @@ -1006,7 +1006,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestLoadBasedReplicaRead() { // All peers are too busy replicaSelector.onServerIsBusy(bo, rpcCtx, req, &errorpb.ServerIsBusy{ EstimatedWaitMs: 150, - }) + }, nil) lessBusyPeer := rpcCtx.Peer.Id // Then, send to the leader again with no threshold. 
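
Taken together, patches 1 and 2 implement a deferred server-busy backoff for short-timeout reads: a read request whose configured max-execution-duration is below client.ReadTimeoutShort is retried on another replica immediately when a store reports ServerIsBusy (the load-based replica-read handling in onServerIsBusy still runs first when an estimated wait is attached), the would-be BoTiKVServerBusy backoff is parked per store address, and it is only paid if the sender later comes back to that same store, or, once every replica has been tried, the pending backoff with the largest base is paid before the pseudo EpochNotMatch error is returned. The Go sketch below restates that bookkeeping outside client-go so it can be read in isolation; the names (qualifiesForFastRetry, busyTracker, readTimeoutShort) and the hard-coded 30s value are illustrative assumptions, not part of the client-go API.

package main

import (
    "fmt"
    "time"
)

// readTimeoutShort stands in for client.ReadTimeoutShort; the real value is
// defined by client-go's config, the 30s here is only for the example.
const readTimeoutShort = 30 * time.Second

// qualifiesForFastRetry mirrors the isReadWithTinyTimeout gate from patch 2:
// only read commands with a configured timeout below readTimeoutShort may
// skip the immediate server-busy backoff.
func qualifiesForFastRetry(isReadCmd bool, maxExecDurationMs uint64) bool {
    if maxExecDurationMs >= uint64(readTimeoutShort.Milliseconds()) {
        return false
    }
    return isReadCmd
}

// pendingBackoff is a simplified stand-in for the backoffArgs bookkeeping in
// patch 1: it remembers why a store was busy and the base sleep it owes.
type pendingBackoff struct {
    base   time.Duration
    reason string
}

// busyTracker defers server-busy backoffs per store address instead of
// sleeping right away, so the request can try another replica first.
type busyTracker struct {
    pending map[string]pendingBackoff
}

func newBusyTracker() *busyTracker {
    return &busyTracker{pending: make(map[string]pendingBackoff)}
}

// record notes a busy store without sleeping; only the first reason is kept,
// like delayBoTiKVServerBusy in patch 1.
func (t *busyTracker) record(addr, reason string, base time.Duration) {
    if _, ok := t.pending[addr]; !ok {
        t.pending[addr] = pendingBackoff{base: base, reason: reason}
    }
}

// onRetry pays the pending backoff only when the sender is about to hit the
// same busy store again, mirroring backoffOnRetry.
func (t *busyTracker) onRetry(addr string, sleep func(time.Duration, string)) {
    if p, ok := t.pending[addr]; ok {
        delete(t.pending, addr)
        sleep(p.base, p.reason)
    }
}

// onExhausted pays the largest pending backoff once all replicas have been
// tried, mirroring backoffOnFakeErr before the pseudo region error is thrown.
func (t *busyTracker) onExhausted(sleep func(time.Duration, string)) {
    var worst pendingBackoff
    found := false
    for _, p := range t.pending {
        if !found || worst.base < p.base {
            worst, found = p, true
        }
    }
    if found {
        sleep(worst.base, worst.reason)
    }
}

func main() {
    sleep := func(d time.Duration, why string) { fmt.Printf("backoff %v: %s\n", d, why) }
    t := newBusyTracker()

    // A Get with a 1s timeout qualifies for the fast-retry path.
    if qualifiesForFastRetry(true, 1000) {
        // The follower reports ServerIsBusy: park the backoff and move on to
        // the leader immediately instead of sleeping here.
        t.record("store-1", "mock server busy", 2*time.Millisecond)
    }

    t.onRetry("store-2", sleep) // different store: no backoff is paid
    t.onRetry("store-1", sleep) // same busy store again: pay the parked backoff
    t.onExhausted(sleep)        // nothing left pending, so this is a no-op
}

Patch 3 below closes the remaining gap: when the fast-retry path is taken, the busy replica is also marked with deadlineErrUsingConfTimeout so the selector will not pick it again before the other peers have been tried.
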
From 71902c9ed928566a16a000faeefd0153438bf498 Mon Sep 17 00:00:00 2001 From: zyguan Date: Fri, 22 Sep 2023 16:20:16 +0800 Subject: [PATCH 3/5] do not retry same peer after fast retry Signed-off-by: zyguan --- internal/locate/region_request.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/internal/locate/region_request.go b/internal/locate/region_request.go index ddb8fe9c3..8b9c31373 100644 --- a/internal/locate/region_request.go +++ b/internal/locate/region_request.go @@ -1235,6 +1235,10 @@ func (s *replicaSelector) onServerIsBusy( } } if onServerBusyFastRetry != nil && isReadWithTinyTimeout(req) { + if target := s.targetReplica(); target != nil { + // avoid retrying on this replica again + target.deadlineErrUsingConfTimeout = true + } onServerBusyFastRetry(serverIsBusy.GetReason()) return true, nil } From 1c22121133073cfc76d22d705d845c18cc6b7573 Mon Sep 17 00:00:00 2001 From: zyguan Date: Mon, 25 Sep 2023 16:16:52 +0800 Subject: [PATCH 4/5] add unit tests Signed-off-by: zyguan --- internal/locate/region_request3_test.go | 80 +++++++++++++++++++++++++ 1 file changed, 80 insertions(+) diff --git a/internal/locate/region_request3_test.go b/internal/locate/region_request3_test.go index a4697272e..654667e73 100644 --- a/internal/locate/region_request3_test.go +++ b/internal/locate/region_request3_test.go @@ -1620,3 +1620,83 @@ func (s *testRegionRequestToThreeStoresSuite) TestDoNotTryUnreachableLeader() { // `tryFollower` always try the local peer firstly s.Equal(follower.addr, string(resp.Resp.(*kvrpcpb.GetResponse).Value)) } + +func (s *testRegionRequestToThreeStoresSuite) TestRetryOnServerBusy() { + key := []byte("key") + region, err := s.regionRequestSender.regionCache.findRegionByKey(s.bo, key, false) + s.Nil(err) + regionStore := region.getStore() + leader, _, _, _ := region.WorkStorePeer(regionStore) + follower, _, _, _ := region.FollowerStorePeer(regionStore, 0, &storeSelectorOp{}) + + req := tikvrpc.NewReplicaReadRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{Key: key}, kv.ReplicaReadLeader, nil) + req.ReadReplicaScope = oracle.GlobalTxnScope + req.TxnScope = oracle.GlobalTxnScope + req.EnableStaleRead() + + var ( + bo *retry.Backoffer + resp *tikvrpc.Response + regionErr *errorpb.Error + ) + + // the target follower is busy, try leader immediately and return value + s.regionRequestSender.client = &fnClient{fn: func(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (response *tikvrpc.Response, err error) { + if addr != follower.addr { + return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{Value: []byte(addr)}}, nil + } + return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{RegionError: &errorpb.Error{ServerIsBusy: &errorpb.ServerIsBusy{EstimatedWaitMs: 2000, Reason: "mock server busy"}}}}, nil + }} + bo = retry.NewBackoffer(context.Background(), -1) + resp, _, _, err = s.regionRequestSender.SendReqCtx(bo, req, region.VerID(), time.Second, tikvrpc.TiKV, WithMatchLabels(follower.labels)) + s.Nil(err) + regionErr, _ = resp.GetRegionError() + s.Nil(regionErr) + s.Equal(leader.addr, string(resp.Resp.(*kvrpcpb.GetResponse).Value)) + s.Equal(0, bo.GetBackoffTimes()["tikvServerBusy"]) + + // try target follower, fallback to leader on data-not-ready error, then try the follower again and return value + s.regionRequestSender.client = &fnClient{fn: func(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (response *tikvrpc.Response, err error) { + if req.StaleRead && addr == follower.addr { + return &tikvrpc.Response{Resp: 
&kvrpcpb.GetResponse{RegionError: &errorpb.Error{DataIsNotReady: &errorpb.DataIsNotReady{}}}}, nil + } + if addr == leader.addr { + return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{RegionError: &errorpb.Error{ServerIsBusy: &errorpb.ServerIsBusy{EstimatedWaitMs: 2000, Reason: "mock server busy"}}}}, nil + } + return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{Value: []byte(addr)}}, nil + }} + bo = retry.NewBackoffer(context.Background(), -1) + resp, _, _, err = s.regionRequestSender.SendReqCtx(bo, req, region.VerID(), time.Second, tikvrpc.TiKV, WithMatchLabels(follower.labels)) + s.Nil(err) + regionErr, _ = resp.GetRegionError() + s.Nil(regionErr) + s.Equal(follower.addr, string(resp.Resp.(*kvrpcpb.GetResponse).Value)) + s.Equal(0, bo.GetBackoffTimes()["tikvServerBusy"]) + + // all peers are busy, return fake region error with final server-busy backoff + s.regionRequestSender.client = &fnClient{fn: func(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (response *tikvrpc.Response, err error) { + return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{RegionError: &errorpb.Error{ServerIsBusy: &errorpb.ServerIsBusy{EstimatedWaitMs: 2000, Reason: "mock server busy"}}}}, nil + }} + bo = retry.NewBackoffer(context.Background(), -1) + resp, _, _, err = s.regionRequestSender.SendReqCtx(bo, req, region.VerID(), time.Second, tikvrpc.TiKV, WithMatchLabels(follower.labels)) + s.Nil(err) + regionErr, _ = resp.GetRegionError() + s.NotNil(regionErr) + s.True(IsFakeRegionError(regionErr)) + s.Equal(1, bo.GetBackoffTimes()["tikvServerBusy"]) + + // followers are not initiallized and leader is busy, return fake region error with final server-busy backoff + s.regionRequestSender.client = &fnClient{fn: func(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (response *tikvrpc.Response, err error) { + if addr == leader.addr { + return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{RegionError: &errorpb.Error{ServerIsBusy: &errorpb.ServerIsBusy{EstimatedWaitMs: 2000, Reason: "mock server busy"}}}}, nil + } + return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{RegionError: &errorpb.Error{RegionNotInitialized: &errorpb.RegionNotInitialized{}}}}, nil + }} + bo = retry.NewBackoffer(context.Background(), -1) + resp, _, _, err = s.regionRequestSender.SendReqCtx(bo, req, region.VerID(), time.Second, tikvrpc.TiKV, WithMatchLabels(follower.labels)) + s.Nil(err) + regionErr, _ = resp.GetRegionError() + s.NotNil(regionErr) + s.True(IsFakeRegionError(regionErr)) + s.Equal(1, bo.GetBackoffTimes()["tikvServerBusy"]) +} From d2b78dc8cff281af0cf61d2fd024ed3cdc9c58c9 Mon Sep 17 00:00:00 2001 From: zyguan Date: Wed, 3 Jan 2024 10:26:08 +0800 Subject: [PATCH 5/5] fix ut Signed-off-by: zyguan --- internal/locate/region_request3_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/locate/region_request3_test.go b/internal/locate/region_request3_test.go index 451b2aadd..49baa99c3 100644 --- a/internal/locate/region_request3_test.go +++ b/internal/locate/region_request3_test.go @@ -1632,7 +1632,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestRetryOnServerBusy() { req := tikvrpc.NewReplicaReadRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{Key: key}, kv.ReplicaReadLeader, nil) req.ReadReplicaScope = oracle.GlobalTxnScope req.TxnScope = oracle.GlobalTxnScope - req.EnableStaleRead() + req.EnableStaleWithMixedReplicaRead() var ( bo *retry.Backoffer