Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: Decouple Checkpoint type from dest-type (#786) #790

Merged
merged 2 commits into from
Nov 5, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
13 changes: 11 additions & 2 deletions cmd/drainer/drainer.toml
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,17 @@ password = ""
port = 3306

[syncer.to.checkpoint]
# you can uncomment this to change the database to save checkpoint when the downstream is mysql or tidb
#schema = "tidb_binlog"
# only support mysql or tidb now, you can uncomment this to control where the checkpoint is saved.
# the default way how checkpoint is saved according to db-type is:
# mysql/tidb -> the according downstream mysql/tidb
# file/kafka -> file in `data-dir`
# type = "mysql"
# you can uncomment this to change the database to save checkpoint when the checkpoint type is mysql or tidb
# schema = "tidb_binlog"
# host = "127.0.0.1"
# user = "root"
# password = ""
# port = 3306

# Uncomment this if you want to use file as db-type.
#[syncer.to]
Expand Down
16 changes: 7 additions & 9 deletions drainer/checkpoint/checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,28 +41,26 @@ type CheckPoint interface {
}

// NewCheckPoint returns a CheckPoint instance by giving name
func NewCheckPoint(name string, cfg *Config) (CheckPoint, error) {
func NewCheckPoint(cfg *Config) (CheckPoint, error) {
var (
cp CheckPoint
err error
)
switch name {
switch cfg.CheckpointType {
case "mysql", "tidb":
cp, err = newMysql(name, cfg)
cp, err = newMysql(cfg)
case "file":
cp, err = NewPb(cfg)
case "kafka":
cp, err = newKafka(cfg)
cp, err = NewFile(cfg)
case "flash":
cp, err = newFlash(cfg)
default:
err = errors.Errorf("unsupported checkpoint type %s", name)
err = errors.Errorf("unsupported checkpoint type %s", cfg.CheckpointType)
}
if err != nil {
return nil, errors.Annotatef(err, "initialize %s type checkpoint with config %+v", name, cfg)
return nil, errors.Annotatef(err, "initialize %s type checkpoint with config %+v", cfg.CheckpointType, cfg)
}

log.Info("initialize checkpoint", zap.String("name", name), zap.Int64("checkpoint", cp.TS()), zap.Reflect("cfg", cfg))
log.Info("initialize checkpoint", zap.String("type", cfg.CheckpointType), zap.Int64("checkpoint", cp.TS()), zap.Reflect("cfg", cfg))

return cp, nil
}
18 changes: 9 additions & 9 deletions drainer/checkpoint/pb.go → drainer/checkpoint/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ import (
"github.com/siddontang/go/ioutil2"
)

// PbCheckPoint is local CheckPoint struct.
type PbCheckPoint struct {
// FileCheckPoint is local CheckPoint struct.
type FileCheckPoint struct {
sync.RWMutex
closed bool
initialCommitTS int64
Expand All @@ -34,9 +34,9 @@ type PbCheckPoint struct {
CommitTS int64 `toml:"commitTS" json:"commitTS"`
}

// NewPb creates a new Pb.
func NewPb(cfg *Config) (CheckPoint, error) {
pb := &PbCheckPoint{
// NewFile creates a new FileCheckpoint.
func NewFile(cfg *Config) (CheckPoint, error) {
pb := &FileCheckPoint{
initialCommitTS: cfg.InitialCommitTS,
name: cfg.CheckPointFile,
}
Expand All @@ -49,7 +49,7 @@ func NewPb(cfg *Config) (CheckPoint, error) {
}

// Load implements CheckPointor.Load interface.
func (sp *PbCheckPoint) Load() error {
func (sp *FileCheckPoint) Load() error {
sp.Lock()
defer sp.Unlock()

Expand Down Expand Up @@ -81,7 +81,7 @@ func (sp *PbCheckPoint) Load() error {
}

// Save implements CheckPoint.Save interface
func (sp *PbCheckPoint) Save(ts, slaveTS int64) error {
func (sp *FileCheckPoint) Save(ts, slaveTS int64) error {
sp.Lock()
defer sp.Unlock()

Expand All @@ -107,15 +107,15 @@ func (sp *PbCheckPoint) Save(ts, slaveTS int64) error {
}

// TS implements CheckPoint.TS interface
func (sp *PbCheckPoint) TS() int64 {
func (sp *FileCheckPoint) TS() int64 {
sp.RLock()
defer sp.RUnlock()

return sp.CommitTS
}

// Close implements CheckPoint.Close interface
func (sp *PbCheckPoint) Close() error {
func (sp *FileCheckPoint) Close() error {
sp.Lock()
defer sp.Unlock()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,12 @@ import (
"github.com/pingcap/errors"
)

func (t *testCheckPointSuite) TestPb(c *C) {
func (t *testCheckPointSuite) TestFile(c *C) {
fileName := "/tmp/test"
notExistFileName := "test_not_exist"
cfg := new(Config)
cfg.CheckPointFile = fileName
meta, err := NewPb(cfg)
meta, err := NewFile(cfg)
c.Assert(err, IsNil)
defer os.RemoveAll(fileName)

Expand All @@ -48,15 +48,15 @@ func (t *testCheckPointSuite) TestPb(c *C) {

// check not exist meta file
cfg.CheckPointFile = notExistFileName
meta, err = NewPb(cfg)
meta, err = NewFile(cfg)
c.Assert(err, IsNil)
err = meta.Load()
c.Assert(err, IsNil)
c.Assert(meta.TS(), Equals, int64(0))

// check not exist meta file, but with initialCommitTs
cfg.InitialCommitTS = 123
meta, err = NewPb(cfg)
meta, err = NewFile(cfg)
c.Assert(err, IsNil)
c.Assert(meta.TS(), Equals, cfg.InitialCommitTS)

Expand Down
58 changes: 0 additions & 58 deletions drainer/checkpoint/kafka.go

This file was deleted.

44 changes: 0 additions & 44 deletions drainer/checkpoint/kafka_test.go

This file was deleted.

9 changes: 2 additions & 7 deletions drainer/checkpoint/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,19 +34,15 @@ type MysqlCheckPoint struct {
db *sql.DB
schema string
table string
// type, tidb or mysql
tp string

CommitTS int64 `toml:"commitTS" json:"commitTS"`
TsMap map[string]int64 `toml:"ts-map" json:"ts-map"`
}

var sqlOpenDB = pkgsql.OpenDB

func newMysql(tp string, cfg *Config) (CheckPoint, error) {
if err := checkConfig(cfg); err != nil {
return nil, errors.Annotate(err, "check config failed")
}
func newMysql(cfg *Config) (CheckPoint, error) {
setDefaultConfig(cfg)

db, err := sqlOpenDB("mysql", cfg.Db.Host, cfg.Db.Port, cfg.Db.User, cfg.Db.Password)
if err != nil {
Expand All @@ -59,7 +55,6 @@ func newMysql(tp string, cfg *Config) (CheckPoint, error) {
initialCommitTS: cfg.InitialCommitTS,
schema: cfg.Schema,
table: cfg.Table,
tp: tp,
TsMap: make(map[string]int64),
}

Expand Down
9 changes: 4 additions & 5 deletions drainer/checkpoint/mysql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func (s *saveSuite) TestShouldSaveCheckpoint(c *C) {
db, mock, err := sqlmock.New()
c.Assert(err, IsNil)
mock.ExpectExec("replace into db.tbl.*").WillReturnResult(sqlmock.NewResult(0, 0))
cp := MysqlCheckPoint{db: db, schema: "db", table: "tbl", tp: "other"}
cp := MysqlCheckPoint{db: db, schema: "db", table: "tbl"}
err = cp.Save(1111, 0)
c.Assert(err, IsNil)
}
Expand All @@ -61,7 +61,6 @@ func (s *saveSuite) TestShouldUpdateTsMap(c *C) {
db: db,
schema: "db",
table: "tbl",
tp: "tidb",
TsMap: make(map[string]int64),
}
err = cp.Save(65536, 3333)
Expand Down Expand Up @@ -122,7 +121,7 @@ func (s *newMysqlSuite) TestCannotOpenDB(c *C) {
return nil, errors.New("no db")
}

_, err := newMysql("tidb", &Config{})
_, err := newMysql(&Config{})
c.Assert(err, NotNil)
c.Assert(err, ErrorMatches, ".*no db.*")
}
Expand All @@ -138,14 +137,14 @@ func (s *newMysqlSuite) TestCreationErrors(c *C) {
}

mock.ExpectExec("create schema.*").WillReturnError(errors.New("fail schema"))
_, err = newMysql("tidb", &Config{})
_, err = newMysql(&Config{})
c.Assert(err, NotNil)
c.Assert(err, ErrorMatches, ".*fail schema.*")

mock.ExpectExec("create schema.*").WillReturnResult(sqlmock.NewResult(0, 0))
mock.ExpectExec("create table.*").WillReturnError(errors.New("fail table"))

_, err = newMysql("tidb", &Config{})
_, err = newMysql(&Config{})
c.Assert(err, NotNil)
c.Assert(err, ErrorMatches, ".*fail table.*")
}
9 changes: 3 additions & 6 deletions drainer/checkpoint/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ type DBConfig struct {

// Config is the savepoint configuration
type Config struct {
CheckpointType string

Db *DBConfig
Schema string
Table string
Expand All @@ -39,10 +41,7 @@ type Config struct {
CheckPointFile string `toml:"dir" json:"dir"`
}

func checkConfig(cfg *Config) error {
if cfg == nil {
cfg = new(Config)
}
func setDefaultConfig(cfg *Config) {
if cfg.Db == nil {
cfg.Db = new(DBConfig)
}
Expand All @@ -61,8 +60,6 @@ func checkConfig(cfg *Config) error {
if cfg.Table == "" {
cfg.Table = "checkpoint"
}

return nil
}

func genCreateSchema(sp *MysqlCheckPoint) string {
Expand Down
8 changes: 6 additions & 2 deletions drainer/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,12 @@ func NewServer(cfg *Config) (*Server, error) {
cfg.SyncerCfg.To.ClusterID = clusterID
pdCli.Close()

cpCfg := GenCheckPointCfg(cfg, clusterID)
cp, err := checkpoint.NewCheckPoint(cfg.SyncerCfg.DestDBType, cpCfg)
cpCfg, err := GenCheckPointCfg(cfg, clusterID)
if err != nil {
return nil, errors.Trace(err)
}

cp, err := checkpoint.NewCheckPoint(cpCfg)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down
2 changes: 1 addition & 1 deletion drainer/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,6 @@ func (s *newServerSuite) TestInvalidDestDBType(c *C) {
cfg.SyncerCfg.DestDBType = "nothing"
cfg.adjustConfig()
_, err := NewServer(cfg)
c.Assert(err, ErrorMatches, ".*unsupported checkpoint type.*")
c.Assert(err, ErrorMatches, ".*unknown DestDBType.*")
c.Assert(cfg.SyncerCfg.To.ClusterID, Equals, uint64(8012))
}
7 changes: 6 additions & 1 deletion drainer/sync/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,12 @@ type DBConfig struct {

// CheckpointConfig is the Checkpoint configuration.
type CheckpointConfig struct {
Schema string `toml:"schema" json:"schema"`
Type string `toml:"type" json:"type"`
Schema string `toml:"schema" json:"schema"`
Host string `toml:"host" json:"host"`
User string `toml:"user" json:"user"`
Password string `toml:"password" json:"password"`
Port int `toml:"port" json:"port"`
}

type baseError struct {
Expand Down