Skip to content
This repository has been archived by the owner on Dec 8, 2020. It is now read-only.

Commit

Permalink
scheduler: adds backoff to RecoveryDescriptor
Browse files Browse the repository at this point in the history
The RecoveryDescriptor calls delegate.Run continuously and forever with
no backoff or maximum retry count. This commit adds netutil.ExponentialBackoff and a
timer that resets the retry count if delegate.Run runs for longer than
the configured reset retries duration. This is useful for delegates that
might crash once in a while due to transient network errors or temporary
third-party API issues.
  • Loading branch information
kyleterry committed Jan 16, 2020
1 parent 410cd51 commit 61f9de4
Show file tree
Hide file tree
Showing 2 changed files with 172 additions and 3 deletions.
87 changes: 84 additions & 3 deletions scheduler/recovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,64 @@ package scheduler

import (
"context"
"errors"
"reflect"
"time"

"github.com/puppetlabs/horsehead/v2/netutil"
)

const (
	// defaultBackoffMultiplier is the default timing multiplier handed to
	// netutil.ExponentialBackoff between delegate restarts.
	defaultBackoffMultiplier = time.Millisecond * 5
	// defaultMaxRetries is the default number of consecutive delegate
	// failures tolerated before the descriptor gives up.
	defaultMaxRetries = 10
	// defaultResetRetriesTimerDuration is the default length of time a
	// delegate must run without failing before the retry count resets to 0.
	defaultResetRetriesTimerDuration = time.Second * 10
)

// ErrMaxRetriesReached is the error returned by RecoveryDescriptor if the max retries
// have been reached.
var ErrMaxRetriesReached = errors.New("RecoveryDescriptor: max retries have been reached")

// RecoveryDescriptorOptions contains fields that allow backoff and retry parameters
// to be set. A zero value for any field selects the package default
// (see defaultBackoffMultiplier, defaultMaxRetries, defaultResetRetriesTimerDuration).
type RecoveryDescriptorOptions struct {
	// BackoffMultiplier is the timing multiplier between attempts using netutil.Backoff.
	BackoffMultiplier time.Duration
	// MaxRetries is the max times the RecoveryDescriptor should attempt to run the delegate
	// descriptor during a reset retries duration.
	MaxRetries int
	// ResetRetriesTimerDuration is the time it takes to reset the retry count when running
	// a delegate descriptor.
	ResetRetriesTimerDuration time.Duration
}

// RecoveryDescriptor wraps a given descriptor so that it restarts if the
// descriptor itself fails. This is useful for descriptors that work off of
// external information (APIs, events, etc.).
type RecoveryDescriptor struct {
	// delegate is the wrapped descriptor that is restarted on failure.
	// (The duplicated field declaration left over from the diff has been removed;
	// the struct must declare delegate exactly once to compile.)
	delegate Descriptor
	// backoff computes the wait between restart attempts.
	backoff netutil.Backoff
	// maxRetries is the failure budget before Run returns ErrMaxRetriesReached.
	maxRetries int
	// currentRetries counts consecutive failures; reset to 0 after a long
	// enough successful run (see resetDuration).
	currentRetries int
	// resetDuration is how long the delegate must run before the retry
	// count resets.
	resetDuration time.Duration
}

var _ Descriptor = &RecoveryDescriptor{}

// runOnce attempts to run the delegate descriptor. It starts a timer that waits for resetRetriesTimerDuration
// that will reset the retry attempt count to 0 if the delegate runs for the duration without an error. This is
// to prevent hours or days from going by, then an error happens again incrementing the count. If this happens 10
// times, the descriptor will seemingly shutdown after 1 retry, causing confusion.
func (rd *RecoveryDescriptor) runOnce(ctx context.Context, pc chan<- Process) (bool, error) {
timer := time.AfterFunc(rd.resetDuration, func() {
rd.currentRetries = 0
})

err := rd.delegate.Run(ctx, pc)

// If the timer is already triggered, then this will just return false. So it's
// fine to call blindly here.
timer.Stop()

select {
case <-ctx.Done():
return false, err
Expand All @@ -40,13 +83,51 @@ func (rd *RecoveryDescriptor) Run(ctx context.Context, pc chan<- Process) error
} else if !cont {
break
}

if rd.currentRetries == rd.maxRetries {
log(ctx).Error("max retries reached; stopping descriptor", "descriptor", reflect.TypeOf(rd.delegate).String())
return ErrMaxRetriesReached
}

rd.currentRetries++

if err := rd.backoff.Backoff(ctx, rd.currentRetries); err != nil {
return err
}
}

return nil
}

// NewRecoveryDescriptor creates a new recovering descriptor wrapping the given
// delegate descriptor. Default backoff and retry parameters will be used.
// (The diff residue — the removed doc line and the old `return &RecoveryDescriptor{...}`
// statement appearing alongside the new return — has been cleaned up; a Go
// function cannot contain both.)
func NewRecoveryDescriptor(delegate Descriptor) *RecoveryDescriptor {
	return NewRecoveryDescriptorWithOptions(delegate, RecoveryDescriptorOptions{})
}

// NewRecoveryDescriptorWithOptions creates a new recovering descriptor wrapping the
// given delegate descriptor. It takes RecoveryDescriptorOptions to tune backoff and retry
// parameters. Any zero-valued option falls back to the package default.
func NewRecoveryDescriptorWithOptions(delegate Descriptor, opts RecoveryDescriptorOptions) *RecoveryDescriptor {
	// Resolve each option against its default without mutating opts.
	multiplier := opts.BackoffMultiplier
	if multiplier == 0 {
		multiplier = defaultBackoffMultiplier
	}

	retries := opts.MaxRetries
	if retries == 0 {
		retries = defaultMaxRetries
	}

	reset := opts.ResetRetriesTimerDuration
	if reset == 0 {
		reset = defaultResetRetriesTimerDuration
	}

	// TODO migrate to backoff's NextRun once implemented
	return &RecoveryDescriptor{
		delegate:      delegate,
		backoff:       &netutil.ExponentialBackoff{Multiplier: multiplier},
		maxRetries:    retries,
		resetDuration: reset,
	}
}
88 changes: 88 additions & 0 deletions scheduler/recovery_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package scheduler

import (
"context"
"fmt"
"testing"
"time"

"github.com/stretchr/testify/require"
)

// mockErrorDescriptor is a Descriptor stub whose Run fails until either of
// its counters is exhausted, then succeeds.
type mockErrorDescriptor struct {
	// errCount seeds the error message and stops failures once negative.
	errCount int
	// successAfterCount stops failures once it reaches zero.
	successAfterCount int
}

// Run returns nil as soon as errCount has gone negative or
// successAfterCount has hit zero; otherwise it returns an error and
// decrements both counters.
func (d *mockErrorDescriptor) Run(ctx context.Context, pc chan<- Process) error {
	if d.errCount < 0 || d.successAfterCount == 0 {
		return nil
	}

	err := fmt.Errorf("err count %d", d.errCount)
	d.errCount--
	d.successAfterCount--

	return err
}

func TestRecoverySchedulerStops(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)

descriptor := NewRecoveryDescriptor(&mockErrorDescriptor{
errCount: 10,
// make sure we never succeed
successAfterCount: 15,
})

pc := make(chan Process)

defer cancel()

require.Error(t, descriptor.Run(ctx, pc))
require.Equal(t, 10, descriptor.currentRetries)
}

// mockRetryResetDescriptor fails a fixed number of times, then simulates a
// long successful run and cancels the surrounding context so the scheduler
// shuts down cleanly.
type mockRetryResetDescriptor struct {
	// count is the number of failures still to produce.
	count int
	// successDuration is how long the final, successful run takes.
	successDuration time.Duration
	// cancel stops the test context after the successful run.
	cancel context.CancelFunc
}

// Run errors and decrements count while failures remain; once count hits
// zero it blocks for successDuration, cancels the context, and succeeds.
func (d *mockRetryResetDescriptor) Run(ctx context.Context, pc chan<- Process) error {
	if d.count != 0 {
		err := fmt.Errorf("err count %d", d.count)
		d.count--
		return err
	}

	<-time.After(d.successDuration)
	d.cancel()
	return nil
}

// TestRecoverySchedulerRetryCountReset verifies the retry-count reset timer:
// the mock fails 5 times, then runs successfully for longer than the
// configured ResetRetriesTimerDuration, so currentRetries should be back to
// 0 by the time the context is cancelled and Run returns.
func TestRecoverySchedulerRetryCountReset(t *testing.T) {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)

	successDuration := time.Second * 1

	mock := &mockRetryResetDescriptor{
		count:           5,
		cancel:          cancel,
		successDuration: successDuration,
	}

	// The reset window is 500ms shorter than the successful run, so the
	// reset timer fires while the delegate is still running.
	// NOTE(review): timing-based — could flake on a very slow machine; confirm.
	descriptor := NewRecoveryDescriptorWithOptions(mock, RecoveryDescriptorOptions{
		ResetRetriesTimerDuration: successDuration - (time.Millisecond * 500),
	})

	pc := make(chan Process)

	defer cancel()

	require.NoError(t, descriptor.Run(ctx, pc))

	require.Equal(t, 0, descriptor.currentRetries)
	require.Equal(t, 0, mock.count)
}

0 comments on commit 61f9de4

Please sign in to comment.