Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

br/streamhelper: added timeout for ticking #39625

Merged
merged 9 commits into from
Dec 7, 2022
1 change: 1 addition & 0 deletions br/pkg/streamhelper/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ go_test(
"//br/pkg/logutil",
"//br/pkg/redact",
"//br/pkg/storage",
"//br/pkg/streamhelper/config",
"//br/pkg/streamhelper/spans",
"//br/pkg/utils",
"//kv",
Expand Down
8 changes: 5 additions & 3 deletions br/pkg/streamhelper/advancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -382,15 +382,17 @@ func (c *CheckpointAdvancer) tick(ctx context.Context) error {
log.Debug("No tasks yet, skipping advancing.")
return nil
}
cx, cancel := context.WithTimeout(ctx, c.Config().TickTimeout())
defer cancel()

threshold := c.Config().GetDefaultStartPollThreshold()
if err := c.subscribeTick(ctx); err != nil {
if err := c.subscribeTick(cx); err != nil {
log.Warn("[log backup advancer] Subscriber meet error, would polling the checkpoint.", logutil.ShortError(err))
threshold = c.Config().GetSubscriberErrorStartPollThreshold()
}

err := c.advanceCheckpointBy(ctx, func(ctx context.Context) (uint64, error) {
return c.CalculateGlobalCheckpointLight(ctx, threshold)
err := c.advanceCheckpointBy(cx, func(cx context.Context) (uint64, error) {
return c.CalculateGlobalCheckpointLight(cx, threshold)
})
if err != nil {
return err
Expand Down
36 changes: 36 additions & 0 deletions br/pkg/streamhelper/advancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,11 @@ import (
"testing"
"time"

"github.com/pingcap/errors"
logbackup "github.com/pingcap/kvproto/pkg/logbackuppb"
"github.com/pingcap/log"
"github.com/pingcap/tidb/br/pkg/streamhelper"
"github.com/pingcap/tidb/br/pkg/streamhelper/config"
"github.com/pingcap/tidb/kv"
"github.com/stretchr/testify/require"
"go.uber.org/zap/zapcore"
Expand Down Expand Up @@ -219,3 +222,36 @@ func TestTaskRangesWithSplit(t *testing.T) {
shouldFinishInTime(t, 10*time.Second, "second advancing", func() { require.NoError(t, adv.OnTick(ctx)) })
require.Greater(t, env.getCheckpoint(), fstCheckpoint)
}

func TestBlocked(t *testing.T) {
log.SetLevel(zapcore.DebugLevel)
c := createFakeCluster(t, 4, true)
ctx := context.Background()
req := require.New(t)
c.splitAndScatter("0012", "0034", "0048")
marked := false
for _, s := range c.stores {
s.clientMu.Lock()
s.onGetRegionCheckpoint = func(glftrr *logbackup.GetLastFlushTSOfRegionRequest) error {
// blocking the thread.
// this may happen when TiKV goes down or too busy.
<-(chan struct{})(nil)
return nil
}
s.clientMu.Unlock()
marked = true
}
req.True(marked, "failed to mark the cluster: ")
env := &testEnv{fakeCluster: c, testCtx: t}
adv := streamhelper.NewCheckpointAdvancer(env)
adv.StartTaskListener(ctx)
adv.UpdateConfigWith(func(c *config.Config) {
// ... So the tick timeout would be 100ms
c.TickDuration = 10 * time.Millisecond
})
var err error
shouldFinishInTime(t, time.Second, "ticking", func() {
err = adv.OnTick(ctx)
})
req.ErrorIs(errors.Cause(err), context.DeadlineExceeded)
}
15 changes: 11 additions & 4 deletions br/pkg/streamhelper/basic_lib_for_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,10 +77,11 @@ type fakeStore struct {
id uint64
regions map[uint64]*region

clientMu sync.Mutex
supportsSub bool
bootstrapAt uint64
fsub func(logbackup.SubscribeFlushEventResponse)
clientMu sync.Mutex
supportsSub bool
bootstrapAt uint64
fsub func(logbackup.SubscribeFlushEventResponse)
onGetRegionCheckpoint func(*logbackup.GetLastFlushTSOfRegionRequest) error
}

type fakeCluster struct {
Expand Down Expand Up @@ -184,6 +185,12 @@ func (f *fakeStore) SetSupportFlushSub(b bool) {
}

func (f *fakeStore) GetLastFlushTSOfRegion(ctx context.Context, in *logbackup.GetLastFlushTSOfRegionRequest, opts ...grpc.CallOption) (*logbackup.GetLastFlushTSOfRegionResponse, error) {
if f.onGetRegionCheckpoint != nil {
err := f.onGetRegionCheckpoint(in)
if err != nil {
return nil, err
}
}
resp := &logbackup.GetLastFlushTSOfRegionResponse{
Checkpoints: []*logbackup.RegionCheckpoint{},
}
Expand Down
6 changes: 6 additions & 0 deletions br/pkg/streamhelper/config/advancer_conf.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,3 +78,9 @@ func (conf Config) GetDefaultStartPollThreshold() time.Duration {
func (conf Config) GetSubscriberErrorStartPollThreshold() time.Duration {
return conf.TryAdvanceThreshold / 5
}

// TickTimeout returns the max duration for each tick.
func (conf Config) TickTimeout() time.Duration {
// If a tick blocks 10x the interval of ticking, we may need to break it and retry.
return 10 * conf.TickDuration
}