Skip to content

Commit

Permalink
stop the backoff once the defaultBackoffMaxElapsedTime has elapsed
Browse files Browse the repository at this point in the history
  • Loading branch information
charleszheng44 committed Mar 22, 2023
1 parent 43deb89 commit 34add91
Showing 1 changed file with 19 additions and 5 deletions.
24 changes: 19 additions & 5 deletions cdc/owner/feed_state_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ const (
// To avoid thunderherd, a random factor is also added.
defaultBackoffInitInterval = 10 * time.Second
defaultBackoffMaxInterval = 30 * time.Minute
defaultBackoffMaxElapsedTime = 90 * time.Minute
defaultBackoffRandomizationFactor = 0.1
defaultBackoffMultiplier = 2.0

Expand Down Expand Up @@ -73,8 +74,8 @@ func newFeedStateManager(up *upstream.Upstream) *feedStateManager {
f.errBackoff.MaxInterval = defaultBackoffMaxInterval
f.errBackoff.Multiplier = defaultBackoffMultiplier
f.errBackoff.RandomizationFactor = defaultBackoffRandomizationFactor
// MaxElapsedTime=0 means the backoff never stops
f.errBackoff.MaxElapsedTime = 0
// backoff will stop once the defaultBackoffMaxElapsedTime has elapsed.
f.errBackoff.MaxElapsedTime = defaultBackoffMaxElapsedTime

f.resetErrBackoff()
f.lastErrorTime = time.Unix(0, 0)
Expand Down Expand Up @@ -502,12 +503,25 @@ func (m *feedStateManager) handleError(errs ...*model.RunningError) {
m.patchState(model.StateError)
} else {
oldBackoffInterval := m.backoffInterval
// NextBackOff will never return -1 because the backoff never stops
// with `MaxElapsedTime=0`
// ref: https://github.com/cenkalti/backoff/blob/v4/exponential.go#L121-L123

m.backoffInterval = m.errBackoff.NextBackOff()
m.lastErrorTime = time.Unix(0, 0)

// NextBackOff() will return -1 once the MaxElapsedTime has elapsed.
if m.backoffInterval == m.errBackoff.Stop {
log.Warn("The changefeed won't be restarted "+
"as it has been experiencing failures for "+
"an extended duration",
zap.Duration(
"maxElapsedTime",
m.errBackoff.MaxElapsedTime,
),
)
m.shouldBeRunning = false
m.patchState(model.StateFailed)
return
}

log.Info("changefeed restart backoff interval is changed",
zap.String("namespace", m.state.ID.Namespace),
zap.String("changefeed", m.state.ID.ID),
Expand Down

0 comments on commit 34add91

Please sign in to comment.