From 80aa4a2426d61f0e96981263d82eea33662bd9ab Mon Sep 17 00:00:00 2001 From: CharlesCheung <61726649+CharlesCheung96@users.noreply.github.com> Date: Wed, 19 Jul 2023 17:57:47 +0800 Subject: [PATCH] owner(ticdc): do not resign owner when ErrNotOwner is encountered (#9396) close pingcap/tiflow#9344 --- cdc/capture/capture.go | 45 ++++++++++++++++++++------------- cdc/capture/election.go | 4 +-- cdc/owner/owner.go | 8 ------ pkg/orchestrator/etcd_worker.go | 2 +- 4 files changed, 30 insertions(+), 29 deletions(-) diff --git a/cdc/capture/capture.go b/cdc/capture/capture.go index 464057625cf..fdc8af05ac9 100644 --- a/cdc/capture/capture.go +++ b/cdc/capture/capture.go @@ -336,6 +336,8 @@ func (c *captureImpl) run(stdCtx context.Context) error { }() g, stdCtx := errgroup.WithContext(stdCtx) + stdCtx, cancel := context.WithCancel(stdCtx) + ctx := cdcContext.NewContext(stdCtx, &cdcContext.GlobalVars{ CaptureInfo: c.info, EtcdClient: c.EtcdClient, @@ -343,7 +345,6 @@ func (c *captureImpl) run(stdCtx context.Context) error { MessageRouter: c.MessageRouter, SortEngineFactory: c.sortEngineFactory, }) - g.Go(func() error { // when the campaignOwner returns an error, it means that the owner throws // an unrecoverable serious errors (recoverable errors are intercepted in the owner tick) @@ -359,6 +360,17 @@ func (c *captureImpl) run(stdCtx context.Context) error { }) g.Go(func() error { + // Processor manager should be closed as soon as possible to prevent double write issue. + defer func() { + if cancel != nil { + // Propagate the cancel signal to the owner and other goroutines. + cancel() + } + if c.processorManager != nil { + c.processorManager.Close() + } + log.Info("processor manager closed", zap.String("captureID", c.info.ID)) + }() processorFlushInterval := time.Duration(c.config.ProcessorFlushInterval) globalState := orchestrator.NewGlobalState(c.EtcdClient.GetClusterID()) @@ -427,7 +439,6 @@ func (c *captureImpl) campaignOwner(ctx cdcContext.Context) error { } // Campaign to be the owner, it blocks until it been elected. if err := c.campaign(ctx); err != nil { - rootErr := errors.Cause(err) if rootErr == context.Canceled { return nil @@ -518,21 +529,23 @@ func (c *captureImpl) campaignOwner(ctx cdcContext.Context) error { c.setController(nil) c.setOwner(nil) - // if owner exits, resign the owner key, - // use a new context to prevent the context from being cancelled. - resignCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - if resignErr := c.resign(resignCtx); resignErr != nil { - if errors.Cause(resignErr) != context.DeadlineExceeded { - log.Info("owner resign failed", zap.String("captureID", c.info.ID), + if !cerror.ErrNotOwner.Equal(err) { + // if owner exits, resign the owner key, + // use a new context to prevent the context from being cancelled. + resignCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + if resignErr := c.resign(resignCtx); resignErr != nil { + if errors.Cause(resignErr) != context.DeadlineExceeded { + log.Info("owner resign failed", zap.String("captureID", c.info.ID), + zap.Error(resignErr), zap.Int64("ownerRev", ownerRev)) + cancel() + return errors.Trace(resignErr) + } + + log.Warn("owner resign timeout", zap.String("captureID", c.info.ID), zap.Error(resignErr), zap.Int64("ownerRev", ownerRev)) - cancel() - return errors.Trace(resignErr) } - - log.Warn("owner resign timeout", zap.String("captureID", c.info.ID), - zap.Error(resignErr), zap.Int64("ownerRev", ownerRev)) + cancel() } - cancel() log.Info("owner resigned successfully", zap.String("captureID", c.info.ID), zap.Int64("ownerRev", ownerRev)) @@ -666,10 +679,6 @@ func (c *captureImpl) Close() { c.captureMu.Lock() defer c.captureMu.Unlock() - if c.processorManager != nil { - c.processorManager.Close() - } - log.Info("processor manager closed", zap.String("captureID", c.info.ID)) c.grpcService.Reset(nil) if c.MessageRouter != nil { diff --git a/cdc/capture/election.go b/cdc/capture/election.go index 9012d78e596..6388b1f0696 100644 --- a/cdc/capture/election.go +++ b/cdc/capture/election.go @@ -39,11 +39,11 @@ func newElection(sess *concurrency.Session, key string) election { } } -func (e *electionImpl) campaign(ctx context.Context, key string) error { +func (e *electionImpl) campaign(ctx context.Context, val string) error { failpoint.Inject("capture-campaign-compacted-error", func() { failpoint.Return(errors.Trace(mvcc.ErrCompacted)) }) - return e.election.Campaign(ctx, key) + return e.election.Campaign(ctx, val) } func (e *electionImpl) resign(ctx context.Context) error { diff --git a/cdc/owner/owner.go b/cdc/owner/owner.go index 131f8d5c02f..42c88ebf8d8 100644 --- a/cdc/owner/owner.go +++ b/cdc/owner/owner.go @@ -330,14 +330,6 @@ func (o *ownerImpl) updateMetrics() { changefeedStatusGauge.WithLabelValues(cfID.Namespace, cfID.ID). Set(float64(cf.state.Info.State.ToInt())) } - - // The InfoProvider is a proxy object returning information - // from the scheduler. - infoProvider := cf.GetInfoProvider() - if infoProvider == nil { - // The scheduler has not been initialized yet. - continue - } } } diff --git a/pkg/orchestrator/etcd_worker.go b/pkg/orchestrator/etcd_worker.go index d75dc3a8a80..db690b36389 100644 --- a/pkg/orchestrator/etcd_worker.go +++ b/pkg/orchestrator/etcd_worker.go @@ -232,7 +232,7 @@ func (worker *EtcdWorker) Run(ctx context.Context, session *concurrency.Session, if err != nil { // This error means owner is resigned by itself, // and we should exit etcd worker and campaign owner again. - return nil + return err } }