From ee1e00069982ee4125a7f7d0bd0e2c49b62f357a Mon Sep 17 00:00:00 2001 From: dongmen <414110582@qq.com> Date: Thu, 13 Jul 2023 11:11:58 +0800 Subject: [PATCH 1/4] add error retry limit for ddl sink --- cdc/owner/ddl_sink.go | 11 +++++-- cdc/processor/sinkmanager/manager.go | 37 ++++------------------- cdc/processor/sinkmanager/manager_test.go | 26 ---------------- 3 files changed, 15 insertions(+), 59 deletions(-) diff --git a/cdc/owner/ddl_sink.go b/cdc/owner/ddl_sink.go index 5bd978ffc93..d8fcbc9edc0 100644 --- a/cdc/owner/ddl_sink.go +++ b/cdc/owner/ddl_sink.go @@ -29,6 +29,7 @@ import ( "github.com/pingcap/tiflow/cdc/sink/ddlsink/factory" "github.com/pingcap/tiflow/cdc/syncpointstore" cerror "github.com/pingcap/tiflow/pkg/errors" + "github.com/pingcap/tiflow/pkg/retry" "github.com/pingcap/tiflow/pkg/util" "go.uber.org/zap" ) @@ -83,6 +84,7 @@ type ddlSinkImpl struct { changefeedID model.ChangeFeedID info *model.ChangeFeedInfo + sinkRetry *retry.ErrorRetry reportError func(err error) reportWarning func(err error) } @@ -100,6 +102,7 @@ func newDDLSink( changefeedID: changefeedID, info: info, + sinkRetry: retry.NewDefaultErrorRetry(), reportError: reportError, reportWarning: reportWarning, } @@ -170,8 +173,12 @@ func (s *ddlSinkImpl) retrySinkActionWithErrorReport(ctx context.Context, action return err } - // Use a 5 second backoff when re-establishing internal resources. - if err = util.Hang(ctx, 5*time.Second); err != nil { + backoff, err := s.sinkRetry.GetRetryBackoff(err) + if err != nil { + return errors.Trace(err) + } + + if err = util.Hang(ctx, backoff); err != nil { return errors.Trace(err) } } diff --git a/cdc/processor/sinkmanager/manager.go b/cdc/processor/sinkmanager/manager.go index 17763a140e4..9ec71ae1326 100644 --- a/cdc/processor/sinkmanager/manager.go +++ b/cdc/processor/sinkmanager/manager.go @@ -16,7 +16,6 @@ package sinkmanager import ( "context" "math" - "math/rand" "sync" "time" @@ -34,6 +33,7 @@ import ( tablesinkmetrics "github.com/pingcap/tiflow/cdc/sink/metrics/tablesink" "github.com/pingcap/tiflow/cdc/sink/tablesink" cerror "github.com/pingcap/tiflow/pkg/errors" + "github.com/pingcap/tiflow/pkg/retry" "github.com/pingcap/tiflow/pkg/spanz" "github.com/pingcap/tiflow/pkg/upstream" "github.com/pingcap/tiflow/pkg/util" @@ -66,6 +66,8 @@ type sinkRetry struct { lastInternalError error firstRetryTime time.Time lastErrorRetryTime time.Time + maxRetryDuration time.Duration + errGCInterval time.Duration } // SinkManager is the implementation of SinkManager. @@ -106,7 +108,7 @@ type SinkManager struct { sinkWorkerAvailable chan struct{} // sinkMemQuota is used to control the total memory usage of the table sink. sinkMemQuota *memquota.MemQuota - sinkRetry sinkRetry + sinkRetry *retry.ErrorRetry // redoWorkers used to pull data from source manager. redoWorkers []*redoWorker // redoTaskChan is used to send tasks to redoWorkers. @@ -151,11 +153,7 @@ func New( sinkWorkers: make([]*sinkWorker, 0, sinkWorkerNum), sinkTaskChan: make(chan *sinkTask), sinkWorkerAvailable: make(chan struct{}, 1), - sinkRetry: sinkRetry{ - lastInternalError: nil, - firstRetryTime: time.Now(), - lastErrorRetryTime: time.Now(), - }, + sinkRetry: retry.NewDefaultErrorRetry(), metricsTableSinkTotalRows: tablesinkmetrics.TotalRowsCountCounter. 
WithLabelValues(changefeedID.Namespace, changefeedID.ID), @@ -290,7 +288,7 @@ func (m *SinkManager) Run(ctx context.Context, warnings ...chan<- error) (err er return errors.Trace(err) } - backoff, err := m.getRetryBackoff(err) + backoff, err := m.sinkRetry.GetRetryBackoff(err) if err != nil { return errors.Trace(err) } @@ -301,29 +299,6 @@ func (m *SinkManager) Run(ctx context.Context, warnings ...chan<- error) (err er } } -// getRetryBackoff returns the backoff duration for retrying the last error. -// If the retry time is exhausted, it returns the an ChangefeedUnRetryableError. -func (m *SinkManager) getRetryBackoff(err error) (time.Duration, error) { - // reset firstRetryTime when the last error is too long ago - // it means the last error is retry success, and the sink is running well for some time - if m.sinkRetry.lastInternalError == nil || - time.Since(m.sinkRetry.lastErrorRetryTime) >= errGCInterval { - m.sinkRetry.firstRetryTime = time.Now() - } - - // return an unretryable error if retry time is exhausted - if time.Since(m.sinkRetry.firstRetryTime) >= maxRetryDuration { - return 0, cerror.WrapChangefeedUnretryableErr(err) - } - - m.sinkRetry.lastInternalError = err - m.sinkRetry.lastErrorRetryTime = time.Now() - - // interval is in range [5s, 30s) - interval := time.Second * time.Duration(rand.Int63n(25)+5) - return interval, nil -} - func (m *SinkManager) initSinkFactory(errCh chan error) error { m.sinkFactoryMu.Lock() defer m.sinkFactoryMu.Unlock() diff --git a/cdc/processor/sinkmanager/manager_test.go b/cdc/processor/sinkmanager/manager_test.go index 0ca5a277bb4..85d0cb28919 100644 --- a/cdc/processor/sinkmanager/manager_test.go +++ b/cdc/processor/sinkmanager/manager_test.go @@ -15,7 +15,6 @@ package sinkmanager import ( "context" - "errors" "math" "testing" "time" @@ -357,28 +356,3 @@ func TestSinkManagerRunWithErrors(t *testing.T) { log.Panic("must get an error instead of a timeout") } } - -func TestGetRetryBackoff(t *testing.T) { - t.Parallel() - - ctx, cancel := context.WithCancel(context.Background()) - errCh := make(chan error, 16) - changefeedInfo := getChangefeedInfo() - manager, _, _ := CreateManagerWithMemEngine(t, ctx, model.DefaultChangeFeedID("1"), changefeedInfo, errCh) - defer func() { - cancel() - manager.Close() - }() - - backoff, err := manager.getRetryBackoff(errors.New("test")) - require.NoError(t, err) - require.Less(t, backoff, 30*time.Second) - time.Sleep(500 * time.Millisecond) - elapsedTime := time.Since(manager.sinkRetry.firstRetryTime) - - // mock time to test reset error backoff - manager.sinkRetry.lastErrorRetryTime = time.Unix(0, 0) - _, err = manager.getRetryBackoff(errors.New("test")) - require.NoError(t, err) - require.Less(t, time.Since(manager.sinkRetry.firstRetryTime), elapsedTime) -} From 32aec25e6fe8d34d025123b5203e01f7ca334c28 Mon Sep 17 00:00:00 2001 From: dongmen <414110582@qq.com> Date: Thu, 13 Jul 2023 11:12:11 +0800 Subject: [PATCH 2/4] add error retry limit for ddl sink 2 --- pkg/retry/error_retry.go | 87 +++++++++++++++++++++++++++++++++++ pkg/retry/error_retry_test.go | 39 ++++++++++++++++ 2 files changed, 126 insertions(+) create mode 100644 pkg/retry/error_retry.go create mode 100644 pkg/retry/error_retry_test.go diff --git a/pkg/retry/error_retry.go b/pkg/retry/error_retry.go new file mode 100644 index 00000000000..8bbb4791ec8 --- /dev/null +++ b/pkg/retry/error_retry.go @@ -0,0 +1,87 @@ +// Copyright 2023 PingCAP, Inc. 
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package retry
+
+import (
+	"math/rand"
+	"time"
+
+	cerror "github.com/pingcap/tiflow/pkg/errors"
+)
+
+const (
+	defaultErrorMaxRetryDuration = 30 * time.Minute
+	defaultErrGCInterval         = 10 * time.Minute
+	defaultBackoffBaseInS        = 5
+	defaultBackoffMaxInS         = 30
+)
+
+// ErrorRetry is used to control the error retry logic.
+type ErrorRetry struct {
+	// To control the error retry.
+	lastInternalError  error
+	firstRetryTime     time.Time
+	lastErrorRetryTime time.Time
+	maxRetryDuration   time.Duration
+	errGCInterval      time.Duration
+	backoffBase        int64
+	backoffMax         int64
+}
+
+// NewDefaultErrorRetry creates a new ErrorRetry with default values.
+func NewDefaultErrorRetry() *ErrorRetry {
+	return NewErrorRetry(defaultErrorMaxRetryDuration,
+		defaultErrGCInterval,
+		defaultBackoffBaseInS,
+		defaultBackoffMaxInS)
+}
+
+// NewErrorRetry creates a new ErrorRetry.
+func NewErrorRetry(
+	maxRetryDuration time.Duration,
+	errGCInterval time.Duration,
+	backoffBase int64,
+	backoffMax int64,
+) *ErrorRetry {
+	return &ErrorRetry{
+		maxRetryDuration: maxRetryDuration,
+		errGCInterval:    errGCInterval,
+		backoffBase:      backoffBase,
+		backoffMax:       backoffMax,
+	}
+}
+
+// getRetryBackoff returns the backoff duration for retrying the last error.
+// If the retry time is exhausted, it returns a ChangefeedUnretryableError.
+func (r *ErrorRetry) GetRetryBackoff(err error) (time.Duration, error) {
+	// reset firstRetryTime when the last error is too long ago
+	// it means the last retry succeeded and the sink has been running well for some time
+	if r.lastInternalError == nil ||
+		time.Since(r.lastErrorRetryTime) >= r.errGCInterval {
+		r.firstRetryTime = time.Now()
+	}
+
+	// return an unretryable error if retry time is exhausted
+	if time.Since(r.firstRetryTime) >= r.maxRetryDuration {
+		return 0, cerror.WrapChangefeedUnretryableErr(err)
+	}
+
+	r.lastInternalError = err
+	r.lastErrorRetryTime = time.Now()
+
+	// interval is in range [backoffBase, backoffMax) seconds
+	interval := time.Second * time.Duration(
+		rand.Int63n(r.backoffMax-r.backoffBase)+r.backoffBase)
+	return interval, nil
+}
diff --git a/pkg/retry/error_retry_test.go b/pkg/retry/error_retry_test.go
new file mode 100644
index 00000000000..a3f8a83c7ea
--- /dev/null
+++ b/pkg/retry/error_retry_test.go
@@ -0,0 +1,39 @@
+// Copyright 2023 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package retry
+
+import (
+	"testing"
+	"time"
+
+	"github.com/pingcap/errors"
+	"github.com/stretchr/testify/require"
+)
+
+func TestGetRetryBackoff(t *testing.T) {
+	t.Parallel()
+
+	r := NewDefaultErrorRetry()
+	// test retry backoff
+	backoff, err := r.GetRetryBackoff(errors.New("test"))
+	require.NoError(t, err)
+	require.Less(t, backoff, 30*time.Second)
+	time.Sleep(500 * time.Millisecond)
+	elapsedTime := time.Since(r.firstRetryTime)
+
+	// mock an expired lastErrorRetryTime to verify that the error backoff window resets
+	r.lastErrorRetryTime = time.Unix(0, 0)
+	_, err = r.GetRetryBackoff(errors.New("test"))
+	require.NoError(t, err)
+	require.Less(t, time.Since(r.firstRetryTime), elapsedTime)
+}
From fba5e3375d8dc6a70ad9923241f5a9c0145b7bdd Mon Sep 17 00:00:00 2001
From: dongmen <414110582@qq.com>
Date: Thu, 13 Jul 2023 14:49:22 +0800
Subject: [PATCH 3/4] remove useless struct

---
 cdc/processor/sinkmanager/manager.go |  9 ---------
 pkg/filter/filter_test.go            | 14 ++++++++++++++
 2 files changed, 14 insertions(+), 9 deletions(-)

diff --git a/cdc/processor/sinkmanager/manager.go b/cdc/processor/sinkmanager/manager.go
index 9ec71ae1326..32aa313edf0 100644
--- a/cdc/processor/sinkmanager/manager.go
+++ b/cdc/processor/sinkmanager/manager.go
@@ -61,15 +61,6 @@ type TableStats struct {
 	BarrierTs model.Ts
 }
 
-type sinkRetry struct {
-	// To control the error retry.
-	lastInternalError  error
-	firstRetryTime     time.Time
-	lastErrorRetryTime time.Time
-	maxRetryDuration   time.Duration
-	errGCInterval      time.Duration
-}
-
 // SinkManager is the implementation of SinkManager.
 type SinkManager struct {
 	changefeedID model.ChangeFeedID
diff --git a/pkg/filter/filter_test.go b/pkg/filter/filter_test.go
index e4c53b35e68..ed55410fc8d 100644
--- a/pkg/filter/filter_test.go
+++ b/pkg/filter/filter_test.go
@@ -68,6 +68,20 @@ func TestShouldUseCustomRules(t *testing.T) {
 	require.False(t, filter.ShouldIgnoreTable("school", "student"))
 	require.True(t, filter.ShouldIgnoreTable("school", "teacher"))
 	require.Nil(t, err)
+
+	filter, err = NewFilter(&config.ReplicaConfig{
+		Filter: &config.FilterConfig{
+			// 1. match all schemas and tables
+			// 2. do not match table test.t1
+			// 3. do not match table test.t2
+			// 4. other tables, such as test.season, are still matched
+			Rules: []string{"*.*", "!test.t1", "!test.t2"},
+		},
+	}, "")
+	require.False(t, filter.ShouldIgnoreTable("test", "season"))
+	require.True(t, filter.ShouldIgnoreTable("test", "t1"))
+	require.True(t, filter.ShouldIgnoreTable("test", "t2"))
+	require.Nil(t, err)
 }
 
 func TestShouldIgnoreDMLEvent(t *testing.T) {
From 454d94c1fba73af5f1c8698ae776d209637d3ae4 Mon Sep 17 00:00:00 2001
From: dongmen <414110582@qq.com>
Date: Thu, 13 Jul 2023 14:52:12 +0800
Subject: [PATCH 4/4] fix make check

---
 pkg/retry/error_retry.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pkg/retry/error_retry.go b/pkg/retry/error_retry.go
index 8bbb4791ec8..0f6d251214e 100644
--- a/pkg/retry/error_retry.go
+++ b/pkg/retry/error_retry.go
@@ -62,7 +62,7 @@ func NewErrorRetry(
 	}
 }
 
-// getRetryBackoff returns the backoff duration for retrying the last error.
+// GetRetryBackoff returns the backoff duration for retrying the last error.
 // If the retry time is exhausted, it returns a ChangefeedUnretryableError.
 func (r *ErrorRetry) GetRetryBackoff(err error) (time.Duration, error) {
 	// reset firstRetryTime when the last error is too long ago