Skip to content

Commit

Permalink
Merge pull request #1 from mfojtik/plugin-error-handler
Browse files Browse the repository at this point in the history
controller: add pluggable error and panic handlers
  • Loading branch information
mfojtik committed Aug 7, 2023
2 parents 05c834f + b148061 commit eeb6c19
Show file tree
Hide file tree
Showing 4 changed files with 89 additions and 82 deletions.
74 changes: 24 additions & 50 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
//operatorv1 "github.com/openshift/api/operator/v1"

"github.com/robfig/cron"
apierrors "k8s.io/apimachinery/pkg/api/errors"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/cache"
Expand All @@ -32,14 +31,16 @@ type baseController struct {
sync func(ctx context.Context, controllerContext framework.Context) error
syncContext framework.Context

//syncDegradedClient operatorv1helpers.OperatorClient
resyncEvery time.Duration
resyncSchedules []cron.Schedule

postStartHooks []framework.PostStartHook

informerSynced []cache.InformerSynced
informerSyncedTimeout time.Duration

syncPanicHandler framework.ControllerSyncPanicFn
syncErrorHandler framework.ControllerSyncErrorFn
}

func New(
Expand Down Expand Up @@ -94,7 +95,14 @@ func waitForNamedCacheSync(controllerName string, stopCh <-chan struct{}, cacheS

func (c *baseController) Run(ctx context.Context, workers int) {
// HandleCrash recovers panics
defer utilruntime.HandleCrash(c.degradedPanicHandler)
defer utilruntime.HandleCrash(func(in interface{}) {
if c.syncPanicHandler == nil {
panic(in)
}
if err := c.syncPanicHandler(in); err != nil {
klog.Warningf("PANIC: Detected panic() in controller %q failed to run panic handler: %v\n\n%s", c.Name(), err, in)
}
})

// give caches 10 minutes to sync
cacheSyncCtx, cacheSyncCancel := context.WithTimeout(ctx, c.informerSyncedTimeout)
Expand Down Expand Up @@ -200,7 +208,14 @@ func (c *baseController) runWorker(queueCtx context.Context) {
wait.UntilWithContext(
queueCtx,
func(queueCtx context.Context) {
defer utilruntime.HandleCrash(c.degradedPanicHandler)
defer utilruntime.HandleCrash(func(in interface{}) {
if c.syncPanicHandler == nil {
panic(in)
}
if err := c.syncPanicHandler(in); err != nil {
klog.Warningf("PANIC: Detected panic() in controller %q failed to run panic handler: %v\n\n%s", c.Name(), err, in)
}
})
for {
select {
case <-queueCtx.Done():
Expand All @@ -213,60 +228,19 @@ func (c *baseController) runWorker(queueCtx context.Context) {
1*time.Second)
}

func (c *baseController) reportDegraded(ctx context.Context, err error) error {
return err
}

// reconcile wraps the sync() call and if operator client is set, it handle the degraded condition if sync() returns an error.
func (c *baseController) reconcile(ctx context.Context, syncCtx framework.Context) error {
err := c.sync(ctx, syncCtx)
degradedErr := c.reportDegraded(ctx, err)
if apierrors.IsNotFound(degradedErr) { // && management.IsOperatorRemovable() {
// The operator tolerates missing CR, therefore don't report it up.
if errors.Is(err, SyntheticRequeueError) {
return err
}
return degradedErr
}

// degradedPanicHandler will go degraded on failures, then we should catch potential panics and covert them into bad status.
func (c *baseController) degradedPanicHandler(panicVal interface{}) {
return
/*
if c.syncDegradedClient == nil {
// if we don't have a client for reporting degraded condition, then let the existing panic handler do the work
return
}
_ = c.reportDegraded(context.TODO(), fmt.Errorf("panic caught:\n%v", panicVal))
*/
}

// reportDegraded updates status with an indication of degraded-ness
/*
func (c *baseController) reportDegraded(ctx context.Context, reportedError error) error {
if c.syncDegradedClient == nil {
return reportedError
}
if reportedError != nil {
_, _, updateErr := v1helpers.UpdateStatus(ctx, c.syncDegradedClient, v1helpers.UpdateConditionFn(operatorv1.OperatorCondition{
Type: c.name + "Degraded",
Status: operatorv1.ConditionTrue,
Reason: "SyncError",
Message: reportedError.Error(),
}))
if updateErr != nil {
klog.Warningf("Updating status of %q failed: %v", c.Name(), updateErr)
if c.syncErrorHandler != nil {
if handlerErr := c.syncErrorHandler(err); handlerErr != nil {
panic(handlerErr)
}
return reportedError
}
_, _, updateErr := v1helpers.UpdateStatus(ctx, c.syncDegradedClient,
v1helpers.UpdateConditionFn(operatorv1.OperatorCondition{
Type: c.name + "Degraded",
Status: operatorv1.ConditionFalse,
Reason: "AsExpected",
}))
return updateErr
return err
}
*/

func (c *baseController) processNextWorkItem(queueCtx context.Context) {
key, quit := c.syncContext.Queue().Get()
Expand Down
72 changes: 40 additions & 32 deletions pkg/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package controller

import (
"context"
"errors"
"fmt"
context2 "github.com/mfojtik/controller-framework/pkg/context"

Expand Down Expand Up @@ -90,16 +91,8 @@ func TestBaseController_ReturnOnGracefulShutdownWhileWaitingForCachesToSync(t *t
}

func TestBaseController_Reconcile(t *testing.T) {
/*
operatorClient := v1helpers.NewFakeOperatorClient(
&operatorv1.OperatorSpec{},
&operatorv1.OperatorStatus{},
nil,
)
*/
c := &baseController{
name: "TestController",
//syncDegradedClient: operatorClient,
}

c.sync = func(ctx context.Context, controllerContext framework.Context) error {
Expand All @@ -108,37 +101,52 @@ func TestBaseController_Reconcile(t *testing.T) {
if err := c.reconcile(context.TODO(), context2.New("TestController", eventstesting.NewTestingEventRecorder(t))); err != nil {
t.Fatal(err)
}
/*
_, status, _, err := operatorClient.GetOperatorState()
if err != nil {
t.Fatal(err)
}
if !v1helpers.IsOperatorConditionPresentAndEqual(status.Conditions, "TestControllerDegraded", "False") {
t.Fatalf("expected TestControllerDegraded to be False, got %#v", status.Conditions)
}
*/
c.sync = func(ctx context.Context, controllerContext framework.Context) error {
return fmt.Errorf("error")
}
if err := c.reconcile(context.TODO(), context2.New("TestController", eventstesting.NewTestingEventRecorder(t))); err == nil {
t.Fatal("expected error, got none")
}
/*
_, status, _, err = operatorClient.GetOperatorState()
if err != nil {
t.Fatal(err)
}
if !v1helpers.IsOperatorConditionPresentAndEqual(status.Conditions, "TestControllerDegraded", "True") {
t.Fatalf("expected TestControllerDegraded to be False, got %#v", status.Conditions)
}
condition := v1helpers.FindOperatorCondition(status.Conditions, "TestControllerDegraded")
if condition.Reason != "SyncError" {
t.Errorf("expected condition reason 'SyncError', got %q", condition.Reason)
}
if condition.Message != "error" {
t.Errorf("expected condition message 'error', got %q", condition.Message)
}

func TestBaseController_ReconcileErrorAndPanicHandlers(t *testing.T) {
errsHandled := []error{}
c := &baseController{
name: "TestController",
syncErrorHandler: func(err error) error {
errsHandled = append(errsHandled, err)
return nil
},
}

syncErr := errors.New("sync error")

c.sync = func(ctx context.Context, controllerContext framework.Context) error {
return syncErr
}
if err := c.reconcile(context.TODO(), context2.New("TestController", eventstesting.NewTestingEventRecorder(t))); !errors.Is(err, syncErr) {
t.Fatalf("expected sync() to return original error, got %v", err)
}
c.reconcile(context.TODO(), context2.New("TestController", eventstesting.NewTestingEventRecorder(t)))
if len(errsHandled) != 2 {
t.Fatalf("expected 2 errors, got %d (%#v)", len(errsHandled), errsHandled)
}

c.syncErrorHandler = func(err error) error {
return err
}
panicCount := 0
func() {
defer func() {
if r := recover(); r != nil {
panicCount++
}
*/
}()
c.reconcile(context.TODO(), context2.New("TestController", eventstesting.NewTestingEventRecorder(t)))
}()
if panicCount == 0 {
t.Fatalf("expected to panic when sync error handler error out")
}
}

func TestBaseController_Run(t *testing.T) {
Expand Down
22 changes: 22 additions & 0 deletions pkg/factory/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ type Factory struct {

postStartHooks []framework.PostStartHook
interestingNamespaces sets.Set[string]

controllerPanicHandler framework.ControllerSyncPanicFn
controllerErrorHandler framework.ControllerSyncErrorFn
}

type namespaceInformer struct {
Expand Down Expand Up @@ -240,6 +243,25 @@ func (f *Factory) WithSyncContext(ctx framework.Context) *Factory {
return f
}

// WithSyncErrorHandler allows in case the sync() function return error to additionally handle the error.
// This allows to build error handling mechanisms that for example report operator status or provide error count metrics for controller.
// NOTE: The original error is always returned from sync() call (so it is retried as usual).
// NOTE2: If the error is SyntheticRequeueError this error is not being handled and the sync() is simply retried.
// NOTE3: If an error is returned from the handler, this error causes panic()
func (f *Factory) WithSyncErrorHandler(fn func(err error) error) *Factory {
f.controllerErrorHandler = fn
return f
}

// WithSyncPanicHandler allows to register a panic() handler for Run() method of controller. This handler will recover from
// any panic inside the sync() and pass the panic into given function. This allows to update operator status OR it allows specific
// controller metrics to be created.
// If the panic handler is not specified, the util.HandleCrash() is called as usual.
func (f *Factory) WithSyncPanicHandler(fn func(panicInterface interface{}) error) *Factory {
f.controllerPanicHandler = fn
return f
}

// WithSyncDegradedOnError encapsulate the controller sync() function, so when this function return an error, the operator client
// is used to set the degraded condition to (eg. "ControllerFooDegraded"). The degraded condition name is set based on the controller name.
/*
Expand Down
3 changes: 3 additions & 0 deletions pkg/framework/framework.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ type Context interface {
Recorder() events.Recorder
}

type ControllerSyncPanicFn func(interface{}) error
type ControllerSyncErrorFn func(error) error

// ControllerSyncFn is a function that contain main controller logic.
// The syncContext.syncContext passed is the main controller syncContext, when cancelled it means the controller is being shut down.
// The syncContext provides access to controller name, queue and event recorder.
Expand Down

0 comments on commit eeb6c19

Please sign in to comment.