stop the backoff once the defaultBackoffMaxElapsedTime has elapsed
charleszheng44 committed Mar 29, 2023
1 parent c0ad3af commit 92faa59
Showing 3 changed files with 44 additions and 26 deletions.
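For context, the fix relies on documented behavior of the cenkalti/backoff/v4 library: ExponentialBackOff.NextBackOff() returns the sentinel backoff.Stop (time.Duration(-1)) once the time since the last Reset() exceeds MaxElapsedTime, whereas MaxElapsedTime=0 disables the cap entirely. A minimal sketch of that behavior, with intervals scaled down so it runs in milliseconds (not code from this repository):

package main

import (
    "fmt"
    "time"

    "github.com/cenkalti/backoff/v4"
)

func main() {
    b := backoff.NewExponentialBackOff()
    b.InitialInterval = time.Millisecond
    b.MaxInterval = time.Millisecond
    b.MaxElapsedTime = 50 * time.Millisecond // stand-in for the new 90-minute cap

    b.Reset()
    fmt.Println(b.NextBackOff())                 // ~1ms: still within MaxElapsedTime
    time.Sleep(60 * time.Millisecond)            // let more than MaxElapsedTime pass
    fmt.Println(b.NextBackOff() == backoff.Stop) // true: the -1 sentinel
}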
24 changes: 19 additions & 5 deletions cdc/owner/feed_state_manager.go
@@ -36,6 +36,7 @@ const (
     // To avoid thundering herd, a random factor is also added.
     defaultBackoffInitInterval        = 10 * time.Second
     defaultBackoffMaxInterval         = 30 * time.Minute
+    defaultBackoffMaxElapsedTime      = 90 * time.Minute
     defaultBackoffRandomizationFactor = 0.1
     defaultBackoffMultiplier          = 2.0

@@ -73,8 +74,8 @@ func newFeedStateManager(up *upstream.Upstream) *feedStateManager {
     f.errBackoff.MaxInterval = defaultBackoffMaxInterval
     f.errBackoff.Multiplier = defaultBackoffMultiplier
     f.errBackoff.RandomizationFactor = defaultBackoffRandomizationFactor
-    // MaxElapsedTime=0 means the backoff never stops
-    f.errBackoff.MaxElapsedTime = 0
+    // backoff will stop once the defaultBackoffMaxElapsedTime has elapsed.
+    f.errBackoff.MaxElapsedTime = defaultBackoffMaxElapsedTime

     f.resetErrBackoff()
     f.lastErrorTime = time.Unix(0, 0)
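With these constants, the nominal restart schedule (ignoring the 0.1 randomization factor) doubles from 10s until it hits the 30-minute cap, and restarts stop being scheduled once roughly 90 minutes have elapsed in total. A back-of-the-envelope sketch of the implied schedule; the library's real accounting also counts time spent inside the failing operation, so treat this as illustrative only:

package main

import (
    "fmt"
    "time"
)

func main() {
    interval := 10 * time.Second    // defaultBackoffInitInterval
    maxInterval := 30 * time.Minute // defaultBackoffMaxInterval
    maxElapsed := 90 * time.Minute  // defaultBackoffMaxElapsedTime
    const multiplier = 2            // defaultBackoffMultiplier

    elapsed := time.Duration(0)
    for i := 1; elapsed <= maxElapsed; i++ {
        fmt.Printf("restart %d after %v (total elapsed %v)\n", i, interval, elapsed)
        elapsed += interval
        // Double the interval, clamping it at the configured maximum.
        if interval = time.Duration(float64(interval) * multiplier); interval > maxInterval {
            interval = maxInterval
        }
    }
    fmt.Println("cap exceeded: NextBackOff() would now return the Stop sentinel")
}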
@@ -502,12 +503,25 @@ func (m *feedStateManager) handleError(errs ...*model.RunningError) {
         m.patchState(model.StateError)
     } else {
         oldBackoffInterval := m.backoffInterval
-        // NextBackOff will never return -1 because the backoff never stops
-        // with `MaxElapsedTime=0`
-        // ref: https://github.com/cenkalti/backoff/blob/v4/exponential.go#L121-L123
         m.backoffInterval = m.errBackoff.NextBackOff()
         m.lastErrorTime = time.Unix(0, 0)
+
+        // NextBackOff() will return -1 once the MaxElapsedTime has elapsed.
+        if m.backoffInterval == m.errBackoff.Stop {
+            log.Warn("The changefeed won't be restarted "+
+                "as it has been experiencing failures for "+
+                "an extended duration",
+                zap.Duration(
+                    "maxElapsedTime",
+                    m.errBackoff.MaxElapsedTime,
+                ),
+            )
+            m.shouldBeRunning = false
+            m.patchState(model.StateFailed)
+            return
+        }
+
         log.Info("changefeed restart backoff interval is changed",
             zap.String("namespace", m.state.ID.Namespace),
             zap.String("changefeed", m.state.ID.ID),
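To see the new control flow in isolation: the manager asks the backoff for the next restart interval and, when it gets the Stop sentinel back, marks the changefeed failed instead of scheduling another restart. (Note the comparison against the errBackoff.Stop field; in backoff/v4 that field defaults to the package-level backoff.Stop sentinel.) A self-contained sketch of the same pattern with hypothetical names, not tiflow code:

package main

import (
    "fmt"
    "time"

    "github.com/cenkalti/backoff/v4"
)

type feedState string

const (
    stateError  feedState = "error"  // transient: keep retrying with backoff
    stateFailed feedState = "failed" // terminal: no further restarts
)

// nextState mirrors the shape of the new handleError branch: it reports
// stateFailed once the backoff signals that MaxElapsedTime has elapsed.
func nextState(b *backoff.ExponentialBackOff) (feedState, time.Duration) {
    d := b.NextBackOff()
    if d == b.Stop { // defaults to backoff.Stop == time.Duration(-1)
        return stateFailed, 0
    }
    return stateError, d
}

func main() {
    b := backoff.NewExponentialBackOff()
    b.InitialInterval = 10 * time.Millisecond
    b.MaxInterval = 20 * time.Millisecond
    b.MaxElapsedTime = 100 * time.Millisecond // scaled-down stand-in for 90 minutes
    b.Reset()

    for {
        s, d := nextState(b)
        if s == stateFailed {
            fmt.Println("state:", s)
            return
        }
        fmt.Printf("state: %s, restarting in %v\n", s, d)
        time.Sleep(d)
    }
}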
44 changes: 24 additions & 20 deletions cdc/owner/feed_state_manager_test.go
@@ -565,33 +565,37 @@ func TestBackoffStopsUnexpectedly(t *testing.T) {
     tester.MustApplyPatches()

     for i := 1; i <= 10; i++ {
-        require.Equal(t, state.Info.State, model.StateNormal)
-        require.True(t, manager.ShouldRunning())
-        state.PatchTaskPosition(ctx.GlobalVars().CaptureInfo.ID,
-            func(position *model.TaskPosition) (*model.TaskPosition, bool, error) {
-                return &model.TaskPosition{Error: &model.RunningError{
-                    Addr:    ctx.GlobalVars().CaptureInfo.AdvertiseAddr,
-                    Code:    "[CDC:ErrEtcdSessionDone]",
-                    Message: "fake error for test",
-                }}, true, nil
-            })
-        tester.MustApplyPatches()
-        manager.Tick(state)
-        tester.MustApplyPatches()
-        // after round 8, the maxElapsedTime of backoff will exceed 4000ms,
-        // and NextBackOff() will return -1, so the changefeed state will
-        // never turn into error state.
         if i >= 8 {
-            require.True(t, manager.ShouldRunning())
-            require.Equal(t, state.Info.State, model.StateNormal)
+            // after round 8, the maxElapsedTime of backoff will exceed 4000ms,
+            // and NextBackOff() will return -1, so the changefeed state will
+            // never turn into error state.
+            require.Equal(t, state.Info.State, model.StateFailed)
+            require.False(t, manager.ShouldRunning())
         } else {
+            require.Equal(t, state.Info.State, model.StateNormal)
+            require.True(t, manager.ShouldRunning())
+            state.PatchTaskPosition(ctx.GlobalVars().CaptureInfo.ID,
+                func(position *model.TaskPosition) (
+                    *model.TaskPosition, bool, error,
+                ) {
+                    return &model.TaskPosition{Error: &model.RunningError{
+                        Addr:    ctx.GlobalVars().CaptureInfo.AdvertiseAddr,
+                        Code:    "[CDC:ErrEtcdSessionDone]",
+                        Message: "fake error for test",
+                    }}, true, nil
+                })
+            tester.MustApplyPatches()
+            manager.Tick(state)
+            tester.MustApplyPatches()
             // If an error occurs, backing off from running the task.
             require.False(t, manager.ShouldRunning())
             require.Equal(t, state.Info.State, model.StateError)
             require.Equal(t, state.Info.AdminJobType, model.AdminStop)
             require.Equal(t, state.Status.AdminJobType, model.AdminStop)
         }
-        // 500ms is the backoff interval, so sleep 500ms and after a manager tick,
-        // the changefeed will turn into normal state
+
+        // 500ms is the backoff interval, so sleep 500ms and after a manager
+        // tick, the changefeed will turn into normal state
         time.Sleep(500 * time.Millisecond)
         manager.Tick(state)
         tester.MustApplyPatches()
2 changes: 1 addition & 1 deletion pkg/txnutil/gc/gc_manager.go
@@ -134,7 +134,7 @@ func (m *gcManager) IgnoreFailedChangeFeed(
         )
         return false
     }
-    // ignore the change feed if its current checkpoint TS is earlier
+    // ignore the changefeed if its current checkpoint TS is earlier
     // than the (currentPDTso - failedFeedDataRetentionTime).
     gcSafepointUpperBound := checkpointTs - 1
     return pdTime.Sub(
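For background on the comparison this comment documents: the checkpoint TSO is decoded into wall-clock time, and the failed changefeed is ignored once that time is older than the retention window. A hedged sketch of the arithmetic, assuming the standard TiDB/PD TSO layout (physical milliseconds shifted left 18 bits, which is what oracle.GetTimeFromTS decodes); values and names here are illustrative:

package main

import (
    "fmt"
    "time"
)

const physicalShiftBits = 18 // TSO layout: (physical ms << 18) | logical counter

// timeFromTS decodes the physical, wall-clock part of a TSO, mirroring
// what oracle.GetTimeFromTS does.
func timeFromTS(ts uint64) time.Time {
    return time.UnixMilli(int64(ts >> physicalShiftBits))
}

func main() {
    retention := 24 * time.Hour // stand-in for failedFeedDataRetentionTime
    pdTime := time.Now()        // current time as reported by PD

    // A checkpoint from two days ago, encoded as a TSO (illustrative value).
    checkpointTs := uint64(pdTime.Add(-48*time.Hour).UnixMilli()) << physicalShiftBits

    gcSafepointUpperBound := checkpointTs - 1
    ignore := pdTime.Sub(timeFromTS(gcSafepointUpperBound)) > retention
    fmt.Println("ignore failed changefeed:", ignore) // true: older than retention
}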
