Skip to content

Commit

Permalink
feat: restart failing controllers automatically with exp backoff
Browse files Browse the repository at this point in the history
On controller failure new reconcile event is scheduler and controller is
restarted with exponential backoff to prevent crash looping.

Errors and restarts are logged for easier debugging.

Signed-off-by: Andrey Smirnov <smirnov.andrey@gmail.com>
  • Loading branch information
smira authored and talos-bot committed Jan 19, 2021
1 parent 98acf0d commit b64f477
Show file tree
Hide file tree
Showing 6 changed files with 113 additions and 14 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ go 1.14

require (
github.com/AlekSi/pointer v1.1.0
github.com/cenkalti/backoff/v4 v4.1.0
github.com/hashicorp/go-memdb v1.3.0
github.com/hashicorp/go-multierror v1.1.0
github.com/stretchr/testify v1.6.1
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
github.com/AlekSi/pointer v1.1.0 h1:SSDMPcXD9jSl8FPy9cRzoRaMJtm9g9ggGTxecRUbQoI=
github.com/AlekSi/pointer v1.1.0/go.mod h1:y7BvfRI3wXPWKXEBhU71nbnIEEZX0QTSB2Bj48UJIZE=
github.com/cenkalti/backoff/v4 v4.1.0 h1:c8LkOFQTzuO0WBM/ae5HdGQuZPfPxp7lqBRwQRm4fSc=
github.com/cenkalti/backoff/v4 v4.1.0/go.mod h1:scbssz8iZGpm3xbr14ovlUdkxfGXNInqkPWOWmG2CLw=
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/hashicorp/errwrap v1.0.0 h1:hLrqtEDnRye3+sgx6z4qVLNuviH3MR5aQ0ykNJa/UYA=
Expand Down
45 changes: 43 additions & 2 deletions pkg/controller/runtime/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,14 @@ package runtime

import (
"context"
"errors"
"fmt"
"log"
"sort"
"time"

"github.com/AlekSi/pointer"
"github.com/cenkalti/backoff/v4"

"github.com/talos-systems/os-runtime/pkg/controller"
"github.com/talos-systems/os-runtime/pkg/controller/runtime/dependency"
Expand All @@ -31,6 +34,8 @@ type adapter struct {
managedType resource.Type

dependencies []controller.Dependency

backoff *backoff.ExponentialBackOff
}

// EventCh implements controller.Runtime interface.
Expand Down Expand Up @@ -271,14 +276,50 @@ func (adapter *adapter) triggerReconcile() {
}
}

func (adapter *adapter) run(ctx context.Context) (err error) {
func (adapter *adapter) run(ctx context.Context) {
logger := log.New(adapter.runtime.logger.Writer(), fmt.Sprintf("%s %s: ", adapter.runtime.logger.Prefix(), adapter.name), adapter.runtime.logger.Flags())

for {
err := adapter.runOnce(ctx, logger)
if err == nil {
return
}

interval := adapter.backoff.NextBackOff()

logger.Printf("restarting controller in %s", interval)

select {
case <-ctx.Done():
return
case <-time.After(interval):
}

// schedule reconcile after restart
adapter.triggerReconcile()
}
}

func (adapter *adapter) runOnce(ctx context.Context, logger *log.Logger) (err error) {
defer func() {
if err != nil && errors.Is(err, context.Canceled) {
err = nil
}

if err != nil {
logger.Printf("controller failed: %s", err)
} else {
logger.Printf("controller finished")
}
}()

defer func() {
if p := recover(); p != nil {
err = fmt.Errorf("controller %q panicked: %s", adapter.name, p)
}
}()

logger := log.New(adapter.runtime.logger.Writer(), fmt.Sprintf("%s %s: ", adapter.runtime.logger.Prefix(), adapter.name), adapter.runtime.logger.Flags())
logger.Printf("controller starting")

err = adapter.ctrl.Run(ctx, adapter, logger)

Expand Down
25 changes: 14 additions & 11 deletions pkg/controller/runtime/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
"sync"

"github.com/AlekSi/pointer"
"github.com/hashicorp/go-multierror"
"github.com/cenkalti/backoff/v4"

"github.com/talos-systems/os-runtime/pkg/controller"
"github.com/talos-systems/os-runtime/pkg/controller/runtime/dependency"
Expand Down Expand Up @@ -74,8 +74,13 @@ func (runtime *Runtime) RegisterController(ctrl controller.Controller) error {

ctrl: ctrl,
ch: make(chan controller.ReconcileEvent, 1),

backoff: backoff.NewExponentialBackOff(),
}

// disable number of retries limit
adapter.backoff.MaxElapsedTime = 0

if err := adapter.initialize(); err != nil {
return fmt.Errorf("error initializing controller %q adapter: %w", name, err)
}
Expand All @@ -95,29 +100,27 @@ func (runtime *Runtime) Run(ctx context.Context) error {

go runtime.processWatched()

errCh := make(chan error)
var wg sync.WaitGroup

runtime.controllersMu.RLock()

for _, adapter := range runtime.controllers {
adapter := adapter

wg.Add(1)

go func() {
errCh <- adapter.run(runtime.runCtx)
defer wg.Done()

adapter.run(runtime.runCtx)
}()
}

n := len(runtime.controllers)

runtime.controllersMu.RUnlock()

var multiErr *multierror.Error

for i := 0; i < n; i++ {
multiErr = multierror.Append(multiErr, <-errCh)
}
wg.Wait()

return multiErr.ErrorOrNil()
return nil
}

func (runtime *Runtime) watch(resourceNamespace resource.Namespace, resourceType resource.Type) error {
Expand Down
38 changes: 38 additions & 0 deletions pkg/controller/runtime/runtime_controllers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,3 +265,41 @@ func (ctrl *SumController) Run(ctx context.Context, r controller.Runtime, logger
}
}
}

// FailingController fails on each iteration creating new resources each time.
type FailingController struct {
TargetNamespace resource.Namespace

count int
}

// Name implements controller.Controller interface.
func (ctrl *FailingController) Name() string {
return "FailingController"
}

// ManagedResources implements controller.Controller interface.
func (ctrl *FailingController) ManagedResources() (resource.Namespace, resource.Type) {
return ctrl.TargetNamespace, IntResourceType
}

// Run implements controller.Controller interface.
func (ctrl *FailingController) Run(ctx context.Context, r controller.Runtime, logger *log.Logger) error {
select {
case <-ctx.Done():
return nil
case <-r.EventCh():
}

if err := r.Update(ctx, NewIntResource(ctrl.TargetNamespace, strconv.Itoa(ctrl.count), 0), func(r resource.Resource) error {
r.(*IntResource).value = ctrl.count

return nil
}); err != nil {
return fmt.Errorf("error updating value")
}

ctrl.count++

return fmt.Errorf("failing here")
}
16 changes: 15 additions & 1 deletion pkg/controller/runtime/runtime_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func (suite *RuntimeSuite) assertStrObjects(ns resource.Namespace, typ resource.
}
}

//nolint: dupl
//nolint: dupl, unparam
func (suite *RuntimeSuite) assertIntObjects(ns resource.Namespace, typ resource.Type, ids []string, values []int) retry.RetryableFunc {
return func() error {
items, err := suite.state.List(suite.ctx, resource.NewMetadata(ns, typ, "", resource.VersionUndefined))
Expand Down Expand Up @@ -259,6 +259,20 @@ func (suite *RuntimeSuite) TestSumControllers() {
Retry(suite.assertIntObjects("target", IntResourceType, []string{"sum"}, []int{2})))
}

func (suite *RuntimeSuite) TestFailingController() {
suite.Require().NoError(suite.runtime.RegisterController(&FailingController{
TargetNamespace: "target",
}))

suite.startRuntime()

suite.Assert().NoError(retry.Constant(5*time.Second, retry.WithUnits(10*time.Millisecond)).
Retry(suite.assertIntObjects("target", IntResourceType, []string{"0"}, []int{0})))

suite.Assert().NoError(retry.Constant(5*time.Second, retry.WithUnits(10*time.Millisecond)).
Retry(suite.assertIntObjects("target", IntResourceType, []string{"0", "1"}, []int{0, 1})))
}

func TestRuntime(t *testing.T) {
t.Parallel()

Expand Down

0 comments on commit b64f477

Please sign in to comment.