Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

br: fix checkpoint cannot advance after pause->stop->start (#53091) #53166

Open
wants to merge 6 commits into
base: release-7.1
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion br/pkg/streamhelper/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ go_test(
],
flaky = True,
race = "on",
shard_count = 27,
shard_count = 28,
deps = [
":streamhelper",
"//br/pkg/errors",
Expand Down
5 changes: 3 additions & 2 deletions br/pkg/streamhelper/advancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -433,6 +433,7 @@ func (c *CheckpointAdvancer) onTaskEvent(ctx context.Context, e TaskEvent) error
case EventDel:
utils.LogBackupTaskCountDec()
c.task = nil
c.isPaused.Store(false)
c.taskRange = nil
// This would be synced by `taskMu`, perhaps we'd better rename that to `tickMu`.
// Do the null check because some of test cases won't equip the advancer with subscriber.
Expand All @@ -449,11 +450,11 @@ func (c *CheckpointAdvancer) onTaskEvent(ctx context.Context, e TaskEvent) error
metrics.LastCheckpoint.DeleteLabelValues(e.Name)
case EventPause:
if c.task.GetName() == e.Name {
c.isPaused.CompareAndSwap(false, true)
c.isPaused.Store(true)
}
case EventResume:
if c.task.GetName() == e.Name {
c.isPaused.CompareAndSwap(true, false)
c.isPaused.Store(false)
}
case EventErr:
return e.Err
Expand Down
29 changes: 29 additions & 0 deletions br/pkg/streamhelper/advancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package streamhelper_test
import (
"context"
"fmt"
"strings"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -542,6 +543,34 @@ func TestCheckPointResume(t *testing.T) {
require.ErrorContains(t, adv.OnTick(ctx), "lagged too large")
}

func TestUnregisterAfterPause(t *testing.T) {
c := createFakeCluster(t, 4, false)
defer func() {
fmt.Println(c)
}()
c.splitAndScatter("01", "02", "022", "023", "033", "04", "043")
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
env := &testEnv{fakeCluster: c, testCtx: t}
adv := streamhelper.NewCheckpointAdvancer(env)
adv.UpdateConfigWith(func(c *config.Config) {
c.CheckPointLagLimit = 1 * time.Minute
})
adv.StartTaskListener(ctx)
c.advanceClusterTimeBy(1 * time.Minute)
require.NoError(t, adv.OnTick(ctx))
env.PauseTask(ctx, "whole")
time.Sleep(1 * time.Second)
c.advanceClusterTimeBy(1 * time.Minute)
require.NoError(t, adv.OnTick(ctx))
env.unregisterTask()
env.putTask()
require.Eventually(t, func() bool {
err := adv.OnTick(ctx)
return err != nil && strings.Contains(err.Error(), "check point lagged too large")
}, 5*time.Second, 300*time.Millisecond)
}

func TestOwnershipLost(t *testing.T) {
c := createFakeCluster(t, 4, false)
c.splitAndScatter(manyRegions(0, 10240)...)
Expand Down
16 changes: 16 additions & 0 deletions br/pkg/streamhelper/basic_lib_for_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -712,6 +712,22 @@ func (t *testEnv) unregisterTask() {
}
}

func (t *testEnv) putTask() {
rngs := t.ranges
if len(rngs) == 0 {
rngs = []kv.KeyRange{{}}
}
tsk := streamhelper.TaskEvent{
Type: streamhelper.EventAdd,
Name: "whole",
Info: &backup.StreamBackupTaskInfo{
Name: "whole",
},
Ranges: rngs,
}
t.taskCh <- tsk
}

func (t *testEnv) ScanLocksInOneRegion(bo *tikv.Backoffer, key []byte, maxVersion uint64, limit uint32) ([]*txnlock.Lock, *tikv.KeyLocation, error) {
for _, r := range t.regions {
if len(r.locks) != 0 {
Expand Down