Skip to content

Commit

Permalink
lightning: retry for leader change error when GetTS (pingcap#44478)
Browse files Browse the repository at this point in the history
  • Loading branch information
lance6716 committed Jul 12, 2023
1 parent 12c1cf2 commit e8df3d5
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 3 deletions.
1 change: 1 addition & 0 deletions br/pkg/lightning/backend/local/BUILD.bazel
Expand Up @@ -163,6 +163,7 @@ go_test(
"@com_github_stretchr_testify//require",
"@com_github_tikv_client_go_v2//oracle",
"@com_github_tikv_pd_client//:client",
"@com_github_tikv_pd_client//errs",
"@org_golang_google_grpc//:grpc",
"@org_golang_google_grpc//codes",
"@org_golang_google_grpc//status",
Expand Down
27 changes: 24 additions & 3 deletions br/pkg/lightning/backend/local/checksum.go
Expand Up @@ -328,12 +328,33 @@ func (e *TiKVChecksumManager) checksumDB(ctx context.Context, tableInfo *checkpo
return nil, err
}

var retryGetTSInterval = time.Second

// Checksum implements the ChecksumManager interface.
func (e *TiKVChecksumManager) Checksum(ctx context.Context, tableInfo *checkpoints.TidbTableInfo) (*RemoteChecksum, error) {
tbl := common.UniqueTable(tableInfo.DB, tableInfo.Name)
physicalTS, logicalTS, err := e.manager.pdClient.GetTS(ctx)
if err != nil {
return nil, errors.Annotate(err, "fetch tso from pd failed")
var (
physicalTS, logicalTS int64
err error
retryTime int
)
physicalTS, logicalTS, err = e.manager.pdClient.GetTS(ctx)
for err != nil {
if !pd.IsLeaderChange(err) {
return nil, errors.Annotate(err, "fetch tso from pd failed")
}
retryTime++
if retryTime%60 == 0 {
log.FromContext(ctx).Warn("fetch tso from pd failed and retrying",
zap.Int("retryTime", retryTime),
zap.Error(err))
}
select {
case <-ctx.Done():
err = ctx.Err()
case <-time.After(retryGetTSInterval):
physicalTS, logicalTS, err = e.manager.pdClient.GetTS(ctx)
}
}
ts := oracle.ComposeTS(physicalTS, logicalTS)
if err := e.manager.addOneJob(ctx, tbl, ts); err != nil {
Expand Down
17 changes: 17 additions & 0 deletions br/pkg/lightning/backend/local/checksum_test.go
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/stretchr/testify/require"
"github.com/tikv/client-go/v2/oracle"
pd "github.com/tikv/pd/client"
"github.com/tikv/pd/client/errs"
"go.uber.org/atomic"
)

Expand Down Expand Up @@ -198,6 +199,18 @@ func TestDoChecksumWithTikv(t *testing.T) {
require.Zero(t, checksumExec.manager.currentTS)
require.Equal(t, 0, len(checksumExec.manager.tableGCSafeTS))
}

// test PD leader change error
backup := retryGetTSInterval
retryGetTSInterval = time.Millisecond
t.Cleanup(func() {
retryGetTSInterval = backup
})
pdClient.leaderChanging = true
kvClient.maxErrCount = 0
checksumExec := &TiKVChecksumManager{manager: newGCTTLManager(pdClient), client: kvClient}
_, err := checksumExec.Checksum(ctx, &TidbTableInfo{DB: "test", Name: "t", Core: tableInfo})
require.NoError(t, err)
}

func TestDoChecksumWithErrorAndLongOriginalLifetime(t *testing.T) {
Expand Down Expand Up @@ -266,6 +279,7 @@ type testPDClient struct {
count atomic.Int32
gcSafePoint []safePointTTL
logicalTSCounter atomic.Uint64
leaderChanging bool
}

func (c *testPDClient) currentSafePoint() uint64 {
Expand All @@ -282,6 +296,9 @@ func (c *testPDClient) currentSafePoint() uint64 {

func (c *testPDClient) GetTS(ctx context.Context) (int64, int64, error) {
physicalTS := time.Now().UnixMilli()
if c.leaderChanging && physicalTS%2 == 0 {
return 0, 0, errs.ErrClientTSOStreamClosed
}
logicalTS := oracle.ExtractLogical(c.logicalTSCounter.Inc())
return physicalTS, logicalTS, nil
}
Expand Down

0 comments on commit e8df3d5

Please sign in to comment.