syncer: optimize safemode mechanism to improve performance (#1906) (#…
ti-chi-bot committed Jul 26, 2021
1 parent a1802ea commit acfbff4
Showing 7 changed files with 394 additions and 68 deletions.
39 changes: 36 additions & 3 deletions syncer/checkpoint.go
@@ -216,6 +216,9 @@ type CheckPoint interface {
// FlushPointWithTableInfo flushes the table point with the given table info
FlushPointWithTableInfo(tctx *tcontext.Context, sourceSchema, sourceTable string, ti *model.TableInfo) error

// FlushSafeModeExitPoint flushes the safe-mode exit point to the downstream checkpoint table, without advancing the flushed global checkpoint
FlushSafeModeExitPoint(tctx *tcontext.Context) error

// GlobalPoint returns the global binlog stream's checkpoint
// corresponding to Meta.Pos and Meta.GTID
GlobalPoint() binlog.Location
@@ -285,7 +288,8 @@ type RemoteCheckPoint struct {
// this variable is mainly used to decide the status of safe mode, so it is accessed when
// - init safe mode
// - checking in sync and if passed, unset it
-safeModeExitPoint *binlog.Location
+safeModeExitPoint *binlog.Location
+needFlushSafeModeExitPoint bool

logCtx *tcontext.Context
}
@@ -385,7 +389,11 @@ func (cp *RemoteCheckPoint) saveTablePoint(sourceSchema, sourceTable string, loc
// SaveSafeModeExitPoint implements CheckPoint.SaveSafeModeExitPoint
// shouldn't be called concurrently (only called before the loop in Syncer.Run and inside the loop to reset).
func (cp *RemoteCheckPoint) SaveSafeModeExitPoint(point *binlog.Location) {
-cp.safeModeExitPoint = point
+if cp.safeModeExitPoint == nil || point == nil ||
+binlog.CompareLocation(*point, *cp.safeModeExitPoint, cp.cfg.EnableGTID) > 0 {
+cp.safeModeExitPoint = point
+cp.needFlushSafeModeExitPoint = true
+}
}

// SafeModeExitPoint implements CheckPoint.SafeModeExitPoint.
@@ -506,7 +514,7 @@ func (cp *RemoteCheckPoint) FlushPointsExcept(tctx *tcontext.Context, exceptTabl
sqls := make([]string, 0, 100)
args := make([][]interface{}, 0, 100)

-if cp.globalPoint.outOfDate() || cp.globalPointSaveTime.IsZero() {
+if cp.globalPoint.outOfDate() || cp.globalPointSaveTime.IsZero() || cp.needFlushSafeModeExitPoint {
locationG := cp.GlobalPoint()
sqlG, argG := cp.genUpdateSQL(globalCpSchema, globalCpTable, locationG, cp.safeModeExitPoint, nil, true)
sqls = append(sqls, sqlG)
@@ -556,6 +564,7 @@ func (cp *RemoteCheckPoint) FlushPointsExcept(tctx *tcontext.Context, exceptTabl
}

cp.globalPointSaveTime = time.Now()
cp.needFlushSafeModeExitPoint = false
return nil
}

@@ -601,6 +610,30 @@ func (cp *RemoteCheckPoint) FlushPointWithTableInfo(tctx *tcontext.Context, sour
return nil
}

// FlushSafeModeExitPoint implements CheckPoint.FlushSafeModeExitPoint.
func (cp *RemoteCheckPoint) FlushSafeModeExitPoint(tctx *tcontext.Context) error {
cp.RLock()
defer cp.RUnlock()

sqls := make([]string, 1)
args := make([][]interface{}, 1)

// use FlushedGlobalPoint here to avoid updating the global checkpoint
locationG := cp.FlushedGlobalPoint()
sqls[0], args[0] = cp.genUpdateSQL(globalCpSchema, globalCpTable, locationG, cp.safeModeExitPoint, nil, true)

// use a new context apart from the syncer's, to make sure the checkpoint can still be updated after the syncer calls `cancel`
tctx2, cancel := tctx.WithContext(context.Background()).WithTimeout(maxDMLConnectionDuration)
defer cancel()
_, err := cp.dbConn.ExecuteSQL(tctx2, sqls, args...)
if err != nil {
return err
}

cp.needFlushSafeModeExitPoint = false
return nil
}

// GlobalPoint implements CheckPoint.GlobalPoint.
func (cp *RemoteCheckPoint) GlobalPoint() binlog.Location {
cp.RLock()
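The checkpoint-side change above boils down to two rules: SaveSafeModeExitPoint only accepts a reset (nil) or a strictly newer location, and the exit point can be flushed on its own against the already-flushed global location, so the global checkpoint never moves past rows that were not actually persisted. A minimal, self-contained Go sketch of that behavior follows; location, checkpoint and the printed UPDATE are illustrative stand-ins, not DM's binlog.Location or RemoteCheckPoint API.

package main

import "fmt"

// location is a stand-in for binlog.Location: a single comparable offset.
type location struct{ pos int64 }

// compare mimics binlog.CompareLocation: negative, zero or positive.
func compare(a, b location) int {
    switch {
    case a.pos < b.pos:
        return -1
    case a.pos > b.pos:
        return 1
    default:
        return 0
    }
}

type checkpoint struct {
    flushedGlobal     location  // last location persisted downstream
    safeModeExitPoint *location // nil means "no pending exit point"
    needFlushExit     bool
}

// saveSafeModeExitPoint mirrors the new SaveSafeModeExitPoint: only a nil reset
// or a strictly newer location replaces the stored exit point and marks it dirty.
func (cp *checkpoint) saveSafeModeExitPoint(p *location) {
    if cp.safeModeExitPoint == nil || p == nil || compare(*p, *cp.safeModeExitPoint) > 0 {
        cp.safeModeExitPoint = p
        cp.needFlushExit = true
    }
}

// flushSafeModeExitPoint mirrors FlushSafeModeExitPoint: persist the exit point
// together with the already-flushed global location, so the global checkpoint
// does not advance past rows that were never flushed.
func (cp *checkpoint) flushSafeModeExitPoint() {
    exit := int64(-1)
    if cp.safeModeExitPoint != nil {
        exit = cp.safeModeExitPoint.pos
    }
    fmt.Printf("UPDATE checkpoint SET binlog_pos = %d, exit_safe_binlog_pos = %d\n",
        cp.flushedGlobal.pos, exit)
    cp.needFlushExit = false
}

func main() {
    cp := &checkpoint{flushedGlobal: location{pos: 100}}
    cp.saveSafeModeExitPoint(&location{pos: 150})
    cp.saveSafeModeExitPoint(&location{pos: 120}) // ignored: older than the stored 150
    cp.flushSafeModeExitPoint()
}

Running it prints an update pinned to the already-flushed position 100 while recording exit point 150, which mirrors how FlushSafeModeExitPoint above reuses FlushedGlobalPoint instead of GlobalPoint.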
48 changes: 24 additions & 24 deletions syncer/mode.go
@@ -35,31 +35,31 @@ func (s *Syncer) enableSafeModeInitializationPhase(tctx *tcontext.Context, safeM
if s.checkpoint.SafeModeExitPoint() != nil {
//nolint:errcheck
safeMode.Add(tctx, 1) // enable and will revert after pass SafeModeExitLoc
-s.tctx.L().Info("enable safe-mode because of inconsistent dump, will exit at", zap.Stringer("location", *s.checkpoint.SafeModeExitPoint()))
-}
-
-//nolint:errcheck
-safeMode.Add(tctx, 1) // enable and will revert after 2 * CheckpointFlushInterval
-go func() {
-defer func() {
-err := safeMode.Add(tctx, -1)
-if err != nil {
-// send error to the fatal chan to interrupt the process
-s.runFatalChan <- unit.NewProcessError(err)
-}
-}()
-
-initPhaseSeconds := s.cfg.CheckpointFlushInterval * 2
-
-failpoint.Inject("SafeModeInitPhaseSeconds", func(val failpoint.Value) {
-seconds, _ := val.(int)
-initPhaseSeconds = seconds
-s.tctx.L().Info("set initPhaseSeconds", zap.String("failpoint", "SafeModeInitPhaseSeconds"), zap.Int("value", seconds))
-})
-s.tctx.L().Info("enable safe-mode because of task initialization", zap.Int("duration in seconds", initPhaseSeconds))
-select {
-case <-tctx.Context().Done():
-case <-time.After(time.Duration(initPhaseSeconds) * time.Second):
-}
-}()
+s.tctx.L().Info("enable safe-mode for safe mode exit point, will exit at", zap.Stringer("location", *s.checkpoint.SafeModeExitPoint()))
+} else {
+//nolint:errcheck
+safeMode.Add(tctx, 1) // enable and will revert after 2 * CheckpointFlushInterval
+go func() {
+defer func() {
+err := safeMode.Add(tctx, -1)
+if err != nil {
+// send error to the fatal chan to interrupt the process
+s.runFatalChan <- unit.NewProcessError(err)
+}
+}()
+
+initPhaseSeconds := s.cfg.CheckpointFlushInterval * 2
+
+failpoint.Inject("SafeModeInitPhaseSeconds", func(val failpoint.Value) {
+seconds, _ := val.(int)
+initPhaseSeconds = seconds
+s.tctx.L().Info("set initPhaseSeconds", zap.String("failpoint", "SafeModeInitPhaseSeconds"), zap.Int("value", seconds))
+})
+s.tctx.L().Info("enable safe-mode because of task initialization", zap.Int("duration in seconds", initPhaseSeconds))
+select {
+case <-tctx.Context().Done():
+case <-time.After(time.Duration(initPhaseSeconds) * time.Second):
+}
+}()
+}
}
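Taken together, the new branch enables safe mode for exactly one reason at a time instead of stacking both: either a recorded exit point from a previous unsafe stop (reverted once replication passes it), or a fixed window of 2 * CheckpointFlushInterval at task start. The sketch below shows that either/or shape with a plain atomic counter standing in for sm.SafeMode; all names are illustrative.

package main

import (
    "context"
    "fmt"
    "sync/atomic"
    "time"
)

// safeMode is a tiny stand-in for sm.SafeMode: enabled while the counter > 0.
type safeMode struct{ n int64 }

func (m *safeMode) Add(d int64)   { atomic.AddInt64(&m.n, d) }
func (m *safeMode) Enabled() bool { return atomic.LoadInt64(&m.n) > 0 }

// enableSafeModeInit mirrors the branch structure above: either stay in safe
// mode until the exit point is passed (reverted elsewhere), or hold safe mode
// for two checkpoint-flush intervals and then revert in a goroutine.
func enableSafeModeInit(ctx context.Context, m *safeMode, hasExitPoint bool, flushInterval time.Duration) {
    if hasExitPoint {
        m.Add(1) // reverted when replication passes the exit point
        return
    }
    m.Add(1) // reverted after 2 * flushInterval
    go func() {
        defer m.Add(-1)
        select {
        case <-ctx.Done():
        case <-time.After(2 * flushInterval):
        }
    }()
}

func main() {
    var m safeMode
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()
    enableSafeModeInit(ctx, &m, false, 50*time.Millisecond)
    fmt.Println("safe mode:", m.Enabled()) // true during the init window
    time.Sleep(150 * time.Millisecond)
    fmt.Println("safe mode:", m.Enabled()) // false after the window
}

The design point visible in the diff is that the two reasons no longer stack: when an exit point exists, the timer-based init phase is skipped entirely.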
116 changes: 79 additions & 37 deletions syncer/syncer.go
@@ -86,6 +86,7 @@ var (
maxDDLConnectionTimeout = fmt.Sprintf("%dm", MaxDDLConnectionTimeoutMinute)

maxDMLConnectionDuration, _ = time.ParseDuration(maxDMLConnectionTimeout)
maxDMLExecutionDuration = 30 * time.Second

adminQueueName = "admin queue"
defaultBucketCount = 8
@@ -1065,7 +1066,7 @@ func (s *Syncer) flushCheckPoints() error {
// optimistic shard info, DM-master may have resolved the optimistic lock and let another worker execute the DDL. So after this
// worker resumes, it cannot execute the DML/DDL from the old binlog because of the downstream table structure mismatch.
// We should find a way to (compensatingly) implement a transaction containing interaction with both etcd and SQL.
-if err != nil {
+if err != nil && (terror.ErrDBExecuteFailed.Equal(err) || terror.ErrDBUnExpect.Equal(err)) {
s.tctx.L().Warn("error detected when executing SQL job, skip flush checkpoint",
zap.Stringer("checkpoint", s.checkpoint),
zap.Error(err))
@@ -1144,7 +1145,7 @@ func (s *Syncer) syncDDL(tctx *tcontext.Context, queueBucket string, db *dbconn.

failpoint.Inject("ExecDDLError", func() {
s.tctx.L().Warn("execute ddl error", zap.Strings("DDL", ddlJob.ddls), zap.String("failpoint", "ExecDDLError"))
-err = errors.Errorf("execute ddl %v error", ddlJob.ddls)
+err = terror.ErrDBUnExpect.Delegate(errors.Errorf("execute ddl %v error", ddlJob.ddls))
failpoint.Goto("bypass")
})

@@ -1157,6 +1158,16 @@ func (s *Syncer) syncDDL(tctx *tcontext.Context, queueBucket string, db *dbconn.
}
}
failpoint.Label("bypass")
failpoint.Inject("SafeModeExit", func(val failpoint.Value) {
if intVal, ok := val.(int); ok && (intVal == 2 || intVal == 3) {
s.tctx.L().Warn("mock safe mode error", zap.Strings("DDL", ddlJob.ddls), zap.String("failpoint", "SafeModeExit"))
if intVal == 2 {
err = terror.ErrWorkerDDLLockInfoNotFound.Generatef("DDL info not found")
} else {
err = terror.ErrDBExecuteFailed.Delegate(errors.Errorf("execute ddl %v error", ddlJob.ddls))
}
}
})
// If the downstream has an error (which may be caused by the tracker being more compatible than the downstream), we should stop handling
// this job and set `s.execError` to let the caller of `addJob` discover the error
if err != nil {
@@ -1198,7 +1209,9 @@ func (s *Syncer) syncDDL(tctx *tcontext.Context, queueBucket string, db *dbconn.
}
}
if err != nil {
-s.execError.Store(err)
+if s.execError.Load() == nil {
+s.execError.Store(err)
+}
if !utils.IsContextCanceledError(err) {
err = s.handleEventError(err, ddlJob.startLocation, ddlJob.currentLocation, true, ddlJob.originSQL)
s.runFatalChan <- unit.NewProcessError(err)
Expand Down Expand Up @@ -1266,21 +1279,10 @@ func (s *Syncer) syncDML(
failpoint.Inject("failSecondJobOfEvent", func() {
if failExecuteSQL && failOnce.CAS(false, true) {
s.tctx.L().Info("trigger failSecondJobOfEvent")
-failpoint.Return(0, errors.New("failSecondJobOfEvent"))
+failpoint.Return(0, terror.ErrDBExecuteFailed.Delegate(errors.New("failSecondJobOfEvent"), "mock"))
}
})

-select {
-case <-tctx.Ctx.Done():
-// do not execute queries anymore, because they should be failed with a done context.
-// and avoid some errors like:
-// - `driver: bad connection` for `BEGIN`
-// - `sql: connection is already closed` for `BEGIN`
-tctx.L().Info("skip some remaining DML jobs in the job chan because the context is done", zap.Int("count", len(jobs)))
-return 0, tctx.Ctx.Err() // return the error to trigger `fatalF`.
-default:
-}

queries := make([]string, 0, len(jobs))
args := make([][]interface{}, 0, len(jobs))
for _, j := range jobs {
@@ -1291,7 +1293,10 @@
t := v.(int)
time.Sleep(time.Duration(t) * time.Second)
})
-return db.ExecuteSQL(tctx, queries, args...)
+// use background context to execute sqls as much as possible
+ctctx, cancel := tctx.WithContext(context.Background()).WithTimeout(maxDMLExecutionDuration)
+defer cancel()
+return db.ExecuteSQL(ctctx, queries, args...)
}

var err error
@@ -1324,6 +1329,13 @@ func (s *Syncer) syncDML(

if idx >= count || sqlJob.tp == flush {
affect, err = executeSQLs()

failpoint.Inject("SafeModeExit", func(val failpoint.Value) {
if intVal, ok := val.(int); ok && intVal == 4 && len(jobs) > 0 {
s.tctx.L().Warn("fail to exec DML", zap.String("failpoint", "SafeModeExit"))
affect, err = 0, terror.ErrDBExecuteFailed.Delegate(errors.New("SafeModeExit"), "mock")
}
})
if err != nil {
fatalF(affect, err)
continue
@@ -1397,6 +1409,7 @@ func (s *Syncer) Run(ctx context.Context) (err error) {
if flushCheckpoint {
if err = s.flushCheckPoints(); err != nil {
tctx.L().Warn("fail to flush checkpoints when starting task", zap.Error(err))
return err
}
}
if delLoadTask {
Expand Down Expand Up @@ -1505,6 +1518,23 @@ func (s *Syncer) Run(ctx context.Context) (err error) {
cancel()
}()

// syncing progress with sharding DDL group
// 1. use the global streamer to sync regular binlog events
// 2. sharding DDL synced for some sharding groups
// * record first pos, last pos, target schema, target table as re-sync info
// 3. use the re-sync info recorded in step.2 to create a new streamer
// 4. use the new streamer re-syncing for this sharding group
// 5. in sharding group's re-syncing
// * ignore other tables' binlog events
// * compare last pos with current binlog's pos to determine whether re-sync completed
// 6. use the global streamer to continue the syncing
var (
shardingReSyncCh = make(chan *ShardingReSync, 10)
shardingReSync *ShardingReSync
savedGlobalLastLocation binlog.Location
traceSource = fmt.Sprintf("%s.syncer.%s", s.cfg.SourceID, s.cfg.Name)
)

defer func() {
if err1 := recover(); err1 != nil {
failpoint.Inject("ExitAfterSaveOnlineDDL", func() {
@@ -1516,9 +1546,22 @@ func (s *Syncer) Run(ctx context.Context) (err error) {
}

s.jobWg.Wait()
-if err2 := s.flushCheckPoints(); err2 != nil {
-tctx.L().Warn("fail to flush check points when exit task", zap.Error(err2))
+var (
+err2 error
+exitSafeModeLoc binlog.Location
+)
+if binlog.CompareLocation(currentLocation, savedGlobalLastLocation, s.cfg.EnableGTID) > 0 {
+exitSafeModeLoc = currentLocation.Clone()
+} else {
+exitSafeModeLoc = savedGlobalLastLocation.Clone()
+}
+s.checkpoint.SaveSafeModeExitPoint(&exitSafeModeLoc)
+if err2 = s.execError.Load(); err2 != nil && (terror.ErrDBExecuteFailed.Equal(err2) || terror.ErrDBUnExpect.Equal(err2)) {
+err2 = s.checkpoint.FlushSafeModeExitPoint(s.tctx)
+} else {
+err2 = s.flushCheckPoints()
+}
+tctx.L().Warn("fail to flush check points when exit task", zap.Error(err2))
}()

s.start = time.Now()
@@ -1538,23 +1581,6 @@
safeMode := sm.NewSafeMode()
s.enableSafeModeInitializationPhase(tctx, safeMode)

-// syncing progress with sharding DDL group
-// 1. use the global streamer to sync regular binlog events
-// 2. sharding DDL synced for some sharding groups
-// * record first pos, last pos, target schema, target table as re-sync info
-// 3. use the re-sync info recorded in step.2 to create a new streamer
-// 4. use the new streamer re-syncing for this sharding group
-// 5. in sharding group's re-syncing
-// * ignore other tables' binlog events
-// * compare last pos with current binlog's pos to determine whether re-sync completed
-// 6. use the global streamer to continue the syncing
-var (
-shardingReSyncCh = make(chan *ShardingReSync, 10)
-shardingReSync *ShardingReSync
-savedGlobalLastLocation binlog.Location
-traceSource = fmt.Sprintf("%s.syncer.%s", s.cfg.SourceID, s.cfg.Name)
-)

closeShardingResync := func() error {
if shardingReSync == nil {
return nil
@@ -1586,6 +1612,9 @@ func (s *Syncer) Run(ctx context.Context) (err error) {
}

for {
if s.execError.Load() != nil {
return nil
}
s.currentLocationMu.Lock()
s.currentLocationMu.currentLocation = currentLocation
s.currentLocationMu.Unlock()
@@ -1617,6 +1646,12 @@ func (s *Syncer) Run(ctx context.Context) (err error) {
startTime := time.Now()
e, err = s.getEvent(tctx, currentLocation)

failpoint.Inject("SafeModeExit", func(val failpoint.Value) {
if intVal, ok := val.(int); ok && intVal == 1 {
s.tctx.L().Warn("fail to get event", zap.String("failpoint", "SafeModeExit"))
err = errors.New("connect: connection refused")
}
})
switch {
case err == context.Canceled:
tctx.L().Info("binlog replication main routine quit(context canceled)!", zap.Stringer("last location", lastLocation))
@@ -1744,8 +1779,15 @@ func (s *Syncer) Run(ctx context.Context) (err error) {
if safeModeExitLoc != nil && !s.isReplacingErr && shardingReSync == nil {
if binlog.CompareLocation(currentLocation, *safeModeExitLoc, s.cfg.EnableGTID) >= 0 {
s.checkpoint.SaveSafeModeExitPoint(nil)
-err = safeMode.Add(tctx, -1)
-if err != nil {
+// must flush here to avoid the following situation:
+// 1. quit safe mode
+// 2. push forward and replicate some sqls after safeModeExitPoint to downstream
+// 3. quit because of network error, fail to flush global checkpoint and new safeModeExitPoint to downstream
+// 4. restart again, quit safe mode at safeModeExitPoint, but some sqls after this location have already been replicated to the downstream
+if err = s.checkpoint.FlushSafeModeExitPoint(s.tctx); err != nil {
+return err
+}
+if err = safeMode.Add(tctx, -1); err != nil {
return err
}
}
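The syncer.go changes tie the pieces together at shutdown: remember the furthest location that was read (the later of currentLocation and savedGlobalLastLocation) as the safe-mode exit point, then choose between flushing only that exit point (when a downstream execution error means the full checkpoint cannot be trusted) or flushing everything. A compact sketch of that decision, with hypothetical checkpointer, onExit and errExecuteFailed names replacing DM's real types and terror error classes:

package main

import (
    "errors"
    "fmt"
)

// location is a stand-in for binlog.Location.
type location struct{ pos int64 }

// errExecuteFailed stands in for terror.ErrDBExecuteFailed / ErrDBUnExpect:
// the downstream rejected a statement, so the in-memory checkpoint may be
// ahead of what was really written and must not be flushed as-is.
var errExecuteFailed = errors.New("downstream execute failed")

type checkpointer interface {
    SaveSafeModeExitPoint(loc location)
    FlushSafeModeExitPoint() error // flush the exit point only, keep the old global checkpoint
    FlushAll() error               // flush global + table checkpoints
}

// onExit mirrors the new defer logic in Syncer.Run: record how far the binlog
// was read (the later of the two locations), then decide which flush is safe.
func onExit(cp checkpointer, current, savedGlobal location, execErr error) error {
    exit := current
    if savedGlobal.pos > current.pos {
        exit = savedGlobal
    }
    cp.SaveSafeModeExitPoint(exit)

    if errors.Is(execErr, errExecuteFailed) {
        // a DML/DDL failed downstream: keep the stale-but-safe global
        // checkpoint and persist only the exit point, so the next start
        // stays in safe mode until that location is passed again.
        return cp.FlushSafeModeExitPoint()
    }
    return cp.FlushAll()
}

// fakeCP just prints what would be flushed.
type fakeCP struct{ exit location }

func (f *fakeCP) SaveSafeModeExitPoint(loc location) { f.exit = loc }
func (f *fakeCP) FlushSafeModeExitPoint() error {
    fmt.Println("flush exit point only at", f.exit.pos)
    return nil
}
func (f *fakeCP) FlushAll() error {
    fmt.Println("flush full checkpoint, exit point at", f.exit.pos)
    return nil
}

func main() {
    cp := &fakeCP{}
    _ = onExit(cp, location{pos: 200}, location{pos: 180}, errExecuteFailed) // exit point only
    _ = onExit(cp, location{pos: 200}, location{pos: 180}, nil)              // full flush
}

On the next start, enableSafeModeInitializationPhase sees the persisted exit point and keeps safe mode on until that location is passed again, which is what makes re-executing the uncertain range of binlog safe.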
11 changes: 10 additions & 1 deletion syncer/syncer_test.go
@@ -1233,12 +1233,17 @@ func (s *testSyncerSuite) TestRun(c *C) {
streamer: mockStreamer,
}

// When crossing safeModeExitPoint, a flush SQL will be generated
checkPointMock.ExpectBegin()
checkPointMock.ExpectExec(".*INSERT INTO .* VALUES.* ON DUPLICATE KEY UPDATE.*").WillReturnResult(sqlmock.NewResult(0, 1))
checkPointMock.ExpectCommit()
// Simulate resuming the syncer: last time it exited successfully, so safe mode shouldn't be enabled here
go syncer.Process(ctx, resultCh)

expectJobs2 := []*expectJob{
{
insert,
"REPLACE INTO `test_1`.`t_2` (`id`,`name`) VALUES (?,?)",
"INSERT INTO `test_1`.`t_2` (`id`,`name`) VALUES (?,?)",
[]interface{}{int32(3), "c"},
}, {
del,
@@ -1348,6 +1353,10 @@ func (s *testSyncerSuite) TestExitSafeModeByConfig(c *C) {
ctx, cancel := context.WithCancel(context.Background())
resultCh := make(chan pb.ProcessResult)

// When crossing safeModeExitPoint, a flush SQL will be generated
checkPointMock.ExpectBegin()
checkPointMock.ExpectExec(".*INSERT INTO .* VALUES.* ON DUPLICATE KEY UPDATE.*").WillReturnResult(sqlmock.NewResult(0, 1))
checkPointMock.ExpectCommit()
// disable 1-minute safe mode
c.Assert(failpoint.Enable("github.com/pingcap/dm/syncer/SafeModeInitPhaseSeconds", "return(0)"), IsNil)
go syncer.Process(ctx, resultCh)
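The test changes assert one extra checkpoint transaction when the safe-mode exit point is crossed. Assuming checkPointMock is a go-sqlmock handle (github.com/DATA-DOG/go-sqlmock), which the ExpectBegin/ExpectExec/ExpectCommit calls suggest, the standalone sketch below shows how such an expectation is declared and satisfied; flushExitPoint, the table name and the columns are invented for illustration.

package main

import (
    "database/sql"
    "log"

    "github.com/DATA-DOG/go-sqlmock"
)

func main() {
    // Set up a mocked *sql.DB, as the tests above do for the checkpoint connection.
    db, mock, err := sqlmock.New()
    if err != nil {
        log.Fatal(err)
    }
    defer db.Close()

    // Expect one transaction that upserts the checkpoint row, mirroring the
    // expectation added in TestRun / TestExitSafeModeByConfig.
    mock.ExpectBegin()
    mock.ExpectExec("INSERT INTO .* VALUES.* ON DUPLICATE KEY UPDATE").
        WillReturnResult(sqlmock.NewResult(0, 1))
    mock.ExpectCommit()

    // Drive the expectation the way a checkpoint flush would.
    if err := flushExitPoint(db); err != nil {
        log.Fatal(err)
    }
    if err := mock.ExpectationsWereMet(); err != nil {
        log.Fatal(err)
    }
    log.Println("all expectations met")
}

// flushExitPoint is a hypothetical stand-in for the checkpoint flush the
// syncer issues when it records or crosses the safe-mode exit point.
func flushExitPoint(db *sql.DB) error {
    tx, err := db.Begin()
    if err != nil {
        return err
    }
    _, err = tx.Exec(
        "INSERT INTO dm_meta.checkpoint (id, pos, exit_safe_pos) VALUES (?, ?, ?) ON DUPLICATE KEY UPDATE pos = ?, exit_safe_pos = ?",
        "source-1", 200, 200, 200, 200)
    if err != nil {
        _ = tx.Rollback()
        return err
    }
    return tx.Commit()
}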
