From 3533d05b8c7679d817af37bc343697705a1844a3 Mon Sep 17 00:00:00 2001 From: Jiahao Huang Date: Wed, 30 Oct 2019 12:08:11 +0800 Subject: [PATCH 1/2] *: Decouple Checkpoint type from dest-type after this pr: we have this checkpoint type: mysql/tidb, file, flash defalut using checkpoint type accroding to dest-type like before mysql/tidb -> mysql/tidb (the same downstream) flash -> flash kafka/file -> file and we can configure the checkpoint type despite what the dest-type is; currently only can configure the checkpoint type to be mysql/tidb --- cmd/drainer/drainer.toml | 14 ++++- drainer/checkpoint/checkpoint.go | 16 +++-- drainer/checkpoint/{pb.go => file.go} | 18 +++--- .../checkpoint/{pb_test.go => file_test.go} | 8 +-- drainer/checkpoint/kafka.go | 58 ------------------- drainer/checkpoint/kafka_test.go | 44 -------------- drainer/checkpoint/mysql.go | 9 +-- drainer/checkpoint/mysql_test.go | 9 ++- drainer/checkpoint/util.go | 9 +-- drainer/server.go | 8 ++- drainer/server_test.go | 2 +- drainer/sync/util.go | 7 ++- drainer/syncer_test.go | 2 +- drainer/util.go | 50 ++++++++++++---- pump/storage/storage_test.go | 2 +- tests/kafka/drainer.toml | 5 ++ 16 files changed, 100 insertions(+), 161 deletions(-) rename drainer/checkpoint/{pb.go => file.go} (85%) rename drainer/checkpoint/{pb_test.go => file_test.go} (93%) delete mode 100644 drainer/checkpoint/kafka.go delete mode 100644 drainer/checkpoint/kafka_test.go diff --git a/cmd/drainer/drainer.toml b/cmd/drainer/drainer.toml index 4948ffd45..c2fcf4aec 100644 --- a/cmd/drainer/drainer.toml +++ b/cmd/drainer/drainer.toml @@ -84,8 +84,18 @@ password = "" port = 3306 [syncer.to.checkpoint] -# you can uncomment this to change the database to save checkpoint when the downstream is mysql or tidb -#schema = "tidb_binlog" +# you can uncomment this to change the database to save checkpoint when the checkpoint type is mysql or tidb +# schema = "tidb_binlog" +# +# only support mysql or tidb now, you can uncomment this to control where the checkpoint is saved. +# the default way how checkpoint is saved according to db-type is: +# mysql/tidb -> the according downstream mysql/tidb +# file/kafka -> file in `data-dir` +# type = "mysql" +# host = "127.0.0.1" +# user = "root" +# password = "" +# port = 3306 # Uncomment this if you want to use file as db-type. #[syncer.to] diff --git a/drainer/checkpoint/checkpoint.go b/drainer/checkpoint/checkpoint.go index fd4b07fba..08ddf8881 100644 --- a/drainer/checkpoint/checkpoint.go +++ b/drainer/checkpoint/checkpoint.go @@ -41,28 +41,26 @@ type CheckPoint interface { } // NewCheckPoint returns a CheckPoint instance by giving name -func NewCheckPoint(name string, cfg *Config) (CheckPoint, error) { +func NewCheckPoint(cfg *Config) (CheckPoint, error) { var ( cp CheckPoint err error ) - switch name { + switch cfg.CheckpointType { case "mysql", "tidb": - cp, err = newMysql(name, cfg) + cp, err = newMysql(cfg) case "file": - cp, err = NewPb(cfg) - case "kafka": - cp, err = newKafka(cfg) + cp, err = NewFile(cfg) case "flash": cp, err = newFlash(cfg) default: - err = errors.Errorf("unsupported checkpoint type %s", name) + err = errors.Errorf("unsupported checkpoint type %s", cfg.CheckpointType) } if err != nil { - return nil, errors.Annotatef(err, "initialize %s type checkpoint with config %+v", name, cfg) + return nil, errors.Annotatef(err, "initialize %s type checkpoint with config %+v", cfg.CheckpointType, cfg) } - log.Info("initialize checkpoint", zap.String("name", name), zap.Int64("checkpoint", cp.TS()), zap.Reflect("cfg", cfg)) + log.Info("initialize checkpoint", zap.String("type", cfg.CheckpointType), zap.Int64("checkpoint", cp.TS()), zap.Reflect("cfg", cfg)) return cp, nil } diff --git a/drainer/checkpoint/pb.go b/drainer/checkpoint/file.go similarity index 85% rename from drainer/checkpoint/pb.go rename to drainer/checkpoint/file.go index acefc91f0..e7d287c73 100644 --- a/drainer/checkpoint/pb.go +++ b/drainer/checkpoint/file.go @@ -23,8 +23,8 @@ import ( "github.com/siddontang/go/ioutil2" ) -// PbCheckPoint is local CheckPoint struct. -type PbCheckPoint struct { +// FileCheckPoint is local CheckPoint struct. +type FileCheckPoint struct { sync.RWMutex closed bool initialCommitTS int64 @@ -34,9 +34,9 @@ type PbCheckPoint struct { CommitTS int64 `toml:"commitTS" json:"commitTS"` } -// NewPb creates a new Pb. -func NewPb(cfg *Config) (CheckPoint, error) { - pb := &PbCheckPoint{ +// NewFile creates a new FileCheckpoint. +func NewFile(cfg *Config) (CheckPoint, error) { + pb := &FileCheckPoint{ initialCommitTS: cfg.InitialCommitTS, name: cfg.CheckPointFile, } @@ -49,7 +49,7 @@ func NewPb(cfg *Config) (CheckPoint, error) { } // Load implements CheckPointor.Load interface. -func (sp *PbCheckPoint) Load() error { +func (sp *FileCheckPoint) Load() error { sp.Lock() defer sp.Unlock() @@ -81,7 +81,7 @@ func (sp *PbCheckPoint) Load() error { } // Save implements CheckPoint.Save interface -func (sp *PbCheckPoint) Save(ts, slaveTS int64) error { +func (sp *FileCheckPoint) Save(ts, slaveTS int64) error { sp.Lock() defer sp.Unlock() @@ -107,7 +107,7 @@ func (sp *PbCheckPoint) Save(ts, slaveTS int64) error { } // TS implements CheckPoint.TS interface -func (sp *PbCheckPoint) TS() int64 { +func (sp *FileCheckPoint) TS() int64 { sp.RLock() defer sp.RUnlock() @@ -115,7 +115,7 @@ func (sp *PbCheckPoint) TS() int64 { } // Close implements CheckPoint.Close interface -func (sp *PbCheckPoint) Close() error { +func (sp *FileCheckPoint) Close() error { sp.Lock() defer sp.Unlock() diff --git a/drainer/checkpoint/pb_test.go b/drainer/checkpoint/file_test.go similarity index 93% rename from drainer/checkpoint/pb_test.go rename to drainer/checkpoint/file_test.go index 2b025d00a..9862313a6 100644 --- a/drainer/checkpoint/pb_test.go +++ b/drainer/checkpoint/file_test.go @@ -20,12 +20,12 @@ import ( "github.com/pingcap/errors" ) -func (t *testCheckPointSuite) TestPb(c *C) { +func (t *testCheckPointSuite) TestFile(c *C) { fileName := "/tmp/test" notExistFileName := "test_not_exist" cfg := new(Config) cfg.CheckPointFile = fileName - meta, err := NewPb(cfg) + meta, err := NewFile(cfg) c.Assert(err, IsNil) defer os.RemoveAll(fileName) @@ -48,7 +48,7 @@ func (t *testCheckPointSuite) TestPb(c *C) { // check not exist meta file cfg.CheckPointFile = notExistFileName - meta, err = NewPb(cfg) + meta, err = NewFile(cfg) c.Assert(err, IsNil) err = meta.Load() c.Assert(err, IsNil) @@ -56,7 +56,7 @@ func (t *testCheckPointSuite) TestPb(c *C) { // check not exist meta file, but with initialCommitTs cfg.InitialCommitTS = 123 - meta, err = NewPb(cfg) + meta, err = NewFile(cfg) c.Assert(err, IsNil) c.Assert(meta.TS(), Equals, cfg.InitialCommitTS) diff --git a/drainer/checkpoint/kafka.go b/drainer/checkpoint/kafka.go deleted file mode 100644 index 2442b56c3..000000000 --- a/drainer/checkpoint/kafka.go +++ /dev/null @@ -1,58 +0,0 @@ -// Copyright 2019 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// See the License for the specific language governing permissions and -// limitations under the License. - -package checkpoint - -import ( - "sync" - - "github.com/pingcap/errors" - "github.com/pingcap/log" - "go.uber.org/zap" -) - -// KafkaCheckpoint is local CheckPoint struct. -type KafkaCheckpoint struct { - sync.Mutex - *PbCheckPoint -} - -func newKafka(cfg *Config) (CheckPoint, error) { - pb, err := NewPb(cfg) - if err != nil { - return nil, errors.Trace(err) - } - - cp := &KafkaCheckpoint{ - PbCheckPoint: pb.(*PbCheckPoint), - } - - return cp, nil -} - -// Save implements CheckPoint.Save() -func (cp *KafkaCheckpoint) Save(ts, slaveTS int64) error { - cp.Lock() - defer cp.Unlock() - - if cp.closed { - return errors.Trace(ErrCheckPointClosed) - } - - if ts <= cp.CommitTS { - log.Error("ignore save checkpoint", zap.Int64("ts", ts)) - return nil - } - - return cp.PbCheckPoint.Save(ts, slaveTS) -} diff --git a/drainer/checkpoint/kafka_test.go b/drainer/checkpoint/kafka_test.go deleted file mode 100644 index 47644a4ea..000000000 --- a/drainer/checkpoint/kafka_test.go +++ /dev/null @@ -1,44 +0,0 @@ -// Copyright 2019 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// See the License for the specific language governing permissions and -// limitations under the License. - -package checkpoint - -import ( - "path/filepath" - - . "github.com/pingcap/check" - "github.com/pingcap/errors" -) - -func (t *testCheckPointSuite) TestKafka(c *C) { - dir := c.MkDir() - fileName := filepath.Join(dir, "test_kafka") - cfg := new(Config) - cfg.CheckPointFile = fileName - cp, err := newKafka(cfg) - c.Assert(err, IsNil) - c.Assert(cp.TS(), Equals, int64(0)) - - testTs := int64(1) - err = cp.Save(testTs, 0) - c.Assert(err, IsNil) - ts := cp.TS() - c.Assert(ts, Equals, testTs) - - // close the checkpoint - err = cp.Close() - c.Assert(err, IsNil) - c.Assert(errors.Cause(cp.Load()), Equals, ErrCheckPointClosed) - c.Assert(errors.Cause(cp.Save(0, 0)), Equals, ErrCheckPointClosed) - c.Assert(errors.Cause(cp.Close()), Equals, ErrCheckPointClosed) -} diff --git a/drainer/checkpoint/mysql.go b/drainer/checkpoint/mysql.go index 9b50d1fe6..2c60465a6 100644 --- a/drainer/checkpoint/mysql.go +++ b/drainer/checkpoint/mysql.go @@ -34,8 +34,6 @@ type MysqlCheckPoint struct { db *sql.DB schema string table string - // type, tidb or mysql - tp string CommitTS int64 `toml:"commitTS" json:"commitTS"` TsMap map[string]int64 `toml:"ts-map" json:"ts-map"` @@ -43,10 +41,8 @@ type MysqlCheckPoint struct { var sqlOpenDB = pkgsql.OpenDB -func newMysql(tp string, cfg *Config) (CheckPoint, error) { - if err := checkConfig(cfg); err != nil { - return nil, errors.Annotate(err, "check config failed") - } +func newMysql(cfg *Config) (CheckPoint, error) { + setDefaultConfig(cfg) db, err := sqlOpenDB("mysql", cfg.Db.Host, cfg.Db.Port, cfg.Db.User, cfg.Db.Password) if err != nil { @@ -59,7 +55,6 @@ func newMysql(tp string, cfg *Config) (CheckPoint, error) { initialCommitTS: cfg.InitialCommitTS, schema: cfg.Schema, table: cfg.Table, - tp: tp, TsMap: make(map[string]int64), } diff --git a/drainer/checkpoint/mysql_test.go b/drainer/checkpoint/mysql_test.go index 5d9c48e5f..411bb0b79 100644 --- a/drainer/checkpoint/mysql_test.go +++ b/drainer/checkpoint/mysql_test.go @@ -48,7 +48,7 @@ func (s *saveSuite) TestShouldSaveCheckpoint(c *C) { db, mock, err := sqlmock.New() c.Assert(err, IsNil) mock.ExpectExec("replace into db.tbl.*").WillReturnResult(sqlmock.NewResult(0, 0)) - cp := MysqlCheckPoint{db: db, schema: "db", table: "tbl", tp: "other"} + cp := MysqlCheckPoint{db: db, schema: "db", table: "tbl"} err = cp.Save(1111, 0) c.Assert(err, IsNil) } @@ -61,7 +61,6 @@ func (s *saveSuite) TestShouldUpdateTsMap(c *C) { db: db, schema: "db", table: "tbl", - tp: "tidb", TsMap: make(map[string]int64), } err = cp.Save(65536, 3333) @@ -122,7 +121,7 @@ func (s *newMysqlSuite) TestCannotOpenDB(c *C) { return nil, errors.New("no db") } - _, err := newMysql("tidb", &Config{}) + _, err := newMysql(&Config{}) c.Assert(err, NotNil) c.Assert(err, ErrorMatches, ".*no db.*") } @@ -138,14 +137,14 @@ func (s *newMysqlSuite) TestCreationErrors(c *C) { } mock.ExpectExec("create schema.*").WillReturnError(errors.New("fail schema")) - _, err = newMysql("tidb", &Config{}) + _, err = newMysql(&Config{}) c.Assert(err, NotNil) c.Assert(err, ErrorMatches, ".*fail schema.*") mock.ExpectExec("create schema.*").WillReturnResult(sqlmock.NewResult(0, 0)) mock.ExpectExec("create table.*").WillReturnError(errors.New("fail table")) - _, err = newMysql("tidb", &Config{}) + _, err = newMysql(&Config{}) c.Assert(err, NotNil) c.Assert(err, ErrorMatches, ".*fail table.*") } diff --git a/drainer/checkpoint/util.go b/drainer/checkpoint/util.go index ac5829135..e48132e9e 100644 --- a/drainer/checkpoint/util.go +++ b/drainer/checkpoint/util.go @@ -30,6 +30,8 @@ type DBConfig struct { // Config is the savepoint configuration type Config struct { + CheckpointType string + Db *DBConfig Schema string Table string @@ -39,10 +41,7 @@ type Config struct { CheckPointFile string `toml:"dir" json:"dir"` } -func checkConfig(cfg *Config) error { - if cfg == nil { - cfg = new(Config) - } +func setDefaultConfig(cfg *Config) { if cfg.Db == nil { cfg.Db = new(DBConfig) } @@ -61,8 +60,6 @@ func checkConfig(cfg *Config) error { if cfg.Table == "" { cfg.Table = "checkpoint" } - - return nil } func genCreateSchema(sp *MysqlCheckPoint) string { diff --git a/drainer/server.go b/drainer/server.go index df00b2cd6..bef3a820c 100644 --- a/drainer/server.go +++ b/drainer/server.go @@ -119,8 +119,12 @@ func NewServer(cfg *Config) (*Server, error) { cfg.SyncerCfg.To.ClusterID = clusterID pdCli.Close() - cpCfg := GenCheckPointCfg(cfg, clusterID) - cp, err := checkpoint.NewCheckPoint(cfg.SyncerCfg.DestDBType, cpCfg) + cpCfg, err := GenCheckPointCfg(cfg, clusterID) + if err != nil { + return nil, errors.Trace(err) + } + + cp, err := checkpoint.NewCheckPoint(cpCfg) if err != nil { return nil, errors.Trace(err) } diff --git a/drainer/server_test.go b/drainer/server_test.go index 4e36ef5c1..bb3d87a6c 100644 --- a/drainer/server_test.go +++ b/drainer/server_test.go @@ -348,6 +348,6 @@ func (s *newServerSuite) TestInvalidDestDBType(c *C) { cfg.SyncerCfg.DestDBType = "nothing" cfg.adjustConfig() _, err := NewServer(cfg) - c.Assert(err, ErrorMatches, ".*unsupported checkpoint type.*") + c.Assert(err, ErrorMatches, ".*unknown DestDBType.*") c.Assert(cfg.SyncerCfg.To.ClusterID, Equals, uint64(8012)) } diff --git a/drainer/sync/util.go b/drainer/sync/util.go index 4be69162a..cfa922e19 100644 --- a/drainer/sync/util.go +++ b/drainer/sync/util.go @@ -40,7 +40,12 @@ type DBConfig struct { // CheckpointConfig is the Checkpoint configuration. type CheckpointConfig struct { - Schema string `toml:"schema" json:"schema"` + Type string `toml:"type" json:"type"` + Schema string `toml:"schema" json:"schema"` + Host string `toml:"host" json:"host"` + User string `toml:"user" json:"user"` + Password string `toml:"password" json:"password"` + Port int `toml:"port" json:"port"` } type baseError struct { diff --git a/drainer/syncer_test.go b/drainer/syncer_test.go index e9557dfc6..6902d0b3d 100644 --- a/drainer/syncer_test.go +++ b/drainer/syncer_test.go @@ -64,7 +64,7 @@ func (s *syncerSuite) TestNewSyncer(c *check.C) { } cpFile := c.MkDir() + "/checkpoint" - cp, err := checkpoint.NewPb(&checkpoint.Config{CheckPointFile: cpFile}) + cp, err := checkpoint.NewFile(&checkpoint.Config{CheckPointFile: cpFile}) c.Assert(err, check.IsNil) syncer, err := NewSyncer(cp, cfg, nil) diff --git a/drainer/util.go b/drainer/util.go index de590f3a0..512e6a1c8 100644 --- a/drainer/util.go +++ b/drainer/util.go @@ -75,25 +75,53 @@ func (g *taskGroup) Wait() { } // GenCheckPointCfg returns an CheckPoint config instance -func GenCheckPointCfg(cfg *Config, id uint64) *checkpoint.Config { - dbCfg := checkpoint.DBConfig{ - Host: cfg.SyncerCfg.To.Host, - User: cfg.SyncerCfg.To.User, - Password: cfg.SyncerCfg.To.Password, - Port: cfg.SyncerCfg.To.Port, - } +func GenCheckPointCfg(cfg *Config, id uint64) (*checkpoint.Config, error) { + checkpointCfg := &checkpoint.Config{ - Db: &dbCfg, ClusterID: id, InitialCommitTS: cfg.InitialCommitTS, CheckPointFile: path.Join(cfg.DataDir, "savepoint"), } - if cfg.SyncerCfg.To.Checkpoint.Schema != "" { - checkpointCfg.Schema = cfg.SyncerCfg.To.Checkpoint.Schema + toCheckpoint := cfg.SyncerCfg.To.Checkpoint + + if toCheckpoint.Schema != "" { + checkpointCfg.Schema = toCheckpoint.Schema + } + + switch toCheckpoint.Type { + case "mysql", "tidb": + checkpointCfg.CheckpointType = toCheckpoint.Type + checkpointCfg.Db = &checkpoint.DBConfig{ + Host: toCheckpoint.Host, + User: toCheckpoint.User, + Password: toCheckpoint.Password, + Port: toCheckpoint.Port, + } + case "": + switch cfg.SyncerCfg.DestDBType { + case "mysql", "tidb": + checkpointCfg.CheckpointType = cfg.SyncerCfg.DestDBType + checkpointCfg.Db = &checkpoint.DBConfig{ + Host: cfg.SyncerCfg.To.Host, + User: cfg.SyncerCfg.To.User, + Password: cfg.SyncerCfg.To.Password, + Port: cfg.SyncerCfg.To.Port, + } + case "pb", "file": + checkpointCfg.CheckpointType = "file" + case "kafka": + checkpointCfg.CheckpointType = "file" + case "flash": + checkpointCfg.CheckpointType = "flash" + default: + return nil, errors.Errorf("unknown DestDBType: %s", cfg.SyncerCfg.DestDBType) + } + default: + return nil, errors.Errorf("unknown checkpoint type: %s", toCheckpoint.Type) } - return checkpointCfg + return checkpointCfg, nil } func initializeSaramaGlobalConfig() { diff --git a/pump/storage/storage_test.go b/pump/storage/storage_test.go index 2419dc263..f5d6140c1 100644 --- a/pump/storage/storage_test.go +++ b/pump/storage/storage_test.go @@ -119,7 +119,7 @@ func (as *AppendSuit) TestBlockedWriteKVShouldNotStopWritingVlogs(c *check.C) { store.options.KVChanCapacity, check.Commentf("No consumer of the written channel is set up, it should be full at this point"), ) - case <-time.After(1 * time.Second): + case <-time.After(3 * time.Second): c.Fatal("Takes too long to finish writing binlogs, writing may have been blocked.") } } diff --git a/tests/kafka/drainer.toml b/tests/kafka/drainer.toml index 398335c00..cff9b3c83 100644 --- a/tests/kafka/drainer.toml +++ b/tests/kafka/drainer.toml @@ -30,6 +30,11 @@ db-type = "kafka" # size-limit = "100000" [syncer.to.checkpoint] #schema = "tidb_binlog" +type = "mysql" +host = "127.0.0.1" +user = "root" +password = "" +port = 4000 [syncer.to] topic-name = "binlog_test_topic" From c246e79e0807545c643b1a7955e5035b15e2cf5c Mon Sep 17 00:00:00 2001 From: Jiahao Huang Date: Sun, 3 Nov 2019 13:41:10 +0800 Subject: [PATCH 2/2] Address comment --- cmd/drainer/drainer.toml | 5 ++--- drainer/util.go | 6 ++++++ 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/cmd/drainer/drainer.toml b/cmd/drainer/drainer.toml index c2fcf4aec..fef6c17e3 100644 --- a/cmd/drainer/drainer.toml +++ b/cmd/drainer/drainer.toml @@ -84,14 +84,13 @@ password = "" port = 3306 [syncer.to.checkpoint] -# you can uncomment this to change the database to save checkpoint when the checkpoint type is mysql or tidb -# schema = "tidb_binlog" -# # only support mysql or tidb now, you can uncomment this to control where the checkpoint is saved. # the default way how checkpoint is saved according to db-type is: # mysql/tidb -> the according downstream mysql/tidb # file/kafka -> file in `data-dir` # type = "mysql" +# you can uncomment this to change the database to save checkpoint when the checkpoint type is mysql or tidb +# schema = "tidb_binlog" # host = "127.0.0.1" # user = "root" # password = "" diff --git a/drainer/util.go b/drainer/util.go index 512e6a1c8..90e2a5dc7 100644 --- a/drainer/util.go +++ b/drainer/util.go @@ -114,6 +114,12 @@ func GenCheckPointCfg(cfg *Config, id uint64) (*checkpoint.Config, error) { checkpointCfg.CheckpointType = "file" case "flash": checkpointCfg.CheckpointType = "flash" + checkpointCfg.Db = &checkpoint.DBConfig{ + Host: cfg.SyncerCfg.To.Host, + User: cfg.SyncerCfg.To.User, + Password: cfg.SyncerCfg.To.Password, + Port: cfg.SyncerCfg.To.Port, + } default: return nil, errors.Errorf("unknown DestDBType: %s", cfg.SyncerCfg.DestDBType) }