diff --git a/dm/config/subtask.go b/dm/config/subtask.go index cad16d1c24..6340a7b7d5 100644 --- a/dm/config/subtask.go +++ b/dm/config/subtask.go @@ -186,6 +186,8 @@ type SubTaskConfig struct { ConfigFile string `toml:"-" json:"config-file"` + CleanDumpFile bool `toml:"clean-dump-file" json:"clean-dump-file"` + // still needed by Syncer / Loader bin printVersion bool } diff --git a/dm/config/task.go b/dm/config/task.go index b08ed1ea3d..8ebb6d67f0 100644 --- a/dm/config/task.go +++ b/dm/config/task.go @@ -290,6 +290,8 @@ type TaskConfig struct { Mydumpers map[string]*MydumperConfig `yaml:"mydumpers"` Loaders map[string]*LoaderConfig `yaml:"loaders"` Syncers map[string]*SyncerConfig `yaml:"syncers"` + + CleanDumpFile bool `yaml:"clean-dump-file"` } // NewTaskConfig creates a TaskConfig @@ -310,6 +312,7 @@ func NewTaskConfig() *TaskConfig { Mydumpers: make(map[string]*MydumperConfig), Loaders: make(map[string]*LoaderConfig), Syncers: make(map[string]*SyncerConfig), + CleanDumpFile: true, } cfg.FlagSet = flag.NewFlagSet("task", flag.ContinueOnError) return cfg @@ -556,6 +559,8 @@ func (c *TaskConfig) SubTaskConfigs(sources map[string]DBConfig) ([]*SubTaskConf cfg.LoaderConfig = *inst.Loader cfg.SyncerConfig = *inst.Syncer + cfg.CleanDumpFile = c.CleanDumpFile + err := cfg.Adjust(true) if err != nil { return nil, terror.Annotatef(err, "source %s", inst.SourceID) diff --git a/loader/checkpoint.go b/loader/checkpoint.go index 6a1411a4ea..50ddeccf01 100644 --- a/loader/checkpoint.go +++ b/loader/checkpoint.go @@ -288,16 +288,17 @@ func (cp *RemoteCheckPoint) Init(tctx *tcontext.Context, filename string, endPos } // fields[0] -> db name, fields[1] -> table name + schema, table := fields[0], fields[1] sql2 := fmt.Sprintf("INSERT INTO `%s`.`%s` (`id`, `filename`, `cp_schema`, `cp_table`, `offset`, `end_pos`) VALUES(?,?,?,?,?,?)", cp.schema, cp.table) cp.logCtx.L().Info("initial checkpoint record", zap.String("sql", sql2), zap.String("id", cp.id), zap.String("filename", filename), - zap.String("schema", fields[0]), - zap.String("table", fields[1]), + zap.String("schema", schema), + zap.String("table", table), zap.Int64("offset", 0), zap.Int64("end position", endPos)) - args := []interface{}{cp.id, filename, fields[0], fields[1], 0, endPos} + args := []interface{}{cp.id, filename, schema, table, 0, endPos} cp.connMutex.Lock() err := cp.conn.executeSQL(tctx, []string{sql2}, args) cp.connMutex.Unlock() @@ -308,6 +309,18 @@ func (cp *RemoteCheckPoint) Init(tctx *tcontext.Context, filename string, endPos } return terror.WithScope(terror.Annotate(err, "initialize checkpoint"), terror.ScopeDownstream) } + // checkpoint not exists and no error, cache endPos in memory + if _, ok := cp.restoringFiles[schema]; !ok { + cp.restoringFiles[schema] = make(map[string]FilePosSet) + } + tables := cp.restoringFiles[schema] + if _, ok := tables[table]; !ok { + tables[table] = make(map[string][]int64) + } + restoringFiles := tables[table] + if _, ok := restoringFiles[filename]; !ok { + restoringFiles[filename] = []int64{0, endPos} + } return nil } diff --git a/loader/loader.go b/loader/loader.go index 26b617a25c..d5005ba4fe 100644 --- a/loader/loader.go +++ b/loader/loader.go @@ -59,7 +59,9 @@ type Tables2DataFiles map[string]DataFiles type dataJob struct { sql string schema string + table string file string + absPath string offset int64 lastOffset int64 } @@ -180,6 +182,20 @@ func (w *Worker) run(ctx context.Context, fileJobQueue chan *fileJob, runFatalCh return } w.loader.finishedDataSize.Add(job.offset - job.lastOffset) + + if w.cfg.CleanDumpFile { + fileInfos := w.checkPoint.GetRestoringFileInfo(job.schema, job.table) + if pos, ok := fileInfos[job.file]; ok { + if job.offset == pos[1] { + w.tctx.L().Info("try to remove loaded dump file", zap.String("data file", job.file)) + if err := os.Remove(job.absPath); err != nil { + w.tctx.L().Warn("error when remove loaded dump file", zap.String("data file", job.file), zap.Error(err)) + } + } + } else { + w.tctx.L().Warn("file not recorded in checkpoint", zap.String("data file", job.file)) + } + } } } } @@ -319,7 +335,9 @@ func (w *Worker) dispatchSQL(ctx context.Context, file string, offset int64, tab j := &dataJob{ sql: query, schema: table.targetSchema, + table: table.targetTable, file: baseFile, + absPath: file, offset: cur, lastOffset: lastOffset, } @@ -595,6 +613,25 @@ func (l *Loader) Restore(ctx context.Context) error { if err == nil { l.logCtx.L().Info("all data files have been finished", zap.Duration("cost time", time.Since(begin))) + if l.cfg.CleanDumpFile { + files := CollectDirFiles(l.cfg.Dir) + hasDatafile := false + for file := range files { + if strings.HasSuffix(file, ".sql") && + !strings.HasSuffix(file, "-schema.sql") && + !strings.HasSuffix(file, "-schema-create.sql") && + !strings.Contains(file, "-schema-view.sql") && + !strings.Contains(file, "-schema-triggers.sql") && + !strings.Contains(file, "-schema-post.sql") { + hasDatafile = true + } + } + + if !hasDatafile { + l.logCtx.L().Info("clean dump files after importing all files") + l.cleanDumpFiles() + } + } } else if errors.Cause(err) != context.Canceled { return err } @@ -1211,3 +1248,27 @@ func (l *Loader) getMydumpMetadata() error { l.metaBinlog.Set(pos.String()) return nil } + +// cleanDumpFiles is called when finish restoring data, to clean useless files +func (l *Loader) cleanDumpFiles() { + if l.cfg.Mode == config.ModeFull { + // in full-mode all files won't be need in the future + if err := os.RemoveAll(l.cfg.Dir); err != nil { + l.logCtx.L().Warn("error when remove loaded dump folder", zap.String("data folder", l.cfg.Dir), zap.Error(err)) + } + } else { + // leave metadata file, only delete structure files + for db, tables := range l.db2Tables { + dbFile := fmt.Sprintf("%s/%s-schema-create.sql", l.cfg.Dir, db) + if err := os.Remove(dbFile); err != nil { + l.logCtx.L().Warn("error when remove loaded dump file", zap.String("data file", dbFile), zap.Error(err)) + } + for table := range tables { + tableFile := fmt.Sprintf("%s/%s.%s-schema.sql", l.cfg.Dir, db, table) + if err := os.Remove(tableFile); err != nil { + l.logCtx.L().Warn("error when remove loaded dump file", zap.String("data file", tableFile), zap.Error(err)) + } + } + } + } +} diff --git a/syncer/syncer.go b/syncer/syncer.go index c25f3b3b3e..ed8c91ae5e 100644 --- a/syncer/syncer.go +++ b/syncer/syncer.go @@ -16,6 +16,8 @@ package syncer import ( "context" "fmt" + "os" + "path" "reflect" "strconv" "strings" @@ -1049,6 +1051,22 @@ func (s *Syncer) Run(ctx context.Context) (err error) { if err != nil { return err } + + // for fresh and all-mode task, flush checkpoint so we could delete metadata file + if s.cfg.Mode == config.ModeAll { + if err := s.flushCheckPoints(); err != nil { + s.tctx.L().Warn("fail to flush checkpoints when starting task", zap.Error(err)) + } else { + s.tctx.L().Info("try to remove loaded files") + metadataFile := path.Join(s.cfg.Dir, "metadata") + if err := os.Remove(metadataFile); err != nil { + s.tctx.L().Warn("error when remove loaded dump file", zap.String("data file", metadataFile), zap.Error(err)) + } + if err := os.Remove(s.cfg.Dir); err != nil { + s.tctx.L().Warn("error when remove loaded dump folder", zap.String("data folder", s.cfg.Dir), zap.Error(err)) + } + } + } } // currentPos is the pos for current received event (End_log_pos in `show binlog events` for mysql) diff --git a/tests/all_mode/run.sh b/tests/all_mode/run.sh index a4345d2cd1..d22e46ccb2 100755 --- a/tests/all_mode/run.sh +++ b/tests/all_mode/run.sh @@ -142,6 +142,10 @@ function run() { check_not_contains "ignore_db" check_contains "all_mode" + echo "check dump files have been cleaned" + ls $WORK_DIR/worker1/dumped_data.test && exit 1 || echo "worker1 auto removed dump files" + ls $WORK_DIR/worker2/dumped_data.test && exit 1 || echo "worker2 auto removed dump files" + export GO_FAILPOINTS='' } diff --git a/tests/dm_syncer/conf/dm-task.yaml b/tests/dm_syncer/conf/dm-task.yaml index f6407e3e92..00c4bc0330 100644 --- a/tests/dm_syncer/conf/dm-task.yaml +++ b/tests/dm_syncer/conf/dm-task.yaml @@ -7,6 +7,7 @@ meta-schema: "dm_meta" heartbeat-update-interval: 1 heartbeat-report-interval: 1 timezone: "Asia/Shanghai" +clean-dump-file: false target-database: host: "127.0.0.1" diff --git a/tests/full_mode/run.sh b/tests/full_mode/run.sh index 3aa4afd127..b8a56d81ce 100755 --- a/tests/full_mode/run.sh +++ b/tests/full_mode/run.sh @@ -36,6 +36,10 @@ function run() { # use sync_diff_inspector to check full dump loader check_sync_diff $WORK_DIR $cur/conf/diff_config.toml + + echo "check dump files have been cleaned" + ls $WORK_DIR/worker1/dumped_data.test && exit 1 || echo "worker1 auto removed dump files" + ls $WORK_DIR/worker2/dumped_data.test && exit 1 || echo "worker2 auto removed dump files" } cleanup_data full_mode diff --git a/tests/import_goroutine_leak/run.sh b/tests/import_goroutine_leak/run.sh index a14d8a3cb0..98914fba47 100644 --- a/tests/import_goroutine_leak/run.sh +++ b/tests/import_goroutine_leak/run.sh @@ -27,7 +27,7 @@ function run() { run_sql_file $WORK_DIR/db2.prepare.sql $MYSQL_HOST2 $MYSQL_PORT2 $MYSQL_PASSWORD2 - echo "dm-worker paninc, doJob of import unit workers don't exit" + echo "dm-worker panic, doJob of import unit workers don't exit" # check doJobs of import unit worker exit inject_points=("github.com/pingcap/dm/loader/dispatchError=return(1)" "github.com/pingcap/dm/loader/LoadDataSlowDown=sleep(1000)" diff --git a/tests/incremental_mode/conf/dm-task.yaml b/tests/incremental_mode/conf/dm-task.yaml index 6810a951d5..d21aeb8000 100644 --- a/tests/incremental_mode/conf/dm-task.yaml +++ b/tests/incremental_mode/conf/dm-task.yaml @@ -7,6 +7,7 @@ meta-schema: "dm_meta" heartbeat-update-interval: 1 heartbeat-report-interval: 1 timezone: "Asia/Shanghai" +clean-dump-file: false target-database: host: "127.0.0.1" diff --git a/tests/load_interrupt/run.sh b/tests/load_interrupt/run.sh index 52ba545b00..936e0e0193 100755 --- a/tests/load_interrupt/run.sh +++ b/tests/load_interrupt/run.sh @@ -58,6 +58,9 @@ function run() { check_port_offline $WORKER1_PORT 20 + # check dump files are generated before worker down + ls $WORK_DIR/worker1/dumped_data.test + run_sql "SELECT count(*) from dm_meta.test_loader_checkpoint where cp_schema = '$TEST_NAME' and offset < $THRESHOLD" $TIDB_PORT $TIDB_PASSWORD check_contains "count(*): 1" # TODO: block for dumpling temporarily @@ -87,6 +90,7 @@ function run() { check_contains "count(*): 1" export GO_FAILPOINTS='' + ls $WORK_DIR/worker1/dumped_data.test && exit 1 || echo "worker1 auto removed dump files" } cleanup_data load_interrupt