diff --git a/cdc/api/v2/changefeed_test.go b/cdc/api/v2/changefeed_test.go
index 10eaf5a990c..31bcbd4f79f 100644
--- a/cdc/api/v2/changefeed_test.go
+++ b/cdc/api/v2/changefeed_test.go
@@ -272,7 +272,7 @@ func TestGetChangeFeed(t *testing.T) {
 	statusProvider.changefeedInfo = &model.ChangeFeedInfo{
 		ID: validID,
 		Error: &model.RunningError{
-			Code: string(cerrors.ErrGCTTLExceeded.RFCCode()),
+			Code: string(cerrors.ErrStartTsBeforeGC.RFCCode()),
 		},
 	}
 	statusProvider.changefeedStatus = &model.ChangeFeedStatus{
@@ -287,7 +287,7 @@ func TestGetChangeFeed(t *testing.T) {
 	err = json.NewDecoder(w.Body).Decode(&resp)
 	require.Nil(t, err)
 	require.Equal(t, resp.ID, validID)
-	require.Contains(t, resp.Error.Code, "ErrGCTTLExceeded")
+	require.Contains(t, resp.Error.Code, "ErrStartTsBeforeGC")
 
 	// success
 	statusProvider.changefeedInfo = &model.ChangeFeedInfo{ID: validID}
diff --git a/cdc/model/changefeed_test.go b/cdc/model/changefeed_test.go
index 4c7d7765cb7..2f276a35a03 100644
--- a/cdc/model/changefeed_test.go
+++ b/cdc/model/changefeed_test.go
@@ -458,26 +458,6 @@ func TestFixState(t *testing.T) {
 			},
 			expectedState: StateNormal,
 		},
-		{
-			info: &ChangeFeedInfo{
-				AdminJobType: AdminNone,
-				State:        StateNormal,
-				Error: &RunningError{
-					Code: string(errors.ErrGCTTLExceeded.RFCCode()),
-				},
-			},
-			expectedState: StateFailed,
-		},
-		{
-			info: &ChangeFeedInfo{
-				AdminJobType: AdminResume,
-				State:        StateNormal,
-				Error: &RunningError{
-					Code: string(errors.ErrGCTTLExceeded.RFCCode()),
-				},
-			},
-			expectedState: StateFailed,
-		},
 		{
 			info: &ChangeFeedInfo{
 				AdminJobType: AdminNone,
diff --git a/cdc/owner/changefeed.go b/cdc/owner/changefeed.go
index c43f683a0de..ae90b5e7079 100644
--- a/cdc/owner/changefeed.go
+++ b/cdc/owner/changefeed.go
@@ -278,7 +278,7 @@ func (c *changefeed) checkStaleCheckpointTs(ctx cdcContext.Context, checkpointTs
 	state := c.state.Info.State
 	if state == model.StateNormal || state == model.StateStopped || state == model.StateError {
 		failpoint.Inject("InjectChangefeedFastFailError", func() error {
-			return cerror.ErrGCTTLExceeded.FastGen("InjectChangefeedFastFailError")
+			return cerror.ErrStartTsBeforeGC.FastGen("InjectChangefeedFastFailError")
 		})
 		if err := c.upstream.GCManager.CheckStaleCheckpointTs(ctx, c.id, checkpointTs); err != nil {
 			return errors.Trace(err)
diff --git a/cdc/owner/feed_state_manager.go b/cdc/owner/feed_state_manager.go
index 1c4c1592af2..5836c089934 100644
--- a/cdc/owner/feed_state_manager.go
+++ b/cdc/owner/feed_state_manager.go
@@ -36,6 +36,7 @@ const (
 	// To avoid thunderherd, a random factor is also added.
 	defaultBackoffInitInterval        = 10 * time.Second
 	defaultBackoffMaxInterval         = 30 * time.Minute
+	defaultBackoffMaxElapsedTime      = 90 * time.Minute
 	defaultBackoffRandomizationFactor = 0.1
 	defaultBackoffMultiplier          = 2.0
 
@@ -73,8 +74,8 @@ func newFeedStateManager(up *upstream.Upstream) *feedStateManager {
 	f.errBackoff.MaxInterval = defaultBackoffMaxInterval
 	f.errBackoff.Multiplier = defaultBackoffMultiplier
 	f.errBackoff.RandomizationFactor = defaultBackoffRandomizationFactor
-	// MaxElapsedTime=0 means the backoff never stops
-	f.errBackoff.MaxElapsedTime = 0
+	// backoff will stop once the defaultBackoffMaxElapsedTime has elapsed.
+	f.errBackoff.MaxElapsedTime = defaultBackoffMaxElapsedTime
 
 	f.resetErrBackoff()
 	f.lastErrorTime = time.Unix(0, 0)
@@ -135,6 +136,7 @@ func (m *feedStateManager) Tick(state *orchestrator.ChangefeedReactorState) (adm
 	case model.StateError:
 		if m.state.Info.Error.IsChangefeedUnRetryableError() {
 			m.shouldBeRunning = false
+			m.patchState(model.StateFailed)
 			return
 		}
 	}
@@ -536,12 +538,25 @@ func (m *feedStateManager) handleError(errs ...*model.RunningError) {
 		m.patchState(model.StateError)
 	} else {
 		oldBackoffInterval := m.backoffInterval
-		// NextBackOff will never return -1 because the backoff never stops
-		// with `MaxElapsedTime=0`
-		// ref: https://github.com/cenkalti/backoff/blob/v4/exponential.go#L121-L123
+
 		m.backoffInterval = m.errBackoff.NextBackOff()
 		m.lastErrorTime = time.Unix(0, 0)
 
+		// NextBackOff() will return -1 once the MaxElapsedTime has elapsed.
+		if m.backoffInterval == m.errBackoff.Stop {
+			log.Warn("The changefeed won't be restarted "+
+				"as it has been experiencing failures for "+
+				"an extended duration",
+				zap.Duration(
+					"maxElapsedTime",
+					m.errBackoff.MaxElapsedTime,
+				),
+			)
+			m.shouldBeRunning = false
+			m.patchState(model.StateFailed)
+			return
+		}
+
 		log.Info("changefeed restart backoff interval is changed",
 			zap.String("namespace", m.state.ID.Namespace),
 			zap.String("changefeed", m.state.ID.ID),
diff --git a/cdc/owner/feed_state_manager_test.go b/cdc/owner/feed_state_manager_test.go
index ea624cb1c39..14b03f70252 100644
--- a/cdc/owner/feed_state_manager_test.go
+++ b/cdc/owner/feed_state_manager_test.go
@@ -197,7 +197,7 @@ func TestResumeChangefeedWithCheckpointTs(t *testing.T) {
 		func(position *model.TaskPosition) (*model.TaskPosition, bool, error) {
 			return &model.TaskPosition{Error: &model.RunningError{
 				Addr:    ctx.GlobalVars().CaptureInfo.AdvertiseAddr,
-				Code:    "CDC:ErrGCTTLExceeded",
+				Code:    "CDC:ErrStartTsBeforeGC",
 				Message: "fake error for test",
 			}}, true, nil
 		})
@@ -347,7 +347,7 @@ func TestHandleFastFailError(t *testing.T) {
 		func(position *model.TaskPosition) (*model.TaskPosition, bool, error) {
 			return &model.TaskPosition{Error: &model.RunningError{
 				Addr:    ctx.GlobalVars().CaptureInfo.AdvertiseAddr,
-				Code:    "CDC:ErrGCTTLExceeded",
+				Code:    "CDC:ErrStartTsBeforeGC",
 				Message: "fake error for test",
 			}}, true, nil
 		})
@@ -565,33 +565,37 @@ func TestBackoffStopsUnexpectedly(t *testing.T) {
 	tester.MustApplyPatches()
 
 	for i := 1; i <= 10; i++ {
-		require.Equal(t, state.Info.State, model.StateNormal)
-		require.True(t, manager.ShouldRunning())
-		state.PatchTaskPosition(ctx.GlobalVars().CaptureInfo.ID,
-			func(position *model.TaskPosition) (*model.TaskPosition, bool, error) {
-				return &model.TaskPosition{Error: &model.RunningError{
-					Addr:    ctx.GlobalVars().CaptureInfo.AdvertiseAddr,
-					Code:    "[CDC:ErrEtcdSessionDone]",
-					Message: "fake error for test",
-				}}, true, nil
-			})
-		tester.MustApplyPatches()
-		manager.Tick(state)
-		tester.MustApplyPatches()
-		// after round 8, the maxElapsedTime of backoff will exceed 4000ms,
-		// and NextBackOff() will return -1, so the changefeed state will
-		// never turn into error state.
 		if i >= 8 {
-			require.True(t, manager.ShouldRunning())
-			require.Equal(t, state.Info.State, model.StateNormal)
+			// after round 8, the maxElapsedTime of backoff will exceed 4000ms,
+			// and NextBackOff() will return -1, so the changefeed state will
+			// never turn into error state.
+			require.Equal(t, state.Info.State, model.StateFailed)
+			require.False(t, manager.ShouldRunning())
 		} else {
+			require.Equal(t, state.Info.State, model.StateNormal)
+			require.True(t, manager.ShouldRunning())
+			state.PatchTaskPosition(ctx.GlobalVars().CaptureInfo.ID,
+				func(position *model.TaskPosition) (
+					*model.TaskPosition, bool, error,
+				) {
+					return &model.TaskPosition{Error: &model.RunningError{
+						Addr:    ctx.GlobalVars().CaptureInfo.AdvertiseAddr,
+						Code:    "[CDC:ErrEtcdSessionDone]",
+						Message: "fake error for test",
+					}}, true, nil
+				})
+			tester.MustApplyPatches()
+			manager.Tick(state)
+			tester.MustApplyPatches()
+			// If an error occurs, the manager backs off and stops running the task.
 			require.False(t, manager.ShouldRunning())
 			require.Equal(t, state.Info.State, model.StateError)
 			require.Equal(t, state.Info.AdminJobType, model.AdminStop)
 			require.Equal(t, state.Status.AdminJobType, model.AdminStop)
 		}
-		// 500ms is the backoff interval, so sleep 500ms and after a manager tick,
-		// the changefeed will turn into normal state
+
+		// 500ms is the backoff interval, so sleep 500ms and after a manager
+		// tick, the changefeed will turn into normal state
 		time.Sleep(500 * time.Millisecond)
 		manager.Tick(state)
 		tester.MustApplyPatches()
diff --git a/cdc/owner/owner.go b/cdc/owner/owner.go
index e56cc956628..9b4756d551c 100644
--- a/cdc/owner/owner.go
+++ b/cdc/owner/owner.go
@@ -775,6 +775,26 @@ func (o *ownerImpl) updateGCSafepoint(
 	return nil
 }
 
+// ignoreFailedChangeFeedWhenGC checks if a failed changefeed should be ignored
+// when calculating the gc safepoint of the associated upstream.
+func (o *ownerImpl) ignoreFailedChangeFeedWhenGC(
+	state *orchestrator.ChangefeedReactorState,
+) bool {
+	upID := state.Info.UpstreamID
+	us, exist := o.upstreamManager.Get(upID)
+	if !exist {
+		log.Warn("upstream not found", zap.Uint64("ID", upID))
+		return false
+	}
+	// in case the changefeed failed right after it is created
+	// and the status is not initialized yet.
+	ts := state.Info.StartTs
+	if state.Status != nil {
+		ts = state.Status.CheckpointTs
+	}
+	return us.GCManager.IgnoreFailedChangeFeed(ts)
+}
+
 // calculateGCSafepoint calculates GCSafepoint for different upstream.
 // Note: we need to maintain a TiCDC service GC safepoint for each upstream TiDB cluster
 // to prevent upstream TiDB GC from removing data that is still needed by TiCDC.
@@ -789,8 +809,13 @@ func (o *ownerImpl) calculateGCSafepoint(state *orchestrator.GlobalReactorState)
 		if changefeedState.Info == nil {
 			continue
 		}
+
 		switch changefeedState.Info.State {
 		case model.StateNormal, model.StateStopped, model.StateError:
+		case model.StateFailed:
+			if o.ignoreFailedChangeFeedWhenGC(changefeedState) {
+				continue
+			}
 		default:
 			continue
 		}
diff --git a/cdc/owner/owner_test.go b/cdc/owner/owner_test.go
index ab13a09c28e..3803928185e 100644
--- a/cdc/owner/owner_test.go
+++ b/cdc/owner/owner_test.go
@@ -51,7 +51,7 @@ type mockManager struct {
 func (m *mockManager) CheckStaleCheckpointTs(
 	ctx context.Context, changefeedID model.ChangeFeedID, checkpointTs model.Ts,
 ) error {
-	return cerror.ErrGCTTLExceeded.GenWithStackByArgs()
+	return cerror.ErrStartTsBeforeGC.GenWithStackByArgs()
 }
 
 var _ gc.Manager = (*mockManager)(nil)
@@ -185,7 +185,7 @@ func TestCreateRemoveChangefeed(t *testing.T) {
 		Error: nil,
 	}
 
-	// this will make changefeed always meet ErrGCTTLExceeded
+	// this will make changefeed always meet ErrStartTsBeforeGC
 	up, _ := owner.upstreamManager.Get(changefeedInfo.UpstreamID)
 	mockedManager := &mockManager{Manager: up.GCManager}
 	up.GCManager = mockedManager
diff --git a/errors.toml b/errors.toml
index 21f97152103..cc3785c1ba3 100755
--- a/errors.toml
+++ b/errors.toml
@@ -396,11 +396,6 @@ error = '''
 event is larger than the total memory quota, size: %d, quota: %d
 '''
 
-["CDC:ErrGCTTLExceeded"]
-error = '''
-the checkpoint-ts(%d) lag of the changefeed(%s) has exceeded the GC TTL
-'''
-
 ["CDC:ErrGRPCDialFailed"]
 error = '''
 grpc dial failed
diff --git a/pkg/errors/cdc_errors.go b/pkg/errors/cdc_errors.go
index 782b9c765d5..2fc0d287493 100644
--- a/pkg/errors/cdc_errors.go
+++ b/pkg/errors/cdc_errors.go
@@ -752,11 +752,6 @@ var (
 		" caused by GC. checkpoint-ts %d is earlier than or equal to GC safepoint at %d",
 		errors.RFCCodeText("CDC:ErrSnapshotLostByGC"),
 	)
-	ErrGCTTLExceeded = errors.Normalize(
-		"the checkpoint-ts(%d) lag of the changefeed(%s) "+
-			"has exceeded the GC TTL",
-		errors.RFCCodeText("CDC:ErrGCTTLExceeded"),
-	)
 	ErrNotOwner = errors.Normalize(
 		"this capture is not a owner",
 		errors.RFCCodeText("CDC:ErrNotOwner"),
diff --git a/pkg/errors/helper.go b/pkg/errors/helper.go
index bd2cac03ec5..2ddb072a48d 100644
--- a/pkg/errors/helper.go
+++ b/pkg/errors/helper.go
@@ -39,7 +39,7 @@ func WrapError(rfcError *errors.Error, err error, args ...interface{}) error {
 // wants to replicate has been or will be GC. So it makes no sense to try to
 // resume the changefeed, and the changefeed should immediately be failed.
 var changeFeedFastFailError = []*errors.Error{
-	ErrGCTTLExceeded, ErrSnapshotLostByGC, ErrStartTsBeforeGC,
+	ErrSnapshotLostByGC, ErrStartTsBeforeGC,
 }
 
 // IsChangefeedFastFailError checks if an error is a ChangefeedFastFailError
diff --git a/pkg/errors/helper_test.go b/pkg/errors/helper_test.go
index af88c926aad..880e9a80753 100644
--- a/pkg/errors/helper_test.go
+++ b/pkg/errors/helper_test.go
@@ -102,26 +102,11 @@ func TestIsRetryableError(t *testing.T) {
 func TestChangefeedFastFailError(t *testing.T) {
 	t.Parallel()
 
-	err := ErrGCTTLExceeded.FastGenByArgs()
+	err := ErrSnapshotLostByGC.FastGenByArgs()
 	rfcCode, _ := RFCCode(err)
 	require.Equal(t, true, IsChangefeedFastFailError(err))
 	require.Equal(t, true, IsChangefeedFastFailErrorCode(rfcCode))
 
-	err = ErrGCTTLExceeded.GenWithStack("aa")
-	rfcCode, _ = RFCCode(err)
-	require.Equal(t, true, IsChangefeedFastFailError(err))
-	require.Equal(t, true, IsChangefeedFastFailErrorCode(rfcCode))
-
-	err = ErrGCTTLExceeded.Wrap(errors.New("aa"))
-	rfcCode, _ = RFCCode(err)
-	require.Equal(t, true, IsChangefeedFastFailError(err))
-	require.Equal(t, true, IsChangefeedFastFailErrorCode(rfcCode))
-
-	err = ErrSnapshotLostByGC.FastGenByArgs()
-	rfcCode, _ = RFCCode(err)
-	require.Equal(t, true, IsChangefeedFastFailError(err))
-	require.Equal(t, true, IsChangefeedFastFailErrorCode(rfcCode))
-
 	err = ErrStartTsBeforeGC.FastGenByArgs()
 	rfcCode, _ = RFCCode(err)
 	require.Equal(t, true, IsChangefeedFastFailError(err))
diff --git a/pkg/txnutil/gc/gc_manager.go b/pkg/txnutil/gc/gc_manager.go
index 54da159416d..349e32b20b0 100644
--- a/pkg/txnutil/gc/gc_manager.go
+++ b/pkg/txnutil/gc/gc_manager.go
@@ -17,7 +17,6 @@ import (
 	"context"
 	"time"
 
-	"github.com/pingcap/errors"
 	"github.com/pingcap/failpoint"
 	"github.com/pingcap/log"
 	"github.com/pingcap/tiflow/cdc/model"
@@ -29,6 +28,11 @@ import (
 	"go.uber.org/zap"
 )
 
+// gcTTL is the duration during which data related to a
+// failed feed will be retained, and beyond which point the data will be deleted
+// by garbage collection.
+const gcTTL = 24 * time.Hour
+
 // gcSafepointUpdateInterval is the minimum interval that CDC can update gc safepoint
 var gcSafepointUpdateInterval = 1 * time.Minute
 
@@ -39,6 +43,9 @@ type Manager interface {
 	// Set `forceUpdate` to force Manager update.
 	TryUpdateGCSafePoint(ctx context.Context, checkpointTs model.Ts, forceUpdate bool) error
 	CheckStaleCheckpointTs(ctx context.Context, changefeedID model.ChangeFeedID, checkpointTs model.Ts) error
+	// IgnoreFailedChangeFeed verifies whether a failed changefeed should be
+	// disregarded when calculating the GC safepoint of the related upstream.
+	IgnoreFailedChangeFeed(checkpointTs uint64) bool
 }
 
 type gcManager struct {
@@ -50,7 +57,6 @@ type gcManager struct {
 	lastUpdatedTime   time.Time
 	lastSucceededTime time.Time
 	lastSafePointTs   uint64
-	isTiCDCBlockGC    bool
 }
 
 // NewManager creates a new Manager.
@@ -97,9 +103,6 @@ func (m *gcManager) TryUpdateGCSafePoint(
 		log.Warn("update gc safe point failed, the gc safe point is larger than checkpointTs",
 			zap.Uint64("actual", actual), zap.Uint64("checkpointTs", checkpointTs))
 	}
-	// if the min checkpoint ts is equal to the current gc safe point,
-	// it means that the service gc safe point set by TiCDC is the min service gc safe point
-	m.isTiCDCBlockGC = actual == checkpointTs
 	m.lastSafePointTs = actual
 	m.lastSucceededTime = time.Now()
 	return nil
@@ -109,20 +112,32 @@ func (m *gcManager) CheckStaleCheckpointTs(
 	ctx context.Context, changefeedID model.ChangeFeedID, checkpointTs model.Ts,
 ) error {
 	gcSafepointUpperBound := checkpointTs - 1
-	if m.isTiCDCBlockGC {
-		pdTime, err := m.pdClock.CurrentTime()
-		// TODO: should we return err here, or just log it?
-		if err != nil {
-			return errors.Trace(err)
-		}
-		if pdTime.Sub(oracle.GetTimeFromTS(gcSafepointUpperBound)) > time.Duration(m.gcTTL)*time.Second {
-			return cerror.ErrGCTTLExceeded.GenWithStackByArgs(checkpointTs, changefeedID)
-		}
-	} else {
-		// if `isTiCDCBlockGC` is false, it means there is another service gc point less than the min checkpoint ts.
-		if gcSafepointUpperBound < m.lastSafePointTs {
-			return cerror.ErrSnapshotLostByGC.GenWithStackByArgs(checkpointTs, m.lastSafePointTs)
-		}
+	// if there is another service gc point less than the min checkpoint ts.
+	if gcSafepointUpperBound < m.lastSafePointTs {
+		return cerror.ErrSnapshotLostByGC.
+			GenWithStackByArgs(
+				checkpointTs,
+				m.lastSafePointTs,
+			)
 	}
 	return nil
 }
+
+func (m *gcManager) IgnoreFailedChangeFeed(
+	checkpointTs uint64,
+) bool {
+	pdTime, err := m.pdClock.CurrentTime()
+	if err != nil {
+		log.Warn("failed to get ts",
+			zap.String("GcManagerID", m.gcServiceID),
+			zap.Error(err),
+		)
+		return false
+	}
+	// ignore the changefeed if its current checkpoint TS is earlier
+	// than (currentPDTso - gcTTL).
+	gcSafepointUpperBound := checkpointTs - 1
+	return pdTime.Sub(
+		oracle.GetTimeFromTS(gcSafepointUpperBound),
+	) > gcTTL
+}
diff --git a/pkg/txnutil/gc/gc_manager_test.go b/pkg/txnutil/gc/gc_manager_test.go
index 10a4c596d9a..fddd5a094cb 100644
--- a/pkg/txnutil/gc/gc_manager_test.go
+++ b/pkg/txnutil/gc/gc_manager_test.go
@@ -91,24 +91,40 @@ func TestCheckStaleCheckpointTs(t *testing.T) {
 	pdClock := pdutil.NewClock4Test()
 	gcManager := NewManager(etcd.GcServiceIDForTest(),
 		mockPDClient, pdClock).(*gcManager)
-	gcManager.isTiCDCBlockGC = true
 	ctx := context.Background()
 
 	time.Sleep(1 * time.Second)
 
 	cfID := model.DefaultChangeFeedID("cfID")
-	err := gcManager.CheckStaleCheckpointTs(ctx, cfID, 10)
-	require.True(t, cerror.ErrGCTTLExceeded.Equal(errors.Cause(err)))
-	require.True(t, cerror.IsChangefeedFastFailError(err))
-
-	err = gcManager.CheckStaleCheckpointTs(ctx, cfID, oracle.GoTimeToTS(time.Now()))
+	err := gcManager.CheckStaleCheckpointTs(ctx, cfID, oracle.GoTimeToTS(time.Now()))
 	require.Nil(t, err)
 
-	gcManager.isTiCDCBlockGC = false
 	gcManager.lastSafePointTs = 20
-
 	err = gcManager.CheckStaleCheckpointTs(ctx, cfID, 10)
-
 	require.True(t, cerror.ErrSnapshotLostByGC.Equal(errors.Cause(err)))
 	require.True(t, cerror.IsChangefeedFastFailError(err))
 }
+
+func TestIgnoreFailedFeed(t *testing.T) {
+	t.Parallel()
+
+	mockPDClient := &MockPDClient{}
+	pdClock := pdutil.NewClock4Test()
+	gcManager := NewManager(etcd.GcServiceIDForTest(),
+		mockPDClient, pdClock).(*gcManager)
+
+	// 5 hours ago
+	ts1 := oracle.GoTimeToTS(time.Now().Add(-time.Hour * 5))
+	ret1 := gcManager.IgnoreFailedChangeFeed(ts1)
+	require.False(t, ret1)
+
+	// 20 hours ago
+	ts2 := oracle.GoTimeToTS(time.Now().Add(-time.Hour * 20))
+	ret2 := gcManager.IgnoreFailedChangeFeed(ts2)
+	require.False(t, ret2)
+
+	// 25 hours ago
+	ts3 := oracle.GoTimeToTS(time.Now().Add(-time.Hour * 25))
+	ret3 := gcManager.IgnoreFailedChangeFeed(ts3)
+	require.True(t, ret3)
+}
diff --git a/tests/integration_tests/changefeed_fast_fail/run.sh b/tests/integration_tests/changefeed_fast_fail/run.sh
index 183b586515e..2e0ff878b47 100644
--- a/tests/integration_tests/changefeed_fast_fail/run.sh
+++ b/tests/integration_tests/changefeed_fast_fail/run.sh
@@ -32,7 +32,7 @@ function run() {
 
 	changefeedid="changefeed-fast-fail"
 	run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" -c $changefeedid
-	ensure $MAX_RETRIES check_changefeed_state http://${UP_PD_HOST_1}:${UP_PD_PORT_1} ${changefeedid} "failed" "ErrGCTTLExceeded" ""
+	ensure $MAX_RETRIES check_changefeed_state http://${UP_PD_HOST_1}:${UP_PD_PORT_1} ${changefeedid} "failed" "ErrStartTsBeforeGC" ""
 
 	# test changefeed remove
 	result=$(cdc cli changefeed remove -c $changefeedid)
diff --git a/tests/integration_tests/changefeed_resume_with_checkpoint_ts/run.sh b/tests/integration_tests/changefeed_resume_with_checkpoint_ts/run.sh
index 8181d1e07ca..d8508c9caf2 100644
--- a/tests/integration_tests/changefeed_resume_with_checkpoint_ts/run.sh
+++ b/tests/integration_tests/changefeed_resume_with_checkpoint_ts/run.sh
@@ -87,7 +87,7 @@ function resume_changefeed_in_failed_state() {
 	pd_addr="http://$UP_PD_HOST_1:$UP_PD_PORT_1"
 	SINK_URI="mysql://normal:123456@127.0.0.1:3306/?max-txn-row=1"
 	run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --pd $pd_addr
-	ensure $MAX_RETRIES check_changefeed_state http://${UP_PD_HOST_1}:${UP_PD_PORT_1} $changefeed_id "failed" "ErrGCTTLExceeded" ""
+	ensure $MAX_RETRIES check_changefeed_state http://${UP_PD_HOST_1}:${UP_PD_PORT_1} $changefeed_id "failed" "ErrStartTsBeforeGC" ""
"ErrStartTsBeforeGC" "" cdc cli changefeed resume --changefeed-id=$changefeed_id --pd=$pd_addr --overwrite-checkpoint-ts=now --no-confirm=true ensure $MAX_RETRIES check_changefeed_state http://${UP_PD_HOST_1}:${UP_PD_PORT_1} $changefeed_id "normal" "null" "" diff --git a/tests/integration_tests/csv_storage_multi_tables_ddl/run.sh b/tests/integration_tests/csv_storage_multi_tables_ddl/run.sh index 62686f15a20..b8b49abfad6 100755 --- a/tests/integration_tests/csv_storage_multi_tables_ddl/run.sh +++ b/tests/integration_tests/csv_storage_multi_tables_ddl/run.sh @@ -58,7 +58,7 @@ function run() { check_changefeed_state "http://${UP_PD_HOST_1}:${UP_PD_PORT_1}" $cf_normal "normal" "null" "" check_changefeed_state "http://${UP_PD_HOST_1}:${UP_PD_PORT_1}" $cf_err1 "normal" "null" "" - check_changefeed_state "http://${UP_PD_HOST_1}:${UP_PD_PORT_1}" $cf_err2 "error" "ErrSyncRenameTableFailed" "" + check_changefeed_state "http://${UP_PD_HOST_1}:${UP_PD_PORT_1}" $cf_err2 "failed" "ErrSyncRenameTableFailed" "" check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml 100 diff --git a/tests/integration_tests/multi_tables_ddl/run.sh b/tests/integration_tests/multi_tables_ddl/run.sh index 4adbfc73aad..2ca909bf873 100755 --- a/tests/integration_tests/multi_tables_ddl/run.sh +++ b/tests/integration_tests/multi_tables_ddl/run.sh @@ -98,7 +98,7 @@ function run() { check_changefeed_state "http://${UP_PD_HOST_1}:${UP_PD_PORT_1}" $cf_normal "normal" "null" "" check_changefeed_state "http://${UP_PD_HOST_1}:${UP_PD_PORT_1}" $cf_err1 "normal" "null" "" - check_changefeed_state "http://${UP_PD_HOST_1}:${UP_PD_PORT_1}" $cf_err2 "error" "ErrSyncRenameTableFailed" "" + check_changefeed_state "http://${UP_PD_HOST_1}:${UP_PD_PORT_1}" $cf_err2 "failed" "ErrSyncRenameTableFailed" "" check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml 60