Update: Scheduler: adds backoff and max retries to RecoveryDescriptor
The RecoveryDescriptor previously had no delay between retries and would retry indefinitely. This
could cause resource usage to spike dramatically, so a configurable backoff delay was added, along
with an optional maximum number of retries.
kyleterry committed Jan 17, 2020
1 parent 61f9de4 commit 9f7d392
Showing 4 changed files with 119 additions and 52 deletions.
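
For orientation, a rough sketch of how a caller might use the new options follows. The apiWatcher delegate, the scheduler import path, and the assumption that Descriptor only requires a Run method (as the test mocks below suggest) are illustrative, not part of this commit.

package main

import (
    "context"
    "time"

    "github.com/puppetlabs/horsehead/v2/scheduler"
)

// apiWatcher is a hypothetical delegate that watches an external source and
// may fail; it exists only to show wrapping a descriptor for recovery.
type apiWatcher struct{}

func (apiWatcher) Run(ctx context.Context, pc chan<- scheduler.Process) error {
    // ... watch the external source and emit work onto pc ...
    <-ctx.Done()
    return ctx.Err()
}

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
    defer cancel()

    rd := scheduler.NewRecoveryDescriptorWithOptions(apiWatcher{}, scheduler.RecoveryDescriptorOptions{
        BackoffMultiplier: 10 * time.Millisecond, // base delay fed to the exponential backoff
        MaxRetries:        5,                     // stop after 5 consecutive failures; <= 0 retries forever
        ResetRetriesAfter: 30 * time.Second,      // a run this long clears the failure count
    })

    pc := make(chan scheduler.Process)
    _ = rd.Run(ctx, pc) // returns when the context ends or max retries are exceeded
}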
55 changes: 55 additions & 0 deletions scheduler/errors/build_errors.go
@@ -233,3 +233,58 @@ func NewProcessPanicErrorBuilder() *ProcessPanicErrorBuilder {
func NewProcessPanicError() Error {
return NewProcessPanicErrorBuilder().Build()
}

// RecoveryDescriptorSection defines a section of errors with the following scope:
// Recovery descriptor errors
var RecoveryDescriptorSection = &impl.ErrorSection{
Key: "recovery_descriptor",
Title: "Recovery descriptor errors",
}

// RecoveryDescriptorMaxRetriesReachedCode is the code for an instance of "max_retries_reached".
const RecoveryDescriptorMaxRetriesReachedCode = "hsch_recovery_descriptor_max_retries_reached"

// IsRecoveryDescriptorMaxRetriesReached tests whether a given error is an instance of "max_retries_reached".
func IsRecoveryDescriptorMaxRetriesReached(err errawr.Error) bool {
return err != nil && err.Is(RecoveryDescriptorMaxRetriesReachedCode)
}

// IsRecoveryDescriptorMaxRetriesReached tests whether a given error is an instance of "max_retries_reached".
func (External) IsRecoveryDescriptorMaxRetriesReached(err errawr.Error) bool {
return IsRecoveryDescriptorMaxRetriesReached(err)
}

// RecoveryDescriptorMaxRetriesReachedBuilder is a builder for "max_retries_reached" errors.
type RecoveryDescriptorMaxRetriesReachedBuilder struct {
arguments impl.ErrorArguments
}

// Build creates the error for the code "max_retries_reached" from this builder.
func (b *RecoveryDescriptorMaxRetriesReachedBuilder) Build() Error {
description := &impl.ErrorDescription{
Friendly: "The max retries ({{max_retries}} have been reached.",
Technical: "The max retries ({{max_retries}} have been reached.",
}

return &impl.Error{
ErrorArguments: b.arguments,
ErrorCode: "max_retries_reached",
ErrorDescription: description,
ErrorDomain: Domain,
ErrorMetadata: &impl.ErrorMetadata{},
ErrorSection: RecoveryDescriptorSection,
ErrorSensitivity: errawr.ErrorSensitivityNone,
ErrorTitle: "Max retries reached",
Version: 1,
}
}

// NewRecoveryDescriptorMaxRetriesReachedBuilder creates a new error builder for the code "max_retries_reached".
func NewRecoveryDescriptorMaxRetriesReachedBuilder(maxRetries int64) *RecoveryDescriptorMaxRetriesReachedBuilder {
return &RecoveryDescriptorMaxRetriesReachedBuilder{arguments: impl.ErrorArguments{"max_retries": impl.NewErrorArgument(maxRetries, "the configured max retries")}}
}

// NewRecoveryDescriptorMaxRetriesReached creates a new error with the code "max_retries_reached".
func NewRecoveryDescriptorMaxRetriesReached(maxRetries int64) Error {
return NewRecoveryDescriptorMaxRetriesReachedBuilder(maxRetries).Build()
}
15 changes: 15 additions & 0 deletions scheduler/errors/errors.yml
@@ -31,6 +31,21 @@ sections:
type:
description: the type of this descriptor

#
# Recovery descriptor
#
recovery_descriptor:
title: Recovery descriptor errors
errors:
max_retries_reached:
title: Max retries reached
description: >
The max retries ({{max_retries}}) have been reached.
arguments:
max_retries:
type: integer
description: the configured max retries

#
# Process errors
#
81 changes: 37 additions & 44 deletions scheduler/recovery.go
@@ -2,64 +2,53 @@ package scheduler

import (
"context"
"errors"
"reflect"
"time"

"github.com/puppetlabs/horsehead/v2/netutil"
"github.com/puppetlabs/horsehead/v2/scheduler/errors"
)

const (
defaultBackoffMultiplier = time.Millisecond * 5
defaultMaxRetries = 10
defaultResetRetriesTimerDuration = time.Second * 10
defaultBackoffMultiplier = time.Millisecond * 5
defaultResetRetriesAfter = time.Second * 10
)

// ErrMaxRetriesReached is the error returned by RecoveryDescriptor if the max retries
// have been reached.
var ErrMaxRetriesReached = errors.New("RecoveryDescriptor: max retries have been reached")

// RecoveryDescriptorOptions contains fields that allow backoff and retry parameters
// to be set.
type RecoveryDescriptorOptions struct {
// BackoffMultiplier is the timing multiplier between attempts using netutil.Backoff.
//
// Default: 5ms
BackoffMultiplier time.Duration
// MaxRetries is the max times the RecoveryDescriptor should attempt to run the delegate
// descriptor during a reset retries duration.
// descriptor during a reset retries duration. If this option is <= 0 then it means
// retry infinitely; however, the backoff multiplier still applies.
//
// Default: 0
MaxRetries int
// ResetRetriesTimerDuration is the time it takes to reset the retry count when running
// ResetRetriesAfter is the time it takes to reset the retry count when running
// a delegate descriptor.
ResetRetriesTimerDuration time.Duration
//
// Default: 10s
ResetRetriesAfter time.Duration
}

// RecoveryDescriptor wraps a given descriptor so that it restarts if the
// descriptor itself fails. This is useful for descriptors that work off of
// external information (APIs, events, etc.).
type RecoveryDescriptor struct {
delegate Descriptor
backoff netutil.Backoff
maxRetries int
currentRetries int
resetDuration time.Duration
delegate Descriptor
backoff netutil.Backoff
maxRetries int
resetAfter time.Duration
}

var _ Descriptor = &RecoveryDescriptor{}

// runOnce attempts to run the delegate descriptor. It starts a timer that waits for resetRetriesTimerDuration
// that will reset the retry attempt count to 0 if the delegate runs for the duration without an error. This is
// to prevent hours or days from going by, then an error happens again incrementing the count. If this happens 10
// times, the descriptor will seemingly shutdown after 1 retry, causing confusion.
func (rd *RecoveryDescriptor) runOnce(ctx context.Context, pc chan<- Process) (bool, error) {
timer := time.AfterFunc(rd.resetDuration, func() {
rd.currentRetries = 0
})

err := rd.delegate.Run(ctx, pc)

// If the timer is already triggered, then this will just return false. So it's
// fine to call blindly here.
timer.Stop()

select {
case <-ctx.Done():
return false, err
@@ -74,24 +63,32 @@ func (rd *RecoveryDescriptor) runOnce(ctx context.Context, pc chan<- Process) (b
}

// Run delegates work to another descriptor, catching any errors and restarting
// the descriptor immediately if an error occurs. It never returns an error. It
// only terminates when the context is done.
// the descriptor immediately if an error occurs. It might return a max retries error.
// It only terminates when the context is done or the max retries have been exceeded.
func (rd *RecoveryDescriptor) Run(ctx context.Context, pc chan<- Process) error {
var retries int

for {
start := time.Now()

if cont, err := rd.runOnce(ctx, pc); err != nil {
return err
} else if !cont {
break
}

if rd.currentRetries == rd.maxRetries {
if time.Now().Sub(start) >= rd.resetAfter {
retries = 0
}

if rd.maxRetries > 0 && retries == rd.maxRetries {
log(ctx).Error("max retries reached; stopping descriptor", "descriptor", reflect.TypeOf(rd.delegate).String())
return ErrMaxRetriesReached
return errors.NewRecoveryDescriptorMaxRetriesReached(int64(rd.maxRetries))
}

rd.currentRetries++
retries++

if err := rd.backoff.Backoff(ctx, rd.currentRetries); err != nil {
if err := rd.backoff.Backoff(ctx, retries); err != nil {
return err
}
}
Expand All @@ -113,21 +110,17 @@ func NewRecoveryDescriptorWithOptions(delegate Descriptor, opts RecoveryDescript
opts.BackoffMultiplier = defaultBackoffMultiplier
}

if opts.MaxRetries == 0 {
opts.MaxRetries = defaultMaxRetries
}

if opts.ResetRetriesTimerDuration == 0 {
opts.ResetRetriesTimerDuration = defaultResetRetriesTimerDuration
if opts.ResetRetriesAfter == 0 {
opts.ResetRetriesAfter = defaultResetRetriesAfter
}

// TODO migrate to backoff's NextRun once implemented
backoff := &netutil.ExponentialBackoff{Multiplier: opts.BackoffMultiplier}

return &RecoveryDescriptor{
delegate: delegate,
backoff: backoff,
maxRetries: opts.MaxRetries,
resetDuration: opts.ResetRetriesTimerDuration,
delegate: delegate,
backoff: backoff,
maxRetries: opts.MaxRetries,
resetAfter: opts.ResetRetriesAfter,
}
}
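
To restate the new loop semantics in one place (an illustrative summary, not code from this commit): a delegate run that lasts at least ResetRetriesAfter clears the consecutive-failure count, MaxRetries <= 0 retries forever, and the backoff delay grows with the current count.

package recoverysketch

import "time"

// evaluateRetry mirrors the per-iteration bookkeeping in RecoveryDescriptor.Run
// after the delegate returns: a run lasting at least resetAfter resets the
// consecutive-failure count; with maxRetries <= 0 the loop never stops on its
// own; otherwise it stops once the count reaches maxRetries, and the count only
// increments when the loop continues (the next backoff uses the new count).
func evaluateRetry(retries, maxRetries int, ranFor, resetAfter time.Duration) (next int, stop bool) {
    if ranFor >= resetAfter {
        retries = 0
    }
    if maxRetries > 0 && retries == maxRetries {
        return retries, true
    }
    return retries + 1, false
}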
20 changes: 12 additions & 8 deletions scheduler/recovery_test.go
@@ -29,18 +29,22 @@ func (d *mockErrorDescriptor) Run(ctx context.Context, pc chan<- Process) error
func TestRecoverySchedulerStops(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)

descriptor := NewRecoveryDescriptor(&mockErrorDescriptor{
mock := &mockErrorDescriptor{
errCount: 10,
// make sure we never succeed
successAfterCount: 15,
})
}

opts := RecoveryDescriptorOptions{
MaxRetries: 10,
}
descriptor := NewRecoveryDescriptorWithOptions(mock, opts)

pc := make(chan Process)

defer cancel()

require.Error(t, descriptor.Run(ctx, pc))
require.Equal(t, 10, descriptor.currentRetries)
}

type mockRetryResetDescriptor struct {
@@ -73,16 +77,16 @@ func TestRecoverySchedulerRetryCountReset(t *testing.T) {
successDuration: successDuration,
}

descriptor := NewRecoveryDescriptorWithOptions(mock, RecoveryDescriptorOptions{
ResetRetriesTimerDuration: successDuration - (time.Millisecond * 500),
})
opts := RecoveryDescriptorOptions{
MaxRetries: 10,
ResetRetriesAfter: successDuration - (time.Millisecond * 500),
}
descriptor := NewRecoveryDescriptorWithOptions(mock, opts)

pc := make(chan Process)

defer cancel()

require.NoError(t, descriptor.Run(ctx, pc))

require.Equal(t, 0, descriptor.currentRetries)
require.Equal(t, 0, mock.count)
}
