diff --git a/br/pkg/storage/BUILD.bazel b/br/pkg/storage/BUILD.bazel index afc2db4bf67ff..810585525db8d 100644 --- a/br/pkg/storage/BUILD.bazel +++ b/br/pkg/storage/BUILD.bazel @@ -43,6 +43,7 @@ go_library( "@com_github_klauspost_compress//snappy", "@com_github_klauspost_compress//zstd", "@com_github_pingcap_errors//:errors", + "@com_github_pingcap_failpoint//:failpoint", "@com_github_pingcap_kvproto//pkg/brpb", "@com_github_pingcap_log//:log", "@com_github_spf13_pflag//:pflag", @@ -70,7 +71,7 @@ go_test( ], embed = [":storage"], flaky = True, - shard_count = 42, + shard_count = 43, deps = [ "//br/pkg/mock", "@com_github_aws_aws_sdk_go//aws", @@ -81,6 +82,7 @@ go_test( "@com_github_fsouza_fake_gcs_server//fakestorage", "@com_github_golang_mock//gomock", "@com_github_pingcap_errors//:errors", + "@com_github_pingcap_failpoint//:failpoint", "@com_github_pingcap_kvproto//pkg/brpb", "@com_github_stretchr_testify//require", ], diff --git a/br/pkg/storage/s3.go b/br/pkg/storage/s3.go index 97b9b09a69272..d6d29706825bc 100644 --- a/br/pkg/storage/s3.go +++ b/br/pkg/storage/s3.go @@ -27,6 +27,7 @@ import ( "github.com/aws/aws-sdk-go/service/s3/s3iface" "github.com/aws/aws-sdk-go/service/s3/s3manager" "github.com/pingcap/errors" + "github.com/pingcap/failpoint" backuppb "github.com/pingcap/kvproto/pkg/brpb" "github.com/pingcap/log" berrors "github.com/pingcap/tidb/br/pkg/errors" @@ -956,7 +957,21 @@ func isDeadlineExceedError(err error) bool { return strings.Contains(err.Error(), "context deadline exceeded") } +func isConnectionResetError(err error) bool { + return strings.Contains(err.Error(), "read: connection reset") +} + func (rl retryerWithLog) ShouldRetry(r *request.Request) bool { + // for unit test + failpoint.Inject("replace-error-to-connection-reset-by-peer", func(_ failpoint.Value) { + log.Info("original error", zap.Error(r.Error)) + if r.Error != nil { + r.Error = errors.New("read tcp *.*.*.*:*->*.*.*.*:*: read: connection reset by peer") + } + }) + if isConnectionResetError(r.Error) { + return true + } if isDeadlineExceedError(r.Error) && r.HTTPRequest.URL.Host == ec2MetaAddress { // fast fail for unreachable linklocal address in EC2 containers. log.Warn("failed to get EC2 metadata. skipping.", logutil.ShortError(r.Error)) diff --git a/br/pkg/storage/s3_test.go b/br/pkg/storage/s3_test.go index 0541e66b8617f..a1f185824417b 100644 --- a/br/pkg/storage/s3_test.go +++ b/br/pkg/storage/s3_test.go @@ -12,6 +12,7 @@ import ( "net/http" "net/http/httptest" "os" + "sync" "testing" "github.com/aws/aws-sdk-go/aws" @@ -20,6 +21,7 @@ import ( "github.com/aws/aws-sdk-go/service/s3" "github.com/golang/mock/gomock" "github.com/pingcap/errors" + "github.com/pingcap/failpoint" backuppb "github.com/pingcap/kvproto/pkg/brpb" "github.com/pingcap/tidb/br/pkg/mock" . "github.com/pingcap/tidb/br/pkg/storage" @@ -1292,3 +1294,49 @@ func TestS3StorageBucketRegion(t *testing.T) { }(ca.name, ca.expectRegion, ca.s3) } } + +func TestRetryError(t *testing.T) { + var count int32 = 0 + var errString string = "read tcp *.*.*.*:*->*.*.*.*:*: read: connection reset by peer" + var lock sync.Mutex + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.Method == "PUT" { + var curCnt int32 + t.Log(r.URL) + lock.Lock() + count += 1 + curCnt = count + lock.Unlock() + if curCnt < 2 { + // write an cannot-retry error, but we modify the error to specific error, so client would retry. + w.WriteHeader(403) + return + } + } + + w.WriteHeader(200) + })) + + defer server.Close() + t.Log(server.URL) + + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/br/pkg/storage/replace-error-to-connection-reset-by-peer", "return(true)")) + defer func() { + failpoint.Disable("github.com/pingcap/tidb/br/pkg/storage/replace-error-to-connection-reset-by-peer") + }() + + ctx := context.Background() + s, err := NewS3Storage(ctx, &backuppb.S3{ + Endpoint: server.URL, + Bucket: "test", + Prefix: "retry", + AccessKey: "none", + SecretAccessKey: "none", + Provider: "skip check region", + ForcePathStyle: true, + }, &ExternalStorageOptions{}) + require.NoError(t, err) + err = s.WriteFile(ctx, "reset", []byte(errString)) + require.NoError(t, err) + require.Equal(t, count, int32(2)) +} diff --git a/br/pkg/utils/backoff.go b/br/pkg/utils/backoff.go index 67ded2df49172..e1139d2b22586 100644 --- a/br/pkg/utils/backoff.go +++ b/br/pkg/utils/backoff.go @@ -194,11 +194,11 @@ func (bo *pdReqBackoffer) NextBackoff(err error) time.Duration { // bo.attempt-- e := errors.Cause(err) switch e { // nolint:errorlint - case nil, context.Canceled, context.DeadlineExceeded, io.EOF, sql.ErrNoRows: + case nil, context.Canceled, context.DeadlineExceeded, sql.ErrNoRows: // Excepted error, finish the operation bo.delayTime = 0 bo.attempt = 0 - case berrors.ErrRestoreTotalKVMismatch: + case berrors.ErrRestoreTotalKVMismatch, io.EOF: bo.delayTime = 2 * bo.delayTime bo.attempt-- default: diff --git a/br/pkg/utils/backoff_test.go b/br/pkg/utils/backoff_test.go index 1ddff4b7d7ca7..31778052f77e1 100644 --- a/br/pkg/utils/backoff_test.go +++ b/br/pkg/utils/backoff_test.go @@ -4,6 +4,7 @@ package utils_test import ( "context" + "io" "testing" "time" @@ -101,13 +102,16 @@ func TestPdBackoffWithRetryableError(t *testing.T) { gRPCError := status.Error(codes.Unavailable, "transport is closing") err := utils.WithRetry(context.Background(), func() error { defer func() { counter++ }() + if counter == 2 { + return io.EOF + } return gRPCError }, backoffer) require.Equal(t, 16, counter) require.Equal(t, []error{ gRPCError, gRPCError, - gRPCError, + io.EOF, gRPCError, gRPCError, gRPCError,