Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

reparo/:Add worker-count, txn-batch config (#742) #746

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions cmd/reparo/reparo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,14 @@ log-level = "info"
# for print, it just prints decoded value.
dest-type = "mysql"

# number of binlog events in a transaction batch
txn-batch = 20

# work count to execute binlogs
# if the latency between reparo and downstream(mysql or tidb) are too high, you might want to increase this
# to get higher throughput by higher concurrent write to the downstream
worker-count = 16

##replicate-do-db priority over replicate-do-table if have same db name
##and we support regular expression , start with '~' declare use regular expression.
#
Expand Down
4 changes: 4 additions & 0 deletions reparo/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ type Config struct {
StopDatetime string `toml:"stop-datetime" json:"stop-datetime"`
StartTSO int64 `toml:"start-tso" json:"start-tso"`
StopTSO int64 `toml:"stop-tso" json:"stop-tso"`
TxnBatch int `toml:"txn-batch" json:"txn-batch"`
WorkerCount int `toml:"worker-count" json:"worker-count"`

DestType string `toml:"dest-type" json:"dest-type"`
DestDB *syncer.DBConfig `toml:"dest-db" json:"dest-db"`
Expand Down Expand Up @@ -64,6 +66,8 @@ func NewConfig() *Config {
fs.StringVar(&c.StopDatetime, "stop-datetime", "", "recovery end in stop-datetime, empty string means never end.")
fs.Int64Var(&c.StartTSO, "start-tso", 0, "similar to start-datetime but in pd-server tso format")
fs.Int64Var(&c.StopTSO, "stop-tso", 0, "similar to stop-datetime, but in pd-server tso format")
fs.IntVar(&c.TxnBatch, "txn-batch", 20, "number of binlog events in a transaction batch")
fs.IntVar(&c.WorkerCount, "c", 16, "parallel worker count")
fs.StringVar(&c.LogFile, "log-file", "", "log file path")
fs.StringVar(&c.LogRotate, "log-rotate", "", "log file rotate type, hour/day")
fs.StringVar(&c.DestType, "dest-type", "print", "dest type, values can be [print,mysql]")
Expand Down
2 changes: 1 addition & 1 deletion reparo/reparo.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ type Reparo struct {
func New(cfg *Config) (*Reparo, error) {
log.Infof("cfg %+v", cfg)

syncer, err := syncer.New(cfg.DestType, cfg.DestDB, cfg.SafeMode)
syncer, err := syncer.New(cfg.DestType, cfg.DestDB, cfg.WorkerCount, cfg.TxnBatch, cfg.SafeMode)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down
12 changes: 5 additions & 7 deletions reparo/syncer/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,25 +30,23 @@ type mysqlSyncer struct {
}

var (
_ Syncer = &mysqlSyncer{}
defaultWorkerCount = 16
defaultBatchSize = 20
_ Syncer = &mysqlSyncer{}
)

// should be only used for unit test to create mock db
var createDB = loader.CreateDB

func newMysqlSyncer(cfg *DBConfig, safemode bool) (*mysqlSyncer, error) {
func newMysqlSyncer(cfg *DBConfig, worker int, batchSize int, safemode bool) (*mysqlSyncer, error) {
db, err := createDB(cfg.User, cfg.Password, cfg.Host, cfg.Port)
if err != nil {
return nil, errors.Trace(err)
}

return newMysqlSyncerFromSQLDB(db, safemode)
return newMysqlSyncerFromSQLDB(db, worker, batchSize, safemode)
}

func newMysqlSyncerFromSQLDB(db *sql.DB, safemode bool) (*mysqlSyncer, error) {
loader, err := loader.NewLoader(db, loader.WorkerCount(defaultWorkerCount), loader.BatchSize(defaultBatchSize))
func newMysqlSyncerFromSQLDB(db *sql.DB, worker int, batchSize int, safemode bool) (*mysqlSyncer, error) {
loader, err := loader.NewLoader(db, loader.WorkerCount(worker), loader.BatchSize(batchSize))
if err != nil {
return nil, errors.Annotate(err, "new loader failed")
}
Expand Down
7 changes: 1 addition & 6 deletions reparo/syncer/mysql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,6 @@ func (s *testMysqlSuite) testMysqlSyncer(c *check.C, safemode bool) {
var (
mock sqlmock.Sqlmock
)
originWorkerCount := defaultWorkerCount
defaultWorkerCount = 1
defer func() {
defaultWorkerCount = originWorkerCount
}()

oldCreateDB := createDB
createDB = func(string, string, string, int) (db *sql.DB, err error) {
Expand All @@ -37,7 +32,7 @@ func (s *testMysqlSuite) testMysqlSyncer(c *check.C, safemode bool) {
createDB = oldCreateDB
}()

syncer, err := newMysqlSyncer(&DBConfig{}, safemode)
syncer, err := newMysqlSyncer(&DBConfig{}, 1, 20, safemode)
c.Assert(err, check.IsNil)

mock.ExpectBegin()
Expand Down
4 changes: 2 additions & 2 deletions reparo/syncer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@ type Syncer interface {
}

// New creates a new executor based on the name.
func New(name string, cfg *DBConfig, safemode bool) (Syncer, error) {
func New(name string, cfg *DBConfig, worker int, batchSize int, safemode bool) (Syncer, error) {
switch name {
case "mysql":
return newMysqlSyncer(cfg, safemode)
return newMysqlSyncer(cfg, worker, batchSize, safemode)
case "print":
return newPrintSyncer()
case "memory":
Expand Down
2 changes: 1 addition & 1 deletion reparo/syncer/syncer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func (s *testSyncerSuite) TestNewSyncer(c *check.C) {
}

for _, testCase := range testCases {
syncer, err := New(testCase.typeStr, cfg, false)
syncer, err := New(testCase.typeStr, cfg, 16, 20, false)
c.Assert(err, check.IsNil)
c.Assert(reflect.TypeOf(syncer), testCase.checker, testCase.tp)
}
Expand Down