Skip to content

Commit

Permalink
br: add more retryable error for retryer (#43022) (#43611)
Browse files Browse the repository at this point in the history
close #42909
  • Loading branch information
ti-chi-bot committed Jun 26, 2023
1 parent f5b4ba5 commit 464dc82
Show file tree
Hide file tree
Showing 5 changed files with 83 additions and 4 deletions.
3 changes: 3 additions & 0 deletions br/pkg/storage/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ go_library(
"@com_github_klauspost_compress//snappy",
"@com_github_klauspost_compress//zstd",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_pingcap_kvproto//pkg/brpb",
"@com_github_pingcap_log//:log",
"@com_github_spf13_pflag//:pflag",
Expand Down Expand Up @@ -70,6 +71,7 @@ go_test(
],
embed = [":storage"],
flaky = True,
shard_count = 43,
deps = [
"//br/pkg/mock",
"@com_github_aws_aws_sdk_go//aws",
Expand All @@ -80,6 +82,7 @@ go_test(
"@com_github_fsouza_fake_gcs_server//fakestorage",
"@com_github_golang_mock//gomock",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_pingcap_kvproto//pkg/brpb",
"@com_github_stretchr_testify//require",
],
Expand Down
15 changes: 15 additions & 0 deletions br/pkg/storage/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/aws/aws-sdk-go/service/s3/s3iface"
"github.com/aws/aws-sdk-go/service/s3/s3manager"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
backuppb "github.com/pingcap/kvproto/pkg/brpb"
"github.com/pingcap/log"
berrors "github.com/pingcap/tidb/br/pkg/errors"
Expand Down Expand Up @@ -943,7 +944,21 @@ func isDeadlineExceedError(err error) bool {
return strings.Contains(err.Error(), "context deadline exceeded")
}

// isConnectionResetError reports whether err's message contains
// "read: connection reset", which is how this file recognizes a connection
// that was reset while reading (for example "... read: connection reset by peer").
//
// A nil err is reported as false instead of panicking on err.Error(), so the
// helper is safe to call with a request error that may not be set.
func isConnectionResetError(err error) bool {
	if err == nil {
		return false
	}
	return strings.Contains(err.Error(), "read: connection reset")
}

func (rl retryerWithLog) ShouldRetry(r *request.Request) bool {
// for unit test
failpoint.Inject("replace-error-to-connection-reset-by-peer", func(_ failpoint.Value) {
log.Info("original error", zap.Error(r.Error))
if r.Error != nil {
r.Error = errors.New("read tcp *.*.*.*:*->*.*.*.*:*: read: connection reset by peer")
}
})
if isConnectionResetError(r.Error) {
return true
}
if isDeadlineExceedError(r.Error) && r.HTTPRequest.URL.Host == ec2MetaAddress {
// fast fail for unreachable linklocal address in EC2 containers.
log.Warn("failed to get EC2 metadata. skipping.", logutil.ShortError(r.Error))
Expand Down
59 changes: 58 additions & 1 deletion br/pkg/storage/s3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"net/http"
"net/http/httptest"
"os"
"sync"
"testing"

"github.com/aws/aws-sdk-go/aws"
Expand All @@ -20,6 +21,7 @@ import (
"github.com/aws/aws-sdk-go/service/s3"
"github.com/golang/mock/gomock"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
backuppb "github.com/pingcap/kvproto/pkg/brpb"
"github.com/pingcap/tidb/br/pkg/mock"
. "github.com/pingcap/tidb/br/pkg/storage"
Expand Down Expand Up @@ -285,6 +287,8 @@ func TestS3Storage(t *testing.T) {
sendCredential bool
}

require.NoError(t, os.Setenv("AWS_ACCESS_KEY_ID", "ab"))
require.NoError(t, os.Setenv("AWS_SECRET_ACCESS_KEY", "cd"))
s := createGetBucketRegionServer("us-west-2", 200, true)
defer s.Close()

Expand Down Expand Up @@ -407,7 +411,15 @@ func TestS3Storage(t *testing.T) {
}

func TestS3URI(t *testing.T) {
backend, err := ParseBackend("s3://bucket/prefix/", nil)
accessKey := "ab"
secretAccessKey := "cd"
options := &BackendOptions{
S3: S3BackendOptions{
AccessKey: accessKey,
SecretAccessKey: secretAccessKey,
},
}
backend, err := ParseBackend("s3://bucket/prefix/", options)
require.NoError(t, err)
storage, err := New(context.Background(), backend, &ExternalStorageOptions{})
require.NoError(t, err)
Expand Down Expand Up @@ -1193,3 +1205,48 @@ func TestObjectLock(t *testing.T) {
)
require.Equal(t, true, s.storage.IsObjectLockEnabled())
}

// TestRetryError checks that an S3 write is retried when the request error
// looks like a connection reset.
//
// A local HTTP server answers the first PUT with 403 and every later request
// with 200. The "replace-error-to-connection-reset-by-peer" failpoint is
// enabled so that the failed request's error is rewritten into a
// connection-reset error; the write is then expected to succeed after exactly
// two PUT attempts.
func TestRetryError(t *testing.T) {
	const (
		errString     = "read tcp *.*.*.*:*->*.*.*.*:*: read: connection reset by peer"
		failpointName = "github.com/pingcap/tidb/br/pkg/storage/replace-error-to-connection-reset-by-peer"
	)
	var (
		count int32 // number of PUT requests seen by the server; guarded by lock
		lock  sync.Mutex
	)
	server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		if r.Method == "PUT" {
			t.Log(r.URL)
			lock.Lock()
			count++
			curCnt := count
			lock.Unlock()
			if curCnt < 2 {
				// Reply with an error that would not normally be retried. The
				// failpoint enabled below rewrites it into a connection-reset
				// error, so the client is expected to retry.
				w.WriteHeader(403)
				return
			}
		}

		w.WriteHeader(200)
	}))

	defer server.Close()
	t.Log(server.URL)

	require.NoError(t, failpoint.Enable(failpointName, "return(true)"))
	defer func() {
		// Surface a failure to disable the failpoint instead of silently
		// leaving it active.
		require.NoError(t, failpoint.Disable(failpointName))
	}()

	ctx := context.Background()
	s, err := NewS3Storage(&backuppb.S3{
		Endpoint:        server.URL,
		Bucket:          "test",
		Prefix:          "retry",
		AccessKey:       "none",
		SecretAccessKey: "none",
		ForcePathStyle:  true,
	}, &ExternalStorageOptions{})
	require.NoError(t, err)
	err = s.WriteFile(ctx, "reset", []byte(errString))
	require.NoError(t, err)

	// Read the counter under the same lock the handler uses.
	lock.Lock()
	got := count
	lock.Unlock()
	// Expected value comes first so a failure message reads correctly.
	require.Equal(t, int32(2), got)
}
4 changes: 2 additions & 2 deletions br/pkg/utils/backoff.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,11 +190,11 @@ func (bo *pdReqBackoffer) NextBackoff(err error) time.Duration {
// bo.attempt--
e := errors.Cause(err)
switch e { // nolint:errorlint
case nil, context.Canceled, context.DeadlineExceeded, io.EOF, sql.ErrNoRows:
case nil, context.Canceled, context.DeadlineExceeded, sql.ErrNoRows:
// Expected error, finish the operation
bo.delayTime = 0
bo.attempt = 0
case berrors.ErrRestoreTotalKVMismatch:
case berrors.ErrRestoreTotalKVMismatch, io.EOF:
bo.delayTime = 2 * bo.delayTime
bo.attempt--
default:
Expand Down
6 changes: 5 additions & 1 deletion br/pkg/utils/backoff_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package utils_test

import (
"context"
"io"
"testing"
"time"

Expand Down Expand Up @@ -101,13 +102,16 @@ func TestPdBackoffWithRetryableError(t *testing.T) {
gRPCError := status.Error(codes.Unavailable, "transport is closing")
err := utils.WithRetry(context.Background(), func() error {
defer func() { counter++ }()
if counter == 2 {
return io.EOF
}
return gRPCError
}, backoffer)
require.Equal(t, 16, counter)
require.Equal(t, []error{
gRPCError,
gRPCError,
gRPCError,
io.EOF,
gRPCError,
gRPCError,
gRPCError,
Expand Down

0 comments on commit 464dc82

Please sign in to comment.