diff --git a/br/pkg/streamhelper/BUILD.bazel b/br/pkg/streamhelper/BUILD.bazel index 1eaf87efc997..a823c1059b94 100644 --- a/br/pkg/streamhelper/BUILD.bazel +++ b/br/pkg/streamhelper/BUILD.bazel @@ -69,7 +69,7 @@ go_test( ], flaky = True, race = "on", - shard_count = 27, + shard_count = 28, deps = [ ":streamhelper", "//br/pkg/errors", diff --git a/br/pkg/streamhelper/advancer.go b/br/pkg/streamhelper/advancer.go index 0ed0c79b70d0..47e075967f77 100644 --- a/br/pkg/streamhelper/advancer.go +++ b/br/pkg/streamhelper/advancer.go @@ -434,6 +434,7 @@ func (c *CheckpointAdvancer) onTaskEvent(ctx context.Context, e TaskEvent) error case EventDel: utils.LogBackupTaskCountDec() c.task = nil + c.isPaused.Store(false) c.taskRange = nil // This would be synced by `taskMu`, perhaps we'd better rename that to `tickMu`. // Do the null check because some of test cases won't equip the advancer with subscriber. @@ -450,11 +451,11 @@ func (c *CheckpointAdvancer) onTaskEvent(ctx context.Context, e TaskEvent) error metrics.LastCheckpoint.DeleteLabelValues(e.Name) case EventPause: if c.task.GetName() == e.Name { - c.isPaused.CompareAndSwap(false, true) + c.isPaused.Store(true) } case EventResume: if c.task.GetName() == e.Name { - c.isPaused.CompareAndSwap(true, false) + c.isPaused.Store(false) } case EventErr: return e.Err diff --git a/br/pkg/streamhelper/advancer_test.go b/br/pkg/streamhelper/advancer_test.go index f8889c4f4e8a..88971c7962de 100644 --- a/br/pkg/streamhelper/advancer_test.go +++ b/br/pkg/streamhelper/advancer_test.go @@ -5,6 +5,7 @@ package streamhelper_test import ( "context" "fmt" + "strings" "sync" "testing" "time" @@ -541,6 +542,34 @@ func TestCheckPointResume(t *testing.T) { require.ErrorContains(t, adv.OnTick(ctx), "lagged too large") } +func TestUnregisterAfterPause(t *testing.T) { + c := createFakeCluster(t, 4, false) + defer func() { + fmt.Println(c) + }() + c.splitAndScatter("01", "02", "022", "023", "033", "04", "043") + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + env := &testEnv{fakeCluster: c, testCtx: t} + adv := streamhelper.NewCheckpointAdvancer(env) + adv.UpdateConfigWith(func(c *config.Config) { + c.CheckPointLagLimit = 1 * time.Minute + }) + adv.StartTaskListener(ctx) + c.advanceClusterTimeBy(1 * time.Minute) + require.NoError(t, adv.OnTick(ctx)) + env.PauseTask(ctx, "whole") + time.Sleep(1 * time.Second) + c.advanceClusterTimeBy(1 * time.Minute) + require.NoError(t, adv.OnTick(ctx)) + env.unregisterTask() + env.putTask() + require.Eventually(t, func() bool { + err := adv.OnTick(ctx) + return err != nil && strings.Contains(err.Error(), "check point lagged too large") + }, 5*time.Second, 300*time.Millisecond) +} + func TestOwnershipLost(t *testing.T) { c := createFakeCluster(t, 4, false) c.splitAndScatter(manyRegions(0, 10240)...) diff --git a/br/pkg/streamhelper/basic_lib_for_test.go b/br/pkg/streamhelper/basic_lib_for_test.go index 82cdb709f11c..00e4ed324dea 100644 --- a/br/pkg/streamhelper/basic_lib_for_test.go +++ b/br/pkg/streamhelper/basic_lib_for_test.go @@ -717,6 +717,22 @@ func (t *testEnv) unregisterTask() { } } +func (t *testEnv) putTask() { + rngs := t.ranges + if len(rngs) == 0 { + rngs = []kv.KeyRange{{}} + } + tsk := streamhelper.TaskEvent{ + Type: streamhelper.EventAdd, + Name: "whole", + Info: &backup.StreamBackupTaskInfo{ + Name: "whole", + }, + Ranges: rngs, + } + t.taskCh <- tsk +} + func (t *testEnv) ScanLocksInOneRegion(bo *tikv.Backoffer, key []byte, maxVersion uint64, limit uint32) ([]*txnlock.Lock, *tikv.KeyLocation, error) { for _, r := range t.regions { if len(r.locks) != 0 {