
Commit e68b679

scheduler(ticdc): adjust some logic in scheduler (pingcap#9296) (ping…
ti-chi-bot committed Jul 28, 2023
1 parent eab086e commit e68b679
Showing 6 changed files with 50 additions and 49 deletions.
12 changes: 2 additions & 10 deletions cdc/owner/changefeed.go
@@ -41,9 +41,9 @@ import (
"go.uber.org/zap"
)

- // newSchedulerFromCtx creates a new scheduler from context.
+ // newScheduler creates a new scheduler from context.
// This function is factored out to facilitate unit testing.
- func newSchedulerFromCtx(
+ func newScheduler(
ctx cdcContext.Context, pdClock pdutil.Clock, epoch uint64,
) (ret scheduler.Scheduler, err error) {
changeFeedID := ctx.ChangefeedVars().ID
@@ -58,14 +58,6 @@ func newSchedulerFromCtx(
return ret, errors.Trace(err)
}

- func newScheduler(
- ctx cdcContext.Context,
- pdClock pdutil.Clock,
- epoch uint64,
- ) (scheduler.Scheduler, error) {
- return newSchedulerFromCtx(ctx, pdClock, epoch)
- }

type changefeed struct {
id model.ChangeFeedID
// state is read-only during the Tick, should only be updated by patch the etcd.
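Net effect of this file: the one-line `newScheduler` wrapper is deleted and the former `newSchedulerFromCtx` takes over its name, so callers in `changefeed` keep calling `newScheduler` unchanged. A minimal sketch of the same wrapper-removal pattern, using hypothetical names rather than the tiflow API:

package main

import (
	"context"
	"fmt"
)

// thing stands in for scheduler.Scheduler; nothing here is tiflow API.
type thing struct{ epoch uint64 }

// Before the change the constructor was split in two:
//
//	func newThingFromCtx(ctx context.Context, epoch uint64) (*thing, error) { ... }
//	func newThing(ctx context.Context, epoch uint64) (*thing, error) {
//		return newThingFromCtx(ctx, epoch)
//	}
//
// After it, the forwarding wrapper is gone and the real constructor simply
// carries the shorter name, so call sites do not change.
func newThing(ctx context.Context, epoch uint64) (*thing, error) {
	if err := ctx.Err(); err != nil {
		return nil, err
	}
	return &thing{epoch: epoch}, nil
}

func main() {
	t, err := newThing(context.Background(), 1)
	fmt.Println(t, err)
}
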
29 changes: 9 additions & 20 deletions cdc/scheduler/internal/v3/coordinator.go
@@ -84,32 +84,21 @@ func NewCoordinator(
if err != nil {
return nil, errors.Trace(err)
}
- coord := newCoordinator(captureID, changefeedID, ownerRevision, cfg)
- coord.trans = trans
- coord.pdClock = pdClock
- coord.changefeedEpoch = changefeedEpoch
- return coord, nil
- }
-
- func newCoordinator(
- captureID model.CaptureID,
- changefeedID model.ChangeFeedID,
- ownerRevision int64,
- cfg *config.SchedulerConfig,
- ) *coordinator {
revision := schedulepb.OwnerRevision{Revision: ownerRevision}

return &coordinator{
- version: version.ReleaseSemver(),
- revision: revision,
- captureID: captureID,
+ version: version.ReleaseSemver(),
+ revision: revision,
+ changefeedEpoch: changefeedEpoch,
+ captureID: captureID,
+ trans: trans,
replicationM: replication.NewReplicationManager(
cfg.MaxTaskConcurrency, changefeedID),
captureM: member.NewCaptureManager(captureID, changefeedID, revision, cfg),
schedulerM: scheduler.NewSchedulerManager(changefeedID, cfg),
changefeedID: changefeedID,
compat: compat.New(map[model.CaptureID]*model.CaptureInfo{}),
- }
+ pdClock: pdClock,
+ }, nil
}

// Tick implement the scheduler interface
@@ -206,8 +195,8 @@ func (c *coordinator) DrainCapture(target model.CaptureID) (int, error) {
return count, nil
}

- // when draining the capture, tables need to be dispatched to other
- // capture except the draining one, so at least should have 2 captures alive.
+ // when draining the capture, tables need to be dispatched to other capture
+ // except the draining one, so there should be at least two live captures.
if len(c.captureM.Captures) <= 1 {
log.Warn("schedulerv3: drain capture request ignored, "+
"only one captures alive",
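The production `newCoordinator` helper is inlined into `NewCoordinator`: instead of constructing a partial coordinator and assigning `trans`, `pdClock`, and `changefeedEpoch` afterwards, every field is now set in one composite literal, so a newly added dependency cannot be silently initialized on only one of the two paths. A rough sketch of that pattern with hypothetical types, not the real tiflow ones:

package main

import "fmt"

// transport and pdClock stand in for the real p2p transport and PD clock
// dependencies; none of these types are the actual tiflow definitions.
type transport interface{ Name() string }

type pdClock interface{ Now() int64 }

type coordinator struct {
	captureID       string
	revision        int64
	changefeedEpoch uint64
	trans           transport
	pdClock         pdClock
}

// NewCoordinator fills every field in a single composite literal, mirroring
// the shape of the change: previously a helper built a partial value and the
// caller patched trans, pdClock, and changefeedEpoch onto it afterwards.
func NewCoordinator(
	captureID string, revision int64, epoch uint64, trans transport, clock pdClock,
) *coordinator {
	return &coordinator{
		captureID:       captureID,
		revision:        revision,
		changefeedEpoch: epoch,
		trans:           trans,
		pdClock:         clock,
	}
}

func main() {
	c := NewCoordinator("capture-1", 1, 1, nil, nil)
	fmt.Println(c.captureID, c.revision, c.changefeedEpoch)
}
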
33 changes: 27 additions & 6 deletions cdc/scheduler/internal/v3/coordinator_test.go
@@ -29,6 +29,7 @@ import (
"github.com/pingcap/tiflow/pkg/config"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/leakutil"
"github.com/pingcap/tiflow/pkg/version"
"github.com/stretchr/testify/require"
)

@@ -118,10 +119,30 @@ func TestCoordinatorRecvMsgs(t *testing.T) {
}}, msgs)
}

+ func newCoordinator(
+ captureID model.CaptureID,
+ changefeedID model.ChangeFeedID,
+ cfg *config.SchedulerConfig,
+ ) *coordinator {
+ revision := schedulepb.OwnerRevision{Revision: 1}
+
+ return &coordinator{
+ version: version.ReleaseSemver(),
+ revision: revision,
+ captureID: captureID,
+ replicationM: replication.NewReplicationManager(
+ cfg.MaxTaskConcurrency, changefeedID),
+ captureM: member.NewCaptureManager(captureID, changefeedID, revision, cfg),
+ schedulerM: scheduler.NewSchedulerManager(changefeedID, cfg),
+ changefeedID: changefeedID,
+ compat: compat.New(map[model.CaptureID]*model.CaptureInfo{}),
+ }
+ }

func TestCoordinatorHeartbeat(t *testing.T) {
t.Parallel()

- coord := newCoordinator("a", model.ChangeFeedID{}, 1, &config.SchedulerConfig{
+ coord := newCoordinator("a", model.ChangeFeedID{}, &config.SchedulerConfig{
HeartbeatTick: math.MaxInt,
CollectStatsTick: math.MaxInt,
MaxTaskConcurrency: 1,
@@ -180,7 +201,7 @@ func TestCoordinatorHeartbeat(t *testing.T) {

func TestCoordinatorAddCapture(t *testing.T) {
t.Parallel()
- coord := newCoordinator("a", model.ChangeFeedID{}, 1, &config.SchedulerConfig{
+ coord := newCoordinator("a", model.ChangeFeedID{}, &config.SchedulerConfig{
HeartbeatTick: math.MaxInt,
CollectStatsTick: math.MaxInt,
MaxTaskConcurrency: 1,
@@ -237,7 +258,7 @@ func TestCoordinatorAddCapture(t *testing.T) {
func TestCoordinatorRemoveCapture(t *testing.T) {
t.Parallel()

- coord := newCoordinator("a", model.ChangeFeedID{}, 1, &config.SchedulerConfig{
+ coord := newCoordinator("a", model.ChangeFeedID{}, &config.SchedulerConfig{
HeartbeatTick: math.MaxInt,
CollectStatsTick: math.MaxInt,
MaxTaskConcurrency: 1,
@@ -330,7 +351,7 @@ func TestCoordinatorDrainCapture(t *testing.T) {
func TestCoordinatorAdvanceCheckpoint(t *testing.T) {
t.Parallel()

- coord := newCoordinator("a", model.ChangeFeedID{}, 1, &config.SchedulerConfig{
+ coord := newCoordinator("a", model.ChangeFeedID{}, &config.SchedulerConfig{
HeartbeatTick: math.MaxInt,
CollectStatsTick: math.MaxInt,
MaxTaskConcurrency: 1,
@@ -425,10 +446,10 @@ func TestCoordinatorDropMsgIfChangefeedEpochMismatch(t *testing.T) {
t.Parallel()

ctx := context.Background()
- coord := newCoordinator("a", model.ChangeFeedID{}, 1, &config.SchedulerConfig{
+ coord := newCoordinator("a", model.ChangeFeedID{}, &config.SchedulerConfig{
HeartbeatTick: math.MaxInt,
CollectStatsTick: math.MaxInt,
MaxTaskConcurrency: 1,
})
coord.captureID = "0"
coord.changefeedEpoch = 1
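Because the production constructor no longer exists, the tests gain their own `newCoordinator` helper that hardcodes `OwnerRevision{Revision: 1}` and drops the `ownerRevision` parameter, so every call site loses the literal `1`. A hedged sketch of how such a test-only constructor looks in isolation (illustrative names, not the real test code):

package v3_test

import "testing"

// schedulerConfig and testCoordinator are illustrative stand-ins; the real
// tests construct the unexported coordinator type via the helper added above.
type schedulerConfig struct{ MaxTaskConcurrency int }

type testCoordinator struct {
	captureID string
	revision  int64
	cfg       *schedulerConfig
}

// newTestCoordinator mirrors the test-only helper: the owner revision is
// fixed at 1, so call sites pass one argument fewer than before.
func newTestCoordinator(captureID string, cfg *schedulerConfig) *testCoordinator {
	return &testCoordinator{captureID: captureID, revision: 1, cfg: cfg}
}

func TestHelperDefaultsRevision(t *testing.T) {
	c := newTestCoordinator("a", &schedulerConfig{MaxTaskConcurrency: 1})
	if c.revision != 1 {
		t.Fatalf("expected default revision 1, got %d", c.revision)
	}
}
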
4 changes: 2 additions & 2 deletions cdc/scheduler/internal/v3/info_provider_test.go
@@ -28,7 +28,7 @@ import (
func TestInfoProvider(t *testing.T) {
t.Parallel()

- coord := newCoordinator("a", model.ChangeFeedID{}, 1, &config.SchedulerConfig{
+ coord := newCoordinator("a", model.ChangeFeedID{}, &config.SchedulerConfig{
HeartbeatTick: math.MaxInt,
MaxTaskConcurrency: 1,
})
@@ -59,7 +59,7 @@ func TestInfoProvider(t *testing.T) {
func TestInfoProviderIsInitialized(t *testing.T) {
t.Parallel()

- coord := newCoordinator("a", model.ChangeFeedID{}, 1, &config.SchedulerConfig{
+ coord := newCoordinator("a", model.ChangeFeedID{}, &config.SchedulerConfig{
HeartbeatTick: math.MaxInt,
MaxTaskConcurrency: 1,
})
16 changes: 10 additions & 6 deletions cdc/scheduler/internal/v3/replication/replication_set.go
@@ -807,21 +807,25 @@ func (r *ReplicationSet) handleAddTable(
zap.Any("replicationSet", r), zap.Int64("tableID", r.TableID))
return nil, nil
}
oldState := r.State
r.State = ReplicationSetStatePrepare
err := r.setCapture(captureID, RoleSecondary)
if err != nil {
return nil, errors.Trace(err)
}
- log.Info("schedulerv3: replication state transition, add table",
- zap.Any("replicationSet", r),
- zap.Stringer("old", oldState), zap.Stringer("new", r.State))
status := tablepb.TableStatus{
TableID: r.TableID,
State: tablepb.TableStateAbsent,
Checkpoint: tablepb.Checkpoint{},
}
- return r.poll(&status, captureID)
+ msgs, err := r.poll(&status, captureID)
+ if err != nil {
+ return nil, errors.Trace(err)
+ }
+
+ log.Info("schedulerv3: replication state transition, add table",
+ zap.Any("replicationSet", r),
+ zap.Stringer("old", oldState), zap.Stringer("new", r.State))
+ return msgs, nil
}

func (r *ReplicationSet) handleMoveTable(
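The reordering in `handleAddTable` checks `poll`'s error and defers the state-transition log until after `poll` succeeds, since `poll` can both fail and move the replication set into a later state; logging beforehand could report a transition that never took effect. A generic sketch of that ordering (toy types, not the real ReplicationSet):

package main

import (
	"errors"
	"fmt"
)

type state int

const (
	stateAbsent state = iota
	statePrepare
	stateReplicating
)

// replicaSketch is a toy stand-in for ReplicationSet; it only models the
// ordering of state change, poll, and logging shown in the hunk above.
type replicaSketch struct{ state state }

// poll may advance the state further and may fail.
func (r *replicaSketch) poll() ([]string, error) {
	if r.state != statePrepare {
		return nil, errors.New("unexpected state")
	}
	r.state = stateReplicating
	return []string{"dispatch-table"}, nil
}

func (r *replicaSketch) addTable() ([]string, error) {
	oldState := r.state
	r.state = statePrepare
	msgs, err := r.poll()
	if err != nil {
		// Error path: no transition is logged because none completed.
		return nil, err
	}
	// Log only after poll succeeded, so "new" reflects the state poll left us in.
	fmt.Printf("state transition, add table: old=%v new=%v\n", oldState, r.state)
	return msgs, nil
}

func main() {
	r := &replicaSketch{state: stateAbsent}
	msgs, err := r.addTable()
	fmt.Println(msgs, err)
}
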
5 changes: 0 additions & 5 deletions cdc/scheduler/internal/v3/scheduler/scheduler_basic.go
@@ -14,9 +14,6 @@
package scheduler

import (
"math/rand"
"time"

"github.com/pingcap/log"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/scheduler/internal/v3/member"
@@ -35,14 +32,12 @@ var _ scheduler = &basicScheduler{}
// 3. Capture offline.
type basicScheduler struct {
batchSize int
- random *rand.Rand
changefeedID model.ChangeFeedID
}

func newBasicScheduler(batchSize int, changefeed model.ChangeFeedID) *basicScheduler {
return &basicScheduler{
batchSize: batchSize,
- random: rand.New(rand.NewSource(time.Now().UnixNano())),
changefeedID: changefeed,
}
}
