Skip to content

Commit

Permalink
br: fix checkpoint cannot advance after pause->stop->start (#53091) (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot committed May 10, 2024
1 parent 48ffd75 commit fd5e1e1
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 3 deletions.
2 changes: 1 addition & 1 deletion br/pkg/streamhelper/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ go_test(
],
flaky = True,
race = "on",
shard_count = 25,
shard_count = 26,
deps = [
":streamhelper",
"//br/pkg/errors",
Expand Down
5 changes: 3 additions & 2 deletions br/pkg/streamhelper/advancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -434,6 +434,7 @@ func (c *CheckpointAdvancer) onTaskEvent(ctx context.Context, e TaskEvent) error
case EventDel:
utils.LogBackupTaskCountDec()
c.task = nil
c.isPaused.Store(false)
c.taskRange = nil
// This would be synced by `taskMu`, perhaps we'd better rename that to `tickMu`.
// Do the null check because some of test cases won't equip the advancer with subscriber.
Expand All @@ -450,11 +451,11 @@ func (c *CheckpointAdvancer) onTaskEvent(ctx context.Context, e TaskEvent) error
metrics.LastCheckpoint.DeleteLabelValues(e.Name)
case EventPause:
if c.task.GetName() == e.Name {
c.isPaused.CompareAndSwap(false, true)
c.isPaused.Store(true)
}
case EventResume:
if c.task.GetName() == e.Name {
c.isPaused.CompareAndSwap(true, false)
c.isPaused.Store(false)
}
case EventErr:
return e.Err
Expand Down
29 changes: 29 additions & 0 deletions br/pkg/streamhelper/advancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package streamhelper_test
import (
"context"
"fmt"
"strings"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -540,3 +541,31 @@ func TestCheckPointResume(t *testing.T) {
c.advanceClusterTimeBy(2 * time.Minute)
require.ErrorContains(t, adv.OnTick(ctx), "lagged too large")
}

// TestUnregisterAfterPause drives the advancer through the sequence
// pause -> unregister -> re-add for the task "whole" and then expects
// OnTick to eventually fail with "check point lagged too large".
func TestUnregisterAfterPause(t *testing.T) {
	cluster := createFakeCluster(t, 4, false)
	// Dump the fake cluster when the test exits to help diagnose failures.
	defer func() {
		fmt.Println(cluster)
	}()
	cluster.splitAndScatter("01", "02", "022", "023", "033", "04", "043")

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	env := &testEnv{fakeCluster: cluster, testCtx: t}
	adv := streamhelper.NewCheckpointAdvancer(env)
	adv.UpdateConfigWith(func(cfg *config.Config) {
		cfg.CheckPointLagLimit = time.Minute
	})
	adv.StartTaskListener(ctx)

	// First tick after one minute of cluster time must succeed.
	cluster.advanceClusterTimeBy(time.Minute)
	require.NoError(t, adv.OnTick(ctx))

	// Pause the task; the tick after another minute must still succeed.
	env.PauseTask(ctx, "whole")
	// NOTE(review): presumably this wait lets the task listener consume the
	// pause event before the next tick — confirm.
	time.Sleep(time.Second)
	cluster.advanceClusterTimeBy(time.Minute)
	require.NoError(t, adv.OnTick(ctx))

	// Remove the task and add it back, then poll OnTick until it reports
	// the lag error.
	env.unregisterTask()
	env.putTask()
	laggedTooLarge := func() bool {
		tickErr := adv.OnTick(ctx)
		if tickErr == nil {
			return false
		}
		return strings.Contains(tickErr.Error(), "check point lagged too large")
	}
	require.Eventually(t, laggedTooLarge, 5*time.Second, 300*time.Millisecond)
}
16 changes: 16 additions & 0 deletions br/pkg/streamhelper/basic_lib_for_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -713,6 +713,22 @@ func (t *testEnv) unregisterTask() {
}
}

// putTask sends an EventAdd task event for the task named "whole" on
// t.taskCh. The event carries t.ranges, or a single zero-value key range
// when t.ranges is empty.
func (t *testEnv) putTask() {
	const taskName = "whole"

	ranges := t.ranges
	if len(ranges) == 0 {
		// Fall back to one zero-value range when no ranges are configured.
		ranges = []kv.KeyRange{{}}
	}

	t.taskCh <- streamhelper.TaskEvent{
		Type:   streamhelper.EventAdd,
		Name:   taskName,
		Info:   &backup.StreamBackupTaskInfo{Name: taskName},
		Ranges: ranges,
	}
}

func (t *testEnv) ScanLocksInOneRegion(bo *tikv.Backoffer, key []byte, maxVersion uint64, limit uint32) ([]*txnlock.Lock, *tikv.KeyLocation, error) {
for _, r := range t.regions {
if len(r.locks) != 0 {
Expand Down

0 comments on commit fd5e1e1

Please sign in to comment.