Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/master' into runtime-metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
zyguan committed May 29, 2022
2 parents 71f1d30 + 7f023bd commit e4e8b22
Show file tree
Hide file tree
Showing 105 changed files with 5,973 additions and 4,674 deletions.
1 change: 1 addition & 0 deletions br/cmd/br/restore.go
Expand Up @@ -160,5 +160,6 @@ func newStreamRestoreCommand() *cobra.Command {
}
task.DefineFilterFlags(command, filterOutSysAndMemTables, true)
task.DefineStreamRestoreFlags(command)
command.Hidden = true
return command
}
1 change: 1 addition & 0 deletions br/cmd/br/stream.go
Expand Up @@ -55,6 +55,7 @@ func NewStreamCommand() *cobra.Command {
command.Root().HelpFunc()(command, strings)
})

command.Hidden = true
return command
}

Expand Down
12 changes: 4 additions & 8 deletions br/pkg/backup/client.go
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tidb/br/pkg/summary"
"github.com/pingcap/tidb/br/pkg/utils"
"github.com/pingcap/tidb/ddl"
"github.com/pingcap/tidb/distsql"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta"
Expand Down Expand Up @@ -480,20 +481,15 @@ func WriteBackupDDLJobs(metaWriter *metautil.MetaWriter, store kv.Storage, lastB
if err != nil {
return errors.Trace(err)
}
allJobs := make([]*model.Job, 0)
defaultJobs, err := snapMeta.GetAllDDLJobsInQueue(meta.DefaultJobListKey)
allJobs, err := ddl.GetAllDDLJobs(snapMeta)
if err != nil {
return errors.Trace(err)
}
log.Debug("get default jobs", zap.Int("jobs", len(defaultJobs)))
allJobs = append(allJobs, defaultJobs...)
addIndexJobs, err := snapMeta.GetAllDDLJobsInQueue(meta.AddIndexJobListKey)
log.Debug("get all jobs", zap.Int("jobs", len(allJobs)))
if err != nil {
return errors.Trace(err)
}
log.Debug("get add index jobs", zap.Int("jobs", len(addIndexJobs)))
allJobs = append(allJobs, addIndexJobs...)
historyJobs, err := snapMeta.GetAllHistoryDDLJobs()
historyJobs, err := ddl.GetAllHistoryDDLJobs(snapMeta)
if err != nil {
return errors.Trace(err)
}
Expand Down
7 changes: 7 additions & 0 deletions br/pkg/backup/push.go
Expand Up @@ -127,6 +127,13 @@ func (push *pushDown) pushBackup(
// Finished.
return res, nil
}
failpoint.Inject("backup-timeout-error", func(val failpoint.Value) {
msg := val.(string)
logutil.CL(ctx).Debug("failpoint backup-timeout-error injected.", zap.String("msg", msg))
resp.Error = &backuppb.Error{
Msg: msg,
}
})
failpoint.Inject("backup-storage-error", func(val failpoint.Value) {
msg := val.(string)
logutil.CL(ctx).Debug("failpoint backup-storage-error injected.", zap.String("msg", msg))
Expand Down
1 change: 1 addition & 0 deletions br/pkg/utils/retry.go
Expand Up @@ -24,6 +24,7 @@ var retryableServerError = []string{
"error during dispatch",
"put object timeout",
"internalerror",
"not read from or written to within the timeout period",
}

// RetryableFunc presents a retryable operation.
Expand Down
15 changes: 12 additions & 3 deletions br/tests/br_full/run.sh
Expand Up @@ -46,14 +46,23 @@ if ps -q $pid ; then
exit 1
fi


# backup full
echo "backup with lz4 start..."
export GO_FAILPOINTS="github.com/pingcap/tidb/br/pkg/backup/backup-storage-error=1*return(\"connection refused\")->1*return(\"InternalError\")"
run_br --pd $PD_ADDR backup full -s "local://$TEST_DIR/$DB-lz4" --concurrency 4 --compression lz4
test_log="${TEST_DIR}/${DB}_test.log"
error_str="not read from or written to within the timeout period"
unset BR_LOG_TO_TERM

export GO_FAILPOINTS="github.com/pingcap/tidb/br/pkg/backup/backup-storage-error=1*return(\"connection refused\")->1*return(\"InternalError\");github.com/pingcap/tidb/br/pkg/backup/backup-timeout-error=1*return(\"not read from or written to within the timeout period\")"
run_br --pd $PD_ADDR backup full -s "local://$TEST_DIR/$DB-lz4" --concurrency 4 --compression lz4 --log-file $test_log
export GO_FAILPOINTS=""
size_lz4=$(du -d 0 $TEST_DIR/$DB-lz4 | awk '{print $1}')

if ! grep -i "$error_str" $test_log; then
echo "${error_str} not found in log"
echo "TEST: [$TEST_NAME] test restore failed!"
exit 1
fi

echo "backup with zstd start..."
run_br --pd $PD_ADDR backup full -s "local://$TEST_DIR/$DB-zstd" --concurrency 4 --compression zstd --compression-level 6
size_zstd=$(du -d 0 $TEST_DIR/$DB-zstd | awk '{print $1}')
Expand Down
149 changes: 120 additions & 29 deletions config/config.go
Expand Up @@ -23,6 +23,7 @@ import (
"os"
"os/user"
"path/filepath"
"sort"
"strings"
"sync"
"sync/atomic"
Expand All @@ -32,6 +33,7 @@ import (
"github.com/pingcap/errors"
zaplog "github.com/pingcap/log"
"github.com/pingcap/tidb/parser/terror"
typejson "github.com/pingcap/tidb/types/json"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/tikvutil"
"github.com/pingcap/tidb/util/versioninfo"
Expand Down Expand Up @@ -115,8 +117,6 @@ var (
map[string]string{
"check-mb4-value-in-utf8": "tidb_check_mb4_value_in_utf8",
"enable-collect-execution-info": "tidb_enable_collect_execution_info",
"plugin.load": "plugin_load",
"plugin.dir": "plugin_dir",
},
},
{
Expand All @@ -134,6 +134,13 @@ var (
"memory-usage-alarm-ratio": "tidb_memory_usage_alarm_ratio",
},
},
{
"plugin",
map[string]string{
"load": "plugin_load",
"dir": "plugin_dir",
},
},
}

// ConflictOptions indicates the conflict config options existing in both [instance] and other sections in config file.
Expand Down Expand Up @@ -761,6 +768,8 @@ var defaultConf = Config{
OOMUseTmpStorage: true,
TempStorageQuota: -1,
TempStoragePath: tempStorageDirName,
MemQuotaQuery: 1 << 30,
OOMAction: "cancel",
EnableBatchDML: false,
CheckMb4ValueInUTF8: *NewAtomicBool(true),
MaxIndexLength: 3072,
Expand Down Expand Up @@ -791,6 +800,7 @@ var defaultConf = Config{
EnableErrorStack: nbUnset, // If both options are nbUnset, getDisableErrorStack() returns true
EnableTimestamp: nbUnset,
DisableTimestamp: nbUnset, // If both options are nbUnset, getDisableTimestamp() returns false
QueryLogMaxLen: logutil.DefaultQueryLogMaxLen,
RecordPlanInSlowLog: logutil.DefaultRecordPlanInSlowLog,
EnableSlowLog: *NewAtomicBool(logutil.DefaultTiDBEnableSlowLog),
},
Expand Down Expand Up @@ -839,6 +849,7 @@ var defaultConf = Config{
TxnTotalSizeLimit: DefTxnTotalSizeLimit,
DistinctAggPushDown: false,
ProjectionPushDown: false,
CommitterConcurrency: defTiKVCfg.CommitterConcurrency,
MaxTxnTTL: defTiKVCfg.MaxTxnTTL, // 1hour
// TODO: set indexUsageSyncLease to 60s.
IndexUsageSyncLease: "0s",
Expand All @@ -848,14 +859,15 @@ var defaultConf = Config{
StatsLoadConcurrency: 5,
StatsLoadQueueSize: 1000,
EnableStatsCacheMemQuota: false,
RunAutoAnalyze: true,
},
ProxyProtocol: ProxyProtocol{
Networks: "",
HeaderTimeout: 5,
},
PreparedPlanCache: PreparedPlanCache{
Enabled: false,
Capacity: 1000,
Enabled: true,
Capacity: 100,
MemoryGuardRatio: 0.1,
},
OpenTracing: OpenTracing{
Expand Down Expand Up @@ -925,7 +937,10 @@ func StoreGlobalConfig(config *Config) {
tikvcfg.StoreGlobalConfig(&cfg)
}

var deprecatedConfig = map[string]struct{}{
// removedConfig contains items that are no longer supported.
// they might still be in the config struct to support import,
// but are not actively used.
var removedConfig = map[string]struct{}{
"pessimistic-txn.ttl": {},
"pessimistic-txn.enable": {},
"log.file.log-rotate": {},
Expand All @@ -937,8 +952,6 @@ var deprecatedConfig = map[string]struct{}{
"max-txn-time-use": {},
"experimental.allow-auto-random": {},
"enable-redact-log": {}, // use variable tidb_redact_log instead
"tikv-client.copr-cache.enable": {},
"alter-primary-key": {}, // use NONCLUSTERED keyword instead
"enable-streaming": {},
"performance.mem-profile-interval": {},
"security.require-secure-transport": {},
Expand All @@ -960,11 +973,14 @@ var deprecatedConfig = map[string]struct{}{
"prepared-plan-cache.enabled": {},
"prepared-plan-cache.capacity": {},
"prepared-plan-cache.memory-guard-ratio": {},
"oom-action": {},
}

func isAllDeprecatedConfigItems(items []string) bool {
// isAllRemovedConfigItems returns true if all the items that couldn't validate
// belong to the list of removedConfig items.
func isAllRemovedConfigItems(items []string) bool {
for _, item := range items {
if _, ok := deprecatedConfig[item]; !ok {
if _, ok := removedConfig[item]; !ok {
return false
}
}
Expand All @@ -986,7 +1002,7 @@ func InitializeConfig(confPath string, configCheck, configStrict bool, enforceCm
// is not the default behavior of TiDB. The warning message must be deferred until
// logging has been set up. After strict config checking is the default behavior,
// This should all be removed.
if (!configCheck && !configStrict) || isAllDeprecatedConfigItems(tmp.UndecodedItems) {
if (!configCheck && !configStrict) || isAllRemovedConfigItems(tmp.UndecodedItems) {
fmt.Fprintln(os.Stderr, err.Error())
err = nil
}
Expand All @@ -995,6 +1011,15 @@ func InitializeConfig(confPath string, configCheck, configStrict bool, enforceCm
err = nil
}
}
// In configCheck we always print out which options in the config file
// have been removed. This helps users upgrade better.
if configCheck {
err = cfg.RemovedVariableCheck(confPath)
if err != nil {
logutil.BgLogger().Warn(err.Error())
err = nil // treat as warning
}
}

terror.MustNil(err)
} else {
Expand Down Expand Up @@ -1023,6 +1048,31 @@ func InitializeConfig(confPath string, configCheck, configStrict bool, enforceCm
StoreGlobalConfig(cfg)
}

// RemovedVariableCheck checks if the config file contains any items
// which have been removed. These will not take effect any more.
func (c *Config) RemovedVariableCheck(confFile string) error {
metaData, err := toml.DecodeFile(confFile, c)
if err != nil {
return err
}
var removed []string
for item := range removedConfig {
// We need to split the string to account for the top level
// and the section hierarchy of config.
tmp := strings.Split(item, ".")
if len(tmp) == 2 && metaData.IsDefined(tmp[0], tmp[1]) {
removed = append(removed, item)
} else if len(tmp) == 1 && metaData.IsDefined(tmp[0]) {
removed = append(removed, item)
}
}
if len(removed) > 0 {
sort.Strings(removed) // deterministic for tests
return fmt.Errorf("The following configuration options are no longer supported in this version of TiDB. Check the release notes for more information: %s", strings.Join(removed, ", "))
}
return nil
}

// Load loads config options from a toml file.
func (c *Config) Load(confFile string) error {
metaData, err := toml.DecodeFile(confFile, c)
Expand Down Expand Up @@ -1230,32 +1280,68 @@ func initByLDFlags(edition, checkBeforeDropLDFlag string) {

// hideConfig is used to filter a single line of config for hiding.
var hideConfig = []string{
"index-usage-sync-lease",
"performance.index-usage-sync-lease",
}

// HideConfig is used to filter the configs that needs to be hidden.
func HideConfig(s string) string {
configs := strings.Split(s, "\n")
hideMap := make([]bool, len(configs))
for i, c := range configs {
for _, hc := range hideConfig {
if strings.Contains(c, hc) {
hideMap[i] = true
break
}
}
// jsonifyPath converts the item to json path, so it can be extracted.
func jsonifyPath(str string) string {
s := strings.Split(str, ".")
return fmt.Sprintf("$.\"%s\"", strings.Join(s, "\".\""))
}

// GetJSONConfig returns the config as JSON with hidden items removed
// It replaces the earlier HideConfig() which used strings.Split() in
// an way that didn't work for similarly named items (like enable).
func GetJSONConfig() (string, error) {
j, err := json.Marshal(GetGlobalConfig())
if err != nil {
return "", err
}
jsonValue, err := typejson.ParseBinaryFromString(string(j))
if err != nil {
return "", err
}
var buf bytes.Buffer
for i, c := range configs {
if hideMap[i] {
// Approximately length of removed items + hidden items.
pathExprs := make([]typejson.PathExpression, 0, len(removedConfig)+len(hideConfig))
var pathExpr typejson.PathExpression

// Patch out removed items.
for removedItem := range removedConfig {
s := jsonifyPath(removedItem)
pathExpr, err = typejson.ParseJSONPathExpr(s)
if err != nil {
// Should not be reachable, but not worth bailing for.
// It just means we can't patch out this line.
continue
}
if i != 0 {
buf.WriteString("\n")
pathExprs = append(pathExprs, pathExpr)
}
// Patch out hidden items
for _, hiddenItem := range hideConfig {
s := jsonifyPath(hiddenItem)
pathExpr, err = typejson.ParseJSONPathExpr(s)
if err != nil {
// Should not be reachable, but not worth bailing for.
// It just means we can't patch out this line.
continue
}
buf.WriteString(c)
pathExprs = append(pathExprs, pathExpr)
}
newJSONValue, err := jsonValue.Remove(pathExprs)
if err != nil {
return "", err
}
// Convert back to GoJSON so it can be pretty formatted.
// This is expected for compatibility with previous versions.
buf, err := newJSONValue.MarshalJSON()
if err != nil {
return "", err
}
var resBuf bytes.Buffer
if err = json.Indent(&resBuf, buf, "", "\t"); err != nil {
return "", err
}
return buf.String()
return resBuf.String(), nil
}

// ContainHiddenConfig checks whether it contains the configuration that needs to be hidden.
Expand All @@ -1266,5 +1352,10 @@ func ContainHiddenConfig(s string) bool {
return true
}
}
for dc := range removedConfig {
if strings.Contains(s, dc) {
return true
}
}
return false
}
3 changes: 0 additions & 3 deletions config/config.toml.example
Expand Up @@ -357,9 +357,6 @@ max-batch-wait-time = 0
# Batch wait size, to avoid waiting too long.
batch-wait-size = 8

# Enable chunk encoded data for coprocessor requests.
enable-chunk-rpc = true

# If a Region has not been accessed for more than the given duration (in seconds), it
# will be reloaded from the PD.
region-cache-ttl = 600
Expand Down

0 comments on commit e4e8b22

Please sign in to comment.