Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

br: fix checkpoint cannot advance after pause->stop->start #53091

Merged
merged 6 commits into from
May 8, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion br/pkg/streamhelper/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ go_test(
],
flaky = True,
race = "on",
shard_count = 27,
shard_count = 28,
deps = [
":streamhelper",
"//br/pkg/errors",
Expand Down
5 changes: 3 additions & 2 deletions br/pkg/streamhelper/advancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -434,6 +434,7 @@ func (c *CheckpointAdvancer) onTaskEvent(ctx context.Context, e TaskEvent) error
case EventDel:
utils.LogBackupTaskCountDec()
c.task = nil
c.isPaused.Store(false)
c.taskRange = nil
// This would be synced by `taskMu`, perhaps we'd better rename that to `tickMu`.
// Do the null check because some of test cases won't equip the advancer with subscriber.
Expand All @@ -450,11 +451,11 @@ func (c *CheckpointAdvancer) onTaskEvent(ctx context.Context, e TaskEvent) error
metrics.LastCheckpoint.DeleteLabelValues(e.Name)
case EventPause:
if c.task.GetName() == e.Name {
c.isPaused.CompareAndSwap(false, true)
c.isPaused.Store(true)
}
case EventResume:
if c.task.GetName() == e.Name {
c.isPaused.CompareAndSwap(true, false)
c.isPaused.Store(false)
}
case EventErr:
return e.Err
Expand Down
27 changes: 27 additions & 0 deletions br/pkg/streamhelper/advancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -541,6 +541,33 @@ func TestCheckPointResume(t *testing.T) {
require.ErrorContains(t, adv.OnTick(ctx), "lagged too large")
}

func TestUnregisterAfterPause(t *testing.T) {
c := createFakeCluster(t, 4, false)
defer func() {
fmt.Println(c)
}()
c.splitAndScatter("01", "02", "022", "023", "033", "04", "043")
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
env := &testEnv{fakeCluster: c, testCtx: t}
adv := streamhelper.NewCheckpointAdvancer(env)
adv.UpdateConfigWith(func(c *config.Config) {
c.CheckPointLagLimit = 1 * time.Minute
})
adv.StartTaskListener(ctx)
c.advanceClusterTimeBy(1 * time.Minute)
require.NoError(t, adv.OnTick(ctx))
env.PauseTask(ctx, "whole")
time.Sleep(1 * time.Second)
c.advanceClusterTimeBy(1 * time.Minute)
require.NoError(t, adv.OnTick(ctx))
env.unregisterTask()
env.putTask()
require.Eventually(t, func() bool {
return assert.ErrorContains(t, adv.OnTick(ctx), "check point lagged too large")
RidRisR marked this conversation as resolved.
Show resolved Hide resolved
}, 5*time.Second, 300*time.Millisecond)
}

func TestOwnershipLost(t *testing.T) {
c := createFakeCluster(t, 4, false)
c.splitAndScatter(manyRegions(0, 10240)...)
Expand Down
16 changes: 16 additions & 0 deletions br/pkg/streamhelper/basic_lib_for_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -717,6 +717,22 @@ func (t *testEnv) unregisterTask() {
}
}

// putTask publishes an EventAdd task event for the task named "whole" on
// t.taskCh. The event carries t.ranges, or a single zero-value kv.KeyRange
// when t.ranges is empty.
//
// This is a plain channel send, so whether it blocks depends on how taskCh
// was created and on whether a receiver is running.
func (t *testEnv) putTask() {
	const taskName = "whole"

	ranges := t.ranges
	if len(ranges) == 0 {
		ranges = []kv.KeyRange{{}}
	}

	t.taskCh <- streamhelper.TaskEvent{
		Type:   streamhelper.EventAdd,
		Name:   taskName,
		Info:   &backup.StreamBackupTaskInfo{Name: taskName},
		Ranges: ranges,
	}
}

func (t *testEnv) ScanLocksInOneRegion(bo *tikv.Backoffer, key []byte, maxVersion uint64, limit uint32) ([]*txnlock.Lock, *tikv.KeyLocation, error) {
for _, r := range t.regions {
if len(r.locks) != 0 {
Expand Down
Loading