diff --git a/br/pkg/streamhelper/BUILD.bazel b/br/pkg/streamhelper/BUILD.bazel
index c8e6c2b45c738..469d8021b748a 100644
--- a/br/pkg/streamhelper/BUILD.bazel
+++ b/br/pkg/streamhelper/BUILD.bazel
@@ -68,7 +68,7 @@ go_test(
     ],
     flaky = True,
     race = "on",
-    shard_count = 22,
+    shard_count = 28,
     deps = [
         ":streamhelper",
         "//br/pkg/errors",
diff --git a/br/pkg/streamhelper/advancer.go b/br/pkg/streamhelper/advancer.go
index a172d9d0cf49a..784ed735bd7c0 100644
--- a/br/pkg/streamhelper/advancer.go
+++ b/br/pkg/streamhelper/advancer.go
@@ -432,6 +432,7 @@ func (c *CheckpointAdvancer) onTaskEvent(ctx context.Context, e TaskEvent) error
 	case EventDel:
 		utils.LogBackupTaskCountDec()
 		c.task = nil
+		c.isPaused.Store(false)
 		c.taskRange = nil
 		// This would be synced by `taskMu`, perhaps we'd better rename that to `tickMu`.
 		// Do the null check because some of test cases won't equip the advancer with subscriber.
@@ -446,6 +447,14 @@ func (c *CheckpointAdvancer) onTaskEvent(ctx context.Context, e TaskEvent) error
 			log.Warn("failed to remove service GC safepoint", logutil.ShortError(err))
 		}
 		metrics.LastCheckpoint.DeleteLabelValues(e.Name)
+	case EventPause:
+		if c.task.GetName() == e.Name {
+			c.isPaused.Store(true)
+		}
+	case EventResume:
+		if c.task.GetName() == e.Name {
+			c.isPaused.Store(false)
+		}
 	case EventErr:
 		return e.Err
 	}
diff --git a/br/pkg/streamhelper/advancer_test.go b/br/pkg/streamhelper/advancer_test.go
index 1cd41dd0daa87..7b7d83e05cc1a 100644
--- a/br/pkg/streamhelper/advancer_test.go
+++ b/br/pkg/streamhelper/advancer_test.go
@@ -5,6 +5,7 @@ package streamhelper_test
 import (
 	"context"
 	"fmt"
+	"strings"
 	"sync"
 	"testing"
 	"time"
@@ -463,3 +464,160 @@ func TestRemoveTaskAndFlush(t *testing.T) {
 		return !adv.HasSubscribion()
 	}, 10*time.Second, 100*time.Millisecond)
 }
+
+func TestEnableCheckPointLimit(t *testing.T) {
+	c := createFakeCluster(t, 4, false)
+	defer func() {
+		fmt.Println(c)
+	}()
+	c.splitAndScatter("01", "02", "022", "023", "033", "04", "043")
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	env := &testEnv{fakeCluster: c, testCtx: t}
+	adv := streamhelper.NewCheckpointAdvancer(env)
+	adv.UpdateConfigWith(func(c *config.Config) {
+		c.CheckPointLagLimit = 1 * time.Minute
+	})
+	adv.StartTaskListener(ctx)
+	for i := 0; i < 5; i++ {
+		c.advanceClusterTimeBy(30 * time.Second)
+		c.advanceCheckpointBy(20 * time.Second)
+		require.NoError(t, adv.OnTick(ctx))
+	}
+}
+
+func TestCheckPointLagged(t *testing.T) {
+	c := createFakeCluster(t, 4, false)
+	defer func() {
+		fmt.Println(c)
+	}()
+	c.splitAndScatter("01", "02", "022", "023", "033", "04", "043")
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	env := &testEnv{fakeCluster: c, testCtx: t}
+	adv := streamhelper.NewCheckpointAdvancer(env)
+	adv.UpdateConfigWith(func(c *config.Config) {
+		c.CheckPointLagLimit = 1 * time.Minute
+	})
+	adv.StartTaskListener(ctx)
+	c.advanceClusterTimeBy(1 * time.Minute)
+	require.NoError(t, adv.OnTick(ctx))
+	c.advanceClusterTimeBy(1 * time.Minute)
+	require.ErrorContains(t, adv.OnTick(ctx), "lagged too large")
+	// after some time, the isPaused flag will be set and the following ticks are skipped
+	require.Eventually(t, func() bool {
+		return assert.NoError(t, adv.OnTick(ctx))
+	}, 5*time.Second, 100*time.Millisecond)
+}
+
+func TestCheckPointResume(t *testing.T) {
+	c := createFakeCluster(t, 4, false)
+	defer func() {
+		fmt.Println(c)
+	}()
+	c.splitAndScatter("01", "02", "022", "023", "033", "04", "043")
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	env := &testEnv{fakeCluster: c, testCtx: t}
+	adv := streamhelper.NewCheckpointAdvancer(env)
+	adv.UpdateConfigWith(func(c *config.Config) {
+		c.CheckPointLagLimit = 1 * time.Minute
+	})
+	adv.StartTaskListener(ctx)
+	c.advanceClusterTimeBy(1 * time.Minute)
+	require.NoError(t, adv.OnTick(ctx))
+	c.advanceClusterTimeBy(1 * time.Minute)
+	require.ErrorContains(t, adv.OnTick(ctx), "lagged too large")
+	require.Eventually(t, func() bool {
+		return assert.NoError(t, adv.OnTick(ctx))
+	}, 5*time.Second, 100*time.Millisecond)
+	// now advance the checkpoint to fix the lag and resume the task
+	c.advanceCheckpointBy(1 * time.Minute)
+	env.ResumeTask(ctx)
+	require.Eventually(t, func() bool {
+		return assert.NoError(t, adv.OnTick(ctx))
+	}, 5*time.Second, 100*time.Millisecond)
+	// as cluster time passes without new checkpoints, the lag exceeds the limit again
+	c.advanceClusterTimeBy(2 * time.Minute)
+	require.ErrorContains(t, adv.OnTick(ctx), "lagged too large")
+}
+
+func TestUnregisterAfterPause(t *testing.T) {
+	c := createFakeCluster(t, 4, false)
+	defer func() {
+		fmt.Println(c)
+	}()
+	c.splitAndScatter("01", "02", "022", "023", "033", "04", "043")
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	env := &testEnv{fakeCluster: c, testCtx: t}
+	adv := streamhelper.NewCheckpointAdvancer(env)
+	adv.UpdateConfigWith(func(c *config.Config) {
+		c.CheckPointLagLimit = 1 * time.Minute
+	})
+	adv.StartTaskListener(ctx)
+	c.advanceClusterTimeBy(1 * time.Minute)
+	require.NoError(t, adv.OnTick(ctx))
+	env.PauseTask(ctx, "whole")
+	time.Sleep(1 * time.Second)
+	c.advanceClusterTimeBy(1 * time.Minute)
+	require.NoError(t, adv.OnTick(ctx))
+	env.unregisterTask()
+	env.putTask()
+	require.Eventually(t, func() bool {
+		err := adv.OnTick(ctx)
+		return err != nil && strings.Contains(err.Error(), "check point lagged too large")
+	}, 5*time.Second, 300*time.Millisecond)
+}
+
+func TestOwnershipLost(t *testing.T) {
+	c := createFakeCluster(t, 4, false)
+	c.splitAndScatter(manyRegions(0, 10240)...)
+	installSubscribeSupport(c)
+	ctx, cancel := context.WithCancel(context.Background())
+	env := &testEnv{fakeCluster: c, testCtx: t}
+	adv := streamhelper.NewCheckpointAdvancer(env)
+	adv.OnStart(ctx)
+	adv.OnBecomeOwner(ctx)
+	require.NoError(t, adv.OnTick(ctx))
+	c.advanceCheckpoints()
+	c.flushAll()
+	failpoint.Enable("github.com/pingcap/tidb/br/pkg/streamhelper/subscription.listenOver.aboutToSend", "pause")
+	failpoint.Enable("github.com/pingcap/tidb/br/pkg/streamhelper/FlushSubscriber.Clear.timeoutMs", "return(500)")
+	wg := new(sync.WaitGroup)
+	wg.Add(adv.TEST_registerCallbackForSubscriptions(wg.Done))
+	cancel()
+	failpoint.Disable("github.com/pingcap/tidb/br/pkg/streamhelper/subscription.listenOver.aboutToSend")
+	wg.Wait()
+}
+
+func TestSubscriptionPanic(t *testing.T) {
+	c := createFakeCluster(t, 4, false)
+	c.splitAndScatter(manyRegions(0, 20)...)
+	installSubscribeSupport(c)
+	ctx, cancel := context.WithCancel(context.Background())
+	env := &testEnv{fakeCluster: c, testCtx: t}
+	adv := streamhelper.NewCheckpointAdvancer(env)
+	adv.OnStart(ctx)
+	adv.OnBecomeOwner(ctx)
+	wg := new(sync.WaitGroup)
+	wg.Add(adv.TEST_registerCallbackForSubscriptions(wg.Done))
+
+	require.NoError(t, adv.OnTick(ctx))
+	failpoint.Enable("github.com/pingcap/tidb/br/pkg/streamhelper/subscription.listenOver.aboutToSend", "5*panic")
+	ckpt := c.advanceCheckpoints()
+	c.flushAll()
+	cnt := 0
+	for {
+		require.NoError(t, adv.OnTick(ctx))
+		cnt++
+		if env.checkpoint >= ckpt {
+			break
+		}
+		if cnt > 100 {
+			t.Fatalf("the checkpoint cannot be advanced after 100 ticks")
+		}
+	}
+	cancel()
+	wg.Wait()
+}
diff --git a/br/pkg/streamhelper/basic_lib_for_test.go b/br/pkg/streamhelper/basic_lib_for_test.go
index 70fa53d6b8e23..5bcae9262c64a 100644
--- a/br/pkg/streamhelper/basic_lib_for_test.go
+++ b/br/pkg/streamhelper/basic_lib_for_test.go
@@ -667,6 +667,22 @@ func (t *testEnv) unregisterTask() {
 	}
 }
 
+func (t *testEnv) putTask() {
+	rngs := t.ranges
+	if len(rngs) == 0 {
+		rngs = []kv.KeyRange{{}}
+	}
+	tsk := streamhelper.TaskEvent{
+		Type: streamhelper.EventAdd,
+		Name: "whole",
+		Info: &backup.StreamBackupTaskInfo{
+			Name: "whole",
+		},
+		Ranges: rngs,
+	}
+	t.taskCh <- tsk
+}
+
 func (t *testEnv) ScanLocksInOneRegion(bo *tikv.Backoffer, key []byte, maxVersion uint64, limit uint32) ([]*txnlock.Lock, *tikv.KeyLocation, error) {
 	for _, r := range t.regions {
 		if len(r.locks) != 0 {