Skip to content

Commit

Permalink
sink(cdc): ddl sink errors shouldn't fail changefeed quickly (#9581) (#…
Browse files Browse the repository at this point in the history
…9585)

close #9582
  • Loading branch information
ti-chi-bot committed Aug 17, 2023
1 parent 425b10a commit 55ee2ab
Show file tree
Hide file tree
Showing 10 changed files with 17 additions and 21 deletions.
2 changes: 1 addition & 1 deletion cdc/entry/mounter.go
Expand Up @@ -179,7 +179,7 @@ func (m *mounter) unmarshalAndMountRowChanged(ctx context.Context, raw *model.Ra
}
return nil, nil
}()
if err != nil && !cerror.IsChangefeedUnRetryableError(err) {
if err != nil && !cerror.ShouldFailChangefeed(err) {
log.Error("failed to mount and unmarshals entry, start to print debug info", zap.Error(err))
snap.PrintStatus(log.Error)
}
Expand Down
6 changes: 3 additions & 3 deletions cdc/model/errors.go
Expand Up @@ -28,7 +28,7 @@ type RunningError struct {
Message string `json:"message"`
}

// IsChangefeedUnRetryableError return true if a running error contains a changefeed not retry error.
func (r RunningError) IsChangefeedUnRetryableError() bool {
return cerror.IsChangefeedUnRetryableError(errors.New(r.Message + r.Code))
// ShouldFailChangefeed return true if a running error contains a changefeed not retry error.
func (r RunningError) ShouldFailChangefeed() bool {
return cerror.ShouldFailChangefeed(errors.New(r.Message + r.Code))
}
2 changes: 1 addition & 1 deletion cdc/model/errors_test.go
Expand Up @@ -52,6 +52,6 @@ func TestIsChangefeedNotRetryError(t *testing.T) {
}

for _, c := range cases {
require.Equal(t, c.result, c.err.IsChangefeedUnRetryableError())
require.Equal(t, c.result, c.err.ShouldFailChangefeed())
}
}
4 changes: 2 additions & 2 deletions cdc/owner/ddl_sink.go
Expand Up @@ -185,7 +185,7 @@ func (s *ddlSinkImpl) retrySinkAction(ctx context.Context, name string, action f
}
s.sinkV1 = nil
s.sinkV2 = nil
isRetryable := !cerror.IsChangefeedUnRetryableError(err) && errors.Cause(err) != context.Canceled
isRetryable := !cerror.ShouldFailChangefeed(err) && errors.Cause(err) != context.Canceled
log.Warn("owner ddl sink fails on action",
zap.String("namespace", s.changefeedID.Namespace),
zap.String("changefeed", s.changefeedID.ID),
Expand Down Expand Up @@ -423,7 +423,7 @@ func (s *ddlSinkImpl) emitSyncPoint(ctx context.Context, checkpointTs uint64) (e
if err == nil {
return nil
}
if !cerror.IsChangefeedUnRetryableError(err) && errors.Cause(err) != context.Canceled {
if !cerror.ShouldFailChangefeed(err) && errors.Cause(err) != context.Canceled {
// TODO(qupeng): retry it internally after async sink syncPoint is ready.
s.reportError(err)
return err
Expand Down
2 changes: 1 addition & 1 deletion cdc/owner/feed_state_manager.go
Expand Up @@ -514,7 +514,7 @@ func (m *feedStateManager) handleError(errs ...*model.RunningError) {
// and no need to patch other error to the changefeed info
for _, err := range errs {
if cerrors.IsChangefeedGCFastFailErrorCode(errors.RFCErrorCode(err.Code)) ||
err.IsChangefeedUnRetryableError() {
err.ShouldFailChangefeed() {
m.state.PatchInfo(func(info *model.ChangeFeedInfo) (*model.ChangeFeedInfo, bool, error) {
if info == nil {
return nil, false, nil
Expand Down
3 changes: 2 additions & 1 deletion cdc/processor/sinkmanager/manager.go
Expand Up @@ -293,7 +293,8 @@ func (m *SinkManager) run(ctx context.Context, warnings ...chan<- error) (err er
zap.Duration("cost", time.Since(start)))
}

if !cerror.IsChangefeedUnRetryableError(err) && errors.Cause(err) != context.Canceled {
// If the error is retryable, we should retry to re-establish the internal resources.
if !cerror.ShouldFailChangefeed(err) && errors.Cause(err) != context.Canceled {
select {
case <-m.managerCtx.Done():
case warnings[0] <- err:
Expand Down
7 changes: 1 addition & 6 deletions cdc/sinkv2/ddlsink/mysql/mysql_ddl_sink.go
Expand Up @@ -99,12 +99,7 @@ func NewMySQLDDLSink(
}

func (m *mysqlDDLSink) WriteDDLEvent(ctx context.Context, ddl *model.DDLEvent) error {
err := m.execDDLWithMaxRetries(ctx, ddl)
// we should not retry changefeed if DDL failed by return an unretryable error.
if !errorutil.IsRetryableDDLError(err) {
return cerror.WrapChangefeedUnretryableErr(err)
}
return errors.Trace(err)
return m.execDDLWithMaxRetries(ctx, ddl)
}

func (m *mysqlDDLSink) execDDLWithMaxRetries(ctx context.Context, ddl *model.DDLEvent) error {
Expand Down
4 changes: 2 additions & 2 deletions pkg/errors/helper.go
Expand Up @@ -83,8 +83,8 @@ var changefeedUnRetryableErrors = []*errors.Error{
ErrStorageSinkInvalidConfig,
}

// IsChangefeedUnRetryableError returns true if an error is a changefeed not retry error.
func IsChangefeedUnRetryableError(err error) bool {
// ShouldFailChangefeed returns true if an error is a changefeed not retry error.
func ShouldFailChangefeed(err error) bool {
for _, e := range changefeedUnRetryableErrors {
if e.Equal(err) {
return true
Expand Down
6 changes: 3 additions & 3 deletions pkg/errors/helper_test.go
Expand Up @@ -118,7 +118,7 @@ func TestChangefeedFastFailError(t *testing.T) {
require.Equal(t, false, IsChangefeedGCFastFailErrorCode(rfcCode))
}

func TestIsChangefeedUnRetryableError(t *testing.T) {
func TestShouldFailChangefeed(t *testing.T) {
t.Parallel()
cases := []struct {
err error
Expand Down Expand Up @@ -175,14 +175,14 @@ func TestIsChangefeedUnRetryableError(t *testing.T) {
}

for _, c := range cases {
require.Equal(t, c.expected, IsChangefeedUnRetryableError(c.err))
require.Equal(t, c.expected, ShouldFailChangefeed(c.err))
}

var code errors.RFCErrorCode
var ok bool
code, ok = RFCCode(ErrChangefeedUnretryable)
require.True(t, ok)
require.True(t, IsChangefeedUnRetryableError(errors.New(string(code))))
require.True(t, ShouldFailChangefeed(errors.New(string(code))))
}

func TestIsCliUnprintableError(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/filter/expr_filter.go
Expand Up @@ -415,7 +415,7 @@ func (f *dmlExprFilter) shouldSkipDML(
for _, rule := range rules {
ignore, err := rule.shouldSkipDML(row, rawRow, ti)
if err != nil {
if cerror.IsChangefeedUnRetryableError(err) {
if cerror.ShouldFailChangefeed(err) {
return false, err
}
return false, cerror.WrapError(cerror.ErrFailedToFilterDML, err, row)
Expand Down

0 comments on commit 55ee2ab

Please sign in to comment.