From 425b10a9db2911ec2592cf7368356316024f6f93 Mon Sep 17 00:00:00 2001
From: Ti Chi Robot
Date: Wed, 16 Aug 2023 23:00:30 +0800
Subject: [PATCH] sink(cdc): fix internal retry algorithm (#9530) (#9572)

ref pingcap/tiflow#9272, close pingcap/tiflow#9518
---
 cdc/api/v1/api.go                             |   7 +-
 cdc/api/v2/changefeed.go                      |   1 +
 cdc/api/v2/changefeed_test.go                 |  23 +-
 cdc/api/v2/model.go                           |  21 ++
 cdc/api/v2/model_test.go                      |  24 ++
 cdc/api/v2/processor.go                       |   2 +-
 cdc/api/v2/processor_test.go                  |   2 +-
 cdc/model/changefeed.go                       |  25 +-
 cdc/model/changefeed_test.go                  |   4 +-
 cdc/model/http_model.go                       |   6 +
 cdc/model/http_model_test.go                  |   4 +-
 cdc/owner/changefeed.go                       |   2 +-
 cdc/owner/ddl_sink.go                         |  62 +++-
 cdc/owner/feed_state_manager.go               | 316 +++++++++++-------
 cdc/owner/feed_state_manager_test.go          | 189 ++++++-----
 cdc/owner/owner_test.go                       |   2 +-
 cdc/processor/sinkmanager/manager.go          |  14 +-
 metrics/alertmanager/ticdc.rules.yml          |  25 ++
 metrics/grafana/ticdc.json                    |  22 +-
 pkg/cmd/cli/cli_changefeed_list_test.go       |  18 +-
 pkg/errors/helper_test.go                     |   6 +
 pkg/migrate/migrate_test.go                   |   4 +-
 pkg/retry/error_retry.go                      | 105 ++++++
 .../integration_tests/changefeed_error/run.sh |   4 +-
 .../kafka_sink_error_resume/run.sh            |   2 +-
 25 files changed, 647 insertions(+), 243 deletions(-)
 create mode 100644 pkg/retry/error_retry.go

diff --git a/cdc/api/v1/api.go b/cdc/api/v1/api.go
index a1ec1145f8b..e8d607aed9c 100644
--- a/cdc/api/v1/api.go
+++ b/cdc/api/v1/api.go
@@ -174,13 +174,18 @@ func (h *OpenAPI) ListChangefeed(c *gin.Context) {
 		}

 		resp.FeedState = cfInfo.State
-		resp.RunningError = cfInfo.Error
+		if cfInfo.Error != nil {
+			resp.RunningError = cfInfo.Error
+		} else {
+			resp.RunningError = cfInfo.Warning
+		}

 		if cfStatus != nil {
 			resp.CheckpointTSO = cfStatus.CheckpointTs
 			tm := oracle.GetTimeFromTS(cfStatus.CheckpointTs)
 			resp.CheckpointTime = model.JSONTime(tm)
 		}

+		log.Info("List changefeed successfully!", zap.Any("info", resp))
 		resps = append(resps, resp)
 	}
diff --git a/cdc/api/v2/changefeed.go b/cdc/api/v2/changefeed.go
index 237891fb5ca..e91efe355aa 100644
--- a/cdc/api/v2/changefeed.go
+++ b/cdc/api/v2/changefeed.go
@@ -218,6 +218,7 @@ func (h *OpenAPIV2) listChangeFeeds(c *gin.Context) {
 		} else {
 			commonInfo.RunningError = cfInfo.Warning
 		}
+		log.Info("List changefeed successfully!", zap.Any("runningError", commonInfo.RunningError))
 		// if the state is normal, we shall not return the error info
 		// because the changefeed is retrying. 
errors will confuse the users diff --git a/cdc/api/v2/changefeed_test.go b/cdc/api/v2/changefeed_test.go index 99c0465ff5b..2c7a01179f1 100644 --- a/cdc/api/v2/changefeed_test.go +++ b/cdc/api/v2/changefeed_test.go @@ -521,16 +521,24 @@ func TestListChangeFeeds(t *testing.T) { State: model.StateNormal, }, model.DefaultChangeFeedID("cf2"): { - State: model.StateError, + State: model.StateWarning, }, model.DefaultChangeFeedID("cf3"): { State: model.StateStopped, }, + model.DefaultChangeFeedID("cf4"): { + State: model.StatePending, + }, + model.DefaultChangeFeedID("cf5"): { + State: model.StateFinished, + }, }, changefeedStatuses: map[model.ChangeFeedID]*model.ChangeFeedStatusForAPI{ model.DefaultChangeFeedID("cf1"): {}, model.DefaultChangeFeedID("cf2"): {}, model.DefaultChangeFeedID("cf3"): {}, + model.DefaultChangeFeedID("cf4"): {}, + model.DefaultChangeFeedID("cf5"): {}, }, } cp.EXPECT().StatusProvider().Return(provider1).AnyTimes() @@ -549,10 +557,17 @@ func TestListChangeFeeds(t *testing.T) { resp := ListResponse[model.ChangefeedCommonInfo]{} err := json.NewDecoder(w.Body).Decode(&resp) require.Nil(t, err) - require.Equal(t, 3, resp.Total) + require.Equal(t, 5, resp.Total) // changefeed info must be sorted by ID require.Equal(t, true, sorted(resp.Items)) - + warningChangefeedCount := 0 + for _, cf := range resp.Items { + if cf.FeedState == model.StateWarning { + warningChangefeedCount++ + } + require.NotEqual(t, model.StatePending, cf.FeedState) + } + require.Equal(t, 2, warningChangefeedCount) // case 2: only list changefeed with state 'normal', 'stopped' and 'failed' metaInfo2 := testCase{ url: "/api/v2/changefeeds", @@ -568,7 +583,7 @@ func TestListChangeFeeds(t *testing.T) { resp2 := ListResponse[model.ChangefeedCommonInfo]{} err = json.NewDecoder(w.Body).Decode(&resp2) require.Nil(t, err) - require.Equal(t, 2, resp2.Total) + require.Equal(t, 4, resp2.Total) // changefeed info must be sorted by ID require.Equal(t, true, sorted(resp2.Items)) } diff --git a/cdc/api/v2/model.go b/cdc/api/v2/model.go index c172452aa70..e7246eb6bcb 100644 --- a/cdc/api/v2/model.go +++ b/cdc/api/v2/model.go @@ -100,6 +100,27 @@ type ChangefeedCommonInfo struct { RunningError *model.RunningError `json:"error"` } +// MarshalJSON marshal changefeed common info to json +// we need to set feed state to normal if it is uninitialized and pending to warning +// to hide the detail of uninitialized and pending state from user +func (c ChangefeedCommonInfo) MarshalJSON() ([]byte, error) { + // alias the original type to prevent recursive call of MarshalJSON + type Alias ChangefeedCommonInfo + + if c.FeedState == model.StateUnInitialized { + c.FeedState = model.StateNormal + } + if c.FeedState == model.StatePending { + c.FeedState = model.StateWarning + } + + return json.Marshal(struct { + Alias + }{ + Alias: Alias(c), + }) +} + // ChangefeedConfig use by create changefeed api type ChangefeedConfig struct { Namespace string `json:"namespace"` diff --git a/cdc/api/v2/model_test.go b/cdc/api/v2/model_test.go index e0105accd33..9382356a670 100644 --- a/cdc/api/v2/model_test.go +++ b/cdc/api/v2/model_test.go @@ -21,6 +21,7 @@ import ( bf "github.com/pingcap/tidb-tools/pkg/binlog-filter" filter "github.com/pingcap/tidb/util/table-filter" + "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/pkg/config" "github.com/pingcap/tiflow/pkg/redo" "github.com/stretchr/testify/require" @@ -208,3 +209,26 @@ func TestEventFilterRuleConvert(t *testing.T) { require.Equal(t, c.inRule, c.apiRule.ToInternalEventFilterRule()) } 
} + +func TestMarshalChangefeedCommonInfo(t *testing.T) { + t.Parallel() + + cfInfo := &ChangefeedCommonInfo{ + ID: "test-id", + FeedState: model.StatePending, + } + + cfInfoJSON, err := json.Marshal(cfInfo) + require.Nil(t, err) + require.False(t, strings.Contains(string(cfInfoJSON), "pending")) + require.True(t, strings.Contains(string(cfInfoJSON), "warning")) + + cfInfo = &ChangefeedCommonInfo{ + ID: "test-id", + FeedState: model.StateUnInitialized, + } + + cfInfoJSON, err = json.Marshal(cfInfo) + require.Nil(t, err) + require.True(t, strings.Contains(string(cfInfoJSON), "normal")) +} diff --git a/cdc/api/v2/processor.go b/cdc/api/v2/processor.go index 51a32282bc8..0945214fb10 100644 --- a/cdc/api/v2/processor.go +++ b/cdc/api/v2/processor.go @@ -64,7 +64,7 @@ func (h *OpenAPIV2) getProcessor(c *gin.Context) { _ = c.Error(err) return } - if info.State != model.StateNormal { + if !info.State.IsRunning() { _ = c.Error( cerror.WrapError( cerror.ErrAPIInvalidParam, diff --git a/cdc/api/v2/processor_test.go b/cdc/api/v2/processor_test.go index 81157bc1ef4..a9f991f96f6 100644 --- a/cdc/api/v2/processor_test.go +++ b/cdc/api/v2/processor_test.go @@ -94,7 +94,7 @@ func TestGetProcessor(t *testing.T) { { provider := &mockStatusProvider{ changefeedInfo: &model.ChangeFeedInfo{ - State: model.StateError, + State: model.StatePending, }, } cp := mock_capture.NewMockCapture(gomock.NewController(t)) diff --git a/cdc/model/changefeed.go b/cdc/model/changefeed.go index ae7ede02274..359f5a19f19 100644 --- a/cdc/model/changefeed.go +++ b/cdc/model/changefeed.go @@ -78,11 +78,15 @@ type FeedState string // All FeedStates const ( StateNormal FeedState = "normal" - StateError FeedState = "error" + StatePending FeedState = "pending" StateFailed FeedState = "failed" StateStopped FeedState = "stopped" StateRemoved FeedState = "removed" StateFinished FeedState = "finished" + StateWarning FeedState = "warning" + // StateUnInitialized is used for the changefeed that has not been initialized + // it only exists in memory for a short time and will not be persisted to storage + StateUnInitialized FeedState = "" ) // ToInt return an int for each `FeedState`, only use this for metrics. @@ -90,7 +94,7 @@ func (s FeedState) ToInt() int { switch s { case StateNormal: return 0 - case StateError: + case StatePending: return 1 case StateFailed: return 2 @@ -100,6 +104,10 @@ func (s FeedState) ToInt() int { return 4 case StateRemoved: return 5 + case StateWarning: + return 6 + case StateUnInitialized: + return 7 } // -1 for unknown feed state return -1 @@ -118,11 +126,20 @@ func (s FeedState) IsNeeded(need string) bool { return true case StateFailed: return true + case StateWarning: + return true + case StatePending: + return true } } return need == string(s) } +// IsRunning return true if the feedState represents a running state. +func (s FeedState) IsRunning() bool { + return s == StateNormal || s == StateWarning +} + // ChangeFeedInfo describes the detail of a ChangeFeed type ChangeFeedInfo struct { UpstreamID uint64 `json:"upstream-id"` @@ -183,7 +200,7 @@ func ValidateNamespace(namespace string) error { // Note: if the changefeed is failed by GC, it should not block the GC safepoint. 
func (info *ChangeFeedInfo) NeedBlockGC() bool { switch info.State { - case StateNormal, StateStopped, StateError: + case StateNormal, StateStopped, StatePending, StateWarning: return true case StateFailed: return !info.isFailedByGC() @@ -356,7 +373,7 @@ func (info *ChangeFeedInfo) fixState() { if cerror.IsChangefeedGCFastFailErrorCode(errors.RFCErrorCode(info.Error.Code)) { state = StateFailed } else { - state = StateError + state = StateWarning } } case AdminStop: diff --git a/cdc/model/changefeed_test.go b/cdc/model/changefeed_test.go index a2b2f5923ce..c1dd632b375 100644 --- a/cdc/model/changefeed_test.go +++ b/cdc/model/changefeed_test.go @@ -465,7 +465,7 @@ func TestFixState(t *testing.T) { Code: string(errors.ErrClusterIDMismatch.RFCCode()), }, }, - expectedState: StateError, + expectedState: StateWarning, }, { info: &ChangeFeedInfo{ @@ -475,7 +475,7 @@ func TestFixState(t *testing.T) { Code: string(errors.ErrClusterIDMismatch.RFCCode()), }, }, - expectedState: StateError, + expectedState: StateWarning, }, { info: &ChangeFeedInfo{ diff --git a/cdc/model/http_model.go b/cdc/model/http_model.go index 7b3efa1b8cb..7898fc04a69 100644 --- a/cdc/model/http_model.go +++ b/cdc/model/http_model.go @@ -122,6 +122,12 @@ func (c ChangefeedCommonInfo) MarshalJSON() ([]byte, error) { if c.FeedState == StateNormal { c.RunningError = nil } + if c.FeedState == StateUnInitialized { + c.FeedState = StateNormal + } + if c.FeedState == StatePending { + c.FeedState = StateWarning + } return json.Marshal(struct { Alias }{ diff --git a/cdc/model/http_model_test.go b/cdc/model/http_model_test.go index a751a4ab32a..f3e2e37abab 100644 --- a/cdc/model/http_model_test.go +++ b/cdc/model/http_model_test.go @@ -42,7 +42,7 @@ func TestChangefeedCommonInfoMarshalJSON(t *testing.T) { require.NotContains(t, string(cfInfoJSON), string(errors.ErrProcessorUnknown.RFCCode())) // when state is not normal, the error code is exist - cfInfo.FeedState = StateError + cfInfo.FeedState = StateWarning cfInfoJSON, err = json.Marshal(cfInfo) require.Nil(t, err) require.Contains(t, string(cfInfoJSON), string(errors.ErrProcessorUnknown.RFCCode())) @@ -68,7 +68,7 @@ func TestChangefeedDetailMarshalJSON(t *testing.T) { require.NotContains(t, string(cfInfoJSON), string(errors.ErrProcessorUnknown.RFCCode())) // when state is not normal, the error code is exist - cfDetail.FeedState = StateError + cfDetail.FeedState = StateWarning cfInfoJSON, err = json.Marshal(cfDetail) require.Nil(t, err) require.Contains(t, string(cfInfoJSON), string(errors.ErrProcessorUnknown.RFCCode())) diff --git a/cdc/owner/changefeed.go b/cdc/owner/changefeed.go index fa1ab53a489..4954a9afd50 100644 --- a/cdc/owner/changefeed.go +++ b/cdc/owner/changefeed.go @@ -285,7 +285,7 @@ func (c *changefeed) checkStaleCheckpointTs(ctx cdcContext.Context, checkpointTs } func (c *changefeed) tick(ctx cdcContext.Context, captures map[model.CaptureID]*model.CaptureInfo) error { - adminJobPending := c.feedStateManager.Tick(c.state) + adminJobPending := c.feedStateManager.Tick(c.state, c.resolvedTs) preCheckpointTs := c.state.Info.GetCheckpointTs(c.state.Status) // checkStaleCheckpointTs must be called before `feedStateManager.ShouldRunning()` // to ensure all changefeeds, no matter whether they are running or not, will be checked. 
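Reviewer note: the listing and model changes above lean on the alias-based MarshalJSON masking added in cdc/api/v2/model.go and cdc/model/http_model.go, which rewrites the internal "pending" and uninitialized states to "warning" and "normal" before they reach API clients. Below is a minimal, self-contained sketch of that pattern; the FeedState values mirror the patch, while the Info type and the main function are hypothetical stand-ins rather than code from this repository.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// FeedState mirrors model.FeedState from the patch; only the values needed
// for this illustration are declared here.
type FeedState string

const (
	StateNormal        FeedState = "normal"
	StateWarning       FeedState = "warning"
	StatePending       FeedState = "pending"
	StateUnInitialized FeedState = ""
)

// Info is a hypothetical stand-in for ChangefeedCommonInfo.
type Info struct {
	ID        string    `json:"id"`
	FeedState FeedState `json:"state"`
}

// MarshalJSON hides the internal pending/uninitialized states from API
// consumers, mapping them to "warning" and "normal" respectively.
func (i Info) MarshalJSON() ([]byte, error) {
	// Alias has the same fields as Info but no methods, so the inner
	// json.Marshal call below does not recurse back into MarshalJSON.
	type Alias Info
	if i.FeedState == StateUnInitialized {
		i.FeedState = StateNormal
	}
	if i.FeedState == StatePending {
		i.FeedState = StateWarning
	}
	return json.Marshal(struct{ Alias }{Alias: Alias(i)})
}

func main() {
	out, _ := json.Marshal(Info{ID: "cf-1", FeedState: StatePending})
	fmt.Println(string(out)) // {"id":"cf-1","state":"warning"}
}
```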
diff --git a/cdc/owner/ddl_sink.go b/cdc/owner/ddl_sink.go index 094661b0f5a..64adfce4036 100644 --- a/cdc/owner/ddl_sink.go +++ b/cdc/owner/ddl_sink.go @@ -15,6 +15,7 @@ package owner import ( "context" + "fmt" "strings" "sync" "time" @@ -32,6 +33,7 @@ import ( "github.com/pingcap/tiflow/cdc/sinkv2/ddlsink/factory" "github.com/pingcap/tiflow/pkg/config" cerror "github.com/pingcap/tiflow/pkg/errors" + "github.com/pingcap/tiflow/pkg/retry" "github.com/pingcap/tiflow/pkg/util" "go.uber.org/zap" ) @@ -90,6 +92,7 @@ type ddlSinkImpl struct { changefeedID model.ChangeFeedID info *model.ChangeFeedInfo + sinkRetry *retry.ErrorRetry reportError func(err error) reportWarning func(err error) } @@ -107,7 +110,7 @@ func newDDLSink( changefeedID: changefeedID, info: info, - errCh: make(chan error, defaultErrChSize), + sinkRetry: retry.NewInfiniteErrorRetry(), reportError: reportError, reportWarning: reportWarning, } @@ -175,27 +178,57 @@ func (s *ddlSinkImpl) makeSinkReady(ctx context.Context) error { } // retry the given action with 5s interval. Before every retry, s.sink will be re-initialized. -func (s *ddlSinkImpl) retrySinkActionWithErrorReport(ctx context.Context, action func() error) (err error) { +func (s *ddlSinkImpl) retrySinkAction(ctx context.Context, name string, action func() error) (err error) { for { if err = action(); err == nil { return nil } s.sinkV1 = nil s.sinkV2 = nil - if !cerror.IsChangefeedUnRetryableError(err) && errors.Cause(err) != context.Canceled { + isRetryable := !cerror.IsChangefeedUnRetryableError(err) && errors.Cause(err) != context.Canceled + log.Warn("owner ddl sink fails on action", + zap.String("namespace", s.changefeedID.Namespace), + zap.String("changefeed", s.changefeedID.ID), + zap.String("action", name), + zap.Bool("retryable", isRetryable), + zap.Error(err)) + + if isRetryable { s.reportWarning(err) } else { s.reportError(err) return err } - // Use a 5 second backoff when re-establishing internal resources. 
- if err = util.Hang(ctx, 5*time.Second); err != nil { + backoff, err := s.sinkRetry.GetRetryBackoff(err) + if err != nil { + return errors.New(fmt.Sprintf("GetRetryBackoff: %s", err.Error())) + } + + if err = util.Hang(ctx, backoff); err != nil { return errors.Trace(err) } } } +func (s *ddlSinkImpl) observedRetrySinkAction(ctx context.Context, name string, action func() error) (err error) { + errCh := make(chan error, 1) + go func() { errCh <- s.retrySinkAction(ctx, name, action) }() + ticker := time.NewTicker(30 * time.Second) + defer ticker.Stop() + for { + select { + case err := <-errCh: + return err + case <-ticker.C: + log.Info("owner ddl sink performs an action too long", + zap.String("namespace", s.changefeedID.Namespace), + zap.String("changefeed", s.changefeedID.ID), + zap.String("action", name)) + } + } +} + func (s *ddlSinkImpl) writeCheckpointTs(ctx context.Context, lastCheckpointTs *model.Ts) error { doWrite := func() (err error) { s.mu.Lock() @@ -221,7 +254,7 @@ func (s *ddlSinkImpl) writeCheckpointTs(ctx context.Context, lastCheckpointTs *m return } - return s.retrySinkActionWithErrorReport(ctx, doWrite) + return s.observedRetrySinkAction(ctx, "writeCheckpointTs", doWrite) } func (s *ddlSinkImpl) writeDDLEvent(ctx context.Context, ddl *model.DDLEvent) error { @@ -256,7 +289,8 @@ func (s *ddlSinkImpl) writeDDLEvent(ctx context.Context, ddl *model.DDLEvent) er } return } - return s.retrySinkActionWithErrorReport(ctx, doWrite) + + return s.observedRetrySinkAction(ctx, "writeDDLEvent", doWrite) } func (s *ddlSinkImpl) run(ctx context.Context) { @@ -264,19 +298,29 @@ func (s *ddlSinkImpl) run(ctx context.Context) { s.wg.Add(1) go func() { - defer s.wg.Done() + var err error + log.Info("owner ddl sink background loop is started", + zap.String("namespace", s.changefeedID.Namespace), + zap.String("changefeed", s.changefeedID.ID)) + defer func() { + s.wg.Done() + log.Info("owner ddl sink background loop exits", + zap.String("namespace", s.changefeedID.Namespace), + zap.String("changefeed", s.changefeedID.ID), + zap.Error(err)) + }() // TODO make the tick duration configurable ticker := time.NewTicker(time.Second) defer ticker.Stop() var lastCheckpointTs model.Ts - var err error for { // `ticker.C` and `ddlCh` may can be triggered at the same time, it // does not matter which one emit first, since TiCDC allow DDL with // CommitTs equal to the last CheckpointTs be emitted later. select { case <-ctx.Done(): + err = ctx.Err() return case <-ticker.C: if err = s.writeCheckpointTs(ctx, &lastCheckpointTs); err != nil { diff --git a/cdc/owner/feed_state_manager.go b/cdc/owner/feed_state_manager.go index 6312adcc65f..54ff5dc438a 100644 --- a/cdc/owner/feed_state_manager.go +++ b/cdc/owner/feed_state_manager.go @@ -35,8 +35,8 @@ const ( // 640s, 1280s, 1800s, ...). // To avoid thunderherd, a random factor is also added. 
defaultBackoffInitInterval = 10 * time.Second - defaultBackoffMaxInterval = 30 * time.Minute - defaultBackoffMaxElapsedTime = 90 * time.Minute + defaultBackoffMaxInterval = 10 * time.Minute + defaultBackoffMaxElapsedTime = 30 * time.Minute defaultBackoffRandomizationFactor = 0.1 defaultBackoffMultiplier = 2.0 @@ -49,19 +49,30 @@ const ( // feedStateManager manages the ReactorState of a changefeed // when an error or an admin job occurs, the feedStateManager is responsible for controlling the ReactorState type feedStateManager struct { - upstream *upstream.Upstream - state *orchestrator.ChangefeedReactorState + upstream *upstream.Upstream + state *orchestrator.ChangefeedReactorState + shouldBeRunning bool // Based on shouldBeRunning = false // shouldBeRemoved = true means the changefeed is removed // shouldBeRemoved = false means the changefeed is paused shouldBeRemoved bool - adminJobQueue []*model.AdminJob - stateHistory [defaultStateWindowSize]model.FeedState - lastErrorTime time.Time // time of last error for a changefeed - backoffInterval time.Duration // the interval for restarting a changefeed in 'error' state - errBackoff *backoff.ExponentialBackOff // an exponential backoff for restarting a changefeed + adminJobQueue []*model.AdminJob + stateHistory [defaultStateWindowSize]model.FeedState + lastErrorRetryTime time.Time // time of last error for a changefeed + lastErrorRetryCheckpointTs model.Ts // checkpoint ts of last retry + lastWarningReportCheckpointTs model.Ts // checkpoint ts of last warning report + backoffInterval time.Duration // the interval for restarting a changefeed in 'error' state + errBackoff *backoff.ExponentialBackOff // an exponential backoff for restarting a changefeed + + // resolvedTs and initCheckpointTs is for checking whether resolved timestamp + // has been advanced or not. + resolvedTs model.Ts + initCheckpointTs model.Ts + + checkpointTsAdvanced time.Time + lastCheckpointTs model.Ts } // newFeedStateManager creates feedStateManager and initialize the exponential backoff @@ -77,18 +88,11 @@ func newFeedStateManager(up *upstream.Upstream) *feedStateManager { // backoff will stop once the defaultBackoffMaxElapsedTime has elapsed. f.errBackoff.MaxElapsedTime = defaultBackoffMaxElapsedTime - f.resetErrBackoff() - f.lastErrorTime = time.Unix(0, 0) + f.resetErrRetry() return f } -// resetErrBackoff reset the backoff-related fields -func (m *feedStateManager) resetErrBackoff() { - m.errBackoff.Reset() - m.backoffInterval = m.errBackoff.NextBackOff() -} - // isChangefeedStable check if there are states other than 'normal' in this sliding window. func (m *feedStateManager) isChangefeedStable() bool { for _, val := range m.stateHistory { @@ -109,14 +113,30 @@ func (m *feedStateManager) shiftStateWindow(state model.FeedState) { m.stateHistory[defaultStateWindowSize-1] = state } -func (m *feedStateManager) Tick(state *orchestrator.ChangefeedReactorState) (adminJobPending bool) { +func (m *feedStateManager) Tick( + state *orchestrator.ChangefeedReactorState, + resolvedTs model.Ts, +) (adminJobPending bool) { + if state.Status != nil { + if m.lastCheckpointTs < state.Status.CheckpointTs { + m.lastCheckpointTs = state.Status.CheckpointTs + m.checkpointTsAdvanced = time.Now() + } + if m.state == nil || m.state.Status == nil { + // It's the first time `m.state.Status` gets filled. 
+			m.initCheckpointTs = state.Status.CheckpointTs
+		}
+	}
+
+	m.shiftStateWindow(state.Info.State)
+	m.checkAndInitLastRetryCheckpointTs(state.Status)
+
 	m.state = state
+	m.resolvedTs = resolvedTs
 	m.shouldBeRunning = true
 	defer func() {
-		if m.shouldBeRunning {
-			m.patchState(model.StateNormal)
-		} else {
-			m.cleanUpInfos()
+		if !m.shouldBeRunning {
+			m.cleanUpTaskPositions()
 		}
 	}()
 	if m.handleAdminJob() {
@@ -126,6 +146,9 @@ func (m *feedStateManager) Tick(state *orchestrator.ChangefeedReactorState) (adm
 		return
 	}
 	switch m.state.Info.State {
+	case model.StateUnInitialized:
+		m.patchState(model.StateNormal)
+		return
 	case model.StateRemoved:
 		m.shouldBeRunning = false
 		m.shouldBeRemoved = true
@@ -133,22 +156,55 @@ func (m *feedStateManager) Tick(state *orchestrator.ChangefeedReactorState) (adm
 	case model.StateStopped, model.StateFailed, model.StateFinished:
 		m.shouldBeRunning = false
 		return
-	case model.StateError:
-		if m.state.Info.Error.IsChangefeedUnRetryableError() {
+	case model.StatePending:
+		if time.Since(m.lastErrorRetryTime) < m.backoffInterval {
+			m.shouldBeRunning = false
+			return
+		}
+		// retry the changefeed
+		oldBackoffInterval := m.backoffInterval
+		m.backoffInterval = m.errBackoff.NextBackOff()
+		// NextBackOff() will return -1 once the MaxElapsedTime has elapsed;
+		// in that case, set the changefeed to the failed state.
+		if m.backoffInterval == m.errBackoff.Stop {
+			log.Warn("The changefeed won't be restarted "+
+				"as it has been experiencing failures for "+
+				"an extended duration",
+				zap.Duration(
+					"maxElapsedTime",
+					m.errBackoff.MaxElapsedTime,
+				),
+			)
 			m.shouldBeRunning = false
 			m.patchState(model.StateFailed)
 			return
 		}
-	}
-	errs := m.errorsReportedByProcessors()
-	m.handleError(errs...)
-	// only handle warnings when there are no errors
-	// otherwise, the warnings will cover the errors
-	if len(errs) == 0 {
-		// warning are come from processors' sink component
-		// they ere not fatal errors, so we don't need to stop the changefeed
-		warnings := m.warningsReportedByProcessors()
-		m.handleWarning(warnings...)
+
+		m.lastErrorRetryTime = time.Now()
+		if m.state.Status != nil {
+			m.lastErrorRetryCheckpointTs = m.state.Status.CheckpointTs
+		}
+		m.shouldBeRunning = true
+		m.patchState(model.StateWarning)
+		log.Info("changefeed retry backoff interval has elapsed, "+
+			"the changefeed will be restarted",
+			zap.String("namespace", m.state.ID.Namespace),
+			zap.String("changefeed", m.state.ID.ID),
+			zap.Time("lastErrorRetryTime", m.lastErrorRetryTime),
+			zap.Duration("lastRetryInterval", oldBackoffInterval),
+			zap.Duration("nextRetryInterval", m.backoffInterval))
+	case model.StateNormal, model.StateWarning:
+		m.checkAndChangeState()
+		errs := m.errorsReportedByProcessors()
+		m.handleError(errs...)
+		// only handle warnings when there are no errors;
+		// otherwise, the warnings will cover the errors
+		if len(errs) == 0 {
+			// warnings come from the processors' sink component;
+			// they are not fatal errors, so we don't need to stop the changefeed
+			warnings := m.warningsReportedByProcessors()
+			m.handleWarning(warnings...)
+ } } return } @@ -196,7 +252,7 @@ func (m *feedStateManager) handleAdminJob() (jobsPending bool) { switch job.Type { case model.AdminStop: switch m.state.Info.State { - case model.StateNormal, model.StateError: + case model.StateNormal, model.StateWarning, model.StatePending: default: log.Warn("can not pause the changefeed in the current state", zap.String("namespace", m.state.ID.Namespace), @@ -208,17 +264,6 @@ func (m *feedStateManager) handleAdminJob() (jobsPending bool) { jobsPending = true m.patchState(model.StateStopped) case model.AdminRemove: - switch m.state.Info.State { - case model.StateNormal, model.StateError, model.StateFailed, - model.StateStopped, model.StateFinished, model.StateRemoved: - default: - log.Warn("can not remove the changefeed in the current state", - zap.String("namespace", m.state.ID.Namespace), - zap.String("changefeed", m.state.ID.ID), - zap.String("changefeedState", string(m.state.Info.State)), zap.Any("job", job)) - return - } - m.shouldBeRunning = false m.shouldBeRemoved = true jobsPending = true @@ -244,7 +289,7 @@ func (m *feedStateManager) handleAdminJob() (jobsPending bool) { zap.Uint64("checkpointTs", checkpointTs)) case model.AdminResume: switch m.state.Info.State { - case model.StateFailed, model.StateError, model.StateStopped, model.StateFinished: + case model.StateFailed, model.StateStopped, model.StateFinished: default: log.Warn("can not resume the changefeed in the current state", zap.String("namespace", m.state.ID.Namespace), @@ -254,9 +299,8 @@ func (m *feedStateManager) handleAdminJob() (jobsPending bool) { } m.shouldBeRunning = true // when the changefeed is manually resumed, we must reset the backoff - m.resetErrBackoff() + m.resetErrRetry() // The lastErrorTime also needs to be cleared before a fresh run. 
- m.lastErrorTime = time.Unix(0, 0) jobsPending = true m.patchState(model.StateNormal) @@ -299,7 +343,7 @@ func (m *feedStateManager) handleAdminJob() (jobsPending bool) { case model.AdminFinish: switch m.state.Info.State { - case model.StateNormal: + case model.StateNormal, model.StateWarning: default: log.Warn("can not finish the changefeed in the current state", zap.String("namespace", m.state.ID.Namespace), @@ -335,13 +379,13 @@ func (m *feedStateManager) patchState(feedState model.FeedState) { var updateEpoch bool var adminJobType model.AdminJobType switch feedState { - case model.StateNormal: + case model.StateNormal, model.StateWarning: adminJobType = model.AdminNone updateEpoch = false case model.StateFinished: adminJobType = model.AdminFinish updateEpoch = true - case model.StateError, model.StateStopped, model.StateFailed: + case model.StatePending, model.StateStopped, model.StateFailed: adminJobType = model.AdminStop updateEpoch = true case model.StateRemoved: @@ -389,7 +433,7 @@ func (m *feedStateManager) patchState(feedState model.FeedState) { }) } -func (m *feedStateManager) cleanUpInfos() { +func (m *feedStateManager) cleanUpTaskPositions() { for captureID := range m.state.TaskPositions { m.state.PatchTaskPosition(captureID, func(position *model.TaskPosition) (*model.TaskPosition, bool, error) { return nil, position != nil, nil @@ -463,10 +507,14 @@ func (m *feedStateManager) warningsReportedByProcessors() []*model.RunningError } func (m *feedStateManager) handleError(errs ...*model.RunningError) { + if len(errs) == 0 { + return + } // if there are a fastFail error in errs, we can just fastFail the changefeed // and no need to patch other error to the changefeed info for _, err := range errs { - if cerrors.IsChangefeedGCFastFailErrorCode(errors.RFCErrorCode(err.Code)) { + if cerrors.IsChangefeedGCFastFailErrorCode(errors.RFCErrorCode(err.Code)) || + err.IsChangefeedUnRetryableError() { m.state.PatchInfo(func(info *model.ChangeFeedInfo) (*model.ChangeFeedInfo, bool, error) { if info == nil { return nil, false, nil @@ -490,96 +538,76 @@ func (m *feedStateManager) handleError(errs ...*model.RunningError) { return } - // we need to patch changefeed unretryable error to the changefeed info, - // so we have to iterate all errs here to check wether it is a unretryable - // error in errs - for _, err := range errs { - if err.IsChangefeedUnRetryableError() { - m.state.PatchInfo(func(info *model.ChangeFeedInfo) (*model.ChangeFeedInfo, bool, error) { - if info == nil { - return nil, false, nil - } - info.Error = err - return info, true, nil - }) - m.shouldBeRunning = false - m.patchState(model.StateError) - return + var lastError *model.RunningError + // find the last non nil error + // BTW, there shouldn't be any nil error in errs + // this is just a safe guard + for i := len(errs) - 1; i >= 0; i-- { + if errs[i] != nil { + lastError = errs[i] + break } } + if lastError != nil { + log.Warn("changefeed meets an error", zap.Any("error", lastError)) + m.shouldBeRunning = false + m.patchState(model.StatePending) - m.state.PatchInfo(func(info *model.ChangeFeedInfo) (*model.ChangeFeedInfo, bool, error) { - if info == nil { - return nil, false, nil - } - for _, err := range errs { - info.Error = err - } - return info, len(errs) > 0, nil - }) + // patch the last error to changefeed info + m.state.PatchInfo(func(info *model.ChangeFeedInfo) (*model.ChangeFeedInfo, bool, error) { + if info == nil { + return nil, false, nil + } + info.Error = lastError + return info, true, nil + }) + } - // If we 
enter into an abnormal state ('error', 'failed') for this changefeed now + // If we enter into an abnormal state 'error' for this changefeed now // but haven't seen abnormal states in a sliding window (512 ticks), // it can be assumed that this changefeed meets a sudden change from a stable condition. // So we can reset the exponential backoff and re-backoff from the InitialInterval. // TODO: this detection policy should be added into unit test. - if len(errs) > 0 { - m.lastErrorTime = time.Now() - if m.isChangefeedStable() { - m.resetErrBackoff() - } - } else { - if m.state.Info.State == model.StateNormal { - m.lastErrorTime = time.Unix(0, 0) - } + if m.isChangefeedStable() { + m.resetErrRetry() } - m.shiftStateWindow(m.state.Info.State) +} - if m.lastErrorTime == time.Unix(0, 0) { +func (m *feedStateManager) handleWarning(errs ...*model.RunningError) { + if len(errs) == 0 { return } - - if time.Since(m.lastErrorTime) < m.backoffInterval { - m.shouldBeRunning = false - m.patchState(model.StateError) - } else { - oldBackoffInterval := m.backoffInterval - - m.backoffInterval = m.errBackoff.NextBackOff() - m.lastErrorTime = time.Unix(0, 0) - - // NextBackOff() will return -1 once the MaxElapsedTime has elapsed. - if m.backoffInterval == m.errBackoff.Stop { - log.Warn("The changefeed won't be restarted "+ - "as it has been experiencing failures for "+ - "an extended duration", - zap.Duration( - "maxElapsedTime", - m.errBackoff.MaxElapsedTime, - ), - ) - m.shouldBeRunning = false - m.patchState(model.StateFailed) + lastError := errs[len(errs)-1] + log.Warn("changefeed meets an warning", zap.Any("warning", lastError)) + if m.state.Status != nil { + currTime := m.upstream.PDClock.CurrentTime() + ckptTime := oracle.GetTimeFromTS(m.state.Status.CheckpointTs) + m.lastWarningReportCheckpointTs = m.state.Status.CheckpointTs + // Conditions: + // 1. checkpoint lag is large enough; + // 2. checkpoint hasn't been advanced for a long while; + // 3. the changefeed has been initialized. + if currTime.Sub(ckptTime) > defaultBackoffMaxElapsedTime && + time.Since(m.checkpointTsAdvanced) > defaultBackoffMaxElapsedTime && + m.resolvedTs > m.initCheckpointTs { + code, _ := cerrors.RFCCode(cerrors.ErrChangefeedUnretryable) + m.handleError(&model.RunningError{ + Time: lastError.Time, + Addr: lastError.Addr, + Code: string(code), + Message: lastError.Message, + }) return } - - log.Info("changefeed restart backoff interval is changed", - zap.String("namespace", m.state.ID.Namespace), - zap.String("changefeed", m.state.ID.ID), - zap.Duration("oldInterval", oldBackoffInterval), - zap.Duration("newInterval", m.backoffInterval)) } -} -func (m *feedStateManager) handleWarning(errs ...*model.RunningError) { + m.patchState(model.StateWarning) m.state.PatchInfo(func(info *model.ChangeFeedInfo) (*model.ChangeFeedInfo, bool, error) { if info == nil { return nil, false, nil } - for _, err := range errs { - info.Warning = err - } - return info, len(errs) > 0, nil + info.Warning = lastError + return info, true, nil }) } @@ -592,3 +620,43 @@ func GenerateChangefeedEpoch(ctx context.Context, pdClient pd.Client) uint64 { } return oracle.ComposeTS(phyTs, logical) } + +// resetErrRetry reset the error retry related fields +func (m *feedStateManager) resetErrRetry() { + m.errBackoff.Reset() + m.backoffInterval = m.errBackoff.NextBackOff() + m.lastErrorRetryTime = time.Unix(0, 0) +} + +// checkAndChangeState checks the state of the changefeed and change it if needed. 
+// if the state of the changefeed is warning and the changefeed's checkpointTs is +// greater than the lastRetryCheckpointTs, it will change the state to normal. +func (m *feedStateManager) checkAndChangeState() { + if m.state.Info == nil || m.state.Status == nil { + return + } + if m.state.Info.State == model.StateWarning && + m.state.Status.CheckpointTs > m.lastErrorRetryCheckpointTs && + m.state.Status.CheckpointTs > m.lastWarningReportCheckpointTs { + log.Info("changefeed is recovered from warning state,"+ + "its checkpointTs is greater than lastRetryCheckpointTs,"+ + "it will be changed to normal state", + zap.String("changefeed", m.state.ID.ID), + zap.String("namespace", m.state.ID.Namespace), + zap.Uint64("checkpointTs", m.state.Status.CheckpointTs), + zap.Uint64("lastRetryCheckpointTs", m.lastErrorRetryCheckpointTs)) + m.patchState(model.StateNormal) + } +} + +// checkAndInitLastRetryCheckpointTs checks the lastRetryCheckpointTs and init it if needed. +// It the owner is changed, the lastRetryCheckpointTs will be reset to 0, and we should init +// it to the checkpointTs of the changefeed when the changefeed is ticked at the first time. +func (m *feedStateManager) checkAndInitLastRetryCheckpointTs(status *model.ChangeFeedStatus) { + if status == nil || m.lastErrorRetryCheckpointTs != 0 { + return + } + m.lastWarningReportCheckpointTs = status.CheckpointTs + m.lastErrorRetryCheckpointTs = status.CheckpointTs + log.Info("init lastRetryCheckpointTs", zap.Uint64("lastRetryCheckpointTs", m.lastErrorRetryCheckpointTs)) +} diff --git a/cdc/owner/feed_state_manager_test.go b/cdc/owner/feed_state_manager_test.go index 14b03f70252..e91a8d8bc02 100644 --- a/cdc/owner/feed_state_manager_test.go +++ b/cdc/owner/feed_state_manager_test.go @@ -62,8 +62,7 @@ func newFeedStateManager4Test( f.errBackoff.Multiplier = multiplier f.errBackoff.RandomizationFactor = 0 - f.resetErrBackoff() - f.lastErrorTime = time.Unix(0, 0) + f.resetErrRetry() return f } @@ -83,7 +82,7 @@ func TestHandleJob(t *testing.T) { return &model.ChangeFeedStatus{}, true, nil }) tester.MustApplyPatches() - manager.Tick(state) + manager.Tick(state, 0) tester.MustApplyPatches() require.True(t, manager.ShouldRunning()) @@ -92,7 +91,7 @@ func TestHandleJob(t *testing.T) { CfID: model.DefaultChangeFeedID("fake-changefeed-id"), Type: model.AdminStop, }) - manager.Tick(state) + manager.Tick(state, 0) tester.MustApplyPatches() require.True(t, manager.ShouldRunning()) @@ -101,7 +100,7 @@ func TestHandleJob(t *testing.T) { CfID: ctx.ChangefeedVars().ID, Type: model.AdminResume, }) - manager.Tick(state) + manager.Tick(state, 0) tester.MustApplyPatches() require.True(t, manager.ShouldRunning()) @@ -110,7 +109,7 @@ func TestHandleJob(t *testing.T) { CfID: ctx.ChangefeedVars().ID, Type: model.AdminStop, }) - manager.Tick(state) + manager.Tick(state, 0) tester.MustApplyPatches() require.False(t, manager.ShouldRunning()) @@ -124,7 +123,7 @@ func TestHandleJob(t *testing.T) { CfID: ctx.ChangefeedVars().ID, Type: model.AdminResume, }) - manager.Tick(state) + manager.Tick(state, 0) tester.MustApplyPatches() require.True(t, manager.ShouldRunning()) require.False(t, manager.ShouldRemoved()) @@ -137,7 +136,7 @@ func TestHandleJob(t *testing.T) { CfID: ctx.ChangefeedVars().ID, Type: model.AdminRemove, }) - manager.Tick(state) + manager.Tick(state, 0) tester.MustApplyPatches() require.False(t, manager.ShouldRunning()) @@ -160,7 +159,7 @@ func TestResumeChangefeedWithCheckpointTs(t *testing.T) { return &model.ChangeFeedStatus{}, true, nil }) 
tester.MustApplyPatches() - manager.Tick(state) + manager.Tick(state, 0) tester.MustApplyPatches() require.True(t, manager.ShouldRunning()) @@ -169,7 +168,7 @@ func TestResumeChangefeedWithCheckpointTs(t *testing.T) { CfID: ctx.ChangefeedVars().ID, Type: model.AdminStop, }) - manager.Tick(state) + manager.Tick(state, 0) tester.MustApplyPatches() require.False(t, manager.ShouldRunning()) @@ -184,7 +183,7 @@ func TestResumeChangefeedWithCheckpointTs(t *testing.T) { Type: model.AdminResume, OverwriteCheckpointTs: 100, }) - manager.Tick(state) + manager.Tick(state, 0) tester.MustApplyPatches() require.True(t, manager.ShouldRunning()) require.False(t, manager.ShouldRemoved()) @@ -202,7 +201,7 @@ func TestResumeChangefeedWithCheckpointTs(t *testing.T) { }}, true, nil }) tester.MustApplyPatches() - manager.Tick(state) + manager.Tick(state, 0) tester.MustApplyPatches() require.Equal(t, state.Info.State, model.StateFailed) require.Equal(t, state.Info.AdminJobType, model.AdminStop) @@ -214,7 +213,7 @@ func TestResumeChangefeedWithCheckpointTs(t *testing.T) { Type: model.AdminResume, OverwriteCheckpointTs: 200, }) - manager.Tick(state) + manager.Tick(state, 0) tester.MustApplyPatches() require.True(t, manager.ShouldRunning()) require.False(t, manager.ShouldRemoved()) @@ -238,12 +237,12 @@ func TestMarkFinished(t *testing.T) { return &model.ChangeFeedStatus{}, true, nil }) tester.MustApplyPatches() - manager.Tick(state) + manager.Tick(state, 0) tester.MustApplyPatches() require.True(t, manager.ShouldRunning()) manager.MarkFinished() - manager.Tick(state) + manager.Tick(state, 0) tester.MustApplyPatches() require.False(t, manager.ShouldRunning()) @@ -272,12 +271,12 @@ func TestCleanUpInfos(t *testing.T) { }) tester.MustApplyPatches() require.Contains(t, state.TaskPositions, ctx.GlobalVars().CaptureInfo.ID) - manager.Tick(state) + manager.Tick(state, 0) tester.MustApplyPatches() require.True(t, manager.ShouldRunning()) manager.MarkFinished() - manager.Tick(state) + manager.Tick(state, 0) tester.MustApplyPatches() require.False(t, manager.ShouldRunning()) require.Equal(t, state.Info.State, model.StateFinished) @@ -298,11 +297,13 @@ func TestHandleError(t *testing.T) { }) state.PatchStatus(func(status *model.ChangeFeedStatus) (*model.ChangeFeedStatus, bool, error) { require.Nil(t, status) - return &model.ChangeFeedStatus{}, true, nil + return &model.ChangeFeedStatus{ + CheckpointTs: 200, + }, true, nil }) tester.MustApplyPatches() - manager.Tick(state) + manager.Tick(state, 0) tester.MustApplyPatches() intervals := []time.Duration{200, 400, 800, 1600, 1600} @@ -321,16 +322,45 @@ func TestHandleError(t *testing.T) { }}, true, nil }) tester.MustApplyPatches() - manager.Tick(state) + manager.Tick(state, 0) tester.MustApplyPatches() require.False(t, manager.ShouldRunning()) - require.Equal(t, state.Info.State, model.StateError) + require.Equal(t, state.Info.State, model.StatePending) require.Equal(t, state.Info.AdminJobType, model.AdminStop) require.Equal(t, state.Status.AdminJobType, model.AdminStop) time.Sleep(d) - manager.Tick(state) + manager.Tick(state, 0) tester.MustApplyPatches() } + + // no error tick, state should be transferred from pending to warning + manager.Tick(state, 0) + require.True(t, manager.ShouldRunning()) + require.Equal(t, model.StateWarning, state.Info.State) + require.Equal(t, model.AdminNone, state.Info.AdminJobType) + require.Equal(t, model.AdminNone, state.Status.AdminJobType) + + // no error tick and checkpointTs is progressing, + // state should be transferred from warning to 
normal + state.PatchStatus( + func(status *model.ChangeFeedStatus) (*model.ChangeFeedStatus, bool, error) { + status.CheckpointTs += 1 + return status, true, nil + }) + tester.MustApplyPatches() + manager.Tick(state, 0) + tester.MustApplyPatches() + require.True(t, manager.ShouldRunning()) + state.PatchStatus( + func(status *model.ChangeFeedStatus) (*model.ChangeFeedStatus, bool, error) { + status.CheckpointTs += 1 + return status, true, nil + }) + manager.Tick(state, 0) + tester.MustApplyPatches() + require.Equal(t, model.StateNormal, state.Info.State) + require.Equal(t, model.AdminNone, state.Info.AdminJobType) + require.Equal(t, model.AdminNone, state.Status.AdminJobType) } func TestHandleFastFailError(t *testing.T) { @@ -352,7 +382,7 @@ func TestHandleFastFailError(t *testing.T) { }}, true, nil }) tester.MustApplyPatches() - manager.Tick(state) + manager.Tick(state, 0) // test handling fast failed error with non-nil ChangeFeedInfo tester.MustApplyPatches() // test handling fast failed error with nil ChangeFeedInfo @@ -360,7 +390,7 @@ func TestHandleFastFailError(t *testing.T) { state.PatchInfo(func(info *model.ChangeFeedInfo) (*model.ChangeFeedInfo, bool, error) { return nil, true, nil }) - manager.Tick(state) + manager.Tick(state, 0) // When the patches are applied, the callback function of PatchInfo in feedStateManager.HandleError will be called. // At that time, the nil pointer will be checked instead of throwing a panic. See issue #3128 for more detail. tester.MustApplyPatches() @@ -441,7 +471,7 @@ func TestChangefeedStatusNotExist(t *testing.T) { etcd.DefaultClusterAndMetaPrefix, ): "d563bfc0-f406-4f34-bc7d-6dc2e35a44e5", }) - manager.Tick(state) + manager.Tick(state, 0) require.False(t, manager.ShouldRunning()) require.False(t, manager.ShouldRemoved()) tester.MustApplyPatches() @@ -450,7 +480,7 @@ func TestChangefeedStatusNotExist(t *testing.T) { CfID: ctx.ChangefeedVars().ID, Type: model.AdminRemove, }) - manager.Tick(state) + manager.Tick(state, 0) require.False(t, manager.ShouldRunning()) require.True(t, manager.ShouldRemoved()) tester.MustApplyPatches() @@ -471,7 +501,7 @@ func TestChangefeedNotRetry(t *testing.T) { return &model.ChangeFeedInfo{SinkURI: "123", Config: &config.ReplicaConfig{}, State: model.StateNormal}, true, nil }) tester.MustApplyPatches() - manager.Tick(state) + manager.Tick(state, 0) require.True(t, manager.ShouldRunning()) // changefeed in error state but error can be retried @@ -479,7 +509,7 @@ func TestChangefeedNotRetry(t *testing.T) { return &model.ChangeFeedInfo{ SinkURI: "123", Config: &config.ReplicaConfig{}, - State: model.StateError, + State: model.StateWarning, Error: &model.RunningError{ Addr: "127.0.0.1", Code: "CDC:ErrPipelineTryAgain", @@ -489,57 +519,58 @@ func TestChangefeedNotRetry(t *testing.T) { }, true, nil }) tester.MustApplyPatches() - manager.Tick(state) + manager.Tick(state, 0) require.True(t, manager.ShouldRunning()) - // changefeed in error state and error can't be retried - state.PatchInfo(func(info *model.ChangeFeedInfo) (*model.ChangeFeedInfo, bool, error) { - return &model.ChangeFeedInfo{ - SinkURI: "123", - Config: &config.ReplicaConfig{}, - State: model.StateError, - Error: &model.RunningError{ + state.PatchTaskPosition("test", + func(position *model.TaskPosition) (*model.TaskPosition, bool, error) { + if position == nil { + position = &model.TaskPosition{} + } + position.Error = &model.RunningError{ Addr: "127.0.0.1", - Code: "CDC:ErrExpressionColumnNotFound", - Message: "what ever", - }, - }, true, nil - }) + Code: 
string(cerror.ErrExpressionColumnNotFound.RFCCode()), + Message: cerror.ErrExpressionColumnNotFound.Error(), + } + return position, true, nil + }) + tester.MustApplyPatches() - manager.Tick(state) + manager.Tick(state, 0) require.False(t, manager.ShouldRunning()) - state.PatchInfo(func(info *model.ChangeFeedInfo) (*model.ChangeFeedInfo, bool, error) { - return &model.ChangeFeedInfo{ - SinkURI: "123", - Config: &config.ReplicaConfig{}, - State: model.StateError, - Error: &model.RunningError{ + state.PatchTaskPosition("test", + func(position *model.TaskPosition) (*model.TaskPosition, bool, error) { + if position == nil { + position = &model.TaskPosition{} + } + position.Error = &model.RunningError{ Addr: "127.0.0.1", Code: string(cerror.ErrExpressionColumnNotFound.RFCCode()), Message: cerror.ErrExpressionColumnNotFound.Error(), - }, - }, true, nil - }) + } + return position, true, nil + }) + tester.MustApplyPatches() - manager.Tick(state) + manager.Tick(state, 0) // should be false require.False(t, manager.ShouldRunning()) - state.PatchInfo(func(info *model.ChangeFeedInfo) (*model.ChangeFeedInfo, bool, error) { - return &model.ChangeFeedInfo{ - SinkURI: "123", - Config: &config.ReplicaConfig{}, - State: model.StateError, - Error: &model.RunningError{ + state.PatchTaskPosition("test", + func(position *model.TaskPosition) (*model.TaskPosition, bool, error) { + if position == nil { + position = &model.TaskPosition{} + } + position.Error = &model.RunningError{ Addr: "127.0.0.1", Code: string(cerror.ErrExpressionParseFailed.RFCCode()), Message: cerror.ErrExpressionParseFailed.Error(), - }, - }, true, nil - }) + } + return position, true, nil + }) tester.MustApplyPatches() - manager.Tick(state) + manager.Tick(state, 0) // should be false require.False(t, manager.ShouldRunning()) } @@ -561,7 +592,7 @@ func TestBackoffStopsUnexpectedly(t *testing.T) { }) tester.MustApplyPatches() - manager.Tick(state) + manager.Tick(state, 0) tester.MustApplyPatches() for i := 1; i <= 10; i++ { @@ -572,7 +603,11 @@ func TestBackoffStopsUnexpectedly(t *testing.T) { require.Equal(t, state.Info.State, model.StateFailed) require.False(t, manager.ShouldRunning()) } else { - require.Equal(t, state.Info.State, model.StateNormal) + if i == 1 { + require.Equal(t, model.StateNormal, state.Info.State) + } else { + require.Equal(t, model.StateWarning, state.Info.State) + } require.True(t, manager.ShouldRunning()) state.PatchTaskPosition(ctx.GlobalVars().CaptureInfo.ID, func(position *model.TaskPosition) ( @@ -585,11 +620,11 @@ func TestBackoffStopsUnexpectedly(t *testing.T) { }}, true, nil }) tester.MustApplyPatches() - manager.Tick(state) + manager.Tick(state, 0) tester.MustApplyPatches() // If an error occurs, backing off from running the task. 
require.False(t, manager.ShouldRunning()) - require.Equal(t, state.Info.State, model.StateError) + require.Equal(t, model.StatePending, state.Info.State) require.Equal(t, state.Info.AdminJobType, model.AdminStop) require.Equal(t, state.Status.AdminJobType, model.AdminStop) } @@ -597,7 +632,7 @@ func TestBackoffStopsUnexpectedly(t *testing.T) { // 500ms is the backoff interval, so sleep 500ms and after a manager // tick, the changefeed will turn into normal state time.Sleep(500 * time.Millisecond) - manager.Tick(state) + manager.Tick(state, 0) tester.MustApplyPatches() } } @@ -619,11 +654,15 @@ func TestBackoffNeverStops(t *testing.T) { }) tester.MustApplyPatches() - manager.Tick(state) + manager.Tick(state, 0) tester.MustApplyPatches() for i := 1; i <= 30; i++ { - require.Equal(t, state.Info.State, model.StateNormal) + if i == 1 { + require.Equal(t, model.StateNormal, state.Info.State) + } else { + require.Equal(t, model.StateWarning, state.Info.State) + } require.True(t, manager.ShouldRunning()) state.PatchTaskPosition(ctx.GlobalVars().CaptureInfo.ID, func(position *model.TaskPosition) (*model.TaskPosition, bool, error) { @@ -634,16 +673,16 @@ func TestBackoffNeverStops(t *testing.T) { }}, true, nil }) tester.MustApplyPatches() - manager.Tick(state) + manager.Tick(state, 0) tester.MustApplyPatches() require.False(t, manager.ShouldRunning()) - require.Equal(t, state.Info.State, model.StateError) + require.Equal(t, model.StatePending, state.Info.State) require.Equal(t, state.Info.AdminJobType, model.AdminStop) require.Equal(t, state.Status.AdminJobType, model.AdminStop) // 100ms is the backoff interval, so sleep 100ms and after a manager tick, // the changefeed will turn into normal state time.Sleep(100 * time.Millisecond) - manager.Tick(state) + manager.Tick(state, 0) tester.MustApplyPatches() } } @@ -665,7 +704,7 @@ func TestUpdateChangefeedEpoch(t *testing.T) { }) tester.MustApplyPatches() - manager.Tick(state) + manager.Tick(state, 0) tester.MustApplyPatches() require.Equal(t, state.Info.State, model.StateNormal) require.True(t, manager.ShouldRunning()) @@ -685,10 +724,10 @@ func TestUpdateChangefeedEpoch(t *testing.T) { }}, true, nil }) tester.MustApplyPatches() - manager.Tick(state) + manager.Tick(state, 0) tester.MustApplyPatches() require.False(t, manager.ShouldRunning()) - require.Equal(t, state.Info.State, model.StateError) + require.Equal(t, model.StatePending, state.Info.State, i) require.Equal(t, state.Info.AdminJobType, model.AdminStop) require.Equal(t, state.Status.AdminJobType, model.AdminStop) diff --git a/cdc/owner/owner_test.go b/cdc/owner/owner_test.go index c1d4574a371..56319eb1acb 100644 --- a/cdc/owner/owner_test.go +++ b/cdc/owner/owner_test.go @@ -687,7 +687,7 @@ func TestCalculateGCSafepointTs(t *testing.T) { o := ownerImpl{changefeeds: make(map[model.ChangeFeedID]*changefeed)} stateMap := []model.FeedState{ - model.StateNormal, model.StateStopped, model.StateError, + model.StateNormal, model.StateStopped, model.StatePending, model.StateFailed, /* failed changefeed with normal error should not be ignored */ } for i := 0; i < 100; i++ { diff --git a/cdc/processor/sinkmanager/manager.go b/cdc/processor/sinkmanager/manager.go index 395b423421f..fd782322f62 100644 --- a/cdc/processor/sinkmanager/manager.go +++ b/cdc/processor/sinkmanager/manager.go @@ -15,6 +15,7 @@ package sinkmanager import ( "context" + "fmt" "math" "sync" "time" @@ -33,6 +34,7 @@ import ( "github.com/pingcap/tiflow/cdc/sinkv2/tablesink" "github.com/pingcap/tiflow/pkg/config" cerror 
"github.com/pingcap/tiflow/pkg/errors" + "github.com/pingcap/tiflow/pkg/retry" "github.com/pingcap/tiflow/pkg/upstream" "github.com/pingcap/tiflow/pkg/util" "github.com/prometheus/client_golang/prometheus" @@ -102,6 +104,8 @@ type SinkManager struct { sinkWorkerAvailable chan struct{} // sinkMemQuota is used to control the total memory usage of the table sink. sinkMemQuota *memquota.MemQuota + // sinkRetry is used to control the retry behavior of the table sink. + sinkRetry *retry.ErrorRetry // redoWorkers used to pull data from source manager. redoWorkers []*redoWorker @@ -151,6 +155,7 @@ func New( sinkWorkers: make([]*sinkWorker, 0, sinkWorkerNum), sinkTaskChan: make(chan *sinkTask), sinkWorkerAvailable: make(chan struct{}, 1), + sinkRetry: retry.NewInfiniteErrorRetry(), metricsTableSinkTotalRows: metricsTableSinkTotalRows, } @@ -296,8 +301,13 @@ func (m *SinkManager) run(ctx context.Context, warnings ...chan<- error) (err er } else { return errors.Trace(err) } - // Use a 5 second backoff when re-establishing internal resources. - if err = util.Hang(m.managerCtx, 5*time.Second); err != nil { + + backoff, err := m.sinkRetry.GetRetryBackoff(err) + if err != nil { + return errors.New(fmt.Sprintf("GetRetryBackoff: %s", err.Error())) + } + + if err = util.Hang(m.managerCtx, backoff); err != nil { return errors.Trace(err) } } diff --git a/metrics/alertmanager/ticdc.rules.yml b/metrics/alertmanager/ticdc.rules.yml index e9ed7f59b22..15166fd809b 100644 --- a/metrics/alertmanager/ticdc.rules.yml +++ b/metrics/alertmanager/ticdc.rules.yml @@ -37,6 +37,31 @@ groups: value: '{{ $value }}' summary: cdc processor checkpoint delay more than 10 minutes + # changefeed related alter rules + - alert: ticdc_changefeed_failed + expr: (max_over_time(ticdc_owner_status[1m]) == 2) > 0 + for: 1m + labels: + env: ENV_LABELS_ENV + level: critical + expr: (max_over_time(ticdc_owner_status[1m]) == 2) > 0 + annotations: + description: 'cluster: ENV_LABELS_ENV, instance: {{ $labels.instance }}, values: {{ $value }}' + value: '{{ $value }}' + summary: cdc changefeed failed, it can not be automatically resumed + + - alert: ticdc_changefeed_meet_error + expr: (max_over_time(ticdc_owner_status[1m]) == 1 or max_over_time(ticdc_owner_status[1m]) == 6) > 0 + for: 1m + labels: + env: ENV_LABELS_ENV + level: warning + expr: (max_over_time(ticdc_owner_status[1m]) == 1 or max_over_time(ticdc_owner_status[1m]) == 6 ) > 0 + annotations: + description: 'cluster: ENV_LABELS_ENV, instance: {{ $labels.instance }}, values: {{ $value }}' + value: '{{ $value }}' + summary: cdc changefeed meet error + - alert: ticdc_mounter_unmarshal_and_mount_time_more_than_1s expr: histogram_quantile(0.9, rate(ticdc_mounter_unmarshal_and_mount_bucket[1m])) * 1000 > 1000 for: 1m diff --git a/metrics/grafana/ticdc.json b/metrics/grafana/ticdc.json index 724527838c1..0db62779487 100644 --- a/metrics/grafana/ticdc.json +++ b/metrics/grafana/ticdc.json @@ -4260,7 +4260,7 @@ }, { "datasource": "${DS_TEST-CLUSTER}", - "description": "The status of each changefeed.\n\n0: Normal\n\n1: Error\n\n2: Failed\n\n3: Stopped\n\n4: Finished\n\n-1: Unknown", + "description": "The status of each changefeed.\n\n0: Normal\n\n1 and 6: Warning\n\n2: Failed\n\n3: Stopped\n\n4: Finished\n\n-1: Unknown", "fieldConfig": { "defaults": { "color": { @@ -4299,7 +4299,7 @@ { "from": "", "id": 2, - "text": "Error", + "text": "Warning", "to": "", "type": 1, "value": "1" @@ -4331,22 +4331,30 @@ { "from": "", "id": 6, + "text": "Warning", + "to": "", + "type": 1, + "value": "6" + }, + { + 
"from": "", + "id": 7, "text": "Unknown", "to": "", "type": 1, "value": "-1" }, { - "from": "5", - "id": 7, + "from": "7", + "id": 8, "text": "Other", "to": "10000", "type": 1, - "value": "5" + "value": "7" }, { - "from": "6", - "id": 8, + "from": "7", + "id": 9, "text": "-", "to": "1000", "type": 2 diff --git a/pkg/cmd/cli/cli_changefeed_list_test.go b/pkg/cmd/cli/cli_changefeed_list_test.go index 8f5e99fe719..b43a4c2eb06 100644 --- a/pkg/cmd/cli/cli_changefeed_list_test.go +++ b/pkg/cmd/cli/cli_changefeed_list_test.go @@ -39,10 +39,10 @@ func TestChangefeedListCli(t *testing.T) { { UpstreamID: 1, Namespace: "default", - ID: "error-1", + ID: "pending-1", CheckpointTime: model.JSONTime{}, RunningError: nil, - FeedState: model.StateError, + FeedState: model.StateWarning, }, { UpstreamID: 1, @@ -84,28 +84,38 @@ func TestChangefeedListCli(t *testing.T) { RunningError: nil, FeedState: model.StateStopped, }, + { + UpstreamID: 1, + Namespace: "default", + ID: "warning-7", + CheckpointTime: model.JSONTime{}, + RunningError: nil, + FeedState: model.StateWarning, + }, }, nil).Times(2) // when --all=false, should contains StateNormal, StateError, StateFailed, StateStopped changefeed os.Args = []string{"list", "--all=false"} require.Nil(t, cmd.Execute()) out, err := io.ReadAll(b) require.Nil(t, err) - require.Contains(t, string(out), "error-1") + require.Contains(t, string(out), "pending-1") require.Contains(t, string(out), "normal-2") require.Contains(t, string(out), "stopped-6") require.Contains(t, string(out), "failed-3") + require.Contains(t, string(out), "warning-7") // when --all=true, should contains all changefeed os.Args = []string{"list", "--all=true"} require.Nil(t, cmd.Execute()) out, err = io.ReadAll(b) require.Nil(t, err) - require.Contains(t, string(out), "error-1") + require.Contains(t, string(out), "pending-1") require.Contains(t, string(out), "normal-2") require.Contains(t, string(out), "failed-3") require.Contains(t, string(out), "removed-4") require.Contains(t, string(out), "finished-5") require.Contains(t, string(out), "stopped-6") + require.Contains(t, string(out), "warning-7") cf.EXPECT().List(gomock.Any(), gomock.Any()).Return(nil, errors.New("changefeed list test error")) o := newListChangefeedOptions() diff --git a/pkg/errors/helper_test.go b/pkg/errors/helper_test.go index de5377ef99b..b5b130df871 100644 --- a/pkg/errors/helper_test.go +++ b/pkg/errors/helper_test.go @@ -177,6 +177,12 @@ func TestIsChangefeedUnRetryableError(t *testing.T) { for _, c := range cases { require.Equal(t, c.expected, IsChangefeedUnRetryableError(c.err)) } + + var code errors.RFCErrorCode + var ok bool + code, ok = RFCCode(ErrChangefeedUnretryable) + require.True(t, ok) + require.True(t, IsChangefeedUnRetryableError(errors.New(string(code)))) } func TestIsCliUnprintableError(t *testing.T) { diff --git a/pkg/migrate/migrate_test.go b/pkg/migrate/migrate_test.go index 8b156f142a7..cba6b2c5e12 100644 --- a/pkg/migrate/migrate_test.go +++ b/pkg/migrate/migrate_test.go @@ -80,7 +80,7 @@ func TestMigration(t *testing.T) { status1 := model.ChangeFeedStatus{CheckpointTs: 1} info2 := model.ChangeFeedInfo{ SinkURI: "test1", - StartTs: 2, TargetTs: 200, State: model.StateError, + StartTs: 2, TargetTs: 200, State: model.StateWarning, } status2 := model.ChangeFeedStatus{CheckpointTs: 2} cfg := config.GetDefaultReplicaConfig() @@ -359,7 +359,7 @@ func TestMigrationNonDefaultCluster(t *testing.T) { status1 := model.ChangeFeedStatus{CheckpointTs: 1} info2 := model.ChangeFeedInfo{ SinkURI: "test1", - StartTs: 2, 
TargetTs: 200, State: model.StateError, + StartTs: 2, TargetTs: 200, State: model.StateWarning, } status2 := model.ChangeFeedStatus{CheckpointTs: 2} info3 := model.ChangeFeedInfo{ diff --git a/pkg/retry/error_retry.go b/pkg/retry/error_retry.go new file mode 100644 index 00000000000..3e0128e5854 --- /dev/null +++ b/pkg/retry/error_retry.go @@ -0,0 +1,105 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package retry + +import ( + "math" + "math/rand" + "time" + + "github.com/pingcap/log" + cerror "github.com/pingcap/tiflow/pkg/errors" + "go.uber.org/zap" +) + +const ( + defaultErrorMaxRetryDuration = 30 * time.Minute + defaultErrGCInterval = 10 * time.Minute + defaultBackoffBaseInS = 5 + defaultBackoffMaxInS = 30 +) + +// ErrorRetry is used to control the error retry logic. +type ErrorRetry struct { + // To control the error retry. + lastInternalError error + firstRetryTime time.Time + lastErrorRetryTime time.Time + maxRetryDuration time.Duration + errGCInterval time.Duration + backoffBase int64 + backoffMax int64 +} + +// NewDefaultErrorRetry creates a new ErrorRetry with default values. +func NewDefaultErrorRetry() *ErrorRetry { + return NewErrorRetry(defaultErrorMaxRetryDuration, + defaultErrGCInterval, + defaultBackoffBaseInS, + defaultBackoffMaxInS) +} + +// NewInfiniteErrorRetry creates a new ErrorRetry with infinite duration. +func NewInfiniteErrorRetry() *ErrorRetry { + return NewErrorRetry(time.Duration(math.MaxInt64), + defaultErrGCInterval, + defaultBackoffBaseInS, + defaultBackoffMaxInS) +} + +// NewErrorRetry creates a new ErrorRetry. +func NewErrorRetry( + maxRetryDuration time.Duration, + errGCInterval time.Duration, + backoffBase int64, + backoffMax int64, +) *ErrorRetry { + return &ErrorRetry{ + maxRetryDuration: maxRetryDuration, + errGCInterval: errGCInterval, + backoffBase: backoffBase, + backoffMax: backoffMax, + } +} + +// GetRetryBackoff returns the backoff duration for retrying the last error. +// If the retry time is exhausted, it returns the an ChangefeedUnRetryableError. 
+func (r *ErrorRetry) GetRetryBackoff(err error) (time.Duration, error) { + // reset firstRetryTime when the last error is too long ago + // it means the last error is retry success, and the sink is running well for some time + if r.lastInternalError == nil || + time.Since(r.lastErrorRetryTime) >= r.errGCInterval { + log.Debug("reset firstRetryTime", + zap.Time("lastErrorRetryTime", r.lastErrorRetryTime), + zap.Time("now", time.Now())) + r.firstRetryTime = time.Now() + } + + // return an unretryable error if retry time is exhausted + if time.Since(r.firstRetryTime) >= r.maxRetryDuration { + log.Debug("error retry exhausted", + zap.Time("firstRetryTime", r.firstRetryTime), + zap.Time("lastErrorRetryTime", r.lastErrorRetryTime), + zap.Time("now", time.Now())) + return 0, cerror.WrapChangefeedUnretryableErr(err) + } + + r.lastInternalError = err + r.lastErrorRetryTime = time.Now() + + // interval is in range [defaultBackoffBaseInS, defaultBackoffMaxInS) + interval := time.Second * time.Duration( + rand.Int63n(defaultBackoffMaxInS-defaultBackoffBaseInS)+defaultBackoffBaseInS) + return interval, nil +} diff --git a/tests/integration_tests/changefeed_error/run.sh b/tests/integration_tests/changefeed_error/run.sh index b81ce6d9c30..abaf3d6b556 100755 --- a/tests/integration_tests/changefeed_error/run.sh +++ b/tests/integration_tests/changefeed_error/run.sh @@ -70,7 +70,7 @@ function run() { ensure $MAX_RETRIES "check_etcd_meta_not_exist '/tidb/cdc/default/__cdc_meta__/owner' 'owner'" run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY - ensure $MAX_RETRIES check_changefeed_state http://${UP_PD_HOST_1}:${UP_PD_PORT_1} ${changefeedid} "error" "failpoint injected retriable error" "" + ensure $MAX_RETRIES check_changefeed_state http://${UP_PD_HOST_1}:${UP_PD_PORT_1} ${changefeedid} "warning" "failpoint injected retriable error" "" run_cdc_cli changefeed remove -c $changefeedid ensure $MAX_RETRIES check_no_changefeed ${UP_PD_HOST_1}:${UP_PD_PORT_1} @@ -86,7 +86,7 @@ function run() { run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" -c $changefeedid_1 run_sql "CREATE table changefeed_error.DDLERROR(id int primary key, val int);" - ensure $MAX_RETRIES check_changefeed_status 127.0.0.1:8300 $changefeedid_1 normal last_warning ErrExecDDLFailed + ensure $MAX_RETRIES check_changefeed_status 127.0.0.1:8300 $changefeedid_1 warning last_warning ErrExecDDLFailed run_cdc_cli changefeed remove -c $changefeedid_1 cleanup_process $CDC_BINARY diff --git a/tests/integration_tests/kafka_sink_error_resume/run.sh b/tests/integration_tests/kafka_sink_error_resume/run.sh index bff234fcc96..97e3a528c42 100755 --- a/tests/integration_tests/kafka_sink_error_resume/run.sh +++ b/tests/integration_tests/kafka_sink_error_resume/run.sh @@ -38,7 +38,7 @@ function run() { run_sql "CREATE table kafka_sink_error_resume.t2(id int primary key auto_increment, val int);" ${UP_TIDB_HOST} ${UP_TIDB_PORT} run_sql "INSERT INTO kafka_sink_error_resume.t1 VALUES ();" - ensure $MAX_RETRIES check_changefeed_status 127.0.0.1:8300 $changefeed_id "normal" "last_warning" "kafka sink injected error" + ensure $MAX_RETRIES check_changefeed_status 127.0.0.1:8300 $changefeed_id "warning" "last_warning" "kafka sink injected error" cdc cli changefeed resume --changefeed-id=$changefeed_id --pd=$pd_addr ensure $MAX_RETRIES check_changefeed_status 127.0.0.1:8300 $changefeed_id "normal"
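Reviewer note: if it helps to see the new pkg/retry helper in isolation, the sketch below drives it roughly the way ddlSinkImpl.retrySinkAction and SinkManager.run do after this patch: run the action, ask GetRetryBackoff for the next randomized backoff, and stop only when it reports the retry budget as exhausted (which the infinite variant never does). Only NewInfiniteErrorRetry and GetRetryBackoff come from this patch; runWithRetry and main are illustrative scaffolding, not production code.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"

	"github.com/pingcap/tiflow/pkg/retry"
)

// runWithRetry is illustrative scaffolding (not a function from the patch).
// It mirrors how the owner DDL sink and the sink manager use the new helper:
// run the action, and on failure sleep for the backoff instead of the old
// fixed 5s interval.
func runWithRetry(ctx context.Context, action func() error) error {
	r := retry.NewInfiniteErrorRetry()
	for {
		err := action()
		if err == nil {
			return nil
		}
		// GetRetryBackoff returns a randomized interval in [5s, 30s); once the
		// retry budget is exhausted it returns a changefeed-unretryable error
		// (never, for the infinite variant used here).
		backoff, berr := r.GetRetryBackoff(err)
		if berr != nil {
			return berr
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(backoff):
		}
	}
}

func main() {
	attempts := 0
	err := runWithRetry(context.Background(), func() error {
		attempts++
		if attempts < 3 {
			return errors.New("transient sink error")
		}
		return nil
	})
	fmt.Printf("attempts=%d err=%v\n", attempts, err)
}
```

The randomized 5 to 30 second interval replaces the fixed util.Hang(ctx, 5*time.Second) backoff used before this patch, and the default (non-infinite) variant additionally caps total retry time at 30 minutes before wrapping the error as unretryable, which is what lets long-failing components surface to the feed state manager instead of looping forever.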