Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

dm: add strict optimist mode (#9113) #9212

Merged
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions dm/_utils/terror_gen/errors_release.txt
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@ ErrConfigLoaderS3NotSupport,[code=20059:class=config:scope=internal:level=high],
ErrConfigInvalidSafeModeDuration,[code=20060:class=config:scope=internal:level=medium], "Message: safe-mode-duration '%s' parsed failed: %v, Workaround: Please check the `safe-mode-duration` is correct."
ErrConfigConfictSafeModeDurationAndSafeMode,[code=20061:class=config:scope=internal:level=low], "Message: safe-mode(true) conflicts with safe-mode-duration(0s), Workaround: Please set safe-mode to false or safe-mode-duration to non-zero."
ErrConfigInvalidPhysicalDuplicateResolution,[code=20062:class=config:scope=internal:level=medium], "Message: invalid load on-duplicate-physical option '%s', Workaround: Please choose a valid value in ['none', 'manual'] or leave it empty."
ErrConfigStrictOptimisticShardMode,[code=20063:class=config:scope=internal:level=medium], "Message: cannot enable `strict-optimistic-shard-mode` while `shard-mode` is not `optimistic`, Workaround: Please set `shard-mode` to `optimistic` if you want to enable `strict-optimistic-shard-mode`."
ErrBinlogExtractPosition,[code=22001:class=binlog-op:scope=internal:level=high]
ErrBinlogInvalidFilename,[code=22002:class=binlog-op:scope=internal:level=high], "Message: invalid binlog filename"
ErrBinlogParsePosFromStr,[code=22003:class=binlog-op:scope=internal:level=high]
Expand Down
10 changes: 7 additions & 3 deletions dm/config/subtask.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,9 +203,10 @@ type SubTaskConfig struct {
flagSet *flag.FlagSet

// when in sharding, multi dm-workers do one task
IsSharding bool `toml:"is-sharding" json:"is-sharding"`
ShardMode string `toml:"shard-mode" json:"shard-mode"`
OnlineDDL bool `toml:"online-ddl" json:"online-ddl"`
IsSharding bool `toml:"is-sharding" json:"is-sharding"`
ShardMode string `toml:"shard-mode" json:"shard-mode"`
StrictOptimisticShardMode bool `toml:"strict-optimistic-shard-mode" json:"strict-optimistic-shard-mode"`
OnlineDDL bool `toml:"online-ddl" json:"online-ddl"`

// pt/gh-ost name rule, support regex
ShadowTableRules []string `yaml:"shadow-table-rules" toml:"shadow-table-rules" json:"shadow-table-rules"`
Expand Down Expand Up @@ -401,6 +402,9 @@ func (c *SubTaskConfig) Adjust(verifyDecryptPassword bool) error {
} else if c.ShardMode == "" && c.IsSharding {
c.ShardMode = ShardPessimistic // use the pessimistic mode as default for back compatible.
}
if c.StrictOptimisticShardMode && c.ShardMode != ShardOptimistic {
return terror.ErrConfigStrictOptimisticShardMode.Generate()
}

if c.OnlineDDLScheme != "" && c.OnlineDDLScheme != PT && c.OnlineDDLScheme != GHOST {
return terror.ErrConfigOnlineSchemeNotSupport.Generate(c.OnlineDDLScheme)
Expand Down
82 changes: 44 additions & 38 deletions dm/config/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -464,10 +464,11 @@ func defaultValidatorConfig() ValidatorConfig {
type TaskConfig struct {
*flag.FlagSet `yaml:"-" toml:"-" json:"-"`

Name string `yaml:"name" toml:"name" json:"name"`
TaskMode string `yaml:"task-mode" toml:"task-mode" json:"task-mode"`
IsSharding bool `yaml:"is-sharding" toml:"is-sharding" json:"is-sharding"`
ShardMode string `yaml:"shard-mode" toml:"shard-mode" json:"shard-mode"` // when `shard-mode` set, we always enable sharding support.
Name string `yaml:"name" toml:"name" json:"name"`
TaskMode string `yaml:"task-mode" toml:"task-mode" json:"task-mode"`
IsSharding bool `yaml:"is-sharding" toml:"is-sharding" json:"is-sharding"`
ShardMode string `yaml:"shard-mode" toml:"shard-mode" json:"shard-mode"` // when `shard-mode` set, we always enable sharding support.
StrictOptimisticShardMode bool `yaml:"strict-optimistic-shard-mode" toml:"strict-optimistic-shard-mode" json:"strict-optimistic-shard-mode"`
// treat it as hidden configuration
IgnoreCheckingItems []string `yaml:"ignore-checking-items" toml:"ignore-checking-items" json:"ignore-checking-items"`
// we store detail status in meta
Expand Down Expand Up @@ -643,6 +644,9 @@ func (c *TaskConfig) adjust() error {
} else if c.ShardMode == "" && c.IsSharding {
c.ShardMode = ShardPessimistic // use the pessimistic mode as default for back compatible.
}
if c.StrictOptimisticShardMode && c.ShardMode != ShardOptimistic {
return terror.ErrConfigStrictOptimisticShardMode.Generate()
}

if c.CollationCompatible != "" && c.CollationCompatible != LooseCollationCompatible && c.CollationCompatible != StrictCollationCompatible {
return terror.ErrConfigCollationCompatibleNotSupport.Generate(c.CollationCompatible)
Expand Down Expand Up @@ -1223,45 +1227,47 @@ type TaskConfigForDowngrade struct {
EnableANSIQuotes bool `yaml:"ansi-quotes"`
RemoveMeta bool `yaml:"remove-meta"`
// new config item
MySQLInstances []*MySQLInstanceForDowngrade `yaml:"mysql-instances"`
ExprFilter map[string]*ExpressionFilter `yaml:"expression-filter,omitempty"`
OnlineDDL bool `yaml:"online-ddl,omitempty"`
ShadowTableRules []string `yaml:"shadow-table-rules,omitempty"`
TrashTableRules []string `yaml:"trash-table-rules,omitempty"`
MySQLInstances []*MySQLInstanceForDowngrade `yaml:"mysql-instances"`
ExprFilter map[string]*ExpressionFilter `yaml:"expression-filter,omitempty"`
OnlineDDL bool `yaml:"online-ddl,omitempty"`
ShadowTableRules []string `yaml:"shadow-table-rules,omitempty"`
TrashTableRules []string `yaml:"trash-table-rules,omitempty"`
StrictOptimisticShardMode bool `yaml:"strict-optimistic-shard-mode,omitempty"`
}

// NewTaskConfigForDowngrade create new TaskConfigForDowngrade.
func NewTaskConfigForDowngrade(taskConfig *TaskConfig) *TaskConfigForDowngrade {
return &TaskConfigForDowngrade{
Name: taskConfig.Name,
TaskMode: taskConfig.TaskMode,
IsSharding: taskConfig.IsSharding,
ShardMode: taskConfig.ShardMode,
IgnoreCheckingItems: taskConfig.IgnoreCheckingItems,
MetaSchema: taskConfig.MetaSchema,
EnableHeartbeat: taskConfig.EnableHeartbeat,
HeartbeatUpdateInterval: taskConfig.HeartbeatUpdateInterval,
HeartbeatReportInterval: taskConfig.HeartbeatReportInterval,
Timezone: taskConfig.Timezone,
CaseSensitive: taskConfig.CaseSensitive,
TargetDB: taskConfig.TargetDB,
OnlineDDLScheme: taskConfig.OnlineDDLScheme,
Routes: taskConfig.Routes,
Filters: taskConfig.Filters,
ColumnMappings: taskConfig.ColumnMappings,
BWList: taskConfig.BWList,
BAList: taskConfig.BAList,
Mydumpers: taskConfig.Mydumpers,
Loaders: NewLoaderConfigForDowngrade(taskConfig.Loaders),
Syncers: NewSyncerConfigsForDowngrade(taskConfig.Syncers),
CleanDumpFile: taskConfig.CleanDumpFile,
EnableANSIQuotes: taskConfig.EnableANSIQuotes,
RemoveMeta: taskConfig.RemoveMeta,
MySQLInstances: NewMySQLInstancesForDowngrade(taskConfig.MySQLInstances),
ExprFilter: taskConfig.ExprFilter,
OnlineDDL: taskConfig.OnlineDDL,
ShadowTableRules: taskConfig.ShadowTableRules,
TrashTableRules: taskConfig.TrashTableRules,
Name: taskConfig.Name,
TaskMode: taskConfig.TaskMode,
IsSharding: taskConfig.IsSharding,
ShardMode: taskConfig.ShardMode,
StrictOptimisticShardMode: taskConfig.StrictOptimisticShardMode,
IgnoreCheckingItems: taskConfig.IgnoreCheckingItems,
MetaSchema: taskConfig.MetaSchema,
EnableHeartbeat: taskConfig.EnableHeartbeat,
HeartbeatUpdateInterval: taskConfig.HeartbeatUpdateInterval,
HeartbeatReportInterval: taskConfig.HeartbeatReportInterval,
Timezone: taskConfig.Timezone,
CaseSensitive: taskConfig.CaseSensitive,
TargetDB: taskConfig.TargetDB,
OnlineDDLScheme: taskConfig.OnlineDDLScheme,
Routes: taskConfig.Routes,
Filters: taskConfig.Filters,
ColumnMappings: taskConfig.ColumnMappings,
BWList: taskConfig.BWList,
BAList: taskConfig.BAList,
Mydumpers: taskConfig.Mydumpers,
Loaders: NewLoaderConfigForDowngrade(taskConfig.Loaders),
Syncers: NewSyncerConfigsForDowngrade(taskConfig.Syncers),
CleanDumpFile: taskConfig.CleanDumpFile,
EnableANSIQuotes: taskConfig.EnableANSIQuotes,
RemoveMeta: taskConfig.RemoveMeta,
MySQLInstances: NewMySQLInstancesForDowngrade(taskConfig.MySQLInstances),
ExprFilter: taskConfig.ExprFilter,
OnlineDDL: taskConfig.OnlineDDL,
ShadowTableRules: taskConfig.ShadowTableRules,
TrashTableRules: taskConfig.TrashTableRules,
}
}

Expand Down
6 changes: 6 additions & 0 deletions dm/config/task_converters.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ func TaskConfigToSubTaskConfigs(c *TaskConfig, sources map[string]DBConfig) ([]*
cfg := NewSubTaskConfig()
cfg.IsSharding = c.IsSharding
cfg.ShardMode = c.ShardMode
cfg.StrictOptimisticShardMode = c.StrictOptimisticShardMode
cfg.OnlineDDL = c.OnlineDDL
cfg.TrashTableRules = c.TrashTableRules
cfg.ShadowTableRules = c.ShadowTableRules
Expand Down Expand Up @@ -179,6 +180,9 @@ func OpenAPITaskToSubTaskConfigs(task *openapi.Task, toDBCfg *DBConfig, sourceCf
} else {
subTaskCfg.IsSharding = false
}
if task.StrictOptimisticShardMode != nil {
subTaskCfg.StrictOptimisticShardMode = *task.StrictOptimisticShardMode
}
// set online ddl plugin config
subTaskCfg.OnlineDDL = task.EnhanceOnlineSchemaChange
// set case sensitive from source
Expand Down Expand Up @@ -312,6 +316,7 @@ func SubTaskConfigsToTaskConfig(stCfgs ...*SubTaskConfig) *TaskConfig {
c.TaskMode = stCfg0.Mode
c.IsSharding = stCfg0.IsSharding
c.ShardMode = stCfg0.ShardMode
c.StrictOptimisticShardMode = stCfg0.StrictOptimisticShardMode
c.IgnoreCheckingItems = stCfg0.IgnoreCheckingItems
c.MetaSchema = stCfg0.MetaSchema
c.EnableHeartbeat = stCfg0.EnableHeartbeat
Expand Down Expand Up @@ -609,6 +614,7 @@ func SubTaskConfigsToOpenAPITask(subTaskConfigList []*SubTaskConfig) *openapi.Ta
taskShardMode := openapi.TaskShardMode(oneSubtaskConfig.ShardMode)
task.ShardMode = &taskShardMode
}
task.StrictOptimisticShardMode = &oneSubtaskConfig.StrictOptimisticShardMode
if len(filterMap) > 0 {
task.BinlogFilterRule = &filterRuleMap
}
Expand Down
5 changes: 3 additions & 2 deletions dm/config/task_converters_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,8 @@ func testShardAndFilterTaskToSubTaskConfigs(c *check.C) {
c.Assert(subTask1Config.Meta.BinLogPos, check.Equals, uint32(*task.SourceConfig.SourceConf[0].BinlogPos))

// check shard config
c.Assert(subTask1Config.ShardMode, check.Equals, string(openapi.TaskShardModePessimistic))
c.Assert(subTask1Config.ShardMode, check.Equals, string(openapi.TaskShardModeOptimistic))
c.Assert(subTask1Config.StrictOptimisticShardMode, check.IsTrue)
// check online schema change
c.Assert(subTask1Config.OnlineDDL, check.Equals, true)
// check case sensitive
Expand Down Expand Up @@ -217,7 +218,7 @@ func testShardAndFilterTaskToSubTaskConfigs(c *check.C) {
c.Assert(subTask2Config.Meta.BinLogName, check.Equals, *task.SourceConfig.SourceConf[1].BinlogName)
c.Assert(subTask2Config.Meta.BinLogPos, check.Equals, uint32(*task.SourceConfig.SourceConf[1].BinlogPos))
// check shard config
c.Assert(subTask2Config.ShardMode, check.Equals, string(openapi.TaskShardModePessimistic))
c.Assert(subTask2Config.ShardMode, check.Equals, string(openapi.TaskShardModeOptimistic))
// check online schema change
c.Assert(subTask2Config.OnlineDDL, check.Equals, true)
// check case sensitive
Expand Down
6 changes: 6 additions & 0 deletions dm/errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -1186,6 +1186,12 @@ description = ""
workaround = "Please choose a valid value in ['none', 'manual'] or leave it empty."
tags = ["internal", "medium"]

[error.DM-config-20063]
message = "cannot enable `strict-optimistic-shard-mode` while `shard-mode` is not `optimistic`"
description = ""
workaround = "Please set `shard-mode` to `optimistic` if you want to enable `strict-optimistic-shard-mode`."
tags = ["internal", "medium"]

[error.DM-binlog-op-22001]
message = ""
description = ""
Expand Down
6 changes: 4 additions & 2 deletions dm/openapi/fixtures/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ var (
"security": null,
"user": "root"
},
"task_mode": "all"
"task_mode": "all",
"strict_optimistic_shard_mode": false
}
`

Expand Down Expand Up @@ -103,7 +104,8 @@ var (
"meta_schema": "dm_meta",
"name": "test",
"on_duplicate": "error",
"shard_mode": "pessimistic",
"shard_mode": "optimistic",
"strict_optimistic_shard_mode": true,
"source_config": {
"full_migrate_conf": {
"data_dir": "./exported_data",
Expand Down