scheduler: add Relay as a worker stage and fix bug #2219
For some of the TODOs, I need the reviewers' opinions; I'll fix them in the next PR.
Later I'll add some comment of choose bound to |
LGTM after the remaining comments are resolved.
// returns (true, nil) after bounded.
// tryBoundForWorker tries to bind a source to the given worker. The order of picking source is
// - try to bind the last bound source
// - try to bind sources on which the worker has unfinished load task
I think binding to a source with an unfinished load task should have higher priority than binding to the last bound source.
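The ordering proposed in this comment can be sketched as follows. This is a minimal illustration, not the scheduler's code: `pickSource`, its parameters, and the `unbound` map are hypothetical names introduced only for this example.

```go
package main

import "fmt"

// pickSource is a hypothetical helper, not part of the real scheduler.
// It returns the first still-unbound candidate source, checking sources
// with an unfinished load task before the worker's last bound source.
func pickSource(unfinishedLoad []string, lastBound string, unbound map[string]bool) (string, bool) {
	// 1. Sources on which the worker has an unfinished load task.
	for _, s := range unfinishedLoad {
		if unbound[s] {
			return s, true
		}
	}
	// 2. The source the worker was last bound to.
	if lastBound != "" && unbound[lastBound] {
		return lastBound, true
	}
	return "", false
}

func main() {
	unbound := map[string]bool{"source-1": true, "source-2": true}
	s, ok := pickSource([]string{"source-2"}, "source-1", unbound)
	fmt.Println(s, ok) // source-2 true
}
```

With the order in the quoted comment, the two steps would simply be swapped and the same call would return source-1.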
relaySource := workers[i].RelaySourceID()
another := i ^ 1 // make use of XOR to flip 0 and 1
toBindSource := inputSources[another]
if relaySource != "" && toBindSource != "" && relaySource != toBindSource {
If sourceA finished the dump phase on workerA and workerA then went down, sourceA would be scheduled to workerB and relay would be enabled on workerB. When workerA comes up again, sourceA should be transferred back to workerA, but in this situation the check will return a false positive error.
Before transferWorkerAndSource the state is (workerA, ""), (workerB, sourceA), and workerB has a relay source of sourceA. In the check at line 555, when iterating workerA, relaySource is "" so the condition does not pass; when iterating workerB, toBindSource is "" so it does not pass either. It won't raise a false positive error.
This check is for the case where workerB has started relay for sourceA but is requested to bind to sourceB.
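The two cases discussed in this thread can be traced with a small standalone sketch. It keeps the shape of the quoted condition but uses plain string arrays; `checkRelayConflict` and its parameters are hypothetical names, and the real scheduler works on worker objects rather than strings.

```go
package main

import "fmt"

// checkRelayConflict is a hypothetical, simplified version of the quoted check.
// relaySources[i] is the source worker i has started relay for, and
// inputSources[i] is the source passed in alongside worker i ("" means none).
// After the transfer, worker i would be bound to inputSources[i^1].
func checkRelayConflict(relaySources, inputSources [2]string) error {
	for i := range relaySources {
		another := i ^ 1 // XOR flips 0 and 1
		relaySource := relaySources[i]
		toBindSource := inputSources[another]
		if relaySource != "" && toBindSource != "" && relaySource != toBindSource {
			return fmt.Errorf("worker %d has started relay for %q but is asked to bind to %q",
				i, relaySource, toBindSource)
		}
	}
	return nil
}

func main() {
	// Reviewer's scenario: (workerA, ""), (workerB, sourceA); workerB relays sourceA.
	fmt.Println(checkRelayConflict([2]string{"", "sourceA"}, [2]string{"", "sourceA"}))
	// workerB relays sourceA but would end up bound to sourceB.
	fmt.Println(checkRelayConflict([2]string{"", "sourceA"}, [2]string{"sourceB", "sourceA"}))
}
```

The first call returns nil because each iteration sees an empty string on one side of the comparison; only the second call reports a conflict.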
dm/master/scheduler/scheduler.go
Outdated
// set the stage as Bound and record the bound relationship if exists.
if bound, ok := sbm[name]; ok {
	// source bounds without source configuration should be deleted later
	if _, ok := scm[bound.Source]; ok {
		boundsToTrigger = append(boundsToTrigger, bound)
		// TODO: if etcd has saved KV that worker1 started relay for source1, but bound to source2, should
		// we stop relay or stop source to avoid DM master leader failed to bootstrap?
Maybe stopping the source is the simpler choice, because we don't support migrating a relay binding to another worker.
@@ -1521,6 +1521,10 @@ func (t *testScheduler) TestTransferWorkerAndSource(c *C) {
c.Assert(s.bounds[sourceID2], DeepEquals, worker1)
c.Assert(s.bounds[sourceID3], DeepEquals, worker4)
c.Assert(s.bounds[sourceID4], DeepEquals, worker3)

c.Assert(worker1.StartRelay(sourceID2), IsNil)
Shall we add some unit tests for the newly added logic?
defer w.mu.Unlock()

switch w.stage {
case WorkerOffline, WorkerRelay:
Is it possible for the worker to be in these two states? Maybe add a log.DPanic here?
WorkerOffline can turn on relay directly, which ensures there is no window in which the worker syncs without relay.
WorkerRelay can turn on relay again, just as WorkerBound can call ToBound again. We may change this in the future.
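A minimal sketch of the behaviour described in this reply, assuming a simplified worker type. Only the `WorkerOffline, WorkerRelay` case comes from the quoted diff; the `WorkerFree` and `WorkerBound` branches, the field names, and the error text are assumptions added for illustration.

```go
package main

import (
	"fmt"
	"sync"
)

type WorkerStage int

const (
	WorkerOffline WorkerStage = iota
	WorkerFree
	WorkerRelay
	WorkerBound
)

// worker is a simplified stand-in for the scheduler's worker type.
type worker struct {
	mu          sync.Mutex
	stage       WorkerStage
	boundSource string // "" when the worker is not bound
	relaySource string // "" when relay has not been started
}

// StartRelay records the relay source and adjusts the stage.
func (w *worker) StartRelay(source string) error {
	w.mu.Lock()
	defer w.mu.Unlock()

	switch w.stage {
	case WorkerOffline, WorkerRelay:
		// Stage is unchanged: an offline worker only records the relay source,
		// and a relay worker may have relay turned on again.
	case WorkerFree:
		w.stage = WorkerRelay // assumption: a free worker moves to Relay
	case WorkerBound:
		// Assumption: a bound worker may only relay the source it is bound to.
		if w.boundSource != source {
			return fmt.Errorf("worker is bound to %q, cannot start relay for %q", w.boundSource, source)
		}
	}
	w.relaySource = source
	return nil
}

func main() {
	w := &worker{stage: WorkerFree}
	fmt.Println(w.StartRelay("source1"), w.stage == WorkerRelay)
	fmt.Println(w.StartRelay("source1"), w.stage == WorkerRelay) // repeated call is accepted
}
```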
/cc @GMHDBJD
_utils/terror_gen/errors_release.txt
Outdated
@@ -518,7 +518,7 @@ ErrSchedulerSourceOpRelayExist,[code=46023:class=scheduler:scope=internal:level=
ErrSchedulerLatchInUse,[code=46024:class=scheduler:scope=internal:level=low], "Message: when %s, resource %s is in use by other client, Workaround: Please try again later"
ErrSchedulerSourceCfgUpdate,[code=46025:class=scheduler:scope=internal:level=low], "Message: source can only update relay-log related parts for now"
ErrSchedulerWrongWorkerInput,[code=46026:class=scheduler:scope=internal:level=medium], "Message: require DM master to modify worker [%s] with source [%s], but currently the worker is bound to source [%s]"
ErrSchedulerCantTransferToRelayWorker,[code=46027:class=scheduler:scope=internal:level=medium], "Message: require DM worker to be bound to source [%s], but it has been started relay for source [%s]"
ErrSchedulerBoundDiffWithStartedRelay,[code=46027:class=scheduler:scope=internal:level=medium], "Message: require DM worker [%s] to be bound to source [%s], but it has been started relay for source [%s]"
Should this error have a workaround, e.g. stop-relay for the previous source and start-relay for the current source?
dm/master/scheduler/scheduler.go
Outdated
for _, workerName := range workers {
	worker := s.workers[workerName]
	if relaySource := worker.RelaySourceID(); relaySource != "" && relaySource != source {
		busyWorkers = append(busyWorkers, workerName)
		busySources = append(busySources, relaySource)
	}
How about moving this to L1117?
delete(s.relayWorkers[source], w)
for _, workerName := range workers {
	delete(s.relayWorkers[source], workerName)
	s.workers[workerName].StopRelay()
Do we need to call tryBoundForWorker for this worker once it stops relay and becomes free? Or can that be left to a later PR?
We need to wait for the DM worker to DisableRelay before it can receive a new source bound 😂. That is a lot of work, so I'll leave it for the future.
dm/master/scheduler/scheduler.go
Outdated
// - remove the bound relationship in the scheduler.
// this func is called after the bound relationship removed from etcd.
func (s *Scheduler) updateStatusForUnbound(source string) {
	w, ok := s.bounds[source]
	if !ok {
		return
	}
	w.ToFree()
	if err := w.Unbound(); err != nil {
		s.logger.DPanic("can updateStatusForUnbound", zap.Error(err))
- s.logger.DPanic("can updateStatusForUnbound", zap.Error(err))
+ s.logger.DPanic("cannot updateStatusForUnbound", zap.Error(err))
@@ -270,7 +270,45 @@ function test_last_bound() {
	echo "[$(date)] <<<<<< finish test_last_bound >>>>>>"
}

function test_exclusive_relay() {
How about adding a test for the original issue? For example:
worker1 --relay-> source1
worker2 --relay-> source2
worker3 free
restart worker1, then worker3 --> source1
restart worker2, then worker1 --> source2
now worker1 should not error
Do you mean worker1 should not be bound, and source2 should stay unbound until worker2 is up?
Indeed.
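The outcome agreed on in this thread can be simulated with a short sketch. It assumes a simplified worker record and a hypothetical `pickWorkerFor` helper; neither is the scheduler's real API, and the real integration test is a shell script.

```go
package main

import "fmt"

// worker is a simplified record used only for this illustration.
type worker struct {
	name        string
	online      bool
	boundSource string // "" when not bound
	relaySource string // "" when relay has not been started
}

// pickWorkerFor is a hypothetical helper. It returns the first online, unbound
// worker whose relay source is either empty or equal to the given source.
func pickWorkerFor(source string, workers []*worker) (string, bool) {
	for _, w := range workers {
		if !w.online || w.boundSource != "" {
			continue
		}
		if w.relaySource != "" && w.relaySource != source {
			continue // worker has started relay for another source
		}
		return w.name, true
	}
	return "", false
}

func main() {
	// State after worker1 restarted and worker3 took over source1,
	// and while worker2 is still down.
	w1 := &worker{name: "worker1", online: true, relaySource: "source1"}
	w2 := &worker{name: "worker2", online: false, relaySource: "source2"}
	w3 := &worker{name: "worker3", online: true, boundSource: "source1"}
	workers := []*worker{w1, w2, w3}

	name, ok := pickWorkerFor("source2", workers)
	fmt.Printf("worker2 down: %q %v\n", name, ok)

	w2.online = true
	name, ok = pickWorkerFor("source2", workers)
	fmt.Printf("worker2 up: %q %v\n", name, ok)
}
```

While worker2 is down no worker is picked, so source2 stays unbound instead of being bound to worker1; once worker2 is back it is picked.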
LGTM
/merge
This pull request has been accepted and is ready to merge. Commit hash: 93c4d59
In response to a cherrypick label: new pull request created: #2275.
What problem does this PR solve?
close #2204
What is changed and how it works?
The scheduler of DM master now keeps each DM worker in one of four stages: Offline, Free, Relay, and Bound. Relay is newly added so that some stage transitions can be handled more easily and correctly.
Check List
Tests
Code changes
Side effects
Related changes