From cf1d56aed5ce56fb3eb47c242919cdab0e7fb733 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=B1=B1=E5=B2=9A?= <36239017+YuJuncen@users.noreply.github.com> Date: Fri, 16 Apr 2021 12:13:51 +0800 Subject: [PATCH] cherry pick #1015 to release-4.0 Signed-off-by: ti-srebot --- pkg/logutil/logging.go | 22 ++++++++++++++++++++++ pkg/task/backup.go | 21 +++++++++++++++++++++ pkg/task/common.go | 4 +++- 3 files changed, 46 insertions(+), 1 deletion(-) diff --git a/pkg/logutil/logging.go b/pkg/logutil/logging.go index 1bd5467d3..279926e80 100644 --- a/pkg/logutil/logging.go +++ b/pkg/logutil/logging.go @@ -11,6 +11,7 @@ import ( backuppb "github.com/pingcap/kvproto/pkg/backup" "github.com/pingcap/kvproto/pkg/import_sstpb" "github.com/pingcap/kvproto/pkg/metapb" + "github.com/pingcap/log" "go.uber.org/zap" "go.uber.org/zap/zapcore" @@ -186,3 +187,24 @@ func Keys(keys [][]byte) zap.Field { func ShortError(err error) zap.Field { return zap.String("error", err.Error()) } +<<<<<<< HEAD +======= + +var loggerToTerm, _, _ = log.InitLogger(new(log.Config), zap.AddCallerSkip(1)) + +// WarnTerm put a log both to terminal and to the log file. +func WarnTerm(message string, fields ...zap.Field) { + log.Warn(message, fields...) + if loggerToTerm != nil { + loggerToTerm.Warn(message, fields...) + } +} + +// RedactAny constructs a redacted field that carries an interface{}. +func RedactAny(fieldKey string, key interface{}) zap.Field { + if redact.NeedRedact() { + return zap.String(fieldKey, "?") + } + return zap.Any(fieldKey, key) +} +>>>>>>> 0576f071... backup: set concurrency to 1 when ratelimit enabled (#1015) diff --git a/pkg/task/backup.go b/pkg/task/backup.go index b86172c6c..58f8c1a92 100644 --- a/pkg/task/backup.go +++ b/pkg/task/backup.go @@ -8,8 +8,13 @@ import ( "strings" "time" +<<<<<<< HEAD "github.com/pingcap/br/pkg/utils" +======= + "github.com/docker/go-units" + "github.com/opentracing/opentracing-go" +>>>>>>> 0576f071... backup: set concurrency to 1 when ratelimit enabled (#1015) "github.com/pingcap/errors" backuppb "github.com/pingcap/kvproto/pkg/backup" "github.com/pingcap/log" @@ -25,6 +30,7 @@ import ( "github.com/pingcap/br/pkg/backup" berrors "github.com/pingcap/br/pkg/errors" "github.com/pingcap/br/pkg/glue" + "github.com/pingcap/br/pkg/logutil" "github.com/pingcap/br/pkg/storage" "github.com/pingcap/br/pkg/summary" ) @@ -164,12 +170,27 @@ func parseCompressionFlags(flags *pflag.FlagSet) (*CompressionConfig, error) { // so that both binary and TiDB will use same default value. func (cfg *BackupConfig) adjustBackupConfig() { cfg.adjust() + usingDefaultConcurrency := false if cfg.Config.Concurrency == 0 { cfg.Config.Concurrency = defaultBackupConcurrency + usingDefaultConcurrency = true } if cfg.Config.Concurrency > maxBackupConcurrency { cfg.Config.Concurrency = maxBackupConcurrency } + if cfg.RateLimit != unlimited { + // TiKV limits the upload rate by each backup request. + // When the backup requests are sent concurrently, + // the ratelimit couldn't work as intended. + // Degenerating to sequentially sending backup requests to avoid this. + if !usingDefaultConcurrency { + logutil.WarnTerm("setting `--ratelimit` and `--concurrency` at the same time, "+ + "ignoring `--concurrency`: `--ratelimit` forces sequential (i.e. concurrency = 1) backup", + zap.String("ratelimit", units.HumanSize(float64(cfg.RateLimit))+"/s"), + zap.Uint32("concurrency-specified", cfg.Config.Concurrency)) + } + cfg.Config.Concurrency = 1 + } if cfg.GCTTL == 0 { cfg.GCTTL = utils.DefaultBRGCSafePointTTL diff --git a/pkg/task/common.go b/pkg/task/common.go index a575bfe41..fa73483d9 100644 --- a/pkg/task/common.go +++ b/pkg/task/common.go @@ -68,6 +68,8 @@ const ( defaultSwitchInterval = 5 * time.Minute defaultGRPCKeepaliveTime = 10 * time.Second defaultGRPCKeepaliveTimeout = 3 * time.Second + + unlimited = 0 ) // TLSConfig is the common configuration for TLS connection. @@ -143,7 +145,7 @@ func DefineCommonFlags(flags *pflag.FlagSet) { flags.Uint(flagChecksumConcurrency, variable.DefChecksumTableConcurrency, "The concurrency of table checksumming") _ = flags.MarkHidden(flagChecksumConcurrency) - flags.Uint64(flagRateLimit, 0, "The rate limit of the task, MB/s per node") + flags.Uint64(flagRateLimit, unlimited, "The rate limit of the task, MB/s per node") flags.Bool(flagChecksum, true, "Run checksum at end of task") flags.Bool(flagRemoveTiFlash, true, "Remove TiFlash replicas before backup or restore, for unsupported versions of TiFlash")