This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

syncer: improve performance #594

Merged
merged 14 commits into from Apr 12, 2020
Changes from 1 commit
17 changes: 13 additions & 4 deletions dm/config/task.go
@@ -55,8 +55,10 @@ var (
 	defaultPoolSize = 16
 	defaultDir      = "./dumped_data"
 	// SyncerConfig
-	defaultWorkerCount = 16
-	defaultBatch       = 100
+	defaultWorkerCount             = 16
+	defaultBatch                   = 100
+	defaultQueueSize               = 5120
+	defaultCheckpointFlushInterval = 30
 )
 
 // Meta represents binlog's meta pos
@@ -193,6 +195,11 @@ type SyncerConfig struct {
 	MetaFile    string `yaml:"meta-file" toml:"meta-file" json:"meta-file"` // meta filename, used only when load SubConfig directly
 	WorkerCount int    `yaml:"worker-count" toml:"worker-count" json:"worker-count"`
 	Batch       int    `yaml:"batch" toml:"batch" json:"batch"`
+	QueueSize   int    `yaml:"queue-size" toml:"queue-size" json:"queue-size"`
+
+	// checkpoint flush interval in seconds.
+	CheckpointFlushInterval int `yaml:"checkpoint-flush-interval" toml:"checkpoint-flush-interval" json:"checkpoint-flush-interval"`
+
 	// deprecated
 	MaxRetry int `yaml:"max-retry" toml:"max-retry" json:"max-retry"`
 
@@ -206,8 +213,10 @@ type SyncerConfig struct {
 
 func defaultSyncerConfig() SyncerConfig {
 	return SyncerConfig{
-		WorkerCount: defaultWorkerCount,
-		Batch:       defaultBatch,
+		WorkerCount:             defaultWorkerCount,
+		Batch:                   defaultBatch,
+		QueueSize:               defaultQueueSize,
+		CheckpointFlushInterval: defaultCheckpointFlushInterval,
 	}
 }
 
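The config change above adds two syncer options: `queue-size` (capacity of each DML job queue, default 5120) and `checkpoint-flush-interval` (whole seconds, default 30). In a real DM task file these settings typically sit under a `syncers` entry; the sketch below is not DM's own loading code, it just decodes the two new keys through the same yaml tags into a trimmed-down local struct to show the key names, the defaulting behavior, and the seconds unit. The example values are made up.

```go
package main

import (
	"fmt"

	"gopkg.in/yaml.v2"
)

// syncerConfig mirrors only the fields touched by this PR (illustrative, not the DM struct).
type syncerConfig struct {
	WorkerCount             int `yaml:"worker-count"`
	Batch                   int `yaml:"batch"`
	QueueSize               int `yaml:"queue-size"`
	CheckpointFlushInterval int `yaml:"checkpoint-flush-interval"` // seconds
}

func main() {
	// Keys a user might set; anything omitted keeps the pre-filled default below.
	raw := []byte(`
worker-count: 16
batch: 100
queue-size: 2048
checkpoint-flush-interval: 10
`)

	// Start from the defaults introduced in the diff, then overlay the YAML.
	cfg := syncerConfig{
		WorkerCount:             16,
		Batch:                   100,
		QueueSize:               5120,
		CheckpointFlushInterval: 30,
	}
	if err := yaml.Unmarshal(raw, &cfg); err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", cfg)
}
```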
4 changes: 1 addition & 3 deletions syncer/checkpoint.go
@@ -46,8 +46,6 @@ var (
 	globalCpTable        = "" // global checkpoint's cp_table
 	maxCheckPointTimeout = "1m"
 	minCheckpoint        = mysql.Position{Pos: 4}
-
-	maxCheckPointSaveTime = 30 * time.Second
 )
 
 // NOTE: now we sync from relay log, so not add GTID support yet
@@ -453,7 +451,7 @@ func (cp *RemoteCheckPoint) String() string {
 func (cp *RemoteCheckPoint) CheckGlobalPoint() bool {
 	cp.RLock()
 	defer cp.RUnlock()
-	return time.Since(cp.globalPointSaveTime) >= maxCheckPointSaveTime
+	return time.Since(cp.globalPointSaveTime) >= time.Duration(cp.cfg.CheckpointFlushInterval)*time.Second
 }
 
 // Rollback implements CheckPoint.Rollback
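This change drops the hard-coded 30-second `maxCheckPointSaveTime` and drives the flush check from the new `CheckpointFlushInterval` config, converting an int of seconds into a `time.Duration`. A standalone sketch of that check follows; the names `flushDue` and `lastSave` are illustrative, only the conversion pattern comes from the diff.

```go
package main

import (
	"fmt"
	"time"
)

// flushDue reports whether enough time has passed since the last checkpoint save.
func flushDue(lastSave time.Time, checkpointFlushInterval int) bool {
	// The config stores whole seconds, so it must be converted to a
	// time.Duration and scaled by time.Second, as in CheckGlobalPoint above.
	return time.Since(lastSave) >= time.Duration(checkpointFlushInterval)*time.Second
}

func main() {
	lastSave := time.Now().Add(-45 * time.Second)
	fmt.Println(flushDue(lastSave, 30)) // true: 45s elapsed >= 30s interval
	fmt.Println(flushDue(lastSave, 60)) // false: 45s elapsed < 60s interval
}
```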
10 changes: 10 additions & 0 deletions syncer/metrics.go
@@ -64,6 +64,15 @@ var (
 			Buckets:   prometheus.ExponentialBuckets(0.00005, 2, 21),
 		}, []string{"task"})
 
+	addJobDurationHistogram = prometheus.NewHistogramVec(
+		prometheus.HistogramOpts{
+			Namespace: "dm",
+			Subsystem: "syncer",
+			Name:      "add_job_duration",
+			Help:      "bucketed histogram of add a job to the queue time (s)",
+			Buckets:   prometheus.ExponentialBuckets(0.00005, 2, 21),
+		}, []string{"type", "task", "queueNo"})
+
 	binlogSkippedEventsTotal = prometheus.NewCounterVec(
 		prometheus.CounterOpts{
 			Namespace: "dm",
@@ -205,6 +214,7 @@ func RegisterMetrics(registry *prometheus.Registry) {
 	registry.MustRegister(binlogEventSizeHistogram)
 	registry.MustRegister(binlogEvent)
 	registry.MustRegister(conflictDetectDurationHistogram)
+	registry.MustRegister(addJobDurationHistogram)
 	registry.MustRegister(binlogSkippedEventsTotal)
 	registry.MustRegister(addedJobsTotal)
 	registry.MustRegister(finishedJobsTotal)
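The new `dm_syncer_add_job_duration` histogram records how long it takes to push a job into its queue, labeled by job type, task name, and queue number, and is registered alongside the other syncer metrics. Below is a small standalone sketch of the same register-and-observe pattern with prometheus/client_golang; the label values ("insert", "test-task", "q_0") and the gather/print code are illustrative, not taken from DM.

```go
package main

import (
	"fmt"
	"time"

	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	addJobDuration := prometheus.NewHistogramVec(
		prometheus.HistogramOpts{
			Namespace: "dm",
			Subsystem: "syncer",
			Name:      "add_job_duration",
			Help:      "bucketed histogram of add a job to the queue time (s)",
			Buckets:   prometheus.ExponentialBuckets(0.00005, 2, 21),
		}, []string{"type", "task", "queueNo"})

	registry := prometheus.NewRegistry()
	registry.MustRegister(addJobDuration)

	// Time a (simulated) enqueue and record it with the three labels.
	start := time.Now()
	time.Sleep(2 * time.Millisecond)
	addJobDuration.WithLabelValues("insert", "test-task", "q_0").Observe(time.Since(start).Seconds())

	// Gather and print the sample count just to show the observation was recorded.
	mfs, err := registry.Gather()
	if err != nil {
		panic(err)
	}
	for _, mf := range mfs {
		for _, m := range mf.GetMetric() {
			fmt.Println(mf.GetName(), "samples:", m.GetHistogram().GetSampleCount())
		}
	}
}
```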
9 changes: 8 additions & 1 deletion syncer/syncer.go
@@ -275,7 +275,7 @@ func (s *Syncer) newJobChans(count int) {
 	s.closeJobChans()
 	s.jobs = make([]chan *job, 0, count)
 	for i := 0; i < count; i++ {
-		s.jobs = append(s.jobs, make(chan *job, 1000))
+		s.jobs = append(s.jobs, make(chan *job, s.cfg.QueueSize))
 	}
 	s.jobsClosed.Set(false)
 }
@@ -757,7 +757,10 @@ func (s *Syncer) addJob(job *job) error {
 		// ugly code addJob and sync, refine it later
 		s.jobWg.Add(s.cfg.WorkerCount)
 		for i := 0; i < s.cfg.WorkerCount; i++ {
+			startTime := time.Now()
 			s.jobs[i] <- job
+			// flush for every DML queue
+			addJobDurationHistogram.WithLabelValues("flush", s.cfg.Name, s.queueBucketMapping[i]).Observe(time.Since(startTime).Seconds())
 		}
 		s.jobWg.Wait()
 		finishedJobsTotal.WithLabelValues("flush", s.cfg.Name, adminQueueName).Inc()
@@ -767,15 +770,19 @@ func (s *Syncer) addJob(job *job) error {
 		addedJobsTotal.WithLabelValues("ddl", s.cfg.Name, adminQueueName).Inc()
 		s.jobWg.Add(1)
 		queueBucket = s.cfg.WorkerCount
+		startTime := time.Now()
 		s.jobs[queueBucket] <- job
+		addJobDurationHistogram.WithLabelValues("ddl", s.cfg.Name, adminQueueName).Observe(time.Since(startTime).Seconds())
 		if job.ddlExecItem != nil {
 			execDDLReq = job.ddlExecItem.req
 		}
 	case insert, update, del:
 		s.jobWg.Add(1)
 		queueBucket = int(utils.GenHashKey(job.key)) % s.cfg.WorkerCount
 		s.addCount(false, s.queueBucketMapping[queueBucket], job.tp, 1)
+		startTime := time.Now()
 		s.jobs[queueBucket] <- job
+		addJobDurationHistogram.WithLabelValues(job.tp.String(), s.cfg.Name, s.queueBucketMapping[queueBucket]).Observe(time.Since(startTime).Seconds())
 	}
 
 	if s.tracer.Enable() {
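In `addJob`, DML jobs are fanned out across `worker-count` bounded channels by hashing the job key, and each channel's capacity is now the configurable `queue-size` instead of the hard-coded 1000. The new histogram times each channel send, so a full queue (back-pressure from slow workers) shows up directly in the metric. The sketch below is a standalone illustration of that fan-out pattern, not the DM code: FNV-1a stands in for `utils.GenHashKey`, the `job` struct is made up, and the metric observation is replaced by a print.

```go
package main

import (
	"fmt"
	"hash/fnv"
	"sync"
	"time"
)

type job struct{ key, sql string }

// genHashKey stands in for utils.GenHashKey: map a key to a stable bucket.
func genHashKey(key string) uint32 {
	h := fnv.New32a()
	h.Write([]byte(key))
	return h.Sum32()
}

func main() {
	const (
		workerCount = 4
		queueSize   = 8 // made configurable by this PR (default 5120)
	)

	// One bounded channel per worker, as in newJobChans.
	jobs := make([]chan *job, 0, workerCount)
	for i := 0; i < workerCount; i++ {
		jobs = append(jobs, make(chan *job, queueSize))
	}

	var wg sync.WaitGroup
	for i := 0; i < workerCount; i++ {
		wg.Add(1)
		go func(ch chan *job) {
			defer wg.Done()
			for j := range ch {
				_ = j // a real worker would batch and execute the job here
			}
		}(jobs[i])
	}

	for _, j := range []*job{{key: "db.tbl.1"}, {key: "db.tbl.2"}, {key: "db.tbl.3"}} {
		bucket := int(genHashKey(j.key)) % workerCount
		start := time.Now()
		jobs[bucket] <- j // blocks only when that bucket's queue is full
		fmt.Printf("key=%s bucket=%d enqueue took %s\n", j.key, bucket, time.Since(start))
	}

	for _, ch := range jobs {
		close(ch)
	}
	wg.Wait()
}
```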