diff --git a/lightning/config/config.go b/lightning/config/config.go index 48bf73bd3..9e020a988 100644 --- a/lightning/config/config.go +++ b/lightning/config/config.go @@ -27,7 +27,7 @@ import ( "github.com/pingcap/tidb-lightning/lightning/common" "github.com/pingcap/tidb-lightning/lightning/log" "github.com/pingcap/tidb-tools/pkg/filter" - "github.com/pingcap/tidb-tools/pkg/table-router" + router "github.com/pingcap/tidb-tools/pkg/table-router" tidbcfg "github.com/pingcap/tidb/config" "go.uber.org/zap" ) @@ -180,6 +180,9 @@ func NewConfig() *Config { IOConcurrency: 5, CheckRequirements: true, }, + Checkpoint: Checkpoint{ + Enable: true, + }, TiDB: DBStore{ Host: "127.0.0.1", User: "root", @@ -198,8 +201,13 @@ func NewConfig() *Config { Mydumper: MydumperRuntime{ ReadBlockSize: ReadBlockSize, CSV: CSVConfig{ - Separator: ",", - Delimiter: `"`, + Separator: ",", + Delimiter: `"`, + Header: true, + NotNull: false, + Null: `\n`, + BackslashEscape: true, + TrimLastSep: false, }, }, TikvImporter: TikvImporter{ @@ -208,6 +216,7 @@ func NewConfig() *Config { }, PostRestore: PostRestore{ Checksum: true, + Analyze: true, }, BWList: &filter.Rules{}, } @@ -222,12 +231,17 @@ func (cfg *Config) LoadFromGlobal(global *GlobalConfig) error { cfg.TiDB.Host = global.TiDB.Host cfg.TiDB.Port = global.TiDB.Port cfg.TiDB.User = global.TiDB.User - cfg.TiDB.Psw = global.TiDB.Psw + cfg.TiDB.Psw = global.TiDB.Psw cfg.TiDB.StatusPort = global.TiDB.StatusPort cfg.TiDB.PdAddr = global.TiDB.PdAddr cfg.Mydumper.SourceDir = global.Mydumper.SourceDir + cfg.Mydumper.NoSchema = global.Mydumper.NoSchema cfg.TikvImporter.Addr = global.TikvImporter.Addr cfg.TikvImporter.Backend = global.TikvImporter.Backend + cfg.Checkpoint.Enable = global.Checkpoint.Enable + cfg.PostRestore.Checksum = global.PostRestore.Checksum + cfg.PostRestore.Analyze = global.PostRestore.Analyze + cfg.App.CheckRequirements = global.App.CheckRequirements return nil } diff --git a/lightning/config/config_test.go b/lightning/config/config_test.go index d3ca6ea0f..86cc59632 100644 --- a/lightning/config/config_test.go +++ b/lightning/config/config_test.go @@ -203,6 +203,7 @@ func (s *configTestSuite) TestInvalidCSV(c *C) { input: ` [mydumper.csv] separator = '\' + backslash-escape = false `, err: "", }, @@ -231,6 +232,7 @@ func (s *configTestSuite) TestInvalidCSV(c *C) { input: ` [mydumper.csv] delimiter = '\' + backslash-escape = false `, err: "", }, @@ -373,6 +375,7 @@ func (s *configTestSuite) TestLoadConfig(c *C) { "-pd-urls", "172.16.30.11:2379,172.16.30.12:2379", "-d", "/path/to/import", "-importer", "172.16.30.11:23008", + "-checksum=false", }, nil) c.Assert(err, IsNil) c.Assert(cfg.App.Config.Level, Equals, "debug") @@ -384,10 +387,14 @@ func (s *configTestSuite) TestLoadConfig(c *C) { c.Assert(cfg.TiDB.PdAddr, Equals, "172.16.30.11:2379,172.16.30.12:2379") c.Assert(cfg.Mydumper.SourceDir, Equals, "/path/to/import") c.Assert(cfg.TikvImporter.Addr, Equals, "172.16.30.11:23008") + c.Assert(cfg.PostRestore.Checksum, IsFalse) + c.Assert(cfg.PostRestore.Analyze, IsTrue) taskCfg := config.NewConfig() err = taskCfg.LoadFromGlobal(cfg) c.Assert(err, IsNil) + c.Assert(taskCfg.PostRestore.Checksum, IsFalse) + c.Assert(taskCfg.PostRestore.Analyze, IsTrue) taskCfg.Checkpoint.DSN = "" taskCfg.Checkpoint.Driver = config.CheckpointDriverMySQL diff --git a/lightning/config/global.go b/lightning/config/global.go index 4600510bf..90ebce64a 100644 --- a/lightning/config/global.go +++ b/lightning/config/global.go @@ -27,8 +27,9 @@ import ( type GlobalLightning struct { log.Config - StatusAddr string `toml:"status-addr" json:"status-addr"` - ServerMode bool `toml:"server-mode" json:"server-mode"` + StatusAddr string `toml:"status-addr" json:"status-addr"` + ServerMode bool `toml:"server-mode" json:"server-mode"` + CheckRequirements bool `toml:"check-requirements" json:"check-requirements"` // The legacy alias for setting "status-addr". The value should always the // same as StatusAddr, and will not be published in the JSON encoding. @@ -47,6 +48,7 @@ type GlobalTiDB struct { type GlobalMydumper struct { SourceDir string `toml:"data-source-dir" json:"data-source-dir"` + NoSchema bool `toml:"no-schema" json:"no-schema"` } type GlobalImporter struct { @@ -55,18 +57,33 @@ type GlobalImporter struct { } type GlobalConfig struct { - App GlobalLightning `toml:"lightning" json:"lightning"` - TiDB GlobalTiDB `toml:"tidb" json:"tidb"` - Mydumper GlobalMydumper `toml:"mydumper" json:"mydumper"` - TikvImporter GlobalImporter `toml:"tikv-importer" json:"tikv-importer"` + App GlobalLightning `toml:"lightning" json:"lightning"` + Checkpoint GlobalCheckpoint `toml:"checkpoint" json:"checkpoint"` + TiDB GlobalTiDB `toml:"tidb" json:"tidb"` + Mydumper GlobalMydumper `toml:"mydumper" json:"mydumper"` + TikvImporter GlobalImporter `toml:"tikv-importer" json:"tikv-importer"` + PostRestore GlobalPostRestore `toml:"post-restore" json:"post-restore"` ConfigFileContent []byte } +type GlobalCheckpoint struct { + Enable bool `toml:"enable" json:"enable"` +} + +type GlobalPostRestore struct { + Checksum bool `toml:"checksum" json:"checksum"` + Analyze bool `toml:"analyze" json:"analyze"` +} + func NewGlobalConfig() *GlobalConfig { return &GlobalConfig{ App: GlobalLightning{ - ServerMode: false, + ServerMode: false, + CheckRequirements: true, + }, + Checkpoint: GlobalCheckpoint{ + Enable: true, }, TiDB: GlobalTiDB{ Host: "127.0.0.1", @@ -77,6 +94,10 @@ func NewGlobalConfig() *GlobalConfig { TikvImporter: GlobalImporter{ Backend: "importer", }, + PostRestore: GlobalPostRestore{ + Checksum: true, + Analyze: true, + }, } } @@ -112,12 +133,17 @@ func LoadGlobalConfig(args []string, extraFlags func(*flag.FlagSet)) (*GlobalCon tidbHost := fs.String("tidb-host", "", "TiDB server host") tidbPort := fs.Int("tidb-port", 0, "TiDB server port (default 4000)") tidbUser := fs.String("tidb-user", "", "TiDB user name to connect") - tidbPsw := fs.String("tidb-password", "", "TiDB password to connect") + tidbPsw := fs.String("tidb-password", "", "TiDB password to connect") tidbStatusPort := fs.Int("tidb-status", 0, "TiDB server status port (default 10080)") pdAddr := fs.String("pd-urls", "", "PD endpoint address") dataSrcPath := fs.String("d", "", "Directory of the dump to import") importerAddr := fs.String("importer", "", "address (host:port) to connect to tikv-importer") backend := fs.String("backend", "", `delivery backend ("importer" or "tidb")`) + enableCheckpoint := fs.Bool("enable-checkpoint", true, "whether to enable checkpoints") + noSchema := fs.Bool("no-schema", false, "ignore schema files, get schema directly from TiDB instead") + checksum := fs.Bool("checksum", true, "compare checksum after importing") + analyze := fs.Bool("analyze", true, "analyze table after importing") + checkRequirements := fs.Bool("check-requirements", true, "check cluster version before starting") statusAddr := fs.String("status-addr", "", "the Lightning server address") serverMode := fs.Bool("server-mode", false, "start Lightning in server mode, wait for multiple tasks instead of starting immediately") @@ -184,9 +210,24 @@ func LoadGlobalConfig(args []string, extraFlags func(*flag.FlagSet)) (*GlobalCon if *backend != "" { cfg.TikvImporter.Backend = *backend } + if !*enableCheckpoint { + cfg.Checkpoint.Enable = false + } + if *noSchema { + cfg.Mydumper.NoSchema = true + } + if !*checksum { + cfg.PostRestore.Checksum = false + } + if !*analyze { + cfg.PostRestore.Analyze = false + } if cfg.App.StatusAddr == "" && cfg.App.PProfPort != 0 { cfg.App.StatusAddr = fmt.Sprintf(":%d", cfg.App.PProfPort) } + if !*checkRequirements { + cfg.App.CheckRequirements = false + } if cfg.App.StatusAddr == "" && cfg.App.ServerMode { return nil, errors.New("If server-mode is enabled, the status-addr must be a valid listen address")