scheduler(ticdc): evenly distribute tables for drain capture
Signed-off-by: Neil Shen <overvenus@gmail.com>
overvenus committed Jul 13, 2022
1 parent a7485b6 commit bcb773a
Showing 2 changed files with 41 additions and 23 deletions.
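The change itself is small: the drain-capture scheduler stops randomizing capture workloads (dropping `math/rand`, `time`, and the `randomizeWorkload` calls) and instead assigns each table being moved off the draining capture to whichever remaining capture currently has the smallest workload, incrementing that workload after every assignment so the moved tables spread evenly. The victim list is also capped at `maxTaskConcurrency` while it is collected, so any remaining tables are picked up on subsequent `Schedule` calls. The snippet below is a minimal standalone sketch of that greedy loop, not the scheduler code itself; the function name `distributeEvenly` and the plain `int64`/`string` types are simplified stand-ins for the scheduler's internal types.

```go
package main

import (
	"fmt"
	"math"
)

// distributeEvenly mirrors the idea of the patch: assign each table that is
// moved off the draining capture to whichever remaining capture currently
// replicates the fewest tables, then bump that capture's count so the next
// table goes elsewhere.
func distributeEvenly(victims []int64, workload map[string]int) map[int64]string {
	assignment := make(map[int64]string, len(victims))
	for _, tableID := range victims {
		target := ""
		minWorkload := math.MaxInt
		for captureID, w := range workload {
			if w < minWorkload {
				minWorkload = w
				target = captureID
			}
		}
		assignment[tableID] = target
		// Incrementing the chosen capture's workload keeps the distribution even.
		workload[target]++
	}
	return assignment
}

func main() {
	// Capture "a" is draining tables 1, 2 and 3; "b" already holds one table, "c" none.
	victims := []int64{1, 2, 3}
	workload := map[string]int{"b": 1, "c": 0}
	fmt.Println(distributeEvenly(victims, workload)) // e.g. map[1:c 2:b 3:c]
}
```

As in the patch, ties between captures with equal workload are broken by Go's map iteration order, which is fine because only the per-capture counts matter for evenness.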
36 changes: 13 additions & 23 deletions cdc/scheduler/internal/v3/scheduler_drain_capture.go
@@ -15,9 +15,7 @@ package v3
 
 import (
 	"math"
-	"math/rand"
 	"sync"
-	"time"
 
 	"github.com/pingcap/log"
 	"github.com/pingcap/tiflow/cdc/model"
@@ -32,15 +30,13 @@ var _ scheduler = &drainCaptureScheduler{}
 type drainCaptureScheduler struct {
 	mu     sync.Mutex
 	target model.CaptureID
-	random *rand.Rand
 
 	maxTaskConcurrency int
 }
 
 func newDrainCaptureScheduler(concurrency int) *drainCaptureScheduler {
 	return &drainCaptureScheduler{
 		target:             captureIDNotDraining,
-		random:             rand.New(rand.NewSource(time.Now().UnixNano())),
 		maxTaskConcurrency: concurrency,
 	}
 }
@@ -102,6 +98,7 @@ func (d *drainCaptureScheduler) Schedule(
 			zap.String("captureID", stopping))
 	}
 
+	// Currently, the workload is the number of tables in a capture.
 	captureWorkload := make(map[model.CaptureID]int)
 	for id := range captures {
 		if id != d.target {
@@ -119,8 +116,9 @@
 		return nil
 	}
 
-	// victims record all table instance should be dropped from the target capture
-	victims := make([]model.TableID, 0)
+	maxTaskConcurrency := d.maxTaskConcurrency
+	// victimTables record tables should be moved out from the target capture
+	victimTables := make([]model.TableID, 0, maxTaskConcurrency)
 	for tableID, rep := range replications {
 		if rep.State != ReplicationSetStateReplicating {
 			// only drain the target capture if all tables is replicating,
@@ -132,7 +130,9 @@
 		}
 
 		if rep.Primary == d.target {
-			victims = append(victims, tableID)
+			if len(victimTables) < maxTaskConcurrency {
+				victimTables = append(victimTables, tableID)
+			}
 		}
 
 		// only calculate workload of other captures not the drain target.
@@ -145,29 +145,18 @@
 	// 1. the target capture has no table at the beginning
 	// 2. all tables moved from the target capture
 	// 3. the target capture cannot be found in the latest captures
-	if len(victims) == 0 {
-		log.Info("tpscheduler: drain capture scheduler finished, since no table",
+	if len(victimTables) == 0 {
+		log.Info("schedulerv3: drain capture scheduler finished, since no table",
 			zap.String("target", d.target))
 		d.target = captureIDNotDraining
 		return nil
 	}
 
-	for captureID, w := range captureWorkload {
-		captureWorkload[captureID] = randomizeWorkload(d.random, w)
-	}
-
-	maxTaskConcurrency := d.maxTaskConcurrency
-	if len(victims) < maxTaskConcurrency {
-		maxTaskConcurrency = len(victims)
-	}
-
-	// for each victim table, find the target for it
+	// For each victim table, find the target for it
 	result := make([]*scheduleTask, 0, maxTaskConcurrency)
-	for i := 0; i < maxTaskConcurrency; i++ {
-		tableID := victims[i]
+	for _, tableID := range victimTables {
 		target := ""
 		minWorkload := math.MaxInt64
-
 		for captureID, workload := range captureWorkload {
 			if workload < minWorkload {
 				minWorkload = workload
@@ -188,7 +177,8 @@
 			accept: (callback)(nil), // No need for accept callback here.
 		})
 
-		captureWorkload[target] = randomizeWorkload(d.random, minWorkload+1)
+		// Increase target workload to make sure tables are evenly distributed.
+		captureWorkload[target]++
 	}
 
 	return result
28 changes: 28 additions & 0 deletions cdc/scheduler/internal/v3/scheduler_drain_capture_test.go
@@ -153,3 +153,31 @@ func TestDrainImbalanceCluster(t *testing.T) {
 	require.Len(t, tasks, 2)
 	require.EqualValues(t, "a", scheduler.getTarget())
 }
+
+func TestDrainEvenlyDistributedTables(t *testing.T) {
+	t.Parallel()
+
+	var checkpointTs model.Ts
+	currentTables := make([]model.TableID, 0)
+	captures := map[model.CaptureID]*CaptureStatus{
+		"a": {State: CaptureStateInitialized},
+		"b": {IsOwner: true, State: CaptureStateInitialized},
+		"c": {State: CaptureStateInitialized},
+	}
+	replications := map[model.TableID]*ReplicationSet{
+		1: {State: ReplicationSetStateReplicating, Primary: "a"},
+		2: {State: ReplicationSetStateReplicating, Primary: "a"},
+		3: {State: ReplicationSetStateReplicating, Primary: "a"},
+		6: {State: ReplicationSetStateReplicating, Primary: "b"},
+	}
+	scheduler := newDrainCaptureScheduler(10)
+	scheduler.setTarget("a")
+	tasks := scheduler.Schedule(checkpointTs, currentTables, captures, replications)
+	require.Len(t, tasks, 3)
+	taskMap := make(map[model.CaptureID]int)
+	for _, t := range tasks {
+		taskMap[t.moveTable.DestCapture]++
+	}
+	require.Equal(t, 1, taskMap["b"])
+	require.Equal(t, 2, taskMap["c"])
+}

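A quick sanity check of the new test's expectation: the drain target "a" holds tables 1, 2, and 3, capture "b" already replicates table 6 (workload 1), and "c" starts empty (workload 0). The greedy loop sends the first victim to "c"; after that both captures sit at workload 1, and whichever capture wins that tie, the remaining two tables end up split so that "b" receives one move task and "c" receives two, independent of Go's map iteration order. Running the `distributeEvenly` sketch above with victims `{1, 2, 3}` and workload `{"b": 1, "c": 0}` reproduces the same split.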