Skip to content

Commit

Permalink
cdc: retry internal context deadline exceeded (pingcap#8602)
Browse files Browse the repository at this point in the history
  • Loading branch information
overvenus committed Mar 24, 2023
1 parent 4b60531 commit c6ed8da
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 31 deletions.
49 changes: 28 additions & 21 deletions cdc/capture/capture.go
Expand Up @@ -24,12 +24,6 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/log"
"github.com/pingcap/tiflow/pkg/util"
"go.etcd.io/etcd/client/v3/concurrency"
"go.etcd.io/etcd/server/v3/mvcc"
"go.uber.org/zap"
"golang.org/x/time/rate"

"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/owner"
"github.com/pingcap/tiflow/cdc/processor"
Expand All @@ -42,7 +36,12 @@ import (
"github.com/pingcap/tiflow/pkg/orchestrator"
"github.com/pingcap/tiflow/pkg/p2p"
"github.com/pingcap/tiflow/pkg/upstream"
"github.com/pingcap/tiflow/pkg/util"
"github.com/pingcap/tiflow/pkg/version"
"go.etcd.io/etcd/client/v3/concurrency"
"go.etcd.io/etcd/server/v3/mvcc"
"go.uber.org/zap"
"golang.org/x/time/rate"
)

const cleanMetaDuration = 10 * time.Second
Expand Down Expand Up @@ -113,9 +112,7 @@ func (c *Capture) reset(ctx context.Context) error {
sess, err := concurrency.NewSession(c.EtcdClient.Client.Unwrap(),
concurrency.WithTTL(conf.CaptureSessionTTL))
if err != nil {
return errors.Annotate(
cerror.WrapError(cerror.ErrNewCaptureFailed, err),
"create capture session")
return errors.Trace(err)
}

c.captureMu.Lock()
Expand All @@ -132,9 +129,7 @@ func (c *Capture) reset(ctx context.Context) error {
c.UpstreamManager = upstream.NewManager(ctx)
err = c.UpstreamManager.Add(upstream.DefaultUpstreamID, c.pdEndpoints, conf.Security)
if err != nil {
return errors.Annotate(
cerror.WrapError(cerror.ErrNewCaptureFailed, err),
"add default upstream failed")
return errors.Trace(err)
}

c.processorManager = c.newProcessorManager(c.UpstreamManager)
Expand Down Expand Up @@ -224,17 +219,23 @@ func (c *Capture) Run(ctx context.Context) error {
}
return errors.Trace(err)
}
err = c.reset(ctx)
if err != nil {
return errors.Trace(err)
}
err = c.run(ctx)
// if capture suicided, reset the capture and run again.
// if the canceled error throw, there are two possible scenarios:
// 1. the internal context canceled, it means some error happened in the internal, and the routine is exited, we should restart the capture
// 2. the parent context canceled, it means that the caller of the capture hope the capture to exit, and this loop will return in the above `select` block
// TODO: make sure the internal cancel should return the real error instead of context.Canceled
if cerror.ErrCaptureSuicide.Equal(err) || context.Canceled == errors.Cause(err) {
// 1. the internal context canceled, it means some error happened in
// the internal, and the routine is exited, we should restart
// the capture.
// 2. the parent context canceled, it means that the caller of
// the capture hope the capture to exit, and this loop will return
// in the above `select` block.
// if there are some **internal** context deadline exceeded (IO/network
// timeout), reset the capture and run again.
//
// TODO: make sure the internal cancel should return the real error
// instead of context.Canceled.
if cerror.ErrCaptureSuicide.Equal(err) ||
context.Canceled == errors.Cause(err) ||
context.DeadlineExceeded == errors.Cause(err) {
log.Info("capture recovered", zap.String("captureID", c.info.ID))
continue
}
Expand All @@ -243,6 +244,12 @@ func (c *Capture) Run(ctx context.Context) error {
}

func (c *Capture) run(stdCtx context.Context) error {
err := c.reset(stdCtx)
if err != nil {
log.Error("reset capture failed", zap.Error(err))
return errors.Trace(err)
}

ctx := cdcContext.NewContext(stdCtx, &cdcContext.GlobalVars{
CaptureInfo: c.info,
EtcdClient: c.EtcdClient,
Expand All @@ -251,7 +258,7 @@ func (c *Capture) run(stdCtx context.Context) error {
MessageServer: c.MessageServer,
MessageRouter: c.MessageRouter,
})
err := c.register(ctx)
err = c.register(ctx)
if err != nil {
return errors.Trace(err)
}
Expand Down
2 changes: 1 addition & 1 deletion cdc/server.go
Expand Up @@ -145,7 +145,7 @@ func (s *Server) Run(ctx context.Context) error {
},
})
if err != nil {
return errors.Annotate(cerror.WrapError(cerror.ErrNewCaptureFailed, err), "new etcd client")
return errors.Trace(err)
}

cdcEtcdClient := etcd.NewCDCEtcdClient(ctx, etcdCli)
Expand Down
5 changes: 0 additions & 5 deletions errors.toml
Expand Up @@ -596,11 +596,6 @@ error = '''
MySQL worker panic
'''

["CDC:ErrNewCaptureFailed"]
error = '''
new capture failed
'''

["CDC:ErrNewProcessorFailed"]
error = '''
new processor failed
Expand Down
4 changes: 0 additions & 4 deletions pkg/errors/errors.go
Expand Up @@ -561,10 +561,6 @@ var (
"capture suicide",
errors.RFCCodeText("CDC:ErrCaptureSuicide"),
)
ErrNewCaptureFailed = errors.Normalize(
"new capture failed",
errors.RFCCodeText("CDC:ErrNewCaptureFailed"),
)
ErrCaptureRegister = errors.Normalize(
"capture register to etcd failed",
errors.RFCCodeText("CDC:ErrCaptureRegister"),
Expand Down

0 comments on commit c6ed8da

Please sign in to comment.