gc (ticdc): optimize the algorithm calculating gc safepoint (#8464) #8887

Merged
4 changes: 2 additions & 2 deletions cdc/api/v2/changefeed_test.go
@@ -283,7 +283,7 @@ func TestGetChangeFeed(t *testing.T) {
statusProvider.changefeedInfo = &model.ChangeFeedInfo{
ID: validID,
Error: &model.RunningError{
Code: string(cerrors.ErrGCTTLExceeded.RFCCode()),
Code: string(cerrors.ErrStartTsBeforeGC.RFCCode()),
},
}
statusProvider.changefeedStatus = &model.ChangeFeedStatus{
@@ -298,7 +298,7 @@ func TestGetChangeFeed(t *testing.T) {
err = json.NewDecoder(w.Body).Decode(&resp)
require.Nil(t, err)
require.Equal(t, resp.ID, validID)
require.Contains(t, resp.Error.Code, "ErrGCTTLExceeded")
require.Contains(t, resp.Error.Code, "ErrStartTsBeforeGC")

// success
statusProvider.changefeedInfo = &model.ChangeFeedInfo{ID: validID}
20 changes: 0 additions & 20 deletions cdc/model/changefeed_test.go
@@ -458,26 +458,6 @@ func TestFixState(t *testing.T) {
},
expectedState: StateNormal,
},
{
info: &ChangeFeedInfo{
AdminJobType: AdminNone,
State: StateNormal,
Error: &RunningError{
Code: string(errors.ErrGCTTLExceeded.RFCCode()),
},
},
expectedState: StateFailed,
},
{
info: &ChangeFeedInfo{
AdminJobType: AdminResume,
State: StateNormal,
Error: &RunningError{
Code: string(errors.ErrGCTTLExceeded.RFCCode()),
},
},
expectedState: StateFailed,
},
{
info: &ChangeFeedInfo{
AdminJobType: AdminNone,
2 changes: 1 addition & 1 deletion cdc/owner/changefeed.go
@@ -254,7 +254,7 @@ func (c *changefeed) checkStaleCheckpointTs(ctx cdcContext.Context, checkpointTs
state := c.state.Info.State
if state == model.StateNormal || state == model.StateStopped || state == model.StateError {
failpoint.Inject("InjectChangefeedFastFailError", func() error {
return cerror.ErrGCTTLExceeded.FastGen("InjectChangefeedFastFailError")
return cerror.ErrStartTsBeforeGC.FastGen("InjectChangefeedFastFailError")
})
if err := c.upstream.GCManager.CheckStaleCheckpointTs(ctx, c.id, checkpointTs); err != nil {
return errors.Trace(err)
25 changes: 20 additions & 5 deletions cdc/owner/feed_state_manager.go
@@ -36,6 +36,7 @@ const (
// To avoid thundering herd, a random factor is also added.
defaultBackoffInitInterval = 10 * time.Second
defaultBackoffMaxInterval = 30 * time.Minute
defaultBackoffMaxElapsedTime = 90 * time.Minute
defaultBackoffRandomizationFactor = 0.1
defaultBackoffMultiplier = 2.0

@@ -73,8 +74,8 @@ func newFeedStateManager(up *upstream.Upstream) *feedStateManager {
f.errBackoff.MaxInterval = defaultBackoffMaxInterval
f.errBackoff.Multiplier = defaultBackoffMultiplier
f.errBackoff.RandomizationFactor = defaultBackoffRandomizationFactor
// MaxElapsedTime=0 means the backoff never stops
f.errBackoff.MaxElapsedTime = 0
// backoff will stop once the defaultBackoffMaxElapsedTime has elapsed.
f.errBackoff.MaxElapsedTime = defaultBackoffMaxElapsedTime

f.resetErrBackoff()
f.lastErrorTime = time.Unix(0, 0)
@@ -135,6 +136,7 @@ func (m *feedStateManager) Tick(state *orchestrator.ChangefeedReactorState) (adm
case model.StateError:
if m.state.Info.Error.IsChangefeedUnRetryableError() {
m.shouldBeRunning = false
m.patchState(model.StateFailed)
return
}
}
@@ -502,12 +504,25 @@ func (m *feedStateManager) handleError(errs ...*model.RunningError) {
m.patchState(model.StateError)
} else {
oldBackoffInterval := m.backoffInterval
// NextBackOff will never return -1 because the backoff never stops
// with `MaxElapsedTime=0`
// ref: https://github.com/cenkalti/backoff/blob/v4/exponential.go#L121-L123

m.backoffInterval = m.errBackoff.NextBackOff()
m.lastErrorTime = time.Unix(0, 0)

// NextBackOff() will return -1 once the MaxElapsedTime has elapsed.
if m.backoffInterval == m.errBackoff.Stop {
log.Warn("The changefeed won't be restarted "+
"as it has been experiencing failures for "+
"an extended duration",
zap.Duration(
"maxElapsedTime",
m.errBackoff.MaxElapsedTime,
),
)
m.shouldBeRunning = false
m.patchState(model.StateFailed)
return
}

log.Info("changefeed restart backoff interval is changed",
zap.String("namespace", m.state.ID.Namespace),
zap.String("changefeed", m.state.ID.ID),
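
For reference, a minimal sketch of the backoff behavior this change relies on, assuming the `github.com/cenkalti/backoff/v4` package configured above: once `MaxElapsedTime` has elapsed, `NextBackOff()` returns the `Stop` sentinel (-1), and `handleError` now reacts to that by marking the changefeed failed instead of retrying forever. The intervals below are illustrative values, not the production defaults.

```go
package main

import (
	"fmt"
	"time"

	"github.com/cenkalti/backoff/v4"
)

func main() {
	// Mirror the feedStateManager configuration, but with short intervals
	// so the stop condition is reached within a couple of seconds.
	b := backoff.NewExponentialBackOff()
	b.InitialInterval = 100 * time.Millisecond
	b.MaxInterval = 500 * time.Millisecond
	b.Multiplier = 2.0
	b.RandomizationFactor = 0.1
	b.MaxElapsedTime = 2 * time.Second
	b.Reset() // start counting elapsed time from now

	for round := 1; ; round++ {
		d := b.NextBackOff()
		if d == backoff.Stop {
			// Once MaxElapsedTime has elapsed, NextBackOff returns the
			// Stop sentinel (-1); handleError treats this as "give up"
			// and moves the changefeed to StateFailed.
			fmt.Printf("round %d: backoff stopped, no more retries\n", round)
			return
		}
		fmt.Printf("round %d: retry after %v\n", round, d)
		time.Sleep(d)
	}
}
```

With the previous `MaxElapsedTime = 0` configuration the loop above would never hit the `Stop` branch, which is why the old code could assume `NextBackOff` never returns -1.
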
48 changes: 26 additions & 22 deletions cdc/owner/feed_state_manager_test.go
@@ -197,7 +197,7 @@ func TestResumeChangefeedWithCheckpointTs(t *testing.T) {
func(position *model.TaskPosition) (*model.TaskPosition, bool, error) {
return &model.TaskPosition{Error: &model.RunningError{
Addr: ctx.GlobalVars().CaptureInfo.AdvertiseAddr,
Code: "CDC:ErrGCTTLExceeded",
Code: "CDC:ErrStartTsBeforeGC",
Message: "fake error for test",
}}, true, nil
})
@@ -347,7 +347,7 @@ func TestHandleFastFailError(t *testing.T) {
func(position *model.TaskPosition) (*model.TaskPosition, bool, error) {
return &model.TaskPosition{Error: &model.RunningError{
Addr: ctx.GlobalVars().CaptureInfo.AdvertiseAddr,
Code: "CDC:ErrGCTTLExceeded",
Code: "CDC:ErrStartTsBeforeGC",
Message: "fake error for test",
}}, true, nil
})
@@ -565,33 +565,37 @@ func TestBackoffStopsUnexpectedly(t *testing.T) {
tester.MustApplyPatches()

for i := 1; i <= 10; i++ {
require.Equal(t, state.Info.State, model.StateNormal)
require.True(t, manager.ShouldRunning())
state.PatchTaskPosition(ctx.GlobalVars().CaptureInfo.ID,
func(position *model.TaskPosition) (*model.TaskPosition, bool, error) {
return &model.TaskPosition{Error: &model.RunningError{
Addr: ctx.GlobalVars().CaptureInfo.AdvertiseAddr,
Code: "[CDC:ErrEtcdSessionDone]",
Message: "fake error for test",
}}, true, nil
})
tester.MustApplyPatches()
manager.Tick(state)
tester.MustApplyPatches()
// after round 8, the maxElapsedTime of backoff will exceed 4000ms,
// and NextBackOff() will return -1, so the changefeed state will
// never turn into error state.
if i >= 8 {
require.True(t, manager.ShouldRunning())
require.Equal(t, state.Info.State, model.StateNormal)
// after round 8, the maxElapsedTime of backoff will exceed 4000ms,
// and NextBackOff() will return -1, so the changefeed state will
// never turn into error state.
require.Equal(t, state.Info.State, model.StateFailed)
require.False(t, manager.ShouldRunning())
} else {
require.Equal(t, state.Info.State, model.StateNormal)
require.True(t, manager.ShouldRunning())
state.PatchTaskPosition(ctx.GlobalVars().CaptureInfo.ID,
func(position *model.TaskPosition) (
*model.TaskPosition, bool, error,
) {
return &model.TaskPosition{Error: &model.RunningError{
Addr: ctx.GlobalVars().CaptureInfo.AdvertiseAddr,
Code: "[CDC:ErrEtcdSessionDone]",
Message: "fake error for test",
}}, true, nil
})
tester.MustApplyPatches()
manager.Tick(state)
tester.MustApplyPatches()
// If an error occurs, backing off from running the task.
require.False(t, manager.ShouldRunning())
require.Equal(t, state.Info.State, model.StateError)
require.Equal(t, state.Info.AdminJobType, model.AdminStop)
require.Equal(t, state.Status.AdminJobType, model.AdminStop)
}
// 500ms is the backoff interval, so sleep 500ms and after a manager tick,
// the changefeed will turn into normal state

// 500ms is the backoff interval, so sleep 500ms and after a manager
// tick, the changefeed will turn into normal state
time.Sleep(500 * time.Millisecond)
manager.Tick(state)
tester.MustApplyPatches()
25 changes: 25 additions & 0 deletions cdc/owner/owner.go
@@ -700,6 +700,26 @@ func (o *ownerImpl) updateGCSafepoint(
return nil
}

// ignoreFailedChangeFeedWhenGC checks if a failed changefeed should be ignored
// when calculating the gc safepoint of the associated upstream.
func (o *ownerImpl) ignoreFailedChangeFeedWhenGC(
state *orchestrator.ChangefeedReactorState,
) bool {
upID := state.Info.UpstreamID
us, exist := o.upstreamManager.Get(upID)
if !exist {
log.Warn("upstream not found", zap.Uint64("ID", upID))
return false
}
// in case the changefeed failed right after it is created
// and the status is not initialized yet.
ts := state.Info.StartTs
if state.Status != nil {
ts = state.Status.CheckpointTs
}
return us.GCManager.IgnoreFailedChangeFeed(ts)
}

// calculateGCSafepoint calculates GCSafepoint for different upstream.
// Note: we need to maintain a TiCDC service GC safepoint for each upstream TiDB cluster
// to prevent upstream TiDB GC from removing data that is still needed by TiCDC.
@@ -714,8 +734,13 @@ func (o *ownerImpl) calculateGCSafepoint(state *orchestrator.GlobalReactorState)
if changefeedState.Info == nil {
continue
}

switch changefeedState.Info.State {
case model.StateNormal, model.StateStopped, model.StateError:
case model.StateFailed:
if o.ignoreFailedChangeFeedWhenGC(changefeedState) {
continue
}
default:
continue
}
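
`IgnoreFailedChangeFeed` itself is not part of this diff; the sketch below is an assumption about the kind of check it performs — a failed changefeed keeps holding back the GC safepoint only while its checkpoint is still within the GC TTL window. The function signature, the `>> 18` physical-timestamp extraction, and the parameter names are all illustrative, not the tiflow implementation.

```go
package gcsketch

import "time"

// ignoreFailedChangeFeed sketches the decision the owner delegates to
// us.GCManager.IgnoreFailedChangeFeed (hypothetical body).
func ignoreFailedChangeFeed(checkpointTs uint64, pdTime time.Time, gcTTL time.Duration) bool {
	// A TSO packs a Unix-millisecond physical timestamp in its upper bits;
	// the logical counter occupies the low 18 bits.
	physical := int64(checkpointTs >> 18)
	checkpointTime := time.UnixMilli(physical)

	// Once the checkpoint has lagged behind the PD clock by more than the
	// GC TTL, keeping a service safepoint for this failed changefeed would
	// block upstream GC indefinitely, so it is ignored.
	return pdTime.Sub(checkpointTime) > gcTTL
}
```

This is why the new `StateFailed` branch in `calculateGCSafepoint` still counts recently failed changefeeds: their data remains recoverable until the GC TTL expires.
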
4 changes: 2 additions & 2 deletions cdc/owner/owner_test.go
@@ -50,7 +50,7 @@ type mockManager struct {
func (m *mockManager) CheckStaleCheckpointTs(
ctx context.Context, changefeedID model.ChangeFeedID, checkpointTs model.Ts,
) error {
return cerror.ErrGCTTLExceeded.GenWithStackByArgs()
return cerror.ErrStartTsBeforeGC.GenWithStackByArgs()
}

var _ gc.Manager = (*mockManager)(nil)
Expand Down Expand Up @@ -199,7 +199,7 @@ func TestCreateRemoveChangefeed(t *testing.T) {
Error: nil,
}

// this will make changefeed always meet ErrGCTTLExceeded
// this will make changefeed always meet ErrStartTsBeforeGC
up, _ := owner.upstreamManager.Get(changefeedInfo.UpstreamID)
mockedManager := &mockManager{Manager: up.GCManager}
up.GCManager = mockedManager
5 changes: 0 additions & 5 deletions errors.toml
@@ -301,11 +301,6 @@ error = '''
flow controller is aborted
'''

["CDC:ErrGCTTLExceeded"]
error = '''
the checkpoint-ts(%d) lag of the changefeed(%s) has exceeded the GC TTL
'''

["CDC:ErrGRPCDialFailed"]
error = '''
grpc dial failed
5 changes: 0 additions & 5 deletions pkg/errors/cdc_errors.go
@@ -586,11 +586,6 @@ var (
" caused by GC. checkpoint-ts %d is earlier than or equal to GC safepoint at %d",
errors.RFCCodeText("CDC:ErrSnapshotLostByGC"),
)
ErrGCTTLExceeded = errors.Normalize(
"the checkpoint-ts(%d) lag of the changefeed(%s) "+
"has exceeded the GC TTL",
errors.RFCCodeText("CDC:ErrGCTTLExceeded"),
)
ErrNotOwner = errors.Normalize(
"this capture is not a owner",
errors.RFCCodeText("CDC:ErrNotOwner"),
2 changes: 1 addition & 1 deletion pkg/errors/helper.go
@@ -39,7 +39,7 @@ func WrapError(rfcError *errors.Error, err error, args ...interface{}) error {
// wants to replicate has been or will be GC. So it makes no sense to try to
// resume the changefeed, and the changefeed should immediately be failed.
var changeFeedFastFailError = []*errors.Error{
ErrGCTTLExceeded, ErrSnapshotLostByGC, ErrStartTsBeforeGC,
ErrSnapshotLostByGC, ErrStartTsBeforeGC,
}

// IsChangefeedFastFailError checks if an error is a ChangefeedFastFailError
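
After this change only GC-related unrecoverable errors remain in the fast-fail list. As a hedged illustration (not necessarily the exact body of `IsChangefeedFastFailError`), the check amounts to a membership test against that list:

```go
package errsketch

import "github.com/pingcap/errors"

// isChangefeedFastFailError sketches the membership test behind
// IsChangefeedFastFailError; the real helper in pkg/errors/helper.go may
// match by RFC code rather than by (*errors.Error).Equal.
func isChangefeedFastFailError(err error, fastFailList []*errors.Error) bool {
	for _, e := range fastFailList {
		// Equal reports whether err was produced from the same
		// normalized error definition.
		if e.Equal(err) {
			return true
		}
	}
	return false
}
```
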
17 changes: 1 addition & 16 deletions pkg/errors/helper_test.go
@@ -98,26 +98,11 @@ func TestIsRetryableError(t *testing.T) {

func TestChangefeedFastFailError(t *testing.T) {
t.Parallel()
err := ErrGCTTLExceeded.FastGenByArgs()
err := ErrSnapshotLostByGC.FastGenByArgs()
rfcCode, _ := RFCCode(err)
require.Equal(t, true, IsChangefeedFastFailError(err))
require.Equal(t, true, IsChangefeedFastFailErrorCode(rfcCode))

err = ErrGCTTLExceeded.GenWithStack("aa")
rfcCode, _ = RFCCode(err)
require.Equal(t, true, IsChangefeedFastFailError(err))
require.Equal(t, true, IsChangefeedFastFailErrorCode(rfcCode))

err = ErrGCTTLExceeded.Wrap(errors.New("aa"))
rfcCode, _ = RFCCode(err)
require.Equal(t, true, IsChangefeedFastFailError(err))
require.Equal(t, true, IsChangefeedFastFailErrorCode(rfcCode))

err = ErrSnapshotLostByGC.FastGenByArgs()
rfcCode, _ = RFCCode(err)
require.Equal(t, true, IsChangefeedFastFailError(err))
require.Equal(t, true, IsChangefeedFastFailErrorCode(rfcCode))

err = ErrStartTsBeforeGC.FastGenByArgs()
rfcCode, _ = RFCCode(err)
require.Equal(t, true, IsChangefeedFastFailError(err))