Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

br: fix checkpoint cannot advance after pause->stop->start (#53091) #53450

Open
wants to merge 1 commit into
base: release-7.5
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions br/pkg/streamhelper/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,11 @@ go_test(
],
flaky = True,
race = "on",
<<<<<<< HEAD
shard_count = 22,
=======
shard_count = 28,
>>>>>>> 184c76b9162 (br: fix checkpoint cannot advance after pause->stop->start (#53091))
deps = [
":streamhelper",
"//br/pkg/errors",
Expand Down
12 changes: 12 additions & 0 deletions br/pkg/streamhelper/advancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -432,6 +432,7 @@
case EventDel:
utils.LogBackupTaskCountDec()
c.task = nil
c.isPaused.Store(false)
c.taskRange = nil
// This would be synced by `taskMu`, perhaps we'd better rename that to `tickMu`.
// Do the null check because some of test cases won't equip the advancer with subscriber.
Expand All @@ -446,6 +447,17 @@
log.Warn("failed to remove service GC safepoint", logutil.ShortError(err))
}
metrics.LastCheckpoint.DeleteLabelValues(e.Name)
<<<<<<< HEAD

Check failure on line 450 in br/pkg/streamhelper/advancer.go

View workflow job for this annotation

GitHub Actions / Compile for FreeBSD job

syntax error: unexpected <<, expected case or default or }

Check failure on line 450 in br/pkg/streamhelper/advancer.go

View workflow job for this annotation

GitHub Actions / Compile for macos-latest

syntax error: unexpected <<, expected case or default or }

Check failure on line 450 in br/pkg/streamhelper/advancer.go

View workflow job for this annotation

GitHub Actions / Compile for ubuntu-latest

syntax error: unexpected <<, expected case or default or }
=======
case EventPause:

Check failure on line 452 in br/pkg/streamhelper/advancer.go

View workflow job for this annotation

GitHub Actions / Compile for FreeBSD job

syntax error: unexpected case, expected :

Check failure on line 452 in br/pkg/streamhelper/advancer.go

View workflow job for this annotation

GitHub Actions / Compile for macos-latest

syntax error: unexpected case, expected :

Check failure on line 452 in br/pkg/streamhelper/advancer.go

View workflow job for this annotation

GitHub Actions / Compile for ubuntu-latest

syntax error: unexpected case, expected :
if c.task.GetName() == e.Name {
c.isPaused.Store(true)
}
case EventResume:
if c.task.GetName() == e.Name {
c.isPaused.Store(false)
}
>>>>>>> 184c76b9162 (br: fix checkpoint cannot advance after pause->stop->start (#53091))

Check failure on line 460 in br/pkg/streamhelper/advancer.go

View workflow job for this annotation

GitHub Actions / Compile for FreeBSD job

syntax error: unexpected >>, expected case or default or }

Check failure on line 460 in br/pkg/streamhelper/advancer.go

View workflow job for this annotation

GitHub Actions / Compile for FreeBSD job

invalid character U+0023 '#'

Check failure on line 460 in br/pkg/streamhelper/advancer.go

View workflow job for this annotation

GitHub Actions / Compile for macos-latest

syntax error: unexpected >>, expected case or default or }

Check failure on line 460 in br/pkg/streamhelper/advancer.go

View workflow job for this annotation

GitHub Actions / Compile for macos-latest

invalid character U+0023 '#'

Check failure on line 460 in br/pkg/streamhelper/advancer.go

View workflow job for this annotation

GitHub Actions / Compile for ubuntu-latest

syntax error: unexpected >>, expected case or default or }

Check failure on line 460 in br/pkg/streamhelper/advancer.go

View workflow job for this annotation

GitHub Actions / Compile for ubuntu-latest

invalid character U+0023 '#'
case EventErr:
return e.Err
}
Expand Down
161 changes: 161 additions & 0 deletions br/pkg/streamhelper/advancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package streamhelper_test
import (
"context"
"fmt"
"strings"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -463,3 +464,163 @@ func TestRemoveTaskAndFlush(t *testing.T) {
return !adv.HasSubscribion()
}, 10*time.Second, 100*time.Millisecond)
}
<<<<<<< HEAD
=======

func TestEnableCheckPointLimit(t *testing.T) {
c := createFakeCluster(t, 4, false)
defer func() {
fmt.Println(c)
}()
c.splitAndScatter("01", "02", "022", "023", "033", "04", "043")
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
env := &testEnv{fakeCluster: c, testCtx: t}
adv := streamhelper.NewCheckpointAdvancer(env)
adv.UpdateConfigWith(func(c *config.Config) {
c.CheckPointLagLimit = 1 * time.Minute
})
adv.StartTaskListener(ctx)
for i := 0; i < 5; i++ {
c.advanceClusterTimeBy(30 * time.Second)
c.advanceCheckpointBy(20 * time.Second)
require.NoError(t, adv.OnTick(ctx))
}
}

func TestCheckPointLagged(t *testing.T) {
c := createFakeCluster(t, 4, false)
defer func() {
fmt.Println(c)
}()
c.splitAndScatter("01", "02", "022", "023", "033", "04", "043")
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
env := &testEnv{fakeCluster: c, testCtx: t}
adv := streamhelper.NewCheckpointAdvancer(env)
adv.UpdateConfigWith(func(c *config.Config) {
c.CheckPointLagLimit = 1 * time.Minute
})
adv.StartTaskListener(ctx)
c.advanceClusterTimeBy(1 * time.Minute)
require.NoError(t, adv.OnTick(ctx))
c.advanceClusterTimeBy(1 * time.Minute)
require.ErrorContains(t, adv.OnTick(ctx), "lagged too large")
// after some times, the isPaused will be set and ticks are skipped
require.Eventually(t, func() bool {
return assert.NoError(t, adv.OnTick(ctx))
}, 5*time.Second, 100*time.Millisecond)
}

func TestCheckPointResume(t *testing.T) {
c := createFakeCluster(t, 4, false)
defer func() {
fmt.Println(c)
}()
c.splitAndScatter("01", "02", "022", "023", "033", "04", "043")
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
env := &testEnv{fakeCluster: c, testCtx: t}
adv := streamhelper.NewCheckpointAdvancer(env)
adv.UpdateConfigWith(func(c *config.Config) {
c.CheckPointLagLimit = 1 * time.Minute
})
adv.StartTaskListener(ctx)
c.advanceClusterTimeBy(1 * time.Minute)
require.NoError(t, adv.OnTick(ctx))
c.advanceClusterTimeBy(1 * time.Minute)
require.ErrorContains(t, adv.OnTick(ctx), "lagged too large")
require.Eventually(t, func() bool {
return assert.NoError(t, adv.OnTick(ctx))
}, 5*time.Second, 100*time.Millisecond)
//now the checkpoint issue is fixed and resumed
c.advanceCheckpointBy(1 * time.Minute)
env.ResumeTask(ctx)
require.Eventually(t, func() bool {
return assert.NoError(t, adv.OnTick(ctx))
}, 5*time.Second, 100*time.Millisecond)
//with time passed, the checkpoint will exceed the limit again
c.advanceClusterTimeBy(2 * time.Minute)
require.ErrorContains(t, adv.OnTick(ctx), "lagged too large")
}

func TestUnregisterAfterPause(t *testing.T) {
c := createFakeCluster(t, 4, false)
defer func() {
fmt.Println(c)
}()
c.splitAndScatter("01", "02", "022", "023", "033", "04", "043")
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
env := &testEnv{fakeCluster: c, testCtx: t}
adv := streamhelper.NewCheckpointAdvancer(env)
adv.UpdateConfigWith(func(c *config.Config) {
c.CheckPointLagLimit = 1 * time.Minute
})
adv.StartTaskListener(ctx)
c.advanceClusterTimeBy(1 * time.Minute)
require.NoError(t, adv.OnTick(ctx))
env.PauseTask(ctx, "whole")
time.Sleep(1 * time.Second)
c.advanceClusterTimeBy(1 * time.Minute)
require.NoError(t, adv.OnTick(ctx))
env.unregisterTask()
env.putTask()
require.Eventually(t, func() bool {
err := adv.OnTick(ctx)
return err != nil && strings.Contains(err.Error(), "check point lagged too large")
}, 5*time.Second, 300*time.Millisecond)
}

func TestOwnershipLost(t *testing.T) {
c := createFakeCluster(t, 4, false)
c.splitAndScatter(manyRegions(0, 10240)...)
installSubscribeSupport(c)
ctx, cancel := context.WithCancel(context.Background())
env := &testEnv{fakeCluster: c, testCtx: t}
adv := streamhelper.NewCheckpointAdvancer(env)
adv.OnStart(ctx)
adv.OnBecomeOwner(ctx)
require.NoError(t, adv.OnTick(ctx))
c.advanceCheckpoints()
c.flushAll()
failpoint.Enable("github.com/pingcap/tidb/br/pkg/streamhelper/subscription.listenOver.aboutToSend", "pause")
failpoint.Enable("github.com/pingcap/tidb/br/pkg/streamhelper/FlushSubscriber.Clear.timeoutMs", "return(500)")
wg := new(sync.WaitGroup)
wg.Add(adv.TEST_registerCallbackForSubscriptions(wg.Done))
cancel()
failpoint.Disable("github.com/pingcap/tidb/br/pkg/streamhelper/subscription.listenOver.aboutToSend")
wg.Wait()
}

func TestSubscriptionPanic(t *testing.T) {
c := createFakeCluster(t, 4, false)
c.splitAndScatter(manyRegions(0, 20)...)
installSubscribeSupport(c)
ctx, cancel := context.WithCancel(context.Background())
env := &testEnv{fakeCluster: c, testCtx: t}
adv := streamhelper.NewCheckpointAdvancer(env)
adv.OnStart(ctx)
adv.OnBecomeOwner(ctx)
wg := new(sync.WaitGroup)
wg.Add(adv.TEST_registerCallbackForSubscriptions(wg.Done))

require.NoError(t, adv.OnTick(ctx))
failpoint.Enable("github.com/pingcap/tidb/br/pkg/streamhelper/subscription.listenOver.aboutToSend", "5*panic")
ckpt := c.advanceCheckpoints()
c.flushAll()
cnt := 0
for {
require.NoError(t, adv.OnTick(ctx))
cnt++
if env.checkpoint >= ckpt {
break
}
if cnt > 100 {
t.Fatalf("After 100 times, the progress cannot be advanced.")
}
}
cancel()
wg.Wait()
}
>>>>>>> 184c76b9162 (br: fix checkpoint cannot advance after pause->stop->start (#53091))
16 changes: 16 additions & 0 deletions br/pkg/streamhelper/basic_lib_for_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -667,6 +667,22 @@ func (t *testEnv) unregisterTask() {
}
}

func (t *testEnv) putTask() {
rngs := t.ranges
if len(rngs) == 0 {
rngs = []kv.KeyRange{{}}
}
tsk := streamhelper.TaskEvent{
Type: streamhelper.EventAdd,
Name: "whole",
Info: &backup.StreamBackupTaskInfo{
Name: "whole",
},
Ranges: rngs,
}
t.taskCh <- tsk
}

func (t *testEnv) ScanLocksInOneRegion(bo *tikv.Backoffer, key []byte, maxVersion uint64, limit uint32) ([]*txnlock.Lock, *tikv.KeyLocation, error) {
for _, r := range t.regions {
if len(r.locks) != 0 {
Expand Down
Loading