Skip to content

Commit

Permalink
br: make more sane variables of ticking timeout (#39887)
Browse files Browse the repository at this point in the history
close #39620
  • Loading branch information
YuJuncen committed Dec 14, 2022
1 parent 565128e commit dd42b72
Show file tree
Hide file tree
Showing 4 changed files with 114 additions and 34 deletions.
87 changes: 61 additions & 26 deletions br/pkg/streamhelper/advancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/metrics"
"github.com/tikv/client-go/v2/oracle"
"go.uber.org/multierr"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
)
Expand Down Expand Up @@ -285,14 +286,18 @@ func (c *CheckpointAdvancer) onTaskEvent(ctx context.Context, e TaskEvent) error
c.task = e.Info
c.taskRange = spans.Collapse(len(e.Ranges), func(i int) kv.KeyRange { return e.Ranges[i] })
c.checkpoints = spans.Sorted(spans.NewFullWith(e.Ranges, 0))
c.lastCheckpoint = e.Info.StartTs
log.Info("added event", zap.Stringer("task", e.Info), zap.Stringer("ranges", logutil.StringifyKeys(c.taskRange)))
case EventDel:
utils.LogBackupTaskCountDec()
c.task = nil
c.taskRange = nil
c.checkpoints = nil
// This would be synced by `taskMu`, perhaps we'd better rename that to `tickMu`.
c.subscriber.Clear()
// Do the nil check because some test cases don't equip the advancer with a subscriber.
if c.subscriber != nil {
c.subscriber.Clear()
}
if err := c.env.ClearV3GlobalCheckpointForTask(ctx, e.Name); err != nil {
log.Warn("failed to clear global checkpoint", logutil.ShortError(err))
}
Expand All @@ -303,31 +308,34 @@ func (c *CheckpointAdvancer) onTaskEvent(ctx context.Context, e TaskEvent) error
return nil
}

// setCheckpoint stores cp as the last checkpoint when it is strictly greater
// than the currently recorded one, and reports whether the stored value changed.
// A smaller (stale) cp is rejected with a warning; an equal cp is rejected silently.
func (c *CheckpointAdvancer) setCheckpoint(cp uint64) bool {
	current := c.lastCheckpoint
	switch {
	case cp < current:
		// The candidate went backwards: keep the recorded checkpoint and make it visible in logs.
		log.Warn("failed to update global checkpoint: stale", zap.Uint64("old", current), zap.Uint64("new", cp))
		return false
	case cp == current:
		// Nothing to advance.
		return false
	default:
		c.lastCheckpoint = cp
		return true
	}
}

// advanceCheckpointBy advances the checkpoint by a checkpoint getter function.
func (c *CheckpointAdvancer) advanceCheckpointBy(ctx context.Context, getCheckpoint func(context.Context) (uint64, error)) error {
start := time.Now()
cp, err := getCheckpoint(ctx)
if err != nil {
return err
}
log.Info("get checkpoint", zap.Uint64("old", c.lastCheckpoint), zap.Uint64("new", cp))
if cp < c.lastCheckpoint {
log.Warn("failed to update global checkpoint: stale", zap.Uint64("old", c.lastCheckpoint), zap.Uint64("new", cp))
}
if cp <= c.lastCheckpoint {
return nil
}

log.Info("uploading checkpoint for task",
zap.Stringer("checkpoint", oracle.GetTimeFromTS(cp)),
zap.Uint64("checkpoint", cp),
zap.String("task", c.task.Name),
zap.Stringer("take", time.Since(start)))
if err := c.env.UploadV3GlobalCheckpointForTask(ctx, c.task.Name, cp); err != nil {
return errors.Annotate(err, "failed to upload global checkpoint")
if c.setCheckpoint(cp) {
log.Info("uploading checkpoint for task",
zap.Stringer("checkpoint", oracle.GetTimeFromTS(cp)),
zap.Uint64("checkpoint", cp),
zap.String("task", c.task.Name),
zap.Stringer("take", time.Since(start)))
metrics.LastCheckpoint.WithLabelValues(c.task.GetName()).Set(float64(c.lastCheckpoint))
}
c.lastCheckpoint = cp
metrics.LastCheckpoint.WithLabelValues(c.task.GetName()).Set(float64(c.lastCheckpoint))
return nil
}

Expand Down Expand Up @@ -375,16 +383,17 @@ func (c *CheckpointAdvancer) subscribeTick(ctx context.Context) error {
return c.subscriber.PendingErrors()
}

func (c *CheckpointAdvancer) tick(ctx context.Context) error {
c.taskMu.Lock()
defer c.taskMu.Unlock()
if c.task == nil {
log.Debug("No tasks yet, skipping advancing.")
return nil
func (c *CheckpointAdvancer) importantTick(ctx context.Context) error {
c.checkpointsMu.Lock()
c.setCheckpoint(c.checkpoints.MinValue())
c.checkpointsMu.Unlock()
if err := c.env.UploadV3GlobalCheckpointForTask(ctx, c.task.Name, c.lastCheckpoint); err != nil {
return errors.Annotate(err, "failed to upload global checkpoint")
}
cx, cancel := context.WithTimeout(ctx, c.Config().TickTimeout())
defer cancel()
return nil
}

func (c *CheckpointAdvancer) optionalTick(cx context.Context) error {
threshold := c.Config().GetDefaultStartPollThreshold()
if err := c.subscribeTick(cx); err != nil {
log.Warn("[log backup advancer] Subscriber meet error, would polling the checkpoint.", logutil.ShortError(err))
Expand All @@ -397,6 +406,32 @@ func (c *CheckpointAdvancer) tick(ctx context.Context) error {
if err != nil {
return err
}

return nil
}

// tick performs one advancing round while holding taskMu.
// When no task is registered it returns nil immediately. Otherwise it runs
// optionalTick under a context bounded by Config().TickTimeout(), then runs
// importantTick with the caller's ctx. A failure of either step is logged and
// does not prevent the other step from running; all failures are combined
// into the returned error.
func (c *CheckpointAdvancer) tick(ctx context.Context) error {
	c.taskMu.Lock()
	defer c.taskMu.Unlock()

	if c.task == nil {
		log.Debug("No tasks yet, skipping advancing.")
		return nil
	}

	var combined error

	// Only the optional step is bounded by the tick timeout.
	timedCtx, stop := context.WithTimeout(ctx, c.Config().TickTimeout())
	defer stop()
	if optErr := c.optionalTick(timedCtx); optErr != nil {
		log.Warn("[log backup advancer] option tick failed.", logutil.ShortError(optErr))
		combined = multierr.Append(combined, optErr)
	}

	// The important step runs with the caller's context, not the timed one.
	if impErr := c.importantTick(ctx); impErr != nil {
		log.Warn("[log backup advancer] important tick failed.", logutil.ShortError(impErr))
		combined = multierr.Append(combined, impErr)
	}

	return combined
}
38 changes: 37 additions & 1 deletion br/pkg/streamhelper/advancer_cliext.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,19 @@ package streamhelper
import (
"bytes"
"context"
"encoding/binary"
"fmt"
"strings"

"github.com/golang/protobuf/proto"
"github.com/pingcap/errors"
backuppb "github.com/pingcap/kvproto/pkg/brpb"
"github.com/pingcap/log"
berrors "github.com/pingcap/tidb/br/pkg/errors"
"github.com/pingcap/tidb/br/pkg/redact"
"github.com/pingcap/tidb/kv"
clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/zap"
)

type EventType int
Expand Down Expand Up @@ -181,11 +185,43 @@ func (t AdvancerExt) Begin(ctx context.Context, ch chan<- TaskEvent) error {
return nil
}

// getGlobalCheckpointForTask reads the value stored under GlobalCheckpointOf(taskName)
// and decodes it as a big-endian uint64.
// It returns (0, nil) when no key-value pair is found, the error from the KV read
// unchanged when the read fails, and an annotated ErrPiTRMalformedMetadata when the
// stored value is not exactly 8 bytes long.
func (t AdvancerExt) getGlobalCheckpointForTask(ctx context.Context, taskName string) (uint64, error) {
	resp, err := t.KV.Get(ctx, GlobalCheckpointOf(taskName))
	if err != nil {
		return 0, err
	}

	// No entry found: report zero rather than an error.
	if len(resp.Kvs) == 0 {
		return 0, nil
	}

	raw := resp.Kvs[0].Value
	if size := len(raw); size != 8 {
		return 0, errors.Annotatef(berrors.ErrPiTRMalformedMetadata,
			"the global checkpoint isn't 64bits (it is %d bytes, value = %s)",
			size,
			redact.Key(raw))
	}

	return binary.BigEndian.Uint64(raw), nil
}

func (t AdvancerExt) UploadV3GlobalCheckpointForTask(ctx context.Context, taskName string, checkpoint uint64) error {
key := GlobalCheckpointOf(taskName)
value := string(encodeUint64(checkpoint))
_, err := t.KV.Put(ctx, key, value)
oldValue, err := t.getGlobalCheckpointForTask(ctx, taskName)
if err != nil {
return err
}

if checkpoint < oldValue {
log.Warn("[log backup advancer] skipping upload global checkpoint", zap.Uint64("old", oldValue), zap.Uint64("new", checkpoint))
return nil
}

_, err = t.KV.Put(ctx, key, value)
if err != nil {
return err
}
Expand Down
15 changes: 11 additions & 4 deletions br/pkg/streamhelper/config/advancer_conf.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ const (
flagTryAdvanceThreshold = "try-advance-threshold"

DefaultConsistencyCheckTick = 5
DefaultTryAdvanceThreshold = 9 * time.Minute
DefaultTryAdvanceThreshold = 4 * time.Minute
DefaultBackOffTime = 5 * time.Second
DefaultTickInterval = 12 * time.Second
DefaultFullScanTick = 4
Expand Down Expand Up @@ -76,11 +76,18 @@ func (conf Config) GetDefaultStartPollThreshold() time.Duration {
// GetSubscriberErrorStartPollThreshold returns the threshold of begin polling the checkpoint
// when the subscriber meets error.
func (conf Config) GetSubscriberErrorStartPollThreshold() time.Duration {
return conf.TryAdvanceThreshold / 5
// 0.45x of the origin threshold.
// The origin threshold is 0.8x the target RPO,
// and the default flush interval is about 0.5x the target RPO.
// So the relationship between the RPO and the threshold is:
// When all subscriptions are available, it is 1.7x of the flush interval (which allows us to save in abnormal conditions).
// When some of subscriptions are not available, it is 0.75x of the flush interval.
// NOTE: can we make subscription better and give up the poll model?
return conf.TryAdvanceThreshold * 9 / 20
}

// TickTimeout returns the max duration for each tick.
func (conf Config) TickTimeout() time.Duration {
// If a tick blocks 10x the interval of ticking, we may need to break it and retry.
return 10 * conf.TickDuration
// If a tick blocks longer than the interval of ticking, we may need to break it and retry.
return conf.TickDuration
}
8 changes: 5 additions & 3 deletions br/pkg/streamhelper/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -352,10 +352,12 @@ func testStreamCheckpoint(t *testing.T, metaCli streamhelper.AdvancerExt) {
req.Len(resp.Kvs, 1)
return binary.BigEndian.Uint64(resp.Kvs[0].Value)
}
metaCli.UploadV3GlobalCheckpointForTask(ctx, task, 5)
req.NoError(metaCli.UploadV3GlobalCheckpointForTask(ctx, task, 5))
req.EqualValues(5, getCheckpoint())
metaCli.UploadV3GlobalCheckpointForTask(ctx, task, 18)
req.NoError(metaCli.UploadV3GlobalCheckpointForTask(ctx, task, 18))
req.EqualValues(18, getCheckpoint())
metaCli.ClearV3GlobalCheckpointForTask(ctx, task)
req.NoError(metaCli.UploadV3GlobalCheckpointForTask(ctx, task, 16))
req.EqualValues(18, getCheckpoint())
req.NoError(metaCli.ClearV3GlobalCheckpointForTask(ctx, task))
req.EqualValues(0, getCheckpoint())
}

0 comments on commit dd42b72

Please sign in to comment.