pkg/orchestrator(ticdc): add timeout before remove capture (pingcap#9448)
CharlesCheung96 committed Aug 2, 2023
1 parent 462cff4 commit 98d4cb1
Showing 12 changed files with 118 additions and 56 deletions.
53 changes: 30 additions & 23 deletions cdc/capture/capture.go
@@ -326,6 +326,8 @@ func (c *captureImpl) run(stdCtx context.Context) error {
}()

g, stdCtx := errgroup.WithContext(stdCtx)
stdCtx, cancel := context.WithCancel(stdCtx)

ctx := cdcContext.NewContext(stdCtx, &cdcContext.GlobalVars{
CaptureInfo: c.info,
EtcdClient: c.EtcdClient,
@@ -335,7 +337,6 @@ func (c *captureImpl) run(stdCtx context.Context) error {
SorterSystem: c.sorterSystem,
SortEngineFactory: c.sortEngineFactory,
})

g.Go(func() error {
// when the campaignOwner returns an error, it means that the owner throws
// an unrecoverable serious error (recoverable errors are intercepted in the owner tick)
@@ -351,9 +352,20 @@ func (c *captureImpl) run(stdCtx context.Context) error {
})

g.Go(func() error {
// Processor manager should be closed as soon as possible to prevent the double write issue.
defer func() {
if cancel != nil {
// Propagate the cancel signal to the owner and other goroutines.
cancel()
}
if c.processorManager != nil {
c.processorManager.AsyncClose()
}
log.Info("processor manager closed", zap.String("captureID", c.info.ID))
}()
processorFlushInterval := time.Duration(c.config.ProcessorFlushInterval)

globalState := orchestrator.NewGlobalState(c.EtcdClient.GetClusterID())
globalState := orchestrator.NewGlobalState(c.EtcdClient.GetClusterID(), c.config.CaptureSessionTTL)

globalState.SetOnCaptureAdded(func(captureID model.CaptureID, addr string) {
c.MessageRouter.AddPeer(captureID, addr)
@@ -419,7 +431,6 @@ func (c *captureImpl) campaignOwner(ctx cdcContext.Context) error {
}
// Campaign to be the owner; it blocks until elected.
if err := c.campaign(ctx); err != nil {

rootErr := errors.Cause(err)
if rootErr == context.Canceled {
return nil
@@ -467,7 +478,7 @@ func (c *captureImpl) campaignOwner(ctx cdcContext.Context) error {
owner := c.newOwner(c.upstreamManager)
c.setOwner(owner)

globalState := orchestrator.NewGlobalState(c.EtcdClient.GetClusterID())
globalState := orchestrator.NewGlobalState(c.EtcdClient.GetClusterID(), c.config.CaptureSessionTTL)

globalState.SetOnCaptureAdded(func(captureID model.CaptureID, addr string) {
c.MessageRouter.AddPeer(captureID, addr)
@@ -485,27 +496,27 @@ func (c *captureImpl) campaignOwner(ctx cdcContext.Context) error {
}
})

err = c.runEtcdWorker(ownerCtx, owner,
orchestrator.NewGlobalState(c.EtcdClient.GetClusterID()),
ownerFlushInterval, util.RoleOwner.String())
err = c.runEtcdWorker(ownerCtx, owner, globalState, ownerFlushInterval, util.RoleOwner.String())
c.owner.AsyncStop()
c.setOwner(nil)

// if owner exits, resign the owner key,
// use a new context to prevent the context from being cancelled.
resignCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
if resignErr := c.resign(resignCtx); resignErr != nil {
if errors.Cause(resignErr) != context.DeadlineExceeded {
log.Info("owner resign failed", zap.String("captureID", c.info.ID),
if !cerror.ErrNotOwner.Equal(err) {
// if owner exits, resign the owner key,
// use a new context to prevent the context from being cancelled.
resignCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
if resignErr := c.resign(resignCtx); resignErr != nil {
if errors.Cause(resignErr) != context.DeadlineExceeded {
log.Info("owner resign failed", zap.String("captureID", c.info.ID),
zap.Error(resignErr), zap.Int64("ownerRev", ownerRev))
cancel()
return errors.Trace(resignErr)
}

log.Warn("owner resign timeout", zap.String("captureID", c.info.ID),
zap.Error(resignErr), zap.Int64("ownerRev", ownerRev))
cancel()
return errors.Trace(resignErr)
}

log.Warn("owner resign timeout", zap.String("captureID", c.info.ID),
zap.Error(resignErr), zap.Int64("ownerRev", ownerRev))
cancel()
}
cancel()

log.Info("owner resigned successfully",
zap.String("captureID", c.info.ID), zap.Int64("ownerRev", ownerRev))
@@ -622,10 +633,6 @@ func (c *captureImpl) AsyncClose() {

c.captureMu.Lock()
defer c.captureMu.Unlock()
if c.processorManager != nil {
c.processorManager.AsyncClose()
}
log.Info("processor manager closed", zap.String("captureID", c.info.ID))

c.grpcService.Reset(nil)
if c.MessageRouter != nil {
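For orientation, the reshaped run() above ties processor shutdown to the goroutine that drives the processor's etcd worker: on exit it cancels the shared context (which stops the owner goroutine) and closes the processor manager as early as possible, instead of waiting for AsyncClose. A minimal sketch of that ordering follows; runOwner, runProcessorWorker, and the processorManager interface are placeholders, not the real TiCDC types.

	package sketch

	import (
		"context"

		"golang.org/x/sync/errgroup"
	)

	// processorManager stands in for TiCDC's processor manager; only AsyncClose matters here.
	type processorManager interface {
		AsyncClose()
	}

	// runCapture sketches the shutdown ordering: the goroutine driving the
	// processor etcd worker cancels the shared context on exit (stopping the
	// owner goroutine) and closes the processor manager right away.
	func runCapture(
		ctx context.Context,
		pm processorManager,
		runOwner, runProcessorWorker func(context.Context) error,
	) error {
		g, gctx := errgroup.WithContext(ctx)
		gctx, cancel := context.WithCancel(gctx)

		g.Go(func() error {
			return runOwner(gctx)
		})

		g.Go(func() error {
			defer func() {
				// Propagate the stop signal to the owner goroutine first,
				// then close processors to prevent double writes.
				cancel()
				pm.AsyncClose()
			}()
			return runProcessorWorker(gctx)
		})

		return g.Wait()
	}
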
4 changes: 2 additions & 2 deletions cdc/capture/election.go
@@ -39,11 +39,11 @@ func newElection(sess *concurrency.Session, key string) election {
}
}

func (e *electionImpl) campaign(ctx context.Context, key string) error {
func (e *electionImpl) campaign(ctx context.Context, val string) error {
failpoint.Inject("capture-campaign-compacted-error", func() {
failpoint.Return(errors.Trace(mvcc.ErrCompacted))
})
return e.election.Campaign(ctx, key)
return e.election.Campaign(ctx, val)
}

func (e *electionImpl) resign(ctx context.Context) error {
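The election.go rename is small but deliberate: in etcd's concurrency package, Campaign's second argument is the value proclaimed under the election prefix (the prefix itself is fixed when the Election is created), so val is the accurate name. A rough usage sketch against go.etcd.io/etcd/client/v3/concurrency; the function and package names here are illustrative, not TiCDC's own.

	package sketch

	import (
		"context"

		"go.etcd.io/etcd/client/v3/concurrency"
	)

	// campaign blocks until this node is elected under prefix, proclaiming val
	// (for TiCDC, the capture information) as the election value. The session is
	// created and kept alive by the caller, since closing it forfeits leadership.
	func campaign(ctx context.Context, sess *concurrency.Session, prefix, val string) error {
		e := concurrency.NewElection(sess, prefix)
		// Campaign's second argument is the value stored under the election key,
		// which is why the parameter is named "val" rather than "key"; the key
		// prefix itself was fixed when the Election was created.
		return e.Campaign(ctx, val)
	}
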
8 changes: 0 additions & 8 deletions cdc/owner/owner.go
@@ -411,14 +411,6 @@ func (o *ownerImpl) updateMetrics() {
changefeedStatusGauge.WithLabelValues(cfID.Namespace, cfID.ID).
Set(float64(cf.state.Info.State.ToInt()))
}

// The InfoProvider is a proxy object returning information
// from the scheduler.
infoProvider := cf.GetInfoProvider()
if infoProvider == nil {
// The scheduler has not been initialized yet.
continue
}
}
}

6 changes: 3 additions & 3 deletions cdc/owner/owner_test.go
@@ -120,7 +120,7 @@ func createOwner4Test(ctx cdcContext.Context, t *testing.T) (*ownerImpl, *orches
o := owner.(*ownerImpl)
o.upstreamManager = upstream.NewManager4Test(pdClient)

state := orchestrator.NewGlobalState(etcd.DefaultCDCClusterID)
state := orchestrator.NewGlobalStateForTest(etcd.DefaultCDCClusterID)
tester := orchestrator.NewReactorStateTester(t, state, nil)

// set captures
@@ -430,7 +430,7 @@ func TestUpdateGCSafePoint(t *testing.T) {
ctx := cdcContext.NewBackendContext4Test(true)
ctx, cancel := cdcContext.WithCancel(ctx)
defer cancel()
state := orchestrator.NewGlobalState(etcd.DefaultCDCClusterID)
state := orchestrator.NewGlobalState(etcd.DefaultCDCClusterID, 0)
tester := orchestrator.NewReactorStateTester(t, state, nil)

// no changefeed, the gc safe point should be max uint64
@@ -667,7 +667,7 @@ WorkLoop:
}

func TestCalculateGCSafepointTs(t *testing.T) {
state := orchestrator.NewGlobalState(etcd.DefaultCDCClusterID)
state := orchestrator.NewGlobalState(etcd.DefaultCDCClusterID, 0)
expectMinTsMap := make(map[uint64]uint64)
expectForceUpdateMap := make(map[uint64]interface{})
o := ownerImpl{changefeeds: make(map[model.ChangeFeedID]*changefeed)}
1 change: 1 addition & 0 deletions cdc/processor/manager.go
@@ -227,6 +227,7 @@ func (m *managerImpl) handleCommand(ctx cdcContext.Context) error {
for changefeedID := range m.processors {
m.closeProcessor(changefeedID, ctx)
}
log.Info("All processors are closed in processor manager")
// FIXME: we should drain command queue and signal callers an error.
return cerrors.ErrReactorFinished
case commandTpWriteDebugInfo:
2 changes: 1 addition & 1 deletion cdc/processor/manager_test.go
@@ -71,7 +71,7 @@ func (s *managerTester) resetSuit(ctx cdcContext.Context, t *testing.T) {
checkpointTs: replicaInfo.StartTs,
}, nil
}, &s.liveness)
s.state = orchestrator.NewGlobalState(etcd.DefaultCDCClusterID)
s.state = orchestrator.NewGlobalState(etcd.DefaultCDCClusterID, 0)
captureInfoBytes, err := ctx.GlobalVars().CaptureInfo.Marshal()
require.Nil(t, err)
s.tester = orchestrator.NewReactorStateTester(t, s.state, map[string]string{
3 changes: 2 additions & 1 deletion pkg/orchestrator/etcd_worker.go
@@ -232,7 +232,7 @@ func (worker *EtcdWorker) Run(ctx context.Context, session *concurrency.Session,
if err != nil {
// This error means the owner has resigned by itself,
// and we should exit the etcd worker and campaign for owner again.
return nil
return err
}
}

@@ -512,6 +512,7 @@ func (worker *EtcdWorker) applyUpdates() error {
return errors.Trace(err)
}
}
worker.state.UpdatePendingChange()

worker.pendingUpdates = worker.pendingUpdates[:0]
return nil
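Two things change in etcd_worker.go: Run now propagates the resign error instead of swallowing it, so the caller can decide whether to campaign again, and applyUpdates calls UpdatePendingChange after replaying buffered events, giving the reactor state a periodic hook for time-based work such as the delayed capture removal below. A simplified sketch of where that hook sits; the types are trimmed placeholders (the real Update takes a util.EtcdKey, not a string).

	package sketch

	// reactorState trims ReactorState down to the two methods relevant here.
	type reactorState interface {
		Update(key string, value []byte, isInit bool) error
		UpdatePendingChange()
	}

	type rawUpdate struct {
		key   string
		value []byte
	}

	type worker struct {
		state          reactorState
		pendingUpdates []rawUpdate
	}

	// applyUpdates mirrors the shape of EtcdWorker.applyUpdates after this commit:
	// replay the buffered etcd events into the state, then give the state a chance
	// to process pending changes (such as captures whose removal timeout elapsed).
	func (w *worker) applyUpdates() error {
		for _, u := range w.pendingUpdates {
			if err := w.state.Update(u.key, u.value, false); err != nil {
				return err
			}
		}
		w.state.UpdatePendingChange()

		w.pendingUpdates = w.pendingUpdates[:0]
		return nil
	}
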
3 changes: 3 additions & 0 deletions pkg/orchestrator/etcd_worker_bank_test.go
@@ -43,6 +43,9 @@ type bankReactorState struct {

const bankTestPrefix = "/ticdc/test/bank/"

func (b *bankReactorState) UpdatePendingChange() {
}

func (b *bankReactorState) Update(key util.EtcdKey, value []byte, isInit bool) error {
require.True(b.t, strings.HasPrefix(key.String(), bankTestPrefix))
indexStr := key.String()[len(bankTestPrefix):]
9 changes: 9 additions & 0 deletions pkg/orchestrator/etcd_worker_test.go
@@ -147,6 +147,9 @@ func (s *simpleReactorState) SetSum(sum int) {
s.patches = append(s.patches, patch)
}

func (s *simpleReactorState) UpdatePendingChange() {
}

func (s *simpleReactorState) Update(key util.EtcdKey, value []byte, isInit bool) error {
subMatches := keyParseRegexp.FindSubmatch(key.Bytes())
if len(subMatches) != 2 {
@@ -283,6 +286,9 @@ type intReactorState struct {
lastVal int
}

func (s *intReactorState) UpdatePendingChange() {
}

func (s *intReactorState) Update(key util.EtcdKey, value []byte, isInit bool) error {
var err error
s.val, err = strconv.Atoi(string(value))
@@ -372,6 +378,9 @@ type commonReactorState struct {
pendingPatches []DataPatch
}

func (s *commonReactorState) UpdatePendingChange() {
}

func (s *commonReactorState) Update(key util.EtcdKey, value []byte, isInit bool) error {
s.state[key.String()] = string(value)
return nil
3 changes: 3 additions & 0 deletions pkg/orchestrator/interfaces.go
@@ -35,6 +35,9 @@ type ReactorState interface {
// Update is called by EtcdWorker to notify the Reactor of the latest change to the Etcd state.
Update(key util.EtcdKey, value []byte, isInit bool) error

// UpdatePendingChange is called by EtcdWorker to notify the Reactor to apply the pending changes.
UpdatePendingChange()

// GetPatches is called by EtcdWorker, and should return slices of data patches that represent the changes
// that a Reactor wants to apply to Etcd.
// A slice of DataPatch will be committed as one Etcd txn.
57 changes: 45 additions & 12 deletions pkg/orchestrator/reactor_state.go
@@ -16,6 +16,7 @@ package orchestrator
import (
"encoding/json"
"reflect"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/log"
@@ -26,6 +27,8 @@ import (
"go.uber.org/zap"
)

const defaultCaptureRemoveTTL = 5

// GlobalReactorState represents a global state which stores all key-value pairs in ETCD
type GlobalReactorState struct {
ClusterID string
@@ -39,16 +42,44 @@ type GlobalReactorState struct {
// to be called when captures are added and removed.
onCaptureAdded func(captureID model.CaptureID, addr string)
onCaptureRemoved func(captureID model.CaptureID)

captureRemoveTTL int
toRemoveCaptures map[model.CaptureID]time.Time
}

// NewGlobalState creates a new global state
func NewGlobalState(clusterID string) *GlobalReactorState {
// NewGlobalState creates a new global state.
func NewGlobalState(clusterID string, captureSessionTTL int) *GlobalReactorState {
captureRemoveTTL := captureSessionTTL / 2
if captureRemoveTTL < defaultCaptureRemoveTTL {
captureRemoveTTL = defaultCaptureRemoveTTL
}
return &GlobalReactorState{
ClusterID: clusterID,
Owner: map[string]struct{}{},
Captures: make(map[model.CaptureID]*model.CaptureInfo),
Upstreams: make(map[model.UpstreamID]*model.UpstreamInfo),
Changefeeds: make(map[model.ChangeFeedID]*ChangefeedReactorState),
ClusterID: clusterID,
Owner: map[string]struct{}{},
Captures: make(map[model.CaptureID]*model.CaptureInfo),
Upstreams: make(map[model.UpstreamID]*model.UpstreamInfo),
Changefeeds: make(map[model.ChangeFeedID]*ChangefeedReactorState),
captureRemoveTTL: captureRemoveTTL,
toRemoveCaptures: make(map[model.CaptureID]time.Time),
}
}

// NewGlobalStateForTest creates a new global state for test.
func NewGlobalStateForTest(clusterID string) *GlobalReactorState {
return NewGlobalState(clusterID, 0)
}

// UpdatePendingChange implements the ReactorState interface
func (s *GlobalReactorState) UpdatePendingChange() {
for c, t := range s.toRemoveCaptures {
if time.Since(t) >= time.Duration(s.captureRemoveTTL)*time.Second {
log.Info("remote capture offline", zap.Any("info", s.Captures[c]))
delete(s.Captures, c)
if s.onCaptureRemoved != nil {
s.onCaptureRemoved(c)
}
delete(s.toRemoveCaptures, c)
}
}
}

@@ -59,6 +90,7 @@ func (s *GlobalReactorState) Update(key util.EtcdKey, value []byte, _ bool) error {
if err != nil {
return errors.Trace(err)
}

switch k.Tp {
case etcd.CDCKeyTypeOwner:
if value != nil {
@@ -69,11 +101,8 @@ func (s *GlobalReactorState) Update(key util.EtcdKey, value []byte, _ bool) error {
return nil
case etcd.CDCKeyTypeCapture:
if value == nil {
log.Info("remote capture offline", zap.Any("info", s.Captures[k.CaptureID]))
delete(s.Captures, k.CaptureID)
if s.onCaptureRemoved != nil {
s.onCaptureRemoved(k.CaptureID)
}
log.Info("remote capture offline detected", zap.Any("info", s.Captures[k.CaptureID]))
s.toRemoveCaptures[k.CaptureID] = time.Now()
return nil
}

@@ -174,6 +203,10 @@ func NewChangefeedReactorState(clusterID string,
}
}

// UpdatePendingChange implements the ReactorState interface
func (s *ChangefeedReactorState) UpdatePendingChange() {
}

// Update implements the ReactorState interface
func (s *ChangefeedReactorState) Update(key util.EtcdKey, value []byte, _ bool) error {
k := new(etcd.CDCKey)
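Taken together, the reactor_state.go changes make capture removal two-phase: when a capture key disappears, the capture is only recorded in toRemoveCaptures, and UpdatePendingChange drops it once captureRemoveTTL seconds have passed (half the capture session TTL, floored at 5 seconds), so brief etcd session flaps no longer evict a live capture instantly. A condensed, self-contained sketch of the same pattern with the TiCDC types replaced by placeholders:

	package sketch

	import "time"

	const defaultCaptureRemoveTTL = 5 // seconds, as in the commit

	// captureRegistry sketches the two-phase removal added to GlobalReactorState;
	// the capture info type and the callback are simplified placeholders.
	type captureRegistry struct {
		removeTTL        time.Duration
		captures         map[string]string    // captureID -> capture info
		toRemoveCaptures map[string]time.Time // captureID -> time the etcd key vanished
		onCaptureRemoved func(captureID string)
	}

	func newCaptureRegistry(captureSessionTTL int) *captureRegistry {
		// Wait for half the capture session TTL, but never less than 5 seconds.
		ttl := captureSessionTTL / 2
		if ttl < defaultCaptureRemoveTTL {
			ttl = defaultCaptureRemoveTTL
		}
		return &captureRegistry{
			removeTTL:        time.Duration(ttl) * time.Second,
			captures:         make(map[string]string),
			toRemoveCaptures: make(map[string]time.Time),
		}
	}

	// markOffline corresponds to Update observing a deleted capture key: the
	// capture is only scheduled for removal, not removed immediately.
	func (r *captureRegistry) markOffline(captureID string) {
		r.toRemoveCaptures[captureID] = time.Now()
	}

	// processPending corresponds to UpdatePendingChange: a capture is dropped only
	// once its removal TTL has elapsed, so a brief session flap does not evict it.
	func (r *captureRegistry) processPending() {
		for id, since := range r.toRemoveCaptures {
			if time.Since(since) >= r.removeTTL {
				delete(r.captures, id)
				if r.onCaptureRemoved != nil {
					r.onCaptureRemoved(id)
				}
				delete(r.toRemoveCaptures, id)
			}
		}
	}

In TiCDC itself this logic lives in GlobalReactorState and is driven by the UpdatePendingChange call that this commit adds to EtcdWorker.applyUpdates.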
