Skip to content

Commit

Permalink
This is an automated cherry-pick of #53091
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>
  • Loading branch information
RidRisR authored and ti-chi-bot committed May 21, 2024
1 parent 7ece220 commit ebbba00
Show file tree
Hide file tree
Showing 4 changed files with 193 additions and 0 deletions.
4 changes: 4 additions & 0 deletions br/pkg/streamhelper/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,11 @@ go_test(
],
flaky = True,
race = "on",
<<<<<<< HEAD
shard_count = 22,
=======
shard_count = 28,
>>>>>>> 184c76b9162 (br: fix checkpoint cannot advance after pause->stop->start (#53091))
deps = [
":streamhelper",
"//br/pkg/errors",
Expand Down
12 changes: 12 additions & 0 deletions br/pkg/streamhelper/advancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -432,6 +432,7 @@ func (c *CheckpointAdvancer) onTaskEvent(ctx context.Context, e TaskEvent) error
case EventDel:
utils.LogBackupTaskCountDec()
c.task = nil
c.isPaused.Store(false)
c.taskRange = nil
// This would be synced by `taskMu`, perhaps we'd better rename that to `tickMu`.
// Do the null check because some of test cases won't equip the advancer with subscriber.
Expand All @@ -446,6 +447,17 @@ func (c *CheckpointAdvancer) onTaskEvent(ctx context.Context, e TaskEvent) error
log.Warn("failed to remove service GC safepoint", logutil.ShortError(err))
}
metrics.LastCheckpoint.DeleteLabelValues(e.Name)
<<<<<<< HEAD

Check failure on line 450 in br/pkg/streamhelper/advancer.go

View workflow job for this annotation

GitHub Actions / Compile for FreeBSD job

syntax error: unexpected <<, expected case or default or }

Check failure on line 450 in br/pkg/streamhelper/advancer.go

View workflow job for this annotation

GitHub Actions / Compile for macos-latest

syntax error: unexpected <<, expected case or default or }

Check failure on line 450 in br/pkg/streamhelper/advancer.go

View workflow job for this annotation

GitHub Actions / Compile for ubuntu-latest

syntax error: unexpected <<, expected case or default or }
=======
case EventPause:

Check failure on line 452 in br/pkg/streamhelper/advancer.go

View workflow job for this annotation

GitHub Actions / Compile for FreeBSD job

syntax error: unexpected case, expected :

Check failure on line 452 in br/pkg/streamhelper/advancer.go

View workflow job for this annotation

GitHub Actions / Compile for macos-latest

syntax error: unexpected case, expected :

Check failure on line 452 in br/pkg/streamhelper/advancer.go

View workflow job for this annotation

GitHub Actions / Compile for ubuntu-latest

syntax error: unexpected case, expected :
if c.task.GetName() == e.Name {
c.isPaused.Store(true)
}
case EventResume:
if c.task.GetName() == e.Name {
c.isPaused.Store(false)
}
>>>>>>> 184c76b9162 (br: fix checkpoint cannot advance after pause->stop->start (#53091))

Check failure on line 460 in br/pkg/streamhelper/advancer.go

View workflow job for this annotation

GitHub Actions / Compile for FreeBSD job

syntax error: unexpected >>, expected case or default or }

Check failure on line 460 in br/pkg/streamhelper/advancer.go

View workflow job for this annotation

GitHub Actions / Compile for FreeBSD job

invalid character U+0023 '#'

Check failure on line 460 in br/pkg/streamhelper/advancer.go

View workflow job for this annotation

GitHub Actions / Compile for macos-latest

syntax error: unexpected >>, expected case or default or }

Check failure on line 460 in br/pkg/streamhelper/advancer.go

View workflow job for this annotation

GitHub Actions / Compile for macos-latest

invalid character U+0023 '#'

Check failure on line 460 in br/pkg/streamhelper/advancer.go

View workflow job for this annotation

GitHub Actions / Compile for ubuntu-latest

syntax error: unexpected >>, expected case or default or }

Check failure on line 460 in br/pkg/streamhelper/advancer.go

View workflow job for this annotation

GitHub Actions / Compile for ubuntu-latest

invalid character U+0023 '#'
case EventErr:
return e.Err
}
Expand Down
161 changes: 161 additions & 0 deletions br/pkg/streamhelper/advancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package streamhelper_test
import (
"context"
"fmt"
"strings"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -463,3 +464,163 @@ func TestRemoveTaskAndFlush(t *testing.T) {
return !adv.HasSubscribion()
}, 10*time.Second, 100*time.Millisecond)
}
<<<<<<< HEAD
=======

func TestEnableCheckPointLimit(t *testing.T) {
c := createFakeCluster(t, 4, false)
defer func() {
fmt.Println(c)
}()
c.splitAndScatter("01", "02", "022", "023", "033", "04", "043")
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
env := &testEnv{fakeCluster: c, testCtx: t}
adv := streamhelper.NewCheckpointAdvancer(env)
adv.UpdateConfigWith(func(c *config.Config) {
c.CheckPointLagLimit = 1 * time.Minute
})
adv.StartTaskListener(ctx)
for i := 0; i < 5; i++ {
c.advanceClusterTimeBy(30 * time.Second)
c.advanceCheckpointBy(20 * time.Second)
require.NoError(t, adv.OnTick(ctx))
}
}

func TestCheckPointLagged(t *testing.T) {
c := createFakeCluster(t, 4, false)
defer func() {
fmt.Println(c)
}()
c.splitAndScatter("01", "02", "022", "023", "033", "04", "043")
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
env := &testEnv{fakeCluster: c, testCtx: t}
adv := streamhelper.NewCheckpointAdvancer(env)
adv.UpdateConfigWith(func(c *config.Config) {
c.CheckPointLagLimit = 1 * time.Minute
})
adv.StartTaskListener(ctx)
c.advanceClusterTimeBy(1 * time.Minute)
require.NoError(t, adv.OnTick(ctx))
c.advanceClusterTimeBy(1 * time.Minute)
require.ErrorContains(t, adv.OnTick(ctx), "lagged too large")
// after some times, the isPaused will be set and ticks are skipped
require.Eventually(t, func() bool {
return assert.NoError(t, adv.OnTick(ctx))
}, 5*time.Second, 100*time.Millisecond)
}

func TestCheckPointResume(t *testing.T) {
c := createFakeCluster(t, 4, false)
defer func() {
fmt.Println(c)
}()
c.splitAndScatter("01", "02", "022", "023", "033", "04", "043")
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
env := &testEnv{fakeCluster: c, testCtx: t}
adv := streamhelper.NewCheckpointAdvancer(env)
adv.UpdateConfigWith(func(c *config.Config) {
c.CheckPointLagLimit = 1 * time.Minute
})
adv.StartTaskListener(ctx)
c.advanceClusterTimeBy(1 * time.Minute)
require.NoError(t, adv.OnTick(ctx))
c.advanceClusterTimeBy(1 * time.Minute)
require.ErrorContains(t, adv.OnTick(ctx), "lagged too large")
require.Eventually(t, func() bool {
return assert.NoError(t, adv.OnTick(ctx))
}, 5*time.Second, 100*time.Millisecond)
//now the checkpoint issue is fixed and resumed
c.advanceCheckpointBy(1 * time.Minute)
env.ResumeTask(ctx)
require.Eventually(t, func() bool {
return assert.NoError(t, adv.OnTick(ctx))
}, 5*time.Second, 100*time.Millisecond)
//with time passed, the checkpoint will exceed the limit again
c.advanceClusterTimeBy(2 * time.Minute)
require.ErrorContains(t, adv.OnTick(ctx), "lagged too large")
}

func TestUnregisterAfterPause(t *testing.T) {
c := createFakeCluster(t, 4, false)
defer func() {
fmt.Println(c)
}()
c.splitAndScatter("01", "02", "022", "023", "033", "04", "043")
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
env := &testEnv{fakeCluster: c, testCtx: t}
adv := streamhelper.NewCheckpointAdvancer(env)
adv.UpdateConfigWith(func(c *config.Config) {
c.CheckPointLagLimit = 1 * time.Minute
})
adv.StartTaskListener(ctx)
c.advanceClusterTimeBy(1 * time.Minute)
require.NoError(t, adv.OnTick(ctx))
env.PauseTask(ctx, "whole")
time.Sleep(1 * time.Second)
c.advanceClusterTimeBy(1 * time.Minute)
require.NoError(t, adv.OnTick(ctx))
env.unregisterTask()
env.putTask()
require.Eventually(t, func() bool {
err := adv.OnTick(ctx)
return err != nil && strings.Contains(err.Error(), "check point lagged too large")
}, 5*time.Second, 300*time.Millisecond)
}

func TestOwnershipLost(t *testing.T) {
c := createFakeCluster(t, 4, false)
c.splitAndScatter(manyRegions(0, 10240)...)
installSubscribeSupport(c)
ctx, cancel := context.WithCancel(context.Background())
env := &testEnv{fakeCluster: c, testCtx: t}
adv := streamhelper.NewCheckpointAdvancer(env)
adv.OnStart(ctx)
adv.OnBecomeOwner(ctx)
require.NoError(t, adv.OnTick(ctx))
c.advanceCheckpoints()
c.flushAll()
failpoint.Enable("github.com/pingcap/tidb/br/pkg/streamhelper/subscription.listenOver.aboutToSend", "pause")
failpoint.Enable("github.com/pingcap/tidb/br/pkg/streamhelper/FlushSubscriber.Clear.timeoutMs", "return(500)")
wg := new(sync.WaitGroup)
wg.Add(adv.TEST_registerCallbackForSubscriptions(wg.Done))
cancel()
failpoint.Disable("github.com/pingcap/tidb/br/pkg/streamhelper/subscription.listenOver.aboutToSend")
wg.Wait()
}

func TestSubscriptionPanic(t *testing.T) {
c := createFakeCluster(t, 4, false)
c.splitAndScatter(manyRegions(0, 20)...)
installSubscribeSupport(c)
ctx, cancel := context.WithCancel(context.Background())
env := &testEnv{fakeCluster: c, testCtx: t}
adv := streamhelper.NewCheckpointAdvancer(env)
adv.OnStart(ctx)
adv.OnBecomeOwner(ctx)
wg := new(sync.WaitGroup)
wg.Add(adv.TEST_registerCallbackForSubscriptions(wg.Done))

require.NoError(t, adv.OnTick(ctx))
failpoint.Enable("github.com/pingcap/tidb/br/pkg/streamhelper/subscription.listenOver.aboutToSend", "5*panic")
ckpt := c.advanceCheckpoints()
c.flushAll()
cnt := 0
for {
require.NoError(t, adv.OnTick(ctx))
cnt++
if env.checkpoint >= ckpt {
break
}
if cnt > 100 {
t.Fatalf("After 100 times, the progress cannot be advanced.")
}
}
cancel()
wg.Wait()
}
>>>>>>> 184c76b9162 (br: fix checkpoint cannot advance after pause->stop->start (#53091))
16 changes: 16 additions & 0 deletions br/pkg/streamhelper/basic_lib_for_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -667,6 +667,22 @@ func (t *testEnv) unregisterTask() {
}
}

func (t *testEnv) putTask() {
rngs := t.ranges
if len(rngs) == 0 {
rngs = []kv.KeyRange{{}}
}
tsk := streamhelper.TaskEvent{
Type: streamhelper.EventAdd,
Name: "whole",
Info: &backup.StreamBackupTaskInfo{
Name: "whole",
},
Ranges: rngs,
}
t.taskCh <- tsk
}

func (t *testEnv) ScanLocksInOneRegion(bo *tikv.Backoffer, key []byte, maxVersion uint64, limit uint32) ([]*txnlock.Lock, *tikv.KeyLocation, error) {
for _, r := range t.regions {
if len(r.locks) != 0 {
Expand Down

0 comments on commit ebbba00

Please sign in to comment.