br: make more sane variables of ticking timeout #39887

Merged · 5 commits · Dec 14, 2022
Changes from 2 commits
87 changes: 61 additions & 26 deletions br/pkg/streamhelper/advancer.go
@@ -18,6 +18,7 @@ import (
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/metrics"
"github.com/tikv/client-go/v2/oracle"
"go.uber.org/multierr"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
)
@@ -285,14 +286,18 @@ func (c *CheckpointAdvancer) onTaskEvent(ctx context.Context, e TaskEvent) error
c.task = e.Info
c.taskRange = spans.Collapse(len(e.Ranges), func(i int) kv.KeyRange { return e.Ranges[i] })
c.checkpoints = spans.Sorted(spans.NewFullWith(e.Ranges, 0))
c.lastCheckpoint = e.Info.StartTs
log.Info("added event", zap.Stringer("task", e.Info), zap.Stringer("ranges", logutil.StringifyKeys(c.taskRange)))
case EventDel:
utils.LogBackupTaskCountDec()
c.task = nil
c.taskRange = nil
c.checkpoints = nil
// This would be synced by `taskMu`, perhaps we'd better rename that to `tickMu`.
c.subscriber.Clear()
// Do the nil check because some test cases don't equip the advancer with a subscriber.
if c.subscriber != nil {
c.subscriber.Clear()
}
if err := c.env.ClearV3GlobalCheckpointForTask(ctx, e.Name); err != nil {
log.Warn("failed to clear global checkpoint", logutil.ShortError(err))
}
@@ -303,31 +308,34 @@ func (c *CheckpointAdvancer) onTaskEvent(ctx context.Context, e TaskEvent) error
return nil
}

func (c *CheckpointAdvancer) setCheckpoint(cp uint64) bool {
if cp < c.lastCheckpoint {
log.Warn("failed to update global checkpoint: stale", zap.Uint64("old", c.lastCheckpoint), zap.Uint64("new", cp))
return false
}
if cp <= c.lastCheckpoint {
return false
}
c.lastCheckpoint = cp
return true
}

// advanceCheckpointBy advances the checkpoint by a checkpoint getter function.
func (c *CheckpointAdvancer) advanceCheckpointBy(ctx context.Context, getCheckpoint func(context.Context) (uint64, error)) error {
start := time.Now()
cp, err := getCheckpoint(ctx)
if err != nil {
return err
}
log.Info("get checkpoint", zap.Uint64("old", c.lastCheckpoint), zap.Uint64("new", cp))
if cp < c.lastCheckpoint {
log.Warn("failed to update global checkpoint: stale", zap.Uint64("old", c.lastCheckpoint), zap.Uint64("new", cp))
}
if cp <= c.lastCheckpoint {
return nil
}

log.Info("uploading checkpoint for task",
zap.Stringer("checkpoint", oracle.GetTimeFromTS(cp)),
zap.Uint64("checkpoint", cp),
zap.String("task", c.task.Name),
zap.Stringer("take", time.Since(start)))
if err := c.env.UploadV3GlobalCheckpointForTask(ctx, c.task.Name, cp); err != nil {
return errors.Annotate(err, "failed to upload global checkpoint")
if c.setCheckpoint(cp) {
log.Info("uploading checkpoint for task",
zap.Stringer("checkpoint", oracle.GetTimeFromTS(cp)),
zap.Uint64("checkpoint", cp),
zap.String("task", c.task.Name),
zap.Stringer("take", time.Since(start)))
metrics.LastCheckpoint.WithLabelValues(c.task.GetName()).Set(float64(c.lastCheckpoint))
}
c.lastCheckpoint = cp
metrics.LastCheckpoint.WithLabelValues(c.task.GetName()).Set(float64(c.lastCheckpoint))
return nil
}

@@ -375,16 +383,17 @@ func (c *CheckpointAdvancer) subscribeTick(ctx context.Context) error {
return c.subscriber.PendingErrors()
}

func (c *CheckpointAdvancer) tick(ctx context.Context) error {
c.taskMu.Lock()
defer c.taskMu.Unlock()
if c.task == nil {
log.Debug("No tasks yet, skipping advancing.")
return nil
func (c *CheckpointAdvancer) importantTick(ctx context.Context) error {
c.checkpointsMu.Lock()
c.setCheckpoint(c.checkpoints.MinValue())
c.checkpointsMu.Unlock()
if err := c.env.UploadV3GlobalCheckpointForTask(ctx, c.task.Name, c.lastCheckpoint); err != nil {
return errors.Annotate(err, "failed to upload global checkpoint")
}
cx, cancel := context.WithTimeout(ctx, c.Config().TickTimeout())
defer cancel()
return nil
}

func (c *CheckpointAdvancer) optionalTick(cx context.Context) error {
threshold := c.Config().GetDefaultStartPollThreshold()
if err := c.subscribeTick(cx); err != nil {
log.Warn("[log backup advancer] Subscriber meet error, would polling the checkpoint.", logutil.ShortError(err))
@@ -397,6 +406,32 @@ func (c *CheckpointAdvancer) tick(ctx context.Context) error {
if err != nil {
return err
}

return nil
}

func (c *CheckpointAdvancer) tick(ctx context.Context) error {
c.taskMu.Lock()
defer c.taskMu.Unlock()
if c.task == nil {
log.Debug("No tasks yet, skipping advancing.")
return nil
}

var errs error

cx, cancel := context.WithTimeout(ctx, c.Config().TickTimeout())
defer cancel()
err := c.optionalTick(cx)
if err != nil {
log.Warn("[log backup advancer] option tick failed.", logutil.ShortError(err))
errs = multierr.Append(errs, err)
}

err = c.importantTick(ctx)
if err != nil {
log.Warn("[log backup advancer] important tick failed.", logutil.ShortError(err))
errs = multierr.Append(errs, err)
}

return errs
}
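
The new `tick` above splits the work into a best-effort phase (subscribing or polling) that runs under a timeout-bounded child context, and a must-run phase (uploading the checkpoint) that keeps the parent context, with both errors collected via `multierr` so neither hides the other. Below is a minimal, self-contained sketch of that pattern; the names `optionalWork`, `importantWork`, and `tickTimeout` are illustrative placeholders, not part of the advancer API.

```go
package main

import (
	"context"
	"fmt"
	"time"

	"go.uber.org/multierr"
)

// tickTimeout stands in for Config().TickTimeout(); 12s matches the new
// default where the timeout equals the tick interval.
const tickTimeout = 12 * time.Second

func optionalWork(ctx context.Context) error {
	// Best-effort phase (subscribing / polling): may be cut short by the timeout.
	select {
	case <-ctx.Done():
		return ctx.Err()
	case <-time.After(10 * time.Millisecond):
		return nil
	}
}

func importantWork(ctx context.Context) error {
	// Must-run phase (uploading the checkpoint): bound to the parent context,
	// so the optional phase's timeout cannot cancel it.
	return ctx.Err()
}

func tick(ctx context.Context) error {
	var errs error

	// Only the optional phase runs under the timeout-bounded child context.
	cx, cancel := context.WithTimeout(ctx, tickTimeout)
	defer cancel()
	if err := optionalWork(cx); err != nil {
		errs = multierr.Append(errs, err)
	}

	// The important phase still runs even if the optional phase failed,
	// and its error is combined rather than dropped.
	if err := importantWork(ctx); err != nil {
		errs = multierr.Append(errs, err)
	}
	return errs
}

func main() {
	fmt.Println(tick(context.Background())) // <nil> when both phases succeed
}
```
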
38 changes: 37 additions & 1 deletion br/pkg/streamhelper/advancer_cliext.go
@@ -5,15 +5,19 @@ package streamhelper
import (
"bytes"
"context"
"encoding/binary"
"fmt"
"strings"

"github.com/golang/protobuf/proto"
"github.com/pingcap/errors"
backuppb "github.com/pingcap/kvproto/pkg/brpb"
"github.com/pingcap/log"
berrors "github.com/pingcap/tidb/br/pkg/errors"
"github.com/pingcap/tidb/br/pkg/redact"
"github.com/pingcap/tidb/kv"
clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/zap"
)

type EventType int
@@ -181,11 +185,43 @@ func (t AdvancerExt) Begin(ctx context.Context, ch chan<- TaskEvent) error {
return nil
}

func (t AdvancerExt) getGlobalCheckpointForTask(ctx context.Context, taskName string) (uint64, error) {
key := GlobalCheckpointOf(taskName)
resp, err := t.KV.Get(ctx, key)
if err != nil {
return 0, err
}

if len(resp.Kvs) == 0 {
return 0, nil
}

firstKV := resp.Kvs[0]
value := firstKV.Value
if len(value) != 8 {
return 0, errors.Annotatef(berrors.ErrPiTRMalformedMetadata,
"the global checkpoint isn't 64bits (it is %d bytes, value = %s)",
len(value),
redact.Key(value))
}

return binary.BigEndian.Uint64(value), nil
}

func (t AdvancerExt) UploadV3GlobalCheckpointForTask(ctx context.Context, taskName string, checkpoint uint64) error {
key := GlobalCheckpointOf(taskName)
value := string(encodeUint64(checkpoint))
_, err := t.KV.Put(ctx, key, value)
oldValue, err := t.getGlobalCheckpointForTask(ctx, taskName)
if err != nil {
return err
}

if checkpoint < oldValue {
log.Warn("[log backup advancer] skipping upload global checkpoint", zap.Uint64("old", oldValue), zap.Uint64("new", checkpoint))
return nil
}

_, err = t.KV.Put(ctx, key, value)
if err != nil {
return err
}
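
Two things are worth noting in this hunk: the checkpoint is stored in etcd as an 8-byte big-endian value (hence the length check), and `UploadV3GlobalCheckpointForTask` now reads the stored value first and skips the `Put` when the new checkpoint is smaller, so the persisted checkpoint never moves backwards. A minimal round-trip sketch of the encoding follows; `encodeCheckpoint`/`decodeCheckpoint` are illustrative stand-ins for the repository's `encodeUint64` helper and the decoding shown above.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// encodeCheckpoint produces the assumed on-disk form: an 8-byte
// big-endian representation of the checkpoint TS.
func encodeCheckpoint(ts uint64) []byte {
	buf := make([]byte, 8)
	binary.BigEndian.PutUint64(buf, ts)
	return buf
}

// decodeCheckpoint mirrors the length check in getGlobalCheckpointForTask.
func decodeCheckpoint(value []byte) (uint64, error) {
	if len(value) != 8 {
		return 0, fmt.Errorf("the global checkpoint isn't 64 bits (it is %d bytes)", len(value))
	}
	return binary.BigEndian.Uint64(value), nil
}

func main() {
	encoded := encodeCheckpoint(18)
	ts, err := decodeCheckpoint(encoded)
	fmt.Println(ts, err) // 18 <nil>

	_, err = decodeCheckpoint([]byte("short"))
	fmt.Println(err) // the global checkpoint isn't 64 bits (it is 5 bytes)
}
```
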
15 changes: 11 additions & 4 deletions br/pkg/streamhelper/config/advancer_conf.go
@@ -16,7 +16,7 @@ const (
flagTryAdvanceThreshold = "try-advance-threshold"

DefaultConsistencyCheckTick = 5
DefaultTryAdvanceThreshold = 9 * time.Minute
DefaultTryAdvanceThreshold = 4 * time.Minute
DefaultBackOffTime = 5 * time.Second
DefaultTickInterval = 12 * time.Second
DefaultFullScanTick = 4
@@ -76,11 +76,18 @@ func (conf Config) GetDefaultStartPollThreshold() time.Duration {
// GetSubscriberErrorStartPollThreshold returns the threshold of begin polling the checkpoint
// when the subscriber meets error.
func (conf Config) GetSubscriberErrorStartPollThreshold() time.Duration {
return conf.TryAdvanceThreshold / 5
// 0.45x of the original threshold.
// The original threshold is 0.8x the target RPO,
// and the default flush interval is about 0.5x the target RPO.
// So the relationship between the RPO and this threshold is:
// when all subscriptions are available, it is 1.7x the flush interval (which leaves headroom for abnormal conditions);
// when some subscriptions are unavailable, it is 0.75x the flush interval.
// NOTE: can we make the subscription path reliable enough to drop the polling model?
return conf.TryAdvanceThreshold * 9 / 20
}

// TickTimeout returns the max duration for each tick.
func (conf Config) TickTimeout() time.Duration {
// If a tick blocks 10x the interval of ticking, we may need to break it and retry.
return 10 * conf.TickDuration
// If a tick blocks longer than the interval of ticking, we may need to break it and retry.
return conf.TickDuration
}
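
To make the new numbers concrete: with the defaults in this file, TryAdvanceThreshold drops from 9 to 4 minutes, the subscriber-error poll threshold becomes 4m × 9/20 = 1m48s, and a tick now times out after a single tick interval (12s) instead of ten. A throwaway sketch of the arithmetic, assuming only the defaults shown above:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// Defaults from advancer_conf.go after this change.
	tryAdvanceThreshold := 4 * time.Minute
	tickDuration := 12 * time.Second

	// GetSubscriberErrorStartPollThreshold: 0.45x of the threshold.
	subscriberErrThreshold := tryAdvanceThreshold * 9 / 20
	fmt.Println(subscriberErrThreshold) // 1m48s

	// TickTimeout: previously 10 * TickDuration, now a single interval.
	tickTimeout := tickDuration
	fmt.Println(tickTimeout) // 12s
}
```
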
8 changes: 5 additions & 3 deletions br/pkg/streamhelper/integration_test.go
@@ -352,10 +352,12 @@ func testStreamCheckpoint(t *testing.T, metaCli streamhelper.AdvancerExt) {
req.Len(resp.Kvs, 1)
return binary.BigEndian.Uint64(resp.Kvs[0].Value)
}
metaCli.UploadV3GlobalCheckpointForTask(ctx, task, 5)
req.NoError(metaCli.UploadV3GlobalCheckpointForTask(ctx, task, 5))
req.EqualValues(5, getCheckpoint())
metaCli.UploadV3GlobalCheckpointForTask(ctx, task, 18)
req.NoError(metaCli.UploadV3GlobalCheckpointForTask(ctx, task, 18))
req.EqualValues(18, getCheckpoint())
metaCli.ClearV3GlobalCheckpointForTask(ctx, task)
req.NoError(metaCli.UploadV3GlobalCheckpointForTask(ctx, task, 16))
req.EqualValues(18, getCheckpoint())
req.NoError(metaCli.ClearV3GlobalCheckpointForTask(ctx, task))
req.EqualValues(0, getCheckpoint())
}