diff --git a/cdc/capture/capture.go b/cdc/capture/capture.go index de0b8805c71..a4632ace1fd 100644 --- a/cdc/capture/capture.go +++ b/cdc/capture/capture.go @@ -24,12 +24,6 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/failpoint" "github.com/pingcap/log" - "github.com/pingcap/tiflow/pkg/util" - "go.etcd.io/etcd/client/v3/concurrency" - "go.etcd.io/etcd/server/v3/mvcc" - "go.uber.org/zap" - "golang.org/x/time/rate" - "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/cdc/owner" "github.com/pingcap/tiflow/cdc/processor" @@ -42,7 +36,12 @@ import ( "github.com/pingcap/tiflow/pkg/orchestrator" "github.com/pingcap/tiflow/pkg/p2p" "github.com/pingcap/tiflow/pkg/upstream" + "github.com/pingcap/tiflow/pkg/util" "github.com/pingcap/tiflow/pkg/version" + "go.etcd.io/etcd/client/v3/concurrency" + "go.etcd.io/etcd/server/v3/mvcc" + "go.uber.org/zap" + "golang.org/x/time/rate" ) const cleanMetaDuration = 10 * time.Second @@ -113,9 +112,7 @@ func (c *Capture) reset(ctx context.Context) error { sess, err := concurrency.NewSession(c.EtcdClient.Client.Unwrap(), concurrency.WithTTL(conf.CaptureSessionTTL)) if err != nil { - return errors.Annotate( - cerror.WrapError(cerror.ErrNewCaptureFailed, err), - "create capture session") + return errors.Trace(err) } c.captureMu.Lock() @@ -132,9 +129,7 @@ func (c *Capture) reset(ctx context.Context) error { c.UpstreamManager = upstream.NewManager(ctx) err = c.UpstreamManager.Add(upstream.DefaultUpstreamID, c.pdEndpoints, conf.Security) if err != nil { - return errors.Annotate( - cerror.WrapError(cerror.ErrNewCaptureFailed, err), - "add default upstream failed") + return errors.Trace(err) } c.processorManager = c.newProcessorManager(c.UpstreamManager) @@ -224,17 +219,23 @@ func (c *Capture) Run(ctx context.Context) error { } return errors.Trace(err) } - err = c.reset(ctx) - if err != nil { - return errors.Trace(err) - } err = c.run(ctx) // if capture suicided, reset the capture and run again. 
// if the canceled error throw, there are two possible scenarios: - // 1. the internal context canceled, it means some error happened in the internal, and the routine is exited, we should restart the capture - // 2. the parent context canceled, it means that the caller of the capture hope the capture to exit, and this loop will return in the above `select` block - // TODO: make sure the internal cancel should return the real error instead of context.Canceled - if cerror.ErrCaptureSuicide.Equal(err) || context.Canceled == errors.Cause(err) { + // 1. the internal context was canceled, which means some error happened + // internally and the routine has exited; we should restart + // the capture. + // 2. the parent context was canceled, which means that the caller of + // the capture wants the capture to exit, and this loop will return + // in the above `select` block. + // if an **internal** context deadline is exceeded (IO/network + // timeout), reset the capture and run again. + // + // TODO: make sure an internal cancel returns the real error + // instead of context.Canceled. 
+ if cerror.ErrCaptureSuicide.Equal(err) || + context.Canceled == errors.Cause(err) || + context.DeadlineExceeded == errors.Cause(err) { log.Info("capture recovered", zap.String("captureID", c.info.ID)) continue } @@ -243,6 +244,12 @@ func (c *Capture) Run(ctx context.Context) error { } func (c *Capture) run(stdCtx context.Context) error { + err := c.reset(stdCtx) + if err != nil { + log.Error("reset capture failed", zap.Error(err)) + return errors.Trace(err) + } + ctx := cdcContext.NewContext(stdCtx, &cdcContext.GlobalVars{ CaptureInfo: c.info, EtcdClient: c.EtcdClient, @@ -251,7 +258,7 @@ func (c *Capture) run(stdCtx context.Context) error { MessageServer: c.MessageServer, MessageRouter: c.MessageRouter, }) - err := c.register(ctx) + err = c.register(ctx) if err != nil { return errors.Trace(err) } diff --git a/cdc/server.go b/cdc/server.go index 555cd2b0143..9d7320bd949 100644 --- a/cdc/server.go +++ b/cdc/server.go @@ -145,7 +145,7 @@ func (s *Server) Run(ctx context.Context) error { }, }) if err != nil { - return errors.Annotate(cerror.WrapError(cerror.ErrNewCaptureFailed, err), "new etcd client") + return errors.Trace(err) } cdcEtcdClient := etcd.NewCDCEtcdClient(ctx, etcdCli) diff --git a/errors.toml b/errors.toml index 2a031dd6784..268fe68299c 100755 --- a/errors.toml +++ b/errors.toml @@ -596,11 +596,6 @@ error = ''' MySQL worker panic ''' -["CDC:ErrNewCaptureFailed"] -error = ''' -new capture failed -''' - ["CDC:ErrNewProcessorFailed"] error = ''' new processor failed diff --git a/pkg/errors/errors.go b/pkg/errors/errors.go index 29ddc5182ef..a5b01522d7a 100644 --- a/pkg/errors/errors.go +++ b/pkg/errors/errors.go @@ -561,10 +561,6 @@ var ( "capture suicide", errors.RFCCodeText("CDC:ErrCaptureSuicide"), ) - ErrNewCaptureFailed = errors.Normalize( - "new capture failed", - errors.RFCCodeText("CDC:ErrNewCaptureFailed"), - ) ErrCaptureRegister = errors.Normalize( "capture register to etcd failed", errors.RFCCodeText("CDC:ErrCaptureRegister"),