Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

dm-ha/: add remove metadata feature #651

Merged
merged 41 commits into from
May 20, 2020
Merged
Show file tree
Hide file tree
Changes from 31 commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
14d16ba
add remove metadata feature
lichunzhu May 7, 2020
76d4eec
Merge branch 'master' into fixRemoveMeta
lichunzhu May 7, 2020
18d0f7b
add remove-meta to integration test
lichunzhu May 7, 2020
47c7a7a
add annotation
lichunzhu May 7, 2020
9b6214a
add adjust db
lichunzhu May 7, 2020
a0c0061
add decrypt
lichunzhu May 7, 2020
9a7e9de
Merge branch 'master' into fixRemoveMeta
lichunzhu May 7, 2020
9e2d625
add close
lichunzhu May 7, 2020
571f0ee
Merge branch 'fixRemoveMeta' of https://github.com/lichunzhu/dm into …
lichunzhu May 7, 2020
54d62c0
address comments
lichunzhu May 8, 2020
8cffb59
remove metrics
lichunzhu May 8, 2020
ffd5528
move remove-meta from config file to command
lichunzhu May 8, 2020
872471e
fix test
lichunzhu May 8, 2020
8ecc213
fix ut
lichunzhu May 8, 2020
562d42f
address comments
lichunzhu May 12, 2020
0024702
Merge branch 'master' into fixRemoveMeta
lichunzhu May 12, 2020
0713a35
add ut
lichunzhu May 12, 2020
152fcdf
Merge branch 'fixRemoveMeta' of https://github.com/lichunzhu/dm into …
lichunzhu May 12, 2020
6dd58b3
Merge branch 'master' into fixRemoveMeta
lichunzhu May 12, 2020
cf8906c
fix remove meta ut
lichunzhu May 13, 2020
7ad15b8
Merge branch 'master' into fixRemoveMeta
lichunzhu May 13, 2020
e04447d
remove both pesi/opti for any shardmode
lichunzhu May 13, 2020
fb7c8b3
remove shardmeta.MetaTableFormat
lichunzhu May 13, 2020
d2876ef
add part of integration test for remove meta
lichunzhu May 13, 2020
8517c98
fix ut again
lichunzhu May 13, 2020
9dc0a9e
complete sequence_sharding_removemeta test
lichunzhu May 13, 2020
efb33b7
add db operation
lichunzhu May 13, 2020
2e9b9ce
add check ddl locks
lichunzhu May 13, 2020
3f86e9b
address comment
lichunzhu May 14, 2020
b6fb025
add mutex test
lichunzhu May 14, 2020
95b2e41
Merge branch 'master' into fixRemoveMeta
lichunzhu May 14, 2020
22fe815
merge master branch
lichunzhu May 15, 2020
37b8666
Merge branch 'fixRemoveMeta' of https://github.com/lichunzhu/dm into …
lichunzhu May 15, 2020
c578d6b
Merge branch 'master' into fixRemoveMeta
lichunzhu May 15, 2020
2f57b00
fix ut
lichunzhu May 15, 2020
60a0aca
Merge branch 'master' into fixRemoveMeta
lichunzhu May 15, 2020
025b209
merge master
lichunzhu May 19, 2020
59a69a8
add sleep
lichunzhu May 19, 2020
52926d0
add sleep to avoid canceled ddl error
lichunzhu May 19, 2020
c721261
change kill 3 master to kill 2
lichunzhu May 20, 2020
a891447
add todo
lichunzhu May 20, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions _utils/terror_gen/errors_release.txt
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,7 @@ ErrMasterRequestIsNotForwardToLeader,[code=38043:class=dm-master:scope=internal:
ErrMasterIsNotAsyncRequest,[code=38044:class=dm-master:scope=internal:level=medium],"request %s is not an async one, needn't wait for ok"
ErrMasterFailToGetExpectResult,[code=38045:class=dm-master:scope=internal:level=medium],"fail to get expected result"
ErrMasterPessimistNotStarted,[code=38046:class=dm-master:scope=internal:level=medium],"the shardddl pessimist has not started"
ErrMasterOptimistNotStarted,[code=38047:class=dm-master:scope=internal:level=medium],"the shardddl optimist has not started"
ErrWorkerParseFlagSet,[code=40001:class=dm-worker:scope=internal:level=medium],"parse dm-worker config flag set"
ErrWorkerInvalidFlag,[code=40002:class=dm-worker:scope=internal:level=medium],"'%s' is an invalid flag"
ErrWorkerDecodeConfigFromFile,[code=40003:class=dm-worker:scope=internal:level=medium],"toml decode file"
Expand Down
1 change: 0 additions & 1 deletion dm/config/subtask.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,6 @@ type SubTaskConfig struct {
ServerID uint32 `toml:"server-id" json:"server-id"`
Flavor string `toml:"flavor" json:"flavor"`
MetaSchema string `toml:"meta-schema" json:"meta-schema"`
RemoveMeta bool `toml:"remove-meta" json:"remove-meta"`
HeartbeatUpdateInterval int `toml:"heartbeat-update-interval" json:"heartbeat-update-interval"`
HeartbeatReportInterval int `toml:"heartbeat-report-interval" json:"heartbeat-report-interval"`
EnableHeartbeat bool `toml:"enable-heartbeat" json:"enable-heartbeat"`
Expand Down
5 changes: 1 addition & 4 deletions dm/config/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,9 +256,7 @@ type TaskConfig struct {
// we store detail status in meta
// don't save configuration into it
MetaSchema string `yaml:"meta-schema"`
// remove meta from downstreaming database
// now we delete checkpoint and online ddl information
RemoveMeta bool `yaml:"remove-meta"`

EnableHeartbeat bool `yaml:"enable-heartbeat"`
HeartbeatUpdateInterval int `yaml:"heartbeat-update-interval"`
HeartbeatReportInterval int `yaml:"heartbeat-report-interval"`
Expand Down Expand Up @@ -510,7 +508,6 @@ func (c *TaskConfig) SubTaskConfigs(sources map[string]DBConfig) ([]*SubTaskConf
cfg.Mode = c.TaskMode
cfg.CaseSensitive = c.CaseSensitive
cfg.MetaSchema = c.MetaSchema
cfg.RemoveMeta = c.RemoveMeta
csuzhangxc marked this conversation as resolved.
Show resolved Hide resolved
cfg.EnableHeartbeat = c.EnableHeartbeat
cfg.HeartbeatUpdateInterval = c.HeartbeatUpdateInterval
cfg.HeartbeatReportInterval = c.HeartbeatReportInterval
Expand Down
8 changes: 1 addition & 7 deletions dm/config/task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ name: test
task-mode: all
is-sharding: true
meta-schema: "dm_meta"
remove-meta: false
enable-heartbeat: true
timezone: "Asia/Shanghai"
ignore-checking-items: ["all"]
Expand All @@ -56,7 +55,6 @@ name: test1
task-mode: all
is-sharding: true
meta-schema: "dm_meta"
remove-meta: false
enable-heartbeat: true
timezone: "Asia/Shanghai"
ignore-checking-items: ["all"]
Expand All @@ -80,7 +78,7 @@ mysql-instances:
err := taskConfig.Decode(errorTaskConfig1)
// field server-id is not a member of TaskConfig
c.Check(err, NotNil)
c.Assert(err, ErrorMatches, "*line 19: field server-id not found in type config.MySQLInstance*")
c.Assert(err, ErrorMatches, "*line 18: field server-id not found in type config.MySQLInstance*")

err = taskConfig.Decode(errorTaskConfig2)
// field name duplicate
Expand All @@ -94,7 +92,6 @@ name: test
task-mode: all
is-sharding: true
meta-schema: "dm_meta"
remove-meta: false
enable-heartbeat: true
timezone: "Asia/Shanghai"
ignore-checking-items: ["all"]
Expand All @@ -113,7 +110,6 @@ task-mode: all
task-mode: all
is-sharding: true
meta-schema: "dm_meta"
remove-meta: false
enable-heartbeat: true
timezone: "Asia/Shanghai"
ignore-checking-items: ["all"]
Expand All @@ -131,7 +127,6 @@ name: test
task-mode: all
is-sharding: true
meta-schema: "dm_meta"
remove-meta: false
enable-heartbeat: true
heartbeat-update-interval: 1
heartbeat-report-interval: 1
Expand Down Expand Up @@ -210,7 +205,6 @@ task-mode: all
is-sharding: true
shard-mode: "optimistic"
meta-schema: "dm_meta"
remove-meta: false
enable-heartbeat: true
heartbeat-update-interval: 1
heartbeat-report-interval: 1
Expand Down
15 changes: 12 additions & 3 deletions dm/ctl/master/start_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,17 @@ import (
"github.com/pingcap/dm/checker"
"github.com/pingcap/dm/dm/ctl/common"
"github.com/pingcap/dm/dm/pb"
"github.com/pingcap/dm/pkg/terror"
)

// NewStartTaskCmd creates a StartTask command
func NewStartTaskCmd() *cobra.Command {
cmd := &cobra.Command{
Use: "start-task [-s source ...] <config-file>",
Use: "start-task [-s source ...] [--remove-meta] <config-file>",
Short: "start a task as defined in the config file",
Run: startTaskFunc,
}
cmd.Flags().BoolP("remove-meta", "", false, "whether to remove task's meta data")
return cmd
}

Expand All @@ -54,14 +56,21 @@ func startTaskFunc(cmd *cobra.Command, _ []string) {
return
}

removeMeta, err := cmd.Flags().GetBool("remove-meta")
if err != nil {
common.PrintLines("%s", terror.Message(err))
return
}

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// start task
cli := common.MasterClient()
resp, err := cli.StartTask(ctx, &pb.StartTaskRequest{
Task: string(content),
Sources: sources,
Task: string(content),
Sources: sources,
RemoveMeta: removeMeta,
})
if err != nil {
common.PrintLines("can not start task:\n%v", errors.ErrorStack(err))
Expand Down
4 changes: 1 addition & 3 deletions dm/dm-ansible/conf/task_advanced.yaml.example
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ name: test # global unique
task-mode: all # full/incremental/all
is-sharding: true # whether multi dm-worker do one sharding job
meta-schema: "dm_meta" # meta schema in downstreaming database to store meta informaton of dm
remove-meta: false # remove meta from downstreaming database, now we delete checkpoint and online ddl information
enable-heartbeat: false # whether to enable heartbeat for calculating lag between master and syncer
# heartbeat-update-interval: 1 # interval to do heartbeat and save timestamp, default 1s
# heartbeat-report-interval: 10 # interval to report time lap to prometheus, default 10s
Expand All @@ -23,8 +22,7 @@ mysql-instances: # one or more source database, config more source d
# `full` / `all`:
# never be used
# `incremental`:
# if `remove-meta` is true, this will be used
# else if checkpoints already exists in `meta-schema`, this will not be used
# if checkpoints already exists in `meta-schema`, this will not be used
# otherwise, this will be used
meta:
binlog-name: mysql-bin.000001
Expand Down
78 changes: 74 additions & 4 deletions dm/master/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ import (
"github.com/pingcap/dm/dm/master/workerrpc"
"github.com/pingcap/dm/dm/pb"
"github.com/pingcap/dm/pkg/conn"
tcontext "github.com/pingcap/dm/pkg/context"
"github.com/pingcap/dm/pkg/cputil"
"github.com/pingcap/dm/pkg/election"
"github.com/pingcap/dm/pkg/etcdutil"
"github.com/pingcap/dm/pkg/log"
Expand Down Expand Up @@ -81,6 +83,9 @@ type Server struct {
leaderClient pb.MasterClient
leaderGrpcConn *grpc.ClientConn

// removeMetaLock locks start task when removing meta
removeMetaLock sync.RWMutex

// WaitGroup for background functions.
bgFunWg sync.WaitGroup

Expand Down Expand Up @@ -360,15 +365,31 @@ func (s *Server) StartTask(ctx context.Context, req *pb.StartTaskRequest) (*pb.S
if len(sourceRespCh) > 0 {
sourceResps = sortCommonWorkerResults(sourceRespCh)
} else {
sources := make([]string, 0, len(stCfgs))
for _, stCfg := range stCfgs {
sources = append(sources, stCfg.SourceID)
}
s.removeMetaLock.Lock()
if req.RemoveMeta {
if scm := s.scheduler.GetSubTaskCfgsByTask(cfg.Name); len(scm) > 0 {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need any mechanism to prevent start-task in another client when removing meta?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add removeMetaLock

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why need a lock, may start task with same task name?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

em... may user start-task again when the previous start-task is still removing meta?

resp.Msg = terror.Annotate(terror.ErrSchedulerSubTaskExist.Generate(cfg.Name, sources),
"while remove-meta is true").Error()
s.removeMetaLock.Unlock()
return resp, nil
}
err = s.removeMetaData(ctx, cfg)
if err != nil {
resp.Msg = terror.Annotate(err, "while removing metadata").Error()
s.removeMetaLock.Unlock()
return resp, nil
}
}
err = s.scheduler.AddSubTasks(subtaskCfgPointersToInstances(stCfgs...)...)
s.removeMetaLock.Unlock()
if err != nil {
resp.Msg = errors.ErrorStack(err)
return resp, nil
}
sources := make([]string, 0, len(stCfgs))
for _, stCfg := range stCfgs {
sources = append(sources, stCfg.SourceID)
}
resp.Result = true
sourceResps = s.getSourceRespsAfterOperation(ctx, cfg.Name, sources, []string{}, req)
}
Expand Down Expand Up @@ -1332,6 +1353,55 @@ func (s *Server) generateSubTask(ctx context.Context, task string) (*config.Task
return cfg, stCfgs, nil
}

func (s *Server) removeMetaData(ctx context.Context, cfg *config.TaskConfig) error {
csuzhangxc marked this conversation as resolved.
Show resolved Hide resolved
toDB := *cfg.TargetDB
toDB.Adjust()
if len(toDB.Password) > 0 {
pswdTo, err := utils.Decrypt(toDB.Password)
if err != nil {
return err
}
toDB.Password = pswdTo
}

// clear shard meta data for pessimistic/optimist
err := s.pessimist.RemoveMetaData(cfg.Name)
if err != nil {
return err
}
err = s.optimist.RemoveMetaData(cfg.Name)
if err != nil {
return err
}

// set up db and clear meta data in downstream db
baseDB, err := conn.DefaultDBProvider.Apply(toDB)
if err != nil {
return terror.WithScope(err, terror.ScopeDownstream)
}
defer baseDB.Close()
dbConn, err := baseDB.GetBaseConn(ctx)
if err != nil {
return terror.WithScope(err, terror.ScopeDownstream)
}
defer baseDB.CloseBaseConn(dbConn)
ctctx := tcontext.Background().WithContext(ctx).WithLogger(log.With(zap.String("job", "remove metadata")))

sqls := make([]string, 0, 4)
// clear loader and syncer checkpoints
sqls = append(sqls, fmt.Sprintf("DROP TABLE IF EXISTS `%s`.`%s`",
cfg.MetaSchema, cputil.LoaderCheckpoint(cfg.Name)))
sqls = append(sqls, fmt.Sprintf("DROP TABLE IF EXISTS `%s`.`%s`",
cfg.MetaSchema, cputil.SyncerCheckpoint(cfg.Name)))
sqls = append(sqls, fmt.Sprintf("DROP TABLE IF EXISTS `%s`.`%s`",
cfg.MetaSchema, cputil.SyncerShardMeta(cfg.Name)))
sqls = append(sqls, fmt.Sprintf("DROP TABLE IF EXISTS `%s`.`%s`",
cfg.MetaSchema, cputil.SyncerOnlineDDL(cfg.Name)))

_, err = dbConn.ExecuteSQL(ctctx, nil, cfg.Name, sqls)
return err
}

func extractWorkerError(result *pb.ProcessResult) error {
if result != nil && len(result.Errors) > 0 {
return terror.ErrMasterOperRespNotSuccess.Generate(utils.JoinProcessErrors(result.Errors))
Expand Down
Loading