Skip to content

Commit

Permalink
*: query-status -s and purge-relay send to all relay worker (pingca…
Browse files Browse the repository at this point in the history
  • Loading branch information
lance6716 committed Apr 9, 2021
1 parent e23bfda commit 5466403
Show file tree
Hide file tree
Showing 34 changed files with 685 additions and 225 deletions.
8 changes: 4 additions & 4 deletions cmd/dm-worker/main.go
Expand Up @@ -21,15 +21,15 @@ import (
"strings"
"syscall"

"github.com/pingcap/errors"
globalLog "github.com/pingcap/log"
"go.uber.org/zap"

"github.com/pingcap/dm/dm/ctl/common"
"github.com/pingcap/dm/dm/worker"
"github.com/pingcap/dm/pkg/log"
"github.com/pingcap/dm/pkg/terror"
"github.com/pingcap/dm/pkg/utils"
globalLog "github.com/pingcap/log"

"github.com/pingcap/errors"
"go.uber.org/zap"
)

func main() {
Expand Down
6 changes: 6 additions & 0 deletions dm/config/source_config.go
Expand Up @@ -8,7 +8,9 @@ import (
"io/ioutil"
"math"
"math/rand"
"path/filepath"
"strings"
"time"

"github.com/BurntSushi/toml"
"github.com/siddontang/go-mysql/mysql"
Expand Down Expand Up @@ -255,6 +257,9 @@ func (c *SourceConfig) Adjust(ctx context.Context, db *sql.DB) (err error) {
if len(c.RelayDir) == 0 {
c.RelayDir = defaultRelayDir
}
if filepath.IsAbs(c.RelayDir) {
log.L().Warn("using an absolute relay path, relay log can't work when starting multiple relay worker")
}

return nil
}
Expand Down Expand Up @@ -291,6 +296,7 @@ func (c *SourceConfig) AdjustServerID(ctx context.Context, db *sql.DB) error {
return terror.WithScope(err, terror.ScopeUpstream)
}

rand.Seed(time.Now().UnixNano())
for i := 0; i < 5; i++ {
randomValue := uint32(rand.Intn(100000))
randomServerID := defaultBaseServerID + randomValue
Expand Down
94 changes: 81 additions & 13 deletions dm/master/scheduler/scheduler.go
Expand Up @@ -932,6 +932,32 @@ func (s *Scheduler) StopRelay(source string, workers []string) error {
return nil
}

// GetRelayWorkers looks up the relay workers recorded for the given source and
// returns their worker instances ordered by worker name.
// It returns ErrSchedulerNotStarted when the scheduler has not been started.
func (s *Scheduler) GetRelayWorkers(source string) ([]*Worker, error) {
	s.mu.RLock()
	defer s.mu.RUnlock()

	if !s.started {
		return nil, terror.ErrSchedulerNotStarted.Generate()
	}

	// Left nil on purpose: a source without relay workers yields a nil slice.
	var found []*Worker
	for name := range s.relayWorkers[source] {
		if instance, exists := s.workers[name]; exists {
			found = append(found, instance)
			continue
		}
		// should not happen: a relay worker name without a registered instance
		s.logger.Error("worker instance for relay worker not found", zap.String("worker", name))
	}

	// Map iteration order is random, so sort to give callers a stable order.
	byName := func(a, b int) bool {
		return found[a].baseInfo.Name < found[b].baseInfo.Name
	}
	sort.Slice(found, byName)
	return found, nil
}

// UpdateExpectRelayStage updates the current expect relay stage.
// now, only support updates:
// - from `Running` to `Paused`.
Expand Down Expand Up @@ -1475,10 +1501,17 @@ func (s *Scheduler) tryBoundForWorker(w *Worker) (bounded bool, err error) {
}

// try to find its relay source (currently only one relay source)
if source == "" {
if source != "" {
s.logger.Info("found history source when worker bound",
zap.String("worker", w.BaseInfo().Name),
zap.String("source", source))
} else {
for source2, workers := range s.relayWorkers {
if _, ok2 := workers[w.BaseInfo().Name]; ok2 {
source = source2
s.logger.Info("found relay source when worker bound",
zap.String("worker", w.BaseInfo().Name),
zap.String("source", source))
break
}
}
Expand All @@ -1498,6 +1531,9 @@ func (s *Scheduler) tryBoundForWorker(w *Worker) (bounded bool, err error) {
// randomly pick one from unbounds
if source == "" {
for source = range s.unbounds {
s.logger.Info("found unbound source when worker bound",
zap.String("worker", w.BaseInfo().Name),
zap.String("source", source))
break // got a source.
}
}
Expand Down Expand Up @@ -1529,40 +1565,72 @@ func (s *Scheduler) tryBoundForWorker(w *Worker) (bounded bool, err error) {
// returns (true, nil) after bounded.
// the caller should update s.unbounds
func (s *Scheduler) tryBoundForSource(source string) (bool, error) {
// 1. try to find history workers...
var worker *Worker
for workerName, bound := range s.lastBound {
if bound.Source == source {
w, ok := s.workers[workerName]
if !ok {
// a not found worker
continue
}
if w.Stage() == WorkerFree {
worker = w
break
relayWorkers := s.relayWorkers[source]
// 1. try to find a history worker in relay workers...
if len(relayWorkers) > 0 {
for workerName, bound := range s.lastBound {
if bound.Source == source {
w, ok := s.workers[workerName]
if !ok {
// a not found worker
continue
}
if _, ok2 := relayWorkers[workerName]; ok2 && w.Stage() == WorkerFree {
worker = w
s.logger.Info("found history relay worker when source bound",
zap.String("worker", workerName),
zap.String("source", source))
break
}
}
}
}
// then a relay worker for this source...
if worker == nil {
for workerName := range s.relayWorkers[source] {
for workerName := range relayWorkers {
w, ok := s.workers[workerName]
if !ok {
// a not found worker, should not happen
s.logger.Warn("worker instance not found for relay worker", zap.String("worker", workerName))
continue
}
if w.Stage() == WorkerFree {
worker = w
s.logger.Info("found relay worker when source bound",
zap.String("worker", workerName),
zap.String("source", source))
break
}
}
}
// then a history worker for this source...
if worker == nil {
for workerName, bound := range s.lastBound {
if bound.Source == source {
w, ok := s.workers[workerName]
if !ok {
// a not found worker
continue
}
if w.Stage() == WorkerFree {
worker = w
s.logger.Info("found history worker when source bound",
zap.String("worker", workerName),
zap.String("source", source))
break
}
}
}
}
// and then a random Free worker.
if worker == nil {
for _, w := range s.workers {
if w.Stage() == WorkerFree {
worker = w
s.logger.Info("found free worker when source bound",
zap.String("worker", w.BaseInfo().Name),
zap.String("source", source))
break
}
}
Expand Down
23 changes: 23 additions & 0 deletions dm/master/scheduler/scheduler_test.go
Expand Up @@ -1164,6 +1164,12 @@ func (t *testScheduler) TestStartStopSource(c *C) {
// test not exist source
c.Assert(terror.ErrSchedulerSourceCfgNotExist.Equal(s.StartRelay(sourceID3, []string{workerName1})), IsTrue)
c.Assert(terror.ErrSchedulerSourceCfgNotExist.Equal(s.StopRelay(sourceID4, []string{workerName1})), IsTrue)
noWorkerSources := []string{sourceID1, sourceID2, sourceID3, sourceID4}
for _, source := range noWorkerSources {
workers, err := s.GetRelayWorkers(source)
c.Assert(err, IsNil)
c.Assert(workers, HasLen, 0)
}

// start-relay success on bound-same-source and free worker
c.Assert(s.StartRelay(sourceID1, []string{workerName1}), IsNil)
Expand All @@ -1175,6 +1181,9 @@ func (t *testScheduler) TestStartStopSource(c *C) {
c.Assert(s.relayWorkers[sourceID1], HasLen, 2)
c.Assert(s.relayWorkers[sourceID1], HasKey, workerName1)
c.Assert(s.relayWorkers[sourceID1], HasKey, workerName3)
workers, err := s.GetRelayWorkers(sourceID1)
c.Assert(err, IsNil)
c.Assert(workers, DeepEquals, []*Worker{worker1, worker3})

// failed on bound-not-same-source worker and not exist worker
c.Assert(terror.ErrSchedulerRelayWorkersWrongBound.Equal(s.StartRelay(sourceID1, []string{workerName2})), IsTrue)
Expand All @@ -1189,11 +1198,22 @@ func (t *testScheduler) TestStartStopSource(c *C) {
c.Assert(s.expectRelayStages, HasKey, sourceID2)
c.Assert(s.relayWorkers[sourceID2], HasLen, 1)
c.Assert(s.relayWorkers[sourceID2], HasKey, workerName2)
workers, err = s.GetRelayWorkers(sourceID2)
c.Assert(err, IsNil)
c.Assert(workers, DeepEquals, []*Worker{worker2})

// failed on not-same-source worker and not exist worker
c.Assert(terror.ErrSchedulerRelayWorkersWrongRelay.Equal(s.StopRelay(sourceID1, []string{workerName2})), IsTrue)
c.Assert(terror.ErrSchedulerWorkerNotExist.Equal(s.StopRelay(sourceID1, []string{"not-exist"})), IsTrue)

// nothing changed
workers, err = s.GetRelayWorkers(sourceID1)
c.Assert(err, IsNil)
c.Assert(workers, DeepEquals, []*Worker{worker1, worker3})
workers, err = s.GetRelayWorkers(sourceID2)
c.Assert(err, IsNil)
c.Assert(workers, DeepEquals, []*Worker{worker2})

// stop-relay success
c.Assert(s.StopRelay(sourceID1, []string{workerName1}), IsNil)
c.Assert(s.StopRelay(sourceID1, []string{workerName1}), IsNil)
Expand All @@ -1202,4 +1222,7 @@ func (t *testScheduler) TestStartStopSource(c *C) {
c.Assert(s.expectRelayStages, HasKey, sourceID2)
c.Assert(s.relayWorkers, HasLen, 1)
c.Assert(s.relayWorkers, HasKey, sourceID2)
workers, err = s.GetRelayWorkers(sourceID1)
c.Assert(err, IsNil)
c.Assert(workers, HasLen, 0)
}

0 comments on commit 5466403

Please sign in to comment.