pkg/orchestrator(ticdc): add timeout before remove capture (pingcap#9445)
CharlesCheung96 committed Aug 22, 2023
1 parent 8bb611a commit b216246
Showing 10 changed files with 86 additions and 25 deletions.
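
In short: a capture whose etcd key disappears is no longer dropped from the owner's GlobalReactorState immediately. The deletion is queued in toRemoveCaptures, and a new ReactorState hook, UpdatePendingChange, which EtcdWorker calls after applying each batch of updates, performs the removal (and fires onCaptureRemoved) only once a remove timeout has elapsed: half the capture session TTL, with a floor of 5 seconds.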
8 changes: 3 additions & 5 deletions cdc/capture/capture.go
@@ -350,7 +350,7 @@ func (c *captureImpl) run(stdCtx context.Context) error {
}()
processorFlushInterval := time.Duration(c.config.ProcessorFlushInterval)

- globalState := orchestrator.NewGlobalState(c.EtcdClient.GetClusterID())
+ globalState := orchestrator.NewGlobalState(c.EtcdClient.GetClusterID(), c.config.CaptureSessionTTL)

globalState.SetOnCaptureAdded(func(captureID model.CaptureID, addr string) {
c.MessageRouter.AddPeer(captureID, addr)
@@ -463,7 +463,7 @@ func (c *captureImpl) campaignOwner(ctx cdcContext.Context) error {
owner := c.newOwner(c.upstreamManager, c.config.Debug.Scheduler)
c.setOwner(owner)

- globalState := orchestrator.NewGlobalState(c.EtcdClient.GetClusterID())
+ globalState := orchestrator.NewGlobalState(c.EtcdClient.GetClusterID(), c.config.CaptureSessionTTL)

globalState.SetOnCaptureAdded(func(captureID model.CaptureID, addr string) {
c.MessageRouter.AddPeer(captureID, addr)
@@ -481,9 +481,7 @@ func (c *captureImpl) campaignOwner(ctx cdcContext.Context) error {
}
})

- err = c.runEtcdWorker(ownerCtx, owner,
- orchestrator.NewGlobalState(c.EtcdClient.GetClusterID()),
- ownerFlushInterval, util.RoleOwner.String())
+ err = c.runEtcdWorker(ownerCtx, owner, globalState, ownerFlushInterval, util.RoleOwner.String())
c.owner.AsyncStop()
c.setOwner(nil)

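Besides threading the session TTL through, the last hunk fixes a latent bug: runEtcdWorker used to receive a second, freshly constructed global state, so the onCaptureAdded/onCaptureRemoved hooks registered on globalState above were attached to a state the etcd worker never drove. The owner loop now reuses the single globalState.
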
2 changes: 1 addition & 1 deletion cdc/owner/owner_test.go
@@ -135,7 +135,7 @@ func createOwner4Test(ctx cdcContext.Context, t *testing.T) (*ownerImpl, *orches
o := owner.(*ownerImpl)
o.upstreamManager = upstream.NewManager4Test(pdClient)

- state := orchestrator.NewGlobalState(etcd.DefaultCDCClusterID)
+ state := orchestrator.NewGlobalStateForTest(etcd.DefaultCDCClusterID)
tester := orchestrator.NewReactorStateTester(t, state, nil)

// set captures
2 changes: 1 addition & 1 deletion cdc/processor/manager_test.go
@@ -63,7 +63,7 @@ func NewManager4Test(
//nolint:unused
func (s *managerTester) resetSuit(ctx cdcContext.Context, t *testing.T) {
s.manager = NewManager4Test(t, &s.liveness)
- s.state = orchestrator.NewGlobalState(etcd.DefaultCDCClusterID)
+ s.state = orchestrator.NewGlobalStateForTest(etcd.DefaultCDCClusterID)
captureInfoBytes, err := ctx.GlobalVars().CaptureInfo.Marshal()
require.Nil(t, err)
s.tester = orchestrator.NewReactorStateTester(t, s.state, map[string]string{
1 change: 1 addition & 0 deletions pkg/orchestrator/etcd_worker.go
@@ -512,6 +512,7 @@ func (worker *EtcdWorker) applyUpdates() error {
return errors.Trace(err)
}
}
+ worker.state.UpdatePendingChange()

worker.pendingUpdates = worker.pendingUpdates[:0]
return nil
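This is the driver for the new hook: applyUpdates notifies the state once per applied batch of etcd events, giving GlobalReactorState.UpdatePendingChange (below) its chance to expire queued capture removals.
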
3 changes: 3 additions & 0 deletions pkg/orchestrator/etcd_worker_bank_test.go
@@ -43,6 +43,9 @@ type bankReactorState struct {

const bankTestPrefix = "/ticdc/test/bank/"

+ func (b *bankReactorState) UpdatePendingChange() {
+ }

func (b *bankReactorState) Update(key util.EtcdKey, value []byte, isInit bool) error {
require.True(b.t, strings.HasPrefix(key.String(), bankTestPrefix))
indexStr := key.String()[len(bankTestPrefix):]
9 changes: 9 additions & 0 deletions pkg/orchestrator/etcd_worker_test.go
@@ -147,6 +147,9 @@ func (s *simpleReactorState) SetSum(sum int) {
s.patches = append(s.patches, patch)
}

+ func (s *simpleReactorState) UpdatePendingChange() {
+ }

func (s *simpleReactorState) Update(key util.EtcdKey, value []byte, isInit bool) error {
subMatches := keyParseRegexp.FindSubmatch(key.Bytes())
if len(subMatches) != 2 {
@@ -283,6 +286,9 @@ type intReactorState struct {
lastVal int
}

+ func (s *intReactorState) UpdatePendingChange() {
+ }

func (s *intReactorState) Update(key util.EtcdKey, value []byte, isInit bool) error {
var err error
s.val, err = strconv.Atoi(string(value))
@@ -372,6 +378,9 @@ type commonReactorState struct {
pendingPatches []DataPatch
}

+ func (s *commonReactorState) UpdatePendingChange() {
+ }

func (s *commonReactorState) Update(key util.EtcdKey, value []byte, isInit bool) error {
s.state[key.String()] = string(value)
return nil
3 changes: 3 additions & 0 deletions pkg/orchestrator/interfaces.go
@@ -35,6 +35,9 @@ type ReactorState interface {
// Update is called by EtcdWorker to notify the Reactor of a latest change to the Etcd state.
Update(key util.EtcdKey, value []byte, isInit bool) error

+ // UpdatePendingChange is called by EtcdWorker to notify the Reactor to apply the pending changes.
+ UpdatePendingChange()

// GetPatches is called by EtcdWorker, and should return many slices of data patches that represents the changes
// that a Reactor wants to apply to Etcd.
// a slice of DataPatch will be committed as one ETCD txn
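Any other ReactorState implementation only needs the method to exist; a no-op satisfies the interface, exactly as the test reactors above show. Below is a minimal self-contained sketch of the shape; EtcdKey, DataPatch, and the trimmed interface are hypothetical stand-ins, not the real orchestrator API:

package main

import "fmt"

// Hypothetical stand-ins for util.EtcdKey and orchestrator.DataPatch so the
// sketch compiles on its own.
type EtcdKey string

type DataPatch struct{}

// reactorState mirrors the shape of ReactorState after this commit
// (GetPatches is simplified; the real method differs).
type reactorState interface {
	Update(key EtcdKey, value []byte, isInit bool) error
	UpdatePendingChange()
	GetPatches() [][]DataPatch
}

// flatState stores raw key-values and has no time-deferred work, so its
// UpdatePendingChange is an empty method, like the test reactors above.
type flatState struct{ kv map[EtcdKey][]byte }

func (s *flatState) Update(key EtcdKey, value []byte, _ bool) error {
	s.kv[key] = value
	return nil
}

func (s *flatState) UpdatePendingChange() {}

func (s *flatState) GetPatches() [][]DataPatch { return nil }

func main() {
	var s reactorState = &flatState{kv: map[EtcdKey][]byte{}}
	_ = s.Update("key", []byte("value"), false)
	s.UpdatePendingChange() // harmless when nothing is pending
	fmt.Println(len(s.(*flatState).kv)) // 1
}
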
57 changes: 45 additions & 12 deletions pkg/orchestrator/reactor_state.go
@@ -15,6 +15,7 @@ package orchestrator

import (
"reflect"
"time"

"github.com/goccy/go-json"
"github.com/pingcap/errors"
@@ -26,6 +27,8 @@ import (
"go.uber.org/zap"
)

+ const defaultCaptureRemoveTTL = 5

// GlobalReactorState represents a global state which stores all key-value pairs in ETCD
type GlobalReactorState struct {
ClusterID string
@@ -39,16 +42,44 @@ type GlobalReactorState struct {
// to be called when captures are added and removed.
onCaptureAdded func(captureID model.CaptureID, addr string)
onCaptureRemoved func(captureID model.CaptureID)

+ captureRemoveTTL int
+ toRemoveCaptures map[model.CaptureID]time.Time
}

- // NewGlobalState creates a new global state
- func NewGlobalState(clusterID string) *GlobalReactorState {
+ // NewGlobalState creates a new global state.
+ func NewGlobalState(clusterID string, captureSessionTTL int) *GlobalReactorState {
+ captureRemoveTTL := captureSessionTTL / 2
+ if captureRemoveTTL < defaultCaptureRemoveTTL {
+ captureRemoveTTL = defaultCaptureRemoveTTL
+ }
return &GlobalReactorState{
- ClusterID:   clusterID,
- Owner:       map[string]struct{}{},
- Captures:    make(map[model.CaptureID]*model.CaptureInfo),
- Upstreams:   make(map[model.UpstreamID]*model.UpstreamInfo),
- Changefeeds: make(map[model.ChangeFeedID]*ChangefeedReactorState),
+ ClusterID:        clusterID,
+ Owner:            map[string]struct{}{},
+ Captures:         make(map[model.CaptureID]*model.CaptureInfo),
+ Upstreams:        make(map[model.UpstreamID]*model.UpstreamInfo),
+ Changefeeds:      make(map[model.ChangeFeedID]*ChangefeedReactorState),
+ captureRemoveTTL: captureRemoveTTL,
+ toRemoveCaptures: make(map[model.CaptureID]time.Time),
}
}

+ // NewGlobalStateForTest creates a new global state for test.
+ func NewGlobalStateForTest(clusterID string) *GlobalReactorState {
+ return NewGlobalState(clusterID, 0)
+ }
+
+ // UpdatePendingChange implements the ReactorState interface
+ func (s *GlobalReactorState) UpdatePendingChange() {
+ for c, t := range s.toRemoveCaptures {
+ if time.Since(t) >= time.Duration(s.captureRemoveTTL)*time.Second {
+ log.Info("remote capture offline", zap.Any("info", s.Captures[c]))
+ delete(s.Captures, c)
+ if s.onCaptureRemoved != nil {
+ s.onCaptureRemoved(c)
+ }
+ delete(s.toRemoveCaptures, c)
+ }
+ }
+ }

@@ -59,6 +90,7 @@ func (s *GlobalReactorState) Update(key util.EtcdKey, value []byte, _ bool) error {
if err != nil {
return errors.Trace(err)
}

switch k.Tp {
case etcd.CDCKeyTypeOwner:
if value != nil {
@@ -69,11 +101,8 @@ func (s *GlobalReactorState) Update(key util.EtcdKey, value []byte, _ bool) error {
return nil
case etcd.CDCKeyTypeCapture:
if value == nil {
log.Info("remote capture offline", zap.Any("info", s.Captures[k.CaptureID]))
delete(s.Captures, k.CaptureID)
if s.onCaptureRemoved != nil {
s.onCaptureRemoved(k.CaptureID)
}
log.Info("remote capture offline detected", zap.Any("info", s.Captures[k.CaptureID]))
s.toRemoveCaptures[k.CaptureID] = time.Now()
return nil
}

@@ -174,6 +203,10 @@ func NewChangefeedReactorState(clusterID string,
}
}

+ // UpdatePendingChange implements the ReactorState interface
+ func (s *ChangefeedReactorState) UpdatePendingChange() {
+ }

// Update implements the ReactorState interface
func (s *ChangefeedReactorState) Update(key util.EtcdKey, value []byte, _ bool) error {
k := new(etcd.CDCKey)
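The delayed-removal mechanics are easier to see stripped of the model types. Below is a self-contained sketch of the same pattern under simplified, hypothetical names: removeTTL mirrors NewGlobalState's floor computation, markGone mirrors Update seeing a deleted capture key, and tick mirrors UpdatePendingChange.

package main

import (
	"fmt"
	"time"
)

// Floor for the remove timeout, in seconds, as in the diff above.
const defaultCaptureRemoveTTL = 5

// removeTTL mirrors NewGlobalState: half the capture session TTL,
// but never below defaultCaptureRemoveTTL.
func removeTTL(captureSessionTTL int) int {
	ttl := captureSessionTTL / 2
	if ttl < defaultCaptureRemoveTTL {
		ttl = defaultCaptureRemoveTTL
	}
	return ttl
}

// pendingRemover mimics the toRemoveCaptures bookkeeping: deletions are
// queued with a timestamp and applied only after the TTL has elapsed.
type pendingRemover struct {
	ttl      time.Duration
	captures map[string]struct{}
	pending  map[string]time.Time
}

func (r *pendingRemover) markGone(id string) { r.pending[id] = time.Now() }

func (r *pendingRemover) tick() {
	for id, since := range r.pending {
		if time.Since(since) >= r.ttl {
			delete(r.captures, id)
			delete(r.pending, id)
			fmt.Println("capture removed:", id)
		}
	}
}

func main() {
	fmt.Println(removeTTL(4), removeTTL(20)) // 5 10

	r := &pendingRemover{
		ttl:      50 * time.Millisecond, // shortened for the demo
		captures: map[string]struct{}{"capture-1": {}},
		pending:  map[string]time.Time{},
	}
	r.markGone("capture-1")
	r.tick() // too early: capture-1 is still present
	time.Sleep(60 * time.Millisecond)
	r.tick() // past the TTL: capture-1 is removed
}

Here removeTTL(4) clamps to the 5-second floor and removeTTL(20) halves to 10, matching the arithmetic in the diff.
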
25 changes: 19 additions & 6 deletions pkg/orchestrator/reactor_state_test.go
@@ -448,10 +448,13 @@ func TestPatchTaskPosition(t *testing.T) {
}

func TestGlobalStateUpdate(t *testing.T) {
+ t.Parallel()
+
testCases := []struct {
updateKey []string
updateValue []string
expected GlobalReactorState
+ timeout int
}{
{ // common case
updateKey: []string{
@@ -533,13 +536,14 @@ func TestGlobalStateUpdate(t *testing.T) {
`55551111`,
`{"id":"6bbc01c8-0605-4f86-a0f9-b3119109b225","address":"127.0.0.1:8300"}`,
`{"resolved-ts":421980720003809281,"checkpoint-ts":421980719742451713,
"admin-job-type":0}`,
"admin-job-type":0}`,
`{"resolved-ts":421980720003809281,"checkpoint-ts":421980719742451713,
"admin-job-type":0}`,
"admin-job-type":0}`,
``,
``,
``,
},
+ timeout: 6,
expected: GlobalReactorState{
ClusterID: etcd.DefaultCDCClusterID,
Owner: map[string]struct{}{"22317526c4fc9a38": {}},
@@ -561,7 +565,7 @@ func TestGlobalStateUpdate(t *testing.T) {
},
}
for _, tc := range testCases {
- state := NewGlobalState(etcd.DefaultCDCClusterID)
+ state := NewGlobalState(etcd.DefaultCDCClusterID, 10)
for i, k := range tc.updateKey {
value := []byte(tc.updateValue[i])
if len(value) == 0 {
@@ -570,13 +574,17 @@ func TestGlobalStateUpdate(t *testing.T) {
err := state.Update(util.NewEtcdKey(k), value, false)
require.Nil(t, err)
}
+ time.Sleep(time.Duration(tc.timeout) * time.Second)
+ state.UpdatePendingChange()
require.True(t, cmp.Equal(state, &tc.expected, cmpopts.IgnoreUnexported(GlobalReactorState{}, ChangefeedReactorState{})),
cmp.Diff(state, &tc.expected, cmpopts.IgnoreUnexported(GlobalReactorState{}, ChangefeedReactorState{})))
}
}

func TestCaptureChangeHooks(t *testing.T) {
- state := NewGlobalState(etcd.DefaultCDCClusterID)
+ t.Parallel()
+
+ state := NewGlobalState(etcd.DefaultCDCClusterID, 10)

var callCount int
state.onCaptureAdded = func(captureID model.CaptureID, addr string) {
@@ -600,13 +608,18 @@ func TestCaptureChangeHooks(t *testing.T) {
etcd.CaptureInfoKeyPrefix(etcd.DefaultCDCClusterID)+"/capture-1"),
captureInfoBytes, false)
require.Nil(t, err)
- require.Equal(t, callCount, 1)
+ require.Eventually(t, func() bool {
+ return callCount == 1
+ }, time.Second*3, 10*time.Millisecond)

err = state.Update(util.NewEtcdKey(
etcd.CaptureInfoKeyPrefix(etcd.DefaultCDCClusterID)+"/capture-1"),
nil /* delete */, false)
require.Nil(t, err)
- require.Equal(t, callCount, 2)
+ require.Eventually(t, func() bool {
+ state.UpdatePendingChange()
+ return callCount == 2
+ }, time.Second*10, 10*time.Millisecond)
}

func TestCheckChangefeedNormal(t *testing.T) {
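Because removal is now deferred, the hooks test polls with require.Eventually, pumping UpdatePendingChange until onCaptureRemoved fires, instead of asserting callCount synchronously; TestGlobalStateUpdate instead sleeps past the remove timeout (timeout: 6 seconds against a session TTL of 10, i.e. a 5-second remove TTL) before comparing the state.
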
1 change: 1 addition & 0 deletions pkg/p2p/server.go
@@ -378,6 +378,7 @@ func (m *MessageServer) deregisterPeerByID(ctx context.Context, peerID string) {
m.peerLock.Unlock()
if !ok {
log.Warn("peer not found", zap.String("peerID", peerID))
+ return
}
m.deregisterPeer(ctx, peer, nil)
}
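A small correctness fix rides along in pkg/p2p: deregisterPeerByID logged "peer not found" but then fell through and called deregisterPeer anyway, presumably with a zero-value peer; the added return turns the warning into an early exit.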
