Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

dm: add strict optimist mode #9113

Merged
merged 17 commits into from
Jun 13, 2023
1 change: 1 addition & 0 deletions dm/_utils/terror_gen/errors_release.txt
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,7 @@ ErrConfigInvalidPhysicalDuplicateResolution,[code=20062:class=config:scope=inter
ErrConfigInvalidPhysicalChecksum,[code=20063:class=config:scope=internal:level=medium], "Message: invalid load checksum-physical option '%s', Workaround: Please choose a valid value in ['required', 'optional', 'off'] or leave it empty."
ErrConfigColumnMappingDeprecated,[code=20064:class=config:scope=internal:level=high], "Message: column-mapping is not supported since v6.6.0, Workaround: Please use extract-table/extract-schema/extract-source to handle data conflict when merge tables. See https://docs.pingcap.com/tidb/v6.4/task-configuration-file-full#task-configuration-file-template-advanced"
ErrConfigInvalidLoadAnalyze,[code=20065:class=config:scope=internal:level=medium], "Message: invalid load analyze option '%s', Workaround: Please choose a valid value in ['required', 'optional', 'off'] or leave it empty."
ErrConfigStrictOptimisticShardMode,[code=20066:class=config:scope=internal:level=medium], "Message: cannot enable `strict-optimistic-shard-mode` while `shard-mode` is not `optimistic`, Workaround: Please set `shard-mode` to `optimistic` if you want to enable `strict-optimistic-shard-mode`."
ErrBinlogExtractPosition,[code=22001:class=binlog-op:scope=internal:level=high]
ErrBinlogInvalidFilename,[code=22002:class=binlog-op:scope=internal:level=high], "Message: invalid binlog filename"
ErrBinlogParsePosFromStr,[code=22003:class=binlog-op:scope=internal:level=high]
Expand Down
10 changes: 7 additions & 3 deletions dm/config/subtask.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,10 @@ type SubTaskConfig struct {
flagSet *flag.FlagSet

// when in sharding, multi dm-workers do one task
IsSharding bool `toml:"is-sharding" json:"is-sharding"`
ShardMode string `toml:"shard-mode" json:"shard-mode"`
OnlineDDL bool `toml:"online-ddl" json:"online-ddl"`
IsSharding bool `toml:"is-sharding" json:"is-sharding"`
ShardMode string `toml:"shard-mode" json:"shard-mode"`
StrictOptimisticShardMode bool `toml:"strict-optimistic-shard-mode" json:"strict-optimistic-shard-mode"`
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we move it as a value of shard-mode?

Copy link
Contributor Author

@GMHDBJD GMHDBJD Jun 12, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

have discuss with pm. new config item is better

OnlineDDL bool `toml:"online-ddl" json:"online-ddl"`

// pt/gh-ost name rule, support regex
ShadowTableRules []string `yaml:"shadow-table-rules" toml:"shadow-table-rules" json:"shadow-table-rules"`
Expand Down Expand Up @@ -288,6 +289,9 @@ func (c *SubTaskConfig) Adjust(verifyDecryptPassword bool) error {
} else if c.ShardMode == "" && c.IsSharding {
c.ShardMode = ShardPessimistic // use the pessimistic mode as default for back compatible.
}
if c.StrictOptimisticShardMode && c.ShardMode != ShardOptimistic {
return terror.ErrConfigStrictOptimisticShardMode.Generate()
}

if len(c.ColumnMappingRules) > 0 {
return terror.ErrConfigColumnMappingDeprecated.Generate()
Expand Down
82 changes: 44 additions & 38 deletions dm/config/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -503,10 +503,11 @@ func defaultValidatorConfig() ValidatorConfig {
type TaskConfig struct {
*flag.FlagSet `yaml:"-" toml:"-" json:"-"`

Name string `yaml:"name" toml:"name" json:"name"`
TaskMode string `yaml:"task-mode" toml:"task-mode" json:"task-mode"`
IsSharding bool `yaml:"is-sharding" toml:"is-sharding" json:"is-sharding"`
ShardMode string `yaml:"shard-mode" toml:"shard-mode" json:"shard-mode"` // when `shard-mode` set, we always enable sharding support.
Name string `yaml:"name" toml:"name" json:"name"`
TaskMode string `yaml:"task-mode" toml:"task-mode" json:"task-mode"`
IsSharding bool `yaml:"is-sharding" toml:"is-sharding" json:"is-sharding"`
ShardMode string `yaml:"shard-mode" toml:"shard-mode" json:"shard-mode"` // when `shard-mode` set, we always enable sharding support.
StrictOptimisticShardMode bool `yaml:"strict-optimistic-shard-mode" toml:"strict-optimistic-shard-mode" json:"strict-optimistic-shard-mode"`
// treat it as hidden configuration
IgnoreCheckingItems []string `yaml:"ignore-checking-items" toml:"ignore-checking-items" json:"ignore-checking-items"`
// we store detail status in meta
Expand Down Expand Up @@ -682,6 +683,9 @@ func (c *TaskConfig) adjust() error {
} else if c.ShardMode == "" && c.IsSharding {
c.ShardMode = ShardPessimistic // use the pessimistic mode as default for back compatible.
}
if c.StrictOptimisticShardMode && c.ShardMode != ShardOptimistic {
return terror.ErrConfigStrictOptimisticShardMode.Generate()
}

if len(c.ColumnMappings) > 0 {
return terror.ErrConfigColumnMappingDeprecated.Generate()
Expand Down Expand Up @@ -1237,45 +1241,47 @@ type TaskConfigForDowngrade struct {
EnableANSIQuotes bool `yaml:"ansi-quotes"`
RemoveMeta bool `yaml:"remove-meta"`
// new config item
MySQLInstances []*MySQLInstanceForDowngrade `yaml:"mysql-instances"`
ExprFilter map[string]*ExpressionFilter `yaml:"expression-filter,omitempty"`
OnlineDDL bool `yaml:"online-ddl,omitempty"`
ShadowTableRules []string `yaml:"shadow-table-rules,omitempty"`
TrashTableRules []string `yaml:"trash-table-rules,omitempty"`
MySQLInstances []*MySQLInstanceForDowngrade `yaml:"mysql-instances"`
ExprFilter map[string]*ExpressionFilter `yaml:"expression-filter,omitempty"`
OnlineDDL bool `yaml:"online-ddl,omitempty"`
ShadowTableRules []string `yaml:"shadow-table-rules,omitempty"`
TrashTableRules []string `yaml:"trash-table-rules,omitempty"`
StrictOptimisticShardMode bool `yaml:"strict-optimistic-shard-mode,omitempty"`
}

// NewTaskConfigForDowngrade create new TaskConfigForDowngrade.
func NewTaskConfigForDowngrade(taskConfig *TaskConfig) *TaskConfigForDowngrade {
return &TaskConfigForDowngrade{
Name: taskConfig.Name,
TaskMode: taskConfig.TaskMode,
IsSharding: taskConfig.IsSharding,
ShardMode: taskConfig.ShardMode,
IgnoreCheckingItems: taskConfig.IgnoreCheckingItems,
MetaSchema: taskConfig.MetaSchema,
EnableHeartbeat: taskConfig.EnableHeartbeat,
HeartbeatUpdateInterval: taskConfig.HeartbeatUpdateInterval,
HeartbeatReportInterval: taskConfig.HeartbeatReportInterval,
Timezone: taskConfig.Timezone,
CaseSensitive: taskConfig.CaseSensitive,
TargetDB: taskConfig.TargetDB,
OnlineDDLScheme: taskConfig.OnlineDDLScheme,
Routes: taskConfig.Routes,
Filters: taskConfig.Filters,
ColumnMappings: taskConfig.ColumnMappings,
BWList: taskConfig.BWList,
BAList: taskConfig.BAList,
Mydumpers: taskConfig.Mydumpers,
Loaders: NewLoaderConfigForDowngrade(taskConfig.Loaders),
Syncers: NewSyncerConfigsForDowngrade(taskConfig.Syncers),
CleanDumpFile: taskConfig.CleanDumpFile,
EnableANSIQuotes: taskConfig.EnableANSIQuotes,
RemoveMeta: taskConfig.RemoveMeta,
MySQLInstances: NewMySQLInstancesForDowngrade(taskConfig.MySQLInstances),
ExprFilter: taskConfig.ExprFilter,
OnlineDDL: taskConfig.OnlineDDL,
ShadowTableRules: taskConfig.ShadowTableRules,
TrashTableRules: taskConfig.TrashTableRules,
Name: taskConfig.Name,
TaskMode: taskConfig.TaskMode,
IsSharding: taskConfig.IsSharding,
ShardMode: taskConfig.ShardMode,
StrictOptimisticShardMode: taskConfig.StrictOptimisticShardMode,
IgnoreCheckingItems: taskConfig.IgnoreCheckingItems,
MetaSchema: taskConfig.MetaSchema,
EnableHeartbeat: taskConfig.EnableHeartbeat,
HeartbeatUpdateInterval: taskConfig.HeartbeatUpdateInterval,
HeartbeatReportInterval: taskConfig.HeartbeatReportInterval,
Timezone: taskConfig.Timezone,
CaseSensitive: taskConfig.CaseSensitive,
TargetDB: taskConfig.TargetDB,
OnlineDDLScheme: taskConfig.OnlineDDLScheme,
Routes: taskConfig.Routes,
Filters: taskConfig.Filters,
ColumnMappings: taskConfig.ColumnMappings,
BWList: taskConfig.BWList,
BAList: taskConfig.BAList,
Mydumpers: taskConfig.Mydumpers,
Loaders: NewLoaderConfigForDowngrade(taskConfig.Loaders),
Syncers: NewSyncerConfigsForDowngrade(taskConfig.Syncers),
CleanDumpFile: taskConfig.CleanDumpFile,
EnableANSIQuotes: taskConfig.EnableANSIQuotes,
RemoveMeta: taskConfig.RemoveMeta,
MySQLInstances: NewMySQLInstancesForDowngrade(taskConfig.MySQLInstances),
ExprFilter: taskConfig.ExprFilter,
OnlineDDL: taskConfig.OnlineDDL,
ShadowTableRules: taskConfig.ShadowTableRules,
TrashTableRules: taskConfig.TrashTableRules,
}
}

Expand Down
6 changes: 6 additions & 0 deletions dm/config/task_converters.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ func TaskConfigToSubTaskConfigs(c *TaskConfig, sources map[string]dbconfig.DBCon
cfg := NewSubTaskConfig()
cfg.IsSharding = c.IsSharding
cfg.ShardMode = c.ShardMode
cfg.StrictOptimisticShardMode = c.StrictOptimisticShardMode
cfg.OnlineDDL = c.OnlineDDL
cfg.TrashTableRules = c.TrashTableRules
cfg.ShadowTableRules = c.ShadowTableRules
Expand Down Expand Up @@ -181,6 +182,9 @@ func OpenAPITaskToSubTaskConfigs(task *openapi.Task, toDBCfg *dbconfig.DBConfig,
} else {
subTaskCfg.IsSharding = false
}
if task.StrictOptimisticShardMode != nil {
subTaskCfg.StrictOptimisticShardMode = *task.StrictOptimisticShardMode
}
// set online ddl plugin config
subTaskCfg.OnlineDDL = task.EnhanceOnlineSchemaChange
// set case sensitive from source
Expand Down Expand Up @@ -314,6 +318,7 @@ func SubTaskConfigsToTaskConfig(stCfgs ...*SubTaskConfig) *TaskConfig {
c.TaskMode = stCfg0.Mode
c.IsSharding = stCfg0.IsSharding
c.ShardMode = stCfg0.ShardMode
c.StrictOptimisticShardMode = stCfg0.StrictOptimisticShardMode
c.IgnoreCheckingItems = stCfg0.IgnoreCheckingItems
c.MetaSchema = stCfg0.MetaSchema
c.EnableHeartbeat = stCfg0.EnableHeartbeat
Expand Down Expand Up @@ -611,6 +616,7 @@ func SubTaskConfigsToOpenAPITask(subTaskConfigList []*SubTaskConfig) *openapi.Ta
taskShardMode := openapi.TaskShardMode(oneSubtaskConfig.ShardMode)
task.ShardMode = &taskShardMode
}
task.StrictOptimisticShardMode = &oneSubtaskConfig.StrictOptimisticShardMode
if len(filterMap) > 0 {
task.BinlogFilterRule = &filterRuleMap
}
Expand Down
5 changes: 3 additions & 2 deletions dm/config/task_converters_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,8 @@ func testShardAndFilterTaskToSubTaskConfigs(c *check.C) {
c.Assert(subTask1Config.Meta.BinLogPos, check.Equals, uint32(*task.SourceConfig.SourceConf[0].BinlogPos))

// check shard config
c.Assert(subTask1Config.ShardMode, check.Equals, string(openapi.TaskShardModePessimistic))
c.Assert(subTask1Config.ShardMode, check.Equals, string(openapi.TaskShardModeOptimistic))
c.Assert(subTask1Config.StrictOptimisticShardMode, check.IsTrue)
// check online schema change
c.Assert(subTask1Config.OnlineDDL, check.Equals, true)
// check case sensitive
Expand Down Expand Up @@ -218,7 +219,7 @@ func testShardAndFilterTaskToSubTaskConfigs(c *check.C) {
c.Assert(subTask2Config.Meta.BinLogName, check.Equals, *task.SourceConfig.SourceConf[1].BinlogName)
c.Assert(subTask2Config.Meta.BinLogPos, check.Equals, uint32(*task.SourceConfig.SourceConf[1].BinlogPos))
// check shard config
c.Assert(subTask2Config.ShardMode, check.Equals, string(openapi.TaskShardModePessimistic))
c.Assert(subTask2Config.ShardMode, check.Equals, string(openapi.TaskShardModeOptimistic))
// check online schema change
c.Assert(subTask2Config.OnlineDDL, check.Equals, true)
// check case sensitive
Expand Down
6 changes: 6 additions & 0 deletions dm/errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -1204,6 +1204,12 @@ description = ""
workaround = "Please choose a valid value in ['required', 'optional', 'off'] or leave it empty."
tags = ["internal", "medium"]

[error.DM-config-20066]
message = "cannot enable `strict-optimistic-shard-mode` while `shard-mode` is not `optimistic`"
description = ""
workaround = "Please set `shard-mode` to `optimistic` if you want to enable `strict-optimistic-shard-mode`."
tags = ["internal", "medium"]

[error.DM-binlog-op-22001]
message = ""
description = ""
Expand Down
6 changes: 4 additions & 2 deletions dm/openapi/fixtures/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ var (
"security": null,
"user": "root"
},
"task_mode": "all"
"task_mode": "all",
"strict_optimistic_shard_mode": false
}
`

Expand Down Expand Up @@ -103,7 +104,8 @@ var (
"meta_schema": "dm_meta",
"name": "test",
"on_duplicate": "error",
"shard_mode": "pessimistic",
"shard_mode": "optimistic",
"strict_optimistic_shard_mode": true,
"source_config": {
"full_migrate_conf": {
"data_dir": "./exported_data",
Expand Down