From 132c6dfe3fa2934009ad75406aafb1073db64697 Mon Sep 17 00:00:00 2001 From: RidRisR <79858083+RidRisR@users.noreply.github.com> Date: Wed, 8 May 2024 04:58:20 +0800 Subject: [PATCH 1/6] fix --- br/pkg/streamhelper/advancer.go | 5 ++-- br/pkg/streamhelper/advancer_test.go | 28 +++++++++++++++++++++++ br/pkg/streamhelper/basic_lib_for_test.go | 16 +++++++++++++ 3 files changed, 47 insertions(+), 2 deletions(-) diff --git a/br/pkg/streamhelper/advancer.go b/br/pkg/streamhelper/advancer.go index 0ed0c79b70d01..47e075967f77c 100644 --- a/br/pkg/streamhelper/advancer.go +++ b/br/pkg/streamhelper/advancer.go @@ -434,6 +434,7 @@ func (c *CheckpointAdvancer) onTaskEvent(ctx context.Context, e TaskEvent) error case EventDel: utils.LogBackupTaskCountDec() c.task = nil + c.isPaused.Store(false) c.taskRange = nil // This would be synced by `taskMu`, perhaps we'd better rename that to `tickMu`. // Do the null check because some of test cases won't equip the advancer with subscriber. @@ -450,11 +451,11 @@ func (c *CheckpointAdvancer) onTaskEvent(ctx context.Context, e TaskEvent) error metrics.LastCheckpoint.DeleteLabelValues(e.Name) case EventPause: if c.task.GetName() == e.Name { - c.isPaused.CompareAndSwap(false, true) + c.isPaused.Store(true) } case EventResume: if c.task.GetName() == e.Name { - c.isPaused.CompareAndSwap(true, false) + c.isPaused.Store(false) } case EventErr: return e.Err diff --git a/br/pkg/streamhelper/advancer_test.go b/br/pkg/streamhelper/advancer_test.go index f8889c4f4e8a2..0dc5a56626843 100644 --- a/br/pkg/streamhelper/advancer_test.go +++ b/br/pkg/streamhelper/advancer_test.go @@ -541,6 +541,34 @@ func TestCheckPointResume(t *testing.T) { require.ErrorContains(t, adv.OnTick(ctx), "lagged too large") } +func TestUnregisterAfterPause(t *testing.T) { + c := createFakeCluster(t, 4, false) + defer func() { + fmt.Println(c) + }() + c.splitAndScatter("01", "02", "022", "023", "033", "04", "043") + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + env := &testEnv{fakeCluster: c, testCtx: t} + adv := streamhelper.NewCheckpointAdvancer(env) + adv.UpdateConfigWith(func(c *config.Config) { + c.CheckPointLagLimit = 1 * time.Minute + }) + adv.StartTaskListener(ctx) + c.advanceClusterTimeBy(1 * time.Minute) + require.NoError(t, adv.OnTick(ctx)) + env.PauseTask(ctx,"whole") + time.Sleep(1 * time.Second) + c.advanceClusterTimeBy(1 * time.Minute) + require.NoError(t, adv.OnTick(ctx)) + env.unregisterTask() + env.putTask() + time.Sleep(1 * time.Second) + require.Eventually(t, func() bool { + return assert.ErrorContains(t, adv.OnTick(ctx), "check point lagged too large") + }, 5*time.Second, 100*time.Millisecond) +} + func TestOwnershipLost(t *testing.T) { c := createFakeCluster(t, 4, false) c.splitAndScatter(manyRegions(0, 10240)...) diff --git a/br/pkg/streamhelper/basic_lib_for_test.go b/br/pkg/streamhelper/basic_lib_for_test.go index 82cdb709f11ca..00e4ed324deab 100644 --- a/br/pkg/streamhelper/basic_lib_for_test.go +++ b/br/pkg/streamhelper/basic_lib_for_test.go @@ -717,6 +717,22 @@ func (t *testEnv) unregisterTask() { } } +func (t *testEnv) putTask() { + rngs := t.ranges + if len(rngs) == 0 { + rngs = []kv.KeyRange{{}} + } + tsk := streamhelper.TaskEvent{ + Type: streamhelper.EventAdd, + Name: "whole", + Info: &backup.StreamBackupTaskInfo{ + Name: "whole", + }, + Ranges: rngs, + } + t.taskCh <- tsk +} + func (t *testEnv) ScanLocksInOneRegion(bo *tikv.Backoffer, key []byte, maxVersion uint64, limit uint32) ([]*txnlock.Lock, *tikv.KeyLocation, error) { for _, r := range t.regions { if len(r.locks) != 0 { From a2b0d8175c4a8bc0df1ac9c1eceeb5ab6907eafd Mon Sep 17 00:00:00 2001 From: RidRisR <79858083+RidRisR@users.noreply.github.com> Date: Wed, 8 May 2024 05:05:13 +0800 Subject: [PATCH 2/6] simplify --- br/pkg/streamhelper/advancer_test.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/br/pkg/streamhelper/advancer_test.go b/br/pkg/streamhelper/advancer_test.go index 0dc5a56626843..a7fc37090f158 100644 --- a/br/pkg/streamhelper/advancer_test.go +++ b/br/pkg/streamhelper/advancer_test.go @@ -563,10 +563,9 @@ func TestUnregisterAfterPause(t *testing.T) { require.NoError(t, adv.OnTick(ctx)) env.unregisterTask() env.putTask() - time.Sleep(1 * time.Second) require.Eventually(t, func() bool { return assert.ErrorContains(t, adv.OnTick(ctx), "check point lagged too large") - }, 5*time.Second, 100*time.Millisecond) + }, 5*time.Second, 300*time.Millisecond) } func TestOwnershipLost(t *testing.T) { From 991580e7d4c78387be5e400225c741132b9f796a Mon Sep 17 00:00:00 2001 From: RidRisR <79858083+RidRisR@users.noreply.github.com> Date: Wed, 8 May 2024 05:06:29 +0800 Subject: [PATCH 3/6] lint --- br/pkg/streamhelper/advancer_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/br/pkg/streamhelper/advancer_test.go b/br/pkg/streamhelper/advancer_test.go index a7fc37090f158..9655e44efb818 100644 --- a/br/pkg/streamhelper/advancer_test.go +++ b/br/pkg/streamhelper/advancer_test.go @@ -557,7 +557,7 @@ func TestUnregisterAfterPause(t *testing.T) { adv.StartTaskListener(ctx) c.advanceClusterTimeBy(1 * time.Minute) require.NoError(t, adv.OnTick(ctx)) - env.PauseTask(ctx,"whole") + env.PauseTask(ctx, "whole") time.Sleep(1 * time.Second) c.advanceClusterTimeBy(1 * time.Minute) require.NoError(t, adv.OnTick(ctx)) From ea021dc2782509c6551100591352047e3ad2d5d8 Mon Sep 17 00:00:00 2001 From: RidRisR <79858083+RidRisR@users.noreply.github.com> Date: Wed, 8 May 2024 05:16:33 +0800 Subject: [PATCH 4/6] bazel fix --- br/pkg/streamhelper/BUILD.bazel | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/br/pkg/streamhelper/BUILD.bazel b/br/pkg/streamhelper/BUILD.bazel index 1eaf87efc9978..a823c1059b94c 100644 --- a/br/pkg/streamhelper/BUILD.bazel +++ b/br/pkg/streamhelper/BUILD.bazel @@ -69,7 +69,7 @@ go_test( ], flaky = True, race = "on", - shard_count = 27, + shard_count = 28, deps = [ ":streamhelper", "//br/pkg/errors", From d8d34e3ca60a55b35795357b9d2300a45aaa96bb Mon Sep 17 00:00:00 2001 From: ris <79858083+RidRisR@users.noreply.github.com> Date: Tue, 7 May 2024 21:09:55 -0700 Subject: [PATCH 5/6] Update br/pkg/streamhelper/advancer_test.go MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: 山岚 <36239017+YuJuncen@users.noreply.github.com> --- br/pkg/streamhelper/advancer_test.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/br/pkg/streamhelper/advancer_test.go b/br/pkg/streamhelper/advancer_test.go index 9655e44efb818..81215c9c74068 100644 --- a/br/pkg/streamhelper/advancer_test.go +++ b/br/pkg/streamhelper/advancer_test.go @@ -564,7 +564,8 @@ func TestUnregisterAfterPause(t *testing.T) { env.unregisterTask() env.putTask() require.Eventually(t, func() bool { - return assert.ErrorContains(t, adv.OnTick(ctx), "check point lagged too large") + err := adv.OnTick(ctx) + return err != nil && strings.Contains(err.Error(), "check point lagged too large") }, 5*time.Second, 300*time.Millisecond) } From 307e8c92a91497573b1b984c9c8cdbd3f63efe38 Mon Sep 17 00:00:00 2001 From: RidRisR <79858083+RidRisR@users.noreply.github.com> Date: Wed, 8 May 2024 22:18:56 +0800 Subject: [PATCH 6/6] fix --- br/pkg/streamhelper/advancer_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/br/pkg/streamhelper/advancer_test.go b/br/pkg/streamhelper/advancer_test.go index 81215c9c74068..88971c7962de6 100644 --- a/br/pkg/streamhelper/advancer_test.go +++ b/br/pkg/streamhelper/advancer_test.go @@ -5,6 +5,7 @@ package streamhelper_test import ( "context" "fmt" + "strings" "sync" "testing" "time"